1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| """ 飞书群机器人发送通知 """ import json from logging import Logger import requests import datetime
class FlybookRobotAlert(): def __init__(self, webhook_url, logger=Logger("飞书通知")): self.webhook = webhook_url self.logger = logger
self.headers = {'Content-Type': 'application/json; charset=UTF-8'}
def post_to_robot(self, post_data): ''' 给飞书机器人发送请求 :param data: :return: ''' try: resp = requests.request( method="POST", url=self.webhook, data=post_data, headers=self.headers).json() if resp.get("StatusCode") == 0 and resp.get("msg") == "success": self.logger.info(f"飞书通知发送成功,msg={resp}") else: self.logger.warning(f"飞书通知发送失败,{resp}") except Exception as e: self.logger.warning("飞书通知发送异常") self.logger.warning(e) pass
def send_message(self, date, acc, traf): robot_headers = 'cdn用量报告' field_list = [ { "is_short": False, "text": { "tag": "lark_md", "content": f"**请求数**:<font color=\"green\">{{}}</font> **次**\n".format(acc) } }, { "is_short": False, "text": { "tag": "lark_md", "content": f"**流量**:<font color=\"green\">{{:.2f}}</font> **MB**\n".format(traf / 1e6) } } ]
elements = [ { "tag": "div", "text": { "content": date.strftime("%Y年%m月%d日"), "tag": "lark_md" } }, { "tag": "div", "fields": field_list } ]
card = json.dumps({ "config": { "wide_screen_mode": True }, "elements": elements, "header": { "template": "blue", "title": { "content": robot_headers, "tag": "plain_text" } } })
msg_body = json.dumps({"msg_type": "interactive", "card": card}) self.post_to_robot(msg_body) return
|