Created
April 15, 2020 23:43
-
-
Save lauralorenz/2e3abf4ac32fcd8ed4748b5495dfa859 to your computer and use it in GitHub Desktop.
Pydata Denver basic ETL flow
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import json | |
from collections import namedtuple | |
from contextlib import closing | |
import sqlite3 | |
from prefect import task, Flow | |
## extract | |
@task | |
def get_complaint_data(): | |
r = requests.get("https://www.consumerfinance.gov/data-research/consumer-complaints/search/api/v1/", params={'size':10}) | |
response_json = json.loads(r.text) | |
return response_json['hits']['hits'] | |
## transform | |
@task | |
def parse_complaint_data(raw): | |
complaints = [] | |
Complaint = namedtuple('Complaint', ['data_received', 'state', 'product', 'company', 'complaint_what_happened']) | |
for row in raw: | |
source = row.get('_source') | |
this_complaint = Complaint( | |
data_received=source.get('date_recieved'), | |
state=source.get('state'), | |
product=source.get('product'), | |
company=source.get('company'), | |
complaint_what_happened=source.get('complaint_what_happened') | |
) | |
complaints.append(this_complaint) | |
return complaints | |
## load | |
@task | |
def store_complaints(parsed): | |
create_script = 'CREATE TABLE IF NOT EXISTS complaint (timestamp TEXT, state TEXT, product TEXT, company TEXT, complaint_what_happened TEXT)' | |
insert_cmd = "INSERT INTO complaint VALUES (?, ?, ?, ?, ?)" | |
with closing(sqlite3.connect("cfpbcomplaints.db")) as conn: | |
with closing(conn.cursor()) as cursor: | |
cursor.executescript(create_script) | |
cursor.executemany(insert_cmd, parsed) | |
conn.commit() | |
with Flow("my etl flow") as f: | |
raw = get_complaint_data() | |
parsed = parse_complaint_data(raw) | |
store_complaints(parsed) | |
f.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment