M2 Dock开发板的MaixPY开发环境,自身支持获取摄像头视频数据,并对外提供MJPEG图传。
前几天,研究了 米尔MYD-YT507开发板USB摄像头使用从入门到放弃,并成功实现了MJPEG推流图传。
于是,想着 M2 Dock 获取 MJPEG推流数据,接入到MaxiPY,显示到屏幕上,以便于进一步的处理。
通过官方的在线手册 MaixPy3 image 模块 - Sipeed Wiki 了解到,通常创建一个image对象,使用的是Image.open()来打开一个文件。
最简单的方法,可以在M2 Dock上,不间断的获取MJPEG的数据,并保存到临时文件,再使用 Image.open() 打开,然后再显示到屏幕:
with open(tmp_file, "wb") as binary_file:
binary_file.write(jpg)
img = image.open(tmp_file)
display.show(img)
上述代码中的jpg,即为获取的MJPEG数据。
但这种方法,还需要进过一次保存文件的中转,多了一步操作。
如果能够把获取的数据,直接给转换成maix需要的数据格式,那就方便了。
进一步查阅官方手册 MaixPy3 image 模块 - Sipeed Wiki,了解到maix的image对象,还支持如下的调用方式:
Image.load(data, [size = (240, 240) , [mode = "RGB"]])
在 python 对象中加载出一张图像,会将 python 对象的数据 copy 到 Image 对象内部,如将 tobytes 的二进制数据重新恢复成 Image对象。
date可以是PIL对象, image.Image() 对象,bytes对象,numpy 对象.
当data为bytes,numpy对象时,需要提供size和mode参数.
返回 Image 对象,以便您可以使用 . 表示法调用另一个方法。
而这里的data,可以是多种来源,例如PIL。
PIL可以通过JPG的bianry流数据,直接生成Image对象。
那么结合两者,就可以跳过文件保存再调用的步骤了:
bytes_stream = BytesIO(jpg)
pimg = Image.open(bytes_stream)
img = image.load(pimg)
display.show(img)
最终,经过反复尝试,实现了M2 Dock的MaxiPY,获取MJPEG推流数据并显示到屏幕。
完整的代码如下:
import numpy as np
import platform
if platform.uname().node == "sipeed":
from io import BytesIO
from PIL import Image
from maix import camera, mjpg, utils, display, image
else:
import cv2
READ_TYPE = "socket"
MJPEG_HOST = "192.168.2.207"
MJPEG_PORT = 8080
MJPEG_QUERY = "/?action=stream"
def img_data_show(jpg):
global img_bytes
global tmp_file
global is_sipeed
global BytesIO
global Image
global np
global image
global display
if is_sipeed:
if True:
bytes_stream = BytesIO(jpg)
pimg = Image.open(bytes_stream)
img = image.load(pimg)
display.show(img)
else:
with open(tmp_file, "wb") as binary_file:
binary_file.write(jpg)
img = image.open(tmp_file)
display.show(img)
else:
img = cv2.imdecode(np.frombuffer(
jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
cv2.imshow('i', img)
if cv2.waitKey(1) == 27:
exit(0)
def img_data_match(chunk):
global img_bytes
global tmp_file
global is_sipeed
global BytesIO
global Image
global np
global image
global display
global img_data_show
img_bytes += chunk
a = img_bytes.find(b'ÿØ')
b = img_bytes.find(b'ÿÙ')
if a != -1 and b != -1:
jpg = img_bytes[a:b+2]
img_bytes = img_bytes[b+2:]
img_data_show(jpg)
img_bytes = b''
tmp_file = "/tmp/test.jpg"
is_sipeed = platform.uname().node == "sipeed"
print("Connect to %s:%d with %s on %s" % (MJPEG_HOST, MJPEG_PORT, READ_TYPE, platform.uname().node))
if READ_TYPE == "url":
import requests
MJPEG_URL = "http://%s:%s%s" % (MJPEG_HOST, MJPEG_PORT, MJPEG_QUERY)
r = requests.get(MJPEG_URL, stream=True)
if(r.status_code == 200):
print("connect success!")
for chunk in r.iter_content(chunk_size=1024):
img_data_match(chunk)
else:
print("Received unexpected status code {}".format(r.status_code))
elif READ_TYPE == "socket":
import socket
client = socket.socket()
ret = client.connect((MJPEG_HOST, MJPEG_PORT))
request_url = "GET %s HTTP/1.1
Host:%s
Connection:Close
" % (
MJPEG_QUERY, MJPEG_HOST)
if(ret == -1):
print("connet error!")
exit(-1)
else:
print("connect success!")
client.send(request_url.encode())
chunk = client.recv(1024)
chunk = client.recv(1024)
while chunk:
img_data_match(chunk)
chunk = client.recv(1024)
上述代码中,包含如下的部分:
- 获取MJPEG数据时,可以试用python的request模块,或者使用socket模块,后者效率更高
- 获取到MJPEG数据后,自动分析其中的JPG数据帧,一旦检测到,则进行显示处理
- 显示处理部分,会自动区分是在电脑上,还是在M2 Dock上
- 如果是在电脑上,则使用cv2进行处理显示,如果需要退出可以按ESC按键
- 如果是在M2 Dock上,则使用
BytesIO
来 获取的数据转换为二进制流,然后提供给PIL生成进行处理生成Image对象,再提供给display模块显示。
上述源码,提供在 M2_Dock: M2 Dock学习研究与实例分享 (gitee.com)。
最终具体的呈现效果,可以查看附件的视频。