Created
October 30, 2024 14:42
-
-
Save cnolanminich/a8703eec41dca18c081d8a441308bcb3 to your computer and use it in GitHub Desktop.
Dagster + AWS Step Functions Example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import json | |
import time | |
from dagster import asset, AssetExecutionContext | |
@asset
def execute_step_function(context: AssetExecutionContext):
    """Start an AWS Step Functions execution and block until it finishes.

    The asset starts one execution of the configured state machine, then
    polls ``describe_execution`` every few seconds, logging each observed
    status, until the execution reaches a terminal state.

    Args:
        context: Dagster execution context; used here only for logging.

    Returns:
        The final execution status string reported by Step Functions
        (``'SUCCEEDED'``, ``'FAILED'``, ``'TIMED_OUT'`` or ``'ABORTED'``).

    NOTE(review): a non-successful terminal status is returned rather than
    raised, so the asset still materializes when the state machine fails.
    Confirm whether downstream consumers rely on that.
    """
    # Statuses after which the execution makes no further progress.
    # 'ABORTED' must be included: a stopped execution never leaves that
    # state, so omitting it would make the polling loop spin forever.
    terminal_statuses = {'SUCCEEDED', 'FAILED', 'TIMED_OUT', 'ABORTED'}
    poll_interval_seconds = 5

    # Initialize the boto3 client for Step Functions
    client = boto3.client('stepfunctions', region_name='us-west-2')

    # Define the Step Function ARN and input
    step_function_arn = 'arn:aws:states:us-west-2:123456789012:stateMachine:YourStateMachineName'
    input_data = {
        "key": "value",  # Replace with your actual input data
    }

    # Start the Step Function execution
    response = client.start_execution(
        stateMachineArn=step_function_arn,
        input=json.dumps(input_data),
    )
    execution_arn = response['executionArn']
    context.log.info(f"Started Step Function execution: {execution_arn}")

    # Poll for the execution status
    while True:
        # Get the current execution status
        status_response = client.describe_execution(executionArn=execution_arn)
        status = status_response['status']

        # Log the current status
        context.log.info(f"Current execution status: {status}")

        # Check if the execution has reached a terminal state
        if status in terminal_statuses:
            break

        # Wait before polling again
        time.sleep(poll_interval_seconds)

    # Log the final status
    context.log.info(f"Step Function execution completed with status: {status}")
    return status
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment