@BertPC
Last active August 29, 2015 14:19
Kafka producer/consumer example
from kafka import KafkaClient, SimpleProducer, KafkaConsumer
import time
host_port = "localhost:9092"
topic = 'locust2'
# Instantiate Kafka client and producer objects
client = KafkaClient(host_port)
producer = SimpleProducer(client,
                          async=False,  # synchronous send (`async` is a reserved word from Python 3.7 on)
                          req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
                          ack_timeout=2000)
# Send one timestamped message to the topic
producer.send_messages(topic, 'Topic {0} here'.format(int(time.time())))
# Instantiate Kafka consumer object
consumer = KafkaConsumer(topic,
                         metadata_broker_list=[host_port],
                         fetch_message_max_bytes=10485760,  # 10 MiB
                         group_id='locust_test',
                         consumer_timeout_ms=2000)
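print("Fetching...")
# Fetch a single message; raises ConsumerTimeout if none arrives within consumer_timeout_ms
m = consumer.next()
print("%s:%d:%d: key=%s value=%s" % (m.topic, m.partition,
                                     m.offset, m.key, m.value))
# Mark the message as processed, then commit its offset for the consumer group
consumer.task_done(m)
consumer.commit()
print("Done!")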