Last active
June 28, 2018 21:15
-
-
Save double16/eb8eba2597d7a039f8044816f2c8f092 to your computer and use it in GitHub Desktop.
Kafka Producer Integration Testing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Example Spock specification showing how LoggingProducerOutput is used to verify that a
 * record with a known key was written to the producer log.
 */
class ExampleSpec extends spock.lang.Specification {

    /** View of the producer log that skips every entry already present when setup() ran. */
    LoggingProducerOutput output

    void setup() {
        def entireLog = new LoggingProducerOutput(logPath: Paths.get('/tmp/producer.log'))
        // Only entries appended after this point are visible through `output`.
        output = entireLog.withEmpty()
    }

    void "records should be produced"() {
        when: "a record is produced"
        String expectedKey = "the_record_key"
        // ...

        then: "record is found in log"
        output.find { entry -> entry.key == expectedKey }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import groovy.util.logging.Slf4j | |
import org.apache.kafka.clients.producer.ProducerInterceptor | |
import org.apache.kafka.clients.producer.ProducerRecord | |
import org.apache.kafka.clients.producer.RecordMetadata | |
import org.yaml.snakeyaml.DumperOptions | |
import org.yaml.snakeyaml.Yaml | |
import java.nio.file.Files | |
import java.nio.file.Path | |
import java.nio.file.Paths | |
/**
 * Logs messages sent to a Kafka producer to a YAML file for use by test verifications. This class is
 * not intended for use in production.
 *
 * Use configuration "interceptor.LoggingProducerInterceptor.file" to specify the output file, otherwise a
 * temporary unique file name will be chosen.
 *
 * Every entry is appended as one element of a top-level YAML sequence in flow style: first a
 * {configs: ...} entry written by configure(), then one {thread, key, value} entry per record
 * passed to onSend().
 */
@Slf4j
class LoggingProducerInterceptor implements ProducerInterceptor {

    /** Maximum number of attempts made to write one record before giving up. */
    private static final int MAX_WRITE_ATTEMPTS = 3

    /** Pause between two failed write attempts, in milliseconds. */
    private static final long RETRY_DELAY_MS = 100L

    /**
     * Serializes appends made from this class so that entries written by different threads cannot
     * interleave inside the log file. It does not protect against writers in other JVMs or class loaders.
     */
    private static final Object WRITE_LOCK = new Object()

    /** One Yaml instance per thread, configured to dump in flow style. */
    static ThreadLocal<Yaml> YAML = new ThreadLocal<Yaml>() {
        @Override
        protected Yaml initialValue() {
            new Yaml(new DumperOptions(defaultFlowStyle: DumperOptions.FlowStyle.FLOW))
        }
    }

    /** The file entries are appended to; assigned by configure(). */
    Path logPath

    /**
     * Chooses the log file (configured path or a fresh temporary file) and appends the producer
     * configuration as the first entry.
     *
     * @param configs the producer configuration
     */
    @Override
    void configure(Map<String, ?> configs) {
        String configuredLogPath = configs.get("interceptor.${LoggingProducerInterceptor.simpleName}.file".toString())
        logPath = configuredLogPath ? Paths.get(configuredLogPath) : Files.createTempFile('producer', '.log')
        try {
            appendEntry([configs: configs])
        } catch (IOException e) {
            // Best effort: the configuration entry is informational only.
            log.debug("Writing configs to ${logPath}", e)
        }
    }

    /**
     * Appends the record's key and value, together with the id of the calling thread, to the log.
     * A failed write is retried up to MAX_WRITE_ATTEMPTS times; if every attempt fails the record
     * is simply not logged. The record itself is never modified.
     *
     * @param record the record about to be sent
     * @return the same record, unchanged
     */
    @Override
    ProducerRecord onSend(ProducerRecord record) {
        Map entry = [thread: Thread.currentThread().id, key: record.key(), value: record.value()]
        for (int attempt = 1; attempt <= MAX_WRITE_ATTEMPTS; attempt++) {
            try {
                appendEntry(entry)
                break
            } catch (IOException e) {
                log.debug("Writing producer record to ${logPath}", e)
                if (attempt == MAX_WRITE_ATTEMPTS) {
                    // No further attempt follows, so there is nothing to wait for.
                    break
                }
                try {
                    Thread.sleep(RETRY_DELAY_MS)
                } catch (InterruptedException ignored) {
                    // Keep the interrupt visible to the caller and stop retrying.
                    Thread.currentThread().interrupt()
                    break
                }
            }
        }
        return record
    }

    @Override
    void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // nothing to do
    }

    /**
     * Removes the log file. Safe to call when the file is already gone (for example when several
     * interceptor instances share one configured file) or when configure() was never invoked.
     */
    @Override
    void close() {
        if (logPath != null) {
            Files.deleteIfExists(logPath)
        }
    }

    /** Appends a single entry to the log as one element of the top-level YAML sequence. */
    private void appendEntry(Map entry) throws IOException {
        synchronized (WRITE_LOCK) {
            logPath.withWriterAppend { Writer writer ->
                writer.write('- ')
                YAML.get().dump(entry, writer)
            }
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import groovy.transform.AutoClone | |
import org.yaml.snakeyaml.DumperOptions | |
import org.yaml.snakeyaml.Yaml | |
import java.nio.file.Path | |
/**
 * Provides the output of LoggingProducerInterceptor as an Iterable so Groovy constructs can be used to filter and
 * process the YAML output.
 *
 * The log file is read again on every call to iterator(), so each iteration reflects the content of the
 * file at that moment. A log file that does not exist or is empty is treated as containing no entries.
 */
@AutoClone
class LoggingProducerOutput implements Iterable {

    /** One Yaml instance per thread. */
    static ThreadLocal<Yaml> YAML = new ThreadLocal<Yaml>() {
        @Override
        protected Yaml initialValue() {
            new Yaml(new DumperOptions(defaultFlowStyle: DumperOptions.FlowStyle.FLOW))
        }
    }

    /** The path of the YAML log file. */
    Path logPath

    /** When true, iterator() only returns entries whose thread id equals the calling thread's id. */
    private boolean limitToCurrentThread

    /** Number of leading log entries to skip, or null to skip none. */
    private Integer startingEvent

    /**
     * Reads the whole log file and returns an iterator over every entry in it.
     *
     * @return all entries; empty when the file is missing or contains no YAML document
     */
    private Iterator all() {
        // The file may not exist yet, or may already have been deleted by the interceptor's close().
        // Fully qualified because this file does not import java.nio.file.Files.
        if (!java.nio.file.Files.exists(logPath)) {
            return Collections.emptyIterator()
        }
        // NOTE(review): Yaml.load can instantiate arbitrary types named in the document; only point
        // logPath at files written by the test run itself.
        def document = logPath.withReader { YAML.get().load(it) }
        // An empty file loads as null.
        document == null ? Collections.emptyIterator() : document.iterator()
    }

    /**
     * Returns the log entries visible through this object: entries before startingEvent are skipped
     * and, if requested via withThread(), only entries of the current thread are kept.
     */
    @Override
    Iterator iterator() {
        Iterator result = all()
        if (startingEvent != null) {
            result = result.drop(startingEvent)
        }
        limitToCurrentThread ? onlyCurrentThread(result) : result
    }

    /**
     * Limits the results to only the current Thread id. Note that per JDK the Thread id may be reused.
     */
    Iterator findAllByThread() {
        onlyCurrentThread(iterator())
    }

    /**
     * Return a new object limited to the current thread. Note that per JDK the Thread id may be reused.
     */
    LoggingProducerOutput withThread() {
        LoggingProducerOutput result = clone()
        result.limitToCurrentThread = true
        result
    }

    /**
     * Return a new object excluding all current messages, i.e. every entry (of any kind) that is in
     * the log file at the time of this call.
     */
    LoggingProducerOutput withEmpty() {
        LoggingProducerOutput result = clone()
        result.startingEvent = all().size()
        result
    }

    /** Keeps only the entries whose `thread` value equals the id of the calling thread. */
    private static Iterator onlyCurrentThread(Iterator entries) {
        long currentThreadId = Thread.currentThread().id
        entries.findAll { it.thread == currentThreadId }.iterator()
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
- {configs: {compression.type: snappy, value.serializer: org.apache.kafka.common.serialization.StringSerializer, | |
interceptor.LoggingProducerInterceptor.file: /var/folders/3v/05f6zqc164g_b55zrdlhk_z00000gn/T/producer3683756155677198489.log, | |
acks: '1', bootstrap.servers: 'localhost:32818', interceptor.classes: LoggingProducerInterceptor, | |
key.serializer: org.apache.kafka.common.serialization.StringSerializer, client.id: api, | |
linger.ms: '100'}} | |
- {thread: 10, key: 5af19764bfe52300144eea35, value: '{"_links":{"self":{"href":"http://169.254.169.254:8080/app/api/user/doe","hreflang":"en","type":"user"}},"id":"5af19764bfe52300144eea35","name":"John Doe"}'} | |
- {thread: 10, key: 5af19764bfe52300144eea35, value: '{"_links":{"self":{"href":"http://169.254.169.254:8080/app/api/user/doe","hreflang":"en","type":"user"}},"id":"5af19764bfe52300144eea35","seasons":"John Doe"}'} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Extra producer settings; stays empty unless the current environment is TEST.
Map producerTestConfig = [:]
if (Environment.current == Environment.TEST) {
    // In TEST, every produced record is also written to a temporary log file that the
    // loggingProducerOutput bean reads back.
    Path producerLogPath = Files.createTempFile('producer', '.log')
    loggingProducerOutput(LoggingProducerOutput) {
        logPath = producerLogPath
    }
    String logFileKey = "interceptor.${LoggingProducerInterceptor.simpleName}.file".toString()
    producerTestConfig << [
            'interceptor.classes': LoggingProducerInterceptor.name,
            (logFileKey)         : producerLogPath.toString(),
    ]
}

// Settings applied to the domain event producer in every environment.
Map producerBaseConfig = [
        'key.serializer'  : 'org.apache.kafka.common.serialization.StringSerializer',
        'value.serializer': 'org.apache.kafka.common.serialization.StringSerializer',
        'acks'            : '1',
        'linger.ms'       : '100',
]
domainEventProducer(KafkaProducer, commonKafkaConfig + producerTestConfig + producerBaseConfig)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment