Created
February 2, 2021 23:10
-
-
Save ycd/c6de3a76ebb25a4da7b27ce80cfd8daa to your computer and use it in GitHub Desktop.
async logger for kafka
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import json
import logging
from threading import Thread
from typing import Any, Dict, Optional

import confluent_kafka
from confluent_kafka.cimpl import KafkaException
class AIOProducer:
    """Asyncio-friendly wrapper around ``confluent_kafka.Producer``.

    A background thread polls the producer so that delivery callbacks fire;
    each callback hands its outcome to the event loop, which settles the
    future returned by :meth:`produce`.
    """

    # Upper bound (seconds) on how long close() waits for queued messages, so
    # an unreachable broker cannot block shutdown indefinitely.
    _FLUSH_TIMEOUT = 10.0

    def __init__(
        self,
        *,
        configs: Dict[str, Any],
        topic: str,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ):
        """Create the underlying producer and start the polling thread.

        Args:
            configs: Configuration dict passed to ``confluent_kafka.Producer``.
            topic: Topic every message is produced to.
            loop: Event loop that owns the futures returned by ``produce``.
                Defaults to ``asyncio.get_event_loop()``.
        """
        # NOTE(review): asyncio.get_event_loop() is deprecated on recent
        # Python versions when no loop is current; passing ``loop`` explicitly
        # avoids relying on it.
        self._loop = loop or asyncio.get_event_loop()
        self._producer = confluent_kafka.Producer(configs)
        self.topic = topic
        self._cancelled = False
        # Daemon thread: a forgotten close() must not keep the interpreter
        # alive forever waiting on the poll loop.
        self._poll_thread = Thread(target=self._poll_loop, daemon=True)
        self._poll_thread.start()

    def _poll_loop(self):
        """Poll the producer until ``close`` is requested."""
        while not self._cancelled:
            self._producer.poll(0.1)

    def close(self):
        """Stop the polling thread, then flush messages still queued.

        Safe to call more than once.
        """
        self._cancelled = True
        self._poll_thread.join()
        # Polling has stopped, so give queued messages a last, bounded chance
        # to be delivered (and their callbacks to run) before shutdown.
        self._producer.flush(self._FLUSH_TIMEOUT)

    def produce(self, value) -> "asyncio.Future":
        """
        An awaitable produce method.

        Args:
            value: Message payload; a ``dict`` is serialised with
                ``json.dumps`` first, anything else is passed through as is.

        Returns:
            A future on this producer's loop. It resolves to the delivered
            message, or fails with ``KafkaException`` if delivery failed.
        """
        result = self._loop.create_future()

        def settle(setter, payload):
            # Runs on the loop thread. The awaiting side may already have
            # cancelled the future (e.g. a timeout); settling it again would
            # raise InvalidStateError.
            if not result.done():
                setter(payload)

        def ack(err, msg):
            if err:
                outcome = (result.set_exception, KafkaException(err))
            else:
                outcome = (result.set_result, msg)
            try:
                self._loop.call_soon_threadsafe(settle, *outcome)
            except RuntimeError:
                # The loop is already closed, so nobody can await the future
                # any more; raising here would propagate out of poll()/flush().
                pass

        if isinstance(value, dict):
            value = json.dumps(value)
        self._producer.produce(self.topic, value, on_delivery=ack)
        return result
class KafkaLogger(logging.Handler):
    """Logging handler that publishes formatted records to a Kafka topic.

    Each record is formatted with the handler's formatter and handed to an
    ``AIOProducer`` as ``{'log': <formatted text>}``.
    """

    def __init__(self, *, configs: Dict[str, Any], topic: str = 'kafka-test'):
        """Initialize an instance of the kafka handler.

        Args:
            configs: Configuration passed through to ``AIOProducer``.
            topic: Topic that receives the log messages.
        """
        super().__init__()
        self.producer = AIOProducer(configs=configs, topic=topic)

    def emit(self, record):
        """Emit the provided record to the kafka_client producer.

        Any failure while formatting or producing is routed to
        ``handleError`` instead of propagating to the logging caller.
        """
        # drop kafka logging to avoid infinite recursion
        if 'kafka.' in record.name:
            return
        try:
            log = self.format(record)
            # No print() here: a handler must not write to stdout as a side
            # effect; attach a StreamHandler when console output is wanted.
            self.producer.produce({'log': log})
        except Exception:
            self.handleError(record)

    def close(self):
        """Close the producer and clean up."""
        self.acquire()
        try:
            # ``producer`` may be missing if __init__ failed part-way.
            producer = getattr(self, 'producer', None)
            try:
                if producer:
                    producer.close()
            finally:
                # Always run the base-class close, even when closing the
                # producer raised.
                super().close()
        finally:
            self.release()
# Demo wiring: send this module's log records to a local Kafka broker.
kafka_config = {"bootstrap.servers": "0.0.0.0:9092"}

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

kl = KafkaLogger(configs=kafka_config, topic="kafka-test")
logger.addHandler(kl)

# Emit ten alternating INFO/DEBUG pairs (twenty records in total).
for _round in range(10):
    logger.info("testing the logger 123")
    logger.debug("321 reggol eht gnitset")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment