Created
April 3, 2021 22:10
-
-
Save gallir/5353d7020dfdd397d853063d4fc56b49 to your computer and use it in GitHub Desktop.
base covid
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pathlib
import time
from datetime import date
from typing import Optional

import pandas as pd
class Series:
    """A named, date-bounded dataset held in a pandas DataFrame.

    The instance stores its query parameters, an optional on-disk pickle
    cache, and helpers to reshape ``self.df``.  ``self.df`` starts as ``None``;
    in this class only :meth:`read` (from the cache) assigns it.
    NOTE(review): presumably subclasses override ``read`` to fetch the real
    data -- confirm against the callers.

    Attributes set by ``__init__`` (all plain instance attributes):
        df: the loaded DataFrame, or ``None`` if nothing has been loaded.
        name, freq, top, flag: stored as given; also part of the cache key.
        schema: mapping of column name -> attribute type.
        start, end: objects exposing ``isoformat()``; ``end`` defaults to
            ``date.today()`` when ``None`` is passed.
        use_cache: whether :meth:`read` may load from ``cache_file``.
        cache_file: path of the pickle cache derived from the parameters.
    """

    def __init__(self, name, schema, start, end, freq, top=0, use_cache=False, flag=False):
        """Store the parameters and derive the cache file path."""
        self.df = None
        self.name = name
        self.schema = schema
        self.start = start
        # An open-ended series runs up to the current day.
        self.end = end if end is not None else date.today()
        self.freq = freq
        self.top = top
        self.use_cache = use_cache
        self.flag = flag
        self.cache_file = f"/tmp/cache_{self.name}_{self.start.isoformat()}_{self.end.isoformat()}_{self.freq}_{self.flag}_top{self.top}.pickle"

    def read(self):
        """Load ``self.df`` from the pickle cache when caching is enabled.

        Does nothing when ``use_cache`` is false, when the cache file does
        not exist, or when the cached frame has no rows.
        """
        if not self.use_cache:
            return
        if not pathlib.Path(self.cache_file).exists():
            return
        # NOTE(review): unpickling executes arbitrary code, and the cache
        # lives at a predictable path under /tmp.  Only safe when no
        # untrusted user can write to that location.
        cached = pd.read_pickle(self.cache_file)
        if cached.shape[0] > 0:
            self.df = cached

    # NOTE: ``__init__`` assigns instance attributes called ``name`` and
    # ``freq``, which shadow the two methods below on every initialised
    # instance (``obj.name`` is the stored value, not a bound method).  The
    # methods are kept so class-level calls such as ``Series.name(obj)`` or
    # ``super().name()`` keep working.
    def name(self):
        """Return the series name (shadowed on instances, see note above)."""
        return self.name

    def freq(self):
        """Return the series frequency (shadowed on instances, see note above)."""
        return self.freq

    def get_df(self) -> Optional[pd.DataFrame]:
        """Return the current DataFrame, or ``None`` if nothing is loaded."""
        return self.df

    def store_cache(self):
        """Pickle ``self.df`` to ``cache_file``; requires a loaded frame."""
        self.df.to_pickle(self.cache_file)

    def get_schema(self):
        """Return the schema for the dataset.

        The result is ``{"Attributes": [...]}`` with one
        ``{"AttributeName", "AttributeType"}`` entry per DataFrame column
        that is present in ``self.schema``, in DataFrame column order.
        """
        attributes = [
            {
                "AttributeName": column,
                "AttributeType": self.schema[column],
            }
            for column in self.df.columns
            if column in self.schema
        ]
        return {"Attributes": attributes}

    def format(self):
        """Drop every DataFrame column that is not declared in the schema."""
        invalids = [c for c in self.df.columns if c not in self.schema]
        if invalids:
            self.df = self.df.drop(columns=invalids)

    def explode(self, on_column: str, new_column: str, map: dict):
        """Add ``new_column`` by looking up ``on_column`` in ``map``, then explode it.

        Each value of ``on_column`` is replaced by ``map[value]`` (a missing
        key raises ``KeyError``); list-like results are expanded to one row
        per element.  ``self.df`` is rebound to the new frame.
        """
        mapped = self.df[on_column].apply(lambda key: map[key])
        # Build on a copy (``assign``) instead of writing the temporary
        # list-valued column into the current frame: that frame may be shared
        # with callers of ``get_df`` or be a filtered slice of another frame.
        self.df = self.df.assign(**{new_column: mapped}).explode(new_column)

    def rename_col(self, old: str, new: str):
        """Rename column ``old`` to ``new`` in place on the current frame."""
        self.df.rename(columns={old: new}, inplace=True)

    def to_csv(self, filename: str):
        """Write the DataFrame to ``filename`` as CSV without the index."""
        self.df.to_csv(filename, index=False)

    def top_keys(self, n: int, key_col: str, value_col: str):
        """Return the set of ``key_col`` values with the ``n`` largest summed ``value_col``."""
        totals = self.df[[key_col, value_col]].groupby([key_col], as_index=False).sum()
        return set(totals.nlargest(n, [value_col])[key_col])

    def filter_top(self, n: int, key_col: str, value_col: str):
        """Keep only the rows whose key is among the top ``n``; no-op when ``n <= 0``."""
        if n <= 0:
            return
        top = self.top_keys(n, key_col, value_col)
        self.df = self.df[self.df[key_col].isin(top)]
def cursor_as_pandas_retry(cursor, query, retries=3, sleep=3):
    """Execute ``query`` on ``cursor`` and return the result as pandas data, retrying on failure.

    Each attempt calls ``cursor.execute(query).as_pandas()``.  Any exception
    is reported on stdout and the call is attempted again after ``sleep``
    seconds, up to ``retries`` attempts in total.

    Args:
        cursor: object whose ``execute(query)`` result exposes ``as_pandas()``.
        query: the query passed unchanged to ``cursor.execute``.
        retries: maximum number of attempts; ``<= 0`` means no attempt at all.
        sleep: seconds to wait between a failed attempt and the next one.

    Returns:
        The value of ``as_pandas()`` from the first successful attempt, or
        ``None`` when every attempt failed (or no attempt was made).
    """
    while retries > 0:
        try:
            return cursor.execute(query).as_pandas()
        except Exception as e:  # deliberately broad: best-effort retry helper
            retries -= 1
            if retries > 0:
                print(f"Retrying {retries} query {query}: {str(e)}")
                # Only wait when another attempt will actually follow.
                time.sleep(sleep)
            else:
                print(f"Giving up on query {query}: {str(e)}")
    # All attempts failed; callers receive None, not an exception.
    return None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment