Created
July 2, 2019 08:45
-
-
Save matthew-d-jones/ae2b2ba2087a323886b2ac21693a0613 to your computer and use it in GitHub Desktop.
Query the offsets for a specified Kafka topic at a specified time
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Query the offset of a Kafka topic partition at a specified time.

Example use:
    offsets_for_times.py -b obito:9092 -t IMAT_sampleEnv -q 2019-06-30T09:00:00

Requires:
    confluent-kafka
"""
import argparse
import uuid
from datetime import datetime, timezone

# strptime format accepted by the -q/--query-time argument.
QUERY_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'


def query_time_to_epoch_ms(query_time):
    """Convert a query-time string to milliseconds since the Unix epoch.

    The string is interpreted as UTC, whatever the local timezone is.

    Args:
        query_time: Time formatted as QUERY_TIME_FORMAT,
            for example "2019-06-30T09:00:00".

    Returns:
        Whole milliseconds since 1970-01-01T00:00:00 UTC, as an int.

    Raises:
        ValueError: If query_time does not match QUERY_TIME_FORMAT.
    """
    utc_dt = datetime.strptime(query_time, QUERY_TIME_FORMAT).replace(tzinfo=timezone.utc)
    return int(utc_dt.timestamp()) * 1000


def build_parser():
    """Build the command line argument parser for this script."""
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-b', '--broker', type=str, help='Broker name and port, for example "localhost:9092"',
                        required=True)
    parser.add_argument('-t', '--topic', type=str, help='Name of topic to query', required=True)
    # argparse %-formats help strings, so the literal '%' characters of the
    # time format must be escaped as '%%' or rendering the help text fails.
    escaped_format = QUERY_TIME_FORMAT.replace('%', '%%')
    parser.add_argument('-q', '--query-time', type=str,
                        help=f'Time to query offset for, format "{escaped_format}", example "2019-06-30T09:00:00"',
                        required=True)
    parser.add_argument('-p', '--partition', type=int, default=0, help='Partition of the topic to query')
    return parser


def main(argv=None):
    """Print the watermark offsets and the offset at the requested time.

    Args:
        argv: Argument list to parse; None means use sys.argv[1:].
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    try:
        timestamp = query_time_to_epoch_ms(args.query_time)
    except ValueError as error:
        # Report a badly formatted time as a usage error, not a traceback.
        parser.error(f'invalid --query-time: {error}')

    # Imported here so that --help and argument validation work even when
    # confluent-kafka is not installed.
    from confluent_kafka import Consumer, TopicPartition

    # A random group id keeps this query independent of any real consumer group.
    consumer = Consumer({'bootstrap.servers': args.broker, 'group.id': str(uuid.uuid4())})
    try:
        # offsets_for_times() reads the timestamp to look up from the offset
        # field of each TopicPartition; get_watermark_offsets() only uses the
        # topic and partition.
        topic_partition = TopicPartition(args.topic, args.partition, timestamp)

        watermarks = consumer.get_watermark_offsets(topic_partition)
        if watermarks is None:
            # Documented return value when the broker request times out.
            raise SystemExit(f'Timed out querying watermark offsets for topic {args.topic}')
        low_offset, high_offset = watermarks
        print(f'Topic: {args.topic}, low offset: {low_offset}, high offset: {high_offset}')

        result = consumer.offsets_for_times([topic_partition], timeout=1.0)[0]
        if result.error is not None:
            raise SystemExit(f'Failed to query offset for time {args.query_time}: {result.error}')
        print(f'Offset for time {args.query_time} is {result.offset}')
    finally:
        # Always release the client handle and its broker connections.
        consumer.close()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment