Complete Vertex AI Pipelines Tutorial: Orchestrating ML Workflows
Vertex AI Pipelines lets you orchestrate ML workflows as directed acyclic graphs (DAGs). It is built on Kubeflow Pipelines and provides serverless execution that integrates with other Google Cloud services.
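To make the DAG idea concrete, here is a minimal sketch in plain Python that has nothing to do with the Vertex AI API itself. It uses the standard library `graphlib` module (Python 3.9+) to derive a valid run order from step dependencies. The step names (`preprocess`, `feature_stats`, `train`, `evaluate`) are hypothetical and only illustrate how a scheduler could decide what runs first.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical steps: each key maps to the set of steps it depends on.
dependencies = {
    "preprocess": set(),
    "feature_stats": {"preprocess"},
    "train": {"preprocess"},
    "evaluate": {"train", "feature_stats"},
}


def execution_order(deps):
    """Return one valid order in which the steps can run.

    Raises graphlib.CycleError if the dependencies contain a cycle,
    which is exactly what "acyclic" rules out.
    """
    return list(TopologicalSorter(deps).static_order())


if __name__ == "__main__":
    print(execution_order(dependencies))
    try:
        execution_order({"a": {"b"}, "b": {"a"}})
    except CycleError:
        print("cyclic dependencies cannot be scheduled")
```

Steps with no dependency between them (here `feature_stats` and `train`) may run in either order, or in parallel, which is what an orchestrator takes advantage of.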
Why Vertex AI Pipelines?
Key Benefits:
- Serverless: No infrastructure to manage
- Reproducible: Workflows under version control
- Scalable: Handles large-scale ML jobs
- Integration: Native access to Google Cloud services
- Reusable: Modular pipeline components
Use Cases:
- Automated ML training
- Data preprocessing workflows
- Model deployment pipelines
- Feature engineering
- MLOps automation
Prerequisites
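pip install google-cloud-aiplatform kfp
# Authenticate and select the project
gcloud auth login
# The Python SDK reads Application Default Credentials, which this command sets up
gcloud auth application-default login
gcloud config set project your-project-id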
Quick Start
1. Simple Pipeline
from kfp import dsl
from kfp.dsl import component
from google.cloud import aiplatform

# Define the components
# gcsfs lets pandas read and write gs:// paths directly
@component(packages_to_install=["pandas", "gcsfs"])
def preprocess_data(input_path: str, output_path: str) -> str:
    import pandas as pd

    df = pd.read_csv(input_path)
    df = df.dropna()  # drop rows with missing values
    df.to_csv(output_path, index=False)
    return output_path

@component(packages_to_install=["pandas", "gcsfs", "scikit-learn", "joblib"])
def train_model(data_path: str, model_path: str) -> float:
    import os
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split
    import joblib

    df = pd.read_csv(data_path)
    X = df.drop("target", axis=1)
    y = df["target"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
    model = RandomForestClassifier(n_estimators=100)
    model.fit(X_train, y_train)
    accuracy = model.score(X_test, y_test)
    # joblib only writes to local paths. Vertex AI mounts Cloud Storage
    # buckets under /gcs/, so map the gs:// URI to that mount point.
    local_model_path = model_path.replace("gs://", "/gcs/", 1)
    os.makedirs(os.path.dirname(local_model_path), exist_ok=True)
    joblib.dump(model, local_model_path)
    return accuracy

# Define the pipeline
@dsl.pipeline(
    name="simple-ml-pipeline",
    description="Simple ML training pipeline"
)
def ml_pipeline(input_data: str, model_output: str):
    preprocess_task = preprocess_data(
        input_path=input_data,
        output_path="gs://bucket/processed/data.csv"
    )
    # Using preprocess_task.output makes training wait for preprocessing
    train_task = train_model(
        data_path=preprocess_task.output,
        model_path=model_output
    )

# Compile and run
from kfp import compiler

compiler.Compiler().compile(
    pipeline_func=ml_pipeline,
    package_path="pipeline.json"
)

# Submit the pipeline
aiplatform.init(project="your-project", location="us-central1")

job = aiplatform.PipelineJob(
    display_name="ml-pipeline-run",
    template_path="pipeline.json",
    parameter_values={
        "input_data": "gs://bucket/raw/data.csv",
        "model_output": "gs://bucket/models/model.joblib"
    }
)
job.run()
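The keys passed as parameter values have to match the pipeline function's parameter names, and a typo is easy to miss until the job is submitted. The sketch below is one way to catch that locally with the standard library only. `check_parameter_values` is a hypothetical helper, not part of `kfp` or `google-cloud-aiplatform`, and `ml_pipeline_signature` is an assumed plain-function stand-in whose parameter names should mirror those of your own pipeline (the decorated pipeline object may not expose its signature the same way).

```python
import inspect


def ml_pipeline_signature(input_data: str, model_output: str):
    """Hypothetical stand-in: same parameters as the pipeline function."""


def check_parameter_values(func, parameter_values):
    """Compare planned parameter values against a function signature.

    Returns a (missing, unexpected) pair of sorted name lists:
    - missing: required parameters with no value supplied
    - unexpected: supplied keys that the function does not accept
    """
    params = inspect.signature(func).parameters
    required = {
        name
        for name, param in params.items()
        if param.default is inspect.Parameter.empty
    }
    supplied = set(parameter_values)
    missing = sorted(required - supplied)
    unexpected = sorted(supplied - set(params))
    return missing, unexpected


if __name__ == "__main__":
    planned = {
        "input_data": "gs://bucket/raw/data.csv",
        "modeloutput": "gs://bucket/models/model.joblib",  # deliberate typo
    }
    missing, unexpected = check_parameter_values(ml_pipeline_signature, planned)
    print("missing:", missing)
    print("unexpected:", unexpected)
```

Running a check like this before submitting costs nothing and avoids waiting on a remote job that was going to be rejected anyway.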
Pipeline Components
1. Python Function Components
from kfp.dsl import component, Input, Output, Dataset, Model, Metrics

@component(
    base_image="python:3.9",
    packages_to_install=["pandas", "scikit-learn"]
)
def train_sklearn_model(
    training_data: Input[Dataset],
    model: Output[Model],
    metrics: Output[Metrics],
    n_estimators: int = 100,
    max_depth: int = 10
):
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score, f1_score
    import joblib

    # Load data
    df = pd.read_csv(training_data.path)
    X = df.drop("target", axis=1)
    y = df["target"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    # Train
    clf = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth)
    clf.fit(X_train, y_train)

    # Evaluate
    predictions = clf.predict(X_test)