在本次评测中,我们在 Banana Pi K230D Zero 上启用了 RTSP 服务,结合板载摄像头模块,将视频数据压缩编码并推送到局域网中,通过 WiFi 与 PC 连接后,可使用如 VLC 等播放器观看实时视频流。
本实验展示了 K230D Zero 在多媒体视频采集、压缩编码、网络传输和多线程任务调度方面的能力,适合用于边缘视频处理、低功耗远程监控等场景。
media.sensor.Sensor
控制摄像头传感器。
方法:
reset()
:初始化摄像头。
set_framesize(width, height)
:设置图像大小。
set_pixformat(format)
:设置像素格式,如 Sensor.YUV420SP
。
run()
/ stop()
:启动或关闭视频流。
media.vencoder.Encoder
视频编码器,用于将摄像头采集的图像压缩为 H.264/H.265。
方法:SetOutBufs()
:设置编码输出缓冲区。
Create()
:创建编码通道。
Start()
/ Stop()
:启动/停止编码。
GetStream()
/ ReleaseStream()
:获取并释放一帧编码后的视频流。
multimedia.rtsp_server
RTSP 服务端类,支持 H.264/H.265 视频流推送。
方法:
rtspserver_init(port)
:初始化服务器端口。
rtspserver_createsession(name, type, enable_audio)
:创建 RTSP session。
rtspserver_start()
/ stop()
/ deinit()
:启动、停止、注销服务器。
rtspserver_sendvideodata()
:推送视频数据。
rtspserver_getrtspurl()
:获取完整的 RTSP 播放地址。
network.WLAN
控制 WiFi 网络连接。
类似 ESP32 的 network
模块。
常用方法:
connect(ssid, key)
:连接热点。
isconnected()
:返回连接状态。
disconnect()
:断开连接。
scan()
:扫描可用热点。
_thread
用于创建多线程(例如 RTSP 推流线程)。
ssid = '' # WIFI名
password = '' # 密码
nic = network.WLAN(network.STA_IF)
while not nic.isconnected():
nic.connect(ssid=ssid, key=password)
启用 STA 模式连接指定 WiFi。
建议增加超时机制以防死循环。
rtspserver = RtspServer()
rtspserver.start()
print("rtsp server start:", rtspserver.get_rtsp_url())
实例化 RtspServer
类。
自动调用:摄像头初始化;编码器设置;绑定摄像头与编码器;创建 RTSP Session;启动推流线程 _do_rtsp_stream
。
rtspserver.stop()
nic.disconnect()
关闭推流线程 → 停止编码器与摄像头 → 清理 buffer。安全退出,避免资源泄露。
from media.vencoder import *
from media.sensor import *
from media.media import *
import time, os
import _thread
import multimedia as mm
from time import *
import network
import gc
class RtspServer:
def __init__(self,session_name="test",port=8554,video_type = mm.multi_media_type.media_h264,enable_audio=False):
self.session_name = session_name # session name
self.video_type = video_type # 视频类型264/265
self.enable_audio = enable_audio # 是否启用音频
self.port = port #rtsp 端口号
self.rtspserver = mm.rtsp_server() # 实例化rtsp server
self.venc_chn = VENC_CHN_ID_0 #venc通道
self.start_stream = False #是否启动推流线程
self.runthread_over = False #推流线程是否结束
def start(self):
# 初始化推流
self._init_stream()
self.rtspserver.rtspserver_init(self.port)
# 创建session
self.rtspserver.rtspserver_createsession(self.session_name,self.video_type,self.enable_audio)
# 启动rtsp server
self.rtspserver.rtspserver_start()
self._start_stream()
# 启动推流线程
self.start_stream = True
_thread.start_new_thread(self._do_rtsp_stream,())
def stop(self):
if (self.start_stream == False):
return
# 等待推流线程退出
self.start_stream = False
while not self.runthread_over:
sleep(0.1)
self.runthread_over = False
# 停止推流
self._stop_stream()
self.rtspserver.rtspserver_stop()
#self.rtspserver.rtspserver_destroysession(self.session_name)
self.rtspserver.rtspserver_deinit()
def get_rtsp_url(self):
return self.rtspserver.rtspserver_getrtspurl(self.session_name)
def _init_stream(self):
width = 1280
height = 720
width = ALIGN_UP(width, 16)
# 初始化sensor
self.sensor = Sensor()
self.sensor.reset()
self.sensor.set_framesize(width = width, height = height, alignment=12)
self.sensor.set_pixformat(Sensor.YUV420SP)
# 实例化video encoder
self.encoder = Encoder()
self.encoder.SetOutBufs(self.venc_chn, 8, width, height)
# 绑定camera和venc
self.link = MediaManager.link(self.sensor.bind_info()['src'], (VIDEO_ENCODE_MOD_ID, VENC_DEV_ID, self.venc_chn))
# init media manager
MediaManager.init()
# 创建编码器
chnAttr = ChnAttrStr(self.encoder.PAYLOAD_TYPE_H264, self.encoder.H264_PROFILE_MAIN, width, height)
self.encoder.Create(self.venc_chn, chnAttr)
def _start_stream(self):
# 开始编码
self.encoder.Start(self.venc_chn)
# 启动camera
self.sensor.run()
def _stop_stream(self):
# 停止camera
self.sensor.stop()
# 接绑定camera和venc
del self.link
# 停止编码
self.encoder.Stop(self.venc_chn)
self.encoder.Destroy(self.venc_chn)
# 清理buffer
MediaManager.deinit()
def _do_rtsp_stream(self):
try:
streamData = StreamData()
while self.start_stream:
os.exitpoint()
# 获取一帧码流
self.encoder.GetStream(self.venc_chn, streamData)
# 推流
for pack_idx in range(0, streamData.pack_cnt):
stream_data = bytes(uctypes.bytearray_at(streamData.data[pack_idx], streamData.data_size[pack_idx]))
self.rtspserver.rtspserver_sendvideodata(self.session_name,stream_data, streamData.data_size[pack_idx],1000)
#print("stream size: ", streamData.data_size[pack_idx], "stream type: ", streamData.stream_type[pack_idx])
# 释放一帧码流
self.encoder.ReleaseStream(self.venc_chn, streamData)
except BaseException as e:
print(f"Exception {e}")
finally:
self.runthread_over = True
# 停止rtsp server
self.stop()
self.runthread_over = True
if __name__ == "__main__":
ssid = ''
password = ''
# 启用 STA 模式并连接到 WiFi 接入点
nic = network.WLAN(network.STA_IF)
print(nic.scan())
while nic.isconnected() == False :
nic.connect(ssid=ssid, key=password) # 尝试重连WIFI
os.exitpoint(os.EXITPOINT_ENABLE)
try:
# 创建rtsp server对象
rtspserver = RtspServer()
# 启动rtsp server
rtspserver.start()
# 打印rtsp url
print("rtsp server start:",rtspserver.get_rtsp_url())
# 推流 120s
sleep(120)
except Exception as e:
print(e)
finally:
# 停止rtsp server 断开WIFI
nic.disconnect()
rtspserver.stop()
print("stop")
终端打印如下:
在电脑端的OBS中,添加一个VLC视频源,填入IDE终端打印的地址rtsp://192.168.137.54:8554/test
。这里使用手机秒表进行推流延迟测试,大约有3秒的延时。
更多回帖