Created
January 31, 2018 15:07
-
-
Save matthew-d-jones/7a80d5bf4c44398dfaae791b378993ee to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import datetime | |
| from datatypes.RunInfo import RunInfo | |
| from datatypes.RunStart import RunStart | |
| from datatypes.RunStop import RunStop | |
| from datatypes.InfoTypes import InfoTypes | |
| from confluent_kafka import Consumer, KafkaError, TopicPartition | |
| """ | |
| Print published run information from Kafka stream | |
| """ | |
# Kafka broker address; passed to the Consumer as 'bootstrap.servers'.
| BROKER_NAME = 'localhost' | |
# Instrument name; topic names are built from it as INSTR_NAME + '_runInfo'
# and INSTR_NAME + '_events'.
| INSTR_NAME = 'ZOOM' | |
def unix_time_to_readable(unix_timestamp_ns):
    """Format a Unix timestamp given in nanoseconds as a UTC date-time string.

    The value is truncated to whole microseconds with integer arithmetic, so
    it is never pushed through a float; a float cannot hold present-day epoch
    nanoseconds exactly and could round the result up to the next second.

    :param unix_timestamp_ns: nanoseconds since 1970-01-01 00:00:00 UTC
    :return: string of the form 'YYYY-MM-DD HH:MM:SS (UTC)'
    """
    # Epoch + timedelta replaces datetime.utcfromtimestamp(), which is
    # deprecated since Python 3.12 and limited by the platform's C time API.
    epoch = datetime.datetime(1970, 1, 1)
    moment = epoch + datetime.timedelta(microseconds=unix_timestamp_ns // 1000)
    return moment.strftime('%Y-%m-%d %H:%M:%S (UTC)')
def parse_message(buf, offset=0):
    """Decode a serialised RunInfo message, print a summary, return its time.

    :param buf: bytes-like object holding the message payload
    :param offset: position in ``buf`` handed to ``RunInfo.GetRootAsRunInfo``
    :return: ``StartTime()`` of a RunStart payload or ``StopTime()`` of a
        RunStop payload (treated as Unix nanoseconds when formatted below)
    :raises ValueError: if the payload is neither a RunStart nor a RunStop
    """
    buf = bytearray(buf)
    # Honour the caller-supplied offset; it was previously ignored in favour
    # of a hard-coded 0.
    run_info = RunInfo.GetRootAsRunInfo(buf, offset)
    union_type = run_info.InfoTypeType()

    if union_type == InfoTypes.RunStart:
        payload = RunStart()
    elif union_type == InfoTypes.RunStop:
        payload = RunStop()
    else:
        raise ValueError("Message isn't a RunStart or RunStop, enum: " + str(union_type))

    # Point the concrete payload object at the union member's table.
    union_table = run_info.InfoType()
    payload.Init(union_table.Bytes, union_table.Pos)

    if union_type == InfoTypes.RunStart:
        timestamp = payload.StartTime()
        string_to_print = ("\nRUNSTART: " + "Start time: " + unix_time_to_readable(timestamp)
                           + " Run number: " + str(payload.RunNumber()))
    else:
        timestamp = payload.StopTime()
        string_to_print = "\nRUNSTOP: " + "Stop time: " + unix_time_to_readable(timestamp)

    print(string_to_print)
    return timestamp
def get_offset_for_time(timestamp=123456788000, last_offset=0, topicname=INSTR_NAME + '_events',
                        consumer=None):
    """Look up and print the offset in partition 0 of a topic for a timestamp.

    :param timestamp: time to search for; the caller in this file passes
        milliseconds. It is coerced to ``int`` because ``TopicPartition``
        needs an integer in its offset field.
    :param last_offset: offset returned by the previous call, used only to
        print the difference
    :param topicname: topic to search
    :param consumer: Kafka consumer used for the lookup; when omitted, the
        module-level ``c`` created by the ``__main__`` block is used
    :return: the offset reported for partition 0
    """
    if consumer is None:
        # Fallback keeps existing callers working: they rely on the global
        # consumer that only exists when the file is run as a script.
        consumer = c
    timestamp = int(timestamp)

    # offsets_for_times() expects the timestamp in the offset field of each
    # TopicPartition; only partition 0 is queried.
    topic_partitions_to_search = [TopicPartition(topicname, 0, timestamp)]
    offsets_with_timeout = consumer.offsets_for_times(topic_partitions_to_search, timeout=1.0)
    found_offset = offsets_with_timeout[0].offset

    print("Timestamp: %s" % timestamp)
    print("Offset in %s: %s" % (topicname, found_offset))
    print("Difference from previous offset: %s" % (found_offset - last_offset))
    return found_offset
if __name__ == "__main__":
    # Read run start/stop messages from the instrument's runInfo topic and,
    # for each one, report the matching offset in the events topic.
    #
    # The consumer must stay bound to the module-level name ``c``:
    # get_offset_for_time() reads it as a global.
    c = Consumer({'bootstrap.servers': BROKER_NAME, 'group.id': 'python-read-runs',
                  'default.topic.config': {'auto.offset.reset': 'smallest'}, 'enable.auto.commit': False})
    try:
        c.subscribe([INSTR_NAME + '_runInfo'])
        running = True
        last_offset = 0
        while running:
            # NOTE(review): poll() takes its timeout in seconds, so this waits
            # up to 1000 s per call - confirm that is intended.
            msg = c.poll(1000)
            if msg is None:
                # poll() returns None when the timeout expires with no message;
                # calling msg.error() on it would raise AttributeError.
                continue

            error = msg.error()
            if not error:
                print("Got message with Kafka timestamp=" + str(msg.timestamp()[1]))
                run_timestamp = parse_message(msg.value())
                # Integer division: the value ends up in TopicPartition's
                # offset field, which must be an integer (ns -> ms).
                last_offset = get_offset_for_time(timestamp=run_timestamp // 1000000,
                                                  last_offset=last_offset)
            else:
                # Any error ends the loop; reaching end of partition is the
                # expected way to finish, so it is not printed.
                if error.code() != KafkaError._PARTITION_EOF:
                    print(error)
                running = False
    finally:
        # Close the consumer even if parsing or the offset lookup raises.
        c.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment