Skip to content

Instantly share code, notes, and snippets.

@cnolanminich
Created November 27, 2024 20:08
Show Gist options
  • Save cnolanminich/b3cdc8fec9f9442069efbf86cfc30204 to your computer and use it in GitHub Desktop.
Save cnolanminich/b3cdc8fec9f9442069efbf86cfc30204 to your computer and use it in GitHub Desktop.
example sagemaker pipeline
import time

import boto3
from dagster import Definitions, asset
# Define an asset that starts a SageMaker pipeline execution and waits for it to finish
@asset
def sagemaker_pipeline() -> str:
    """Start a SageMaker pipeline execution and block until it finishes.

    The pipeline status is polled every 30 seconds until it reaches one of
    the terminal states ``Succeeded``, ``Failed`` or ``Stopped``.

    Returns:
        The ARN of the pipeline execution, once it has succeeded.

    Raises:
        RuntimeError: If the execution ends as ``Failed`` or ``Stopped``, so
            that the asset is not reported as materialized when the
            underlying pipeline did not succeed.
    """
    # note: the functions to connect and use Sagemaker would likely be put into a Dagster Resource
    # this is for illustration purposes for how you might integrate a system like Sagemaker into Dagster as simply as possible

    # Step 1: Set up the SageMaker client
    sagemaker_client = boto3.client('sagemaker')

    # Step 2: Define the pipeline name
    pipeline_name = 'your-pipeline-name'

    # Step 3: Start the pipeline execution
    response = sagemaker_client.start_pipeline_execution(
        PipelineName=pipeline_name
    )

    # Get the execution ARN
    execution_arn = response['PipelineExecutionArn']
    print(f'Started pipeline execution: {execution_arn}')

    # Step 4: Monitor the pipeline execution status
    terminal_statuses = {'Succeeded', 'Failed', 'Stopped'}
    poll_interval_seconds = 30
    while True:
        response = sagemaker_client.describe_pipeline_execution(
            PipelineExecutionArn=execution_arn
        )
        status = response['PipelineExecutionStatus']
        print(f'Pipeline execution status: {status}')
        if status in terminal_statuses:
            break
        # Wait before checking the status again
        time.sleep(poll_interval_seconds)

    print(f'Pipeline execution completed with status: {status}')

    # A failed or stopped pipeline must fail the asset instead of letting it
    # complete as if the work had been done.
    if status != 'Succeeded':
        reason = response.get('FailureReason', 'no failure reason reported')
        raise RuntimeError(
            f'SageMaker pipeline execution {execution_arn} ended with '
            f'status {status}: {reason}'
        )

    # Honour the declared ``-> str`` return type; the original fell off the
    # end of the function and returned None.
    return execution_arn
# Define the Dagster definitions that expose the SageMaker pipeline asset.
# The asset builds its own boto3 client, so no resources are registered here.
defs = Definitions(
    assets=[sagemaker_pipeline],
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment