import asyncio import json from io import BytesIO import requests from custom_exception import FeishuAuthException from event_map_mapping import mapping from redis import asyncio as aioredis app_id = "cli_a525f3d78e3e500c" app_secret = "JVhnbKfXifddjHVwcTqbEfn1rDQBYqDD" headers = {"Authorization": ""} def get_access_token(api_id, secret): url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal" data = { "app_id": api_id, "app_secret": secret } resp = requests.post(url, data=data).content.decode('utf-8') return json.loads(resp)['tenant_access_token'] # 获取事件的图片 def get_image_url(event): analysis_key = mapping.get(event['eventType'])['analyze_key'] if analysis_key is None: return None images = [] for el in event['data'][analysis_key]: images.append(el['imageUrl']) if len(images) == 0: return None return images def get_value_by_nested_key(my_dict, nested_key): ret = my_dict for key in nested_key: ret = ret[key] return ret # 上次图片到飞书 def upload_image(url): image_content = requests.get(url).content payload = {'image_type': 'message'} files = [ ('image', ('file', BytesIO(image_content), 'application/octet-stream')) ] image = requests.request("POST", "https://open.feishu.cn/open-apis/im/v1/images", data=payload, files=files, headers=headers).content.decode('utf-8') data = json.loads(image) code = data['code'] if code == 99991663: raise FeishuAuthException image_key = json.loads(image)['data']['image_key'] return image_key # 推送到飞书 def push_to_feishu(data): push_url = "https://open.feishu.cn/open-apis/message/v4/batch_send/" # push_url = "http://192.168.20.115:8000/ipaasuat/engine_company/anycross/trigger/callback/MmRhZTE4OTRiYjVkZDQ5YWNmOGRmZDI0NjQ1MTBlODUw/1.0.0" data = json.dumps(data) r = requests.post(push_url, data=data, headers=headers).content.decode('utf-8') data = json.loads(r) code = data['code'] if code == 99991663: raise FeishuAuthException print(r) async def read_event(): access_token = get_access_token(app_id, app_secret) headers['Authorization'] = f"Bearer {access_token}" redis_client = await aioredis.from_url("redis://localhost") while True: data = await redis_client.brpop("event") event_json = json.loads(data[1].decode('utf-8')) try: events = event_json["params"]["events"] for event in events: event_type = mapping.get(event['eventType'], {"type": "未知事件类型"}) print(event_type) # TODO: 上传图片到飞书链接 images = get_image_url(event) feishu_image_key = None if images is not None: feishu_image_key = upload_image(images[0]) print(feishu_image_key) # TODO: 根据ip_address获取对应userid # ip_address = event["data"]["ipAddress"] # channel_name = get_value_by_nested_key(event, ("data", "channelName")) # TODO: 向飞书推送消息 push_data = { "user_ids": [ "8dg79c18" ], "msg_type": "interactive", "card": { "config": { "wide_screen_mode": True }, "elements": [ { "tag": "markdown", "content": "这里是卡片文本,支持使用markdown标签设置文本格式。例如:\n*斜体* 、**粗体**、~~删除线~~、[文字链接](" "https://www.feishu.cn)、 彩色文本 " }, { "alt": { "content": "", "tag": "plain_text" }, "img_key": feishu_image_key, "tag": "img", "mode": "fit_horizontal", "compact_width": False }, { "tag": "action", "actions": [ { "tag": "button", "text": { "tag": "plain_text", "content": "这是跳转按钮" }, "type": "primary", "url": "https://feishu.cn" } ] } ], "header": { "template": "blue", "title": { "content": event_type["type"], "tag": "plain_text" } } } } push_to_feishu(push_data) print(event_type["type"], ip_address) except FeishuAuthException: print("飞书登录失败") # TODO: 重新获取 access token access_token = get_access_token(app_id, app_secret) headers['Authorization'] = f"Bearer {access_token}" await redis_client.lpush("event", json.dumps(event_json)) continue except Exception as e: print("error: ", e) continue if __name__ == "__main__": asyncio.run(read_event())