Complete Vertex AI Pipelines Tutorial: Orchestrating ML Workflows
Vertex AI Pipelines enables you to orchestrate ML workflows as directed acyclic graphs (DAGs). Built on Kubeflow Pipelines, it provides serverless execution with Google Cloud integration.
Why Vertex AI Pipelines?
Key Benefits:
- Serverless: No infrastructure to manage
- Reproducible: Version-controlled workflows
- Scalable: Handles large-scale ML jobs
- Integration: Native Google Cloud services
- Reusable: Modular pipeline components
Common Use Cases:
- Automated ML training
- Data preprocessing workflows
- Model deployment pipelines
- Feature engineering
- MLOps automation
Prerequisites
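pip install google-cloud-aiplatform "kfp>=2"
# Authenticate the gcloud CLI, and also create Application Default Credentials,
# which the Python SDK (google-cloud-aiplatform) uses when submitting pipelines
gcloud auth login
gcloud auth application-default login
gcloud config set project your-project-id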
Quick Start
1. Simple Pipeline
from kfp import dsl
from kfp.dsl import component
from google.cloud import aiplatform
# Define components
@component(packages_to_install=["pandas", "gcsfs"])
def preprocessdata(inputpath: str, outputpath: str) -> str:
    """Drop rows containing missing values from a CSV file.

    Args:
        inputpath: Location of the input CSV (for example a ``gs://`` URI).
        outputpath: Location the cleaned CSV is written to.

    Returns:
        ``outputpath`` unchanged. Declaring a ``str`` return gives the
        component a single output, which downstream tasks consume via
        ``task.output``. Without the annotation the component has no outputs.
    """
    # Imports live inside the function because KFP packages only the
    # function's source and runs it in its own container. pandas and gcsfs
    # (for gs:// paths) are installed via packages_to_install above.
    import pandas as pd

    df = pd.read_csv(inputpath)
    df = df.dropna()
    df.to_csv(outputpath, index=False)
    return outputpath
@component(packages_to_install=["pandas", "scikit-learn", "gcsfs"])
def trainmodel(datapath: str, modelpath: str, random_state: int = 42) -> float:
    """Train a random-forest classifier and return its hold-out accuracy.

    Args:
        datapath: Location of a CSV whose ``target`` column is the label and
            whose remaining columns are the features.
        modelpath: Destination for the serialized model (local path or a
            ``gs://`` URI).
        random_state: Seed for the train/test split and the forest, so that
            reruns of the pipeline produce the same model and score.

    Returns:
        Accuracy on a 20% held-out split, as a plain Python float.
    """
    # Component code runs in its own container, so imports go inside.
    # joblib ships with scikit-learn; fsspec ships with gcsfs.
    import fsspec
    import joblib
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(datapath)
    X = df.drop("target", axis=1)
    y = df["target"]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=random_state
    )

    model = RandomForestClassifier(n_estimators=100, random_state=random_state)
    model.fit(X_train, y_train)
    accuracy = float(model.score(X_test, y_test))

    # joblib.dump() with a path string only writes to the local filesystem.
    # Opening the destination through fsspec makes gs:// URIs (via gcsfs)
    # work as well as local paths.
    with fsspec.open(modelpath, "wb") as f:
        joblib.dump(model, f)
    return accuracy
# Define pipeline
@dsl.pipeline(
    name="simple-ml-pipeline",
    description="A simple ML training pipeline",
)
def mlpipeline(
    inputdata: str,
    modeloutput: str,
    processed_path: str = "gs://bucket/processed/data.csv",
):
    """Two-step DAG: clean the raw CSV, then train a model on the result.

    Args:
        inputdata: Location of the raw input CSV.
        modeloutput: Destination for the trained model file.
        processed_path: Where the cleaned intermediate CSV is written.
            Previously hard-coded; the default keeps the original location.
    """
    preprocesstask = preprocessdata(
        inputpath=inputdata,
        outputpath=processed_path,
    )
    # Passing preprocesstask.output (rather than processed_path directly)
    # creates the data dependency, so training runs only after preprocessing
    # has produced the cleaned file.
    trainmodel(
        datapath=preprocesstask.output,
        modelpath=modeloutput,
    )
# Compile the pipeline definition into a spec file that Vertex AI can execute
from kfp import compiler

compiler.Compiler().compile(
    pipeline_func=mlpipeline,
    package_path="pipeline.json",
)
# Submit the compiled pipeline to Vertex AI Pipelines
aiplatform.init(project="your-project-id", location="us-central1")

job = aiplatform.PipelineJob(
    display_name="ml-pipeline-run",
    template_path="pipeline.json",
    # GCS prefix where the run stores its artifacts and metadata.
    pipeline_root="gs://bucket/pipeline-root",
    # Keys must match the parameter names of the compiled pipeline function.
    parameter_values={
        "inputdata": "gs://bucket/raw/data.csv",
        "modeloutput": "gs://bucket/models/model.joblib",
    },
)

# run() waits for the pipeline to finish; use job.submit() to return
# immediately after the run is created.
job.run()
Pipeline Components
1. Python Function Components
from kfp.dsl import component, Input, Output, Dataset, Model, Metrics
@component(
baseimage="python:3.9",
packagestoinstall=["pandas", "scikit-learn"]
)
def trainsklearnmodel(
trainingdata: Input[Dataset],
model: Output[Model],
metrics: Output[Metrics],
nestimators: int = 100,
maxdepth: int = 10
):
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.modelselection import traintestsplit
from sklearn.metrics import accuracyscore, f1score
import joblib
# Load data
df = pd.readcsv(trainingdata.path)
X = df.drop("target", axis=1)
y = df["target"]
Xtrain, Xtest, ytrain, ytest = traintestsplit(X, y, testsize=0.2)
# Train
clf = RandomForestClassifier(nestimators=nestimators, maxdepth=maxdepth)
clf.fit(Xtrain, ytrain)
# Evaluate
predictions = clf.predict(Xtest)