Skip to content

Instantly share code, notes, and snippets.

@njvack
Created June 27, 2024 17:36
  • Select an option

Select an option

Revisions

  1. njvack created this gist Jun 27, 2024.
    243 changes: 243 additions & 0 deletions ema_gen.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,243 @@
    #!/usr/bin/env python

    """
    Simple EMA schedule generator. Outputs to stdout.
    Usage:
    ema_gen.py [options] <record_id> <sched_field> <start_time> <days> <samples_per_day> <sampling_min> <gap_min>
    ema_gen.py interval_test <iters> <num_samples> <sampling_minutes> <gap_minutes>
    Options:
    --rec-field=<str> Name of the record_id field [default: record_id]
    --event-name=<str> Name of the REDCap event, if it's longitudinal
    --start-date=<date> Date to start generating; [default: tomorrow]
    -v --verbose Print debugging information
    """

    import csv
    from datetime import datetime, timedelta
    import random
    import sys

    from docopt import docopt

    import logging

    logging.basicConfig(format="%(message)s")
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)

    MAX_ITERS = 1000


    def random_minute_offsets(num_samples, total_minutes, gap_minutes, day_jitter_scale=0):
    """
    Return num_samples samples, over total_minutes time, separated by
    at least gap_minutes. Values are integer numbers of minutes.
    It divides the day into num_samples periods, randomly (from a uniform
    distribution) places samples in those periods, and rerolls if any samples
    are closer than gap_minutes together.
    TODO: See if we can fix the thing where the distribution of times isn't
    flat.
    """
    day_jitter_max = gap_minutes * day_jitter_scale
    total_minutes = total_minutes - day_jitter_max
    day_jitter = random.randrange(day_jitter_max + 1)
    fragment_length = total_minutes // num_samples
    iter = 0
    while iter < MAX_ITERS:
    time_offsets = [
    random.randrange(fragment_length) + (i * fragment_length) + day_jitter
    for i in range(num_samples)
    ]
    logger.debug(f"Iter {iter}: Generated {time_offsets}")
    if num_samples == 1:
    return time_offsets
    diffs = [
    (time_offsets[i + 1] - time_offsets[i])
    for i in range(len(time_offsets) - 1)
    ]
    if min(diffs) > gap_minutes:
    return time_offsets
    iter += 1
    return None


    def random_timedeltas(num_samples, total_minutes, gap_minutes):
    return [
    timedelta(minutes=ofs)
    for ofs in random_minute_offsets(num_samples, total_minutes, gap_minutes)
    ]


    # def make_sample_times(start_datetime, offsets):
    # """
    # See the daylight savings warning on time_str_to_delta
    # """
    # return [start_datetime + ofs for ofs in offsets]


    def time_str_to_delta(time_string):
    """
    Takes a time formatted like "15:00 or "3:00 pm" or "3:00PM" and returns a
    timedelta that'll get you from midnight to there. Except maybe on Daylight
    Savings days.
    Actually yes it will absolutely do something dumb on those days, we'll need
    to do more complicated things in those cases -- in particular, we'll need to
    have timezone-aware times.
    For now, just remember about this dumbness.
    """
    normed = time_string.upper().replace(" ", "")
    format = "%H:%M"
    if "M" in normed:
    format = "%I:%M%p"
    parsed = datetime.strptime(normed, format)
    delta = timedelta(hours=parsed.hour, minutes=parsed.minute)
    return delta


    def make_sample_times(date, start_delta, samples, sampling_min, gap_min):
    sample_deltas = random_timedeltas(samples, sampling_min, gap_min)
    sample_datetimes = [
    date + start_delta + sample_delta for sample_delta in sample_deltas
    ]
    return sample_datetimes


    def str_to_date(date_str):
    """
    date_str could be "tomorrow" or something like "2024-06-27"
    In either case, return a real datetime, starting at midnight, on that date
    """
    gen_date = None
    if date_str.lower() == "tomorrow":
    today = datetime.now().date()
    tomorrow = today + timedelta(days=1)
    gen_date = tomorrow
    else:
    gen_date = datetime.strptime(date_str, "%Y-%m-%d").date()
    return datetime.combine(gen_date, datetime.min.time())


    def generate_schedule(
    start_date, start_delta, days, samples_per_day, sampling_min, gap_min
    ):
    """
    Generates datetimes for every sample we're going to collect.
    Returns a list of datetimes.
    """
    schedule = []
    for day in range(days):
    day_delta = timedelta(days=day)
    gen_date = start_date + day_delta
    sample_times = make_sample_times(
    gen_date, start_delta, samples_per_day, sampling_min, gap_min
    )
    logger.debug(f"Generated {sample_times}")
    schedule += sample_times
    return schedule


    def schedule_to_dicts(schedule, record_id, record_field, event_name, sched_field):
    """
    Turns the schedule into a dict formatted for import into REDCap.
    """
    rows = []
    for instance_num, ema_time in enumerate(schedule):
    row = {}
    row[record_field] = record_id
    if event_name is not None:
    row["redcap_event_name"] = event_name
    row["redcap_repeat_instance"] = instance_num + 1
    row[sched_field] = ema_time.strftime("%Y-%m-%d %H:%M:00")
    rows.append(row)
    return rows


    def main(
    record_id,
    record_field,
    sched_field,
    event_name,
    start_date_str,
    start_time_str,
    days_str,
    samples_per_day,
    sampling_min,
    gap_min,
    ):
    """
    Generate some CSV data.
    It'll have the columns:
    <record_field>
    redcap_event_name (if event_name is not None)
    redcap_repeat_instance
    <sched_field>
    It'll have as many rows as the value of <days> * <samples_per_day>
    <record_field> will be <record_id>
    We'll compute the values for <sched_field>
    """
    logger.debug("We are in main")
    start_date = str_to_date(start_date_str)
    logger.debug(f"Start date is {start_date}")
    days = int(days_str)
    start_delta = time_str_to_delta(start_time_str)
    samples = int(samples_per_day)
    sampling_min = int(sampling_min)
    gap_min = int(gap_min)
    schedule = generate_schedule(
    start_date, start_delta, days, samples, sampling_min, gap_min
    )
    logger.debug(f"Generated schedule with {len(schedule)} items: {schedule}")
    redcap_rows = schedule_to_dicts(
    schedule, record_id, record_field, event_name, sched_field
    )
    logger.debug(redcap_rows)
    writer = csv.DictWriter(sys.stdout, fieldnames=redcap_rows[0].keys())
    writer.writeheader()
    for row in redcap_rows:
    writer.writerow(row)


    def interval_test(args):
    iters = int(args["<iters>"])
    samples = int(args["<num_samples>"])
    sample_min = int(args["<sampling_minutes>"])
    sample_gap = int(args["<gap_minutes>"])
    for i in range(iters):
    results = random_minute_offsets(samples, sample_min, sample_gap)
    logger.info(f"Run {i}: Solution: {results}")
    if results is None:
    logger.critical("Could not find a solution!")
    sys.exit(1)


    if __name__ == "__main__":
    args = docopt(__doc__)
    if args["--verbose"]:
    logger.setLevel(logging.DEBUG)
    logger.debug("Called with args:")
    logger.debug(args)
    if args["interval_test"]:
    interval_test(args)

    else:
    main(
    args["<record_id>"],
    args["--rec-field"],
    args["<sched_field>"],
    args["--event-name"],
    args["--start-date"],
    args["<start_time>"],
    args["<days>"],
    args["<samples_per_day>"],
    args["<sampling_min>"],
    args["<gap_min>"],
    )