Files
hik-push/hik_push/read_event.py
quantulr 4b4ee17bc7 update
2024-01-24 16:15:00 +08:00

137 lines
5.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
from io import BytesIO
import requests
from event_map_mapping import mapping
from redis import asyncio as aioredis
headers = {"Authorization": "Bearer t-g1041oeFF3LTS5MA3HQYUHYFOK246EAEPFJWPRWP"}
# 获取事件的图片
def get_image_url(event):
analysis_key = mapping.get(event['eventType'])['analyze_key']
print(analysis_key)
if analysis_key is None:
return []
images = []
for el in event['data'][analysis_key]:
images.append(el['imageUrl'])
return images
def get_value_by_nested_key(my_dict, nested_key):
ret = my_dict
for key in nested_key:
ret = ret[key]
return ret
# 上次图片到飞书
def upload_image(url):
image_content = requests.get(url).content
payload = {'image_type': 'message'}
files = [
('image', ('file', BytesIO(image_content), 'application/octet-stream'))
]
image = requests.request("POST", "https://open.feishu.cn/open-apis/im/v1/images", data=payload,
files=files, headers=headers).content.decode('utf-8')
image_key = json.loads(image)['data']['image_key']
return image_key
# 推送到飞书
def push_to_feishu(data):
push_url = "https://open.feishu.cn/open-apis/message/v4/batch_send/"
# push_url = "http://192.168.20.115:8000/ipaasuat/engine_company/anycross/trigger/callback/MmRhZTE4OTRiYjVkZDQ5YWNmOGRmZDI0NjQ1MTBlODUw/1.0.0"
data = json.dumps(data)
r = requests.post(push_url, data=data, headers=headers).content.decode('utf-8')
print(r)
async def read_event():
redis_client = await aioredis.from_url("redis://localhost")
while True:
data = await redis_client.brpop("event")
event_json = json.loads(data[1].decode('utf-8'))
try:
events = event_json["params"]["events"]
for event in events:
event_type = mapping.get(event['eventType'], {"type": "未知事件类型"})
print(event_type)
# TODO: 上传图片到飞书链接
images = get_image_url(event)
feishu_image_url = upload_image(images[0])
print(feishu_image_url)
# TODO: 根据ip_address获取对应userid
#
ip_address = event["data"]["ipAddress"]
# channel_name = get_value_by_nested_key(event, ("data", "channelName"))
# TODO: 向飞书推送消息
push_data = {
"user_ids": [
"8dg79c18"
],
"msg_type": "interactive",
"card": {
"config": {
"wide_screen_mode": True
},
"elements": [
{
"tag": "markdown",
"content": "这里是卡片文本支持使用markdown标签设置文本格式。例如\n*斜体* 、**粗体**、~~删除线~~、[文字链接](https://www.feishu.cn)、<at id=all></at>、<font color='red'> 彩色文本 </font>"
},
{
"alt": {
"content": "",
"tag": "plain_text"
},
"img_key": feishu_image_url,
"tag": "img",
"mode": "fit_horizontal",
"compact_width": False
},
{
"tag": "action",
"actions": [
{
"tag": "button",
"text": {
"tag": "plain_text",
"content": "这是跳转按钮"
},
"type": "primary",
"url": "https://feishu.cn"
}
]
}
],
"header": {
"template": "blue",
"title": {
"content": event_type["type"],
"tag": "plain_text"
}
}
}
}
push_to_feishu(push_data)
print(event_type["type"], ip_address)
except Exception as e:
print("error: ", e)
continue
# except Exception as e:
# print("error", e)
# # await redis_client.lpush("event", json.dumps(event_json))
# continue
if __name__ == "__main__":
asyncio.run(read_event())