python parallel map-reduce onefiler
""" | |
map-reduce for text-file inputs, parallel version | |
""" | |
import os | |
import re | |
import sys | |
import json | |
import hashlib | |
from datetime import datetime | |
from multiprocessing import cpu_count, Queue, Process | |
DEBUG = True | |
"""enable/disable debug logging to stderr""" | |
def main():
    """perform parallel_map_reduce over the files passed on
    the command line.

    input files are processed in reverse-size-order to reduce
    the chance of having to wait on a single worker to finish.

    the results are streamed to stdout as a json list[] of
    key/value pairs (also in a list)
    """
    results = parallel_map_reduce(
        files=reversed(order_filenames_by_size(sys.argv[1:])),
        mapper=hour_stat_mapper, reducer=hour_stat_reducer,
        #mapper=range_mapper, reducer=range_reducer,
        #mapper=duplicate_finding_mapper, reducer=duplicate_finding_reducer,
        )
    json_list_stream_dump(
        data=([k, v] for k, v in results),  # if value > 1),
        fd=sys.stdout,
        indent=4
        )

ACCESS_LOG_PATTERN = re.compile(
    r"(?P<host>[\d\.]+)\s"
    r"(?P<identity>\S*)\s"
    r"(?P<user>\S*)\s"
    r"\[(?P<time>.*?)\]\s"
    r'"(?P<request>.*?)"\s'
    r"(?P<status>\d+)\s"
    r"(?P<bytes>\S*)"
    )

def parse_apache_log_entry(line):
    def parse_apache_timestamp(value):
        return datetime.strptime(value.split(' ')[0], '%d/%b/%Y:%H:%M:%S')

    def parse_apache_transfer_bytes(value):
        return 0 if value == '-' else int(value)

    match = ACCESS_LOG_PATTERN.match(line)
    if not match:
        raise Exception('pattern not matched')
    data = match.groupdict()
    return extend(data,
                  {'bytes': parse_apache_transfer_bytes(data['bytes']),
                   'time': parse_apache_timestamp(data['time'])})

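# example: a Common Log Format line such as
#
#   127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326
#
# parses to a dict with 'host'='127.0.0.1', 'user'='frank',
# 'time'=datetime(2000, 10, 10, 13, 55, 36) (the timezone offset is dropped),
# 'request'='GET /apache_pb.gif HTTP/1.0', 'status'='200', 'bytes'=2326.
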
def hour_stat_mapper(line):
    doc = parse_apache_log_entry(line)

    def truncate_to_hour(dt):
        return dt.replace(minute=0, second=0, microsecond=0)

    def truncate_to_day(dt):
        return dt.replace(hour=0, minute=0, second=0, microsecond=0)

    def truncate_to_month(dt):
        return dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0)

    key = truncate_to_hour(doc['time']).isoformat()
    value = {
        'total_bytes': doc['bytes'],
        'remote_addrs': {doc['host']: 1}
        }
    yield key, value

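# for the example log line above, hour_stat_mapper emits one pair:
#
#   ('2000-10-10T13:00:00', {'total_bytes': 2326,
#                            'remote_addrs': {'127.0.0.1': 1}})
#
# (truncate_to_day/truncate_to_month are unused alternatives for coarser keys)
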
def hour_stat_reducer(key, values):
    def merge_and_sum(a, b):
        """merge two dicts, summing their keys"""
        out = dict(a)
        for key, value in b.iteritems():
            out[key] = out.get(key, 0) + value
        return out

    def _r(ax, cur):
        return {
            'total_bytes': ax['total_bytes'] + cur['total_bytes'],
            'remote_addrs': merge_and_sum(ax['remote_addrs'],
                                          cur['remote_addrs'])
            }

    return reduce(_r, values)

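# sketch of one reduce step over two mapped values for the same hour
# (the 100-byte entry from 10.0.0.2 is made up for illustration):
#
#   hour_stat_reducer('2000-10-10T13:00:00', [
#       {'total_bytes': 2326, 'remote_addrs': {'127.0.0.1': 1}},
#       {'total_bytes': 100, 'remote_addrs': {'127.0.0.1': 1, '10.0.0.2': 1}},
#   ])
#   # -> {'total_bytes': 2426, 'remote_addrs': {'127.0.0.1': 2, '10.0.0.2': 1}}
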
def range_mapper(line):
    doc = parse_apache_log_entry(line)
    yield 'min', doc['time'].isoformat()
    yield 'max', doc['time'].isoformat()
    yield 'total', doc['bytes']


def range_reducer(key, values):
    if key == 'min':
        return min(values)
    elif key == 'max':
        return max(values)
    elif key == 'total':
        return sum(values)

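# note: range_mapper/range_reducer collapse the whole input to three keys:
# 'min' and 'max' hold ISO-8601 timestamp strings (which compare in the same
# order as the timestamps they encode, so min()/max() pick the earliest and
# latest entries) and 'total' holds the summed transfer bytes.
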
def duplicate_finding_mapper(line):
    """find log entries that are identical
    it helps to filter value==1 from the final output.
    """
    yield hashlib.sha1(line).hexdigest(), 1


def duplicate_finding_reducer(key, values):
    return reduce(lambda ax, cur: ax+cur, values)


def hash_finding_mapper(line):
    """find an entry that has a particular hash value
    just use the duplicate_finding_reducer with this.
    """
    hash = hashlib.sha1(line).hexdigest()
    if hash != '55e660c53e953420a5670d6556b5bc12614b2415':
        return
    yield line, 1

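# the duplicate finder keys every line by its SHA-1 digest, so after the
# reduce each value is the number of times that exact line appeared; dropping
# value == 1 entries leaves only the duplicates. hash_finding_mapper is the
# reverse lookup: edit the hard-coded digest to the one you are hunting for
# and the matching line(s) come back as keys.
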
def map_reduce(data, mapper, reducer, ax):
    """in-memory map_reduce using a hashtable, based on raymond's:
    http://code.activestate.com/recipes/577676-dirt-simple-mapreduce/

    my mods:
    + reduce periodically to conserve ram
    + mappers are generators, and may emit Zero-or-more values
    + allow multiple invocations by keeping values array-wrapped

    Args:
        data: iterable of items to pass to mapper
        mapper: mapper function, yielding key, value pairs
        reducer: reducer function
        ax: accumulator dictionary
    """
    for item in data:
        for key, value in mapper(item):
            ax[key] = ax.get(key, []) + [value]
            if len(ax[key]) > 5:
                ax[key] = [reducer(key, ax[key])]
    for key, values in ax.iteritems():
        ax[key] = [reducer(key, values)]

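# map_reduce() can also be driven directly on in-memory data, e.g.
# ('lines' here is a hypothetical list of raw log lines):
#
#   ax = {}
#   map_reduce(lines, hour_stat_mapper, hour_stat_reducer, ax)
#   # ax now maps each hour key to a one-element list: [reduced_value]
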
def parallel_map_reduce(files, mapper, reducer):
    """manage map_reduce() workers, merge-join the results,
    and perform the final reduce phase.

    files will be placed in the worker-job queue in order.

    Args:
        files: list of filenames to process
        mapper: map function
        reducer: reduce function

    Yields:
        sorted key/value pairs (each as a tuple)
    """
    # start a worker for each cpu
    q_request = Queue(1)
    workers = []
    streams = []
    for n in range(cpu_count()):
        q_response = Queue()
        p = Process(target=worker_main,
                    args=(n, q_request, q_response, mapper, reducer))
        p.daemon = True
        p.start()
        workers.append(p)
        streams.append(MergeJoinStream(q_response))
    log('started', len(workers), 'workers, waiting for results')

    # load filenames into the request queue
    [q_request.put(fn) for fn in files]
    # ...followed by quit-signals, one for each worker
    [q_request.put(None) for _ in workers]

    # perform the merge-join & final reduce
    [s.advance() for s in streams]  # prime streams
    log('final merge & reduce begins')
    while any(s.alive for s in streams):
        streams = [s for s in streams if s.alive]  # drop any dead
        minkey = min(s.key for s in streams)  # pick next key
        values = [s.value for s in streams if s.key == minkey]  # gather values
        [s.advance() for s in streams if s.key == minkey]  # read next val
        yield minkey, reducer(minkey, values)  # final reduce!

    # allow worker processes to terminate properly
    [p.join() for p in workers]

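# the merge loop above is a k-way merge: each worker sends its keys in sorted
# order (and each key at most once per worker), so repeatedly taking the
# smallest current key across the live streams, reducing the values that
# share it, and advancing only those streams yields globally sorted, fully
# reduced output without buffering everything in the parent process.
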
def worker_main(worker_id, q_in, q_out, mapper, reducer):
    """parallel map_reduce worker entry point.

    perform mapreduce on filenames received on q_in until
    None is received, indicating we should exit.

    main thread expects to receive sorted key/value pairs
    on q_out. keys _must_ be unique and _may not_ repeat.
    """
    # stage 1: process input files, store in accumulator
    ax = {}
    while True:
        filename = q_in.get()
        if filename is None:
            break
        with open(filename, 'r') as fd:
            log(worker_id, 'begin reading from', filename)
            map_reduce(fd, mapper, reducer, ax)

    # stage 2: sort and send key/val pairs to main-thread
    keys = ax.keys()
    keys.sort()
    for key in keys:
        q_out.put((key, ax[key][0]))

    # indicate to main-thread we are terminating
    q_out.put(None)

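# ax[key] is a one-element list after map_reduce()'s final pass, so
# ax[key][0] unwraps the already-reduced value before it crosses the queue.
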
class MergeJoinStream(object):
    """merge-join helper class

    reads values from a queue, retaining the last
    read value until the caller (merge-loop) is ready
    to advance to the next value.

    sets a flag (alive) to indicate if this stream
    has reached eof (signaled by None)

    Attributes:
        q: stdlib Queue we are reading from
        data: current value read from the queue
        alive: bool indicating if we have reached eof
    """
    def __init__(self, q):
        self.q = q

    def advance(self):
        self.data = self.q.get()
        self.alive = self.data is not None
        if self.alive:
            self.key, self.value = self.data

def json_list_stream_dump(data, fd, indent=None):
    """read elements from an iterator, streaming the output
    to a filelike as a json list.

    Args:
        data: iterable of objects to encode
        fd: filelike stream to write to
        indent: similar to stdlib json.dumps indent
    """
    if indent is None:
        pretty_cr, pretty_ws = '', ''
    else:
        pretty_cr, pretty_ws = '\n', ' ' * indent

    fd.write('[')
    try:
        item = next(data)
    except StopIteration:
        fd.write(']')
        return
    fd.write(pretty_cr + pretty_ws)
    fd.write(json.dumps(item))
    for item in data:
        fd.write(',')
        fd.write(pretty_cr + pretty_ws)
        fd.write(json.dumps(item))
    fd.write(pretty_cr)
    fd.write(']')

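# with indent=4 the output is one list element per line (nested objects stay
# compact on their element's line, since json.dumps is called per element);
# keys and byte counts below are illustrative:
#
#   [
#       ["2000-10-10T13:00:00", {"total_bytes": 2426, ...}],
#       ["2000-10-10T14:00:00", {"total_bytes": 512, ...}]
#   ]
#
# with indent=None everything is emitted on a single line.
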
def order_filenames_by_size(fns):
    """sort a list of filenames by their sizes according to os.stat

    Args:
        fns: list of filenames

    Returns:
        filenames, sorted in ascending order by size
    """
    with_sizes = [(fn, os.stat(fn).st_size) for fn in fns]
    with_sizes.sort(lambda a, b: -1 if a[1] < b[1] else 1)
    return [fn for fn, _ in with_sizes]

def log(*args):
    """log to stderr, maybe"""
    if not DEBUG: return
    sys.stderr.write(' '.join(str(item) for item in args))
    sys.stderr.write('\n')


def extend(a, b):
    """shallow-merge a & b, b has precedence"""
    out = dict(a)
    out.update(b)
    return out


if __name__ == '__main__':
    main()

# vim: tabstop=4 shiftwidth=4 softtabstop=4 expandtab