import asyncio import json from typing import Dict from redis import asyncio as aioredis import uvicorn from fastapi import FastAPI, BackgroundTasks app = FastAPI() async def add_stream(data: dict): redis = await aioredis.from_url("redis://localhost", db=0) await redis.xadd("hik-event", {"event": json.dumps(data)}) await redis.close() @app.get("/") async def root(): return {"message": "Hello World"} @app.post("/eventRcv") async def event_rcv(data: dict, background_tasks: BackgroundTasks): background_tasks.add_task(add_stream, data) return {"data": data} @app.on_event("startup") async def background_task(): redis = await aioredis.from_url("redis://localhost", db=0) # await redis.xgroup_create(groupname="consumer-group", name="hik-event", mkstream=True) while True: # result = await redis.xread(streams={"hik-event": 0}) result = await redis.xreadgroup(groupname="consumer-group", consumername="consumer", streams={"hik-event": 0}, ) print(result) if len(result[0][1]) == 0: print("wuyou") # await redis.close() continue id = result[0][1][0][0].decode("utf-8") await redis.xack("hik-event", "consumer-group", id) # print(result[0][1][0][0].decode("utf-8")) print(result[0][1][0][1][b'event'].decode("utf-8")) await asyncio.sleep(1) await redis.close() def run_app(): uvicorn.run("main:app", host="0.0.0.0", port=8000) if __name__ == "__main__": run_app()