Skip to content

Instantly share code, notes, and snippets.

@ivirshup
Last active March 21, 2025 20:35
Show Gist options
  • Save ivirshup/248a2931d6e78cf72dedd9520af0e8de to your computer and use it in GitHub Desktop.
Save ivirshup/248a2931d6e78cf72dedd9520af0e8de to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This notebook profiles the memory use and run time of the pyarrow-based implementation of the fragment-to-parquet conversion against the existing dask-based implementation."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Setup"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"%load_ext memory_profiler"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import itertools\n",
"import subprocess\n",
"from pathlib import Path\n",
"import logging\n",
"\n",
"import dask.dataframe as ddf\n",
"import pandas as pd\n",
"import pyarrow as pa\n",
"import pyarrow.csv, pyarrow.dataset\n"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"logger = logging.getLogger(__name__)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"# First 100,000,000 lines of a large dataset\n",
"TEST_INPUT = \"temporaries/filtered_fragments.tsv.gz\""
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"# Some global variables\n",
"human_chromosome_by_length = {\n",
" \"chr1\": 248956422,\n",
" \"chr2\": 242193529,\n",
" \"chr3\": 198295559,\n",
" \"chr4\": 190214555,\n",
" \"chr5\": 181538259,\n",
" \"chr6\": 170805979,\n",
" \"chr7\": 159345973,\n",
" \"chr8\": 145138636,\n",
" \"chr9\": 138394717,\n",
" \"chr10\": 133797422,\n",
" \"chr11\": 135086622,\n",
" \"chr12\": 133275309,\n",
" \"chr13\": 114364328,\n",
" \"chr14\": 107043718,\n",
" \"chr15\": 101991189,\n",
" \"chr16\": 90338345,\n",
" \"chr17\": 83257441,\n",
" \"chr18\": 80373285,\n",
" \"chr19\": 58617616,\n",
" \"chr20\": 64444167,\n",
" \"chr21\": 46709983,\n",
" \"chr22\": 50818468,\n",
" \"chrX\": 156040895,\n",
" \"chrY\": 57227415,\n",
" \"chrM\": 16569,\n",
" \"GL000009.2\": 201709,\n",
" \"GL000194.1\": 191469,\n",
" \"GL000195.1\": 182896,\n",
" \"GL000205.2\": 185591,\n",
" \"GL000213.1\": 164239,\n",
" \"GL000216.2\": 176608,\n",
" \"GL000218.1\": 161147,\n",
" \"GL000219.1\": 179198,\n",
" \"GL000220.1\": 161802,\n",
" \"GL000225.1\": 211173,\n",
" \"KI270442.1\": 392061,\n",
" \"KI270711.1\": 42210,\n",
" \"KI270713.1\": 40745,\n",
" \"KI270721.1\": 100316,\n",
" \"KI270726.1\": 43739,\n",
" \"KI270727.1\": 448248,\n",
" \"KI270728.1\": 1872759,\n",
" \"KI270731.1\": 150754,\n",
" \"KI270733.1\": 179772,\n",
" \"KI270734.1\": 165050,\n",
" \"KI270744.1\": 168472,\n",
" \"KI270750.1\": 148850,\n",
"}\n",
"mouse_chromosome_by_length = {\n",
" \"chr1\": 195154279,\n",
" \"chr2\": 181755017,\n",
" \"chr3\": 159745316,\n",
" \"chr4\": 156860686,\n",
" \"chr5\": 151758149,\n",
" \"chr6\": 149588044,\n",
" \"chr7\": 144995196,\n",
" \"chr8\": 130127694,\n",
" \"chr9\": 124359700,\n",
" \"chr10\": 130530862,\n",
" \"chr11\": 121973369,\n",
" \"chr12\": 120092757,\n",
" \"chr13\": 120883175,\n",
" \"chr14\": 125139656,\n",
" \"chr15\": 104073951,\n",
" \"chr16\": 98008968,\n",
" \"chr17\": 95294699,\n",
" \"chr18\": 90720763,\n",
" \"chr19\": 61420004,\n",
" \"chrX\": 169476592,\n",
" \"chrY\": 91455967,\n",
" \"chrM\": 16299,\n",
" \"GL456210.1\": 169725,\n",
" \"GL456211.1\": 241735,\n",
" \"GL456212.1\": 153618,\n",
" \"GL456219.1\": 175968,\n",
" \"GL456221.1\": 206961,\n",
" \"GL456239.1\": 40056,\n",
" \"GL456354.1\": 195993,\n",
" \"GL456372.1\": 28664,\n",
" \"GL456381.1\": 25871,\n",
" \"GL456385.1\": 35240,\n",
" \"JH584295.1\": 1976,\n",
" \"JH584296.1\": 199368,\n",
" \"JH584297.1\": 205776,\n",
" \"JH584298.1\": 184189,\n",
" \"JH584299.1\": 953012,\n",
" \"JH584303.1\": 158099,\n",
" \"JH584304.1\": 114452,\n",
"}\n",
"organism_ontology_term_id_by_chromosome_length_table = {\n",
" \"NCBITaxon:9606\": human_chromosome_by_length,\n",
" \"NCBITaxon:10090\": mouse_chromosome_by_length,\n",
"}\n",
"column_ordering = [\n",
" \"chromosome\",\n",
" \"start coordinate\",\n",
" \"stop coordinate\",\n",
" \"barcode\",\n",
" \"read support\",\n",
"]\n",
"allowed_chromosomes = list(\n",
" set(\n",
" itertools.chain(\n",
" human_chromosome_by_length.keys(), mouse_chromosome_by_length.keys()\n",
" )\n",
" )\n",
")\n",
"allowed_chromosomes.sort()\n",
"chromosome_categories = pd.CategoricalDtype(categories=allowed_chromosomes)\n",
"column_types = {\n",
" \"chromosome\": chromosome_categories,\n",
" \"start coordinate\": int,\n",
" \"stop coordinate\": int,\n",
" \"barcode\": str,\n",
" \"read support\": int,\n",
"}\n",
"column_ordering = [\n",
" \"chromosome\",\n",
" \"start coordinate\",\n",
" \"stop coordinate\",\n",
" \"barcode\",\n",
" \"read support\",\n",
"]\n",
"schema = pa.schema(\n",
" [\n",
" pa.field(\"chromosome\", pa.string()),\n",
" pa.field(\"start coordinate\", pa.int64()),\n",
" pa.field(\"stop coordinate\", pa.int64()),\n",
" pa.field(\"barcode\", pa.string()),\n",
" pa.field(\"read support\", pa.int64()),\n",
" ]\n",
")\n"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"def convert_to_parquet(fragment_file: str, tempdir: str) -> str:\n",
"    \"\"\"Convert a gzip-compressed fragment TSV into a parquet dataset with dask.\n",
"\n",
"    The input is decompressed into ``tempdir`` with the ``gunzip`` command, read\n",
"    with ``dask.dataframe.read_csv`` and written as a parquet dataset partitioned\n",
"    on ``chromosome``. The decompressed intermediate file is removed afterwards,\n",
"    including when decompression or conversion fails.\n",
"\n",
"    Args:\n",
"        fragment_file: Path to the gzip-compressed, tab-separated fragment file.\n",
"        tempdir: Directory that receives the intermediate file and the output.\n",
"\n",
"    Returns:\n",
"        Path of the written parquet dataset, as a string.\n",
"\n",
"    Raises:\n",
"        subprocess.CalledProcessError: If ``gunzip`` exits with a non-zero status.\n",
"    \"\"\"\n",
"    fragment_name = Path(fragment_file).name\n",
"    unzipped_file = Path(tempdir) / fragment_name.rsplit(\".\", 1)[0]\n",
"    parquet_file_path = Path(tempdir) / fragment_name.replace(\".gz\", \".parquet\")\n",
"    try:\n",
"        # Subprocess is used here because gzip on the cli uses less memory with\n",
"        # comparable speed to the python gzip library.\n",
"        # Dask is unable to read a gzip compressed csv in chunks, so we need to\n",
"        # decompress first. This happens inside the try block so that a failed\n",
"        # gunzip does not leave a partial file behind.\n",
"        logger.info(\"Unzipping %s\", fragment_file)\n",
"        with open(unzipped_file, \"wb\") as fp:\n",
"            subprocess.run([\"gunzip\", \"-c\", fragment_file], stdout=fp, check=True)\n",
"\n",
"        # convert the fragment to a parquet file\n",
"        ddf.read_csv(\n",
"            unzipped_file,\n",
"            sep=\"\\t\",\n",
"            names=column_ordering,\n",
"            dtype=column_types,\n",
"            keep_default_na=False,\n",
"        ).to_parquet(parquet_file_path, partition_on=[\"chromosome\"], compute=True)\n",
"    finally:\n",
"        # remove the unzipped file; missing_ok keeps cleanup from masking the\n",
"        # original error when the file could not even be created\n",
"        logger.debug(\"Removing %s\", unzipped_file)\n",
"        unzipped_file.unlink(missing_ok=True)\n",
"    return str(parquet_file_path)\n"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"def new_convert_to_parquet(fragment_file: str, tempdir: str) -> str:\n",
"    \"\"\"Stream a gzip-compressed fragment TSV into a parquet dataset with pyarrow.\n",
"\n",
"    The file is read through a gzip-decompressing input stream by the pyarrow\n",
"    streaming CSV reader and written with ``pyarrow.dataset.write_dataset`` as a\n",
"    hive-partitioned parquet dataset keyed on ``chromosome``. No decompressed\n",
"    copy is written to disk.\n",
"\n",
"    Args:\n",
"        fragment_file: Path to the gzip-compressed, tab-separated fragment file\n",
"            (``.gz`` or ``.bgz``).\n",
"        tempdir: Directory that receives the parquet output.\n",
"\n",
"    Returns:\n",
"        Path of the written parquet dataset, as a string. It is the input file\n",
"        name with a trailing ``.gz``/``.bgz`` replaced by ``.parquet``; when\n",
"        neither suffix is present, ``.parquet`` is appended so the output never\n",
"        shares the input's name.\n",
"    \"\"\"\n",
"    logger.info(\"Converting %s to parquet\", fragment_file)\n",
"    fragment_name = Path(fragment_file).name\n",
"    # Strip only a trailing compression suffix: str.replace would also rewrite\n",
"    # a \".gz\" that occurs in the middle of the name.\n",
"    for suffix in (\".bgz\", \".gz\"):\n",
"        if fragment_name.endswith(suffix):\n",
"            fragment_name = fragment_name[: -len(suffix)]\n",
"            break\n",
"    parquet_file_path = Path(tempdir) / f\"{fragment_name}.parquet\"\n",
"\n",
"    # The context manager closes the decompression stream (and the underlying\n",
"    # file handle) once the dataset is written, or if writing fails.\n",
"    with pa.input_stream(fragment_file, compression=\"gzip\") as compressed_stream:\n",
"        pa.dataset.write_dataset(\n",
"            data=pa.csv.open_csv(\n",
"                compressed_stream,\n",
"                read_options=pa.csv.ReadOptions(column_names=schema.names),\n",
"                parse_options=pa.csv.ParseOptions(delimiter=\"\\t\"),\n",
"                convert_options=pa.csv.ConvertOptions(column_types=schema),\n",
"            ),\n",
"            base_dir=parquet_file_path,\n",
"            format=\"parquet\",\n",
"            partitioning=pa.dataset.partitioning(\n",
"                pa.schema([pa.field(\"chromosome\", pa.string())]), flavor=\"hive\"\n",
"            ),\n",
"        )\n",
"\n",
"    return str(parquet_file_path)\n"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"if Path(\"performance_test_scratch\").exists():\n",
" !rm -r performance_test_scratch"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"!mkdir -p performance_test_scratch"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"# -p keeps this cell re-runnable: plain mkdir errors when the directory exists\n",
"!mkdir -p performance_test_scratch/old_writer\n",
"!mkdir -p performance_test_scratch/new_writer"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"peak memory: 177.48 MiB, increment: 0.08 MiB\n"
]
}
],
"source": [
"# baseline\n",
"%memit"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"peak memory: 5990.58 MiB, increment: 5813.06 MiB\n"
]
}
],
"source": [
"%memit convert_to_parquet(TEST_INPUT, \"performance_test_scratch/old_writer\")"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"peak memory: 3149.48 MiB, increment: 0.08 MiB\n"
]
}
],
"source": [
"%memit"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"peak memory: 4368.38 MiB, increment: 1219.89 MiB\n"
]
}
],
"source": [
"%memit new_convert_to_parquet(TEST_INPUT, \"performance_test_scratch/new_writer\")"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"# Reset the scratch area between the memory and timing runs.\n",
"# -f / -p make the reset idempotent (no error if the directory is absent or present).\n",
"!rm -rf performance_test_scratch\n",
"!mkdir -p performance_test_scratch/new_writer/\n",
"!mkdir -p performance_test_scratch/old_writer/"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 53.8 s, sys: 23.7 s, total: 1min 17s\n",
"Wall time: 36.7 s\n"
]
},
{
"data": {
"text/plain": [
"'performance_test_scratch/old_writer/filtered_fragments.tsv.parquet'"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"convert_to_parquet(TEST_INPUT, \"performance_test_scratch/old_writer\")"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 32.2 s, sys: 7.09 s, total: 39.3 s\n",
"Wall time: 8.68 s\n"
]
},
{
"data": {
"text/plain": [
"'performance_test_scratch/new_writer/filtered_fragments.tsv.parquet'"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%time\n",
"new_convert_to_parquet(TEST_INPUT, \"performance_test_scratch/new_writer\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "ingestion",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.11"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment