Complete Prefect Tutorial: Modern Workflow Orchestration for ML
Prefect is a modern workflow orchestration platform designed for data and ML pipelines. It provides a Pythonic way to build, schedule, and monitor complex workflows, with automatic retries, caching, and observability built in.
Why Prefect?
Prefect's strengths:
- Pythonic: native Python decorators and functions
- Observable: built-in UI and monitoring
- Resilient: automatic retries and error handling
- Flexible: run anywhere, whether locally, in the cloud, or on Kubernetes
- Modern: async support and dynamic workflows

Common use cases:
- ML pipeline orchestration
- Data engineering workflows
- ETL processes
- Scheduled data processing
- Model training pipelines
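To make the "decorators" and "automatic retries" points concrete, here is a toy, stdlib-only sketch of what a retrying task decorator does conceptually. It is not Prefect's implementation: `toy_task` is a hypothetical name, and its `retries` / `retry_delay_seconds` arguments are only chosen to mirror the arguments of Prefect's own `@task`.

```python
import functools
import time


def toy_task(retries: int = 0, retry_delay_seconds: float = 0.0):
    """Hypothetical toy decorator (not Prefect's code): re-run a function when it raises."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            attempt = 0
            while True:
                attempt += 1
                try:
                    result = fn(*args, **kwargs)
                    wrapper.attempts = attempt  # record how many tries were needed
                    return result
                except Exception:
                    if attempt > retries:
                        wrapper.attempts = attempt
                        raise  # out of retries: propagate the last error
                    time.sleep(retry_delay_seconds)  # wait before the next attempt
        wrapper.attempts = 0
        return wrapper
    return decorator


# Usage: a function that fails twice with a transient error, then succeeds
calls = {"n": 0}


@toy_task(retries=3, retry_delay_seconds=0)
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


print(flaky(), flaky.attempts)  # ok 3
```

Here `retries=3` means up to three extra attempts after the first one, so a function that fails twice still succeeds on its third call.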
Installation
```bash
pip install prefect

# Verify the installation
prefect version

# Start a local Prefect server
prefect server start

# Or use Prefect Cloud instead
prefect cloud login
```
Quick Start
1. Basic Flow
```python
from prefect import flow, task


@task
def extract_data():
    return [1, 2, 3, 4, 5]


@task
def transform_data(data):
    # Double every value
    return [x * 2 for x in data]


@task
def load_data(data):
    print(f"Loading data: {data}")
    return len(data)


@flow(name="ETL Pipeline")
def etl_pipeline():
    raw_data = extract_data()
    transformed = transform_data(raw_data)
    result = load_data(transformed)
    return result


# Run the flow
if __name__ == "__main__":
    etl_pipeline()
```
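Because Prefect tasks wrap ordinary Python functions, it helps to know what the ETL flow should produce before running it under Prefect. The sketch below repeats the same three steps as plain, undecorated functions (the names simply mirror the example above) and shows the expected values: the data is doubled and the loader reports 5 records.

```python
def extract_data() -> list[int]:
    return [1, 2, 3, 4, 5]


def transform_data(data: list[int]) -> list[int]:
    # Double every value, as in the transform task above
    return [x * 2 for x in data]


def load_data(data: list[int]) -> int:
    print(f"Loading data: {data}")
    return len(data)


raw = extract_data()
transformed = transform_data(raw)
count = load_data(transformed)  # prints: Loading data: [2, 4, 6, 8, 10]
print(count)  # 5
```

Keeping task bodies as pure functions like this makes them easy to unit-test independently of the orchestrator.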
2. With Parameters
```python
from prefect import flow, task
import requests


@task
def fetch_data(url: str):
    response = requests.get(url, timeout=30)
    response.raise_for_status()  # fail the task on HTTP errors instead of parsing an error page
    return response.json()


@task
def process_data(data: dict, threshold: float):
    # Keep only entries whose value is above the threshold
    return {k: v for k, v in data.items() if v > threshold}


@flow(name="Parameterized Flow")
def data_pipeline(url: str, threshold: float = 0.5):
    data = fetch_data(url)
    processed = process_data(data, threshold)
    return processed


# Run with parameters
if __name__ == "__main__":
    result = data_pipeline(
        url="https://api.example.com/data",
        threshold=0.7,
    )
    print(result)
```
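The filtering step in `process_data` keeps only the entries whose value exceeds `threshold`. Assuming the API returns a flat JSON object of numeric values (the payload below is hypothetical, since `https://api.example.com/data` is a placeholder), the behaviour can be checked offline with the same dict comprehension:

```python
import json

# Hypothetical response body; the real API's shape is an assumption
payload = json.loads('{"a": 0.2, "b": 0.9, "c": 0.75, "d": 0.7}')


def process_data(data: dict, threshold: float) -> dict:
    # Strictly greater than: a value equal to the threshold is dropped
    return {k: v for k, v in data.items() if v > threshold}


print(process_data(payload, 0.7))  # {'b': 0.9, 'c': 0.75}
print(process_data(payload, 0.5))  # {'b': 0.9, 'c': 0.75, 'd': 0.7}
```

Note the strict comparison: with `threshold=0.7`, a value of exactly 0.7 is filtered out.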
3. ML Training Flow
```python
from prefect import flow, task
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score


@task
def load_dataset(path: str) -> pd.DataFrame:
    return pd.read_csv(path)


@task
def preprocess(df: pd.DataFrame):
    df = df.dropna()
    X = df.drop("target", axis=1)
    y = df["target"]
    # Returns [X_train, X_test, y_train, y_test]
    return train_test_split(X, y, test_size=0.2)


@task
def train_model(X_train, y_train, n_estimators: int):
    model = RandomForestClassifier(n_estimators=n_estimators)
    model.fit(X_train, y_train)
    return model


@task
def evaluate_model(model, X_test, y_test) -> float:
    predictions = model.predict(X_test)
    return accuracy_score(y_test, predictions)


@flow(name="ML Training Pipeline")
def training_pipeline(data_path: str, n_estimators: int = 100):
    df = load_dataset(data_path)
    X_train, X_test, y_train, y_test = preprocess(df)
    model = train_model(X_train, y_train, n_estimators)
    accuracy = evaluate_model(model, X_test, y_test)
    print(f"Model accuracy: {accuracy:.4f}")
    return accuracy


if __name__ == "__main__":
    training_pipeline("data.csv", n_estimators=200)
```
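One practical caveat in `preprocess`: `train_test_split` is called without `random_state`, so a retried or re-run flow can train and evaluate on a different split. scikit-learn accepts an integer `random_state` to pin the shuffle. The stdlib-only sketch below is a simplified shuffle-and-slice split (not scikit-learn's implementation; `seeded_split` is a hypothetical helper) that shows why a fixed seed makes the split reproducible:

```python
import math
import random


def seeded_split(rows: list, test_size: float = 0.2, seed: int = 42):
    """Hypothetical helper: shuffle a copy with a fixed seed, then slice off the test set."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)  # local RNG, so global random state is untouched
    n_test = math.ceil(len(shuffled) * test_size)
    return shuffled[n_test:], shuffled[:n_test]  # (train, test)


rows = list(range(10))
train_a, test_a = seeded_split(rows, seed=42)
train_b, test_b = seeded_split(rows, seed=42)
print(len(train_a), len(test_a))  # 8 2
print(test_a == test_b)           # True: same seed, same split
```

In the flow itself, the equivalent change would be passing a fixed `random_state` (for example as a flow parameter) to `train_test_split`.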
Task Configuration
1. Retries and Timeouts
from prefect import task, flow
from datetime import timedelta

@task(
    retries=3,
    retry_delay_seconds=10,
    timeout_seconds=300
)
def unreliable_task():
    import random