import base64
import http.client as httplib
import logging
import os
import sys
import time
import traceback
import zlib

from confluent_kafka import Consumer, KafkaError, Producer
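
# Configuration comes from the environment; every variable listed below is
# referenced later in this script:
#   DEBUG - enable debug logging
#   KAFKA_BOOTSTRAP_SERVERS - Kafka broker list
#   KAFKA_SMAP_CONSUMER_CONFIG_GROUP_ID - consumer group id
#   KAFKA_SMAP_CONSUMER_CONFIG_AUTO_OFFSET_RESET - earliest/latest
#   KAFKA_SMAP_CONSUMER_CONFIG_QUEUED_MAX_MESSAGES_KBYTES - librdkafka pre-fetch limit
#   KAFKA_SMAP_CONSUMER_CONFIG_FETCH_MESSSAGE_MAX_BYTES - maximum fetch size
#   KAFKA_SMAP_SUBSCRIBE_TOPIC - topic to consume from
#   KAFKA_SMAP_ARCHIVER_IP, KAFKA_SMAP_ARCHIVER_PORT - sMAP archiver address
#   KAFKA_SMAP_ARCHIVER_ENDPOINT - archiver POST path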
# logging settings; parse the flag explicitly instead of eval()-ing env input
if os.environ.get('DEBUG', '').lower() in ('1', 'true', 'yes'):
    logging.basicConfig(level=logging.DEBUG)
else:
    logging.basicConfig(level=logging.INFO)
consumer_config = {
    'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS'),
    'group.id': os.environ.get('KAFKA_SMAP_CONSUMER_CONFIG_GROUP_ID'),
    'auto.offset.reset': os.environ.get('KAFKA_SMAP_CONSUMER_CONFIG_AUTO_OFFSET_RESET'),  # earliest/latest
    'enable.auto.commit': 'false',  # offsets are committed manually after a successful POST
    # limit the amount of data pre-fetched by librdkafka
    'queued.max.messages.kbytes': os.environ.get('KAFKA_SMAP_CONSUMER_CONFIG_QUEUED_MAX_MESSAGES_KBYTES'),
    'fetch.message.max.bytes': os.environ.get('KAFKA_SMAP_CONSUMER_CONFIG_FETCH_MESSSAGE_MAX_BYTES')
}
c = Consumer(consumer_config)
p = Producer({
    'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS')
})
timeout_seconds = 1
SMAP_ARCHIVER_IP = os.environ.get('KAFKA_SMAP_ARCHIVER_IP')
SMAP_ARCHIVER_PORT = os.environ.get('KAFKA_SMAP_ARCHIVER_PORT')
topic = os.environ.get('KAFKA_SMAP_SUBSCRIBE_TOPIC')
# abort if no subscribe topic is configured
if not topic:
    logging.error('No subscribe topic configured (KAFKA_SMAP_SUBSCRIBE_TOPIC)')
    sys.exit(1)
# rebalance callbacks
def print_on_assign(consumer, partitions):
    logging.info(f'Assignment: {partitions}')
    for partition in partitions:
        logging.info(f'watermark: {consumer.get_watermark_offsets(partition)}')
    logging.info(f'committed offsets for all partitions: {consumer.committed(partitions)}')
    logging.info(f'position: {consumer.position(partitions)}')


def print_on_revoke(consumer, partitions):
    logging.info(f'Revoke Assignment: {partitions}')
def delivery_report(err, msg):
    """Called once for each message produced to indicate the delivery result.
    Triggered by poll() or flush()."""
    if err is not None:
        # raise the error so it is handled by the surrounding exception handling
        raise SystemError(err)
    else:
        logging.debug(f'Message delivered topic:{msg.topic()} partition:{msg.partition()} offset:{msg.offset()}')
c.subscribe([topic], on_assign=print_on_assign, on_revoke=print_on_revoke)
logging.warning('New instance of Kafka SMAP Consumer started')
while True:
    msg = c.poll(1.0)
    # initial error handling
    if msg is None:
        continue
    if msg.error():
        # end of partition is not a real error; keep polling
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            logging.error(f'consumer error: {msg.error()}')
            break
    logging.debug(f'{msg.topic()} [{msg.partition()}] at offset {msg.offset()}')
    try:
        final_data = msg.value()
        logging.debug(f'final_data: {final_data}')
        try:
            # decompress the zlib-compressed JSON value
            final_data = zlib.decompress(final_data)
            final_data = final_data.decode('utf-8')
        except zlib.error as e:
            # value is either base64-wrapped or not compressed at all;
            # try the base64 path before giving up
            logging.debug(f'Error decompressing or uncompressed, Error: {e}')
            final_data = base64.b64decode(final_data)
            final_data = zlib.decompress(final_data)
            final_data = final_data.decode('utf-8')
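        # The payload is expected to be zlib-compressed JSON, sometimes
        # additionally base64-wrapped. A matching producer would prepare it
        # roughly like this (illustrative sketch, not part of this consumer;
        # 'readings' is a hypothetical dict of sMAP readings):
        #   payload = zlib.compress(json.dumps(readings).encode('utf-8'))
        #   producer.produce(topic, payload)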
        # send http request to smap
        timeout_seconds = 1
        while True:
            try:
                conn = httplib.HTTPConnection(SMAP_ARCHIVER_IP, SMAP_ARCHIVER_PORT)
                conn.request('POST', os.environ.get('KAFKA_SMAP_ARCHIVER_ENDPOINT'), final_data,
                             headers={'Content-Type': 'application/json'})
                response = conn.getresponse()
                conn.close()
                if response.status == 200:
                    logging.debug('Data published to smap')
                    c.commit()
                    break
                elif response.status == 500:
                    # republish to error_<archiver> topic
                    logging.error(
                        f'500 - error in creating stream, status - {response.status}, reason - {response.reason}, msg - {response.msg}, final_data - {final_data}')
                    p.produce('error_' + msg.topic(), msg.value(), callback=delivery_report)
                    p.flush()
                    c.commit()
                    break
                else:
                    # for every other error, time out and retry
                    raise ValueError(
                        f'error in creating stream, status - {response.status}, reason - {response.reason}, msg - {response.msg}')
            except Exception as e:
                logging.error(
                    f'error in publishing data to smap. Will not attempt for another {timeout_seconds} seconds. error - {e}')
                # exponential back-off if an exception occurred
                time.sleep(timeout_seconds)
                timeout_seconds *= 2
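                # note: this back-off grows without bound; capping it, e.g.
                # timeout_seconds = min(timeout_seconds * 2, 300), is a
                # reasonable hardening step (the 300 s ceiling is illustrative,
                # not from the original script)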
    except UnicodeDecodeError as e:
        logging.error(f'Unicode Decode Error: {e}, data: {msg.value()}')
    except Exception as e:
        try:
            logging.error(f'data/msg: {msg.value()}')
        except Exception as es:
            logging.error(f'cannot print data: {es}')
        logging.error(
            f'global exception occurred: {e}, traceback: {traceback.format_exc()}, will not attempt for another {timeout_seconds} seconds')
    else:
        continue
    # exponential back-off if an exception occurred
    time.sleep(timeout_seconds)
    timeout_seconds *= 2
logging.info('Kafka SMAP Consumer closed')
c.close()