"""Consume Hikvision subscription events from Redis and prepare them for Feishu.

The module offers small synchronous helpers around the Feishu open API
(token retrieval, image upload, batch message push) and an asyncio consumer
loop, ``read_event``, that pops events from the ``hik-sub-event`` Redis list
and enriches them with device and user information.
"""
import asyncio
import json
from io import BytesIO
from typing import Any, Iterable, List, Optional

import requests
from redis import asyncio as aioredis

from custom_exception import FeishuAuthException
from mapping.device_map import device_map
from mapping.event_map_mapping import mapping
from mapping.user_map import user_map

# NOTE(review): credentials are committed in source; consider loading them
# from the environment or a secret store and rotating this secret.
app_id = "cli_a525f3d78e3e500c"
app_secret = "JVhnbKfXifddjHVwcTqbEfn1rDQBYqDD"

# Shared request headers for Feishu calls.
# NOTE(review): nothing in this file fills in "Authorization"; presumably a
# caller stores the token from get_access_token() here -- confirm.
headers = {"Authorization": ""}


def get_access_token(api_id, secret):
    """Request a tenant access token from Feishu.

    Args:
        api_id: Feishu application id, sent as ``app_id``.
        secret: Feishu application secret, sent as ``app_secret``.

    Returns:
        The ``tenant_access_token`` field of the JSON response.

    Raises:
        KeyError: If the response has no ``tenant_access_token`` field.
    """
    url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
    data = {
        "app_id": api_id,
        "app_secret": secret
    }
    resp = requests.post(url, data=data).content.decode('utf-8')
    return json.loads(resp)['tenant_access_token']


# Get the image URLs attached to an event.
def get_image_url(event) -> Optional[List[str]]:
    """Collect the image URLs carried by an event.

    The event type is looked up in ``mapping``; the entry's ``detection_key``
    names the list under ``event['data']`` whose items hold an ``imageUrl``.

    Args:
        event: Event dict with ``eventType`` and ``data`` keys.

    Returns:
        A list of image URLs, or ``None`` when the event type is not mapped,
        the mapping has no detection key, or no images are present.
    """
    entry = mapping.get(event['eventType'])
    if entry is None:
        # Unknown event type: the original subscripted None here and raised
        # TypeError instead of reporting "no images".
        return None

    analysis_key = entry['detection_key']
    if analysis_key is None:
        return None

    images = [el['imageUrl'] for el in event['data'][analysis_key]]
    return images or None


def get_value_by_nested_key(my_dict, nested_key: Iterable[Any]):
    """Walk ``my_dict`` along ``nested_key`` and return the value found.

    Args:
        my_dict: The outermost container to index into.
        nested_key: Keys (or indexes) applied one after another.

    Returns:
        The value reached after applying every key; ``my_dict`` itself when
        ``nested_key`` is empty.

    Raises:
        KeyError, IndexError, TypeError: Propagated from the failing lookup.
    """
    ret = my_dict
    for key in nested_key:
        ret = ret[key]
    return ret


# Upload an image to Feishu.
def upload_image(url):
    """Download the image at ``url`` and upload it to Feishu.

    Args:
        url: Location of the image to fetch.

    Returns:
        The ``image_key`` from the upload response.

    Raises:
        FeishuAuthException: If the response code is 99991663 (presumably an
            invalid or expired access token -- confirm against Feishu docs).
        KeyError: If the response lacks ``code`` or ``data.image_key``.
    """
    image_content = requests.get(url).content
    payload = {'image_type': 'message'}
    files = [
        ('image', ('file', BytesIO(image_content), 'application/octet-stream'))
    ]
    raw = requests.request(
        "POST",
        "https://open.feishu.cn/open-apis/im/v1/images",
        data=payload,
        files=files,
        headers=headers,
    ).content.decode('utf-8')

    # Parse the response once and reuse it for both the code and the key.
    body = json.loads(raw)
    if body['code'] == 99991663:
        raise FeishuAuthException
    return body['data']['image_key']


# Push a message to Feishu.
def push_to_feishu(data):
    """Send ``data`` to the Feishu batch-send endpoint and print the response.

    Args:
        data: JSON-serialisable message payload.

    Raises:
        FeishuAuthException: If the response code is 99991663 (presumably an
            invalid or expired access token -- confirm against Feishu docs).
        KeyError: If the response has no ``code`` field.
    """
    push_url = "https://open.feishu.cn/open-apis/message/v4/batch_send/"
    payload = json.dumps(data)
    r = requests.post(push_url, data=payload, headers=headers).content.decode('utf-8')
    if json.loads(r)['code'] == 99991663:
        raise FeishuAuthException
    print(r)


async def read_event():
    """Consume events from the ``hik-sub-event`` Redis list forever.

    Each popped message is decoded as JSON and every event under
    ``params.events`` is enriched in place with the mapped event type, the
    device name and the user ids looked up by the event's IP address.
    A message that cannot be processed is reported and skipped, so a single
    bad payload does not stop the consumer.
    """
    redis_client = await aioredis.Redis(host="127.0.0.1", port=6379)
    while True:
        # brpop yields a (list name, value) pair; the value is at index 1.
        data = await redis_client.brpop("hik-sub-event")
        try:
            # Decoding and parsing live inside the try so that a malformed
            # payload is skipped rather than terminating the loop.
            sub_json = json.loads(data[1].decode('utf-8'))
            events = sub_json["params"]["events"]
            for event in events:
                # Replace the eventType id with its mapped display value.
                event_type = event['eventType']
                event_type_str = mapping.get(event_type, "未知事件类型")
                event['eventType'] = event_type_str

                # IP address of the reporting device.
                ip_address = event["data"]["ipAddress"]

                # Look up the device name for this IP address.
                device_name = device_map.get(ip_address, "未知设备")
                event['deviceName'] = device_name

                # TODO: look up the user ids for this IP address.
                user_ids = user_map.get(ip_address, [])
                event['userIds'] = user_ids

                # TODO: call the push API.
        except Exception as e:
            print("error: ", e)
            continue
        await asyncio.sleep(0.5)


if __name__ == "__main__":
    asyncio.run(read_event())