Final Implementation of the Children's AI STEAM Building-Block Platform
In earlier posts, some of the technical groundwork for the children's AI STEAM building-block platform was already covered, including:
- Peripheral access, e.g. reading a temperature/humidity sensor over I2C
- Speech recognition (ASR)
- Speech synthesis (TTS)
- Large language model (LLM) calls
Now let's combine all of these capabilities to build a basic children's AI STEAM building-block platform.
1. Hardware Preparation
Since the project is still at the prototype stage, the modules are temporarily wired with plain cables directly to the development board's I2C interface.
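For reference, the `lib/sht30` module used by the code later in this post is not shown here. Below is a minimal sketch of what a single SHT30 read over I2C might look like; it assumes the `smbus2` library, the sensor's default address `0x44`, and the single-shot high-repeatability command from the SHT30 datasheet, none of which come from the original code:

```python
from smbus2 import SMBus, i2c_msg
import time

SHT30_ADDR = 0x44  # default I2C address of the SHT30

class SHT30:
    def __init__(self, bus_id=1):
        self.bus = SMBus(bus_id)

    def measure(self):
        # Single-shot, high-repeatability measurement (command 0x2400)
        self.bus.i2c_rdwr(i2c_msg.write(SHT30_ADDR, [0x24, 0x00]))
        time.sleep(0.02)  # wait for the conversion to finish
        read = i2c_msg.read(SHT30_ADDR, 6)  # 2B temp + CRC, 2B RH + CRC
        self.bus.i2c_rdwr(read)
        data = list(read)
        t_raw = (data[0] << 8) | data[1]
        rh_raw = (data[3] << 8) | data[4]
        temperature = -45 + 175 * t_raw / 65535  # datasheet conversion
        humidity = 100 * rh_raw / 65535
        return temperature, humidity
```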
Each peripheral function module is housed in its own building block.
Dedicated connectors will be used later to make the blocks easier to plug in.
2. Core Logic
The core logic of the children's AI STEAM building-block platform is the following pipeline (a rough code skeleton follows the list):
- Assembly: by hand, connect the building blocks to the development board.
- Voice listening: continuously listen for the user's speech.
- Cloud-side intent understanding: send the speech data to the LLM platform to interpret the user's intent.
- On-device execution: carry out the concrete device-side action according to the LLM's reply.
- Voice playback: play back the spoken result.
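As a high-level sketch of that pipeline (the four stubs are illustrative only; the real ASR / LLM / sensor / TTS code follows in section 3):

```python
# Illustrative skeleton: each stub stands in for real code from section 3.
def listen():   return "小兔,现在温度是多少?"  # voice listening (ASR)
def ask_llm(t): return "检测环境温度"           # cloud-side intent (LLM)
def execute(i): return "当前温度为:25.0 度"     # on-device execution (sensor)
def speak(r):   print(r)                         # voice playback (TTS)

while True:
    text = listen()           # 1. listen for the user's speech
    intent = ask_llm(text)    # 2. send it to the LLM for intent understanding
    result = execute(intent)  # 3. run the device-side action
    speak(result)             # 4. play back the spoken result
    break  # the real loop runs forever; break keeps this sketch finite
```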
3. Writing the Code
Putting together the pieces studied earlier, the combined code is as follows:
```python
import time
import sys
sys.path.append("./lib")
import pyaudio
import dashscope
from dashscope.audio.asr import (Recognition, RecognitionCallback,
                                 RecognitionResult)
from dashscope.api_entities.dashscope_response import SpeechSynthesisResponse
from dashscope.audio.tts import ResultCallback, SpeechSynthesizer, SpeechSynthesisResult
from http import HTTPStatus
from lib import sht30
import atexit

dashscope.api_key = dashscope.common.api_key.get_default_api_key()

mic = None
stream = None


class TTS_Callback(ResultCallback):
    _player = None
    _stream = None

    def on_open(self):
        global in_play
        in_play = True
        print("开始播放")
        self._player = pyaudio.PyAudio()
        self._stream = self._player.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=48000,
            output=True)

    def on_complete(self):
        global in_play
        in_play = False
        print('播放结束')

    def on_error(self, response: SpeechSynthesisResponse):
        global in_play
        in_play = False
        print('播放错误: %s' % (str(response)))

    def on_close(self):
        global in_play
        in_play = False
        print('播放完毕')
        self._stream.stop_stream()
        self._stream.close()
        self._player.terminate()

    def on_event(self, result: SpeechSynthesisResult):
        if result.get_audio_frame() is not None:
            self._stream.write(result.get_audio_frame())
        if result.get_timestamp() is not None:
            print('timestamp result:', str(result.get_timestamp()))


def call_with_messages(prompt):
    # Declare global so the playback flag set below is visible to the
    # main loop (without this, in_play would be a local variable here)
    global in_play
    system_desc = '''
    你是一位知识丰富的人,你的名字叫小兔,上知天文下懂地理,请用中文回答问题,且回答言简意赅,最多不超过30个字。
    如果问当前温度,则直接返回:检测环境温度
    如果问当前湿度,则直接返回:检测环境湿度
    '''
    messages = [{'role': 'system', 'content': system_desc},
                {'role': 'user', 'content': prompt}]
    response = dashscope.Generation.call(
        dashscope.Generation.Models.qwen_turbo,
        messages=messages,
        result_format='message',  # return choices in "message" format
    )
    if response.status_code == HTTPStatus.OK:
        resp_body = response['output']['choices'][0]['message']['content']
        print("AI大模型返回:", resp_body)
        if resp_body and len(resp_body) > 0:
            # Map the model's fixed intent strings to on-device actions
            if resp_body.startswith("检测环境温度"):
                temperature, humidity = sensor.measure()
                resp_body = "当前温度为:%.1f 度" % temperature
            elif resp_body.startswith("检测环境湿度"):
                temperature, humidity = sensor.measure()
                resp_body = "当前湿度为:百分之 %d" % humidity
            print("调用语音服务:%s" % resp_body)
            in_play = True
            SpeechSynthesizer.call(model='sambert-zhigui-v1',
                                   text=resp_body,
                                   sample_rate=48000,
                                   format='pcm',
                                   callback=tts_callback)
    else:
        print('Request id: %s, Status code: %s, error code: %s, error message: %s' % (
            response.request_id, response.status_code,
            response.code, response.message
        ))


class ASR_Callback(RecognitionCallback):
    def on_open(self) -> None:
        global mic
        global stream
        print('RecognitionCallback open.')
        mic = pyaudio.PyAudio()
        stream = mic.open(format=pyaudio.paInt16,
                          channels=1,
                          rate=16000,
                          input=True)

    def on_close(self) -> None:
        global mic
        global stream
        print('RecognitionCallback close.')
        stream.stop_stream()
        stream.close()
        mic.terminate()
        stream = None
        mic = None

    def on_event(self, result: RecognitionResult) -> None:
        global prompt, get_times, in_chat
        response = result.get_sentence()
        print("识别结果:", response['text'])
        if response['text'].startswith('小兔'):
            # Wake word detected: record the recognized text and the
            # time it arrived, then enter dialogue mode
            get_times = time.time()
            prompt = response['text']
            if not in_chat:
                in_chat = True
                print("开始对话")


tts_callback = TTS_Callback()
asr_callback = ASR_Callback()
sensor = sht30.SHT30()
recognition = Recognition(model='paraformer-realtime-v1',
                          format='pcm',
                          sample_rate=16000,
                          callback=asr_callback)


@atexit.register
def clean():
    global recognition
    recognition.stop()
    sys.exit()


while True:
    recognition.start()
    prompt = ""
    send_times = 0
    get_times = 0
    in_chat = False
    in_play = False
    while True:
        if stream:
            if in_play:
                continue
            send_times = time.time()
            # 3200 bytes = 100 ms of 16 kHz / 16-bit mono audio
            data = stream.read(3200, exception_on_overflow=False)
            recognition.send_audio_frame(data)
            if get_times > 0 and send_times - get_times > 0.5:
                # No new recognition result for ~5 consecutive frames
                # (0.5 s): treat the utterance as finished
                break
        else:
            break
    recognition.stop()
    if len(prompt) > 5:
        print("当前识别内容:", prompt)
        in_play = True
        SpeechSynthesizer.call(model='sambert-zhigui-v1',
                               text="你稍等",
                               sample_rate=48000,
                               format='pcm',
                               callback=tts_callback)
        while in_play:
            time.sleep(0.1)  # wait for the "你稍等" prompt to finish playing
        call_with_messages(prompt)
```
The logic above is fairly simple. The main addition on top of the earlier LLM-call work is a small amount of post-processing of the model's reply: the system prompt instructs the model to answer temperature or humidity questions with the fixed strings 检测环境温度 / 检测环境湿度, and when a reply starts with one of those markers, the program reads the SHT30 sensor locally and speaks the measured value instead of the model's text.
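If more sensor blocks are added later, the chain of `startswith` checks can grow unwieldy. One possible refactor (my own sketch, not part of the original code) is a dispatch table mapping each fixed intent string to a handler:

```python
# Hypothetical refactor: map the model's fixed intent strings to handlers.
from lib import sht30

sensor = sht30.SHT30()  # the same SHT30 instance as in the main program

def report_temperature():
    temperature, _ = sensor.measure()
    return "当前温度为:%.1f 度" % temperature

def report_humidity():
    _, humidity = sensor.measure()
    return "当前湿度为:百分之 %d" % humidity

INTENT_HANDLERS = {
    "检测环境温度": report_temperature,
    "检测环境湿度": report_humidity,
}

def handle_intent(resp_body):
    """Replace resp_body with a locally measured value when the LLM
    signals a device-side intent; otherwise pass the reply through."""
    for prefix, handler in INTENT_HANDLERS.items():
        if resp_body.startswith(prefix):
            return handler()
    return resp_body
```

Adding a new building block then only requires registering one more entry in `INTENT_HANDLERS` (plus one more line in the system prompt), rather than editing the branch logic inside `call_with_messages`.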
4. Demo Results
https://player.bilibili.com/player.html?aid=1502058546&bvid=BV1vD42177Ui&cid=1480345427&p=1
As the video shows, the overall result is quite good.