基于yolov5实现的AI智能盒子框架
- 开发背景
- 技术实现
- 产品效果
- 源码预览
- 功能介绍
2021-2023是沉淀的几年,经济不景气,各行各业都不太好混,所以这几年也没有太多心思花在csdn上为各大网友写一些技术文章,2024年初,也算是给自己留下一点岁月的足迹吧,所以把这段时间精心研究的东西写出来供大家交流,顺带也看看是否有机会遇到能帮助到其他技术公司或朋友,在交流中实现双赢,有需要的朋友可以威信(幺捌零叁捌捌伍陆柒零贰,威信与电画同号),具体合作方式具体交流。
首先看一下AI盒子的效果(安全帽检测预警、语音播报)
AI智能分析预警盒子
开发背景
从事安防多年,但基本都是从事音视频的编解码工作,很少处理图形图像相关算法,原因如下:
(1)图形图像相关算法如车牌识别识别、人脸识别等如果是自己研发,涉及到高等数学等及图形图像处理等高深的支持,开发难度较大;
(2)前n年基本没有开源的、简易的不需要熟悉底层算法的AI识别框架,很难将生活中涉及的AI识别或机器学习的相关算法变成现实;
主要基于以上两点,导致AI相关的算法落地很难、实现很难、应用更难!
然后,基于百度paddle和yolo的开源框架出现了,这使得AI识别难度大大下降,应用门槛大大降低,然而paddle则是面向服务器,对硬件要求较高,部署也相对复杂,很难将普遍应用到生活中,所以经过多番调研yolo则是我们最好的选择。
选择yolo有如下优势
(1)开源且识别效率非常高,仅需一次识别即可完成所有对象的分类识别。
(2)可以应用到服务器中,也可以应用到小型硬件中,本文部署的硬件就支持windows/ubun,支持jetson nano、jetson orin nano等3款中高低档硬件(约5000元、3000元、1000元)
(3)开源技术论坛和资料较多
(4)支持使用python,支持跨平台部署(一套代码,多套环境部署)
在安防领域,基于音视频的基础操作已经基本上没有任何难度(看视频、直播、录像等),但基于音视频的AI应用却很难,但随着技术的成熟,这些应用也变得越来越多,结合生活实际需求的场景就有很多,例如
(1)非法闯入:夜间无人值守或重要地点进出监控
(2)摔倒检测:关爱老人,老人摔倒检测或打架斗殴跌倒检测,可用于社区、监狱、广场、学校等场所。
(3)明火识别:严禁烟火的第三方识别火焰,防止火宅,可用于森林、车间、化工产等场所。
(4)烟雾排放:同明火场景。
(5)越界检测:越过指定边界,产生报警,一般用于行人识别与闯入检测,结合视频区域检测。可用于无人值守场景。
(6)睡岗检测:工作期间睡觉检测,避免安全事故发生。
(7)离岗检测:工作期间离开岗位检测,避免安全事故发生。
(8)人群聚众:检测人员聚集,避免打群架、避免踩踏事件发生。
(9)攀高检测:检测人员是否进行攀爬,避免安全事故发生。
(10)打架斗殴:检测人员是否打架,可用于学校、公共场所。
(11)人脸抓拍:人脸数据抓拍,可推送给人脸识别服务进行1&1识别及陌生人识别。
(12)遮挡检测:检测摄像机是否被人为遮挡或被损坏。协调运维人员进行维护。
(13)垃圾满溢:检测垃圾桶垃圾是否满溢,协助环卫人员智能调度环卫车辆,节省人力和物力。
(14)占道经营:检测是否有占用道路非法经营,协助城管管理,减轻工作量。
(15)安全帽识别:工地安全帽识别,提高工地安全。
(16)反光衣识别:工地反光衣识别,防止非工作人员闯入工地区域。
(17)电动车进电梯:检测电动车进出电梯,防止火灾发生。
(18)口罩检测:明厨亮灶,检测食品从业人员是否佩戴口罩。
(19)虫害识别:智慧农业,通过AI识别虫害,智能指导农户作业。
(20)动物识别:公共场所,不允许动物进出场所。
(21)电梯超员检测:电梯人员是否超载工作。
(22)河边垂钓检测:严禁垂钓河边检测人员是否有钓鱼或捕鱼行为。
(23)河边游泳检测:严禁游泳河边检测人员是否有下水游泳行为。
(24)人数统计:人数统计或客流统计,通过AI方式统计绘制市场的客流热力图。
(25)抽烟检测:明厨亮灶,检测从业人员是否有吸烟行为。
(25)泥头车识别:街道泥头车随意、掉土的事件。
(27)打电话识别:检测开车是否有打电话行为。
(28)机动车/非机动车识别:机动车和非机动车识别。
(29)车流量统计:识别车辆及统计车流量
以上是我总结的贴近生活,很有可能在生活中非常实用的场景,这些算法都是可以通过数据采集进行一一训练的。
技术实现
开发环境:pycharm
开发语言:python、vue2.0、pytorch、vision
部署环境:
(1)windows-conda、jetson-nano:conda(低配)、
(2)jetson orin nano:python、cuda、cudnn(sdk)(中高配)
(3)orange PI(正在适配中)
硬件选型:
(1)低配jetson naco(b01替换版本),ubuntu,价格1312元(含外壳),0.5tops,分析实时视频约4路;一张图约200ms~300ms耗时;
(2)中配jetson orin nano,ubuntu,价格3200元上下,20tops,分析实时视频约8路;一张图约100ms耗时;
(3)高配jetson orin nano,Ubuntu,价格在5200元上下,70-100tops,分析实时视频约16路;一张图约30ms耗时;
(4)国产华为芯片orange PI,4G,16核心,价格在1000元左右,分析视频预估在16路;(正在适配中,硬件一片难求)
产品效果
为此,我开发了一个AI盒子框架,这个框架可以动态添加训练好的模型、动态添加需要分析的网络摄像机、动态为每一路摄像机添加不同的分析场景(算法)、动态配置AI盒子参数、动态重启AI盒子等功能;
AI盒子提供了
(1)登录AI盒子
AI盒子登录页面提供用户名和密码模式登录,登录后可以修改初始密码。
为适配不同的地方需求,AI盒子最新版本,支持中文简体版本、英文版本和中文繁体版本。
(2)系统首页
AI盒子主要提供设备管理、报警管理、录像管理、模型配置以及系统设置功能。
(3)设备管理
可以动态添加需要分析的设备,此处的设备为网络设备,AI盒子通过设备的rtsp标准协议从摄像机获取视频流,然后进行抽帧分析,抽帧间隔可以动态进行配置。
可以控制设备进行AI抽帧的分析时间段控制,如早上08:00开始分析,到晚上23:00截止。
可以配置设备分析的区域,区域支持多边形绘制(区域入侵)、绘制边界线段(周界检测)
(4)场景管理
一个设备抽帧的图片,可以做不同场景的算法分析,例如一个摄像机可以同时分析明火烟雾检测、攀爬检测,也可以分析更多的算法(当然,算法越多,耗时就增加,不过对于在1-3秒内能实时响应就已经是非常适用了,可以忽略)
(5)报警管理
当分析场景检测到超过设定置信度的预警时,会存储到AI盒子中,AI盒子会将报警异步推送到配置的第三方平台中,可以在AI盒子中保存n天
如果AI盒子打开报警录像功能,AI盒子会自动录取事件发生的前3秒以及后3秒,总共6秒录像,这些录像也会被推送给第三方平台
(6)录像管理
此外AI盒子支持视频转发(rtmp)、手动抓拍、远程录像功能,录像后可以存储在AI盒子,当推送给第三方平台后会自动从本地移除
(7)模型管理
AI盒子提供模型载入、识别、上报等完整框架,并不固化整个流程,所以对AI盒子而已,算法、设备、场景都是灵活可以配置的,支持动态的模型添加功能,我们可以将训练好的模型动态添加到AI盒子中
模型可以包括所有分析,哪些是属于正常的(不会报警的),哪些属于异常的(显示红框,可能要报警的)
(8)系统配置
AI盒子支持系统配置,配置项目包括AI盒子编码(推送到第三方,区别多个盒子)、报警保存天数、并发分析数量控制、识别后是否显示标签、是否支持报警联动录像、是否凌晨自动重启、报警第三方平台介绍地址、长连接控制地址。
(9)GPIO及modbus联动
此外,AI盒子支持自身引脚联动或t通过外接modbus协议进行对接,以下为AI盒子定制的一个仓库物料领取场景:
A、人通过人脸机刷脸开门进入物料仓库(人脸机器联动门磁)
B、门禁从常开(1状态)变为断点开门状态(0),AI盒子检测到人员进入,开始联动摄像机进行录像;
C、人员进入仓库后领取物料,拿去完成之后将物料放置到物料台,然后按墙壁上的拍照IO开关;
D、AI盒子检测到AI开关后,联动摄像机视频进行物料实物抓拍,可以抓图多张;
E、人员拍照后拿取物料并离开物料仓库,门磁从开门(0)状态恢复为常闭状态(0);
F、AI盒子检测到关门信号,停止联动录像,并将从门禁到出门的录像+物料拍照打包存储并发送给第三方平台;
这样,一个人从进门领取物料开始录像==>物料拍照==>停止录像整个过程留影留像,保留了整个过程的证据。如果是非法闯入则自动通过AI预警进行报警录像,产生非法闯入预警,如果正常人脸刷脸进入则不会产生非法报警。
源码预览
(1)服务启动
"""===========导入安装的python库===========""" import sys from pathlib import Path from box.box import Box from utils.general import check_requirements # 获取当前文件路径 FILE = Path(__file__).resolve() # 获取当前文件父目录-YOLOv5根目录 ROOT = FILE.parents[0] # 获取绝对路径 PARENT = ROOT # 将根目录添加到系统path中 if str(ROOT) not in sys.path: sys.path.append(str(ROOT)) # 程序启动入口 if __name__ == '__main__': # 检查请求参数 check_requirements(ROOT / 'requirements.txt', exclude=('tensorboard', 'thop')) # 创建AI盒子 box = Box() # 初始化盒子 box.init() # 启动AI盒子 box.start() # 等待盒子退出 box.join() # 停止AI盒子 box.stop()
(2)websocket协议
# 报警推送实现 class SocketService(threading.Thread): # 构造函数 def __init__(self): # 重写父类方法 threading.Thread.__init__(self) # 套接口 self.ws = None self.connected = False self.do_run = True # 数据接收 self.msg_thread = None # 视频数量 self.count = 0 # 视频回调 self.video_back = None pass # 停止服务 def stop(self): self.do_run = False # 关闭套接口 if self.ws is not None: self.ws.close() # 等待接收退出 if self.msg_thread is not None: if self.msg_thread.is_alive(): self.msg_thread.join() pass # 设置命令回调 def set_video_back(self, call): self.video_back = call # 响应结果 def send_result(self, url, cmdId, state, desc, data=None, file_path=None): try: if url is None or cmdId is None: return # 请求字典 dict_info = {} # 追加文件 if file_path is not None: # 读取文件内容 file = open(file_path, "rb") name = os.path.basename(file_path) dict_info['file'] = (name, file, 'application/octet-stream') # 追加数据 if data is not None: dict_info['data'] = data # multipart编码 encoder = MultipartEncoder( fields=dict_info ) # 请求头部 headers = {'Content-Type': encoder.content_type} # 发送到第三方 result_url = (url + "?boxId=" + SystemConfig.ID + "&cmdId=" + cmdId + "&state=" + str(state) + "&desc=" + desc) response = requests.post(result_url, json=data, headers=headers) if response.status_code != 200: return False # 处理返回结果 result = response.json() # 处理返回结果 if result['error'] != 0: error = result['error'] LOGGER.error(f'upload box {SystemConfig.ID} command result failed, reason: {error}') return False # 返回成功结果 return True except Exception as e: LOGGER.error(f'upload alarm error {e}') pass # 报警推送 def run(self) -> None: # 开始时间 last_time = datetime.datetime.now() while not Global.restart and self.do_run: try: # 查询通信地址 result_url = SystemConfig.ALARM_URL if SystemConfig.ALARM_URL is not None: if not result_url.endswith("/"): result_url += "/" result_url += "result" # 系统重启 if Global.restart: break # 连接服务器 if not self.connected: if SystemConfig.SOCKET_URL is None or SystemConfig.SOCKET_URL == '': time.sleep(1) continue try: # 连接服务 self.ws = create_connection(SystemConfig.SOCKET_URL) self.connected = self.ws.connected # 连接成功 if self.ws.connected: LOGGER.info(f'@connect websocket success: {SystemConfig.SOCKET_URL}') # 连接信息 box = { 'id': SystemConfig.ID, 'ip': Global.ip, 'port': Global.port, 'cmd': 'connect' } # 发送连接 self.ws.send(json.dumps(box)) # 更新心跳时间 last_time = datetime.datetime.now() # 同步盒子数据 self.sync_server_data() pass except Exception as e2: LOGGER.error(f'websocket connect error:{e2}') self.connected = False pass # 链接失败等待 if not self.connected: time.sleep(3) continue
(3)系统配置
# 系统全局配置 import uuid from box.util import SystemUtil # 全局配置 class Global: # 数据库配置 dbIp = '127.0.0.1' dbPort = 3306 dbUser = 'root' dbPwd = 'root' dbName = 'box3' # web配置 ip = '127.0.0.1' port = 5700 # 系统重启标志 restart = False # 启用串口信号读取 enableIO = False # linux-设备名 LUX_IO_NAME = '/dev/ttyCH341USB0' # win-设备名 WIN_IO_NAME = 'COM13' # 初始化 def __init__(self): pass # 获取本机ip @staticmethod def local_ip(): try: if Global.ip == '127.0.0.1': Global.ip = SystemUtil.get_local_ip() except Exception as e: print(f'get local ip error {e}') # 系统配置 class SystemConfig: # 盒子当前编码 ID = str(uuid.uuid1()).replace("-", "") # 系统登录账号 ADMIN = 'admin' # 系统账号密码 PASSWORD = 'dd123456' # 系统并发分析数 PATROL_NUM = 4 # 报警保存天数 SAVE_DAYS = 1 # 是否显示标签 SHOW_LABEL = 1 # 是否报警录像 ALARM_RECORD = 1 # 是否定时重启 AUTO_RESTART = 1 # 报警推送地址 ALARM_URL = '' # 长连接地址 SOCKET_URL = ''
(4)文件清理
import threading import os import time from threading import Lock from box.config import Global from utils.general import LOGGER # 文件异步清理 class FileClear(threading.Thread): # 待清理文件列表 files = [] # 全局文件列表锁 lock = Lock() # 构造函数 def __init__(self): # 重写父类方法 threading.Thread.__init__(self) pass # 添加清理文件 @classmethod def push(cls, file): if file is None: return cls.lock.acquire() try: cls.files.append(file) except Exception as e: LOGGER.warning(f'add clear file error {e}') finally: cls.lock.release() pass # 报警推送 def run(self) -> None: # 系统未重启 while not Global.restart: try: # 系统重启 if Global.restart: break # 尝试清理 try: # 获取文件大锁 self.lock.acquire() # 当前系统无任务 if len(self.files)