Last active
March 30, 2023 03:50
-
-
Save danielballan/b822bebb4e1d7abc5c5cb73b6d9320b7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import metadatastore.conf | |
from collections import deque | |
from tqdm import tqdm | |
import ipyparallel as ipp | |
def main():
    """Copy every document from the v0 metadatastore database into a v1 one.

    Run starts, run stops and event descriptors are copied serially from this
    process.  Event streams are then copied in parallel, one task per
    descriptor, on the engines of an already-running ipyparallel cluster.

    WARNING: the target database ``metadatastore_production_v1`` is dropped
    before anything is written, so any existing contents are lost.
    """
    old_config = dict(metadatastore.conf.connection_config)
    new_config = old_config.copy()
    new_config['database'] = 'metadatastore_production_v1'

    rc = ipp.Client()
    dview = rc[:]
    # Perform the import locally and on every engine at the same time.
    with dview.sync_imports():
        from metadatastore.mds import MDS, MDSRO
    old = MDSRO(version=0, config=old_config)
    new = MDS(version=1, config=new_config)
    dview.push({'old': old, 'new': new})

    new._connection.drop_database(new_config['database'])
    # Drop all indexes on event collection to speed up insert.
    # They will be rebuilt the next time an MDS(RO) object connects.
    new._event_col.drop_indexes()

    total = old._runstart_col.find().count()
    for start in tqdm(old.find_run_starts(), desc='start docs', total=total):
        new.insert('start', start)

    total = old._runstop_col.find().count()
    for stop in tqdm(old.find_run_stops(), desc='stop docs', total=total):
        try:
            new.insert('stop', stop)
        except RuntimeError:
            # Best effort: report the bad run stop and keep migrating.
            print("error inserting run stop with uid {!r}".format(stop['uid']))

    descs = deque()
    counts = deque()
    total = old._descriptor_col.find().count()
    for desc in tqdm(old.find_descriptors(), unit='descriptors', total=total):
        # Fetch the raw Mongo record to get its ``_id``: the old event
        # collection references descriptors by that ``_id``.
        d_raw = next(old._descriptor_col.find({'uid': desc['uid']}))
        num_events = old._event_col.find(
            {'descriptor_id': d_raw['_id']}).count()
        new.insert('descriptor', desc)
        # Plain dict copies are what get shipped to the engines below.
        descs.append(dict(desc))
        counts.append(num_events)

    def migrate_event_stream(desc_in, num_events):
        """Copy all events of one descriptor; executed on an engine.

        Returns ``(num_events, error)``.  ``error`` is ``None`` on success, or
        a message naming the descriptor whose stream failed to copy.  The
        message is returned rather than printed because engine output is not
        shown on the client.
        """
        if not num_events:
            # Skip empty event streams or bulk insert raises.
            return num_events, None
        try:
            events = old.get_events_generator(descriptor=desc_in,
                                              convert_arrays=False)
            new.bulk_insert_events(descriptor=dict(desc_in),
                                   events=(dict(e) for e in events))
        except KeyError as err:
            # Best effort, like the run-stop loop: keep going but report it.
            # NOTE(review): the stream may be partially inserted at this point.
            return num_events, (
                "error inserting events for descriptor with uid {!r}: {!r}"
                .format(desc_in['uid'], err))
        return num_events, None

    v = rc.load_balanced_view()
    amr = v.map(migrate_event_stream, descs, list(counts), ordered=False)
    total = sum(counts)
    with tqdm(total=total, unit='events') as pbar:
        for res, error in amr:
            if error is not None:
                # tqdm.write prints without corrupting the progress bar.
                tqdm.write(error)
            if res:
                pbar.update(res)


if __name__ == '__main__':
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from metadatastore.mds import MDS, MDSRO | |
import metadatastore.conf | |
from collections import deque | |
from tqdm import tqdm | |
def compare(o, n):
    """Check that two documents are identical, printing both if they differ.

    Parameters
    ----------
    o, n : mapping
        The same document as read from the old and from the new database.
        Each must contain a ``'uid'`` key.

    Raises
    ------
    AssertionError
        If the uids or the full documents differ.  The error is raised
        explicitly rather than through ``assert`` statements, so the check
        still runs under ``python -O``.
    """
    if o['uid'] != n['uid'] or o != n:
        print(o)
        print(n)
        raise AssertionError(
            'document mismatch (old uid {!r}, new uid {!r})'
            .format(o['uid'], n['uid']))
def _verify_run_starts(old, new):
    """Compare run-start documents pairwise, in the order each store yields them."""
    total = old._runstart_col.find().count()
    old_starts = tqdm(old.find_run_starts(), unit='start docs', total=total,
                      leave=True)
    new_starts = new.find_run_starts()
    # NOTE(review): zip stops at the shorter stream, so a difference in the
    # number of run starts is not detected here.
    for o, n in zip(old_starts, new_starts):
        compare(o, n)


def _verify_run_stops(old, new):
    """Compare run stops, tolerating old stops that are absent from ``new``.

    The migration skips run stops that fail to insert.  The new stream is
    therefore treated as a subsequence of the old one: old stops with no
    counterpart are reported and skipped.

    Raises
    ------
    AssertionError
        If a new run stop has no matching old stop, or if a matched pair
        differs.
    """
    total = old._runstop_col.find().count()
    old_stops = iter(tqdm(old.find_run_stops(), unit='stop docs', total=total))
    for n in new.find_run_stops():
        # Advance the shared old iterator until it catches up with ``n``.
        for o in old_stops:
            if o['uid'] == n['uid']:
                break
            print('skipping', o['uid'])
        else:
            # Old stops ran out: ``n`` exists only in the new database.
            raise AssertionError(
                'run stop {!r} has no counterpart in the old database'
                .format(n['uid']))
        o = dict(o)
        n = dict(n)
        # An empty or missing 'reason' in the old data is not written to the
        # new database, so drop it before comparing.
        if o.get('reason') in (None, ''):
            o.pop('reason', None)
        compare(o, n)


def _verify_descriptors(old, new):
    """Compare descriptors pairwise and collect them with their event counts.

    Returns
    -------
    descs, counts : deque, deque
        The old descriptors, and the number of events that reference each of
        them in the old event collection.
    """
    descs = deque()
    counts = deque()
    total = old._descriptor_col.find().count()
    old_descs = tqdm(old.find_descriptors(), unit='descriptors', total=total)
    new_descs = new.find_descriptors()
    for o, n in zip(old_descs, new_descs):
        # Old events reference their descriptor by the raw Mongo ``_id``.
        d_raw = next(old._descriptor_col.find({'uid': o['uid']}))
        num_events = old._event_col.find({'descriptor_id': d_raw['_id']}).count()
        compare(o, n)
        descs.append(o)
        counts.append(num_events)
    return descs, counts


def _verify_events(old, new, descs, counts):
    """Compare the event streams of every descriptor, event by event."""
    with tqdm(total=sum(counts), unit='events') as pbar:
        for desc, num_events in zip(descs, counts):
            old_events = old.get_events_generator(descriptor=desc,
                                                  convert_arrays=False)
            new_events = new.get_events_generator(descriptor=desc,
                                                  convert_arrays=False)
            # Compare the events themselves.  The previous code asserted on
            # stale loop variables and never actually checked any event.
            # NOTE(review): zip truncates, so missing trailing events in the
            # new database go unnoticed.
            for old_ev, new_ev in zip(old_events, new_events):
                compare(old_ev, new_ev)
            pbar.update(num_events)


def main():
    """Verify that ``metadatastore_production_v1`` matches the v0 database."""
    old_config = dict(metadatastore.conf.connection_config)
    new_config = old_config.copy()
    new_config['database'] = 'metadatastore_production_v1'
    old = MDSRO(version=0, config=old_config)
    new = MDS(version=1, config=new_config)

    _verify_run_starts(old, new)
    _verify_run_stops(old, new)
    descs, counts = _verify_descriptors(old, new)
    _verify_events(old, new, descs, counts)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment