My gist to get my blog posts read by AWS Polly and my markdown files updated accordingly with the new meta
import boto3
import requests
from bs4 import BeautifulSoup
from pathlib import Path
BASE_URL = "YOUR_BASE_URL"
CONTENT_BUCKET = "YOUR_BUCKET_NAME"
NUMBER_OF_WORDS = 500
NUMBER_OF_PARAGRAPHS = 6
FINAL_SENTENCE = "... Hey! You are surely missing something: if you want to know more, visit YOUR_BASE_URL"
BASE_PATH = "YOUR_MARKDOWN_BLOGPOSTS_FILESYSTEM_POSITION"
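# for example (hypothetical values to adapt to your own blog, bucket and filesystem):
# BASE_URL = "https://madeddu.xyz"
# CONTENT_BUCKET = "my-blog-audio-bucket"
# BASE_PATH = "/path/to/blog/content/posts"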
polly_client = boto3.client('polly')
s3_client = boto3.client('s3')
def lambda_handler(event, context):
    print("Getting urls...")
    # get post urls
    urls = __get_blog_post_urls()  # urls = ["https://madeddu.xyz/posts/go-async-await/"]
    # print(urls)
    print("Getting paragraphs...")
    # for each post get the text
    all_paragraphs = {url: __get_page_content_for_nts(url) for url in urls}
    # print(all_paragraphs)
    print("Getting SSML tagged text...")
    texts = {url: __add_SSML_Enhanced_tags(paragraphs) for url, paragraphs in all_paragraphs.items()}
    # print(texts)
    print("Getting articles read...")
    # for each post produce an mp3 and save it to s3
    s3_paths = [__get_content_read_by_polly(url.replace(f"{BASE_URL}/posts/", ""), text) for url, text in texts.items()]
    # for s3_path in s3_paths:
    #     print(s3_path)
    print("Getting articles markdown files...")
    # get file list
    file_list = __get_markdown_list()
    # for file_name in file_list:
    #     print(file_name)
    print("Getting matches audio/articles...")
    # match produced audio with the markdown sources
    matches = __match_audio_and_post(file_list, s3_paths)
    for audio_name, file_name in matches.items():
        print(audio_name, file_name)
    print("Getting articles modified...")
    # modify the old posts
    __insert_new_audio_reference(matches)
    print("Getting Hugo Theme changed acc... just kidding. Done!!!")
    # return s3 paths
    return s3_paths
def __get_blog_post_urls(url=BASE_URL):
    # request the first page
    r = requests.get(f"{url}/posts/")
    index = 2
    # accumulate blog post urls
    urls = []
    # go ahead with pages until one is missing
    while r.status_code == 200:
        # parse page
        soup = BeautifulSoup(r.text, features="html.parser")
        # find all hrefs
        for a in soup.findAll('a', href=True):
            # keep only posts
            if a['href'] != f"{BASE_URL}/posts/" and f"{BASE_URL}/posts/" in a['href'] and "/page/" not in a['href']:
                # append url
                urls.append(a['href'])
        # request the next page
        try:
            r = requests.get(f"{url}/posts/page/{index}")
        except requests.RequestException:
            return urls
        index += 1
    # return result
    return urls
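# For reference, the pagination scheme assumed above is the Hugo-style list
# layout: {BASE_URL}/posts/ followed by {BASE_URL}/posts/page/2,
# {BASE_URL}/posts/page/3, ... until a page no longer answers with 200.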
def __get_page_content(url, number_of_words=NUMBER_OF_WORDS, final_sentence=FINAL_SENTENCE):
    # plain-text variant of the extractor below (not referenced by lambda_handler)
    # create request
    r = requests.get(url)
    # parse page
    soup = BeautifulSoup(r.text, features="html.parser")
    # get all paragraphs
    paragraphs = soup.find("div", {"id": "main"}).findAll("p")
    # accumulate outer text
    page_with_no_code = ""
    # for each found paragraph
    for paragraph in paragraphs:
        # exclude portions of code
        if paragraph.findAll("div", {"class": "highlight"}):
            continue
        # get the text out of the paragraph
        text = paragraph.text.strip()
        # exclude a common header
        if "Subscribe to my newsletter to be informed about my new blog posts, talks and activities." in text:
            continue
        # accumulate page text
        page_with_no_code += text + " "
    # return result (note: the slice truncates characters, not words)
    return f"{page_with_no_code[:number_of_words]}{final_sentence}"
def __get_page_content_for_nts(url, number_of_words=NUMBER_OF_WORDS, final_sentence=FINAL_SENTENCE):
    # create request
    r = requests.get(url)
    # parse page
    soup = BeautifulSoup(r.text, features="html.parser")
    # get all paragraphs
    paragraphs = soup.find("div", {"id": "main"}).findAll("p")
    # accumulate paragraph nodes
    page_with_no_code = []
    # for each found paragraph
    for paragraph in paragraphs:
        # exclude portions of code
        if paragraph.findAll("div", {"class": "highlight"}):
            continue
        # get the text out of the paragraph
        text = paragraph.text.strip()
        # exclude a common header
        if "Subscribe to my newsletter to be informed about my new blog posts, talks and activities." in text:
            continue
        # accumulate the paragraph node
        page_with_no_code.append(paragraph)
    # return result
    return page_with_no_code
def __add_SSML_Enhanced_tags(paragraphs):
    # tag to start a speech
    text = "<speak>"
    # add informal style
    text = f'{text}<amazon:domain name="conversational"><amazon:effect name="drc">'
    # # add breathing to sound more natural
    # text = f'{text}<amazon:auto-breaths>'
    # for each paragraph
    for paragraph in paragraphs[:NUMBER_OF_PARAGRAPHS]:
        # prepare the paragraph with dot and comma breaks
        paragraph_text = paragraph.text.strip()
        # paragraph_text = paragraph_text.replace("...", "<break time=\"500ms\"/>")
        # paragraph_text = paragraph_text.replace(". ", "<break time=\"800ms\"/>")
        # paragraph_text = paragraph_text.replace(",", "<break time=\"300ms\"/>")
        # expand slang expressions
        paragraph_text = paragraph_text.replace("btw", "<sub alias=\"by the way\">by the way</sub>")
        paragraph_text = paragraph_text.replace("PoC", "<say-as interpret-as=\"spell-out\">PoC</say-as>")
        # emphasize <em> words
        # ems = paragraph.findAll("em")
        # for em in ems:
        #     paragraph_text = paragraph_text.replace(f"{em.text}", f'<emphasis level="moderate">{em.text}</emphasis>')
        # # pronounce <strong> words loudly
        # strongs = paragraph.findAll("strong")
        # for strong in strongs:
        #     paragraph_text = paragraph_text.replace(f"{strong.text}", f'<emphasis level="moderate">{strong.text}</emphasis>')
        # print(paragraph)
        # print(paragraph_text)
        # concat the parsed paragraph, staying within the character budget
        if len(f"{text} {paragraph_text}") > 1490 - len(f" {FINAL_SENTENCE}"):
            break
        else:
            text = f"{text} {paragraph_text}"
    # close the text
    # text = f"{text} {FINAL_SENTENCE}</speak>"
    text = f"{text} {FINAL_SENTENCE}</amazon:effect></amazon:domain></speak>"
    # return result
    return text
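# For reference, a minimal sketch of the SSML this function emits (illustrative
# only; the actual body depends on the scraped paragraphs):
#
#   <speak><amazon:domain name="conversational"><amazon:effect name="drc">
#   First paragraph... Second paragraph... ... Hey! You are surely missing
#   something: if you want to know more, visit YOUR_BASE_URL
#   </amazon:effect></amazon:domain></speak>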
def __get_content_read_by_polly(article_path, content):
    print(article_path)
    # synthesize the content
    response = polly_client.synthesize_speech(
        Engine='neural',
        LanguageCode='en-US',
        OutputFormat='mp3',
        Text=content,
        TextType='ssml',
        VoiceId='Matthew'
    )
    # save the mp3 (under /tmp, the only writable path on Lambda)
    with open('/tmp/speech.mp3', 'wb') as f:
        f.write(response['AudioStream'].read())
    # upload the mp3 (article_path[:-1] drops the trailing slash)
    with open('/tmp/speech.mp3', 'rb') as f:
        s3_client.upload_fileobj(f, CONTENT_BUCKET, f'mp3/{article_path[:-1]}.mp3')
    return f'mp3/{article_path[:-1]}.mp3'
def __get_markdown_list(base_path=BASE_PATH):
    # get the list of all markdown files
    list_of_files = list(Path(base_path).rglob("*.md"))
    # return it
    return list_of_files
def __match_audio_and_post(file_list, audio_list):
    # match dict
    matches = {audio_path: '' for audio_path in audio_list}
    # find matches by name
    for audio_path in audio_list:
        for file_name in file_list:
            if audio_path.split("/")[-1].replace(".mp3", "") == str(file_name).split("/")[-1].replace(".md", "").lower():
                matches[audio_path] = str(file_name)
                # stop at the first match
                break
    # return matches
    return matches
def __insert_new_audio_reference(matches):
    # for each match
    for audio_name, file_name in matches.items():
        # read the content
        with open(file_name, "r") as f:
            lines = f.readlines()
        # add the polly line as the fifth line (assumes the front matter
        # opens with at least four lines that can be kept as-is)
        lines = lines[0:4] + [f'polly: {BASE_URL}/{audio_name}\n'] + lines[4:]
        # write the new content back
        with open(file_name, "w") as f:
            for line in lines:
                f.write(line)

if __name__ == "__main__":
    lambda_handler(None, None)
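# For reference, a sketch of the front-matter change applied by
# __insert_new_audio_reference (the post and its fields are hypothetical):
#
#   before                            after
#   ---                               ---
#   title: "Go async/await"           title: "Go async/await"
#   date: 2020-11-29                  date: 2020-11-29
#   tags: ["go"]                      tags: ["go"]
#   ---                               polly: YOUR_BASE_URL/mp3/go-async-await.mp3
#                                     ---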