Complete Prefect Tutorial: Modern Workflow Orchestration for ML
Prefect is a modern workflow orchestration platform designed for data and ML pipelines. It provides a Pythonic way to build, schedule, and monitor complex workflows, with configurable retries, caching, and built-in observability.
Why Prefect?
Prefect Advantages:
- Pythonic: Native Python decorators and functions
- Observable: Built-in UI and monitoring
- Resilient: Configurable retries and error handling
- Flexible: Runs locally, in the cloud, or on Kubernetes
- Modern: Async support and dynamic workflows

Common Use Cases:
- ML pipeline orchestration
- Data engineering workflows
- ETL processes
- Scheduled data processing
- Model training pipelines
Installation
```bash
pip install prefect

# Verify installation
prefect version

# Start a local Prefect server
prefect server start

# Or connect to Prefect Cloud instead
prefect cloud login
```
Quick Start
1. Basic Flow
```python
from prefect import flow, task

@task
def extract_data():
    return [1, 2, 3, 4, 5]

@task
def transform_data(data):
    # Double every value
    return [x * 2 for x in data]

@task
def load_data(data):
    print(f"Loading data: {data}")
    return len(data)

@flow(name="ETL Pipeline")
def etl_pipeline():
    raw_data = extract_data()
    transformed = transform_data(raw_data)
    result = load_data(transformed)
    return result

# Run the flow
if __name__ == "__main__":
    etl_pipeline()
```
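Each task body is ordinary Python. You can keep the logic in plain functions and unit-test it without starting a Prefect server. The sketch below repeats the three ETL steps as undecorated functions. `run_etl` is a hypothetical helper that chains them the same way the flow above does. Recent Prefect releases (2.x and 3.x) also expose the undecorated function on each task as `.fn`, which serves the same purpose.

```python
# Plain-Python versions of the ETL steps, with no Prefect imports.
# run_etl is a hypothetical helper, not part of Prefect.

def extract_data() -> list[int]:
    """Return the hard-coded sample records."""
    return [1, 2, 3, 4, 5]


def transform_data(data: list[int]) -> list[int]:
    """Double every value."""
    return [x * 2 for x in data]


def load_data(data: list[int]) -> int:
    """'Load' the data by printing it; return how many records were loaded."""
    print(f"Loading data: {data}")
    return len(data)


def run_etl() -> int:
    # Same data flow as the Prefect version: extract -> transform -> load.
    return load_data(transform_data(extract_data()))


if __name__ == "__main__":
    assert transform_data(extract_data()) == [2, 4, 6, 8, 10]
    print(run_etl())  # prints "Loading data: [2, 4, 6, 8, 10]", then 5
```

Keeping the logic in plain functions means the decorators only add orchestration. The behavior you test is the behavior the flow runs.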
2. With Parameters
```python
from prefect import flow, task
import requests

@task
def fetch_data(url: str) -> dict:
    response = requests.get(url, timeout=30)
    response.raise_for_status()  # raise on HTTP errors so the task run fails visibly
    return response.json()

@task
def process_data(data: dict, threshold: float) -> dict:
    # Keep only the entries whose value exceeds the threshold
    return {k: v for k, v in data.items() if v > threshold}

@flow(name="Parameterized Flow")
def data_pipeline(url: str, threshold: float = 0.5):
    data = fetch_data(url)
    processed = process_data(data, threshold)
    return processed

# Run with parameters
if __name__ == "__main__":
    result = data_pipeline(
        url="https://api.example.com/data",
        threshold=0.7,
    )
```
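The URL above is a placeholder, so a real run fails at the HTTP request. The minimal sketch below exercises the filtering logic offline. It relies on two hypothetical pieces that are not part of the original flow:
- `fake_fetch` is a stub that returns a canned dictionary in place of `requests.get(url).json()`.
- `data_pipeline_local` adds an injectable `fetch` argument.

```python
from typing import Callable


def fake_fetch(url: str) -> dict:
    # Hypothetical canned payload standing in for requests.get(url).json().
    return {"a": 0.2, "b": 0.6, "c": 0.9}


def process_data(data: dict, threshold: float) -> dict:
    # Keep only the entries whose value exceeds the threshold.
    return {k: v for k, v in data.items() if v > threshold}


def data_pipeline_local(
    url: str,
    threshold: float = 0.5,
    fetch: Callable[[str], dict] = fake_fetch,
) -> dict:
    # Mirrors the parameterized flow: fetch, then filter.
    # The fetch function is injectable so tests need no network access.
    data = fetch(url)
    return process_data(data, threshold)


if __name__ == "__main__":
    print(data_pipeline_local("https://api.example.com/data", threshold=0.7))
    # prints {'c': 0.9}
```

Injecting the fetch function keeps the network call at the edge of the pipeline. The same filtering code then runs unchanged in tests and in the flow.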
3. ML Training Flow
```python
from prefect import flow, task
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

@task
def load_dataset(path: str) -> pd.DataFrame:
    return pd.read_csv(path)

@task
def preprocess(df: pd.DataFrame):
    df = df.dropna()
    X = df.drop("target", axis=1)
    y = df["target"]
    # Returns [X_train, X_test, y_train, y_test]; fixed seed for a reproducible split
    return train_test_split(X, y, test_size=0.2, random_state=42)

@task
def train_model(X_train, y_train, n_estimators: int):
    model = RandomForestClassifier(n_estimators=n_estimators, random_state=42)
    model.fit(X_train, y_train)
    return model

@task
def evaluate_model(model, X_test, y_test) -> float:
    predictions = model.predict(X_test)
    return accuracy_score(y_test, predictions)

@flow(name="ML Training Pipeline")
def training_pipeline(data_path: str, n_estimators: int = 100):
    df = load_dataset(data_path)
    X_train, X_test, y_train, y_test = preprocess(df)
    model = train_model(X_train, y_train, n_estimators)
    accuracy = evaluate_model(model, X_test, y_test)
    print(f"Model accuracy: {accuracy:.4f}")
    return accuracy

if __name__ == "__main__":
    training_pipeline("data.csv", n_estimators=200)
```
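The training pipeline expects a `data.csv` with a `target` column, but the tutorial does not provide one. The sketch below writes a small synthetic dataset using only the standard library. Its assumptions:
- `make_demo_csv` is a hypothetical helper.
- `feature_1` and `feature_2` are hypothetical column names.
- The binary label follows a simple rule on the two features.
- A few cells are left blank so the `dropna()` step in `preprocess` has rows to remove.

```python
import csv
import random


def make_demo_csv(path: str, n_rows: int = 200, n_missing: int = 5, seed: int = 0) -> None:
    """Write a synthetic binary-classification dataset to `path`.

    Columns: feature_1, feature_2 (floats in [0, 1)) and target (0 or 1).
    The first `n_missing` rows leave feature_1 blank; pandas reads empty
    cells as NaN, so dropna() discards those rows.
    """
    rng = random.Random(seed)  # seeded so the file is reproducible
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["feature_1", "feature_2", "target"])
        for i in range(n_rows):
            x1, x2 = rng.random(), rng.random()
            target = int(x1 + x2 > 1.0)  # simple, learnable decision rule
            first = "" if i < n_missing else f"{x1:.4f}"
            writer.writerow([first, f"{x2:.4f}", target])
```

For a quick end-to-end run, call `make_demo_csv("data.csv")` once before `training_pipeline("data.csv", n_estimators=200)`.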
Task Configuration
1. Retries and Timeouts
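In Prefect, `retries=3` allows up to three retries after the first failed attempt, so a task runs at most four times. `retry_delay_seconds` sets the pause between attempts. The stdlib-only sketch below models those semantics so they can be checked quickly. It is not how Prefect implements retries, and `run_with_retries` and `make_flaky` are hypothetical names.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(fn: Callable[[], T], retries: int = 3, retry_delay_seconds: float = 10) -> T:
    """Call fn(); on an exception, retry up to `retries` more times.

    Total attempts are at most retries + 1. If every attempt fails,
    the last exception is re-raised.
    """
    if retries < 0:
        raise ValueError("retries must be non-negative")
    attempt = 0
    while True:
        attempt += 1
        try:
            return fn()
        except Exception:
            if attempt > retries:
                raise  # out of retries: surface the final error
            time.sleep(retry_delay_seconds)


def make_flaky(failures: int) -> Callable[[], str]:
    """Return a hypothetical task body that fails `failures` times, then succeeds."""
    calls = {"n": 0}

    def flaky() -> str:
        calls["n"] += 1
        if calls["n"] <= failures:
            raise RuntimeError(f"simulated failure #{calls['n']}")
        return f"succeeded on attempt {calls['n']}"

    return flaky
```

For example, `run_with_retries(make_flaky(3), retries=3, retry_delay_seconds=0)` returns `"succeeded on attempt 4"`: three failures use up the three retries, and the fourth attempt succeeds. The Prefect decorator below expresses the same retry policy declaratively and also sets a timeout.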
from prefect import task, flow
from datetime import timedelta

@task(
    retries=3,
    retry_delay_seconds=10,
    timeout_seconds=300,
)
def unreliable_task():
    import random
    if random.random() < 0.5: