# hik-push/hik_push/read_event.py
# (code-viewer header kept as a comment: 112 lines, 3.4 KiB, Python;
#  history entry dated 2024-01-26 11:34:46 +08:00)
import asyncio
import json
import os
from io import BytesIO

import requests
from redis import asyncio as aioredis

from custom_exception import FeishuAuthException
from mapping.device_map import device_map
from mapping.event_map_mapping import mapping
from mapping.user_map import user_map
app_id = "cli_a525f3d78e3e500c"
app_secret = "JVhnbKfXifddjHVwcTqbEfn1rDQBYqDD"
headers = {"Authorization": ""}
def get_access_token(api_id, secret, timeout=10):
    """Request a Feishu tenant access token for an internal app.

    Args:
        api_id: Feishu app ID, sent as the ``app_id`` field.
        secret: Feishu app secret, sent as the ``app_secret`` field.
        timeout: Seconds to wait for the HTTP request. Without a timeout
            ``requests`` waits indefinitely on an unresponsive server.

    Returns:
        The ``tenant_access_token`` value from the JSON response.

    Raises:
        KeyError: If the response has no ``tenant_access_token`` field.
        requests.RequestException: On network failure or timeout.
    """
    url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
    data = {
        "app_id": api_id,
        "app_secret": secret,
    }
    resp = requests.post(url, data=data, timeout=timeout).content.decode('utf-8')
    return json.loads(resp)['tenant_access_token']
# Collect the image URLs attached to an event.
def get_image_url(event):
    """Return the image URLs carried by ``event``, or ``None`` if there are none.

    The event type is looked up in ``mapping``; the entry's ``detection_key``
    names the list under ``event['data']`` whose elements each hold an
    ``imageUrl``.

    Args:
        event: Dict with an ``eventType`` key and a ``data`` dict.

    Returns:
        A list of the ``imageUrl`` values, or ``None`` when the event type is
        not mapped, has no detection key, or the detection list is missing or
        empty.

    Raises:
        KeyError: If ``event`` lacks ``eventType`` or ``data``, or a detection
            element lacks ``imageUrl``.
    """
    entry = mapping.get(event['eventType'])
    # .get() returns None for an unmapped event type; indexing that result
    # directly raised TypeError before the None check could ever run.
    if not isinstance(entry, dict):
        return None

    analysis_key = entry.get('detection_key')
    if analysis_key is None:
        return None

    # A missing detection list is treated the same as an empty one.
    detections = event['data'].get(analysis_key) or []
    images = [el['imageUrl'] for el in detections]
    return images or None
def get_value_by_nested_key(my_dict, nested_key):
    """Follow a path of keys through a nested container and return the value.

    Args:
        my_dict: The outermost container; each level is indexed with ``[]``,
            so dicts and lists both work.
        nested_key: Iterable of keys/indexes, applied outermost first. An
            empty iterable returns ``my_dict`` itself.

    Returns:
        The object reached after applying every key in order.

    Raises:
        KeyError, IndexError, TypeError: Whatever the failing ``[]`` lookup
            raises when a step of the path does not exist.
    """
    ret = my_dict
    for key in nested_key:
        ret = ret[key]
    return ret
# Upload an image to Feishu.
def upload_image(url, timeout=10):
    """Download the image at ``url`` and upload it to Feishu as a message image.

    Args:
        url: Address the image bytes are fetched from with an HTTP GET.
        timeout: Seconds allowed for each of the two HTTP requests. Without a
            timeout ``requests`` waits indefinitely on an unresponsive server.

    Returns:
        The ``image_key`` found under ``data`` in Feishu's JSON response.

    Raises:
        FeishuAuthException: If the response ``code`` is 99991663.
        requests.HTTPError: If the image download returns an error status.
        requests.RequestException: On network failure or timeout.
        KeyError: If the response lacks ``code`` or ``data.image_key``.
    """
    download = requests.get(url, timeout=timeout)
    # Without this check the body of an HTTP error page would be uploaded as
    # if it were the image.
    download.raise_for_status()

    payload = {'image_type': 'message'}
    files = [
        ('image', ('file', BytesIO(download.content), 'application/octet-stream'))
    ]
    image = requests.request("POST", "https://open.feishu.cn/open-apis/im/v1/images", data=payload,
                             files=files, headers=headers, timeout=timeout).content.decode('utf-8')
    # Parse the response once and reuse it for both the code check and the key.
    data = json.loads(image)
    if data['code'] == 99991663:
        raise FeishuAuthException
    return data['data']['image_key']
# Push a message to Feishu.
def push_to_feishu(data, timeout=10):
    """Send ``data`` to Feishu's batch-send message endpoint and print the reply.

    Args:
        data: JSON-serialisable request body.
        timeout: Seconds to wait for the HTTP request. Without a timeout
            ``requests`` waits indefinitely on an unresponsive server.

    Raises:
        FeishuAuthException: If the response ``code`` is 99991663.
        requests.RequestException: On network failure or timeout.
        KeyError: If the response has no ``code`` field.
    """
    push_url = "https://open.feishu.cn/open-apis/message/v4/batch_send/"
    body = json.dumps(data)
    # The body is a JSON string, so declare it as such; requests does not set
    # a Content-Type for a plain string body. Entries already present in the
    # shared headers take precedence.
    request_headers = {"Content-Type": "application/json; charset=utf-8", **headers}
    r = requests.post(push_url, data=body, headers=request_headers,
                      timeout=timeout).content.decode('utf-8')
    result = json.loads(r)
    if result['code'] == 99991663:
        raise FeishuAuthException
    print(r)
def _annotate_event(event):
    """Add display fields to ``event`` in place and return the device name."""
    # Replace the eventType ID with the value it maps to.
    # NOTE(review): get_image_url() treats mapping values as dicts carrying a
    # 'detection_key', while the default here is a plain string - confirm
    # which shape the mapping really has.
    event['eventType'] = mapping.get(event['eventType'], "未知事件类型")

    ip_address = event["data"]["ipAddress"]
    # Look up the device name for this IP address.
    device_name = device_map.get(ip_address, "未知设备")
    event['deviceName'] = device_name

    # TODO: look up the user IDs for ip_address
    event['userIds'] = user_map.get(ip_address, [])
    return device_name


async def read_event():
    """Consume subscription messages from Redis forever and print each event.

    Blocks on the ``hik-sub-event`` list of the Redis server at
    127.0.0.1:6379. Each popped item is decoded as JSON, and every entry of
    ``params.events`` is annotated with ``eventType``, ``deviceName`` and
    ``userIds`` before being printed. The function never returns.

    A message that cannot be decoded, or that has no ``params.events``, is
    reported and skipped. A failure while handling one event is reported and
    the remaining events of that message are still processed.
    """
    redis_client = await aioredis.Redis(host="127.0.0.1", port=6379)
    while True:
        data = await redis_client.brpop("hik-sub-event")
        try:
            # Decoding happens inside the try so that one malformed queue item
            # cannot terminate the consumer loop.
            sub_json = json.loads(data[1].decode('utf-8'))
            events = list(sub_json["params"]["events"])
        except Exception as e:
            print("error: ", e)
            continue

        for event in events:
            try:
                device_name = _annotate_event(event)
                # TODO: call the push API
                print(event, device_name)
            except Exception as e:
                # Report and move on so one bad event does not discard the
                # rest of the batch.
                print("error: ", e)
        await asyncio.sleep(0.5)
if __name__ == "__main__":
    # Script entry point: run the Redis consumer loop on a fresh event loop.
    asyncio.run(read_event())