Skip to content

Instantly share code, notes, and snippets.

@manjuapu
Created November 28, 2017 18:35
Show Gist options
  • Save manjuapu/8c92ea9cbc142c98d9b6cf5e432a2e51 to your computer and use it in GitHub Desktop.
Save manjuapu/8c92ea9cbc142c98d9b6cf5e432a2e51 to your computer and use it in GitHub Desktop.
final KStreamBuilder builder = new KStreamBuilder();
// map the json transaction stream to (id, Transaction) key-value pairs
final KStream<String, Transaction> trnxStream = builder.stream(stringSerde, jsonSerde, TOPIC_TRNX)
.map((key, value) -> {
Transaction tx = Transaction.newFromJson(value);
return new KeyValue<>(tx.id, tx);
});
// aggregate the transaction stream: S1
trnxStream.map((id, trnx) -> new KeyValue<>(trnx.tokenId, trnx.amount))
.groupByKey(stringSerde, longSerde)
.reduce((v1, v2) -> v1 + v2, "trnx-amount-by-token")
.toStream()
.to(stringSerde, longSerde, TOPIC_TRNX_AMOUNT_BY_TOKEN);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment