Created
February 22, 2017 21:04
-
-
Save magnunleno/8f833abbcf0f95482f61cff132614755 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
import sys | |
import aiohttp | |
import hashlib | |
import asyncio | |
from lxml import etree | |
from pprint import pprint | |
def check_args():
    """Yield every feed URL given on the command line, with a scheme.

    Reads ``sys.argv[1:]``. Arguments that already start with ``http://`` or
    ``https://`` are yielded unchanged; any other argument is prefixed with
    ``http://``.

    This is a generator function, so nothing runs until the result is first
    iterated: the usage message and the exit on a missing argument happen on
    the first ``next()``, not when ``check_args()`` is called.

    Yields:
        str: one URL per command-line argument, in the order given.

    Raises:
        SystemExit: with status 1 when no feed argument was supplied.
    """
    if len(sys.argv) == 1:
        print("Please inform at least one feed")
        print("Usage: {} http://example.com/feed.rss".format(sys.argv[0]))
        # sys.exit is always available; the bare ``exit`` builtin is injected
        # by the ``site`` module and is missing under ``python -S``.
        sys.exit(1)
    for url in sys.argv[1:]:
        # str.startswith accepts a tuple, covering both schemes in one call.
        if url.startswith(('http://', 'https://')):
            yield url
        else:
            yield 'http://' + url
def get_tag_text(root, tag_name):
    """Return the text of the first ``tag_name`` match found under ``root``.

    The element is looked up with ``root.find(tag_name)``. When nothing
    matches, ``None`` is returned instead of raising; otherwise the matched
    element's ``.text`` is returned (which may itself be ``None``).
    """
    match = root.find(tag_name)
    if match is None:
        return None
    return match.text
def parse_feed(feed):
    """Parse a serialized RSS document into a plain dictionary.

    Args:
        feed: the XML document, handed unchanged to ``etree.fromstring``.

    Returns:
        dict: the keys ``title``, ``link``, ``language``, ``lastBuildDate``
        and ``description`` hold the text of the matching direct child of
        ``<channel>`` (``None`` when that child is absent). ``items`` is a
        list with one ``{'title': ...}`` dict per ``<item>`` child of the
        channel, in document order.

    Raises:
        ValueError: if the document root has no ``<channel>`` child, e.g.
            when the document is not RSS. Errors raised by
            ``etree.fromstring`` itself propagate unchanged.
    """
    root = etree.fromstring(feed)
    channel = root.find('channel')
    if channel is None:
        # Without this guard the lookups below fail with an opaque
        # AttributeError on None.
        raise ValueError(
            "feed has no <channel> element (root tag: {!r})".format(root.tag)
        )

    channel_fields = ('title', 'link', 'language', 'lastBuildDate',
                      'description')
    data = {}
    for field in channel_fields:
        data[field] = get_tag_text(channel, field)

    # TODO: media:thumbnail, media:keywords, media:copyright, media:category
    # itunes:owner > itunes:email, itunes:owner > itunes:name, itunes:author
    # itunes:explicit, itunes:image, itunes:keywords, itunes:subtitle,
    # itunes:summary, itunes:category,
    data['items'] = [
        {'title': get_tag_text(item, 'title')}
        for item in channel.iterfind('item')
    ]
    return data
async def fetch_feed(feed_uri):
    """Download one feed, parse it and print the outcome.

    Issues a GET request for ``feed_uri``, reads the whole response body,
    computes the SHA-256 hex digest of the raw bytes, parses the body with
    ``parse_feed`` and prints a ``Done <uri> <digest>`` line followed by a
    pretty-printed dump of the parsed data.

    Args:
        feed_uri: URL of the feed to download.

    Returns:
        None. The results are only printed.
    """
    # Both the session and the response are opened with ``async with`` so the
    # connection is released even when reading the body fails. Awaiting
    # ``aiohttp.request(...)`` directly leaves the response unreleased and is
    # not supported by current aiohttp releases.
    async with aiohttp.ClientSession() as session:
        async with session.get(feed_uri) as resp:
            body = await resp.read()

    # The digest is taken over the raw bytes, before any parsing.
    digest = hashlib.sha256(body).hexdigest()
    parsed = parse_feed(body)
    print("Done {} {}".format(feed_uri, digest))
    pprint(parsed)
def download_feeds(feeds):
    """Fetch every feed concurrently and block until all have finished.

    One ``fetch_feed`` task is scheduled per URL on a fresh event loop, which
    is closed before returning. A failure in one feed does not stop the
    others: each failed feed is reported on stderr once all are done.

    Args:
        feeds: iterable of feed URLs. It is consumed exactly once, so a
            generator is fine.

    Returns:
        None.
    """
    feeds = list(feeds)
    if not feeds:
        # Nothing to schedule; avoids creating a loop for no work.
        return

    # A dedicated loop avoids the deprecated implicit loop creation of
    # asyncio.get_event_loop() when no loop is running.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Wrap the coroutines in tasks explicitly: passing bare coroutines to
        # asyncio.wait() is rejected by Python 3.11+.
        tasks = [loop.create_task(fetch_feed(feed)) for feed in feeds]
        # return_exceptions=True keeps one broken feed from aborting the rest
        # and hands the exceptions back instead of dropping them.
        results = loop.run_until_complete(
            asyncio.gather(*tasks, return_exceptions=True)
        )
    finally:
        loop.close()

    for feed, result in zip(feeds, results):
        if isinstance(result, BaseException):
            print("Failed {}: {!r}".format(feed, result), file=sys.stderr)
if __name__ == '__main__':
    # Script entry point: take the feed URLs from the command line and
    # download them all.
    download_feeds(check_args())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment