Last active
May 16, 2020 14:01
-
-
Save shiplu/6ba6bf3629860f8c8fface6b032a9023 to your computer and use it in GitHub Desktop.
A program that compares CSV data between a "before" file and an "after" file. Both CSV files must contain a unique identifier in a key column (`id` by default). It outputs the new rows, the updated rows, and the keys of the deleted rows.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.7 | |
""" | |
Shows changes in product information from CSV files. | |
""" | |
from typing import Dict, List, Tuple, Set, Any, Hashable | |
import argparse | |
import csv | |
import logging | |
import logging.config | |
import sys | |
# Record layout shared by every handler: timestamp, level name right-aligned
# to 7 characters, logger name, then the message.
_LOG_FORMAT = "%(asctime)s [%(levelname)7s] %(name)s: %(message)s"

# dictConfig schema (version 1).  The root logger ("") sends INFO and above
# to both a stream handler and the file "status-check.log".
log_config_dict = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {"standard": {"format": _LOG_FORMAT}},
    "handlers": {
        "cli": {
            "class": "logging.StreamHandler",
            "formatter": "standard",
            "level": "INFO",
        },
        "file": {
            "class": "logging.FileHandler",
            "filename": "status-check.log",
            "formatter": "standard",
            "level": "INFO",
        },
    },
    "loggers": {
        "": {"handlers": ["file", "cli"], "level": "INFO", "propagate": True},
    },
}

# Applied at import time so the module-level logger below is ready to use.
logging.config.dictConfig(log_config_dict)
logger = logging.getLogger(__name__)
class ProductInformation:
    """In-memory table of CSV rows indexed by a unique key column.

    Rows are kept in a mapping from the key column's value to the row dict,
    together with the ordered list of column names.  Subtracting one
    instance from another yields the rows whose keys exist only in the left
    operand.
    """

    def __init__(self, data: Dict[Hashable, Dict], fields: List):
        """Store the key -> row mapping and the ordered column names."""
        self._data = data
        self._fields = fields

    @classmethod
    def from_csv(cls, csv_path: str, key: str):
        """Load rows from the CSV file at *csv_path*, indexed by column *key*.

        Rows that have no value for *key* (the column is absent from the
        header, or the row is too short to reach it) are skipped with a
        warning.  If several rows share a key value, the last one wins.
        """
        data = {}
        # newline="" is required by the csv module so that line breaks
        # embedded in quoted fields are passed through untouched.
        with open(csv_path, "r", newline="") as file:
            reader = csv.DictReader(file)
            # fieldnames is None for an empty file; normalise to a list.
            fields = list(reader.fieldnames or [])
            for row_index, row in enumerate(reader, 1):
                # DictReader pads short rows with None, so a plain
                # ``key in row`` test cannot detect a missing key value.
                key_value = row.get(key)
                if key_value is None:
                    logger.warning(
                        "Key '%s' is not present in %s:%s. Discarded. ",
                        key,
                        csv_path,
                        row_index,
                    )
                    continue
                data[key_value] = row
        return cls(data, fields)

    @property
    def data(self) -> List[Dict]:
        """All rows, in insertion order."""
        return list(self._data.values())

    @property
    def fields(self) -> List[str]:
        """Column names in their original order."""
        return self._fields

    def to_csv(self, fp, fields: List[str] = None):
        """Write a header and every row to the file-like object *fp*.

        *fields* selects and orders the columns to write; it defaults to
        all known columns.  Row entries outside *fields* are ignored.
        """
        fields = fields or self.fields
        # extrasaction="ignore" lets *fields* be a subset of the row keys.
        writer = csv.DictWriter(fp, extrasaction="ignore", fieldnames=fields)
        writer.writeheader()
        writer.writerows(self._data.values())

    def get(self, key: str, default: Any = None) -> Any:
        """Return the row stored under *key*, or *default* if absent."""
        return self._data.get(key, default)

    def keys(self) -> List[str]:
        """Return the key values of all rows, in insertion order."""
        return list(self._data)

    def __len__(self):
        """Number of rows; also makes an empty instance falsy."""
        return len(self._data)

    def __sub__(self, other):
        """Return the rows whose keys are not present in *other*.

        The result keeps this instance's row order and column names.
        """
        # Build the lookup set once instead of scanning other.keys() per row.
        other_keys = set(other.keys())
        data = {
            row_key: row
            for row_key, row in self._data.items()
            if row_key not in other_keys
        }
        return self.__class__(data, self.fields)
class ChangeTracker:
    """Derive created, updated and deleted rows between two snapshots."""

    def __init__(self, before: ProductInformation, after: ProductInformation):
        """Remember the older (*before*) and newer (*after*) snapshots."""
        self.before = before
        self.after = after

    def creates(self) -> ProductInformation:
        """Rows whose key appears only in the ``after`` snapshot."""
        return self.after - self.before

    def updates(self) -> ProductInformation:
        """Rows present in both snapshots whose contents differ.

        The returned rows and column names come from the ``after`` snapshot.
        """
        old, new = self.before, self.after
        shared_keys = set(old.keys()) & set(new.keys())
        changed = {}
        for shared_key in shared_keys:
            new_row = new.get(shared_key)
            if old.get(shared_key) != new_row:
                changed[shared_key] = new_row
        return ProductInformation(changed, new.fields)

    def deletes(self) -> ProductInformation:
        """Rows whose key appears only in the ``before`` snapshot."""
        return self.before - self.after
def calculate_create_update_delete(
    before_csv: str, after_csv: str, key: str = "id"
) -> Tuple[ProductInformation, ProductInformation, ProductInformation]:
    """Load both CSV files and return their (creates, updates, deletes).

    *key* names the column that uniquely identifies a row in both files.
    """
    # Load the older file first, then the newer one.
    older = ProductInformation.from_csv(before_csv, key=key)
    newer = ProductInformation.from_csv(after_csv, key=key)
    tracker = ChangeTracker(older, newer)
    created = tracker.creates()
    updated = tracker.updates()
    deleted = tracker.deletes()
    return created, updated, deleted
def main(
    before_csv: str, after_csv: str
) -> Tuple[List[Dict], List[Dict], Set[str]]:
    """Compare two CSV files keyed on the ``id`` column.

    Returns a tuple of (new rows, updated rows, keys of deleted rows).
    """
    result = calculate_create_update_delete(before_csv, after_csv, "id")
    created, updated, deleted = result
    new_rows = list(created.data)
    changed_rows = list(updated.data)
    removed_keys = set(deleted.keys())
    return new_rows, changed_rows, removed_keys
def command_line(argv=None):
    """Parse command-line arguments and print the detected changes as CSV.

    *argv* is the list of arguments to parse; when it is None, argparse
    falls back to ``sys.argv[1:]``.  Each non-empty category (creates,
    updates, deletes) is announced through the logger and then written to
    standard output.  For deletes only the key column is written.
    """
    # The module docstring is the description; passing it positionally
    # would set ``prog`` (the program name shown in the usage line) instead.
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("before", help="Old production information CSV")
    parser.add_argument("after", help="New production information CSV")
    parser.add_argument(
        "-k",
        "--key",
        help="CSV column name to identify product key",
        default="id",
    )
    args = parser.parse_args(argv)
    creates, updates, deletes = calculate_create_update_delete(
        args.before, args.after, args.key
    )
    if creates:
        logger.info("CREATES")
        creates.to_csv(sys.stdout)
    if updates:
        logger.info("UPDATES")
        updates.to_csv(sys.stdout)
    if deletes:
        logger.info("DELETES")
        # Use the key column the user selected; a hard-coded "id" would
        # print empty values whenever --key names a different column.
        deletes.to_csv(sys.stdout, fields=[args.key])
# Run the command-line interface only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    command_line()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment