Created
November 30, 2016 13:18
-
-
Save guysmoilov/d5075395020a1cf594c71d17f657db8b to your computer and use it in GitHub Desktop.
Topic transforming Kafka mirror maker handler
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package kafka.tools | |
import java.util | |
import java.util.Collections | |
import kafka.consumer.BaseConsumerRecord | |
import kafka.tools.MirrorMaker.MirrorMakerMessageHandler | |
import org.apache.kafka.clients.producer.ProducerRecord | |
import scala.util.matching.Regex | |
/** | |
* Args are expected to be a string of format: | |
* regex1=target1;regex2=target2 | |
*/ | |
class TopicChangingMirrorMakerMessageHandler(val args:String) | |
extends MirrorMakerMessageHandler { | |
val topicPairs: Array[String] = args.split(';') | |
val topicConversionMap : Map[Regex,String] = topicPairs map(topicPair => { | |
val pairParts = topicPair.split('=') | |
val srcRegex = new Regex(pairParts(0)) | |
val targetTopic = pairParts(1) | |
srcRegex -> targetTopic | |
}) toMap | |
override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = { | |
val maybeMatchingRegex: Option[Regex] = topicConversionMap.keys find(r => r.findFirstMatchIn(record.topic).isDefined) | |
val targetTopic: String = if (maybeMatchingRegex.isDefined) topicConversionMap(maybeMatchingRegex.get) else record.topic | |
// Same behaviour as defaultMirrorMakerMessageHandler, only topic is changed | |
Collections.singletonList(new ProducerRecord[Array[Byte], Array[Byte]](targetTopic, null, record.timestamp, record.key, record.value)) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment