Created October 31, 2018 16:28
Save Richard-Mathie/c1202e23bb60c369b6e8e8f58f29527b to your computer and use it in GitHub Desktop.
Kafka Consumer for Long Running Process
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
from queue import Queue, Empty
from threading import Thread, Event

from kafka import KafkaConsumer, KafkaProducer
from kafka.consumer.fetcher import ConsumerRecord
from kafka.errors import CommitFailedError
from kafka.structs import TopicPartition, OffsetAndMetadata
class KafkaWorker(object):
    """Poll Kafka on the calling thread while a worker thread processes records.

    ``consume()`` keeps calling ``KafkaConsumer.poll`` and pushes every
    fetched record onto ``self.queue``. A separate worker thread
    (``processor``) pulls records off that queue and runs
    ``process_record``. Finished records come back through ``self.commits``,
    and their offsets are committed from the consuming thread, because
    ``KafkaConsumer`` is not thread-safe.

    Subclasses override ``process_record`` and, optionally,
    ``consumer_topics`` / ``consumer_options`` / ``producer_options``.

    NOTE(review): ``self.queue`` has no maxsize, so ``consume()`` buffers
    every record it fetches, no matter how far the worker lags behind.

    :param topics: a topic name, or a list/tuple of topic names.
    :param hosts: bootstrap servers for both the consumer and the producer.
    :param poll_timeout: seconds to block per poll, and per idle wait of the
        worker thread.
    """

    def __init__(self, topics, hosts, poll_timeout=1):
        self.poll_timeout = poll_timeout
        self.poll_timeout_ms = poll_timeout * 1000
        self.topics = topics
        self.hosts = hosts
        # Records fetched by consume(), waiting for the worker thread.
        self.queue = Queue()  # type: Queue[ConsumerRecord]
        # Records the worker has finished; their offsets get committed.
        self.commits = Queue()  # type: Queue[ConsumerRecord]
        self._stop_processing = Event()
        self.connect_kafka()
        self.worker_thread = Thread(target=self.processor)
        self.worker_thread.start()

    def process_record(self, record: 'ConsumerRecord'):
        """Handle one record on the worker thread. Override in subclasses."""
        pass

    @property
    def consumer_topics(self):
        """Topics to subscribe to, always returned as a list or tuple."""
        if isinstance(self.topics, (list, tuple)):
            return self.topics
        return (self.topics,)

    @property
    def consumer_options(self):
        """Extra keyword arguments passed to ``KafkaConsumer``."""
        return {}

    @property
    def producer_options(self):
        """Extra keyword arguments passed to ``KafkaProducer``."""
        return {}

    def connect_kafka(self):
        """Create ``self.consumer`` (subscribed to ``consumer_topics``) and ``self.producer``."""
        self.consumer = KafkaConsumer(*self.consumer_topics,
                                      bootstrap_servers=self.hosts,
                                      **self.consumer_options)
        self.producer = KafkaProducer(bootstrap_servers=self.hosts,
                                      **self.producer_options)

    def consume(self):
        """Poll forever: queue fetched records and commit the finished ones.

        Must run on the thread that owns ``self.consumer``. There is no
        normal return; the loop is left only by an exception (for example
        ``KeyboardInterrupt``).
        """
        while True:
            topic_records = self.consumer.poll(timeout_ms=self.poll_timeout_ms)
            for records in topic_records.values():
                for record in records:
                    logging.debug('got message %s', record)
                    self.queue.put(record)
            self._commit_pending()

    def _commit_pending(self):
        """Commit offsets for every record the worker has finished so far.

        Does nothing when no records are pending. A ``CommitFailedError``
        (kafka-python raises it when the group has rebalanced and the
        partitions now belong to another member) is logged rather than
        propagated. Those records will be redelivered, which matches the
        at-least-once behavior of this class.
        """
        offsets = {}
        while True:
            try:
                record = self.commits.get_nowait()
            except Empty:
                break
            # The committed offset is the *next* offset to read, hence +1.
            # A single worker thread drains a FIFO queue, so within one
            # partition the last record seen has the highest offset.
            topic_partition = TopicPartition(record.topic, record.partition)
            offsets[topic_partition] = OffsetAndMetadata(record.offset + 1, None)
        if not offsets:
            return
        try:
            self.consumer.commit(offsets)
        except CommitFailedError:
            logging.warning('failed to commit offsets %s', offsets,
                            exc_info=True)

    def processor(self):
        """Worker-thread loop: process queued records until ``stop()`` is called.

        Each processed record is handed to ``commit``. While the queue is
        empty, the loop waits up to ``poll_timeout`` seconds on the stop
        event, so ``stop()`` takes effect without waiting out the full
        timeout. An exception from ``process_record`` is not caught here
        and ends the thread.
        """
        while not self._stop_processing.is_set():
            try:
                record = self.queue.get_nowait()
            except Empty:
                self._stop_processing.wait(timeout=self.poll_timeout)
            else:
                self.process_record(record)
                self.commit(record)
                self.queue.task_done()

    def commit(self, record):
        """Mark ``record`` as processed; its offset is committed from the consuming thread."""
        self.commits.put(record)

    def stop(self):
        """Stop the worker thread, then commit whatever it has finished.

        Waits for the record currently in ``process_record`` to complete.
        Records still in ``self.queue`` are neither processed nor committed.
        Pending commits are flushed here because otherwise they are sent
        only from ``consume()``. Call this from the thread that ran
        ``consume()``, or after ``consume()`` has exited, since
        ``KafkaConsumer`` is not thread-safe.
        """
        self._stop_processing.set()
        self.worker_thread.join()
        self._commit_pending()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import time | |
import sys | |
from kafka.consumer.fetcher import ConsumerRecord | |
from kafka_worker import KafkaWorker | |
# Broker address and topic shared by the Worker subscription and the demo
# producer below. The "toppic" spelling IS the topic name; editing it points
# the demo at a different topic.
hosts, topic = "localhost:29092", "test_toppic"

logger = logging.getLogger(__name__)
def F(n):
    """Return the n-th Fibonacci number using naive double recursion.

    Deliberately exponential-time (no memoization): it stands in for a slow,
    CPU-bound job so the worker's long-running processing can be exercised.

    :param n: index into the sequence, with F(0) == 0 and F(1) == 1.
    :raises ValueError: if ``n`` is negative. Without this check the
        recursion never reaches a base case and ends in ``RecursionError``.
    """
    if n < 0:
        raise ValueError(f'n must be non-negative, got {n}')
    if n == 0:
        return 0
    if n == 1:
        return 1
    return F(n - 1) + F(n - 2)
class Worker(KafkaWorker):
    """Demo worker: prints one progress dot per record, then runs a slow job.

    Subscribes to the module-level ``topic`` with manual offset commits (see
    ``consumer_options``). ``KafkaWorker`` commits each record after
    ``process_record`` returns.
    """

    # Number of records handled. The first increment creates a per-instance
    # attribute that shadows this class-level default.
    count = 0

    def process_record(self, record: ConsumerRecord):
        """Print a progress dot, then run the deliberately slow ``F(40)``."""
        self.count += 1
        logger.debug('processing message %s', record)
        # Flush on every record. Each one is followed by F(40), so the flush
        # costs nothing by comparison. Flushing only every 100th record could
        # hide all progress until 100 slow jobs had started.
        sys.stdout.write('.')
        sys.stdout.flush()
        F(40)

    @property
    def consumer_topics(self):
        """Subscribe to the single module-level ``topic``."""
        return (topic,)

    @property
    def consumer_options(self):
        """KafkaConsumer options.

        Uses group ``test1``, starts at the earliest offset when the group
        has no committed offset, and commits manually.
        """
        return dict(
            group_id='test1',
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            # Only used when enable_auto_commit is True; inert here.
            auto_commit_interval_ms=1000,
            # NOTE(review): presumably set far below one F(40) run to show
            # that consume() keeps polling while the worker is busy.
            max_poll_interval_ms=2000,
        )

    @property
    def producer_options(self):
        """KafkaProducer options: lz4 compression and str -> bytes serializers."""
        def encode(text):
            # Pass None through (keyless messages / tombstones) instead of
            # failing on None.encode().
            return None if text is None else text.encode()

        return dict(
            compression_type='lz4',
            key_serializer=encode,
            value_serializer=encode,
        )
logging.basicConfig(level=logging.INFO)

# Construct outside the try: if this raises, there is no worker to stop.
worker = Worker(topic, hosts)
try:
    # Seed the topic with 100 demo messages, then consume until interrupted.
    for i in range(100):
        worker.producer.send(topic, key=f'{i}', value=f'hello {i}')
    worker.consume()
except KeyboardInterrupt:
    logger.info("Got Keyboard Interrupt")
finally:
    # Always stop the worker thread. It is created without daemon=True, so
    # if any exception other than KeyboardInterrupt skipped this, the thread
    # would keep looping and the process would never exit.
    worker.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.