Skip to content

Instantly share code, notes, and snippets.

@crypdick
crypdick / count_parquet_rows.py
Created August 12, 2025 20:49
Count the number of rows in a sharded Parquet dataset without loading the shards: only the metadata headers are read. Works with local paths and S3 datasets.
import pyarrow.dataset as ds
def count_parquet_rows(dataset_path: str) -> int:
    """
    Count the rows in a (possibly sharded) Parquet dataset without reading
    the column data into memory — only the Parquet footer metadata of each
    fragment is inspected.

    Works with any path pyarrow.dataset accepts (local directories, single
    files, and S3 URIs — assumed from the gist description; confirm your
    filesystem/credentials setup for remote paths).

    Args:
        dataset_path: Path or URI of the Parquet file/dataset.

    Returns:
        Total number of rows across all fragments and row groups.

    https://stackoverflow.com/a/79118602/4212158
    """
    dataset = ds.dataset(dataset_path, format="parquet")
    # Each fragment (file) exposes its row groups via metadata only;
    # summing num_rows never materializes any column data.
    # Fix: the original computed this sum but never returned it, so the
    # function (annotated -> int) always returned None.
    return sum(
        row_group.num_rows
        for fragment in dataset.get_fragments()
        for row_group in fragment.row_groups
    )
Traceback (most recent call last):
File "/tmp/ray/session_2025-08-08_19-13-17_305038_2243/runtime_resources/working_dir_files/s3_ray-release-automation-results_working_dirs_text_embeddings_benchmark_fixed_size_preemptible_gswrofihok__anyscale_pkg_aa4c368f375d6f6f25845bef969f1c00/dataset/text_embeddings_benchmark.py", line 257, in <module>
benchmark.run_fn("text-embeddings-benchmark", main, args)
File "/tmp/ray/session_2025-08-08_19-13-17_305038_2243/runtime_resources/working_dir_files/s3_ray-release-automation-results_working_dirs_text_embeddings_benchmark_fixed_size_preemptible_gswrofihok__anyscale_pkg_aa4c368f375d6f6f25845bef969f1c00/dataset/benchmark.py", line 154, in run_fn
fn_output = fn(*fn_args, **fn_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/tmp/ray/session_2025-08-08_19-13-17_305038_2243/runtime_resources/working_dir_files/s3_ray-release-automation-results_working_dirs_text_embeddings_benchmark_fixed_size_preemptible_gswrofihok__anyscale_pkg_aa4c368f375d6f6f25845bef969f1c0
@crypdick
crypdick / convert_arrow_to_parquet_streaming.py
Created August 8, 2025 20:04
Convert large Arrow shards into Parquet without loading the entire dataset into memory.
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "pyarrow",
# ]
# ///
"""Convert .arrow shards to Parquet without loading entire dataset into memory.
- Discovers all .arrow files under a given source directory
@crypdick
crypdick / gpu_util_tracker.py
Created July 23, 2025 00:43
Actor that integrates GPU capacity over time across a Ray cluster.
import threading
import time
import ray
@ray.remote(num_cpus=0)
class GPUHoursTracker:
"""Actor that integrates GPU capacity over time across a Ray cluster.
@crypdick
crypdick / code.py
Last active July 17, 2025 20:23
Reproduction for a Ray Data LLM failure with a Parquet sink
import ray
from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor
config = vLLMEngineProcessorConfig(model_source="unsloth/Llama-3.2-1B-Instruct")
processor = build_llm_processor(
config,
preprocess=lambda row: {
"messages": [
{"role": "system", "content": "You are a bot that responds with haikus."},
{"role": "user", "content": row["item"]},
@crypdick
crypdick / lerp_vs_slurp_2_vectors.py
Created July 7, 2025 03:06
for my blog post on multi-vector slerp
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
# Create a 3D plot showing the vectors and interpolations
fig = plt.figure(figsize=(10, 8))
ax = fig.add_subplot(111, projection='3d')
# Plot original vectors
ax.quiver(0, 0, 0, vecs[0][0], vecs[0][1], vecs[0][2],
color='black', arrow_length_ratio=0.1, label='v0 [1,0,0]')
<deleted 50,000 lines of logs repeating the same thing>
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=3858, ip=10.0.54.100) File "/home/ray/anaconda3/lib/python3.11/site-packages/vllm/v1/engine/output_processor.py", line 51, in get
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=3858, ip=10.0.54.100) raise output
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=3858, ip=10.0.54.100) File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/llm/_internal/batch/stages/vllm_engine_stage.py", line 317, in generate_async
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=3858, ip=10.0.54.100) output = await self._generate_async(request)
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=3858, ip=10.0.54.100) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=3858, ip=10.0.54.100) File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/llm/_internal/batch/stages/vllm_engine_stage.py", line 399, in generate_as
@crypdick
crypdick / flaky_ray_data_llm.log
Created June 17, 2025 23:29
stack trace for flaky ray data llm workload
2025-06-17 16:26:19,406 DEBUG streaming_executor.py:546 -- 9: - MapBatches(vLLMEngineStageUDF): Tasks: 24; Actors: 3; Queued blocks: 13; Resources: 0.0 CPU, 3.0 GPU, 768.0MB object store; [8/24 objects local], Blocks Outputted: 0/None
2025-06-17 16:26:19,406 DEBUG streaming_executor.py:546 -- 10: - MapBatches(DetokenizeUDF): Tasks: 0; Actors: 1; Queued blocks: 0; Resources: 1.0 CPU, 0.0B object store; [all objects local], Blocks Outputted: 0/None
2025-06-17 16:26:19,406 DEBUG streaming_executor.py:546 -- 11: - Map(_postprocess)->Filter(NoneType)->Write: Tasks: 0; Actors: 0; Queued blocks: 0; Resources: 0.0 CPU, 0.0B object store, Blocks Outputted: 0/None
2025-06-17 16:26:27,977 ERROR streaming_executor_state.py:519 -- An exception was raised from a task of operator "MapBatches(vLLMEngineStageUDF)". Dataset execution will now abort. To ignore this exception and continue, set DataContext.max_errored_blocks.
Traceback (most recent call last):
File "/home/ray/anaconda3/lib/python3.11/site-packages/ray/data/_inter
@crypdick
crypdick / anyscale_job_submit_err.log
Created June 16, 2025 18:40
job completes successfully, but cell throws a CalledProcessError
{
"name": "CalledProcessError",
"message": "Command 'b'\
# Production batch job -- note that this is a bash cell\
! anyscale job submit --name=train-xboost-breast-cancer-model \\\\\
--containerfile=\"${WORKING_DIR}/containerfile\" \\\\\
--working-dir=\"${WORKING_DIR}\" \\\\\
--exclude=\"\" \\\\\
--wait \\\\\
--max-retries=0 \\\\\
---------------------------------------------------------------------------
SystemException Traceback (most recent call last)
SystemException:
The above exception was the direct cause of the following exception:
RayTaskError(TypeError) Traceback (most recent call last)
/home/ray/default/e2e-audio/e2e_audio/curation.ipynb Cell 16 line 1
----> 1 print(ds.take(1))