随着物联网(IoT)和智能设备的快速发展,边缘计算技术已成为高效数据处理和服务交付的重要组成部分。当我们考虑利用边缘端设备进行实时监控时,一个常见的需求是通过摄像头捕捉视频,并在局域网内实现视频流的传输。这种设置不仅适用于家庭和小型企业的安全监控,也能满足远程教育、医疗监护等多个领域的需要。
面对局域网内的视频流传输挑战,有多种方法可以实现从摄像头到显示终端的数据传递,每种方法都有其特点和适用场景。本文将介绍一种基于TCP/IP协议栈和Socket编程的方法,这种方法因其稳定性和易用性而被广泛采用。
选择TCP/IP与Socket编程的理由:
接下来,我们将探讨如何利用这些技术构建一个简易的局域网视频流传输系统,包括具体的实现步骤和技术细节。您将看到,通过合理的设计和配置,即使是普通用户也能够在自己的环境中轻松搭建起一套实用的实时监控解决方案。如果您对视频流传输有着更高的要求,如低延迟、高清晰度或更复杂的安全特性,后续部分还会提及一些进阶技术和最佳实践建议。
TCP/IP(Transmission Control Protocol/Internet Protocol,传输控制协议/网际协议)是指能够在多个不同网络间实现信息传输的协议簇。TCP/IP协议不仅仅指的是TCP 和IP两个协议,而是指一个由FTP、SMTP、TCP、UDP、IP等协议构成的协议簇, 只是因为在TCP/IP协议中TCP协议和IP协议最具代表性,所以被称为TCP/IP协议。
TCP/IP传输协议是严格来说是一个四层的体系结构,应用层、传输层、网络层和数据链路层都包含其中。
| 层级名称 | 主要协议 | 功能描述 |
|---|---|---|
| 应用层 | Telnet, FTP, SMTP, HTTP, HTTPS, DNS 等 | 接收来自传输层的数据或按不同应用要求与方式将数据传输至传输层;提供用户接口和应用服务。 |
| 传输层 | TCP (传输控制协议), UDP (用户数据报协议) | 提供端到端的通信服务,确保数据可靠地从一台机器传输到另一台机器(TCP)或不保证顺序和可靠性但更快速的数据传输(UDP)。 |
| 网络层 | IP (网际协议), ICMP, IGMP | 负责网络中数据包的传送,包括路由选择、数据包转发等;ICMP用于报告错误并交换有限的控制消息;IGMP用于管理组播成员关系。 |
| 网络访问层/链路层 | ARP (地址解析协议), RARP, Ethernet, Wi-Fi | 提供链路管理和错误检测;处理对不同通信媒介有关的信息细节问题,如物理地址的解析(ARP),以及在局域网中直接传递数据帧。 |
TCP(传输控制协议)是TCP/IP模型传输层中最重要的协议之一,它提供了一种面向连接、可靠的字节流服务。这意味着在两个应用程序之间建立通信之前,必须先通过三次握手过程来建立一个连接;而当数据传输完成后,则需要通过四次挥手过程来断开这个连接。TCP确保了数据包按序到达,并且能够检测并重传丢失或损坏的数据包,从而保证了数据传输的可靠性。
TCP的主要特点:
TCP适用场景:
TCP工作流程概述
综上所述,TCP因其高可靠性和安全性,广泛应用于那些对数据完整性和顺序有严格要求的应用程序中。
UDP(用户数据报协议)是TCP/IP模型传输层的一个重要成员,它提供了一种无需建立连接即可发送和接收数据包的通信方式。与TCP不同的是,UDP不保证数据的顺序性和可靠性,也不进行流量控制或拥塞控制。这意味着使用UDP时,数据可能会丢失、重复或者乱序到达。
UDP的主要特点:
UDP适用场景:
| 特性 | TCP (Transmission Control Protocol) | UDP (User Datagram Protocol) |
|---|---|---|
| 连接类型 | 面向连接,需要三次握手建立连接 | 无连接,即发即用 |
| 可靠性 | 可靠的数据传输,确保数据按序完整到达 | 不可靠,不保证数据包会到达,可能丢失或乱序 |
| 速度 | 较慢,因为有握手过程和错误检查 | 更快,因为它没有这些额外的过程 |
| 流量控制 | 支持,通过滑动窗口机制 | 不支持 |
| 拥塞控制 | 支持 | 不支持 |
| 头部开销 | 较大,因为包含更多的控制信息 | 较小,只有必要的控制信息 |
| 用途 | 适合需要高可靠性的应用程序,如文件传输、电子邮件 | 适合对时间敏感但可容忍一定数据损失的应用,如视频流 |
Socket(套接字)是网络编程中的一个重要概念,它提供了一种跨进程通信的方式,使得不同计算机上的应用程序能够通过网络交换数据。在实现TCP/IP服务时,Socket扮演着至关重要的角色,它是程序员用来编写客户端和服务器端程序的接口。
Socket的基本特性:
AF_INET)或IPv6(AF_INET6)。SOCK_STREAM,流式套接字)或UDP(SOCK_DGRAM,数据报套接字)。使用Socket实现TCP/IP服务的步骤:
socket()函数来创建一个套接字对象,该对象将用于发送和接收数据。bind()函数将其套接字与特定的IP地址和端口号关联起来,以便其他客户端可以通过这些信息找到并连接到服务器。listen()函数开始监听来自客户端的连接请求。这一步骤会将套接字转换为被动模式,准备接受连接。accept()函数来接受这个连接,并返回一个新的套接字,专门用于与那个特定客户端之间的通信。connect()函数发起对服务器的连接请求,指定要连接的服务器IP地址和端口号。send()和recv()函数来进行数据的发送和接收。对于TCP来说,这意味着可以进行可靠的数据流传输。close()函数来关闭各自的套接字,释放资源。以上代码展示了如何使用Python内置的socket库来实现一个简单的TCP回显服务器和客户端。
我此次使用的设备是凌智视觉模块,但是该设备是没有带WiFi的,如果需要使用WiFi,需要外界一块WiFi模块。
下面我将搭建一个简易的服务器,用于通过TCP连接向客户端发送视频帧。它使用OpenCV库捕获图像,并将图像编码为JPEG格式后通过网络传输给客户端。此外,它还支持命令控制和视频流模式,允许客户端发送命令来控制视频帧的传输。
以下是代码的主要功能模块和逻辑:
导入必要的库:
cv2:来自lockzhiner_vision_module的OpenCV库,用于视频捕获和图像处理。Thread, Event:来自threading模块,用于创建线程和事件对象以同步线程间的操作。socket:用于网络通信。os, time:分别用于操作系统相关的功能和时间测量。定义辅助函数:
send_image(conn, frame):负责将一帧图像编码成JPEG格式并通过TCP连接发送出去。定义线程处理函数:
handle_client_send(conn, start_event, shutdown_event, confirm_event, streaming_event, cap):此函数在一个独立线程中运行,负责读取摄像头图像并根据事件状态决定是否发送图像。它还会计算和打印帧率。handle_client_receive(conn, addr, cap, start_event, shutdown_event, confirm_event, streaming_event):此函数也在一个独立线程中运行,负责接收来自客户端的命令并对这些命令作出响应,比如开始发送单个图像、确认接收到图像、关闭连接或切换视频流模式等。主程序逻辑:
HOST)和端口(PORT),并初始化了OpenCV的视频捕获对象(cap)。KeyboardInterrupt)时优雅地关闭所有资源。事件机制:
Event对象来协调不同线程之间的交互,确保了正确的顺序和状态管理。视频流模式:
import lockzhiner_vision_module.cv2 as cv2
from threading import Thread, Event
import socket
import os
import time
def send_image(conn, frame):
try:
# 使用imencode将图像编码为JPEG格式
ret, img_encode = cv2.imencode('.jpg', frame)
if not ret:
print("Failed to encode image")
return
# 假设img_encode已经是bytes类型,直接使用
data = img_encode
# 发送图像大小和图像数据
conn.sendall(len(data).to_bytes(4, byteorder='big'))
conn.sendall(data)
print("Image sent.")
except (ConnectionResetError, OSError) as e:
print(f"Error sending image: {e}")
def handle_client_send(conn, start_event, shutdown_event, confirm_event, streaming_event, cap):
frame_counter = 0
start_time = time.time()
while not shutdown_event.is_set():
if start_event.is_set() or streaming_event.is_set(): # 检查开始或流式事件是否被设置
ret, frame = cap.read()
if ret:
send_image(conn, frame)
frame_counter += 1
if not streaming_event.is_set(): # 如果不是流式传输,则等待客户端确认
confirm_event.wait() # 等待客户端确认
confirm_event.clear() # 清除确认事件
start_event.clear() # 图像发送完成并且已确认后清除事件
# 计算帧率
elapsed_time = time.time() - start_time
if elapsed_time >= 1: # 每一秒打印一次帧率
fps = frame_counter / elapsed_time
print(f"FPS: {fps}")
# 重置计数器和时间戳
frame_counter = 0
start_time = time.time()
# 接收线程处理函数,接收命令并根据命令执行操作
def handle_client_receive(conn, addr, cap, start_event, shutdown_event, confirm_event, streaming_event):
print(f"Connected by {addr}")
try:
while not shutdown_event.is_set():
try:
data = conn.recv(1024)
except ConnectionResetError:
print("Connection reset by peer.")
break
if not data or shutdown_event.is_set():
print("Connection closed.")
break
command = data.decode().strip()
print(f"Received command: {command}")
if command == '0':
ret, frame = cap.read()
if ret:
if streaming_event.is_set():
# 如果处于流式传输模式,则不需要设置start_event
pass
else:
start_event.set() # 设置开始事件以通知发送线程
elif command == '1':
confirm_event.set() # 设置确认事件
elif command == '2':
shutdown_event.set()
break
elif command == '3': # 开始视频流模式
streaming_event.set()
print("Video stream mode started.")
elif command == 'q': # 停止视频流模式
streaming_event.clear()
print("Video stream mode stopped.")
else:
conn.sendall("Unknown command".encode())
finally:
conn.close()
if __name__ == "__main__":
HOST = '172.32.0.144'
PORT = 6810
cap = cv2.VideoCapture()
# if not cap.isOpened():
if cap.open(0) is False:
print("Failed to open capture")
exit(1)
print("video is all ready")
start_event = Event()
shutdown_event = Event()
confirm_event = Event()
streaming_event = Event() # 新增流式传输事件
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((HOST, PORT))
server_socket.listen()
print("Server started, waiting for connections...")
try:
while not shutdown_event.is_set():
conn, addr = server_socket.accept()
client_send_thread = Thread(target=handle_client_send,
args=(conn, start_event, shutdown_event, confirm_event, streaming_event, cap))
client_receive_thread = Thread(target=handle_client_receive,
args=(conn, addr, cap, start_event, shutdown_event, confirm_event, streaming_event))
client_send_thread.start()
client_receive_thread.start()
except KeyboardInterrupt:
print("Server interrupted.")
finally:
cap.release()
server_socket.close()
print("Server stopped and resources released.")
客户端通过TCP连接接收图像帧,并根据命令执行不同操作的功能。以下是代码的详细解释:
客户端功能概述
接收图像:
receive_image(conn, timeout=5) 函数用于从服务器接收图像数据。主程序逻辑 (main 函数):
设置了服务器的IP地址(HOST)和端口(PORT),与服务端保持一致。
创建了一个TCP/IP套接字并尝试连接到服务器。
根据用户输入的不同命令来控制视频帧的请求和处理:
'0' 请求单张图像。'1' 不做任何操作(假设用作确认信号)。'2' 发送退出命令给服务器并终止程序。'3' 启动视频流模式,允许连续接收图像帧,并将这些帧显示出来,同时保存为本地视频文件。视频流模式:
q键停止视频流模式,并保存录制的视频文件。异常处理:
资源管理:
使用with语句确保即使出现异常,套接字也会被正确关闭。
**确保视频写入器在异常情况下也能够释放资源。
注意事项
命令同步:客户端发送命令后,通常需要等待服务器的响应。这里的实现假设服务器会在接收到命令后立即采取行动,因此客户端紧接着就会尝试接收图像数据。
超时设置:对于图像接收设置了超时参数,以防止程序在无响应的情况下卡住。
视频编码格式:选择了XVID作为视频编码格式,这是一个常见的选择,但并不是唯一可用的选项。可以根据需要更改。
视频保存:视频流模式下,视频会被保存到本地磁盘。每次开始新的视频流都会创建一个新的文件名,以避免覆盖旧文件。
代码
import socket
import cv2
import numpy as np
import time
def receive_image(conn, timeout=5):
conn.settimeout(timeout) # 设置接收超时
try:
data_length = int.from_bytes(conn.recv(4), byteorder='big')
image_data = b''
received = 0
while received < data_length:
packet = conn.recv(min(1024, data_length - received))
if not packet:
break
image_data += packet
received += len(packet)
# 将接收到的数据转换为NumPy数组,并使用OpenCV解码为图像
image_np = np.frombuffer(image_data, dtype=np.uint8)
frame = cv2.imdecode(image_np, cv2.IMREAD_COLOR)
return frame
except socket.timeout:
print("Timeout waiting for image from server.")
return None
except Exception as e:
print(f"Error receiving image: {e}")
return None
def main():
HOST = '172.32.0.144'
PORT = 6810 # 应与服务端使用的端口号相同
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.connect((HOST, PORT))
video_writer = None # 初始化视频写入器为None
while True:
command = input("Enter command (0 to request image, 1 to do nothing, 2 to exit, 3 for video stream): ")
s.sendall(command.encode())
if command == '2':
s.sendall(b'2') # 发送退出命令给服务器
break
elif command == '3':
print("Starting video stream mode. Press 'q' to stop.")
timestamp = int(time.time() * 1000) # 毫秒级时间戳
video_filename = f"video_stream_{timestamp}.avi"
fourcc = cv2.VideoWriter_fourcc(*'XVID') # 视频编码格式
video_writer = None # 重置视频写入器
# 请求第一帧以获取尺寸信息
s.sendall(b'0') # 请求图像帧
frame = receive_image(s, timeout=5) # 增加超时参数
if frame is None:
print("Failed to receive first image frame.")
continue
height, width, _ = frame.shape
video_writer = cv2.VideoWriter(video_filename, fourcc, 20.0, (width, height)) # 创建视频写入器
while True:
if video_writer is None:
break # 如果视频写入器未初始化,则跳出循环
frame = receive_image(s, timeout=1) # 对每一帧都设置较短的超时时间
if frame is None:
print("No new frame received. Waiting...")
time.sleep(1) # 等待一段时间再尝试重新请求
continue
cv2.imshow('Video Stream', frame)
video_writer.write(frame) # 将帧写入视频文件
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
print("Stopping video stream and saving video.")
s.sendall(b'q') # 发送停止信号给服务器
break
if video_writer is not None:
video_writer.release() # 关闭视频写入器
print(f"Video saved as {video_filename}")
cv2.destroyAllWindows()
elif command == '0':
frame = receive_image(s, timeout=5) # 增加超时参数
if frame is not None:
cv2.imshow('Received Frame', frame)
cv2.waitKey(3000) # 显示图像3秒后自动关闭窗口
cv2.destroyAllWindows()
else:
print("No image received.")
s.sendall(b'1') # 假设'1'是确认信号
except Exception as e:
print(f"Socket error occurred: {e}")
if video_writer is not None:
video_writer.release() # 确保异常情况下也释放视频写入器资源
if __name__ == "__main__":
main()

更多回帖