Skip to content

Instantly share code, notes, and snippets.

@pascalwhoop
Created June 24, 2025 09:30
Show Gist options
  • Save pascalwhoop/cb33fcabe07779e902357c357e48a36e to your computer and use it in GitHub Desktop.
Save pascalwhoop/cb33fcabe07779e902357c357e48a36e to your computer and use it in GitHub Desktop.
# NOTE: This file was partially generated using AI assistance.
from kedro.pipeline import node, pipeline
def dummy_node(data=None, *args, **kwargs):
    """Placeholder node function used to sketch the pipeline topology.

    Args:
        data: The node's first input, if any. Nodes declared with
            ``inputs=None`` (e.g. ``fabricate_data``) call this with no
            arguments, so ``data`` is ``None`` there.
        *args: Any additional positional inputs (e.g. the second input of a
            multi-input node); ignored.
        **kwargs: Any keyword inputs; ignored.

    Returns:
        ``data`` unchanged when it is not ``None``; otherwise an empty dict.
        Kedro refuses to save ``None`` to a dataset, so a source node with no
        inputs must emit a non-``None`` placeholder for the pipeline to run.
    """
    if data is None:
        # Fresh dict per call so no state is shared between nodes.
        return {}
    return data
def _stage(namespace, node_name, source, target):
    """Wrap one ``dummy_node`` (source -> target) in its own namespaced pipeline.

    ``source`` and ``target`` are listed as the sub-pipeline's inputs/outputs,
    which keeps their plain names instead of prefixing them with
    ``namespace``; that is what lets consecutive stages share datasets.
    """
    return pipeline(
        [node(dummy_node, inputs=source, outputs=target, name=node_name)],
        namespace=namespace,
        inputs={source},
        outputs={target},
    )


def create_pipeline(**kwargs):
    """Assemble the complete dummy project pipeline.

    Data flows ``data_engineering -> feature_engineering -> ml_pipeline``.
    ``transformed_matrix`` then fans out into ``data_release`` and
    ``reporting``, whose final outputs are joined by ``postprocessing``.
    ``fabrication`` supplies ``raw_data`` at the head of the chain.

    Args:
        **kwargs: Accepted for Kedro's ``create_pipeline`` convention; unused.

    Returns:
        The sum of all top-level pipelines.
    """
    # An "ingestion" stage (ingest_data -> raw_data) is intentionally absent
    # for now because matrix has no ingest node; raw_data comes from the
    # fabrication pipeline below instead.
    data_engineering = pipeline(
        [
            _stage("integration", "integrate_data", "raw_data", "integrated_data"),
            _stage("preprocessing", "preprocess_data", "integrated_data", "preprocessed_data"),
        ],
        namespace="data_engineering",
        inputs={"raw_data"},
        outputs={"preprocessed_data"},
    )

    feature_engineering = pipeline(
        [
            _stage("filtering", "filter_data", "preprocessed_data", "filtered_data"),
            _stage("embeddings", "create_embeddings", "filtered_data", "embeddings"),
            _stage("feature_selection", "select_features", "embeddings", "selected_features"),
        ],
        namespace="feature_engineering",
        inputs={"preprocessed_data"},
        outputs={"selected_features"},
    )

    ml_pipeline = pipeline(
        [
            _stage("modelling", "prepare_model_input", "selected_features", "model_input"),
            _stage("matrix_generation", "generate_matrix", "model_input", "matrix"),
            _stage("evaluation", "evaluate_model", "matrix", "evaluation_results"),
            _stage(
                "matrix_transformations",
                "transform_matrix",
                "evaluation_results",
                "transformed_matrix",
            ),
        ],
        namespace="ml_pipeline",
        inputs={"selected_features"},
        outputs={"transformed_matrix"},
    )

    data_release = pipeline(
        [
            _stage("export", "export_data", "transformed_matrix", "release_file"),
            _stage("ingest_to_N4J", "ingest_to_neo4j", "release_file", "neo4j_data"),
            _stage("sentinel", "send_sentinel", "neo4j_data", "sentinel_signal"),
        ],
        namespace="data_release",
        inputs={"transformed_matrix"},
        outputs={"sentinel_signal"},
    )

    reporting = pipeline(
        [
            _stage("plots", "generate_plots", "transformed_matrix", "plots"),
            _stage("tables", "generate_tables", "plots", "tables"),
            _stage("dashboards", "generate_dashboards", "tables", "dashboards"),
        ],
        namespace="reporting",
        inputs={"transformed_matrix"},
        outputs={"dashboards"},
    )

    fabrication_pipeline = pipeline(
        [node(dummy_node, inputs=None, outputs="fabricated_data", name="fabricate_data")],
        namespace="fabrication",
        # Publish fabricated_data under the name raw_data so the downstream
        # real-data pipelines consume it without any changes.
        outputs={"fabricated_data": "raw_data"},
    )

    # Join point: waits on both the release branch and the reporting branch.
    postprocessing = pipeline(
        [
            node(
                dummy_node,
                inputs=["sentinel_signal", "dashboards"],
                outputs="final_output",
                name="postprocess_output",
            ),
        ],
        namespace="postprocessing",
        inputs={"sentinel_signal", "dashboards"},
        outputs={"final_output"},
    )

    combined = data_engineering + feature_engineering + ml_pipeline
    combined = combined + data_release + reporting
    combined = combined + fabrication_pipeline + postprocessing
    return combined
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment