This gist contains all code snippets from the article.
snippet-3.java
snippet-7.java
snippet-10.java
snippet-2.java
for.java
snippet-8.java
snippet-1.java
Statistics.java
snippet-5.java
snippet-9.java
public static <T, R extends Comparable<R>> Gatherer<T, ?, T> | |
takeWhileIncreasing(Function<T, R> valueExtractor) { | |
return Gatherer.of( | |
() -> new Object() { R lastValue = null; }, // ① Anonymous class for state | |
(state, element, downstream) -> { | |
R value = valueExtractor.apply(element); // ② Extract comparable value | |
if (state.lastValue == null || // ③ First element OR | |
value.compareTo(state.lastValue) > 0) { // value is increasing | |
state.lastValue = value; // ④ Update state | |
return downstream.push(element); // ⑤ Pass element along | |
} | |
return false; // ⑥ Stop gathering - sequence broken! | |
} | |
); | |
} | |
List<String> increasingLength = words.stream() | |
.gather(takeWhileIncreasing(String::length)) | |
.toList(); | |
System.out.println("Increasing length sequence: " + increasingLength); | |
// Output: [hello] | |
// Why? "hello"(5) → "world"(5) - not increasing, so we stop! |
List<Integer> values = List.of(1, 5, 3, 8, 2, 9, 4, 7, 6); | |
System.out.println("Input values: " + values); |
record PeakValley(String type, double value, int index) {} | |
public static Gatherer<Double, ?, PeakValley> peakValleyDetection() { | |
return Gatherer.of( | |
() -> new Object() { | |
Double prev = null; // ① Previous value | |
Double current = null; // ② Current value | |
int index = 0; // ③ Track position | |
}, | |
(state, next, downstream) -> { | |
if (state.prev != null && state.current != null) { // ④ Have 3 points? | |
// Check for peak: current > both neighbors | |
if (state.current > state.prev && state.current > next) { | |
downstream.push(new PeakValley("PEAK", state.current, state.index)); | |
} | |
// Check for valley: current < both neighbors | |
else if (state.current < state.prev && state.current < next) { | |
downstream.push(new PeakValley("VALLEY", state.current, state.index)); | |
} | |
} | |
// ⑤ Slide the window forward | |
state.prev = state.current; | |
state.current = next; | |
state.index++; | |
return true; | |
} | |
); | |
} | |
// Visual example: | |
// Peak | |
// ↓ | |
// 103 | |
// / \ | |
// 101 99 ← Valley | |
// \ | |
// 102 |
List<Integer> runningMax = values.stream() | |
.gather(Gatherers.scan( | |
() -> Integer.MIN_VALUE, // ① Start with smallest possible value | |
Integer::max // ② Keep the maximum at each step | |
)) | |
.toList(); | |
System.out.println("Running maximum: " + runningMax); | |
// Output: [-2147483648, 1, 5, 5, 8, 8, 9, 9, 9, 9] | |
// ↑ initial ↑ 5>1 ↑ 8>5 ↑ 9>8 |
List<Long> runningProduct = values.stream() | |
.gather(Gatherers.scan( | |
() -> 1L, // ① Start with identity for multiplication | |
(acc, val) -> acc * val // ② Multiply accumulated value by current | |
)) | |
.toList(); | |
System.out.println("Running product: " + runningProduct); | |
// Output: [1, 1, 5, 15, 120, 240, 2160, 8640, 60480, 362880] | |
// ↑ ↑ ↑ ↑ ↑ | |
// init 1×1 1×5 5×3 15×8 (and so on...) |
public static <T, K> Gatherer<T, ?, T> distinctByKey(Function<T, K> keyExtractor) { | |
return Gatherer.of( | |
HashSet::new, // ① State: Set to track seen keys | |
(state, element, downstream) -> { | |
K key = keyExtractor.apply(element); // ② Extract the key | |
if (state.add(key)) { // ③ If key is new (add returns true) | |
return downstream.push(element); // ④ Pass element downstream | |
} | |
return true; // ⑤ Continue processing | |
} | |
); | |
} | |
List<String> words = List.of("hello", "world", "java", "gatherers", "stream", "api"); | |
List<String> distinctByLength = words.stream() | |
.gather(distinctByKey(String::length)) // Using length as the key | |
.toList(); | |
System.out.println("Distinct by length: " + distinctByLength); | |
// Output: [hello, java, gatherers, api] | |
// ↑ ↑ ↑ ↑ | |
// len=5 len=4 len=9 len=3 | |
// Note: "world"(5) and "stream"(6) are filtered out as duplicates |
public static <T> Gatherer<T, ?, List<T>> | |
batchByCondition(Predicate<List<T>> batchComplete) { | |
return Gatherer.of( | |
ArrayList::new, // ① Current batch being built | |
(batch, element, downstream) -> { | |
batch.add(element); // ② Add to current batch | |
if (batchComplete.test(batch)) { // ③ Check if batch is "full" | |
downstream.push(new ArrayList<>(batch)); // ④ Send copy downstream | |
batch.clear(); // ⑤ Start fresh batch | |
} | |
return true; // ⑥ Keep processing | |
}, | |
(batch, downstream) -> { // ⑦ Finisher: handle remaining elements | |
if (!batch.isEmpty()) { | |
downstream.push(new ArrayList<>(batch)); | |
} | |
} | |
); | |
} | |
List<Integer> numbers = List.of(1, 4, 2, 8, 5, 7, 3, 9, 6); | |
List<List<Integer>> batches = numbers.stream() | |
.gather(batchByCondition( | |
batch -> batch.stream().mapToInt(i -> i).sum() <= 10 // Sum threshold | |
)) | |
.toList(); | |
// Output visualization: | |
// [1, 4, 2] → Sum: 7 ✓ (7 ≤ 10, continue) | |
// [1, 4, 2, 8] → Sum: 15 ✗ (15 > 10, emit [1,4,2], start new with [8]) | |
// [8] → Sum: 8 ✓ (8 ≤ 10, continue) | |
// [8, 5] → Sum: 13 ✗ (13 > 10, emit [8], start new with [5]) | |
// ... and so on |
public static Gatherer<Double, ?, Double> movingAverage(int windowSize) { | |
return Gatherer.of( | |
LinkedList::new, // ① Use LinkedList for efficient add/remove | |
(window, price, downstream) -> { | |
window.add(price); // ② Add new price to window | |
if (window.size() > windowSize) { // ③ Window too big? | |
window.removeFirst(); // ④ Remove oldest price | |
} | |
if (window.size() == windowSize) { // ⑤ Window full? | |
double avg = window.stream() | |
.mapToDouble(Double::doubleValue) | |
.average() | |
.orElse(0.0); | |
return downstream.push(avg); // ⑥ Emit moving average | |
} | |
return true; // ⑦ Keep gathering (building up window) | |
} | |
); | |
} | |
// Usage example: | |
// Prices: [100, 102, 101, 105, 103, 107, 104] | |
// Window size 3: | |
// [100, 102, 101] → avg: 101.0 | |
// [102, 101, 105] → avg: 102.7 | |
// [101, 105, 103] → avg: 103.0 | |
// ... sliding window continues |
record TrendSignal(double price, String trend) {} | |
public static Gatherer<Double, ?, TrendSignal> trendDetection(int lookback) { | |
return Gatherer.of( | |
() -> new Object() { | |
List<Double> history = new ArrayList<>(); // ① Price history buffer | |
}, | |
(state, price, downstream) -> { | |
state.history.add(price); // ② Add to history | |
if (state.history.size() >= lookback) { // ③ Enough data? | |
String trend = detectTrend(state.history); // ④ Analyze trend | |
downstream.push(new TrendSignal(price, trend)); // ⑤ Emit signal | |
if (state.history.size() > lookback) { // ⑥ Maintain window size | |
state.history.remove(0); | |
} | |
} | |
return true; | |
} | |
); | |
} | |
private static String detectTrend(List<Double> prices) { | |
double first = prices.get(0); | |
double last = prices.get(prices.size() - 1); | |
double change = (last - first) / first * 100; // Percentage change | |
if (change > 2) return "UPTREND ↑"; // ① Significant rise | |
else if (change < -2) return "DOWNTREND ↓"; // ② Significant fall | |
else return "SIDEWAYS →"; // ③ Relatively flat | |
} | |
// Example output: | |
// Price: $100.50 → SIDEWAYS → | |
// Price: $102.30 → UPTREND ↑ | |
// Price: $98.75 → DOWNTREND ↓ |
class Statistics { | |
int count = 0; | |
int sum = 0; | |
int min = Integer.MAX_VALUE; | |
int max = Integer.MIN_VALUE; | |
Statistics add(Integer value) { | |
count++; // ① Track how many elements | |
sum += value; // ② Running total | |
min = Math.min(min, value); // ③ Track minimum | |
max = Math.max(max, value); // ④ Track maximum | |
return this; // ⑤ Return self for chaining | |
} | |
double getAverage() { | |
return count == 0 ? 0 : (double) sum / count; | |
} | |
} | |
Statistics stats = values.stream() | |
.gather(Gatherers.fold( | |
Statistics::new, // ① Create fresh Statistics object | |
Statistics::add // ② Add each element to our statistics | |
)) | |
.findFirst() // ③ Fold returns a single-element stream | |
.orElse(new Statistics()); | |
System.out.printf("Count: %d, Sum: %d, Avg: %.2f, Min: %d, Max: %d%n", | |
stats.count, stats.sum, stats.getAverage(), stats.min, stats.max); | |
// Output: Count: 9, Sum: 45, Avg: 5.00, Min: 1, Max: 9 |