Created
October 11, 2019 10:13
-
-
Save jaumevalls/90281e5476cb1101acee5de67edddb77 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Iterator, Dict, Any | |
from pymongo import MongoClient | |
from typing import Iterator, Optional | |
import io | |
import psycopg2 | |
import psycopg2.extras | |
import time | |
from functools import wraps | |
from memory_profiler import memory_usage | |
# Connection parameters handed to psycopg2.connect() by the loader functions.
# NOTE(review): the database password is hard-coded here; consider reading it
# from the environment before sharing or deploying this script.
connection_string = "dbname='acc_rep' user='postgres' host='localhost' password='postgres'"
def profile(fn):
    """Decorate ``fn`` so each call prints its run time and memory growth.

    The wrapped function is executed exactly once per call. Timing is taken
    around the function itself while ``memory_usage`` samples the process, so
    functions with side effects or single-use iterator arguments behave the
    same as when they are called undecorated.

    Prints the call (keyword arguments only), ``Time <seconds>`` and
    ``Memory <max - min of the samples>``, then returns the function's
    return value. Exceptions raised by ``fn`` propagate to the caller.
    """
    @wraps(fn)
    def inner(*args, **kwargs):
        fn_kwargs_str = ', '.join(f'{k}={v}' for k, v in kwargs.items())
        print(f'\n{fn.__name__}({fn_kwargs_str})')

        elapsed = None

        def timed(*call_args, **call_kwargs):
            # Time only the wrapped function, not the sampler start-up.
            nonlocal elapsed
            started = time.perf_counter()
            try:
                return fn(*call_args, **call_kwargs)
            finally:
                elapsed = time.perf_counter() - started

        # Single invocation: memory is sampled while the timed call runs.
        mem, retval = memory_usage((timed, args, kwargs), retval=True, timeout=200, interval=1e-7)

        print(f'Time {elapsed:0.4}')
        print(f'Memory {max(mem) - min(mem)}')
        return retval
    return inner
class StringIteratorIO(io.TextIOBase):
    """Read-only text stream that pulls its content from an iterator of strings.

    Strings are fetched from the iterator lazily, one at a time, and only
    the unread tail of the most recently fetched string is kept in memory.
    """

    # Marker returned by next() once the underlying iterator is exhausted.
    _EXHAUSTED = object()

    def __init__(self, iter: Iterator[str]):
        self._iter = iter
        self._buff = ''

    def readable(self) -> bool:
        """Report that the stream supports reading."""
        return True

    def _read1(self, n: Optional[int] = None) -> str:
        """Return up to ``n`` characters from the current string.

        Refills the buffer from the iterator when it is empty, skipping empty
        items. Returns ``''`` only when the iterator is exhausted.
        """
        while not self._buff:
            fetched = next(self._iter, self._EXHAUSTED)
            if fetched is self._EXHAUSTED:
                break
            self._buff = fetched
        head = self._buff[:n]
        self._buff = self._buff[len(head):]
        return head

    def read(self, n: Optional[int] = None) -> str:
        """Read up to ``n`` characters, or everything if ``n`` is None/negative."""
        pieces = []

        if n is None or n < 0:
            # Unbounded read: drain the iterator completely.
            piece = self._read1()
            while piece:
                pieces.append(piece)
                piece = self._read1()
            return ''.join(pieces)

        remaining = n
        while remaining > 0:
            piece = self._read1(remaining)
            if not piece:
                break
            pieces.append(piece)
            remaining -= len(piece)
        return ''.join(pieces)
def clean_csv_value(value: Optional[Any], sep: str = '|') -> str:
    """Render ``value`` as one field of a PostgreSQL COPY text-format row.

    ``None`` becomes the COPY NULL marker ``\\N``. Any other value is
    converted with ``str()`` and the characters that would otherwise be read
    as row or column boundaries are backslash-escaped: the backslash itself,
    newline, carriage return and the column delimiter.

    Args:
        value: The field value, or ``None`` for SQL NULL.
        sep: The column delimiter used for the COPY; defaults to ``'|'``,
            the delimiter this script loads with.

    Returns:
        The escaped field text, without any trailing delimiter or newline.
    """
    if value is None:
        return r'\N'

    # Backslashes must be doubled first so the escapes added below survive.
    text = str(value).replace('\\', '\\\\')
    text = text.replace('\n', '\\n').replace('\r', '\\r')
    if sep:
        text = text.replace(sep, '\\' + sep)
    return text
def iter_beers_from_api(page_size: int = 1000) -> Iterator[Dict[str, Any]]:
    """Yield every document of ``trackingdb.clk`` inside the fixed ``tim`` window.

    Connects to the MongoDB server at ``mongodb://localhost:27017`` and
    streams the documents whose ``tim`` field is greater than
    ``'1570579200'`` and at most ``'1570620665'``. Each matching document is
    yielded exactly once and the generator ends when the cursor is exhausted.

    Args:
        page_size: Number of documents requested from the server per batch.

    Yields:
        The matching documents as dictionaries.
    """
    # NOTE(review): the bounds are compared as strings, exactly as in the
    # original query; confirm that ``tim`` is stored as a string.
    pipeline = [
        {'$match': {"tim": {"$gt": '1570579200', "$lte": '1570620665'}}},
    ]

    client = MongoClient("mongodb://localhost:27017")
    try:
        col = client["trackingdb"]["clk"]
        # One cursor streams the whole result in batches of ``page_size``.
        # Manual $skip/$limit paging is avoided: an aggregation cursor object
        # is always truthy, so it cannot signal "no more pages".
        yield from col.aggregate(pipeline, batchSize=page_size)
    finally:
        # Runs on exhaustion, on error, and when the generator is closed early.
        client.close()
def insert_one_by_one(connection_string, beers: Iterator[Dict[str, Any]]) -> None:
    """Truncate ``clk_day.clk_day_1_master`` and insert ``beers`` row by row.

    Runs in autocommit mode and issues one INSERT per document. Any error is
    caught and printed rather than raised, and the connection is always
    closed before returning.

    Args:
        connection_string: Connection parameters passed to ``psycopg2.connect``.
        beers: Documents to insert; each must contain every key listed in
            ``columns`` below.
    """
    # Target columns, in the order their values are bound to the statement.
    columns = (
        '_id', 'org', 'act', 'pub', 'gel', 'url', 'adg', 'nid',
        'ei1', 'ei2', 'ei3', 'ei4', 'sid', 'sid2', 'ppa', 'coo',
        'dom', 'fid', 'tim', 'pca', 'uui', 'fin', 'szc', 'sfo',
        'pde', 'uac', 'uaa', 'uag', 'uip', 'ref', 'eme', 'val',
        'ifa', 'gai', 'enc', 'dev', 'unq',
    )
    insert_sql = "INSERT INTO clk_day.clk_day_1_master ({}) VALUES ({})".format(
        ','.join(columns),
        ','.join(['%s'] * len(columns)),
    )

    conn = None
    try:
        conn = psycopg2.connect(connection_string)
        conn.autocommit = True
        with conn.cursor() as cursor:
            cursor.execute("Truncate {} Cascade;".format("clk_day.clk_day_1_master"))
            for beer in beers:
                # ``_id`` is stringified before binding; every other value is
                # passed through unchanged.
                row = (str(beer['_id']),) + tuple(beer[name] for name in columns[1:])
                cursor.execute(insert_sql, row)
    except Exception as e:
        print('Error {}'.format(str(e)))
    finally:
        # Release the connection even when connecting or inserting failed.
        if conn is not None:
            conn.close()
def _beer_to_copy_line(beer: Dict[str, Any]) -> str:
    """Serialize one document as a ``|``-delimited COPY line ending in a newline."""
    # Keys emitted, in order, right after the stringified ``_id``.
    leading_keys = (
        'org', 'act', 'pub', 'gel', 'url', 'adg', 'nid',
        'ei1', 'ei2', 'ei3', 'ei4', 'ppa', 'coo', 'pca', 'tim',
        'uui', 'fin', 'uac', 'szc', 'sfo', 'pde', 'uaa', 'uag',
        'uip', 'ref', 'eme', 'val', 'ifa', 'gai', 'enc',
    )
    # Constant fields placed between the two key groups.
    # NOTE(review): presumably defaults for table columns that have no
    # counterpart in the document; confirm against the table definition.
    filler = (0, 0, 0, 0, 0, '', '0')
    # Keys emitted, in order, after the constant fields.
    trailing_keys = ('unq', 'dev', 'dom', 'fid', 'sid', 'sid2')

    values = (
        (str(beer['_id']),)
        + tuple(beer[key] for key in leading_keys)
        + filler
        + tuple(beer[key] for key in trailing_keys)
    )
    return '|'.join(map(clean_csv_value, values)) + '\n'


@profile
def copy_string_iterator(connection_string, beers: Iterator[Dict[str, Any]], size: int = 8192) -> None:
    """Truncate ``clk_day.clk_day_1_master`` and bulk-load ``beers`` with COPY.

    Documents are serialized lazily into ``|``-delimited text lines and
    streamed to the server through a ``StringIteratorIO``. No column list is
    given to COPY, so the fields are loaded positionally into the table's
    columns. Any error is caught and printed rather than raised, and the
    connection is always closed before returning.

    Args:
        connection_string: Connection parameters passed to ``psycopg2.connect``.
        beers: Documents to load; each must contain every key that
            ``_beer_to_copy_line`` reads.
        size: Size of the buffer used to read from the line stream.
    """
    # copy_expert() takes a full COPY statement, which keeps the
    # schema-qualified table name and the delimiter explicit.
    copy_sql = "COPY clk_day.clk_day_1_master FROM STDIN WITH (FORMAT text, DELIMITER '|')"

    conn = None
    try:
        conn = psycopg2.connect(connection_string)
        conn.autocommit = True
        with conn.cursor() as cursor:
            cursor.execute("Truncate {} Cascade;".format("clk_day.clk_day_1_master"))
            beers_string_iterator = StringIteratorIO(
                _beer_to_copy_line(beer) for beer in beers
            )
            cursor.copy_expert(copy_sql, beers_string_iterator, size=size)
    except Exception as e:
        print('Error Exception {}'.format(str(e)))
    finally:
        # Release the connection even when connecting or copying failed.
        if conn is not None:
            conn.close()
if __name__ == "__main__":
    # Debug entry point: fetch every document into a list and print it.
    fetched_documents = list(iter_beers_from_api())
    print(fetched_documents)
    # Disabled load step, kept for reference:
    # beers = iter_beers_from_api()
    # copy_string_iterator(connection_string, iter(beers), size=1024)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment