Requirements:
One Raspberry Pi 4B with at least 4 GB of RAM (2 GB is too little to generalize to edge computing; 8 GB would be ideal, but 4 GB saves the school some budget) and a 32 GB SD card.
For the voice part: a driver-free USB speaker that works with the Raspberry Pi out of the box.
For the vision part: a USB camera, preferably also driver-free.
Voice
Speaker test
Create a project folder on the Raspberry Pi. Since the Pi mainly serves as the lower-level client, I named it client_rsp.
```bash
mkdir client_rsp
cd client_rsp
```
At this point I recommend switching from the plain terminal to a VS Code Remote-SSH connection, which makes writing code much more convenient.
Set up the environment:
```bash
sudo apt-get update
sudo apt-get install python3-pyaudio
```
Create a virtual environment (with --system-site-packages so the venv can see the apt-installed PyAudio):
```bash
python3 -m venv --system-site-packages venv
source venv/bin/activate
```
Copy any audio.wav into this directory, then run the code below to check that the speaker works:
```python
import pyaudio
import wave

def play_audio(filename):
    """Play a WAV file through the default output device."""
    with wave.open(filename, 'rb') as wf:
        p = pyaudio.PyAudio()
        # Open an output stream matching the WAV file's format
        stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                        channels=wf.getnchannels(),
                        rate=wf.getframerate(),
                        output=True)
        # Stream the file in 1024-frame chunks
        chunk_size = 1024
        data = wf.readframes(chunk_size)
        while data:
            stream.write(data)
            data = wf.readframes(chunk_size)
        stream.stop_stream()
        stream.close()
        p.terminate()

play_audio('audio.wav')
```
If you hear sound, it works.
Adjusting the Raspberry Pi volume
```bash
# Check the current volume
amixer get Master

# Set the volume to 50%
amixer set Master 50%
```
Custom speech output
The goal here is an interface that takes arbitrary text and outputs speech, i.e. speech synthesis (TTS). It could be deployed on the Raspberry Pi itself or on a server, and it is fairly easy to self-host, but I went straight for Alibaba Cloud's DashScope API: first, it is convenient; second, the synthesis quality is better (a stronger model sounds more natural).
You can get an API key here: How to obtain an API-KEY.
Configure the Alibaba Cloud DashScope SDK:
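Assuming the SDK is installed from PyPI as usual:

```bash
pip install dashscope
```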
Also install the remaining dependencies:
```bash
pip install paho-mqtt
pip install pyyaml
```
As before, create a config.yaml in the project folder to hold the configuration:
```yaml
dashscope:
  api-key: "<api-key>"
```
The following code implements custom speech output:
```python
import time
import sys
import yaml

import dashscope
from dashscope.api_entities.dashscope_response import SpeechSynthesisResponse
from dashscope.audio.tts import ResultCallback, SpeechSynthesizer, SpeechSynthesisResult
import pyaudio

# Read the API key from config.yaml
with open('config.yaml', 'r', encoding='utf-8') as file:
    config = yaml.safe_load(file)
dashscope.api_key = config['dashscope']['api-key']


class Callback(ResultCallback):
    """Streams the synthesized PCM audio straight to the speaker."""
    _player = None
    _stream = None

    def on_open(self):
        print('Speech synthesizer is opened.')
        self._player = pyaudio.PyAudio()
        # 16-bit mono PCM at 48 kHz, matching the sample_rate passed to call()
        self._stream = self._player.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=48000,
            output=True)

    def on_complete(self):
        print('Speech synthesizer is completed.')

    def on_error(self, response: SpeechSynthesisResponse):
        print('Speech synthesizer failed, response is %s' % (str(response)))

    def on_close(self):
        print('Speech synthesizer is closed.')
        self._stream.stop_stream()
        self._stream.close()
        self._player.terminate()

    def on_event(self, result: SpeechSynthesisResult):
        # Each event may carry an audio frame and/or timestamp information
        if result.get_audio_frame() is not None:
            print('audio result length:', sys.getsizeof(result.get_audio_frame()))
            self._stream.write(result.get_audio_frame())
        if result.get_timestamp() is not None:
            print('timestamp result:', str(result.get_timestamp()))


callback = Callback()

# Read text from stdin and synthesize it in a loop
while True:
    message = input("Enter text to synthesize: ")
    if message:
        SpeechSynthesizer.call(model='sambert-zhiya-v1',
                               text=message,
                               sample_rate=48000,
                               format='pcm',
                               callback=callback)
    else:
        time.sleep(1)
```
Remote voice control over MQTT
Typing text by hand is not enough: in this project the server produces an exercise evaluation and sends the comment to the Raspberry Pi, which should speak it aloud. So we need remote voice control over MQTT.
Add the MQTT details to config.yaml:
```yaml
dashscope:
  api-key: "<api-key>"
  model: "<model>"

server:
  address: "<ip>"
  port: <port>
```
Then the script to run, voice.py, which speaks whatever arrives over MQTT:
```python
import time
import sys
import yaml

with open('config.yaml', 'r', encoding='utf-8') as file:
    config = yaml.safe_load(file)

''' Speech synthesis '''
import dashscope
from dashscope.api_entities.dashscope_response import SpeechSynthesisResponse
from dashscope.audio.tts import ResultCallback, SpeechSynthesizer, SpeechSynthesisResult
import pyaudio

dashscope.api_key = config['dashscope']['api-key']


class Callback(ResultCallback):
    """Streams the synthesized PCM audio straight to the speaker."""
    _player = None
    _stream = None

    def on_open(self):
        print('Speech synthesizer is opened.')
        self._player = pyaudio.PyAudio()
        # 16-bit mono PCM at 48 kHz, matching the sample_rate passed to call()
        self._stream = self._player.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=48000,
            output=True)

    def on_complete(self):
        print('Speech synthesizer is completed.')

    def on_error(self, response: SpeechSynthesisResponse):
        print('Speech synthesizer failed, response is %s' % (str(response)))

    def on_close(self):
        print('Speech synthesizer is closed.')
        self._stream.stop_stream()
        self._stream.close()
        self._player.terminate()

    def on_event(self, result: SpeechSynthesisResult):
        if result.get_audio_frame() is not None:
            print('audio result length:', sys.getsizeof(result.get_audio_frame()))
            self._stream.write(result.get_audio_frame())
        if result.get_timestamp() is not None:
            print('timestamp result:', str(result.get_timestamp()))


callback = Callback()

''' MQTT '''
import paho.mqtt.client as mqtt

mqtt_server = config['server']['address']
mqtt_port = config['server']['port']
mqtt_user = "rsp_voicer"
mqtt_topic = "rsp/voice"


def on_connect(client, userdata, flags, rc):
    # paho-mqtt 1.x callback signature
    print("Connected with result code " + str(rc))
    client.subscribe(mqtt_topic)
    client.subscribe("sport_cmd")


def on_message(client, userdata, msg):
    message = msg.payload.decode('utf-8')
    print(f"Message arrived on topic: {msg.topic}. Message: {message}")
    # Only messages on the voice topic are spoken aloud
    if msg.topic == mqtt_topic:
        SpeechSynthesizer.call(model=config['dashscope']['model'],
                               text=message,
                               sample_rate=48000,
                               format='pcm',
                               callback=callback)


client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set(mqtt_user)
client.connect(mqtt_server, mqtt_port, 60)
client.loop_forever()
```
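To try it out, publish a message on the rsp/voice topic from any MQTT client. For example, with the Mosquitto command-line tools (assuming they are installed on the server side):

```bash
mosquitto_pub -h <ip> -p <port> -u rsp_voicer -t rsp/voice -m "hello"
```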
That essentially completes this part of the functionality.
Vision
Because the initial version ran on OpenMV, with the resolution turned way down to cope with low bandwidth, the Raspberry Pi started with a fairly basic version of the vision pipeline: frames are compressed heavily and the sampling rate is capped.
Install OpenCV:
```bash
sudo apt-get install python3-opencv
```
The script itself is omitted here.
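As a minimal sketch of that idea (aggressive downscaling plus a capped sampling rate), something like the following would work; the frame size, JPEG quality, sampling interval, and the rsp/frame topic are placeholders, not values from the original project:

```python
import time
import cv2

# Placeholder parameters, not from the original project
TARGET_SIZE = (160, 120)   # heavily downscaled frame
SAMPLE_INTERVAL = 0.5      # seconds between captures (capped sampling rate)

cap = cv2.VideoCapture(0)  # first USB camera

while True:
    ok, frame = cap.read()
    if not ok:
        break
    # Downscale aggressively to keep bandwidth low
    small = cv2.resize(frame, TARGET_SIZE)
    # JPEG-encode the frame; the bytes could then be sent to the server,
    # e.g. published over MQTT like the voice messages above
    ok, buf = cv2.imencode('.jpg', small, [int(cv2.IMWRITE_JPEG_QUALITY), 50])
    if ok:
        payload = buf.tobytes()
        # client.publish('rsp/frame', payload)
    time.sleep(SAMPLE_INTERVAL)

cap.release()
```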
Auto-starting the script at boot
Create a script to run at boot, under any path you like, say /<path>/start.sh. After writing your command into it, remember to make it executable:
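For example, with the placeholder path from above:

```bash
chmod +x /<path>/start.sh
```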
Then edit /etc/rc.local:
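Any editor will do, e.g.:

```bash
sudo nano /etc/rc.local
```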
Before the exit 0 line, add:
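The exact line depends on your setup; one common form that matches the description below, using the placeholder path from above, is:

```bash
su pi -c "/<path>/start.sh" &
```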
This runs the command as the pi user, mainly to avoid failures caused by permission problems.