Complete Prefect Tutorial: Modern Workflow Orchestration for ML

By Ruby Abdullah · tutorial
Tags: Prefect, Workflow Orchestration, MLOps, Data Pipeline, Python, ETL

Prefect is a modern workflow orchestration platform designed for data and ML pipelines. It provides a Pythonic way to build, schedule, and monitor complex workflows with automatic retries, caching, and observability.

Why Prefect?

Prefect Advantages:
  • Pythonic: Native Python decorators and functions
  • Observable: Built-in UI and monitoring
  • Resilient: Automatic retries and error handling
  • Flexible: Run anywhere - local, cloud, Kubernetes
  • Modern: Async support and dynamic workflows

Use Cases:
  • ML pipeline orchestration
  • Data engineering workflows
  • ETL processes
  • Scheduled data processing
  • Model training pipelines

Installation

    pip install prefect

    # Verify installation
    prefect version

    # Start Prefect server (local)
    prefect server start

    # Or use Prefect Cloud
    prefect cloud login
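
The installation can also be checked from inside Python, which is handy in notebooks or CI jobs. The following is a minimal sketch that relies only on the standard library; the helper name `installed_version` is hypothetical and is not part of Prefect.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package: str):
    """Return the installed version string of a package, or None if it is missing."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


# Look up Prefect in the current environment.
prefect_version = installed_version("prefect")
print(prefect_version or "prefect is not installed in this environment")
```

If this prints a version string, the same interpreter that runs your flows can import Prefect.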

Quick Start

1. Basic Flow

    from prefect import flow, task

    @task
    def extract_data():
        return [1, 2, 3, 4, 5]

    @task
    def transform_data(data):
        # Double every value
        return [x * 2 for x in data]

    @task
    def load_data(data):
        print(f"Loading data: {data}")
        return len(data)

    @flow(name="ETL Pipeline")
    def etl_pipeline():
        raw_data = extract_data()
        transformed = transform_data(raw_data)
        result = load_data(transformed)
        return result

    # Run the flow
    if __name__ == "__main__":
        etl_pipeline()

2. With Parameters

    import requests
    from prefect import flow, task

    @task
    def fetch_data(url: str):
        response = requests.get(url)
        return response.json()

    @task
    def process_data(data: dict, threshold: float):
        # Keep only the entries whose value is strictly above the threshold
        return {k: v for k, v in data.items() if v > threshold}

    @flow(name="Parameterized Flow")
    def data_pipeline(url: str, threshold: float = 0.5):
        data = fetch_data(url)
        processed = process_data(data, threshold)
        return processed

    # Run with parameters
    if __name__ == "__main__":
        result = data_pipeline(
            url="https://api.example.com/data",
            threshold=0.7,
        )
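
To see what the threshold filter keeps, here is a small sketch that runs the same dictionary comprehension as the processing task on a made-up payload. It is plain Python without the `@task` decorator; the function name `filter_above` and the sample values are hypothetical, and it assumes the API returns a flat mapping of names to numbers.

```python
def filter_above(data: dict, threshold: float) -> dict:
    """Keep only the entries whose value is strictly greater than the threshold."""
    return {k: v for k, v in data.items() if v > threshold}


# Hypothetical payload standing in for the JSON response.
sample = {"a": 0.9, "b": 0.7, "c": 0.2}

kept = filter_above(sample, threshold=0.7)
print(kept)  # {'a': 0.9}
```

Because the comparison is strict, a value exactly equal to the threshold (here `"b"`) is dropped. Use `>=` if boundary values should pass.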

3. ML Training Flow

    import pandas as pd
    from prefect import flow, task
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split

    @task
    def load_dataset(path: str):
        return pd.read_csv(path)

    @task
    def preprocess(df: pd.DataFrame):
        df = df.dropna()
        X = df.drop("target", axis=1)
        y = df["target"]
        # Returns X_train, X_test, y_train, y_test
        return train_test_split(X, y, test_size=0.2)

    @task
    def train_model(X_train, y_train, n_estimators: int):
        model = RandomForestClassifier(n_estimators=n_estimators)
        model.fit(X_train, y_train)
        return model

    @task
    def evaluate_model(model, X_test, y_test):
        predictions = model.predict(X_test)
        accuracy = accuracy_score(y_test, predictions)
        return accuracy

    @flow(name="ML Training Pipeline")
    def training_pipeline(data_path: str, n_estimators: int = 100):
        df = load_dataset(data_path)
        X_train, X_test, y_train, y_test = preprocess(df)
        model = train_model(X_train, y_train, n_estimators)
        accuracy = evaluate_model(model, X_test, y_test)
        print(f"Model accuracy: {accuracy:.4f}")
        return accuracy

    if __name__ == "__main__":
        training_pipeline("data.csv", n_estimators=200)
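
For intuition about the number the evaluation task reports: with its default settings, scikit-learn's accuracy score for single-label classification is the fraction of predictions that match the true labels. The sketch below reproduces that calculation in plain Python; the function name `simple_accuracy` and the label lists are made up for illustration.

```python
def simple_accuracy(y_true, y_pred):
    """Fraction of positions where the prediction equals the true label."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    if not y_true:
        raise ValueError("cannot compute accuracy on empty inputs")
    matches = sum(t == p for t, p in zip(y_true, y_pred))
    return matches / len(y_true)


# Hypothetical labels and predictions: 3 of 5 agree.
y_true = [1, 0, 1, 1, 0]
y_pred = [1, 0, 0, 1, 1]

score = simple_accuracy(y_true, y_pred)
print(f"Model accuracy: {score:.4f}")  # Model accuracy: 0.6000
```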

Task Configuration

1. Retries and Timeouts

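    import random
    from datetime import timedelta

    from prefect import task, flow

    @task(
        retries=3,               # retry up to 3 times after the first failure
        retry_delay_seconds=10,  # wait 10 seconds between attempts
        timeout_seconds=300,     # fail the run if it takes longer than 5 minutes
    )
    def unreliable_task():
        if random.random() < 0.5:
            # Assumption: simulate a transient failure so the retry policy applies
            raise RuntimeError("Simulated random failure")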
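
The retry settings above can be read as "run once, and on failure wait and try again up to three more times". The following plain-Python sketch illustrates that behavior only; it is not how Prefect implements retries, it does not cover timeouts, and the names `run_with_retries` and `flaky` are hypothetical.

```python
import time


def run_with_retries(fn, retries=3, retry_delay_seconds=10, sleep=time.sleep):
    """Call fn(); on an exception, wait and call it again, up to `retries` extra times.

    Returns a (result, attempts) tuple, or re-raises the last error
    once the retries are exhausted.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return fn(), attempts
        except Exception:
            if attempts > retries:
                raise  # retries exhausted: surface the last error
            sleep(retry_delay_seconds)


# Hypothetical task that fails twice and then succeeds.
calls = {"count": 0}


def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


# A zero delay keeps the demonstration fast.
result, attempts = run_with_retries(flaky, retries=3, retry_delay_seconds=0)
print(result, attempts)  # ok 3
```

With `retries=3`, a task that never succeeds is attempted four times in total before the error propagates.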
