1 简介
本文基于CanMV K230 的环境监控系统的详细方案与实现流程,结合硬件选型、软件部署、算法优化和系统集成。系统架构如下所示。

2 系统功能
1、识别能力
• 识别速度:< 300 ms(含活体)
• 准确率:≥ 99.5 %
2、监控逻辑
• 硬件 NPU 人形/宠物/车辆检测,误报低
• 事件触发 → 本地 JPG 截图 + MQTT 告警 → 微信群通知
• 环境MQTT 上报记录(JSON:ID、时间、抓拍图 base64)
3、管理后台
• Web 端(Python+ Django)
• 与安防系统联动:安防信号输入→报警
• 支持 HTTPS Web 端、微信小程序、Home-Assistant 插件
• HTTP RESTful API(远程监控、参数配置)
3 硬件架构
1、计算平台
• CanMV K230开发板
• 供电:5 V/2 A Type-C
2、数据采集
• MIPI-CSI 摄像头
• AHT10 温度度传感器
3、网络与交互
• 100 M 以太网/WiFi
4 软件与算法栈
4.1 人体检测
from libs.PipeLine import PipeLine
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
from libs.Utils import *
import os,sys,ujson,gc,math
from media.media import *
import nncase_runtime as nn
import ulab.numpy as np
import image
import aicube
class PersonDetectionApp(AIBase):
def __init__(self,kmodel_path,model_input_size,labels,anchors,confidence_threshold=0.2,nms_threshold=0.5,nms_option=False,strides=[8,16,32],rgb888p_size=[224,224],display_size=[1920,1080],debug_mode=0):
super().__init__(kmodel_path,model_input_size,rgb888p_size,debug_mode)
self.kmodel_path=kmodel_path
self.model_input_size=model_input_size
self.labels=labels
self.anchors=anchors
self.strides=strides
self.confidence_threshold=confidence_threshold
self.nms_threshold=nms_threshold
self.nms_option=nms_option
self.rgb888p_size=[ALIGN_UP(rgb888p_size[0],16),rgb888p_size[1]]
self.display_size=[ALIGN_UP(display_size[0],16),display_size[1]]
self.debug_mode=debug_mode
self.ai2d=Ai2d(debug_mode)
self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT,nn.ai2d_format.NCHW_FMT,np.uint8, np.uint8)
def config_preprocess(self,input_image_size=None):
with ScopedTiming("set preprocess config",self.debug_mode > 0):
ai2d_input_size=input_image_size if input_image_size else self.rgb888p_size
top,bottom,left,right,_=center_pad_param(self.rgb888p_size,self.model_input_size)
self.ai2d.pad([0,0,0,0,top,bottom,left,right], 0, [0,0,0])
self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
self.ai2d.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,self.model_input_size[1],self.model_input_size[0]])
# 自定义当前任务的后处理
def postprocess(self,results):
with ScopedTiming("postprocess",self.debug_mode > 0):
# 这里使用了aicube模型的后处理接口anchorbasedet_post_preocess
dets = aicube.anchorbasedet_post_process(results[0], results[1], results[2], self.model_input_size, self.rgb888p_size, self.strides, len(self.labels), self.confidence_threshold, self.nms_threshold, self.anchors, self.nms_option)
return dets
# 绘制结果
def draw_result(self,pl,dets):
with ScopedTiming("display_draw",self.debug_mode >0):
if dets:
pl.osd_img.clear()
for det_box in dets:
x1, y1, x2, y2 = det_box[2],det_box[3],det_box[4],det_box[5]
w = float(x2 - x1) * self.display_size[0] // self.rgb888p_size[0]
h = float(y2 - y1) * self.display_size[1] // self.rgb888p_size[1]
x1 = int(x1 * self.display_size[0] // self.rgb888p_size[0])
y1 = int(y1 * self.display_size[1] // self.rgb888p_size[1])
x2 = int(x2 * self.display_size[0] // self.rgb888p_size[0])
y2 = int(y2 * self.display_size[1] // self.rgb888p_size[1])
if (h<(0.1*self.display_size[0])):
continue
if (w<(0.25*self.display_size[0]) and ((x1<(0.03*self.display_size[0])) or (x2>(0.97*self.display_size[0])))):
continue
if (w<(0.15*self.display_size[0]) and ((x1<(0.01*self.display_size[0])) or (x2>(0.99*self.display_size[0])))):
continue
pl.osd_img.draw_rectangle(x1 , y1 , int(w) , int(h), color=(255, 0, 255, 0), thickness = 2)
pl.osd_img.draw_string_advanced( x1 , y1-50,32, " " + self.labels[det_box[0]] + " " + str(round(det_box[1],2)), color=(255,0, 255, 0))
else:
pl.osd_img.clear()
4.2 MQTTClient
import usocket as socket
import ustruct as struct
from ubinascii import hexlify
class MQTTException(Exception):
pass
class MQTTClient:
def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0,
ssl=False, ssl_params={}):
if port == 0:
port = 8883 if ssl else 1883
self.client_id = client_id
self.sock = None
self.server = server
self.port = port
self.ssl = ssl
self.ssl_params = ssl_params
self.pid = 0
self.cb = None
self.user = user
self.pswd = password
self.keepalive = keepalive
self.lw_topic = None
self.lw_msg = None
self.lw_qos = 0
self.lw_retain = False
def _send_str(self, s):
self.sock.write(struct.pack("!H", len(s)))
self.sock.write(s)
def _recv_len(self):
n = 0
sh = 0
while 1:
b = self.sock.read(1)[0]
n |= (b & 0x7f) << sh
if not b & 0x80:
return n
sh += 7
def set_callback(self, f):
self.cb = f
def set_last_will(self, topic, msg, retain=False, qos=0):
assert 0 <= qos <= 2
assert topic
self.lw_topic = topic
self.lw_msg = msg
self.lw_qos = qos
self.lw_retain = retain
def connect(self, clean_session=True):
self.sock = socket.socket()
addr = socket.getaddrinfo(self.server, self.port)[0][-1]
self.sock.connect(addr)
if self.ssl:
import ussl
self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
premsg = bytearray(b"\x10\0\0\0\0\0")
msg = bytearray(b"\x04MQTT\x04\x02\0\0")
sz = 10 + 2 + len(self.client_id)
msg[6] = clean_session << 1
if self.user is not None:
sz += 2 + len(self.user) + 2 + len(self.pswd)
msg[6] |= 0xC0
if self.keepalive:
assert self.keepalive < 65536
msg[7] |= self.keepalive >> 8
msg[8] |= self.keepalive & 0x00FF
if self.lw_topic:
sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
msg[6] |= self.lw_retain << 5
i = 1
while sz > 0x7f:
premsg[i] = (sz & 0x7f) | 0x80
sz >>= 7
i += 1
premsg[i] = sz
self.sock.write(premsg, i + 2)
self.sock.write(msg)
self._send_str(self.client_id)
if self.lw_topic:
self._send_str(self.lw_topic)
self._send_str(self.lw_msg)
if self.user is not None:
self._send_str(self.user)
self._send_str(self.pswd)
resp = self.sock.read(4)
assert resp[0] == 0x20 and resp[1] == 0x02
if resp[3] != 0:
raise MQTTException(resp[3])
return resp[2] & 1
def disconnect(self):
self.sock.write(b"\xe0\0")
self.sock.close()
def ping(self):
self.sock.write(b"\xc0\0")
def publish(self, topic, msg, retain=False, qos=0):
pkt = bytearray(b"\x30\0\0\0")
pkt[0] |= qos << 1 | retain
sz = 2 + len(topic) + len(msg)
if qos > 0:
sz += 2
assert sz < 2097152
i = 1
while sz > 0x7f:
pkt[i] = (sz & 0x7f) | 0x80
sz >>= 7
i += 1
pkt[i] = sz
self.sock.write(pkt, i + 1)
self._send_str(topic)
if qos > 0:
self.pid += 1
pid = self.pid
struct.pack_into("!H", pkt, 0, pid)
self.sock.write(pkt, 2)
self.sock.write(msg)
if qos == 1:
while 1:
op = self.wait_msg()
if op == 0x40:
sz = self.sock.read(1)
assert sz == b"\x02"
rcv_pid = self.sock.read(2)
rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
if pid == rcv_pid:
return
elif qos == 2:
assert 0
def subscribe(self, topic, qos=0):
assert self.cb is not None, "Subscribe callback is not set"
pkt = bytearray(b"\x82\0\0\0")
self.pid += 1
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
self.sock.write(pkt)
self._send_str(topic)
self.sock.write(qos.to_bytes(1, "little"))
while 1:
op = self.wait_msg()
if op == 0x90:
resp = self.sock.read(4)
assert resp[1] == pkt[2] and resp[2] == pkt[3]
if resp[3] == 0x80:
raise MQTTException(resp[3])
return
def wait_msg(self):
res = self.sock.read(1)
self.sock.setblocking(True)
if res is None:
return None
if res == b"":
raise OSError(-1)
if res == b"\xd0":
sz = self.sock.read(1)[0]
assert sz == 0
return None
op = res[0]
if op & 0xf0 != 0x30:
return op
sz = self._recv_len()
topic_len = self.sock.read(2)
topic_len = (topic_len[0] << 8) | topic_len[1]
topic = self.sock.read(topic_len)
sz -= topic_len + 2
if op & 6:
pid = self.sock.read(2)
pid = pid[0] << 8 | pid[1]
sz -= 2
msg = self.sock.read(sz)
self.cb(topic, msg)
if op & 6 == 2:
pkt = bytearray(b"\x40\x02\0\0")
struct.pack_into("!H", pkt, 2, pid)
self.sock.write(pkt)
elif op & 6 == 4:
assert 0
def check_msg(self):
self.sock.setblocking(False)
return self.wait_msg()
import network,time
class MQTT:
def __init__(self, client_id, server, port):
self.client = MQTTClient(client_id, server, port)
self.client.connect()
def mqtt_pub(self, topic, message):
self.client.publish(topic, message)
4.3 AHT10
from machine import Pin
from machine import FPIOA, I2C
import time
AHT10_ADDR = 0x38
AHT10_TRIG_MEAS = 0xAC
class AHT10:
def __init__(self, hard_flag=True, i2c_bus=2, sda_pin=12, scl_pin=11, freq=400000):
if hard_flag:
fpioa = FPIOA()
fpioa.set_function(scl_pin, FPIOA.IIC2_SCL)
fpioa.set_function(sda_pin, FPIOA.IIC2_SDA)
self.i2c=I2C(i2c_bus, freq = freq)
print(self.i2c.scan())
else:
self.i2c = I2C(i2c_bus, scl=scl_pin, sda=sda_pin, freq=freq)
print(self.i2c.scan())
def aht10_get_data(self):
self.i2c.writeto(AHT10_ADDR, bytearray([AHT10_TRIG_MEAS])) # AHT10_TRIG_MEAS
timeout = 100
while timeout > 0:
status = self.i2c.readfrom(AHT10_ADDR, True)[0]
if not (status & 0x80): # 检查忙标志
break
time.sleep_ms(10)
timeout -= 1
if timeout <= 0:
raise OSError("AHT10 测量超时")
data = self.i2c.readfrom(AHT10_ADDR, 6)
hum_raw = ((data[1] << 16) | (data[2] << 8) | data[3]) >> 4
temp_raw = ((data[3] & 0x0F) << 16) | (data[4] << 8) | data[5]
humidity = (hum_raw / (2**20)) * 100
temperature = (temp_raw / (2**20)) * 200 - 50
return temperature, humidity
def run(self):
while True:
temp, hum = self.aht10_get_data()
print(f"温度: {temp:.1f}°C, 湿度: {hum:.1f}%")
time.sleep(2) # AHT10最小测量间隔0.5秒
4.4 WiFi
import network,time
from machine import Pin
class WIFI:
def __init__(self, state_pin=52, ssid='Wifi_2G', passwork='12345678'):
self.wifi_led = Pin(state_pin, Pin.OUT)
self.wlan = network.WLAN(network.STA_IF)
self.ssid = ssid
self.passwork = passwork
def wifi_status(self):
return self.wlan.isconnected()
def wifi_connect(self):
self.wlan.active(True)
if not self.wifi_status():
print('connecting to network...')
for i in range(3):
self.wlan.connect(self.ssid, self.passwork)
if self.wlan.isconnected():
break
if self.wifi_status():
print('connect success')
self.wifi_led.value(1)
while self.wlan.ifconfig()[0] == '0.0.0.0':
pass
print('network information:', self.wlan.ifconfig())
else:
for i in range(10):
self.wifi_led.value(1)
time.sleep_ms(300)
self.wifi_led.value(0)
time.sleep_ms(300)
核心逻辑如下:
import _thread
from libs.PipeLine import PipeLine
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
from libs.Utils import *
import nncase_runtime as nn
import ulab.numpy as np
import aidemo
from media.display import *
from media.media import *
from media.sensor import *
import time, os, sys, gc
import lvgl as lv
from machine import TOUCH
from machine import RTC
WIFI_SSID = 'Wifi_2G'
PASSWORK = '12345678'
SERVER = 'mq.tongxinmao.com'
PORT = 18830
CLIENT_ID = 'CanMV-K230'
TOPIC_AHT10 = 'canmv/sensor/aht10'
TOPIC_CAMERA = 'canmv/sensor/camera'
DISPLAY_WIDTH = ALIGN_UP(800, 16)
DISPLAY_HEIGHT = 480
sensor = None
rgb888p_size=[1280,720]
display_size = [800, 480]
lock = _thread.allocate_lock()
def media_init():
global sensor,osd_img,rgb888p_size,display_size,face_osd_img,yolo_osd_img
Display.init(Display.ST7701, width = DISPLAY_WIDTH, height = DISPLAY_HEIGHT, to_ide = True, osd_num=3)
sensor = Sensor(fps=30)
sensor.reset()
sensor.set_framesize(w = 800, h = 480,chn=CAM_CHN_ID_0)
sensor.set_pixformat(Sensor.YUV420SP)
sensor.set_framesize(w = rgb888p_size[0], h = rgb888p_size[1], chn=CAM_CHN_ID_2)
sensor.set_pixformat(Sensor.RGBP888, chn=CAM_CHN_ID_2)
sensor_bind_info = sensor.bind_info(x = 0, y = 0, chn = CAM_CHN_ID_0)
Display.bind_layer(**sensor_bind_info, layer = Display.LAYER_VIDEO1)
face_osd_img = image.Image(display_size[0], display_size[1], image.ARGB8888)
yolo_osd_img = image.Image(display_size[0], display_size[1], image.ARGB8888)
MediaManager.init()
sensor.run()
def media_deinit():
global sensor
os.exitpoint(os.EXITPOINT_ENABLE_SLEEP)
sensor.stop()
Display.deinit()
time.sleep_ms(50)
MediaManager.deinit()
def save_img(img):
if img.format() == image.YUV420:
suffix = "yuv420sp"
elif img.format() == image.RGB888:
suffix = "rgb888"
elif img.format() == image.RGBP888:
suffix = "rgb888p"
else:
suffix = "unkown"
filename = f"/sdcard/camera_{img.width()}x{img.height()}.{suffix}"
print("save capture image to file:", filename)
img.save(filename)
return filename
def wifi_thread(arg):
wf = WIFI(52, WIFI_SSID, PASSWORK)
wf.wifi_connect()
retry = 0
count = 0
while True:
count = count+1
print("wifi thread, {} ".format(count))
with lock:
if not wf.wifi_status():
retry = retry+1
wf.wifi_connect()
print("重新连接,重连次数 {} 次".format(retry))
time.sleep(15)
def sensor_thread(arg):
count = 0
display_mode="hdmi"
rgb888p_size = [1280, 720]
kmodel_path="/sdcard/examples/kmodel/person_detect_yolov5n.kmodel"
confidence_threshold = 0.2
nms_threshold = 0.6
labels = ["person"]
anchors = [10, 13, 16, 30, 33, 23, 30, 61, 62, 45, 59, 119, 116, 90, 156, 198, 373, 326]
person_det=PersonDetectionApp(kmodel_path,model_input_size=[640,640],labels=labels,anchors=anchors,confidence_threshold=confidence_threshold,nms_threshold=nms_threshold,nms_option=False,strides=[8,16,32],rgb888p_size=rgb888p_size,display_size=display_size,debug_mode=0)
person_det.config_preprocess()
mqtt = MQTT(CLIENT_ID, SERVER, PORT)
ah = AHT10(True, 2, 12, 11, 400 * 1000)
while True:
time.sleep(1)
count = count+1
print("sensor thread, {} ".format(count))
img = sensor.snapshot(chn = CAM_CHN_ID_2)
img_np =img.to_numpy_ref()
with lock:
res=person_det.run(img_np)
if res:
length = len(res)
print(f"Number of human bodies: {length}, {res}")
print(f'监控检测到人体,请及时查看监控!')
mqtt.mqtt_pub(TOPIC_CAMERA, f'{{"Number":{length}}}')
if ((count % 10) == 0):
temp, hum = ah.aht10_get_data()
print(f"温度: {temp:.1f}°C, 湿度: {hum:.1f}%")
mqtt.mqtt_pub(TOPIC_AHT10, f'{{"temp":{temp}, "hum":{hum}}}')
if count > 65536 :
count = 0
gc.collect()
person_det.deinit()
def start_run():
media_init()
_thread.start_new_thread(wifi_thread, ("wifi_thread",))
_thread.start_new_thread(sensor_thread,("sensor_thread",))
while True:
time.sleep_ms(5)
if __name__ == '__main__':
start_run()
5 系统演示
订阅者演示:


开发板端调试信息:

