ReadBigCsvFile
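Two sketches for splitting a very large CSV into one file per calendar day, keyed on a createdDt timestamp column: a multiprocessing pipeline with a dedicated writer process, and a simpler single-process loop that reads the file in chunks and supports resumable re-runs.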
import os
from concurrent.futures import ProcessPoolExecutor, wait
from multiprocessing import Manager, Process

import pandas as pd


def writer_process(write_queue, output_dir):
    """Single process dedicated to writing files, so workers never
    append to the same daily file concurrently."""
    while True:
        msg = write_queue.get()
        if msg == 'STOP':
            break
        date_str, data = msg
        filepath = f"{output_dir}/daily_{date_str}.csv"
        data.to_csv(filepath, mode='a', header=not os.path.exists(filepath), index=False)


def process_chunk(chunk, write_queue):
    """Split one chunk into daily groups and hand each to the writer."""
    grouped = chunk.groupby(pd.Grouper(key='createdDt', freq='D'))
    for date, group in grouped:
        if not group.empty:
            write_queue.put((date.strftime('%Y-%m-%d'), group))


if __name__ == '__main__':
    os.makedirs('daily_files', exist_ok=True)
    # A Manager queue is picklable, so it can be passed into
    # ProcessPoolExecutor workers; a plain multiprocessing.Queue cannot.
    write_queue = Manager().Queue()
    writer = Process(target=writer_process, args=(write_queue, 'daily_files'))
    writer.start()
    with ProcessPoolExecutor() as executor:
        chunks = pd.read_csv(..., chunksize=100_000, parse_dates=['createdDt'])
        futures = [executor.submit(process_chunk, chunk, write_queue) for chunk in chunks]
        wait(futures)  # all chunks queued before the writer is told to stop
    write_queue.put('STOP')
    writer.join()
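A minimal sketch for generating test input, assuming the same defaults the scripts use (a createdDt column, large_file.csv as input, daily_files as output); the row count and date range here are made up for the demo:

import numpy as np
import pandas as pd

# Ten days of minute-level timestamps with a random value column
ts = pd.date_range('2025-01-01', periods=10 * 24 * 60, freq='min')
demo = pd.DataFrame({'createdDt': ts, 'value': np.random.rand(len(ts))})
demo.to_csv('large_file.csv', index=False)

The single writer process is what keeps the appends safe: workers only group and enqueue, so no two processes ever hold the same daily file open for writing.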
import os

import pandas as pd


def process_large_file():
    # Configuration
    input_file = 'large_file.csv'
    output_dir = 'daily_files'
    date_column = 'createdDt'
    chunksize = 100_000  # Adjust based on your system's memory
    file_prefix = 'daily_'

    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # Collect dates that already have output files, so re-runs append
    # without duplicating headers
    existing_dates = set()
    for filename in os.listdir(output_dir):
        if filename.startswith(file_prefix) and filename.endswith('.csv'):
            date_str = filename[len(file_prefix):-4]
            existing_dates.add(date_str)

    # Process file in chunks
    for chunk_idx, chunk in enumerate(pd.read_csv(
        input_file,
        chunksize=chunksize,
        parse_dates=[date_column],
        low_memory=False,
    )):
        print(f"Processing chunk {chunk_idx + 1}")

        # Group by calendar day
        grouped = chunk.groupby(pd.Grouper(key=date_column, freq='D'))

        # Process each daily group
        for date, group in grouped:
            if group.empty:
                continue

            date_str = date.strftime('%Y-%m-%d')
            output_path = os.path.join(output_dir, f'{file_prefix}{date_str}.csv')

            # Append without a header once the file exists; otherwise
            # create it with a header
            if date_str in existing_dates:
                mode = 'a'
                header = False
            else:
                mode = 'a' if os.path.exists(output_path) else 'w'
                header = not os.path.exists(output_path)
                existing_dates.add(date_str)

            # Write to file
            group.to_csv(
                output_path,
                mode=mode,
                header=header,
                index=False,
                date_format='%Y-%m-%d %H:%M:%S',  # Keep original datetime format
            )


if __name__ == '__main__':
    process_large_file()
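A quick way to sanity-check the split, assuming the same defaults (large_file.csv, daily_files, the daily_ prefix): the daily files should together hold exactly as many rows as the source.

import glob
import pandas as pd

# Count source rows chunk by chunk so the check itself stays memory-friendly
source_rows = sum(len(c) for c in pd.read_csv('large_file.csv', chunksize=100_000))
split_rows = sum(len(pd.read_csv(p)) for p in glob.glob('daily_files/daily_*.csv'))
assert source_rows == split_rows, f"{source_rows} rows in source, {split_rows} in daily files"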