Last active
March 3, 2019 22:35
-
-
Save biggers/65e838a6d37a235ed411088f1815ec94 to your computer and use it in GitHub Desktop.
insert a set of CSVs (spreadsheets, "raw") into a MySQL DB - Python3 example (DB API; generators; PyMySQL)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pymysql.cursors | |
import os | |
from attrdict import AttrDict | |
import sys | |
from datetime import datetime as dt | |
# Insert all /var/tmp/*/"CSV summary-reports" into MySQL - using Py DB-API | |
# NOTE: schema for the table must have been already created! | |
# | |
# INSTALL: | |
# pip3 install PyMySQL attrdict | |
# | |
# REFs: | |
# PyMySQL pure-Py driver for MySQL - Python DB-API | |
# | |
# Run: | |
# env DB_PASSWD=xyzzy1234 python3 csvs_db_insert.py | |
from reporting.futils import ( | |
gen_find, | |
gen_open_each, | |
gen_concatenate, | |
) | |
# CSV row: | |
# compute,abc-123-kms,instances,36,50,0.72,14,2017-08-17 14:47:51.009463 | |
# compute,abc-123-kms,total_cores,138,200,0.69,62,2017-08-17 14:47:51.009526 | |
def set_unique_index_for_fields(connection_object):
    """Add a UNIQUE index over (cap_type, measurement, utc_datetime).

    Ref: https://stackoverflow.com/a/5038052
    """
    conn = connection_object
    # Table names cannot be bound as DB-API parameters, so interpolate here.
    statement = """ALTER TABLE {db_table} ADD UNIQUE
        (cap_type, measurement, utc_datetime)""".format(db_table=conn.db_table)
    with conn.dbc.cursor() as cur:
        cur.execute(statement)
    conn.dbc.commit()
def put_csv_rows_to_db(lines, connection_object):
    """Batch-INSERT all CSV summary-report rows into the DB table.

    Parameters:
        lines: iterable of raw CSV lines; header rows (containing
            'in-use,') are skipped.
        connection_object: attribute bag with ``.dbc`` (an open DB-API
            connection) and ``.db_table`` (target table name).

    Each row's timestamp column (index 7) is normalized: old
    nanosecond "Influx timestamps" become UTC-datetime strings, while
    rows already carrying a datetime string pass through unchanged.
    'None' in the in_use/cap_limit columns (indices 3, 4) becomes 0.
    All rows go in via one executemany() followed by a commit; the
    connection is always closed on the way out.
    """
    connobj = connection_object
    try:
        with connobj.dbc.cursor() as cursor:
            values = []
            for line in lines:
                if 'in-use,' in line:  # drop CSV summary-report "header" row
                    continue
                vals = line.strip().split(',')
                try:  # convert the old "Influx timestamp" (ns since epoch), if any
                    t = int(vals[7])
                    vals[7] = str(dt.utcfromtimestamp(t / 1e9))
                except ValueError:
                    pass  # vals[7] is already a UTC-datetime string
                for i in (3, 4):  # get rid of None(s) in server_group rows
                    vals[i] = 0 if 'None' in vals[i] else vals[i]
                values.append(vals)
            try:
                # Create new Measurement records in a single round trip
                sql = """INSERT INTO {db_table}(cap_type, cloud_region,
                             measurement, in_use, cap_limit,
                             percent, remaining, utc_datetime)
                         VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""".format(
                    db_table=connobj.db_table)
                cursor.executemany(sql, values)
                # NOT autocommit by default; commit to save!
                connobj.dbc.commit()
            except Exception as e:  # was BaseException: don't swallow Ctrl-C/SystemExit
                print(str(e), file=sys.stderr)
    finally:
        # Close even if row parsing raised (original skipped close on error).
        connobj.dbc.close()
def main():
    """Load all summary-report CSVs under /var/tmp into the MySQL table.

    Connection settings come from the environment (DB_USER, DB_PASSWD,
    DB_HOST, DB, DB_TABLE).  Set SET_UNIQUE_IDX to a true-ish value
    (anything other than '', '0', 'false', 'no') to first add the
    UNIQUE index to the table.
    """
    # get *all* the Summary CSV reports, across all CSV files/folders
    file_names = gen_find('compute-quotas-*summary-*.csv', '/var/tmp')
    files = gen_open_each(file_names)   # file "pointers" (opened)
    lines = gen_concatenate(files)      # get all lines, from all files!
    connobj = AttrDict(user=os.getenv('DB_USER', 'capacity'),
                       password=os.getenv('DB_PASSWD', 'xyzzy'),
                       host=os.getenv('DB_HOST', '10.203.49.194'),
                       db=os.getenv('DB', 'capacity'),
                       charset='utf8',
                       cursorclass=pymysql.cursors.DictCursor)
    # connect to the Capacity database (connobj holds only connect kwargs here)
    connobj.dbc = pymysql.connect(**connobj)
    connobj.db_table = os.getenv('DB_TABLE', 'report')
    # BUG FIX: os.getenv returns a *string* when the var is set, so the old
    # truthiness test treated SET_UNIQUE_IDX=0 as enabled.
    set_unique_idx = os.getenv('SET_UNIQUE_IDX', '')
    if set_unique_idx.lower() not in ('', '0', 'false', 'no'):
        set_unique_index_for_fields(connobj)
    put_csv_rows_to_db(lines, connobj)


if __name__ == '__main__':
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import os | |
import fnmatch | |
import gzip | |
import bz2 | |
def walk_files(root, patterns='*', single_level=True, yield_folders=False):
    """Yield (path, name) pairs under *root* matching a ';'-joined pattern set.

    Modified, from the Python Cookbook 2nd-ed, ex 2.16
    """
    pattern_list = patterns.split(';')
    for dirpath, dirnames, filenames in os.walk(root):
        candidates = list(filenames)
        if yield_folders:
            candidates.extend(dirnames)
        # sorted() + any() replaces the original in-place sort / break loop
        for name in sorted(candidates):
            if any(fnmatch.fnmatch(name, pat) for pat in pattern_list):
                yield dirpath, name
        if single_level:
            break
def walk_files_2(root, patterns='*', single_level=True, yield_folders=False):
    """Yield full paths under *root* matching a ';'-joined pattern set.

    Modified, from the Python Cookbook 2nd-ed, ex 2.16
    """
    pattern_list = patterns.split(';')
    for dirpath, dirnames, filenames in os.walk(root):
        candidates = list(filenames)
        if yield_folders:
            candidates.extend(dirnames)
        # sorted() + any() replaces the original in-place sort / break loop
        for name in sorted(candidates):
            if any(fnmatch.fnmatch(name, pat) for pat in pattern_list):
                yield os.path.join(dirpath, name)
        if single_level:
            break
# REF: Python_Cookbook_Third_Edition/ | |
# src/4/creating_data_processing_pipelines/example.py | |
# (below fns) | |
def gen_find(filepat, top):
    """
    Yield every path under *top* whose basename matches the shell
    wildcard *filepat*.  Walks the whole tree; does *not* sort the
    filenames that are found.
    """
    for dirpath, _dirs, filenames in os.walk(top):
        for match in fnmatch.filter(filenames, filepat):
            yield os.path.join(dirpath, match)
def gen_open_each(filenames):
    """
    Open a sequence of filenames one at a time, yielding each file object.

    Transparently decompresses ``.gz`` and ``.bz2`` files (text mode).
    Each file is managed by a ``with`` block, so it is closed even if
    the consumer abandons the generator early — the original closed
    only on the *next* iteration, leaking the last open handle when
    iteration stopped part-way.
    """
    for filename in filenames:
        if filename.endswith('.gz'):
            opener = gzip.open
        elif filename.endswith('.bz2'):
            opener = bz2.open
        else:
            opener = open
        with opener(filename, 'rt') as f:
            yield f
def gen_concatenate(iterators):
    """
    Chain a sequence of iterables together into one flat sequence.
    """
    for sequence in iterators:
        for item in sequence:  # explicit loop; equivalent to `yield from`
            yield item
def ilen(it):
    """
    Count the items of an iterator by draining it at C speed.

    zip() pairs every item with a fresh counter; feeding the pairs into
    a zero-length deque discards them without storing anything.  Because
    zip pulls from *it* before advancing the counter, next() on the
    counter afterwards is exactly the number of items consumed.
    REF: http://stackoverflow.com/a/34404546/4846773
    """
    from collections import deque
    from itertools import count
    counter = count()
    deque(zip(it, counter), maxlen=0)  # counter is zip's 2nd arg, to avoid advancing too far
    return next(counter)
def main_test():
    """
    "integration test" of the generator helpers in this module
    """
    import sys
    from itertools import tee
    found = gen_find('quotas*summary*.csv', '/var/tmp')
    # file_names = walk_files_2(CSV_DIR, CSV_PAT)
    names_for_open, names_for_print = tee(found)
    opened = gen_open_each(names_for_open)
    all_lines = gen_concatenate(opened)
    total = ilen(all_lines)  # same result as sum(1 for _ in all_lines)
    print("total lines, all found files: {}".format(total),
          file=sys.stderr)
    for name in names_for_print:
        print("-> {}".format(name), file=sys.stderr)


if __name__ == "__main__":
    main_test()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment