Skip to content

Instantly share code, notes, and snippets.

@matthew-d-jones
Created January 31, 2018 15:07
Show Gist options
  • Save matthew-d-jones/7a80d5bf4c44398dfaae791b378993ee to your computer and use it in GitHub Desktop.
Save matthew-d-jones/7a80d5bf4c44398dfaae791b378993ee to your computer and use it in GitHub Desktop.
import datetime
from datatypes.RunInfo import RunInfo
from datatypes.RunStart import RunStart
from datatypes.RunStop import RunStop
from datatypes.InfoTypes import InfoTypes
from confluent_kafka import Consumer, KafkaError, TopicPartition
"""
Print run information (run start and run stop messages) published to a Kafka stream.
"""
# Address of the Kafka broker; handed to the consumer as 'bootstrap.servers'.
BROKER_NAME = "localhost"
# Instrument name; used as the prefix of the Kafka topic names
# (for example INSTR_NAME + '_runInfo' and INSTR_NAME + '_events').
INSTR_NAME = "ZOOM"
def unix_time_to_readable(unix_timestamp_ns):
    """
    Format a Unix timestamp given in nanoseconds as a readable UTC string.

    :param unix_timestamp_ns: time since the Unix epoch, in nanoseconds
    :return: string of the form 'YYYY-MM-DD HH:MM:SS (UTC)'
    """
    unix_timestamp_s = unix_timestamp_ns / 1000000000
    # Build a timezone-aware datetime explicitly pinned to UTC:
    # datetime.utcfromtimestamp() is deprecated since Python 3.12 and
    # returns a naive object that is easy to misinterpret as local time.
    utc_time = datetime.datetime.fromtimestamp(unix_timestamp_s, tz=datetime.timezone.utc)
    return utc_time.strftime('%Y-%m-%d %H:%M:%S (UTC)')
def parse_message(buf, offset=0):
    """
    Decode a serialised RunInfo message, print a summary and return its timestamp.

    :param buf: bytes-like object containing the serialised RunInfo message
    :param offset: position in ``buf`` at which the RunInfo root is read
    :return: the StartTime of a RunStart payload or the StopTime of a RunStop
        payload (the value is formatted with unix_time_to_readable, which
        treats it as nanoseconds since the Unix epoch)
    :raises ValueError: if the payload is neither a RunStart nor a RunStop
    """
    buf = bytearray(buf)
    # Honour the caller-supplied offset instead of always reading from 0;
    # with the default offset=0 this behaves exactly as before.
    run_info = RunInfo.GetRootAsRunInfo(buf, offset)
    union_type = run_info.InfoTypeType()

    if union_type == InfoTypes.RunStart:
        # The union member is exposed as a generic table; re-initialise a
        # concrete RunStart on the same bytes/position to read its fields.
        union_table = run_info.InfoType()
        union_start = RunStart()
        union_start.Init(union_table.Bytes, union_table.Pos)
        timestamp = union_start.StartTime()
        string_to_print = ("\nRUNSTART: " + "Start time: " + unix_time_to_readable(timestamp)
                           + " Run number: " + str(union_start.RunNumber()))
    elif union_type == InfoTypes.RunStop:
        union_table = run_info.InfoType()
        union_stop = RunStop()
        union_stop.Init(union_table.Bytes, union_table.Pos)
        timestamp = union_stop.StopTime()
        string_to_print = "\nRUNSTOP: " + "Stop time: " + unix_time_to_readable(timestamp)
    else:
        raise ValueError("Message isn't a RunStart or RunStop, enum: " + str(union_type))

    print(string_to_print)
    return timestamp
def get_offset_for_time(timestamp=123456788000, last_offset=0, topicname=INSTR_NAME + '_events'):
    """
    Look up the offset in partition 0 of a topic that corresponds to a timestamp.

    Relies on the module-level consumer ``c``, which must exist before this
    function is called. Prints the timestamp, the offset found and the
    difference from ``last_offset``.

    :param timestamp: time to search for; it is placed in the offset field of
        the TopicPartition handed to ``offsets_for_times`` (the caller in this
        file passes a nanosecond timestamp divided by 1000000, i.e. milliseconds)
    :param last_offset: offset from a previous lookup, used only to report
        the difference
    :param topicname: topic to search; defaults to INSTR_NAME + '_events'
    :return: the offset reported for partition 0
    """
    # TopicPartition needs an integer in its offset field, but a timestamp
    # produced by true division arrives as a float on Python 3.
    timestamp = int(timestamp)
    # Only partition 0 is searched.
    topic_partitions_to_search = [TopicPartition(topicname, 0, timestamp)]
    offsets_with_timeout = c.offsets_for_times(topic_partitions_to_search, timeout=1.0)
    found_offset = offsets_with_timeout[0].offset
    print("Timestamp: %s" % timestamp)
    print("Offset in %s: %s" % (topicname, found_offset))
    print("Difference from previous offset: %s" % (found_offset - last_offset))
    return found_offset
if __name__ == "__main__":
    # Consume the instrument's run-info topic, printing each run start/stop
    # message and the matching offset in the event topic, until the end of
    # the partition is reported or an error occurs.
    c = Consumer({'bootstrap.servers': BROKER_NAME, 'group.id': 'python-read-runs',
                  'default.topic.config': {'auto.offset.reset': 'smallest'}, 'enable.auto.commit': False})
    c.subscribe([INSTR_NAME + '_runInfo'])
    running = True
    last_offset = 0
    try:
        while running:
            # NOTE(review): confluent_kafka documents the poll() timeout in
            # seconds, so this waits up to 1000 s -- confirm that is intended.
            msg = c.poll(1000)
            if msg is None:
                # poll() returns None when the timeout expires with nothing
                # to deliver; calling msg.error() on it would raise.
                continue
            error = msg.error()
            if not error:
                print("Got message with Kafka timestamp=" + str(msg.timestamp()[1]))
                run_timestamp = parse_message(msg.value())
                # Floor division keeps the converted timestamp an integer
                # (true division would yield a float on Python 3).
                last_offset = get_offset_for_time(timestamp=run_timestamp // 1000000,
                                                  last_offset=last_offset)
            else:
                # End of partition stops the loop silently; any other error
                # is printed first.
                if error.code() != KafkaError._PARTITION_EOF:
                    print(error)
                running = False
    finally:
        # Always release the consumer, even if message parsing raises.
        c.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment