Created
May 10, 2018 12:30
-
-
Save orian/5633f2e06ef67b459ede287dbf75f12f to your computer and use it in GitHub Desktop.
Experimenting with TestStream in the Apache Beam testing framework.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package eu.datainq.playground;

import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

public class StarterPipelineTest { | |
private static final Duration WINDOW_DUR = Duration.standardMinutes(5); | |
@Rule | |
public final transient TestPipeline pipeline = TestPipeline.create(); | |
Instant baseTime = new Instant(0); | |
public TimestampedValue<Integer> num(int v, Duration d) { | |
return TimestampedValue.of(v, baseTime.plus(d)); | |
} | |
@Test | |
@Category(NeedsRunner.class) | |
public void simpleSum() { | |
TestStream<Integer> str = TestStream.create(VarIntCoder.of()) | |
.advanceWatermarkTo(baseTime) | |
.addElements(num(1, Duration.ZERO)) | |
.advanceWatermarkTo(baseTime.plus(WINDOW_DUR).plus(1000)) | |
.advanceWatermarkToInfinity(); | |
BoundedWindow window = new IntervalWindow(baseTime, WINDOW_DUR); | |
PCollection<KV<Integer, Integer>> got = pipeline | |
.apply(str) | |
.apply(Window.<Integer>into(FixedWindows.of(WINDOW_DUR))) | |
.apply(WithKeys.<Integer, Integer>of((v) -> v % 2).withKeyType(TypeDescriptors.integers())) | |
.apply(Sum.integersPerKey()); | |
PAssert.that(got) | |
.inOnTimePane(window) | |
.containsInAnyOrder(KV.of(1, 1)); | |
pipeline.run(); | |
} | |
@Test | |
@Category(NeedsRunner.class) | |
public void simpleSum2() { | |
TestStream<Integer> str = TestStream.create(VarIntCoder.of()) | |
.advanceWatermarkTo(baseTime) | |
.addElements(num(1, Duration.ZERO), num(3, Duration.standardSeconds(10))) | |
.advanceWatermarkTo(baseTime.plus(WINDOW_DUR).plus(Duration.standardMinutes(1))) | |
.advanceWatermarkToInfinity(); | |
BoundedWindow window = new IntervalWindow(baseTime, WINDOW_DUR); | |
PCollection<KV<Integer, Integer>> got = pipeline | |
.apply(str) | |
.apply(Window.<Integer>into(FixedWindows.of(WINDOW_DUR))) | |
.apply(WithKeys.<Integer, Integer>of((v) -> v % 2).withKeyType(TypeDescriptors.integers())) | |
.apply(Sum.integersPerKey()); | |
PAssert.that(got) | |
.inOnTimePane(window) | |
.containsInAnyOrder(KV.of(1, 4)); | |
pipeline.run(); | |
} | |
@Test | |
@Category(NeedsRunner.class) | |
public void simpleSum3() { | |
TestStream<Integer> str = TestStream.create(VarIntCoder.of()) | |
.advanceWatermarkTo(baseTime) | |
.addElements(num(1, Duration.ZERO), num(3, Duration.standardSeconds(10))) | |
.advanceWatermarkTo(baseTime.plus(Duration.standardMinutes(1))) | |
.addElements(num(5, Duration.standardSeconds(30))) | |
.advanceWatermarkTo(baseTime.plus(WINDOW_DUR).plus(Duration.standardMinutes(1))) | |
.advanceWatermarkToInfinity(); | |
BoundedWindow window = new IntervalWindow(baseTime, WINDOW_DUR); | |
PCollection<KV<Integer, Integer>> got = pipeline | |
.apply(str) | |
.apply(Window.<Integer>into(FixedWindows.of(WINDOW_DUR))) | |
.apply(WithKeys.<Integer, Integer>of((v) -> v % 2).withKeyType(TypeDescriptors.integers())) | |
.apply(Sum.integersPerKey()); | |
PAssert.that(got) | |
.inWindow(window) | |
.containsInAnyOrder(KV.of(1, 9)); | |
pipeline.run(); | |
} | |
@Test | |
@Category(NeedsRunner.class) | |
public void simpleSum4() { | |
TestStream<Integer> str = TestStream.create(VarIntCoder.of()) | |
.advanceWatermarkTo(baseTime) | |
.addElements(num(1, Duration.ZERO), num(3, Duration.standardSeconds(10))) | |
.advanceWatermarkTo(baseTime.plus(WINDOW_DUR)) | |
.addElements(num(5, Duration.standardMinutes(3))) | |
.advanceWatermarkTo(baseTime.plus(WINDOW_DUR).plus(Duration.standardMinutes(1))) | |
.advanceWatermarkToInfinity(); | |
BoundedWindow window = new IntervalWindow(baseTime, WINDOW_DUR); | |
PCollection<KV<Integer, Integer>> got = pipeline | |
.apply(str) | |
.apply(Window.<Integer>into(FixedWindows.of(WINDOW_DUR)) | |
.triggering( | |
AfterWatermark.pastEndOfWindow() | |
.withLateFirings(AfterProcessingTime.pastFirstElementInPane() | |
.plusDelayOf(Duration.standardMinutes(2)))) | |
.withAllowedLateness(Duration.standardMinutes(2)) | |
.accumulatingFiredPanes()) | |
.apply(WithKeys.<Integer, Integer>of((v) -> v % 2).withKeyType(TypeDescriptors.integers())) | |
.apply(Sum.integersPerKey()); | |
PAssert.that(got) | |
.inOnTimePane(window) | |
.containsInAnyOrder(KV.of(1, 4)); | |
PAssert.that(got) | |
.inFinalPane(window) | |
.containsInAnyOrder(KV.of(1, 9)); | |
pipeline.run(); | |
} | |
@Test | |
@Category(NeedsRunner.class) | |
public void batchMode() { | |
BoundedWindow window = new IntervalWindow(baseTime, WINDOW_DUR); | |
PCollection<Integer> x = pipeline | |
.apply(Create.timestamped(num(1, Duration.ZERO))) | |
.apply(Window.<Integer>into(FixedWindows.of(WINDOW_DUR)) | |
.accumulatingFiredPanes() | |
.withAllowedLateness(Duration.standardMinutes(5)) | |
.triggering(AfterWatermark.pastEndOfWindow() | |
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() | |
.plusDelayOf(Duration.standardMinutes(5))) | |
.withLateFirings(AfterProcessingTime.pastFirstElementInPane() | |
.plusDelayOf(Duration.standardMinutes(10))))); | |
PCollection<KV<Integer, Integer>> got = | |
x.apply(WithKeys.<Integer, Integer>of((v) -> v % 2).withKeyType(TypeDescriptors.integers())) | |
.apply(Sum.integersPerKey()); | |
PAssert.that(got).inOnTimePane(window).containsInAnyOrder(KV.of(1, 1)); | |
pipeline.run().waitUntilFinish(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment