Created
September 2, 2021 07:02
-
-
Save raihanba13/c630099c61586a414e93d52373451190 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import dateutil.parser | |
import datetime | |
import time | |
import os | |
import math | |
import random | |
import logging | |
logger = logging.getLogger() | |
logger.setLevel(logging.DEBUG) | |
""" some necessary intent func """ | |
def elicit_slot(session_attributes, intent_name, slots, slot_to_elicit, message, response_card): | |
return { | |
'sessionAttributes': session_attributes, | |
'dialogAction': { | |
'type': 'ElicitSlot', | |
'intentName': intent_name, | |
'slots': slots, | |
'slotToElicit': slot_to_elicit, | |
'message': message, | |
'responseCard': response_card | |
} | |
} | |
def confirm_intent(session_attributes, intent_name, slots, message, response_card): | |
return { | |
'sessionAttributes': session_attributes, | |
'dialogAction': { | |
'type': 'ConfirmIntent', | |
'intentName': intent_name, | |
'slots': slots, | |
'message': message, | |
'responseCard': response_card | |
} | |
} | |
def close(session_attributes, fulfillment_state, message): | |
response = { | |
'sessionAttributes': session_attributes, | |
'dialogAction': { | |
'type': 'Close', | |
'fulfillmentState': fulfillment_state, | |
'message': message | |
} | |
} | |
return response | |
def delegate(session_attributes, slots): | |
return { | |
'sessionAttributes': session_attributes, | |
'dialogAction': { | |
'type': 'Delegate', | |
'slots': slots | |
} | |
} | |
""" --- Intents --- """ | |
def dispatch(intent_request): | |
# here intent is not important as we have only one | |
# the input data does not have any intent | |
# this is the site code | |
""" | |
Called when the user specifies an intent for this bot. | |
""" | |
user_id_str = intent_request['inputTranscript'] | |
user_id_int = text2int(user_id_str) | |
print(user_id_int) | |
try: | |
int(user_id_int) | |
except Exception as e: | |
print(e) | |
return False | |
dynamodb = boto3.resource('dynamodb') | |
user_table = dynamodb.Table('KordiaUser2') | |
user_data = user_table.get_item( | |
Key={ | |
'userId': str(user_id_int) | |
} | |
) | |
print(1) | |
print(user_data) | |
if not user_data.get('Item'): | |
msg = f"User Id {user_id_int} is not registered. Please try again." | |
print('user not found') | |
print(msg) | |
print(2) | |
return { | |
"dialogAction": { | |
"type": "Close", | |
"fulfillmentState": "Fulfilled", | |
"message": { | |
"contentType": "SSML", | |
"content": msg | |
} | |
} | |
} | |
print(3) | |
return { | |
"dialogAction": { | |
"type": "Close", | |
"fulfillmentState": "Fulfilled", | |
"message": { | |
"contentType": "SSML", | |
"content": "Hi " | |
} | |
} | |
} | |
def is_number(x): | |
if type(x) == str: | |
x = x.replace(',', '') | |
try: | |
float(x) | |
except: | |
return False | |
return True | |
def text2int (textnum, numwords={}): | |
units = [ | |
'zero', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', | |
'nine', 'ten', 'eleven', 'twelve', 'thirteen', 'fourteen', 'fifteen', | |
'sixteen', 'seventeen', 'eighteen', 'nineteen', | |
] | |
tens = ['', '', 'twenty', 'thirty', 'forty', 'fifty', 'sixty', 'seventy', 'eighty', 'ninety'] | |
scales = ['hundred', 'thousand', 'million', 'billion', 'trillion'] | |
ordinal_words = {'first':1, 'second':2, 'third':3, 'fifth':5, 'eighth':8, 'ninth':9, 'twelfth':12} | |
ordinal_endings = [('ieth', 'y'), ('th', '')] | |
if not numwords: | |
numwords['and'] = (1, 0) | |
for idx, word in enumerate(units): numwords[word] = (1, idx) | |
for idx, word in enumerate(tens): numwords[word] = (1, idx * 10) | |
for idx, word in enumerate(scales): numwords[word] = (10 ** (idx * 3 or 2), 0) | |
textnum = textnum.replace('-', ' ') | |
current = result = 0 | |
curstring = '' | |
onnumber = False | |
lastunit = False | |
lastscale = False | |
def is_numword(x): | |
if is_number(x): | |
return True | |
if word in numwords: | |
return True | |
return False | |
def from_numword(x): | |
if is_number(x): | |
scale = 0 | |
increment = int(x.replace(',', '')) | |
return scale, increment | |
return numwords[x] | |
for word in textnum.split(): | |
if word in ordinal_words: | |
scale, increment = (1, ordinal_words[word]) | |
current = current * scale + increment | |
if scale > 100: | |
result += current | |
current = 0 | |
onnumber = True | |
lastunit = False | |
lastscale = False | |
else: | |
for ending, replacement in ordinal_endings: | |
if word.endswith(ending): | |
word = "%s%s" % (word[:-len(ending)], replacement) | |
if (not is_numword(word)) or (word == 'and' and not lastscale): | |
if onnumber: | |
# Flush the current number we are building | |
curstring += repr(result + current) + " " | |
curstring += word + " " | |
result = current = 0 | |
onnumber = False | |
lastunit = False | |
lastscale = False | |
else: | |
scale, increment = from_numword(word) | |
onnumber = True | |
if lastunit and (word not in scales): | |
# Assume this is part of a string of individual numbers to | |
# be flushed, such as a zipcode "one two three four five" | |
curstring += repr(result + current) | |
result = current = 0 | |
if scale > 1: | |
current = max(1, current) | |
current = current * scale + increment | |
if scale > 100: | |
result += current | |
current = 0 | |
lastscale = False | |
lastunit = False | |
if word in scales: | |
lastscale = True | |
elif word in units: | |
lastunit = True | |
if onnumber: | |
curstring += repr(result + current) | |
return curstring | |
""" --- Main handler --- """ | |
from pprint import pprint | |
from boto3.dynamodb.conditions import Key | |
import boto3 | |
def lambda_handler(event, context): | |
""" | |
Route the incoming request based on intent. | |
The JSON body of the request is provided in the event slot. | |
""" | |
# By default, treat the user request as coming from the America/New_York time zone. | |
os.environ['TZ'] = 'America/New_York' | |
time.tzset() | |
logger.debug('event.bot.name={}'.format(event['bot']['name'])) | |
pprint(event) | |
return dispatch(event) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment