Last active
December 22, 2021 06:16
-
-
Save VincentK16/79776687d7fd683b11f9ed39b71f88cd to your computer and use it in GitHub Desktop.
Alpha Mini with IBM Watson Assistant + Robot Actions
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# To run this code, make sure: | |
# 1. Have installed "alphamini" and "ibm-watson" Python modules | |
# 2. Changes to your IBM Watson Assistant parameters: | |
# a. API key | |
# b. Assistant URL | |
# c. Assistant ID | |
# 3. Alpha Mini serial number | |
# Follow through the code below to see where are the changes needed. | |
import asyncio | |
from typing import Counter | |
import mini.mini_sdk as MiniSdk | |
from mini.apis.api_observe import ObserveSpeechRecognise | |
from mini.apis.api_sound import StartPlayTTS | |
from mini.apis.api_action import PlayAction, PlayActionResponse | |
from mini.dns.dns_browser import WiFiDevice | |
from mini.apis.base_api import MiniApiResultType | |
from mini.pb2.codemao_speechrecognise_pb2 import SpeechRecogniseResponse | |
import json | |
from ibm_watson import AssistantV2 | |
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator | |
#from test.test_connect import test_connect, shutdown | |
#from test.test_connect import test_get_device_by_name, test_start_run_program | |
authenticator = IAMAuthenticator( | |
'2mLWS_yLl13ZgCy7WT_G6YFXJmCskbUUEb078vTIfZ2O') | |
assistant = AssistantV2( | |
version='2020-09-24', | |
authenticator=authenticator | |
) | |
assistant.set_service_url('https://api.us-south.assistant.watson.cloud.ibm.com/') | |
async def test_play_action(y): | |
# action_name: The action name can be obtained with GetActionList API: http://docs.ubtrobot.com/alphamini/python-sdk-en/mini.apis.api_action.html#mini.apis.api_action.GetActionList | |
block: PlayAction = PlayAction(action_name=y) | |
# response: PlayActionResponse | |
(resultType, response) = await block.execute() | |
print(f'test_play_action result:{response}') | |
assert resultType == MiniApiResultType.Success, 'test_play_action timetout' | |
assert response is not None and isinstance(response, PlayActionResponse), 'test_play_action result unavailable' | |
assert response.isSuccess, 'play_action failed' | |
async def test_connect(dev: WiFiDevice) -> bool: | |
"""Connect the device | |
Connect the specified device | |
Args: | |
dev (WiFiDevice): Specified device object WiFiDevice | |
Returns: | |
bool: Whether the connection is successful | |
""" | |
return await MiniSdk.connect(dev) | |
# Disconnect and release resources | |
async def shutdown(): | |
"""Disconnect and release resources | |
Disconnect the currently connected device and release resources | |
""" | |
await MiniSdk.quit_program() | |
await MiniSdk.release() | |
async def test_get_device_by_name(): | |
"""Search for devices based on the suffix of the robot serial number | |
To search for the robot with the specified serial number (behind the robot's butt), you can just enter the tail character of the serial number, any length, it is recommended that more than 5 characters can be matched accurately, and the timeout is 10 seconds | |
Returns: | |
WiFiDevice: Contains robot name, ip, port and other information | |
""" | |
result: WiFiDevice = await MiniSdk.get_device_by_name("1644", 10) | |
print(f"test_get_device_by_name result:{result}") | |
return result | |
async def test_start_run_program(): | |
"""Enter programming mode | |
Make the robot enter the programming mode, wait for the response result, and delay 6 seconds, let the robot finish "Enter the programming mode" | |
Returns: | |
None: | |
""" | |
await MiniSdk.enter_program() | |
asyncio.create_task(__tts("Hello guest, welcome to Marina Bay Sands Singapore Hotel! How can I help you?")) | |
asyncio.create_task(test_play_action("action_014")) | |
async def __tts(x): | |
#api_address='http://api.openweathermap.org/data/2.5/weather?appid=dc20d3160e7d504175d202850b79b74b&q=Singapore' | |
#city = input('City Name :') | |
#url = api_address | |
#json_data = requests.get(url).json() | |
#format_add = json_data['weather'][0]['description'] | |
print (x) | |
#print(format_add) | |
block: StartPlayTTS = StartPlayTTS(text=x) | |
response = await block.execute() | |
print(f'tes_play_tts: {response}') | |
async def open_hour(): | |
block: StartPlayTTS = StartPlayTTS(text="We are open from 10 in the morning to 6 in the evening") | |
response = await block.execute() | |
print(f'tes_play_tts: {response}') | |
# 测试监听语音识别 | |
async def test_speech_recognise(): | |
create = assistant.create_session(assistant_id='b197f077-ca47-44ca-9ea2-fcf0f9215a8e').get_result() | |
print(create) | |
value = create["session_id"] | |
"""Monitor voice recognition demo | |
Monitor voice recognition events, and the robot reports the text after voice recognition | |
When the voice is recognized as "hello", broadcast "Hello, I am alphamini, Lalila, Lalila" | |
When the voice is recognized as "stop", stop monitoring | |
# SpeechRecogniseResponse.text | |
# SpeechRecogniseResponse.isSuccess | |
# SpeechRecogniseResponse.resultCode | |
""" | |
# 语音监听对象 | |
observe: ObserveSpeechRecognise = ObserveSpeechRecognise() | |
# 处理器 | |
# SpeechRecogniseResponse.text | |
# SpeechRecogniseResponse.isSuccess | |
# SpeechRecogniseResponse.resultCode | |
def handler(msg: SpeechRecogniseResponse): | |
count = 0 | |
print(f'=======handle speech recognise:{msg}') | |
print(msg.text.lower()) | |
print(type(msg.text.lower())) | |
print("{0}".format(str(msg.text.lower()))) | |
response = assistant.message(assistant_id='b197f077-ca47-44ca-9ea2-fcf0f9215a8e', session_id=value, input={'message_type': 'text','text':str(msg.text)}).get_result() | |
#print(json.dumps(response, indent=2)) | |
if len (response["output"]["generic"]) > 1: | |
text = response["output"]["generic"][0]["text"] + ' ' + response["output"]["generic"][1]["text"] | |
else: | |
text = response["output"]["generic"][0]["text"] | |
#watson_response = response["output"]["generic"][0]["text"] | |
asyncio.create_task(__tts(text)) | |
if "day" in text: | |
asyncio.create_task(test_play_action("action_019")) | |
if "time" in text: | |
asyncio.create_task(test_play_action("Surveillance_006")) | |
if "confirm" in text: | |
asyncio.create_task(test_play_action("action_012")) | |
if "safe" in text: | |
asyncio.create_task(test_play_action("action_018")) | |
# if str(msg.text)[-1].isalpha() is False: | |
# if str(msg.text)[:-1].lower() == "Hello": | |
# asyncio.create_task(__tts()) | |
#if str(msg.text).lower() == "weather": | |
# print("I am in weather loop") | |
# 监听到"悟空", tts打个招呼 | |
# asyncio.create_task(test_play_action()) | |
# asyncio.create_task(MiniSdk.play_online_audio("http://hao.haolingsheng.com/ring/000/995/52513bb6a4546b8822c89034afb8bacb.mp3")) | |
#elif str(msg.text).lower() == "what is your opening hours?": | |
# print("I am in oepning hours loop") | |
# asyncio.create_task(open_hour()) | |
# asyncio.create_task(test_play_action()) | |
# asyncio.create_task(MiniSdk.play_expression('codemao19')) | |
#elif str(msg.text).lower() == "stop": | |
# 监听到结束, 停止监听 | |
# observe.stop() | |
# 结束event_loop | |
# asyncio.get_running_loop().run_in_executor(None, asyncio.get_running_loop().stop) | |
observe.set_handler(handler) | |
# 启动 | |
observe.start() | |
await asyncio.sleep(0) | |
if __name__ == '__main__': | |
MiniSdk.set_robot_type(MiniSdk.RobotType.EDU) | |
device: WiFiDevice = asyncio.get_event_loop().run_until_complete(test_get_device_by_name()) | |
if device: | |
asyncio.get_event_loop().run_until_complete(test_connect(device)) | |
asyncio.get_event_loop().run_until_complete(test_start_run_program()) | |
asyncio.get_event_loop().run_until_complete(test_speech_recognise()) | |
# 定义了事件监听对象,必须让event_loop.run_forver() | |
asyncio.get_event_loop().run_forever() | |
asyncio.get_event_loop().run_until_complete(shutdown()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment