Skip to content

Instantly share code, notes, and snippets.

@jto
Created February 20, 2026 11:48
Show Gist options
  • Select an option

  • Save jto/cc0d812d413a9577bd084e8911d4c224 to your computer and use it in GitHub Desktop.

Select an option

Save jto/cc0d812d413a9577bd084e8911d4c224 to your computer and use it in GitHub Desktop.
//> using dep "org.apache.beam:beam-sdks-java-core:2.71.0"
//> using dep "org.apache.beam:beam-runners-portability-java:2.71.0"
//> using dep "org.apache.beam:beam-model-pipeline:2.71.0"
//> using dep "org.slf4j:slf4j-simple:2.0.17"
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.jobsubmission.PortablePipelineResult;
import org.apache.beam.runners.jobsubmission.PortablePipelineRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.util.construction.PipelineTranslation;
import org.apache.beam.sdk.util.construction.SdkComponents;
import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Struct;
import org.joda.time.Duration;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
/**
 * A portable Beam runner that prints the pipeline structure instead of
 * executing it.
 *
 * <p>The runner can be used in two ways: through the regular
 * {@link PipelineRunner} entry point ({@link #run(Pipeline)}), which first
 * translates the pipeline to its protobuf form, or directly through the
 * {@link PortablePipelineRunner} entry point
 * ({@link #run(RunnerApi.Pipeline, JobInfo)}). Both write a textual
 * description of the transform tree to standard output and return a result
 * that is already in the {@code DONE} state.
 *
 * <p>Useful for debugging, visualization, and educational purposes.
 */
public class PrintPipelineRunner extends PipelineRunner<PipelineResult> implements PortablePipelineRunner {

    /** Horizontal rule printed above and below each section title. */
    private static final String BANNER = "=".repeat(80);

    /** Text emitted once per nesting level in front of an indented line. */
    private static final String INDENT_UNIT = " ";

    /**
     * Construct a {@link PrintPipelineRunner} from the provided options.
     *
     * @param options Properties that configure the runner. They are currently
     *                not consulted: the runner has no configurable state.
     * @return The newly created runner.
     */
    public static PrintPipelineRunner fromOptions(PipelineOptions options) {
        return new PrintPipelineRunner();
    }

    /**
     * Standard {@link PipelineRunner} entry point, which allows using this
     * runner via {@code Pipeline.run()}.
     *
     * <p>The pipeline and its options are converted to their protobuf
     * representations, wrapped together with placeholder job metadata in a
     * {@link JobInfo}, and handed to {@link #run(RunnerApi.Pipeline, JobInfo)}.
     *
     * @param pipeline the pipeline to describe
     * @return a result that is already {@code DONE}
     * @throws RuntimeException if translation or printing fails; the original
     *                          failure is preserved as the cause
     */
    @Override
    public PipelineResult run(Pipeline pipeline) {
        try {
            // Convert Pipeline to RunnerApi.Pipeline protobuf
            SdkComponents sdkComponents = SdkComponents.create(pipeline.getOptions());
            RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true);

            // Convert PipelineOptions to protobuf Struct
            Struct optionsStruct = PipelineOptionsTranslation.toProto(pipeline.getOptions());

            // Nothing is submitted anywhere, so the job metadata is synthetic.
            JobInfo jobInfo = JobInfo.create(
                    "job-" + System.currentTimeMillis(),
                    "PrintPipeline",
                    "retrieval-token",
                    optionsStruct);

            // Delegate to the PortablePipelineRunner entry point.
            return run(pipelineProto, jobInfo);
        } catch (Exception e) {
            throw new RuntimeException("Failed to print pipeline", e);
        }
    }

    /**
     * {@link PortablePipelineRunner} entry point: prints a report about the
     * given protobuf pipeline to standard output.
     *
     * <p>The report consists of the job name and ID, component counts, the
     * list of root transform IDs, and a recursive dump of every root
     * transform.
     *
     * @param pipeline the protobuf pipeline to describe
     * @param jobInfo  job metadata; only the job name and job ID are printed
     * @return a result that is already {@code DONE}
     */
    @Override
    public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) throws Exception {
        System.out.println(BANNER);
        System.out.println("BEAM PIPELINE STRUCTURE");
        System.out.println(BANNER);

        // Print job information
        System.out.println("\nJob Information:");
        System.out.println(" Job Name: " + jobInfo.jobName());
        System.out.println(" Job ID: " + jobInfo.jobId());

        RunnerApi.Components components = pipeline.getComponents();

        // Print pipeline metadata
        System.out.println("\nPipeline Components Summary:");
        System.out.println(" Transforms: " + components.getTransformsCount());
        System.out.println(" PCollections: " + components.getPcollectionsCount());
        System.out.println(" Coders: " + components.getCodersCount());
        System.out.println(" Windowing Strategies: " + components.getWindowingStrategiesCount());

        // Print root transforms
        System.out.println("\nRoot Transform IDs:");
        for (String rootId : pipeline.getRootTransformIdsList()) {
            System.out.println(" - " + rootId);
        }

        // Print pipeline structure
        System.out.println("\n" + BANNER);
        System.out.println("PIPELINE STRUCTURE");
        System.out.println(BANNER);
        for (String rootId : pipeline.getRootTransformIdsList()) {
            printTransform(rootId, components, 0);
        }

        System.out.println("\n" + BANNER);
        System.out.println("PIPELINE VISUALIZATION COMPLETE");
        System.out.println(BANNER);
        return new SimplePipelineResult();
    }

    /**
     * Recursively prints a transform and its subtransforms.
     *
     * <p>For each transform this prints its ID, type (derived from the spec
     * URN, or "Composite Transform" when no spec is set), unique name, input
     * and output PCollections, and environment ID. Subtransforms are printed
     * two levels deeper than their parent.
     *
     * @param transformId ID of the transform to look up in {@code components}
     * @param components  component registry holding transforms and PCollections
     * @param depth       nesting level used to indent the output
     */
    private void printTransform(String transformId, RunnerApi.Components components, int depth) {
        // Single lookup; a null result means the ID is dangling.
        RunnerApi.PTransform transform = components.getTransformsOrDefault(transformId, null);
        if (transform == null) {
            printIndented(depth, "[Warning: Transform not found: " + transformId + "]");
            return;
        }

        // Print transform header
        System.out.println();
        printIndented(depth, "Transform: " + transformId);

        // Print transform type (URN)
        if (transform.hasSpec()) {
            String urn = transform.getSpec().getUrn();
            printIndented(depth + 1, "Type: " + extractSimpleName(urn));
            printIndented(depth + 1, "URN: " + urn);
        } else {
            printIndented(depth + 1, "Type: Composite Transform");
        }

        // Print unique name if available
        if (!transform.getUniqueName().isEmpty()) {
            printIndented(depth + 1, "Name: " + transform.getUniqueName());
        }

        printPCollectionRefs("Inputs:", transform.getInputsMap(), components, depth + 1);
        printPCollectionRefs("Outputs:", transform.getOutputsMap(), components, depth + 1);

        // Print environment ID if available
        if (!transform.getEnvironmentId().isEmpty()) {
            printIndented(depth + 1, "Environment: " + transform.getEnvironmentId());
        }

        // Recursively print subtransforms
        if (!transform.getSubtransformsList().isEmpty()) {
            printIndented(depth + 1, "Subtransforms:");
            for (String subtransformId : transform.getSubtransformsList()) {
                printTransform(subtransformId, components, depth + 2);
            }
        }
    }

    /**
     * Prints one "Inputs:" or "Outputs:" section of a transform.
     *
     * <p>Nothing is printed when {@code refs} is empty. Otherwise the heading
     * is printed at {@code depth}, followed by one entry per reference showing
     * the local name, the PCollection ID and, when the PCollection exists in
     * {@code components}, its unique name (if non-empty) and coder ID.
     * References to unknown PCollections are marked with {@code [Not Found]}.
     *
     * @param heading    section title, e.g. {@code "Inputs:"}
     * @param refs       map from the transform-local name to the PCollection ID
     * @param components component registry used to resolve PCollection IDs
     * @param depth      nesting level of the heading line
     */
    private static void printPCollectionRefs(
            String heading, Map<String, String> refs, RunnerApi.Components components, int depth) {
        if (refs.isEmpty()) {
            return;
        }
        printIndented(depth, heading);
        for (Map.Entry<String, String> ref : refs.entrySet()) {
            String localName = ref.getKey();
            String pcollectionId = ref.getValue();
            RunnerApi.PCollection pcoll = components.getPcollectionsOrDefault(pcollectionId, null);
            if (pcoll == null) {
                printIndented(depth + 1, localName + " -> " + pcollectionId + " [Not Found]");
                continue;
            }
            printIndented(depth + 1, localName + " -> " + pcollectionId);
            if (!pcoll.getUniqueName().isEmpty()) {
                printIndented(depth + 2, "PCollection Name: " + pcoll.getUniqueName());
            }
            printIndented(depth + 2, "Coder ID: " + pcoll.getCoderId());
        }
    }

    /**
     * Prints a line to standard output, prefixed with one
     * {@link #INDENT_UNIT} per nesting level.
     *
     * @param depth   nesting level; must not be negative
     * @param message text to print after the indentation
     */
    private static void printIndented(int depth, String message) {
        System.out.println(INDENT_UNIT.repeat(depth) + message);
    }

    /**
     * Extracts a simple name from a Beam URN by converting the third
     * colon-separated segment from snake_case to PascalCase.
     *
     * <p>Example: {@code beam:transform:group_by_key:v1 -> GroupByKey}.
     *
     * @param urn the URN to shorten; may be {@code null} or empty
     * @return {@code "Unknown"} for a {@code null} or empty URN; the
     *         PascalCase form of the third segment when the URN has at least
     *         three segments and that segment contains any characters other
     *         than underscores; otherwise the URN itself
     */
    private static String extractSimpleName(String urn) {
        if (urn == null || urn.isEmpty()) {
            return "Unknown";
        }
        String[] parts = urn.split(":");
        if (parts.length < 3) {
            return urn;
        }
        // Convert snake_case to PascalCase
        StringBuilder result = new StringBuilder();
        for (String word : parts[2].split("_")) {
            if (!word.isEmpty()) {
                result.append(Character.toUpperCase(word.charAt(0)));
                result.append(word, 1, word.length());
            }
        }
        // An empty or underscore-only segment would yield a blank type name;
        // showing the full URN is more useful than showing nothing.
        return result.length() == 0 ? urn : result.toString();
    }

    /**
     * Simple implementation of {@link PortablePipelineResult} that indicates
     * immediate completion: every state query reports {@code DONE}, and
     * metrics are not supported.
     */
    private static final class SimplePipelineResult implements PortablePipelineResult {

        @Override
        public PipelineResult.State getState() {
            return PipelineResult.State.DONE;
        }

        @Override
        public PipelineResult.State cancel() throws IOException {
            // No execution to cancel
            return PipelineResult.State.DONE;
        }

        @Override
        public PipelineResult.State waitUntilFinish(Duration duration) {
            // Already finished (no actual execution)
            return PipelineResult.State.DONE;
        }

        @Override
        public PipelineResult.State waitUntilFinish() {
            // Already finished (no actual execution)
            return PipelineResult.State.DONE;
        }

        /**
         * @throws UnsupportedOperationException always; nothing is executed,
         *                                       so there are no metrics
         */
        @Override
        public MetricResults metrics() {
            throw new UnsupportedOperationException("Metrics are not supported by PrintPipelineRunner");
        }

        /**
         * @throws UnsupportedOperationException always; nothing is executed,
         *                                       so there are no metrics
         */
        @Override
        public JobApi.MetricResults portableMetrics() throws UnsupportedOperationException {
            throw new UnsupportedOperationException("Portable metrics are not supported by PrintPipelineRunner");
        }
    }

    /**
     * Demonstrates the runner on a WordCount pipeline: the pipeline is built,
     * its structure is printed by {@link PrintPipelineRunner}, and the final
     * state is reported.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) throws Exception {
        // Create pipeline
        Pipeline pipeline = Pipeline.create();
        pipeline.getOptions().setRunner(PrintPipelineRunner.class);

        // Build WordCount pipeline
        pipeline
                .apply("ReadLines", TextIO.read()
                        .from("gs://apache-beam-samples/shakespeare/*"))
                .apply("ExtractWords", FlatMapElements
                        .into(TypeDescriptors.strings())
                        .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
                .apply("CountPerElement", Count.<String>perElement())
                .apply("FormatResults", MapElements
                        .into(TypeDescriptors.strings())
                        .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()));

        var state = pipeline.run().waitUntilFinish();
        System.out.println("\nFinal state: " + state);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment