@manjuapu
Created July 17, 2017 22:41
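KStream<CustomerId, KeyValue<SpecificRecord, OutboundMessage>> addressedMessages =
    builder.<AccountId, AccountEntry>stream(accountEntryStream)
        // Look up the customers that own each account; an entry with no match yields an empty list.
        .leftJoin(accountToCustomerIds, (accountEntry, customerIds) -> {
            if (isNull(customerIds)) {
                return Collections.<KeyValue<CustomerId, AccountEntry>>emptyList();
            } else {
                return customerIds.getCustomerIds().stream()
                    .map(customerId -> KeyValue.pair(customerId, accountEntry))
                    .collect(toList());
            }
        })
        // Fan out: emit one record per customer, re-keyed by CustomerId.
        .flatMap((accountId, accountEntryByCustomer) -> accountEntryByCustomer)
        // Write to an intermediate topic so records are repartitioned by the new key before the next join.
        .through(customerIdToAccountEntryStream)
        // Pair each account entry with the customer's alert settings (null when none exist).
        .leftJoin(alertSettings, Pair::with)
        // Generate zero or more alerts per (entry, settings) pair.
        .flatMapValues(
            (Pair<AccountEntry, CustomerAlertSettings> accountEntryAndSettings) ->
                BalanceAlertsGenerator.generateAlerts(
                    accountEntryAndSettings.getValue0(),
                    accountEntryAndSettings.getValue1())
        );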
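
The fan-out step in the first `leftJoin` can be tried in isolation, without a Kafka cluster. The following is a minimal sketch in plain Java, not the gist author's code: `FanOutSketch`, `fanOut`, and the string values are hypothetical stand-ins, and `Map.Entry` takes the place of Kafka's `KeyValue`. It shows only the null-safe "one entry in, one pair per customer out" logic.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class FanOutSketch {
    // Hypothetical stand-in for the join lambda: pairs one account entry
    // with every customer ID that owns the account.
    static <C, E> List<Map.Entry<C, E>> fanOut(E accountEntry, List<C> customerIds) {
        if (customerIds == null) {
            // A left join passes null when the account has no known customers.
            return Collections.emptyList();
        }
        return customerIds.stream()
                .<Map.Entry<C, E>>map(id -> new SimpleEntry<>(id, accountEntry))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // One entry for an account shared by two customers becomes two pairs.
        System.out.println(fanOut("entry-1", Arrays.asList("alice", "bob")));
        // An entry for an account with no customers produces nothing downstream.
        System.out.println(fanOut("entry-2", null));
    }
}
```

Returning an empty list rather than null lets the subsequent `flatMap` drop unmatched entries without a separate filter step.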