- explore_pgcam_data.ipynb -> demo of how to pull reduced data out of the msgpack databroker (a minimal read-back sketch follows the description below)
- extract.py -> code run at XPD against the raw databroker to sort out which raw runs we are interested in
- reprocess.py -> code run to re-process the raw data and produce the reduced outputs
Files for working with reduced data from 2020-12 experiments at XPD
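The notebook itself is not reproduced below, but the read-back it demonstrates looks roughly like the following sketch (assumptions, not the notebook's exact code: the reduced runs were written by reprocess.py, so their start documents carry "original_start_uid" and "analysis_stage", the msgpack files live under reduced/, and the reduced data lands in a stream named "primary").

from databroker._drivers.msgpack import BlueskyMsgpackCatalog

# Open the serialized reduced runs (adjust the glob to wherever the Serializer
# in reprocess.py wrote them).
reduced_cat = BlueskyMsgpackCatalog(["reduced/*.msgpack"])

# Pick out the integration-stage runs and read each one back as an xarray Dataset.
for uid in reduced_cat.search({"analysis_stage": "integration"}):
    run = reduced_cat[uid]
    print(run.metadata["start"]["original_start_uid"])
    data = run.primary.read()  # field names depend on xpdan's integration output
    print(list(data.data_vars))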
# extract.py -- select the raw runs of interest and write ~/work_plan.json
import datetime
import itertools
import json
from collections import defaultdict
from pathlib import Path

from databroker import Broker
from databroker.queries import TimeRange
from databroker._drivers.msgpack import BlueskyMsgpackCatalog
def summerize(by_sample):
    # Print a one-line summary per sample: run count and first/last start times.
    tmp = []
    for k, v in by_sample.items():
        times = [h.metadata["start"]["time"] for h in v]
        start_time = min(times)
        stop_time = max(times)
        tmp.append(
            (
                start_time,
                f"{k!r:<25}:{len(v):<4} "
                + f"{datetime.datetime.fromtimestamp(start_time)} - "
                + f"{datetime.datetime.fromtimestamp(stop_time)}",
            )
        )
    for _, pp in sorted(tmp):
        print(pp)


def groupby_sample(hdrs):
    # Group headers by the sample_name recorded in their start documents.
    ret = defaultdict(list)
    for h in hdrs:
        ret[h.metadata["start"].get("sample_name", None)].append(h)
    return ret


def by_run_phase(by_sample, groups):
    # Merge the per-sample groups into named phases of the experiment.
    out = {}
    for k, v in groups.items():
        out[k] = list(itertools.chain(*(by_sample[s] for s in v)))
    return out


def by_reduced_status(by_phase, has_reduction):
    # Split each phase into runs that already have reduced output and runs that still need it.
    out = {}
    for k, v in by_phase.items():
        wd = out[k] = {"needs": [], "has": []}
        for h in v:
            if h.metadata["start"]["uid"] in has_reduction:
                wd["has"].append(h)
            else:
                wd["needs"].append(h)
    return out
# Pull the raw runs for this beamtime out of the XPD databroker.
db = Broker.named("xpd")
cat = db.v2
tr = TimeRange(since="2020-12-07 19:00", until="2020-12-12 12:00")
hdrs = list(cat[d] for d in cat.search(tr).search({"bt_uid": "98812e29"}))
by_sample = groupby_sample(hdrs)
print({k: len(v) for k, v in by_sample.items()})

# Keep only the TiCuMg samples and bucket them by phase of the experiment.
ticumg_data = {k: v for k, v in by_sample.items() if k is not None and "TiCuMg" in k}
by_phase = by_run_phase(
    ticumg_data,
    {
        "grid": ["TiCuMg_alloy", "TiCuMg_alloy_2", "TiCuMg_alloy_3"],
        "gpcam": ["TiCuMg_alloy_auto_1"],
        "xca": ["TiCuMg_alloy_adapt"],
    },
)
print({k: len(v) for k, v in by_phase.items()})

# Check which raw runs already have reduced output in the local msgpack catalog.
local_cat = BlueskyMsgpackCatalog(["reduced/*.msgpack"])
have_reduced = set(
    local_cat[d].metadata["start"]["original_start_uid"] for d in local_cat.search({})
)
work_plan = by_reduced_status(by_phase, have_reduced)
print({k: {_k: len(_v) for _k, _v in v.items()} for k, v in work_plan.items()})

# Write the plan (just the uids) out for reprocess.py to consume.
just_uids = {
    k: {_k: [_h.metadata["start"]["uid"] for _h in _v] for _k, _v in v.items()}
    for k, v in work_plan.items()
}
with open(Path("~/work_plan.json").expanduser(), "w") as fout:
    json.dump(just_uids, fout)
# -- scratch notes from the interactive session, kept for reference --
# len(have_reduced)
# have_reduced - data_uids
# len(have_reduced - data_uids)
# len(data_uids - have_reduced)
# to_export = list(data_uids - have_reduced)
# to_export
# len(data_uids | have_reduced)
# len(data_uids & have_reduced)
# len(to_export)
# just_copy = data_uids - have_reduced
# import itertools
# list(itertools.chain(*(local_cat.search({"original_start_uid": d}) for d in just_copy)))
# just_copy
# local_cat.search({"oiginal_start_uid": "05006b0d-e55b-4e9c-b359-e59aee82bad0"})
# list(local_cat.search({"oiginal_start_uid": "05006b0d-e55b-4e9c-b359-e59aee82bad0"}))
# have_reduced
# list(local_cat.search({"oiginal_start_uid": "ffe934b2-46ec-4607-993c-dbe509447c1a"}))
# list(local_cat.search(oiginal_start_uid="ffe934b2-46ec-4607-993c-dbe509447c1a"))
# reduced_headers = [local_cat[d] for d in local_cat.search({})]
# just_export_headers = [
#     h for h in reduced_headers if h["original_start_uid"] in just_export
# ]
# just_export_headers = [
#     h
#     for h in reduced_headers
#     if h.metadata["start"]["original_start_uid"] in just_export
# ]
# just_export_headers = [
#     h for h in reduced_headers if h.metadata["start"]["original_start_uid"] in just_copy
# ]
# len(just_export_headers)
# just_copy = data_uids & have_reduced
# list(itertools.chain(*(local_cat.search({"original_start_uid": d}) for d in just_copy)))
# just_copy_reduced_uids = list(
#     itertools.chain(*(local_cat.search({"original_start_uid": d}) for d in just_copy))
# )
# len(local_cat)
# len(res)
# res = list(cat.search(dict(bt_safN="306612")))
# len(res)
"""Run data reduction in-line | |
""" | |
import copy | |
from pathlib import Path | |
import json | |
from functools import partial | |
import itertools | |
from event_model import RunRouter as RealRunRouter | |
from event_model import unpack_datum_page, unpack_event_page | |
from rapidz import Stream, move_to_first | |
from rapidz.link import link | |
from shed import SimpleToEventStream | |
from suitcase.msgpack import Serializer | |
from xpdan.pipelines.main import ( | |
image_process, | |
calibration, | |
clear_geo_gen, | |
scattering_correction, | |
gen_mask, | |
integration, | |
clear_comp, | |
start_gen, | |
pdf_gen, | |
) | |
from xpdan.pipelines.to_event_model import pipeline_order as tem_pipeline_order | |
from xpdan.pipelines.to_event_model import to_event_stream_with_ind | |
from xpdan.vend.callbacks.core import RunRouter, StripDepVar | |
from databroker.v0 import Broker | |
db = Broker.named("xpd")

# The xpdan reduction pipeline, chunk by chunk, followed by the to-event-model stages.
pipeline_order = [
    partial(start_gen, db=db, image_names=["pe2_image", "pe1_image"]),
    image_process,
    calibration,
    clear_geo_gen,
    scattering_correction,
    gen_mask,
    integration,
    pdf_gen,
    clear_comp,
]
order = pipeline_order + tem_pipeline_order
def diffraction_router(start, diffraction_dets, xrd_namespace):
    # This does not support concurrent radiograms and diffractograms.
    # If there are diffraction detectors in the list, this is diffraction.
    if any(d in diffraction_dets for d in start["detectors"]):
        print("analyzing as diffraction")
        return lambda *x: xrd_namespace["raw_source"].emit(x)
def create_analysis_pipeline(
    order, stage_blacklist=(), *, publisher, **kwargs,
):
    """Create the analysis pipeline from a list of chunks and pipeline kwargs

    Parameters
    ----------
    order : list of functions
        The list of pipeline chunk functions
    stage_blacklist : tuple of str, optional
        Analysis stages to leave out of the event-stream output
    publisher : callable
        Where the translated (name, doc) pairs are sent
    kwargs : Any
        The kwargs to pass to the pipeline creation

    Returns
    -------
    namespace : dict
        The namespace of the pipeline
    """
    namespace = link(*order, raw_source=Stream(stream_name="raw source"), **kwargs)
    source = namespace["source"]
    # do inspection of pipeline for ToEventModel nodes, maybe?
    # for analyzed data with independent data (vis and save)
    # strip the dependent vars from the raw data
    raw_stripped = move_to_first(source.starmap(StripDepVar()))
    namespace.update(
        to_event_stream_with_ind(
            raw_stripped,
            *[
                node
                for node in namespace.values()
                if isinstance(node, SimpleToEventStream)
                and node.md.get("analysis_stage", None) not in stage_blacklist
            ],
            publisher=publisher,
        )
    )
    return namespace
def run_processor(
    order, *, db, diffraction_dets, stage_blacklist=(), publisher=None, **kwargs,
):
    # Build the analysis pipeline and wrap it in a RunRouter keyed on the detectors in use.
    print(kwargs)
    db.prepare_hook = lambda x, y: copy.deepcopy(y)
    rr = RunRouter(
        [diffraction_router],
        xrd_namespace=create_analysis_pipeline(
            order=order,
            stage_blacklist=stage_blacklist,
            publisher=publisher,
            **kwargs,
            db=db,
        ),
        diffraction_dets=diffraction_dets,
    )
    return rr
def filter_factory(base_name):
    # RunRouter factory: serialize only the "integration" analysis stage, one
    # msgpack directory per work-plan phase.
    def filter(name, doc):
        if doc["analysis_stage"] == "integration":
            print("got one!")
            return (
                [Serializer(Path("~").expanduser() / "adaptive_reduced" / base_name)],
                [],
            )
        return [], []

    return filter
with open(Path("~/work_plan.json").expanduser(), "r") as fin:
    just_uids = json.load(fin)

# Replay every raw run in the work plan through the pipeline; each phase gets
# its own output directory via filter_factory.
for k, v in just_uids.items():
    filter = filter_factory(k)
    outputter = RealRunRouter([filter])
    rr = run_processor(
        order=order, db=db, diffraction_dets=["pe1", "pe2"], publisher=outputter
    )
    for uid in itertools.chain(*v.values()):
        h = db[uid]
        for name, doc in h.documents():
            doc = dict(doc)
            # The pipeline expects unpacked documents, so expand datum/event pages.
            if name == "datum_page":
                for d in unpack_datum_page(doc):
                    rr("datum", d)
            elif name == "event_page":
                for d in unpack_event_page(doc):
                    rr("event", d)
            else:
                rr(name, doc)