Complete Apache Airflow Tutorial: Workflow Orchestration for Data Pipelines
Apache Airflow is an open-source platform for authoring, scheduling, and monitoring workflows programmatically. Originally developed at Airbnb, Airflow has become a widely adopted standard for orchestrating complex data pipelines and ML workflows.
Why Airflow?
Airflow's strengths:
- Python-based: Define workflows as code (DAGs)
- Scalable: Distributed execution with Celery or Kubernetes
- Extensible: Rich ecosystem of operators and hooks
- Visual UI: Monitor and manage workflows from a web interface
- Active community: Large ecosystem and broad support
Common use cases:
- ETL/ELT pipelines
- ML training workflows
- Data warehouse loading
- Report generation
- Infrastructure automation
Installation
1. Local Installation
# Create a virtual environment
python -m venv airflowvenv
source airflowvenv/bin/activate
# Set the Airflow home directory
export AIRFLOW_HOME=~/airflow
# Install Airflow
pip install apache-airflow
# Initialize the metadata database
# (Airflow 2.7+ deprecates this command in favor of `airflow db migrate`)
airflow db init
# Create an admin user
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com \
    --password admin
# Start the webserver (in one terminal)
airflow webserver --port 8080
# Start the scheduler (in another terminal)
airflow scheduler
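Once the webserver and scheduler are running, one way to confirm that both are up is to query the webserver's `/health` endpoint. The sketch below assumes an Airflow 2.x webserver listening on `http://localhost:8080`, as started above. The helper names `summarize_health`, `fetch_health`, and `is_healthy` are illustrative, not part of Airflow. Components whose status is missing or null are skipped, on the assumption that they are simply not deployed.

```python
import json
from urllib.request import urlopen


def summarize_health(payload: dict) -> dict:
    """Map each reporting component in a /health payload to its status."""
    summary = {}
    for component, details in payload.items():
        status = details.get("status") if isinstance(details, dict) else None
        if status is not None:  # skip components that report no status
            summary[component] = status
    return summary


def is_healthy(summary: dict) -> bool:
    """True only if at least one component reported and all are 'healthy'."""
    return bool(summary) and all(s == "healthy" for s in summary.values())


def fetch_health(base_url: str = "http://localhost:8080", timeout: float = 5.0) -> dict:
    """Fetch /health from a running webserver (URL is an assumption)."""
    with urlopen(f"{base_url}/health", timeout=timeout) as response:
        return summarize_health(json.load(response))
```

Calling `is_healthy(fetch_health())` from a Python shell returns `True` only when every reporting component, typically the metadata database and the scheduler, says it is healthy.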
2. Docker Installation
# Download the docker-compose file
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
# Create the directories
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env
# Initialize and start
docker compose up airflow-init
docker compose up -d
3. Installing with Extras
# With specific providers (quoted so shells such as zsh do not expand the brackets)
pip install 'apache-airflow[postgres,google,amazon,slack]'
# Common extras
pip install 'apache-airflow[celery]'      # Celery executor
pip install 'apache-airflow[kubernetes]'  # Kubernetes executor
pip install 'apache-airflow[pandas]'      # Pandas support
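After installing extras, it can help to check which provider packages actually landed in the environment. The sketch below uses only the standard library and assumes that each provider extra maps to a distribution named `apache-airflow-providers-<extra>`. That holds for `postgres`, `google`, `amazon`, and `slack`, but not for every extra (`pandas`, for example, is not a provider). The function names are illustrative.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(distribution: str):
    """Return the installed version of a distribution, or None if absent."""
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None


def provider_report(extras):
    """Map each extra to the version of its assumed provider distribution."""
    return {
        extra: installed_version(f"apache-airflow-providers-{extra}")
        for extra in extras
    }


if __name__ == "__main__":
    report = provider_report(["postgres", "google", "amazon", "slack"])
    for extra, found in report.items():
        print(f"{extra}: {found or 'not installed'}")
```

Run it inside the same virtual environment used for `pip install`; any extra reported as not installed points to a missing or failed provider install.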
Core Concepts
1. DAG (Directed Acyclic Graph)
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Default arguments, applied to every task in the DAG
default_args = {
    'owner': 'datateam',
    'depends_on_past': False,
    'email': ['alerts@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

# Define the DAG
dag = DAG(
    'myfirstdag',
    default_args=default_args,
    description='A simple tutorial DAG',
    # or '@daily'; Airflow 2.4+ prefers the `schedule` argument
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['tutorial'],
)

# Define the tasks
def helloworld():
    print("Hello, World!")
    return "success"

task1 = PythonOperator(
    task_id='hellotask',
    python_callable=helloworld,
    dag=dag,
)
task2 = BashOperator(
    task_id='bashtask',
    bash_command='echo "Hello from Bash"',
    dag=dag,
)

# Set dependencies: task1 must finish before task2 starts
task1 >> task2
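To see what the `>>` operator and the "acyclic" requirement mean in practice, here is a toy model that runs without Airflow. `ToyTask` and `execution_order` are hypothetical stand-ins written for this illustration; they are not Airflow classes, and Airflow's real scheduler does much more. The sketch only shows that `a >> b` records a dependency and that a valid run order exists only when the graph has no cycle. It needs Python 3.9+ for `graphlib`.

```python
from graphlib import CycleError, TopologicalSorter


class ToyTask:
    """Hypothetical stand-in for an operator; not an Airflow class."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.upstream = set()  # task_ids that must finish before this one

    def __rshift__(self, other: "ToyTask") -> "ToyTask":
        # `a >> b` records a as upstream of b and returns b so chains work.
        other.upstream.add(self.task_id)
        return other


def execution_order(tasks):
    """Return task_ids in an order that respects every dependency."""
    graph = {t.task_id: t.upstream for t in tasks}
    return list(TopologicalSorter(graph).static_order())


hello = ToyTask("hellotask")
bash = ToyTask("bashtask")
hello >> bash
print(execution_order([hello, bash]))  # ['hellotask', 'bashtask']

# Adding the reverse edge creates a cycle, so no valid order exists.
bash >> hello
try:
    execution_order([hello, bash])
except CycleError:
    print("cycle detected: not a valid DAG")
```

Because `__rshift__` returns its right-hand operand, `a >> b >> c` makes `a` upstream of `b` and `b` upstream of `c`. Chained dependencies in a real DAG file read the same way.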
2. TaskFlow API (Recommended)
from datetime import datetime

from airflow.decorators import dag, task

@dag(
    schedule_interval='@daily',  # Airflow 2.4+ prefers the `schedule` argument
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['taskflow'],
)
def mytaskflowdag():
    @task()
    def extract():
        """Extract data from the source."""
        data = {"orders": [1, 2, 3, 4, 5]}
        return data

    @task()
    def transform(data: dict):
        """Transform the data."""
        total = sum(data["orders"])
        return {"total": total}

    @task()
    def load(result: dict):
        """Load the data into the destination."""