# Example SageMaker pipeline.
# Source: GitHub gist cnolanminich/b3cdc8fec9f9442069efbf86cfc30204
# (created November 27, 2024 20:08).
# NOTE: GitHub flagged this file as containing hidden or bidirectional Unicode
# text that may be interpreted or compiled differently than it appears; review
# it in an editor that reveals hidden Unicode characters.
import time

import boto3
from dagster import Definitions, asset
# Define an asset that runs a SageMaker pipeline and waits for it to finish
@asset
def sagemaker_pipeline() -> str:
    """Start a SageMaker pipeline execution and block until it finishes.

    Note: the functions to connect to and use SageMaker would likely be put
    into a Dagster Resource. This is for illustration purposes, showing how
    you might integrate a system like SageMaker into Dagster as simply as
    possible.

    Returns:
        The ARN of the pipeline execution, once it has succeeded.

    Raises:
        RuntimeError: If the execution finishes with any status other than
            ``Succeeded`` (i.e. ``Failed`` or ``Stopped``), so the asset is
            not recorded as successfully materialized.
    """
    poll_interval_seconds = 30
    terminal_statuses = {'Succeeded', 'Failed', 'Stopped'}

    # Step 1: Set up the SageMaker client
    sagemaker_client = boto3.client('sagemaker')

    # Step 2: Define the pipeline name
    pipeline_name = 'your-pipeline-name'

    # Step 3: Start the pipeline execution
    response = sagemaker_client.start_pipeline_execution(
        PipelineName=pipeline_name
    )

    # Get the execution ARN
    execution_arn = response['PipelineExecutionArn']
    print(f'Started pipeline execution: {execution_arn}')

    # Step 4: Monitor the pipeline execution status until it is terminal
    while True:
        response = sagemaker_client.describe_pipeline_execution(
            PipelineExecutionArn=execution_arn
        )
        status = response['PipelineExecutionStatus']
        print(f'Pipeline execution status: {status}')

        if status in terminal_statuses:
            break

        # Wait before checking the status again
        time.sleep(poll_interval_seconds)

    print(f'Pipeline execution completed with status: {status}')

    # Step 5: Surface failures instead of silently reporting success
    if status != 'Succeeded':
        failure_reason = response.get(
            'FailureReason', 'no failure reason reported'
        )
        raise RuntimeError(
            f'SageMaker pipeline execution {execution_arn} ended with '
            f'status {status}: {failure_reason}'
        )

    # Return the ARN so the value matches the declared ``str`` return type
    return execution_arn
# Define the Dagster definitions that register the SageMaker pipeline asset.
# The asset creates its own boto3 client, so no resources are configured here.
defs = Definitions(
    assets=[sagemaker_pipeline],
)
# Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment.