Complete Prefect Tutorial: Modern Workflow Orchestration for ML

By Ruby Abdullah · tutorial
Prefect · Workflow Orchestration · MLOps · Data Pipeline · Python · ETL

Prefect is a modern workflow orchestration platform designed for data and ML pipelines. It provides a Pythonic way to build, schedule, and monitor complex workflows, with automatic retries, caching, and observability.

Why Prefect?

Prefect's strengths:
  • Pythonic: workflows are built from decorators and native Python functions
  • Observable: built-in UI and monitoring
  • Resilient: automatic retries and error handling
  • Flexible: runs anywhere, whether locally, in the cloud, or on Kubernetes
  • Modern: async support and dynamic workflows

Use cases:
  • ML pipeline orchestration
  • Data engineering workflows
  • ETL processes
  • Scheduled data processing
  • Model training pipelines

Installation

    pip install prefect

    # Verify the installation
    prefect version

    # Start a local Prefect server
    prefect server start

    # Or use Prefect Cloud instead
    prefect cloud login
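
The installation can also be checked from Python, which is handy inside a notebook or a CI script. The following is a minimal sketch using only the standard library; the helper name `installed_version` is hypothetical and not part of Prefect.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(package: str = "prefect") -> Optional[str]:
    """Return the installed version of `package`, or None if it is missing."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_version()
    if found:
        print(f"prefect {found} is installed")
    else:
        print("prefect is not installed in this environment")
```

If the function returns `None`, the `pip install prefect` step most likely ran in a different environment than the one executing the script.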

Quick Start

1. Basic Flow

    from prefect import flow, task


    @task
    def extract_data():
        # Stand-in for reading from a real source
        return [1, 2, 3, 4, 5]


    @task
    def transform_data(data):
        # Double every value
        return [x * 2 for x in data]


    @task
    def load_data(data):
        print(f"Loading data: {data}")
        return len(data)


    @flow(name="ETL Pipeline")
    def etl_pipeline():
        # Each task call runs in order and passes its result to the next
        raw_data = extract_data()
        transformed = transform_data(raw_data)
        result = load_data(transformed)
        return result


    # Run the flow
    if __name__ == "__main__":
        etl_pipeline()

2. With Parameters

    import requests
    from prefect import flow, task


    @task
    def fetch_data(url: str):
        response = requests.get(url)
        # Fail the task on HTTP errors instead of parsing an error body
        response.raise_for_status()
        return response.json()


    @task
    def process_data(data: dict, threshold: float):
        # Keep only the entries whose value is above the threshold
        return {k: v for k, v in data.items() if v > threshold}


    @flow(name="Parameterized Flow")
    def data_pipeline(url: str, threshold: float = 0.5):
        data = fetch_data(url)
        processed = process_data(data, threshold)
        return processed


    # Run with parameters
    if __name__ == "__main__":
        result = data_pipeline(
            url="https://api.example.com/data",
            threshold=0.7,
        )
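
To see what the threshold parameter does, the filtering step can be tried on its own as plain Python, without Prefect or a network call. This is a small sketch: the function name `filter_above_threshold` and the sample values are made up for illustration, and the comprehension mirrors the one in the processing task.

```python
from typing import Dict


def filter_above_threshold(data: Dict[str, float], threshold: float) -> Dict[str, float]:
    """Keep only the entries whose value is strictly greater than `threshold`."""
    return {key: value for key, value in data.items() if value > threshold}


# Hypothetical payload standing in for the JSON returned by the API
sample_scores = {"a": 0.2, "b": 0.7, "c": 0.9}

kept = filter_above_threshold(sample_scores, threshold=0.7)
print(kept)  # {'c': 0.9}
```

The comparison is strict, so a value equal to the threshold (0.7 here) is dropped. The comprehension also assumes every value in the payload is numeric; a string or nested object would raise a `TypeError` at the comparison.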

3. ML Training Flow

    import pandas as pd
    from prefect import flow, task
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split


    @task
    def load_dataset(path: str):
        return pd.read_csv(path)


    @task
    def preprocess(df: pd.DataFrame):
        # Drop incomplete rows, then separate the features from the label
        df = df.dropna()
        X = df.drop("target", axis=1)
        y = df["target"]
        # Returns X_train, X_test, y_train, y_test
        return train_test_split(X, y, test_size=0.2)


    @task
    def train_model(X_train, y_train, n_estimators: int):
        model = RandomForestClassifier(n_estimators=n_estimators)
        model.fit(X_train, y_train)
        return model


    @task
    def evaluate_model(model, X_test, y_test):
        predictions = model.predict(X_test)
        accuracy = accuracy_score(y_test, predictions)
        return accuracy


    @flow(name="ML Training Pipeline")
    def training_pipeline(data_path: str, n_estimators: int = 100):
        df = load_dataset(data_path)
        X_train, X_test, y_train, y_test = preprocess(df)
        model = train_model(X_train, y_train, n_estimators)
        accuracy = evaluate_model(model, X_test, y_test)
        print(f"Model accuracy: {accuracy:.4f}")
        return accuracy


    if __name__ == "__main__":
        training_pipeline("data.csv", n_estimators=200)

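Task Configuration

1. Retries and Timeouts

    from prefect import task, flow
    from datetime import timedelta


    @task(
        retries=3,               # retry up to 3 times after a failure
        retry_delay_seconds=10,  # wait 10 seconds between attempts
        timeout_seconds=300,     # fail the task if it runs longer than 5 minutes
    )
    def unreliable_task():
        import random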
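
As a rough mental model of the retry settings, `retries=3` means one initial attempt plus up to three more, with a pause of `retry_delay_seconds` between attempts. The sketch below imitates that behavior in plain Python. It is not Prefect's implementation: `run_with_retries` and its `sleep` argument are hypothetical names chosen for illustration, and the timeout is not modeled.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    fn: Callable[[], T],
    retries: int = 3,
    retry_delay_seconds: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `fn` once, then retry up to `retries` more times if it raises."""
    retries_used = 0
    while True:
        try:
            return fn()
        except Exception:
            if retries_used >= retries:
                raise  # out of retries: surface the last error
            retries_used += 1
            sleep(retry_delay_seconds)


if __name__ == "__main__":
    attempts = {"count": 0}

    def flaky() -> str:
        # Fails on the first two calls, succeeds on the third
        attempts["count"] += 1
        if attempts["count"] < 3:
            raise RuntimeError("temporary failure")
        return "ok"

    print(run_with_retries(flaky, retries=3, retry_delay_seconds=0.1))
    print(f"attempts made: {attempts['count']}")
```

Passing `sleep` as an argument keeps the sketch easy to test, since a no-op can replace the real delay.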
