This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import textwrap | |
from typing import Any, Mapping, List, Tuple | |
from dagster import ( | |
AutomationCondition, | |
AssetKey, | |
BackfillPolicy, | |
DailyPartitionsDefinition, | |
job, | |
op, |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
name: Dagster Cloud Hybrid Deployment | |
on: | |
push: # For full deployment | |
branches: | |
- "main" | |
- "master" | |
pull_request: # For branch deployments | |
types: [opened, synchronize, reopened, closed] | |
release: | |
types: [published] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
from dagster import asset, ResourceParam | |
from sqlalchemy import create_engine | |
# Define the PostgreSQL resource | |
class PostgresResource: | |
def __init__(self, connection_string): | |
self.connection_string = connection_string | |
def get_connection(self): |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import dagster as dg | |
@dg.op
def first_op(context: dg.OpExecutionContext) -> None:
    """Log an info-level message announcing the first op.

    Args:
        context: Dagster op execution context; only its logger is used here.
    """
    context.log.info("Creating op one")
# The "start" input is declared with type Nothing: it passes no value into the
# function (note there is no matching parameter), so it can only be used to
# wire an upstream op ahead of this one.
@dg.op(ins={"start": dg.In(dg.Nothing)})
def second_op(context: dg.OpExecutionContext) -> None:
    """Log an info-level message announcing the second op.

    Args:
        context: Dagster op execution context; only its logger is used here.
    """
    context.log.info("Creating op two")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import time | |
# Define an asset that uses the custom resource | |
@asset | |
def sagemaker_pipeline() -> str: | |
# note: the functions to connect and use Sagemaker would likely be put into a Dagster Resource | |
# this is for illustration purposes for how you might integrate a system like Sagemaker into Dagster as simply as possible | |
# Step 1: Set up the SageMaker session and client |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
query GetAssetDetails($assetKey: AssetKeyInput!) { | |
assetNodeOrError(assetKey: $assetKey) { | |
... on AssetNode { | |
assetKey { | |
path | |
} | |
dataVersion | |
description | |
groupName | |
hasAssetChecks |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import json | |
import time | |
from dagster import asset, AssetExecutionContext | |
@asset | |
def execute_step_function(context: AssetExecutionContext): | |
# Initialize the boto3 client for Step Functions | |
client = boto3.client('stepfunctions', region_name='us-west-2') |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dagster import Definitions, load_assets_from_modules, AssetSelection, define_asset_job, op, OpExecutionContext, job, asset, asset_check, ScheduleDefinition | |
import requests | |
from datetime import datetime | |
# can be modified to work with auth in dagster+ | |
graphql_endpoint = "http://localhost:3000/graphql" # Adjust the URL as needed | |
code_location_name = "turn_on_schedules_in_bulk" # Adjust the code location name as needed | |
# utility function to get all schedules for a code location |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dagster import run_status_sensor, DagsterRunStatus, RunRequest, SkipReason, job, sensor, define_asset_job, SensorEvaluationContext, DagsterInstance, RunsFilter, Definitions, asset, AssetExecutionContext | |
from datetime import datetime, timedelta | |
import json | |
@asset
def first_asset(context: AssetExecutionContext) -> None:
    """Log "First asset" at info level and return nothing.

    Args:
        context: Dagster asset execution context; only its logger is used here.
    """
    context.log.info("First asset")
@asset | |
def second_asset(context: AssetExecutionContext) -> None: |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pipeline { | |
agent any | |
environment {
    // Construct the IMAGE_TAG using Jenkins environment variables
    // (Git commit and build ID joined by a hyphen). The closing double quote
    // is required: without it the string literal never terminates.
    IMAGE_TAG = "${env.GIT_COMMIT}-${env.BUILD_ID}"
    AWS_ACCESS_KEY_ID = credentials('aws-access-key-id') // Reference to the AWS access key ID secret
    AWS_SECRET_ACCESS_KEY = credentials('aws-secret-access-key') // Reference to the AWS secret access key secret
    AWS_REGION = 'us-west-2' // Set your AWS region
}
Newer | Older