Last active
November 4, 2023 00:24
-
-
Save wengkham/4ed48fb92d3f78804ca609ad8c89b3f8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import concurrent.futures | |
from confluent_kafka import TopicPartition | |
consumer = DeserializingConsumer(consumer_conf) | |
consumer.subscribe("my_topic") | |
def process_message(msg, consumer, topic_partition): | |
if msg is not None and msg.value() is not None: | |
# assuming you have id as key in your message | |
id = msg.value().id | |
# do your thing here | |
print(id) | |
# resume consuming partition | |
consumer.resume(topic_partition) | |
# commit the message once done | |
consumer.commit(msg) | |
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: | |
while True: | |
msg = consumer.poll() | |
# create a TopicPartiton object | |
topic_partition = [TopicPartition(msg.topic(), msg.partition(), msg.offset())] | |
# pause consuming this partition | |
consumer.pause(topic_partition) | |
executor.submit(process_message, msg, consumer, topic_partition) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment