Complete AWS SageMaker Pipelines Tutorial: Automating ML Workflows
SageMaker Pipelines is a purpose-built CI/CD service for machine learning that helps you automate and manage ML workflows. It enables you to create reproducible, production-ready ML pipelines with minimal code.
Why SageMaker Pipelines?
Key Benefits:
- Automation: Automate end-to-end ML workflows
- Reproducibility: Track and reproduce experiments
- Integration: Native SageMaker service integration
- Visualization: DAG visualization in Studio
- Version control: Pipeline versioning and lineage
Step Types:
- Processing Steps
- Training Steps
- Transform Steps
- Model Steps
- Condition Steps
- Callback Steps
Prerequisites
pip install sagemaker boto3 pandas scikit-learn
# Ensure SageMaker SDK >= 2.0
python -c "import sagemaker; print(sagemaker.__version__)"
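The same version check can be done from Python. The following is a minimal sketch, assuming the SDK reports a dotted version string such as `2.x.y`; the helper names `meets_minimum` and `installed_sdk_version` are hypothetical and not part of the SageMaker SDK.

```python
from importlib import metadata


def meets_minimum(version: str, minimum: tuple = (2, 0)) -> bool:
    """Return True if a dotted version string is at least `minimum`.

    Only the leading numeric components are compared; suffixes such as
    'rc1' or '.dev0' are ignored (an assumption made for brevity).
    """
    parts = []
    for token in version.split("."):
        digits = ""
        for ch in token:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break
        parts.append(int(digits))
    # Pad so that "2" compares like "2.0"
    while len(parts) < len(minimum):
        parts.append(0)
    return tuple(parts[: len(minimum)]) >= tuple(minimum)


def installed_sdk_version():
    """Return the installed sagemaker version string, or None if absent."""
    try:
        return metadata.version("sagemaker")
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_sdk_version()
    if found is None:
        print("sagemaker is not installed")
    else:
        print(f"sagemaker {found}: OK = {meets_minimum(found)}")
```

Comparing integer tuples rather than raw strings avoids the trap where `"2.100.0"` sorts before `"2.9.0"` lexically.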
Quick Start
1. Setup
import boto3
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.workflow.parameters import ParameterString, ParameterInteger
session = sagemaker.Session()
bucket = session.default_bucket()
role = sagemaker.get_execution_role()
region = session.boto_region_name
pipeline_name = "iris-ml-pipeline"
2. Define Parameters
from sagemaker.workflow.parameters import (
    ParameterString,
    ParameterInteger,
    ParameterFloat
)

# Pipeline parameters
input_data = ParameterString(
    name="InputData",
    default_value=f"s3://{bucket}/iris/raw/data.csv"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_count = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=1
)
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)
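Each parameter's default applies unless an execution supplies an override by name. The following is a minimal sketch of that resolution in plain Python, assuming overrides arrive as a dictionary keyed by parameter name (the shape accepted by `Pipeline.start(parameters=...)` in the SageMaker SDK). The helper `resolve_parameters` is hypothetical, `my-bucket` is a placeholder, and rejecting undeclared names is a choice made for this sketch.

```python
def resolve_parameters(defaults: dict, overrides: dict = None) -> dict:
    """Merge per-execution overrides onto parameter defaults.

    This sketch raises KeyError for names that were never declared,
    so a typo in an override name fails loudly instead of being ignored.
    """
    overrides = overrides or {}
    unknown = sorted(set(overrides) - set(defaults))
    if unknown:
        raise KeyError(f"Undeclared pipeline parameters: {unknown}")
    # Later keys win, so overrides replace the matching defaults
    return {**defaults, **overrides}


# Defaults copied from the parameter definitions above
# ("my-bucket" stands in for the session's default bucket).
defaults = {
    "InputData": "s3://my-bucket/iris/raw/data.csv",
    "TrainingInstanceType": "ml.m5.xlarge",
    "TrainingInstanceCount": 1,
    "ModelApprovalStatus": "PendingManualApproval",
}

resolved = resolve_parameters(defaults, {"TrainingInstanceCount": 2})
print(resolved["TrainingInstanceCount"])  # 2
print(resolved["TrainingInstanceType"])   # ml.m5.xlarge
```

With a real pipeline object, the same override dictionary would be passed as `pipeline.start(parameters={"TrainingInstanceCount": 2})`.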
Processing Steps
1. Data Preprocessing
# preprocess.py
import argparse
import os

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-data", type=str)
    parser.add_argument("--test-size", type=float, default=0.2)
    args = parser.parse_args()

    # Read data
    input_path = os.path.join("/opt/ml/processing/input", "data.csv")
    df = pd.read_csv(input_path)

    # Split features and target
    X = df.drop("target", axis=1)
    y = df["target"]

    # Scale features
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X_scaled, y, test_size=args.test_size, random_state=42
    )

    # Save outputs
    train_df = pd.DataFrame(X_train)
    train_df["target"] = y_train.values
    train_df.to_csv("/opt/ml/processing/train/train.csv", index=False, header=False)

    test_df = pd.DataFrame(X_test)
    test_df["target"] = y_test.values
    test_df.to_csv("/opt/ml/processing/test/test.csv", index=False, header=False)

    print(f"Train size: {len(train_df)}, Test size: {len(test_df)}")
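The preprocessing script above fits the scaler on the full dataset before splitting, so test-set statistics influence how the training data is scaled. A common alternative is to split first and fit the scaler on the training rows only. The sketch below illustrates that ordering with the standard library alone, on made-up numbers. The function names are hypothetical, the split helper is a simplified stand-in rather than a reimplementation of `train_test_split`, and the scaling uses the population standard deviation, as `StandardScaler` does.

```python
import random
import statistics


def split_rows(rows, test_size=0.2, seed=42):
    """Shuffle row indices and split them into train and test lists."""
    indices = list(range(len(rows)))
    random.Random(seed).shuffle(indices)
    n_test = max(1, round(len(rows) * test_size))
    test_idx, train_idx = indices[:n_test], indices[n_test:]
    return [rows[i] for i in train_idx], [rows[i] for i in test_idx]


def fit_scaler(rows):
    """Compute per-column mean and population standard deviation."""
    columns = list(zip(*rows))
    means = [statistics.fmean(col) for col in columns]
    # Fall back to 1.0 for constant columns to avoid division by zero
    stds = [statistics.pstdev(col) or 1.0 for col in columns]
    return means, stds


def transform(rows, means, stds):
    """Apply (x - mean) / std column by column."""
    return [
        [(x - m) / s for x, m, s in zip(row, means, stds)]
        for row in rows
    ]


# Made-up feature rows, for illustration only
features = [[5.1, 3.5], [4.9, 3.0], [6.2, 2.9], [5.9, 3.0], [6.7, 3.1]]

train_rows, test_rows = split_rows(features, test_size=0.2)
means, stds = fit_scaler(train_rows)             # statistics from train only
train_scaled = transform(train_rows, means, stds)
test_scaled = transform(test_rows, means, stds)  # reuse train statistics

print(len(train_scaled), len(test_scaled))
```

In scikit-learn terms, the equivalent ordering would be to call `train_test_split` first, then `scaler.fit_transform` on the training features and `scaler.transform` on the test features.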
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
# Create processor
sklearn_processor = SKLearnProcessor(
    framework_version="1.0-1",
    role=role,
    instance_type="ml.m5.large",
    instance_count=1,
    sagemaker_session=session
)

# Define processing step
step_process = ProcessingStep(
    name="PreprocessData",
    processor=sklearn_processor,
    inputs=[
ProcessingInput(