飞腾派
直播中

jianyi

1年用户 15经验值
擅长:Pytorch
私信 关注

【飞腾派4G版免费试用】测试飞腾派的开发部署及环境调试

上期后续

在上一篇文章中笔者提到我的128G朗科内存卡坏了送去维保,同时三星的64G内存卡无法点亮飞腾派也是在今天顺利解决了这个问题

这里也是感谢相当负责任的飞腾社区负责人@alwinlee @kevin2011 同为本次试用名单的朋友给予的建议@tianyingkeji,来自本版块的负责人的点赞支持@dianzi @Jonny,以及来自上一批板厂论坛的测试者,你们的宝贵经验为整个飞腾派社群避免了诸多弯绕,再次感谢!!!

(↓↓↓上批测试者微信私聊答疑的截图)
f18cdf5a762c25ff01601971da070db.jpg

(↓↓↓点赞诸位留言建议与鼓励的负责人)
image.png

总而言之,内存卡的容量与品牌选择需要着重注意一下,本次我重新购买的内存卡是来自 朗科的长江系列-32G ,倘如你遇到与我相同的问题(烧写镜像后无法点亮外围IO不通电),可以选择在京东上购买相同的型号

回归正文-测试飞腾派基础应用部署能力

工欲善其事,必先利其器:

笔者本次烧录的系统是PhytiumPIOS,更注重美观和开箱即用的可以尝试openkylin,但在飞腾派汇总的网盘资源中openkylin的镜像并非最新版本,在官网中可以找到最新的1.0.1版本,笔者所用如此( 请注意,对于开发工具而言并不能盲求最新,网盘总所整理的资源都是经过飞腾派社区测试通过的,如果在生产环境中部署,我仍然建议你从飞腾派网盘中获取镜像等资源

openKylin官网下载
1701865823027.png

选择并烧写好所需的镜像之后即可点亮开发板,在飞腾派这块板子上是没有GPU的因此在任何系统中都是mesa的llvmpipe也就是CPU渲染,因此系统的最高分辨率是1920x1080,也不建议各位手动添加更高的分辨率,这边笔者也是选择使用putty+winscp进行开发

让我们先来写一个PyQt的GUI简单例程:

首先咱们要先装个PyQt5的库。。。嗯?

image.png

很明显,在PhytiumPIOS的环境中,Python默认没有配置pip,这种事我相信大家应该很清楚如何解决,装一个嘛,我也就不过多解释,不知道怎么装的就把我下面给出的三行指令复制到终端就好

curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py

image.png

sudo apt-get install python3-distutils

image.png

python get-pip.py

image.png

如果你的系统是Ubuntu,那么恭喜你,系统中已经将Python的大多数环境配置完整了,并且QtBase环境也是如此只需要在系统中部署你开发好的应用就能直接上线

image.png

简单应用测试

既然环境已经有了,先来玩个游戏吧,比如说咱们打个飞机?

这是一个通过PyGame实现的外星人入侵的简单游戏,可以通过键盘来控制飞机的左右躲避,运行的效果还是相当可以的

2eb5732d3ce7c45c8b1fe8ff59da9d6.jpg

俗话说得好我都花钱买游戏了或者我都花时间做游戏了,我再花时间去玩游戏那我不是亏麻了?你已经是一个成熟的小游戏了该学会自己陪自己玩了,我不想动给它创建一个随机数让它自己动:

def think(self):
    r = random.random()
    if r < 0.20:
        self.set_move_direction(1)
    elif r < 0.40:
        self.set_move_direction(-1)
    else:
        self.set_move_direction(0)
def move(self):
    if self.move_direction == 0:
        return
    center = self.rect.centerx
    if self.move_direction == 1: # move right
        center += self.ship_speed_factor
    elif self.move_direction == -1:
        center -= self.ship_speed_factor
    self.rect.centerx = center

emmm,可是这样它一点都不成熟,动的太蠢了,一下子就死了,下饭菜也要有点油水是吧,给它增点智慧

先写一个简单的神经网络:

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(2, 64)
        self.fc2 = nn.Linear(64, 3)

    def forward(self, x):
        x = torch.cat((x[:, 0].view(-1, 1), x[:, 1].view(-1, 1)), dim=1)
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

再改变一下它的思考模式:

def think(self):
        with torch.no_grad():
            enemy_tensor = torch.tensor(self.inputs[1]).float().view(1, -1)
            plane_tensor = torch.tensor(self.inputs[0]).float().view(1, -1)
            outputs = self.model(torch.cat((enemy_tensor, plane_tensor), 1))
            _, predicted = torch.max(outputs, 1)
            self.nn_out = predicted.item() 
            return predicted.item()

那么数据集应该怎么搞呢?既然要懒,那就懒到底,让程序自动生成一批:

# 生成训练数据
def generate_data(data_size):
    # 生成随机敌人和飞机的x坐标
    enemy_positions = np.random.rand(data_size)
    plane_positions = np.random.rand(data_size)
    
    # 标注标签
    labels = []
    for i in range(data_size):
        if abs(enemy_positions[i] - 0.2) <= plane_positions[i] <= abs(enemy_positions[i] + 0.2): #如果飞机在敌机撞击范围内
            if 0.2 <= abs(enemy_positions[i] - 0.2) <= 0.8: #如果敌机在边缘范围之外
                # Condition 1
                if abs(plane_positions[i] - (enemy_positions[i] - 0.2)) < abs(plane_positions[i] - (enemy_positions[i] + 0.2)): #如果飞机向左飞更快离开撞击区
                    labels.append(1) #向左飞行
                else:
                    labels.append(2) #向右飞行
            elif abs(enemy_positions[i] - 0.2) <= 0.2: #或者敌机在左侧边缘之内
                # Condition 2
                labels.append(2) #向右飞行
            elif abs(enemy_positions[i] + 0.2) >= 0.8:
                # Condition 3
                labels.append(1) #向左飞行
            else:
                # Default case
                print(enemy_positions[i] , plane_positions[i])
                labels.append(0) #原地不动
        else:
            if abs(plane_positions[i] - 0) < abs(plane_positions[i] - 1): #如果飞机坐标-0 小于飞机坐标-1
                # Condition 4
                labels.append(2) #向右飞行
            elif abs(plane_positions[i] - 0) > abs(plane_positions[i] - 1): #或者如果飞机坐标-0 大于飞机坐标-1
                # Condition 5
                labels.append(1) #向左飞行
            else:
                # Condition 6
                labels.append(0) #原地不动

这下应该不蠢了吧,看看效果:

ezgif.com-video-to-gif.gif

相当聪明啊,出道即巅峰

当然,咱们也不能纵情于娱不是?是时候来点工作了,来一套视觉检测的应用如何?

9d0c259f63e22458ba12871f7944536.jpg

这是一个基于Yolov5 + PyQt实现的钢铁缺陷检测系统,实现也简单,拉取Yolov5的代码库,准备好数据集用Yolov5的训练脚本重新跑一遍,同时用QtC做一个UI再用Python实现基本逻辑调用训练好的模型即可

推理Demo:

import sys
from PyQt5.QtGui import QPixmap
from PyQt5.QtWidgets import QApplication, QMainWindow, QFileDialog
from PyQt5.QtCore import QTimer, QTime, Qt, QThread, pyqtSignal
from PyQt5.uic import loadUi
import os
import torch
import cv2 as cv

# 加载本地预训练模型
model = torch.hub.load('./', 'custom', path='best.pt', source='local')

class ImageProcessingThread(QThread):
    update_progress = pyqtSignal(int)
    update_list_widget = pyqtSignal(list)

    def __init__(self, input_path, output_path):
        super().__init__()
        self.input_path = input_path
        self.output_path = output_path

    def run(self):
        # 获取文件夹内的所有文件名
        file_names = os.listdir(self.input_path)
        total_files = len(file_names)

        # 遍历每个文件名
        for i, file_name in enumerate(file_names):
            # 拼接完整的图片路径
            img_path = os.path.join(self.input_path, file_name)

            # 调用模型,得到结果
            results = model(img_path)

            # 渲染结果
            frame = results.render()[0]

            # 转换颜色空间
            bgr = cv.cvtColor(frame, cv.COLOR_RGB2BGR)

            # 拼接完整的保存路径
            save_file = os.path.join(self.output_path, file_name)

            # 保存图片
            cv.imwrite(save_file, bgr)

            # 更新进度条
            progress = int((i + 1) / total_files * 100)
            self.update_progress.emit(progress)

        # 更新listWidget
        self.update_list_widget.emit(file_names)

class MyMainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        loadUi("mainwindow.ui", self)  # 加载UI文件
        self.timer = QTimer(self)
        self.timer.timeout.connect(self.update_time)
        self.timer.start(1000)

        self.Start.clicked.connect(self.start_processing)
        self.listWidget.itemClicked.connect(self.show_selected_image)

    def update_time(self):
        # 获取当前时间,并转换为00:00的格式
        current_time = QTime.currentTime()
        time_string = current_time.toString("hh:mm")

        # 设置Time标签的文本为当前时间
        self.Time.setText(time_string)

    def start_processing(self):
        # 获取输入和输出路径
        input_path = self.InputPath.text()
        output_path = self.OutputPath.text()

        # 创建并启动后台处理线程
        self.processing_thread = ImageProcessingThread(input_path, output_path)
        self.processing_thread.update_progress.connect(self.update_progress_bar)
        self.processing_thread.update_list_widget.connect(self.update_list_widget)
        self.processing_thread.start()

        # 禁用Start按钮
        self.Start.setEnabled(False)

    def update_progress_bar(self, value):
        # 更新进度条
        self.progressBar.setValue(value)

        # 当进度条到达100%时,恢复Start按钮状态
        if value == 100:
            self.Start.setEnabled(True)

    def update_list_widget(self, file_names):
        # 更新listWidget
        self.listWidget.clear()
        self.listWidget.addItems(file_names)

    def show_selected_image(self, item):
        # 获取选中的文件名
        selected_file = item.text()

        # 构造选中图片的路径
        selected_image_path = os.path.join(self.OutputPath.text(), selected_file)

        # 加载并在 ViewLabel QLabel 中显示图片
        pixmap = QPixmap(selected_image_path)
        self.ViewImg.setPixmap(pixmap)
        self.ViewImg.setScaledContents(True)

if __name__ == "__main__":
    app = QApplication(sys.argv)
    main_window = MyMainWindow()
    main_window.show()
    sys.exit(app.exec_())

那么推理速度如何呢?

c9e95b7f883909d93413e230946e1ff.jpg

可以看到,在不进行任何优化的情况下处理400张图片需要1180秒,平均每张图要3秒左右,这样的效率肯定是无法做到实时检测的,所以咱们还需要做一个最基本的处理-多进程优化

其实在我上面的代码中以及引入了一个QThread,这个是Qt所必须的多线程模块,否则在处理这种长时间的推理任务时会由于阻塞导致UI崩溃,这也是Python GIL需要处理的一个很复杂的内容

那么现在我所说的多进程优化是什么呢?其实就是针对单线程模型推理时有相当的系统资源其实是无法完全调用上的,这就会导致整体系统资源浪费和处理时间过长的问题,这样的程序对嵌入式设备来说是不合格的 (虽然也没人会在嵌入式设备中使用Python就是了)

我们从以下模块引入一些函数做后续调用

from concurrent.futures import ProcessPoolExecutor,  wait

这是 Python 中用于支持并发编程的模块。它提供了高级的接口,使用这个模块可以使得我们在多线程(ThreadPoolExecutor)和多进程(ProcessPoolExecutor)环境中进行并发编程变得更加方便

用法如下:

with ProcessPoolExecutor() as executor:
    futures = {file_name: executor.submit(process_image, self.input_path, self.output_path, file_name) for file_name in file_names}

    done, not_done = wait(futures.values(), timeout=None)

    for future in done:
        file_name = [name for name, f in futures.items() if f == future][0]
        future.result()
        progress = int((len(done) / total_files) * 100)
        self.update_progress.emit(progress)

这将创建多个图像处理进程,充分利用到系统中的现有资源(通常情况下该模块会自动根据CPU的线程数分配资源创建线程池),理论是如此,看看实际效果:

26e0bf80a8f722ea429a75b11bb10c0.jpg

我们可以看到,在启用了多进程处理之后,整体耗时缩短了一半,虽然还是远不及实时监测的门槛(最起码每秒10张),但是最起码咱们能看到不充分的利用存在过半的资源浪费

倘若我们将该代码通过C++进行优化复现,那么实时监测并非难事

最后再来跑一个人脸识别的例程,在这个例程中还是使用的PyQt进行实现,那么为何要演示这个例程呢,主要是我认为对于编程,很多东西比起最优的模板实现,一些奇淫技巧是非常有意思的,我手上并没有可以接入飞腾派的图像采集设备,那么我是如何实现人脸识别的呢?

53a060ce67c79f224a98da71c649f64.jpg

不知道有多少人用过上面这个APP,顾名思义这是一个将本地摄像头采集的数据以网络的形式在内网中转发

image.png

它可以相当方便的在我们缺少图像采集设备的时候临时顶用

实现效果:

52dd3da0bbd78bd7ceb867cd5df0c54.jpg

具体方案:
通过IP拉取视频流 -> opencv截取面部信息 -> MTCNN人脸相似度匹配

代码Demo:

import os
import sys
import cv2
from PyQt5.QtWidgets import QApplication, QMainWindow, QInputDialog
from mainwindow import Ui_MainWindow
from PyQt5.QtCore import QTime, QTimer, QThread, pyqtSignal, Qt
from PyQt5.QtGui import QImage, QPixmap
from PyQt5.uic import loadUi
from model import Backbone
import torch
from PIL import Image
from mtcnn import MTCNN
import matplotlib.pyplot as plt
from torchvision.transforms import Compose, ToTensor, Normalize
import numpy as np
import time
import threading

mtcnn = MTCNN()
device = 'cpu'
model = Backbone(num_layers=50, drop_ratio=0.6, mode='ir_se')
model.load_state_dict(torch.load('model_ir_se50.pth', map_location=torch.device(device)))
model.eval()
model.to(device)

face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

# 获取图像并处理的函数
def get_img(img, device):
    timestamp = str(int(time.time()))
    output_dir = './log'
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    image_name = f'I1_{timestamp}.png' if type(img) == np.ndarray else f'I2_{timestamp}.png'
    image_path = os.path.join(output_dir, image_name)

    if type(img) == np.ndarray:
        img = Image.fromarray(img.astype('uint8'))
    elif type(img) == str:
        img = Image.open(img)

    img.save(image_path)
    face = mtcnn.align(img)
    transforms = Compose([ToTensor(), Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])])
    return transforms(face).to(device).unsqueeze(0)


# 从图像提取嵌入的函数
def get_embeddings_from_image(img, model, device):
    img = get_img(img, device)
    return model(img)[0]

# 从文件夹加载嵌入
def load_folder_embeddings(folder_path, model, device):
    embeddings = []
    image_paths = []

    for file in os.listdir(folder_path):
        if file.endswith('.jpg') or file.endswith('.png'):
            image_path = os.path.join(folder_path, file)
            emb = get_embeddings_from_image(image_path, model, device)
            embeddings.append(emb)
            image_paths.append(image_path)

    return embeddings, image_paths

# 计算嵌入之间的相似度
def calculate_similarity(emb1, emb2):
    return emb1.dot(emb2).item()

# 根据嵌入查找匹配的人员
def find_matching_person(input_emb, folder_path, model, device, threshold=0.9):
    folder_names = [name for name in os.listdir(folder_path) if os.path.isdir(os.path.join(folder_path, name))]

    for name in folder_names:
        embeddings, image_paths = load_folder_embeddings(os.path.join(folder_path, name), model, device)
        for emb, path in zip(embeddings, image_paths):
            similarity = calculate_similarity(input_emb, emb)
            if similarity >= threshold:
                return True, name

    return True, "请联系管理员"

# 图像处理线程
class ImageProcessingThread(QThread):
    finished = pyqtSignal(list)  # 修正信号定义
    def __init__(self, img, model, device, threshold, folder_path):
        super(ImageProcessingThread, self).__init__()
        self.img = img
        self.model = model
        self.device = device
        self.threshold = threshold
        self.folder_path = folder_path

    def run(self):
        try:
            input_emb = get_embeddings_from_image(self.img, self.model, self.device)
        except Exception as e:
            print(f"在 run 中出现错误: {e}")
            signal = 1
            result = "请联系管理员"
            result = [signal, result]
            self.finished.emit(result)
            return
        signal, result = find_matching_person(input_emb, self.folder_path, self.model, self.device, self.threshold)
        result = [signal, result]
        self.finished.emit(result)
        
    

class MainWindow(QMainWindow, Ui_MainWindow):
    def __init__(self):
        super(MainWindow, self).__init__()
        self.setupUi(self)
        #loadUi('mainwindow.ui', self)  # 加载UI文件
        # 创建时间更新程序
        self.Systimer = QTimer(self)
        self.Systimer.timeout.connect(self.update_time)
        self.Systimer.start(1000)

        # 连接按钮点击事件到函数
        self.Setting.clicked.connect(self.showInputDialog)
        self.show()

        self.cap = None
        self.face_image = None
        self.is_processing = False

        # 初始化用于FPS计算的变量
        self.start_time = time.time()
        self.frame_count = 0

    def start_processing(self):
        threshold = 0.75
        folder_path = 'FaceData'
        # 创建并启动用于图像处理的 QThread
        self.image_thread = ImageProcessingThread(self.face_image, model, device, threshold, folder_path)
        self.image_thread.finished.connect(lambda result: self.set_UserName(result))  # 当线程完成时处理结果
        self.image_thread.start()

    def set_UserName(self, result):
        if result[0]:
            Result = "欢迎:" + result[1] if result[1] != "请联系管理员" else result[1]
            self.UserName.setText(Result)
            self.face_image = None
            self.is_processing = False
        else:
            self.UserName.setText(result[1])

    def showInputDialog(self):
        text, ok = QInputDialog.getText(self, '设置摄像头IP', '请输入IP摄像头的地址:')
        if ok:
            # 记录输入的内容到变量
            self.IP_Address = text
            self.cap = cv2.VideoCapture(self.IP_Address)
            self.timer = QTimer(self)
            self.timer.timeout.connect(self.update_frame)
            self.timer.start(50)  # Set your desired refresh rate in milliseconds

    def update_time(self):
        # 获取当前时间,并转换为00:00的格式
        current_time = QTime.currentTime()
        time_string = current_time.toString("hh:mm")

        # 设置Time标签的文本为当前时间
        self.TimeLabel.setText(time_string)

    def update_frame(self):
        ret, frame = self.cap.read()
        if ret:

            self.VideoView.setStyleSheet("background-color: rgb(102, 204, 255)")

            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame = cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)
            
            # 在视频中检测人脸
            gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
            faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=6, minSize=(200, 200))
            
            # 如果检测到人脸
            if len(faces) > 0 and not self.is_processing: 
                # 更改VideoView的背景色为绿色
                self.VideoView.setStyleSheet("background-color: rgb(0, 255, 204)")
                
                # 在视频中标注人脸并裁剪人脸区域
                for (x, y, w, h) in faces:
                    cv2.rectangle(frame, (x, y), (x + w, y + h), (255, 255, 0), 2)  # 绘制黄色方框
                    self.face_image = frame[y:y + h, x:x + w]  # 裁剪人脸区域
                    self.is_processing = True
                    self.start_processing()
            elif len(faces) > 0 and self.is_processing:
                self.VideoView.setStyleSheet("background-color: rgb(0, 255, 204)")
                # 在视频中标注人脸并裁剪人脸区域
                for (x, y, w, h) in faces:
                    cv2.rectangle(frame, (x, y), (x + w, y + h), (255, 255, 0), 2)  # 绘制黄色方框
            else:
                self.set_UserName([False, "欢迎光临"])
                self.VideoView.setStyleSheet("background-color: rgb(102, 204, 255)")
            
            self.frame_count += 1
            if self.frame_count > 30:  # 每处理30帧计算一次FPS(可以调整)
                end_time = time.time()
                elapsed_time = end_time - self.start_time
                fps = self.frame_count / elapsed_time
                integer_fps = int(fps)
                self.FPSLabel.setText(f"FPS: {integer_fps}")
                self.start_time = time.time()
                self.frame_count = 0

            h, w, ch = frame.shape
            bytes_per_line = ch * w
            convert_to_qt_format = QImage(frame.data, w, h, bytes_per_line, QImage.Format_RGB888)
            pixmap = QPixmap.fromImage(convert_to_qt_format)
            self.VideoLabel.setPixmap(pixmap.scaled(self.VideoLabel.size(), Qt.KeepAspectRatio, Qt.SmoothTransformation))

    def closeEvent(self, event):
    # 停止线程并释放资源
        if hasattr(self, 'image_thread') and self.image_thread.isRunning():
            self.image_thread.finished.disconnect()  # 断开连接,确保线程安全结束
            self.image_thread.quit()  # 通知线程退出
            self.image_thread.wait()  # 等待线程退出
        if self.cap is not None and self.cap.isOpened():
            self.cap.release()  # 释放摄像头资源
        super(MainWindow, self).closeEvent(event)

if __name__ == '__main__':
    app = QApplication(sys.argv)
    window = MainWindow()
    sys.exit(app.exec_())

回帖(3)

alwinlee

2023-12-8 10:14:19
飞腾派大飞机第一人!哈哈
1 举报

kevin2011

2023-12-11 11:13:12
好文章!
举报

淡化浅出

2023-12-11 23:49:21
想问下楼主你的 openkylin 镜像能安装 docker 嘛?
2 举报
  • jianyi: 兴许是飞腾Pi版本的OpenKylin裁剪了一些必需库,你需要想办法补全或是你试试Ubuntu,docker此前我仅在鲲鹏服务机上使用过,对于飞腾Pi,其系统资源还是相对捉襟见肘,也就没有做
  • 淡化浅出 回复 jianyi: Ubuntu的是可以的。如果要补全它也没提示缺了啥。我就用着Ubuntu的了

更多回帖

发帖
×
20
完善资料,
赚取积分