Files
2023-10-07 14:13:58 +08:00

183 lines
6.5 KiB
Python

import asyncio
import io
import json
import os
import pathlib
import subprocess
import time
import aiohttp
import cv2
import fastdeploy as fd
import numpy as np
from fastapi import APIRouter, HTTPException, UploadFile, WebSocket, WebSocketDisconnect
from fastapi.responses import RedirectResponse, StreamingResponse
from fastdeploy.serving.utils import cv2_to_base64
from image_identification.model import model_instance
router = APIRouter(prefix="/image")
async def request(url: str, data, headers):
async with aiohttp.ClientSession() as session:
async with session.post(url=url, data=data, headers=headers) as response:
return await response.json()
# @router.post("/get_visuallized_image", tags=["analyze"])
# async def getVisualized_image(image: UploadFile):
# contents = await image.read()
# nparr = np.fromstring(contents, np.uint8)
# im = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
# url = "http://127.0.0.1:8000/fd/yolov5s"
# headers = {"Content-Type": "application/json"}
# data = {"data": {"image": cv2_to_base64(im)}, "parameters": {}}
# try:
# resp = await asyncio.gather(
# *[request(url=url, data=json.dumps(data), headers=headers)]
# )
# r_json = json.loads(resp[0]["result"])
# det_result = fd.vision.utils.json_to_detection(r_json)
# vis_im = fd.vision.vis_detection(im, det_result, score_threshold=0.5)
# _res, im_jpg = cv2.imencode(".jpg", vis_im)
# return StreamingResponse(io.BytesIO(im_jpg.tobytes()), media_type="image/jpeg")
# except:
# raise HTTPException(status_code=500, detail="获取失败")
@router.post("/analyze", tags=["analyze"], summary="summary", description="description")
async def analyze(image: UploadFile):
image_content = await image.read()
nparr = np.fromstring(image_content, np.uint8)
im = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
result = model_instance.predict(im)
vis_im = fd.vision.vis_detection(im, result, score_threshold=0.5)
im_jpg = cv2.imencode(".jpg", vis_im)[1]
return StreamingResponse(io.BytesIO(im_jpg.tobytes()), media_type="image/jpeg")
@router.websocket("/analyze_video")
async def analyze_video(websocket: WebSocket):
rtmp_url = "rtmp://192.168.0.136/live/livestream"
push_url = "rtmp://192.168.0.136/live/livestream_output"
output_url = "http://192.168.0.136/live/livestream_output.flv"
cap = cv2.VideoCapture(rtmp_url)
frame_number = 0
fps = int(cap.get(cv2.CAP_PROP_FPS))
await websocket.accept()
# fourcc = cv2.VideoWriter.fourcc("M", "P", "4", "V")
# os.chdir("framers")
# video = None
ffmpeg_command = None
ffmpeg_process = None
try:
while True:
start = time.perf_counter()
ret, frame = cap.read()
if not ret:
print("无法读取视频帧")
break
print(f"\n{frame_number}")
# 是否保存帧
# if True:
if frame_number % 4 == 0:
result = model_instance.predict(frame)
vis_im = fd.vision.vis_detection(frame, result, score_threshold=0.5)
width = vis_im.shape[1]
height = vis_im.shape[0]
# if video is None:
# video = cv2.VideoWriter(
# "__output__.mp4",
# fourcc,
# 30.0,
# (width, height),
# )
if ffmpeg_command is None:
# 创建ffmpeg进程以推送视频流
ffmpeg_cmd = [
"ffmpeg",
"-y",
"-f",
"rawvideo",
"-vcodec",
"rawvideo",
"-s",
f"{width}x{height}",
"-pix_fmt",
"bgr24",
"-r",
str(fps),
"-i",
"-",
"-c:v",
"libx264",
"-pix_fmt",
"yuv420p",
"-preset",
"ultrafast",
"-f",
"flv",
push_url,
]
if ffmpeg_process is None:
ffmpeg_process = subprocess.Popen(ffmpeg_cmd, stdin=subprocess.PIPE)
ffmpeg_process.stdin.write(vis_im.tobytes())
# video.write(vis_im)
print(result)
await websocket.send_json(
data={
"output": output_url,
"result": json.loads(
fd.vision.fd_result_to_json(result=result)
),
}
)
else:
width = frame.shape[1]
height = frame.shape[0]
if ffmpeg_command is None:
# 创建ffmpeg进程以推送视频流
ffmpeg_cmd = [
"ffmpeg",
"-y",
"-f",
"rawvideo",
"-vcodec",
"rawvideo",
"-s",
f"{width}x{height}",
"-pix_fmt",
"bgr24",
"-r",
str(fps),
"-i",
"-",
"-c:v",
"libx264",
"-pix_fmt",
"yuv420p",
"-preset",
"ultrafast",
"-f",
"flv",
push_url,
]
if ffmpeg_process is None:
ffmpeg_process = subprocess.Popen(ffmpeg_cmd, stdin=subprocess.PIPE)
ffmpeg_process.stdin.write(frame.tobytes())
await websocket.send_json(data={"output": output_url})
# video.write(frame)
end = time.perf_counter()
cost = end - start
print(fps)
print(f"time spend {cost}")
frame_number += 1
except WebSocketDisconnect:
print("close")
await websocket.send_text("error")
await websocket.close()