Skip to content

Instantly share code, notes, and snippets.

@rajvermacas
Last active January 20, 2025 21:38
Show Gist options
  • Save rajvermacas/3c7cc959a21cc55048f2c1208d602263 to your computer and use it in GitHub Desktop.
ReadBigCsvFile
import os
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager, Process, Queue

import pandas as pd


def writer_process(output_dir, write_queue):
    """Single process dedicated to writing daily CSV files.

    Consumes ``(date_str, DataFrame)`` tuples from *write_queue* and appends
    each group to ``<output_dir>/daily_<date_str>.csv`` until the ``'STOP'``
    sentinel is received.  Funneling all writes through one process avoids
    two workers appending to the same file concurrently.
    """
    while True:
        msg = write_queue.get()
        if msg == 'STOP':
            break
        date_str, data = msg
        filepath = f"{output_dir}/daily_{date_str}.csv"
        # Emit the header only when the file is being created.
        data.to_csv(filepath, mode='a',
                    header=not os.path.exists(filepath), index=False)


def process_chunk(chunk, write_queue):
    """Group *chunk* by calendar day and enqueue each group for writing."""
    grouped = chunk.groupby(pd.Grouper(key='createdDt', freq='D'))
    for date, group in grouped:
        # pd.Grouper(freq='D') yields empty frames for gap days in the
        # date range; skip them so no header-only files get created.
        if group.empty:
            continue
        write_queue.put((date.strftime('%Y-%m-%d'), group))


if __name__ == '__main__':
    os.makedirs('daily_files', exist_ok=True)
    # A Manager queue is proxy-based and can be pickled into pool workers;
    # a plain multiprocessing.Queue passed to executor.submit() raises
    # RuntimeError ("should only be shared through inheritance").
    with Manager() as manager:
        write_queue = manager.Queue()
        writer = Process(target=writer_process,
                         args=('daily_files', write_queue))
        writer.start()
        try:
            with ProcessPoolExecutor() as executor:
                # parse_dates is required so pd.Grouper can bucket by day.
                chunks = pd.read_csv('large_file.csv',
                                     parse_dates=['createdDt'],
                                     chunksize=100_000)
                futures = [executor.submit(process_chunk, chunk, write_queue)
                           for chunk in chunks]
                # Surface worker exceptions before signalling the writer.
                for future in futures:
                    future.result()
        finally:
            # Always release the writer, even if a worker failed.
            write_queue.put('STOP')
            writer.join()
import pandas as pd
import os
from datetime import datetime
def process_large_file(input_file='large_file.csv',
                       output_dir='daily_files',
                       date_column='createdDt',
                       chunksize=100000,
                       file_prefix='daily_'):
    """Split a large CSV into one file per calendar day.

    Streams *input_file* in chunks of *chunksize* rows (so the whole file
    never has to fit in memory), groups each chunk by the day component of
    *date_column*, and appends every group to
    ``<output_dir>/<file_prefix><YYYY-MM-DD>.csv``.

    Because writes append once a daily file exists, a day that spans
    multiple chunks accumulates correctly.  NOTE(review): re-running the
    function on the same input appends duplicate rows to files left over
    from a previous run — same behavior as the original resume logic.

    Args:
        input_file: Path of the source CSV.
        output_dir: Directory receiving the per-day files (created if
            missing).
        date_column: Name of the datetime column to bucket by.
        chunksize: Rows per chunk; tune to available memory.
        file_prefix: Prefix for the generated file names.
    """
    os.makedirs(output_dir, exist_ok=True)

    # infer_datetime_format is deprecated (a no-op in pandas >= 2.0),
    # so it is intentionally not passed here; parse_dates alone is enough
    # for pd.Grouper to bucket by day.
    for chunk_idx, chunk in enumerate(pd.read_csv(
        input_file,
        chunksize=chunksize,
        parse_dates=[date_column],
        low_memory=False
    )):
        print(f"Processing chunk {chunk_idx + 1}")

        # Group the chunk's rows by calendar day.
        grouped = chunk.groupby(pd.Grouper(key=date_column, freq='D'))

        for date, group in grouped:
            # pd.Grouper emits empty frames for gap days; skip them.
            if group.empty:
                continue

            date_str = date.strftime('%Y-%m-%d')
            output_path = os.path.join(output_dir,
                                       f'{file_prefix}{date_str}.csv')

            # A single existence check decides both mode and header:
            # new file -> write with header; existing file (from this
            # run or a previous one) -> append without header.  This is
            # exactly equivalent to tracking a set of seen dates.
            file_exists = os.path.exists(output_path)
            group.to_csv(
                output_path,
                mode='a' if file_exists else 'w',
                header=not file_exists,
                index=False,
                date_format='%Y-%m-%d %H:%M:%S'  # keep original datetime format
            )


if __name__ == '__main__':
    process_large_file()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment