An example of how to use map-reduce logic (an external merge sort) to sort a file that is larger than memory: sort fixed-size chunks in memory, spill each sorted chunk to disk, then merge the chunks with a heap.
import heapq
import random

random.seed(42)

F_PATH = "data.csv"
OUTPUT_PATH_TEMPLATE = "tmp_%d.csv"
CHUNK_SIZE = 5   # number of lines per sorted chunk
CHUNK_CTR = 0    # how many chunks have been written to disk
BUFFER = []      # in-memory staging area for the current chunk
def write_lines(f_obj, data, new_line_char="\n"):
    """Write data, an iterable of strings, to f_obj one per line."""
    f_obj.write(new_line_char.join(data) + new_line_char)
def write_chunk(data, chunk_iter):
    out_path = OUTPUT_PATH_TEMPLATE % chunk_iter
    print(data)
    with open(out_path, 'w') as f:
        write_lines(f, data)
    print('Wrote:', out_path)
def clear_buffer():
    global BUFFER
    BUFFER = []

def update_ctr():
    global CHUNK_CTR
    CHUNK_CTR += 1

def sort_buffer_and_dump_to_disk():
    BUFFER.sort()
    write_chunk(BUFFER, CHUNK_CTR)
    clear_buffer()
    update_ctr()
# create FAKE shuffled data
with open(F_PATH, 'w') as f_fake:
    data = ["This is line %02d" % i for i in range(25)]
    random.shuffle(data)
    write_lines(f_fake, data)

print('-------------Input Data------------------')
for line in data:
    print(line)
print('-------------Map: partition into sorted chunks and dump to tmp storage------------------')
with open(F_PATH, 'r') as f:
    # iterate through the source file one line at a time
    for idx, line in enumerate(f):
        # hold a batch of lines in the in-memory buffer
        BUFFER.append(line.strip())
        if idx == 0:
            continue
        # once idx is roughly the size of a chunk, sort the buffered data
        # and write it to disk. You could also size chunks with len() or
        # sys.getsizeof().
        if idx % CHUNK_SIZE == 0:
            sort_buffer_and_dump_to_disk()

# if there's anything left in the buffer, flush it as a final chunk
if BUFFER:
    sort_buffer_and_dump_to_disk()
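# A byte-based variant of the chunk check above (a sketch; MAX_BYTES is a
# hypothetical threshold, not defined in this script) would track a running
# byte count instead of a line count:
#
#     buffered_bytes += len(line)
#     if buffered_bytes >= MAX_BYTES:
#         sort_buffer_and_dump_to_disk()
#         buffered_bytes = 0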
print('-------------Merge Data------------------')
# open a handle to each sorted chunk file on disk
FILES = [open(OUTPUT_PATH_TEMPLATE % i, 'r') for i in range(CHUNK_CTR)]

# initialize a heap holding the current record from each file on disk;
# the smallest line across all chunks sits at the top
heap = []
for idx, f in enumerate(FILES):
    heapq.heappush(heap, (next(f), idx))
while heap:
    assert len(heap) <= len(FILES), 'Heap should hold at most one line per file'
    # pop the top element from the heap (the smallest current line across files)
    item, f_idx = heapq.heappop(heap)
    # push the next record from the file you just popped onto the heap;
    # if you're at the end of that file, close it instead
    try:
        new_item = next(FILES[f_idx])
        heapq.heappush(heap, (new_item, f_idx))
    except StopIteration:
        FILES[f_idx].close()
    # this is the data you'd write to the output file one record at a time;
    # a simple helper function or a context manager wrapping the merge step
    # could handle that
    print(item.strip())

# just in case any files were left open; can be removed in practice
for f in FILES:
    f.close()
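For reference, the manual heap loop above performs the same k-way merge that the standard library's heapq.merge provides. A minimal sketch, reusing OUTPUT_PATH_TEMPLATE and CHUNK_CTR from the script above:

import heapq

files = [open(OUTPUT_PATH_TEMPLATE % i) for i in range(CHUNK_CTR)]
try:
    # heapq.merge lazily yields the smallest line across all sorted inputs
    for line in heapq.merge(*files):
        print(line.strip())
finally:
    for f in files:
        f.close()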