Skip to content

Instantly share code, notes, and snippets.

@matthew-d-jones
Created July 2, 2019 08:45
Show Gist options
  • Save matthew-d-jones/ae2b2ba2087a323886b2ac21693a0613 to your computer and use it in GitHub Desktop.
Save matthew-d-jones/ae2b2ba2087a323886b2ac21693a0613 to your computer and use it in GitHub Desktop.
Query the offsets for a specified Kafka topic at a specified time
from confluent_kafka import Consumer, TopicPartition
from datetime import datetime
import uuid
import argparse
"""
Query the offsets for a specified topic at a specified time
Example use: offsets_for_times.py -b obito:9092 -t IMAT_sampleEnv -q 2019-06-30T09:00:00
Requires:
confluent-kafka
"""
# Format accepted by --query-time; the parsed value is interpreted as UTC.
QUERY_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'


def query_time_to_epoch_ms(query_time):
    """Convert a ``QUERY_TIME_FORMAT`` string, interpreted as UTC, to milliseconds since the Epoch.

    :param query_time: time string such as "2019-06-30T09:00:00" (no timezone suffix; treated as UTC)
    :return: integer milliseconds since 1970-01-01T00:00:00 UTC (sub-second precision is not accepted)
    :raises ValueError: if ``query_time`` does not match ``QUERY_TIME_FORMAT``
    """
    utc_dt = datetime.strptime(query_time, QUERY_TIME_FORMAT)
    # Subtracting a naive epoch keeps the arithmetic in UTC regardless of the host's local timezone.
    return int((utc_dt - datetime(1970, 1, 1)).total_seconds()) * 1000


def build_parser():
    """Build the command-line parser for this script."""
    parser = argparse.ArgumentParser(
        description='Query the offsets for a specified Kafka topic partition at a specified (UTC) time.',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-b', '--broker', type=str, help='Broker name and port, for example "localhost:9092"',
                        required=True)
    parser.add_argument('-t', '--topic', type=str, help='Name of topic to query', required=True)
    # argparse %-formats help strings, so literal '%' characters must be doubled or --help crashes.
    parser.add_argument('-q', '--query-time', type=str,
                        help='Time (UTC) to query offset for, format "%%Y-%%m-%%dT%%H:%%M:%%S", '
                             'example "2019-06-30T09:00:00"',
                        required=True)
    parser.add_argument('-p', '--partition', type=int, default=0, help='Partition of the topic to query')
    parser.add_argument('--timeout', type=float, default=1.0,
                        help='Timeout in seconds for the offsets-for-times request')
    return parser


def main(argv=None):
    """Print the watermark offsets of a topic partition and the offset at the requested time.

    :param argv: argument list to parse; ``None`` means ``sys.argv[1:]``
    :raises SystemExit: on invalid arguments, or if the broker reports an error for the partition
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    try:
        timestamp = query_time_to_epoch_ms(args.query_time)
    except ValueError as err:
        # Report a usage error instead of a raw strptime traceback.
        parser.error(f'invalid --query-time {args.query_time!r}: {err}')

    # group.id must be a string; a random one avoids sharing state with any real consumer group.
    consumer = Consumer({'bootstrap.servers': args.broker, 'group.id': str(uuid.uuid4())})
    try:
        # offsets_for_times() reads the query timestamp (ms) from the TopicPartition's offset field.
        # Neither query needs the partition assigned to this consumer, so no assign() is done
        # (assigning would make the consumer start fetching from the timestamp as if it were an offset).
        topic_partition = TopicPartition(args.topic, args.partition, timestamp)
        low_offset, high_offset = consumer.get_watermark_offsets(topic_partition)
        print(f'Topic: {args.topic}, low offset: {low_offset}, high offset: {high_offset}')
        result = consumer.offsets_for_times([topic_partition], timeout=args.timeout)[0]
        if result.error is not None:
            raise SystemExit(f'Failed to query offset for time {args.query_time}: {result.error}')
        offset = result.offset
        print(f'Offset for time {args.query_time} is {offset}')
        if offset < 0:
            # librdkafka returns -1 when the query time is later than the last message's timestamp.
            print(f'No message in partition {args.partition} has a timestamp at or after {args.query_time}')
    finally:
        # The original script never closed the consumer; shut it down cleanly on every path.
        consumer.close()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment