Skip to content

Instantly share code, notes, and snippets.

@tskisner
Last active May 28, 2025 22:35
Show Gist options
  • Select an option

  • Save tskisner/bcd5e535c4e2d474305adae046028502 to your computer and use it in GitHub Desktop.

Select an option

Save tskisner/bcd5e535c4e2d474305adae046028502 to your computer and use it in GitHub Desktop.
Simons Observatory HWP Metadata Compression
#!/usr/bin/env python3
# NOTE: This requires a version of flacarray with this PR merged:
# https://github.com/hpc4cmb/flacarray/pull/9
# (or installed from that branch).
import os
import sys
import re
import numpy as np
import h5py
import flacarray
import matplotlib.pyplot as plt
def copy_dataset(gin, gout, name):
din = gin[name]
data = din[()]
if re.match(r"hwp_angle.*", name) is not None:
# Process an angle field. Quanta is 10x smaller
# than expected measurement precision.
flacgrp = gout.create_group(name)
flacarray.hdf5.write_array(data, flacgrp, level=5, quanta=1.0e-8)
elif re.match(r"hwp_rate.*", name) is not None:
# Process a rate field
flacgrp = gout.create_group(name)
flacarray.hdf5.write_array(data, flacgrp, level=5, quanta=1.0e-8)
elif re.match(r"timestamps*", name) is not None:
# Process timestamps. Use time delta for 20kHz as the quanta,
# even though we are sampling at 200Hz
flacgrp = gout.create_group(name)
flacarray.hdf5.write_array(data, flacgrp, level=5, quanta=5.0e-5)
elif din.dtype == np.dtype(np.int32) or din.dtype == np.dtype(np.int64):
# Integer data, flac compress
flacgrp = gout.create_group(name)
flacarray.hdf5.write_array(data, flacgrp)
else:
# Other datasets, typically single-byte flags. Compress with
# gzip.
dout = gout.create_dataset(name, data=data, compression="gzip")
# Copy attributes
for aname, aval in din.attrs.items():
dout.attrs[aname] = aval
def copy_group(gin, gout):
for aname, aval in gin.attrs.items():
gout.attrs[aname] = aval
for name in gin:
if re.match(r"obs_.*", name) is not None:
print(f"Copy obs {name}", flush=True)
if re.match(r"oper_.*", name) is not None:
print(f"Copy oper {name}", flush=True)
if isinstance(gin[name], h5py.Group):
# Recurse
child = gout.create_group(name)
copy_group(gin[name], child)
else:
# This is a dataset
copy_dataset(gin, gout, name)
def compare(plot_file, original, output, obs_indx=0):
# Open both groups
orig = h5py.File(original, "r")
out = h5py.File(output, "r")
all_obs = list(orig.keys())
oname = all_obs[obs_indx]
in_obs = orig[oname]
out_obs = out[oname]
# Load the timestamps, angle, and rate for both datasets
in_stamps = in_obs["timestamps"][:]
out_stamps = flacarray.hdf5.read_array(out_obs["timestamps"])
in_angle = in_obs["hwp_angle"][:]
out_angle = flacarray.hdf5.read_array(out_obs["hwp_angle"])
in_rate = in_obs["hwp_rate_1"][:]
out_rate = flacarray.hdf5.read_array(out_obs["hwp_rate_1"])
resid_stamps = out_stamps - in_stamps
resid_angle = out_angle - in_angle
resid_rate = out_rate - in_rate
# Plot it
n_samp = 500
fig_dpi = 100
fig_width = 8
fig_height = 20
fig = plt.figure(figsize=(fig_width, fig_height), dpi=fig_dpi)
ax = fig.add_subplot(5, 1, 1, aspect="auto")
ax.plot(in_stamps[:n_samp], in_angle[:n_samp], label="Original")
ax.plot(out_stamps[:n_samp], out_angle[:n_samp], label="FLAC Compressed")
ax.set_xlabel("Time (seconds)")
ax.set_ylabel("Angle (radians)")
ax.legend(loc="best")
ax.set_title("hwp_angle")
ax = fig.add_subplot(5, 1, 2, aspect="auto")
ax.plot(in_stamps[:n_samp], resid_angle[:n_samp])
ax.set_xlabel("Time (seconds)")
ax.set_ylabel("Angle (radians)")
ax.set_title("hwp_angle Residual")
ax = fig.add_subplot(5, 1, 3, aspect="auto")
ax.plot(in_stamps[:n_samp], in_rate[:n_samp], label="Original")
ax.plot(out_stamps[:n_samp], out_rate[:n_samp], label="FLAC Compressed")
ax.set_xlabel("Time (seconds)")
ax.set_ylabel("Rate (Hz)")
ax.legend(loc="best")
ax.set_title("hwp_rate")
ax = fig.add_subplot(5, 1, 4, aspect="auto")
ax.plot(in_stamps[:n_samp], resid_rate[:n_samp])
ax.set_xlabel("Time (seconds)")
ax.set_ylabel("Rate (Hz)")
ax.set_title("hwp_rate Residual")
ax = fig.add_subplot(5, 1, 5, aspect="auto")
ax.plot(np.arange(n_samp, dtype=np.int32), resid_stamps[:n_samp])
ax.set_xlabel("Samples")
ax.set_ylabel("Difference (seconds)")
ax.set_title("Timestamps Residual")
plt.tight_layout()
plt.savefig(plot_file)
plt.close()
orig.close()
out.close()
def main():
if len(sys.argv) > 1:
in_file = sys.argv[1]
else:
print(f"Usage: {sys.argv[0]} <input HWP hdf5 file>")
return
in_root = os.path.splitext(in_file)[0]
out_file = f"{in_root}_flac.h5"
if os.path.isfile(out_file):
os.remove(out_file)
with h5py.File(out_file, "w") as fout:
with h5py.File(in_file, "r") as fin:
copy_group(fin, fout)
compare(f"{out_file}.pdf", in_file, out_file)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment