Created
July 11, 2022 14:55
-
-
Save nddipiazza/512ec37d447039a41e1069e1a80e8a47 to your computer and use it in GitHub Desktop.
Kafka with Testcontainers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.tika.pipes.kafka.tests; | |
import java.io.File; | |
import java.util.Collections; | |
import java.util.Map; | |
import java.util.Properties; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import org.apache.kafka.clients.consumer.ConsumerConfig; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
import org.apache.kafka.clients.consumer.ConsumerRecords; | |
import org.apache.kafka.clients.consumer.KafkaConsumer; | |
import org.apache.kafka.clients.producer.KafkaProducer; | |
import org.apache.kafka.clients.producer.ProducerConfig; | |
import org.apache.kafka.clients.producer.ProducerRecord; | |
import org.apache.kafka.common.serialization.IntegerDeserializer; | |
import org.apache.kafka.common.serialization.IntegerSerializer; | |
import org.apache.kafka.common.serialization.StringDeserializer; | |
import org.apache.kafka.common.serialization.StringSerializer; | |
import org.junit.ClassRule; | |
import org.junit.Test; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.testcontainers.containers.DockerComposeContainer; | |
import org.testcontainers.containers.output.Slf4jLogConsumer; | |
public class TikaPipesKafkaTest { | |
private static final Logger LOG = LoggerFactory.getLogger(TikaPipesKafkaTest.class); | |
public static final int ZK_PORT = 2181; | |
public static final int KAFKA_PORT = 9092; | |
public static final String KAFKA = "kafka1"; | |
public static final String ZOOKEEPER = "zoo1"; | |
@ClassRule | |
public static DockerComposeContainer environment = | |
new DockerComposeContainer(new File("src/test/resources/kafka-docker/zk-single-kafka-single.yml")) | |
.withExposedService(KAFKA, KAFKA_PORT) | |
.withExposedService(ZOOKEEPER, ZK_PORT) | |
.withLogConsumer(ZOOKEEPER, new Slf4jLogConsumer(LOG)) | |
.withLogConsumer(KAFKA, new Slf4jLogConsumer(LOG)); | |
@Test | |
public void testTikaPipesKafka() throws Exception { | |
Properties producerProps = new Properties(); | |
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getServiceHost(KAFKA, KAFKA_PORT) + ":" + environment.getServicePort(KAFKA, KAFKA_PORT)); | |
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "producer"); | |
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); | |
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); | |
Properties consumerProps = new Properties(); | |
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getServiceHost(KAFKA, KAFKA_PORT) + ":" + environment.getServicePort(KAFKA, KAFKA_PORT)); | |
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer"); | |
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "grp"); | |
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); | |
consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); | |
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); | |
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); | |
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); | |
String topic = "nick"; | |
ExecutorService es = Executors.newCachedThreadPool(); | |
Map<Integer, Boolean> waiting = new ConcurrentHashMap<>(); | |
AtomicBoolean doneEmit = new AtomicBoolean(false); | |
Future f = es.submit(() -> { | |
int numRec = 0; | |
KafkaConsumer consumer = new KafkaConsumer(consumerProps); | |
consumer.subscribe(Collections.singletonList(topic)); | |
while (!doneEmit.get() || !waiting.isEmpty()) { | |
try { | |
ConsumerRecords<Integer, String> records = consumer.poll(1000); | |
for (ConsumerRecord<Integer, String> record : records) { | |
waiting.remove(record.key()); | |
LOG.info("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); | |
++numRec; | |
} | |
Thread.sleep(500L); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
LOG.info("Consumer is now complete. NumRec={}", numRec); | |
}); | |
Thread.sleep(8000L); | |
es.submit(() -> { | |
KafkaProducer producer = new KafkaProducer(producerProps); | |
int numSent = 0; | |
for (int messageNo = 1; messageNo < 100; ++messageNo) { | |
String messageStr = "Message_" + messageNo; | |
try { | |
waiting.put(messageNo, true); | |
producer.send(new ProducerRecord<>(topic, | |
messageNo, | |
messageStr)).get(); | |
LOG.info("Sent message: (" + messageNo + ", " + messageStr + ")"); | |
++numSent; | |
} catch (InterruptedException | ExecutionException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
doneEmit.set(true); | |
LOG.info("Producer is now complete - sent {}.", numSent); | |
}); | |
f.get(); | |
LOG.info("Done"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment