Skip to content

Instantly share code, notes, and snippets.

@cnolanminich
Created October 30, 2024 14:42
Show Gist options
  • Save cnolanminich/a8703eec41dca18c081d8a441308bcb3 to your computer and use it in GitHub Desktop.
Save cnolanminich/a8703eec41dca18c081d8a441308bcb3 to your computer and use it in GitHub Desktop.
Dagster + AWS Step Functions Example
import boto3
import json
import time
from dagster import asset, AssetExecutionContext
@asset
def execute_step_function(context: AssetExecutionContext):
# Initialize the boto3 client for Step Functions
client = boto3.client('stepfunctions', region_name='us-west-2')
# Define the Step Function ARN and input
step_function_arn = 'arn:aws:states:us-west-2:123456789012:stateMachine:YourStateMachineName'
input_data = {
"key": "value" # Replace with your actual input data
}
# Start the Step Function execution
response = client.start_execution(
stateMachineArn=step_function_arn,
input=json.dumps(input_data)
)
execution_arn = response['executionArn']
context.log.info(f"Started Step Function execution: {execution_arn}")
# Poll for the execution status
while True:
# Get the current execution status
status_response = client.describe_execution(executionArn=execution_arn)
status = status_response['status']
# Log the current status
context.log.info(f"Current execution status: {status}")
# Check if the execution has reached a terminal state
if status in ['SUCCEEDED', 'FAILED', 'TIMED_OUT']:
break
# Wait before polling again
time.sleep(5)
# Log the final status
context.log.info(f"Step Function execution completed with status: {status}")
return status
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment