基YOLOV5实现的AI智能盒子

慈云数据 2024-03-13 技术支持 61 0

基于yolov5实现的AI智能盒子框架

  • 开发背景
  • 技术实现
  • 产品效果
  • 源码预览
  • 功能介绍

    2021-2023是沉淀的几年,经济不景气,各行各业都不太好混,所以这几年也没有太多心思花在csdn上为各大网友写一些技术文章,2024年初,也算是给自己留下一点岁月的足迹吧,所以把这段时间精心研究的东西写出来供大家交流,顺带也看看是否有机会遇到能帮助到其他技术公司或朋友,在交流中实现双赢,有需要的朋友可以威信(幺捌零叁捌捌伍陆柒零贰,威信与电画同号),具体合作方式具体交流。

    首先看一下AI盒子的效果(安全帽检测预警、语音播报)

    AI智能分析预警盒子

    开发背景

    从事安防多年,但基本都是从事音视频的编解码工作,很少处理图形图像相关算法,原因如下:

    (1)图形图像相关算法如车牌识别识别、人脸识别等如果是自己研发,涉及到高等数学等及图形图像处理等高深的支持,开发难度较大;

    (2)前n年基本没有开源的、简易的不需要熟悉底层算法的AI识别框架,很难将生活中涉及的AI识别或机器学习的相关算法变成现实;

    主要基于以上两点,导致AI相关的算法落地很难、实现很难、应用更难!

    然后,基于百度paddle和yolo的开源框架出现了,这使得AI识别难度大大下降,应用门槛大大降低,然而paddle则是面向服务器,对硬件要求较高,部署也相对复杂,很难将普遍应用到生活中,所以经过多番调研yolo则是我们最好的选择。

    选择yolo有如下优势

    (1)开源且识别效率非常高,仅需一次识别即可完成所有对象的分类识别。

    (2)可以应用到服务器中,也可以应用到小型硬件中,本文部署的硬件就支持windows/ubun,支持jetson nano、jetson orin nano等3款中高低档硬件(约5000元、3000元、1000元)

    (3)开源技术论坛和资料较多

    (4)支持使用python,支持跨平台部署(一套代码,多套环境部署)

    在安防领域,基于音视频的基础操作已经基本上没有任何难度(看视频、直播、录像等),但基于音视频的AI应用却很难,但随着技术的成熟,这些应用也变得越来越多,结合生活实际需求的场景就有很多,例如

    (1)非法闯入:夜间无人值守或重要地点进出监控

    (2)摔倒检测:关爱老人,老人摔倒检测或打架斗殴跌倒检测,可用于社区、监狱、广场、学校等场所。

    (3)明火识别:严禁烟火的第三方识别火焰,防止火宅,可用于森林、车间、化工产等场所。

    (4)烟雾排放:同明火场景。

    (5)越界检测:越过指定边界,产生报警,一般用于行人识别与闯入检测,结合视频区域检测。可用于无人值守场景。

    (6)睡岗检测:工作期间睡觉检测,避免安全事故发生。

    (7)离岗检测:工作期间离开岗位检测,避免安全事故发生。

    (8)人群聚众:检测人员聚集,避免打群架、避免踩踏事件发生。

    (9)攀高检测:检测人员是否进行攀爬,避免安全事故发生。

    (10)打架斗殴:检测人员是否打架,可用于学校、公共场所。

    (11)人脸抓拍:人脸数据抓拍,可推送给人脸识别服务进行1&1识别及陌生人识别。

    (12)遮挡检测:检测摄像机是否被人为遮挡或被损坏。协调运维人员进行维护。

    (13)垃圾满溢:检测垃圾桶垃圾是否满溢,协助环卫人员智能调度环卫车辆,节省人力和物力。

    (14)占道经营:检测是否有占用道路非法经营,协助城管管理,减轻工作量。

    (15)安全帽识别:工地安全帽识别,提高工地安全。

    (16)反光衣识别:工地反光衣识别,防止非工作人员闯入工地区域。

    (17)电动车进电梯:检测电动车进出电梯,防止火灾发生。

    (18)口罩检测:明厨亮灶,检测食品从业人员是否佩戴口罩。

    (19)虫害识别:智慧农业,通过AI识别虫害,智能指导农户作业。

    (20)动物识别:公共场所,不允许动物进出场所。

    (21)电梯超员检测:电梯人员是否超载工作。

    (22)河边垂钓检测:严禁垂钓河边检测人员是否有钓鱼或捕鱼行为。

    (23)河边游泳检测:严禁游泳河边检测人员是否有下水游泳行为。

    (24)人数统计:人数统计或客流统计,通过AI方式统计绘制市场的客流热力图。

    (25)抽烟检测:明厨亮灶,检测从业人员是否有吸烟行为。

    (25)泥头车识别:街道泥头车随意、掉土的事件。

    (27)打电话识别:检测开车是否有打电话行为。

    (28)机动车/非机动车识别:机动车和非机动车识别。

    (29)车流量统计:识别车辆及统计车流量

    以上是我总结的贴近生活,很有可能在生活中非常实用的场景,这些算法都是可以通过数据采集进行一一训练的。

    技术实现

    开发环境:pycharm

    开发语言:python、vue2.0、pytorch、vision

    部署环境:

    (1)windows-conda、jetson-nano:conda(低配)、

    (2)jetson orin nano:python、cuda、cudnn(sdk)(中高配)

    (3)orange PI(正在适配中)

    硬件选型:

    (1)低配jetson naco(b01替换版本),ubuntu,价格1312元(含外壳),0.5tops,分析实时视频约4路;一张图约200ms~300ms耗时;

    (2)中配jetson orin nano,ubuntu,价格3200元上下,20tops,分析实时视频约8路;一张图约100ms耗时;

    (3)高配jetson orin nano,Ubuntu,价格在5200元上下,70-100tops,分析实时视频约16路;一张图约30ms耗时;

    (4)国产华为芯片orange PI,4G,16核心,价格在1000元左右,分析视频预估在16路;(正在适配中,硬件一片难求)

    在这里插入图片描述

    产品效果

    为此,我开发了一个AI盒子框架,这个框架可以动态添加训练好的模型、动态添加需要分析的网络摄像机、动态为每一路摄像机添加不同的分析场景(算法)、动态配置AI盒子参数、动态重启AI盒子等功能;

    AI盒子提供了

    (1)登录AI盒子

    AI盒子登录页面提供用户名和密码模式登录,登录后可以修改初始密码。

    在这里插入图片描述

    为适配不同的地方需求,AI盒子最新版本,支持中文简体版本、英文版本和中文繁体版本。

    在这里插入图片描述

    (2)系统首页

    AI盒子主要提供设备管理、报警管理、录像管理、模型配置以及系统设置功能。

    在这里插入图片描述

    (3)设备管理

    可以动态添加需要分析的设备,此处的设备为网络设备,AI盒子通过设备的rtsp标准协议从摄像机获取视频流,然后进行抽帧分析,抽帧间隔可以动态进行配置。

    在这里插入图片描述

    可以控制设备进行AI抽帧的分析时间段控制,如早上08:00开始分析,到晚上23:00截止。在这里插入图片描述

    可以配置设备分析的区域,区域支持多边形绘制(区域入侵)、绘制边界线段(周界检测)在这里插入图片描述

    (4)场景管理

    一个设备抽帧的图片,可以做不同场景的算法分析,例如一个摄像机可以同时分析明火烟雾检测、攀爬检测,也可以分析更多的算法(当然,算法越多,耗时就增加,不过对于在1-3秒内能实时响应就已经是非常适用了,可以忽略)在这里插入图片描述

    (5)报警管理

    当分析场景检测到超过设定置信度的预警时,会存储到AI盒子中,AI盒子会将报警异步推送到配置的第三方平台中,可以在AI盒子中保存n天

    在这里插入图片描述

    如果AI盒子打开报警录像功能,AI盒子会自动录取事件发生的前3秒以及后3秒,总共6秒录像,这些录像也会被推送给第三方平台

    在这里插入图片描述

    (6)录像管理

    此外AI盒子支持视频转发(rtmp)、手动抓拍、远程录像功能,录像后可以存储在AI盒子,当推送给第三方平台后会自动从本地移除

    在这里插入图片描述

    (7)模型管理

    AI盒子提供模型载入、识别、上报等完整框架,并不固化整个流程,所以对AI盒子而已,算法、设备、场景都是灵活可以配置的,支持动态的模型添加功能,我们可以将训练好的模型动态添加到AI盒子中

    在这里插入图片描述

    模型可以包括所有分析,哪些是属于正常的(不会报警的),哪些属于异常的(显示红框,可能要报警的)

    在这里插入图片描述

    (8)系统配置

    AI盒子支持系统配置,配置项目包括AI盒子编码(推送到第三方,区别多个盒子)、报警保存天数、并发分析数量控制、识别后是否显示标签、是否支持报警联动录像、是否凌晨自动重启、报警第三方平台介绍地址、长连接控制地址。

    在这里插入图片描述

    (9)GPIO及modbus联动

    此外,AI盒子支持自身引脚联动或t通过外接modbus协议进行对接,以下为AI盒子定制的一个仓库物料领取场景:

    A、人通过人脸机刷脸开门进入物料仓库(人脸机器联动门磁)

    B、门禁从常开(1状态)变为断点开门状态(0),AI盒子检测到人员进入,开始联动摄像机进行录像;

    C、人员进入仓库后领取物料,拿去完成之后将物料放置到物料台,然后按墙壁上的拍照IO开关;

    D、AI盒子检测到AI开关后,联动摄像机视频进行物料实物抓拍,可以抓图多张;

    E、人员拍照后拿取物料并离开物料仓库,门磁从开门(0)状态恢复为常闭状态(0);

    F、AI盒子检测到关门信号,停止联动录像,并将从门禁到出门的录像+物料拍照打包存储并发送给第三方平台;

    这样,一个人从进门领取物料开始录像==>物料拍照==>停止录像整个过程留影留像,保留了整个过程的证据。如果是非法闯入则自动通过AI预警进行报警录像,产生非法闯入预警,如果正常人脸刷脸进入则不会产生非法报警。

    源码预览

    (1)服务启动

    """===========导入安装的python库==========="""
    import sys
    from pathlib import Path
    from box.box import Box
    from utils.general import check_requirements
    # 获取当前文件路径
    FILE = Path(__file__).resolve()
    # 获取当前文件父目录-YOLOv5根目录
    ROOT = FILE.parents[0]
    # 获取绝对路径
    PARENT = ROOT
    # 将根目录添加到系统path中
    if str(ROOT) not in sys.path:
        sys.path.append(str(ROOT))
    # 程序启动入口
    if __name__ == '__main__':
        # 检查请求参数
        check_requirements(ROOT / 'requirements.txt', exclude=('tensorboard', 'thop'))
        # 创建AI盒子
        box = Box()
        # 初始化盒子
        box.init()
        # 启动AI盒子
        box.start()
        # 等待盒子退出
        box.join()
        # 停止AI盒子
        box.stop()
    

    (2)websocket协议

    # 报警推送实现
    class SocketService(threading.Thread):
        # 构造函数
        def __init__(self):
            # 重写父类方法
            threading.Thread.__init__(self)
            # 套接口
            self.ws = None
            self.connected = False
            self.do_run = True
            # 数据接收
            self.msg_thread = None
            # 视频数量
            self.count = 0
            # 视频回调
            self.video_back = None
            pass
        # 停止服务
        def stop(self):
            self.do_run = False
            # 关闭套接口
            if self.ws is not None:
                self.ws.close()
            # 等待接收退出
            if self.msg_thread is not None:
                if self.msg_thread.is_alive():
                    self.msg_thread.join()
            pass
        # 设置命令回调
        def set_video_back(self, call):
            self.video_back = call
        # 响应结果
        def send_result(self, url, cmdId, state, desc, data=None, file_path=None):
            try:
                if url is None or cmdId is None:
                    return
                # 请求字典
                dict_info = {}
                # 追加文件
                if file_path is not None:
                    # 读取文件内容
                    file = open(file_path, "rb")
                    name = os.path.basename(file_path)
                    dict_info['file'] = (name, file, 'application/octet-stream')
                # 追加数据
                if data is not None:
                    dict_info['data'] = data
                # multipart编码
                encoder = MultipartEncoder(
                    fields=dict_info
                )
                # 请求头部
                headers = {'Content-Type': encoder.content_type}
                # 发送到第三方
                result_url = (url + "?boxId=" + SystemConfig.ID + "&cmdId=" + cmdId +
                              "&state=" + str(state) + "&desc=" + desc)
                response = requests.post(result_url, json=data, headers=headers)
                if response.status_code != 200:
                    return False
                # 处理返回结果
                result = response.json()
                # 处理返回结果
                if result['error'] != 0:
                    error = result['error']
                    LOGGER.error(f'upload box {SystemConfig.ID} command result failed, reason: {error}')
                    return False
                # 返回成功结果
                return True
            except Exception as e:
                LOGGER.error(f'upload alarm error {e}')
            pass
        # 报警推送
        def run(self) -> None:
            # 开始时间
            last_time = datetime.datetime.now()
            while not Global.restart and self.do_run:
                try:
                    # 查询通信地址
                    result_url = SystemConfig.ALARM_URL
                    if SystemConfig.ALARM_URL is not None:
                        if not result_url.endswith("/"):
                            result_url += "/"
                        result_url += "result"
                    # 系统重启
                    if Global.restart:
                        break
                    # 连接服务器
                    if not self.connected:
                        if SystemConfig.SOCKET_URL is None or SystemConfig.SOCKET_URL == '':
                            time.sleep(1)
                            continue
                        try:
                            # 连接服务
                            self.ws = create_connection(SystemConfig.SOCKET_URL)
                            self.connected = self.ws.connected
                            # 连接成功
                            if self.ws.connected:
                                LOGGER.info(f'@connect websocket success: {SystemConfig.SOCKET_URL}')
                                # 连接信息
                                box = {
                                    'id': SystemConfig.ID,
                                    'ip': Global.ip,
                                    'port': Global.port,
                                    'cmd': 'connect'
                                }
                                # 发送连接
                                self.ws.send(json.dumps(box))
                                # 更新心跳时间
                                last_time = datetime.datetime.now()
                                # 同步盒子数据
                                self.sync_server_data()
                                pass
                        except Exception as e2:
                            LOGGER.error(f'websocket connect error:{e2}')
                            self.connected = False
                            pass
                        # 链接失败等待
                        if not self.connected:
                            time.sleep(3)
                            continue
    

    (3)系统配置

    # 系统全局配置
    import uuid
    from box.util import SystemUtil
    # 全局配置
    class Global:
        # 数据库配置
        dbIp = '127.0.0.1'
        dbPort = 3306
        dbUser = 'root'
        dbPwd = 'root'
        dbName = 'box3'
        # web配置
        ip = '127.0.0.1'
        port = 5700
        # 系统重启标志
        restart = False
        # 启用串口信号读取
        enableIO = False
        # linux-设备名
        LUX_IO_NAME = '/dev/ttyCH341USB0'
        # win-设备名
        WIN_IO_NAME = 'COM13'
        # 初始化
        def __init__(self):
            pass
        # 获取本机ip
        @staticmethod
        def local_ip():
            try:
                if Global.ip == '127.0.0.1':
                    Global.ip = SystemUtil.get_local_ip()
            except Exception as e:
                print(f'get local ip error {e}')
    # 系统配置
    class SystemConfig:
        # 盒子当前编码
        ID = str(uuid.uuid1()).replace("-", "")
        # 系统登录账号
        ADMIN = 'admin'
        # 系统账号密码
        PASSWORD = 'dd123456'
        # 系统并发分析数
        PATROL_NUM = 4
        # 报警保存天数
        SAVE_DAYS = 1
        # 是否显示标签
        SHOW_LABEL = 1
        # 是否报警录像
        ALARM_RECORD = 1
        # 是否定时重启
        AUTO_RESTART = 1
        # 报警推送地址
        ALARM_URL = ''
        # 长连接地址
        SOCKET_URL = ''
    

    (4)文件清理

    import threading
    import os
    import time
    from threading import Lock
    from box.config import Global
    from utils.general import LOGGER
    # 文件异步清理
    class FileClear(threading.Thread):
        # 待清理文件列表
        files = []
        # 全局文件列表锁
        lock = Lock()
        # 构造函数
        def __init__(self):
            # 重写父类方法
            threading.Thread.__init__(self)
            pass
        # 添加清理文件
        @classmethod
        def push(cls, file):
            if file is None:
                return
            cls.lock.acquire()
            try:
                cls.files.append(file)
            except Exception as e:
                LOGGER.warning(f'add clear file error {e}')
            finally:
                cls.lock.release()
            pass
        # 报警推送
        def run(self) -> None:
            # 系统未重启
            while not Global.restart:
                try:
                    # 系统重启
                    if Global.restart:
                        break
                    # 尝试清理
                    try:
                        # 获取文件大锁
                        self.lock.acquire()
                        # 当前系统无任务
                        if len(self.files) 
微信扫一扫加客服

微信扫一扫加客服

点击启动AI问答
Draggable Icon