Created
March 7, 2019 23:02
-
-
Save kunanit/9d93797ba92f7ecc61ced829ebb5170b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import luigi | |
import hashlib | |
from collections import defaultdict | |
import json | |
from os.path import join | |
# luigi --module tasks ReportStep --local-scheduler | |
# clean up: rm data/output_* | |
DATA_DIR = '/app/data' | |
def get_hash(file_contents=None, salt=''): | |
hasher = hashlib.md5(salt.encode()) | |
if file_contents: | |
with open(file_contents, 'rb') as input_file: | |
buf = input_file.read() | |
hasher.update(buf) | |
return hasher.hexdigest()[:8] | |
class WordsInput(luigi.ExternalTask): | |
def output(self): | |
filename = 'input_words.txt' | |
return luigi.LocalTarget(join(DATA_DIR, filename)) | |
class CountStep(luigi.Task): | |
def requires(self): | |
return WordsInput() | |
def output(self): | |
# filename = 'output_count.txt' | |
hex_tag = get_hash(salt=self.input().path) | |
filename = f'output_count_{hex_tag}.txt' | |
return luigi.LocalTarget(join(DATA_DIR, filename)) | |
def run(self): | |
with self.input().open() as fi: | |
words = fi.read().splitlines() | |
word_counts = defaultdict(int) | |
for word in words: | |
word_counts[word] += 1 | |
with self.output().open('w') as out_file: | |
json.dump(word_counts, out_file) | |
class ValidWordsInput(luigi.ExternalTask): | |
def output(self): | |
filename = 'input_valid_words.txt' | |
return luigi.LocalTarget(join(DATA_DIR, filename)) | |
class FilterStep(luigi.Task): | |
def requires(self): | |
return { | |
'counts': CountStep(), | |
'valid_words': ValidWordsInput() | |
} | |
def output(self): | |
filename = 'output_filter.txt' | |
hex_tag = get_hash( | |
file_contents=self.input()['valid_words'].path, | |
salt=self.input()['counts'].path | |
) | |
filename = f'output_filter_{hex_tag}.txt' | |
return luigi.LocalTarget(join(DATA_DIR, filename)) | |
def run(self): | |
with self.input()['counts'].open() as fi: | |
word_counts = json.load(fi) | |
with self.input()['valid_words'].open() as fi: | |
valid_words = fi.read().splitlines() | |
filtered_word_counts = {word: count for word, count in word_counts.items() if word in valid_words} | |
with self.output().open('w') as fo: | |
json.dump(filtered_word_counts, fo) | |
class ReportStep(luigi.Task): | |
def requires(self): | |
return FilterStep() | |
def output(self): | |
# filename = "output_report.txt" | |
hex_tag = get_hash(salt=self.input().path) | |
filename = f'output_report_{hex_tag}.txt' | |
return luigi.LocalTarget(join(DATA_DIR, filename)) | |
def run(self): | |
with self.output().open('w') as fo: | |
date_string = str(datetime.datetime.now()) | |
fo.write(date_string + "\n") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment