Created
February 20, 2026 11:48
-
-
Save jto/cc0d812d413a9577bd084e8911d4c224 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| //> using dep "org.apache.beam:beam-sdks-java-core:2.71.0" | |
| //> using dep "org.apache.beam:beam-runners-portability-java:2.71.0" | |
| //> using dep "org.apache.beam:beam-model-pipeline:2.71.0" | |
| //> using dep "org.slf4j:slf4j-simple:2.0.17" | |
| import org.apache.beam.model.jobmanagement.v1.JobApi; | |
| import org.apache.beam.model.pipeline.v1.RunnerApi; | |
| import org.apache.beam.runners.fnexecution.provisioning.JobInfo; | |
| import org.apache.beam.runners.jobsubmission.PortablePipelineResult; | |
| import org.apache.beam.runners.jobsubmission.PortablePipelineRunner; | |
| import org.apache.beam.sdk.Pipeline; | |
| import org.apache.beam.sdk.PipelineRunner; | |
| import org.apache.beam.sdk.PipelineResult; | |
| import org.apache.beam.sdk.options.PipelineOptions; | |
| import org.apache.beam.sdk.io.TextIO; | |
| import org.apache.beam.sdk.metrics.MetricResults; | |
| import org.apache.beam.sdk.transforms.Count; | |
| import org.apache.beam.sdk.transforms.FlatMapElements; | |
| import org.apache.beam.sdk.transforms.MapElements; | |
| import org.apache.beam.sdk.util.construction.PipelineTranslation; | |
| import org.apache.beam.sdk.util.construction.SdkComponents; | |
| import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; | |
| import org.apache.beam.sdk.values.KV; | |
| import org.apache.beam.sdk.values.TypeDescriptors; | |
| import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Struct; | |
| import org.joda.time.Duration; | |
| import java.io.IOException; | |
| import java.util.Arrays; | |
| import java.util.Map; | |
| /** | |
| * A portable Beam runner that prints the pipeline structure instead of | |
| * executing it. | |
| * Useful for debugging, visualization, and educational purposes. | |
| */ | |
| public class PrintPipelineRunner extends PipelineRunner<PipelineResult> implements PortablePipelineRunner { | |
| /** | |
| * Construct a {@link PrintPipelineRunner} from the provided options. | |
| * | |
| * @param options Properties that configure the runner. | |
| * @return The newly created runner. | |
| */ | |
| public static PrintPipelineRunner fromOptions(PipelineOptions options) { | |
| return new PrintPipelineRunner(); | |
| } | |
| /** | |
| * Standard PipelineRunner interface implementation - allows using this runner | |
| * via Pipeline.run() | |
| */ | |
| @Override | |
| public PipelineResult run(Pipeline pipeline) { | |
| try { | |
| // Convert Pipeline to RunnerApi.Pipeline protobuf | |
| SdkComponents sdkComponents = SdkComponents.create(pipeline.getOptions()); | |
| RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true); | |
| // Convert PipelineOptions to protobuf Struct | |
| Struct optionsStruct = PipelineOptionsTranslation.toProto(pipeline.getOptions()); | |
| // Create JobInfo | |
| JobInfo jobInfo = JobInfo.create( | |
| "job-" + System.currentTimeMillis(), | |
| "PrintPipeline", | |
| "retrieval-token", | |
| optionsStruct); | |
| // Call existing run method (PortablePipelineRunner interface) | |
| return run(pipelineProto, jobInfo); | |
| } catch (Exception e) { | |
| throw new RuntimeException("Failed to print pipeline", e); | |
| } | |
| } | |
| /** | |
| * PortablePipelineRunner interface implementation - processes protobuf pipeline | |
| */ | |
| @Override | |
| public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) throws Exception { | |
| System.out.println("=".repeat(80)); | |
| System.out.println("BEAM PIPELINE STRUCTURE"); | |
| System.out.println("=".repeat(80)); | |
| // Print job information | |
| System.out.println("\nJob Information:"); | |
| System.out.println(" Job Name: " + jobInfo.jobName()); | |
| System.out.println(" Job ID: " + jobInfo.jobId()); | |
| // Get components | |
| RunnerApi.Components components = pipeline.getComponents(); | |
| // Print pipeline metadata | |
| System.out.println("\nPipeline Components Summary:"); | |
| System.out.println(" Transforms: " + components.getTransformsCount()); | |
| System.out.println(" PCollections: " + components.getPcollectionsCount()); | |
| System.out.println(" Coders: " + components.getCodersCount()); | |
| System.out.println(" Windowing Strategies: " + components.getWindowingStrategiesCount()); | |
| // Print root transforms | |
| System.out.println("\nRoot Transform IDs:"); | |
| for (String rootId : pipeline.getRootTransformIdsList()) { | |
| System.out.println(" - " + rootId); | |
| } | |
| // Print pipeline structure | |
| System.out.println("\n" + "=".repeat(80)); | |
| System.out.println("PIPELINE STRUCTURE"); | |
| System.out.println("=".repeat(80)); | |
| // Traverse and print each root transform | |
| for (String rootId : pipeline.getRootTransformIdsList()) { | |
| printTransform(rootId, components, 0); | |
| } | |
| System.out.println("\n" + "=".repeat(80)); | |
| System.out.println("PIPELINE VISUALIZATION COMPLETE"); | |
| System.out.println("=".repeat(80)); | |
| return new SimplePipelineResult(); | |
| } | |
| /** | |
| * Recursively prints a transform and its subtransforms | |
| */ | |
| private void printTransform(String transformId, RunnerApi.Components components, int depth) { | |
| if (!components.getTransformsMap().containsKey(transformId)) { | |
| printIndented(depth, "[Warning: Transform not found: " + transformId + "]"); | |
| return; | |
| } | |
| RunnerApi.PTransform transform = components.getTransformsMap().get(transformId); | |
| String indent = getIndent(depth); | |
| // Print transform header | |
| System.out.println(); | |
| printIndented(depth, "Transform: " + transformId); | |
| // Print transform type (URN) | |
| if (transform.hasSpec()) { | |
| String urn = transform.getSpec().getUrn(); | |
| // Extract simple name from URN (e.g., beam:transform:par_do:v1 -> ParDo) | |
| String simpleName = extractSimpleName(urn); | |
| printIndented(depth + 1, "Type: " + simpleName); | |
| printIndented(depth + 1, "URN: " + urn); | |
| } else { | |
| printIndented(depth + 1, "Type: Composite Transform"); | |
| } | |
| // Print unique name if available | |
| if (!transform.getUniqueName().isEmpty()) { | |
| printIndented(depth + 1, "Name: " + transform.getUniqueName()); | |
| } | |
| // Print inputs | |
| if (!transform.getInputsMap().isEmpty()) { | |
| printIndented(depth + 1, "Inputs:"); | |
| for (Map.Entry<String, String> input : transform.getInputsMap().entrySet()) { | |
| String localName = input.getKey(); | |
| String pcollectionId = input.getValue(); | |
| RunnerApi.PCollection pcoll = components.getPcollectionsOrDefault(pcollectionId, null); | |
| if (pcoll != null) { | |
| printIndented(depth + 2, localName + " -> " + pcollectionId); | |
| if (!pcoll.getUniqueName().isEmpty()) { | |
| printIndented(depth + 3, "PCollection Name: " + pcoll.getUniqueName()); | |
| } | |
| printIndented(depth + 3, "Coder ID: " + pcoll.getCoderId()); | |
| } else { | |
| printIndented(depth + 2, localName + " -> " + pcollectionId + " [Not Found]"); | |
| } | |
| } | |
| } | |
| // Print outputs | |
| if (!transform.getOutputsMap().isEmpty()) { | |
| printIndented(depth + 1, "Outputs:"); | |
| for (Map.Entry<String, String> output : transform.getOutputsMap().entrySet()) { | |
| String localName = output.getKey(); | |
| String pcollectionId = output.getValue(); | |
| RunnerApi.PCollection pcoll = components.getPcollectionsOrDefault(pcollectionId, null); | |
| if (pcoll != null) { | |
| printIndented(depth + 2, localName + " -> " + pcollectionId); | |
| if (!pcoll.getUniqueName().isEmpty()) { | |
| printIndented(depth + 3, "PCollection Name: " + pcoll.getUniqueName()); | |
| } | |
| printIndented(depth + 3, "Coder ID: " + pcoll.getCoderId()); | |
| } else { | |
| printIndented(depth + 2, localName + " -> " + pcollectionId + " [Not Found]"); | |
| } | |
| } | |
| } | |
| // Print environment ID if available | |
| if (!transform.getEnvironmentId().isEmpty()) { | |
| printIndented(depth + 1, "Environment: " + transform.getEnvironmentId()); | |
| } | |
| // Recursively print subtransforms | |
| if (!transform.getSubtransformsList().isEmpty()) { | |
| printIndented(depth + 1, "Subtransforms:"); | |
| for (String subtransformId : transform.getSubtransformsList()) { | |
| printTransform(subtransformId, components, depth + 2); | |
| } | |
| } | |
| } | |
| /** | |
| * Prints a line with proper indentation | |
| */ | |
| private void printIndented(int depth, String message) { | |
| System.out.println(getIndent(depth) + message); | |
| } | |
| /** | |
| * Gets the indentation string for a given depth | |
| */ | |
| private String getIndent(int depth) { | |
| return " ".repeat(depth); | |
| } | |
| /** | |
| * Extracts a simple name from a Beam URN | |
| * Example: beam:transform:par_do:v1 -> ParDo | |
| */ | |
| private String extractSimpleName(String urn) { | |
| if (urn == null || urn.isEmpty()) { | |
| return "Unknown"; | |
| } | |
| // Split URN and extract transform type | |
| String[] parts = urn.split(":"); | |
| if (parts.length >= 3) { | |
| String transformType = parts[2]; | |
| // Convert snake_case to PascalCase | |
| String[] words = transformType.split("_"); | |
| StringBuilder result = new StringBuilder(); | |
| for (String word : words) { | |
| if (!word.isEmpty()) { | |
| result.append(Character.toUpperCase(word.charAt(0))); | |
| if (word.length() > 1) { | |
| result.append(word.substring(1)); | |
| } | |
| } | |
| } | |
| return result.toString(); | |
| } | |
| return urn; | |
| } | |
| /** | |
| * Simple implementation of PortablePipelineResult that indicates immediate | |
| * completion | |
| */ | |
| private static class SimplePipelineResult implements PortablePipelineResult { | |
| @Override | |
| public PipelineResult.State getState() { | |
| return PipelineResult.State.DONE; | |
| } | |
| @Override | |
| public PipelineResult.State cancel() throws IOException { | |
| // No execution to cancel | |
| return PipelineResult.State.DONE; | |
| } | |
| @Override | |
| public PipelineResult.State waitUntilFinish(Duration duration) { | |
| // Already finished (no actual execution) | |
| return PipelineResult.State.DONE; | |
| } | |
| @Override | |
| public PipelineResult.State waitUntilFinish() { | |
| // Already finished (no actual execution) | |
| return PipelineResult.State.DONE; | |
| } | |
| @Override | |
| public MetricResults metrics() { | |
| // No metrics for print-only runner - throw UnsupportedOperationException | |
| throw new UnsupportedOperationException("Metrics are not supported by PrintPipelineRunner"); | |
| } | |
| @Override | |
| public JobApi.MetricResults portableMetrics() throws UnsupportedOperationException { | |
| // No portable metrics for print-only runner | |
| throw new UnsupportedOperationException("Portable metrics are not supported by PrintPipelineRunner"); | |
| } | |
| } | |
| /** | |
| * Main method demonstrating WordCount example | |
| */ | |
| public static void main(String[] args) throws Exception { | |
| // Create pipeline | |
| Pipeline pipeline = Pipeline.create(); | |
| pipeline.getOptions().setRunner(PrintPipelineRunner.class); | |
| // Build WordCount pipeline | |
| pipeline | |
| .apply("ReadLines", TextIO.read() | |
| .from("gs://apache-beam-samples/shakespeare/*")) | |
| .apply("ExtractWords", FlatMapElements | |
| .into(TypeDescriptors.strings()) | |
| .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+")))) | |
| .apply("CountPerElement", Count.<String>perElement()) | |
| .apply("FormatResults", MapElements | |
| .into(TypeDescriptors.strings()) | |
| .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())); | |
| var state = pipeline.run().waitUntilFinish(); | |
| System.out.println("\nFinal state: " + state); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment