Skip to content

Instantly share code, notes, and snippets.

@cnolanminich
cnolanminich / dbt_sources_as_external_assets.py
Created January 16, 2025 16:59
example of building dbt external assets from dbt sources
import json
import textwrap
from typing import Any, Mapping, List, Tuple
from dagster import (
AutomationCondition,
AssetKey,
BackfillPolicy,
DailyPartitionsDefinition,
job,
op,
@cnolanminich
cnolanminich / deployment
Created January 7, 2025 21:15
Dagster GitHub Action with releases for prod and dev for pushes to main
# GitHub Actions workflow triggers for a Dagster Cloud hybrid deployment.
name: Dagster Cloud Hybrid Deployment
on:
  push: # For full deployment
    branches:
      - "main"
      - "master"
  pull_request: # For branch deployments
    types: [opened, synchronize, reopened, closed]
  release: # NOTE(review): presumably gates a prod release job -- confirm against the jobs section (not shown here)
    types: [published]
@cnolanminich
cnolanminich / postgres_resource.py
Created December 11, 2024 20:39
example postgres resource
import pandas as pd
from dagster import asset, ResourceParam
from sqlalchemy import create_engine
# Define the PostgreSQL resource
class PostgresResource:
def __init__(self, connection_string):
    """Store the database connection string on the resource.

    Args:
        connection_string: presumably a SQLAlchemy-style database URL,
            since ``create_engine`` is imported above -- TODO confirm.
    """
    self.connection_string = connection_string
def get_connection(self):
@cnolanminich
cnolanminich / ops_no_data.py
Created December 3, 2024 20:05
ops_no_data
import dagster as dg
@dg.op
def first_op(context: dg.OpExecutionContext) -> None:
    """Log a marker message; this op produces no output data."""
    log = context.log
    log.info("Creating op one")
@dg.op(ins={"start": dg.In(dg.Nothing)})
def second_op(context: dg.OpExecutionContext) -> None:
    """Log a marker message.

    The ``Nothing``-typed input carries no data; it only sequences this op
    after an upstream op.
    """
    log = context.log
    log.info("Creating op two")
@cnolanminich
cnolanminich / dagster_with_sagemaker.py
Created November 27, 2024 20:08
example sagemaker pipeline
import boto3
import time
# Define an asset that uses the custom resource
@asset
def sagemaker_pipeline() -> str:
# note: the functions to connect and use Sagemaker would likely be put into a Dagster Resource
# this is for illustration purposes for how you might integrate a system like Sagemaker into Dagster as simply as possible
# Step 1: Set up the SageMaker session and client
@cnolanminich
cnolanminich / assets
Created November 8, 2024 17:21
querying assets and jobs
query GetAssetDetails($assetKey: AssetKeyInput!) {
assetNodeOrError(assetKey: $assetKey) {
... on AssetNode {
assetKey {
path
}
dataVersion
description
groupName
hasAssetChecks
@cnolanminich
cnolanminich / step_functions.py
Created October 30, 2024 14:42
Dagster + AWS Step Functions Example
import boto3
import json
import time
from dagster import asset, AssetExecutionContext
@asset
def execute_step_function(context: AssetExecutionContext):
# Initialize the boto3 client for Step Functions
client = boto3.client('stepfunctions', region_name='us-west-2')
@cnolanminich
cnolanminich / definitions.py
Created October 15, 2024 20:31
turn on or off schedules in bulk, or have schedules that expire
from dagster import Definitions, load_assets_from_modules, AssetSelection, define_asset_job, op, OpExecutionContext, job, asset, asset_check, ScheduleDefinition
import requests
from datetime import datetime
# NOTE: can be modified to work with auth in Dagster+ (e.g. add an auth header to requests).
graphql_endpoint = "http://localhost:3000/graphql" # Dagster webserver GraphQL URL; adjust as needed
code_location_name = "turn_on_schedules_in_bulk" # code location whose schedules are managed; adjust as needed
# utility function to get all schedules for a code location
@cnolanminich
cnolanminich / run_status_sensors.py
Last active October 4, 2024 20:43
run status sensor that only runs once both upstream jobs complete
from dagster import run_status_sensor, DagsterRunStatus, RunRequest, SkipReason, job, sensor, define_asset_job, SensorEvaluationContext, DagsterInstance, RunsFilter, Definitions, asset, AssetExecutionContext
from datetime import datetime, timedelta
import json
@asset
def first_asset(context: AssetExecutionContext) -> None:
    """Log a marker message when materialized; yields no value."""
    logger = context.log
    logger.info("First asset")
@asset
def second_asset(context: AssetExecutionContext) -> None:
@cnolanminich
cnolanminich / jenkins.file
Last active September 13, 2024 16:29
Jenkinsfile for hybrid
pipeline {
agent any
environment {
    // Construct the IMAGE_TAG using Jenkins environment variables.
    // FIX: the original literal was missing its closing double quote,
    // which is a Jenkinsfile (Groovy) syntax error.
    IMAGE_TAG = "${env.GIT_COMMIT}-${env.BUILD_ID}"
    AWS_ACCESS_KEY_ID = credentials('aws-access-key-id') // Reference to the AWS access key ID secret
    AWS_SECRET_ACCESS_KEY = credentials('aws-secret-access-key') // Reference to the AWS secret access key secret
    AWS_REGION = 'us-west-2' // Set your AWS region
}