Complete Apache Airflow Tutorial: Workflow Orchestration for Data Pipelines
Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows. Originally developed at Airbnb, Airflow has become one of the most widely used tools for orchestrating complex data pipelines and ML workflows.
Why Airflow?
Airflow Advantages:
- Python-based: Define workflows as code (DAGs)
- Scalable: Distributed execution with Celery/Kubernetes
- Extensible: Rich ecosystem of operators and hooks
- Visual UI: Monitor and manage workflows easily
- Active community: Large ecosystem and support
Common Use Cases:
- ETL/ELT pipelines
- ML training workflows
- Data warehouse loading
- Report generation
- Infrastructure automation
Installation
1. Local Installation
# Create virtual environment
python -m venv airflowvenv
source airflowvenv/bin/activate
# Set Airflow home
export AIRFLOW_HOME=~/airflow
# Install Airflow
pip install apache-airflow
# Initialize database (Airflow 2.7+ deprecates this in favor of `airflow db migrate`)
airflow db init
# Create admin user
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com \
    --password admin
# Start webserver (in one terminal)
airflow webserver --port 8080
# Start scheduler (in another terminal)
airflow scheduler
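The AIRFLOW_HOME step matters because Airflow looks in that directory for its configuration file and logs. As a rough sketch of the lookup (not Airflow's actual code), the helper below mirrors the documented behavior of falling back to `~/airflow` when the variable is unset. The name `resolve_airflow_home` is hypothetical and used only for illustration.

```python
import os
from pathlib import Path


def resolve_airflow_home(env=None):
    """Return the directory Airflow would treat as its home (illustrative helper)."""
    env = os.environ if env is None else env
    # Fall back to ~/airflow when AIRFLOW_HOME is not set.
    raw = env.get("AIRFLOW_HOME", "~/airflow")
    return Path(raw).expanduser()


if __name__ == "__main__":
    print(resolve_airflow_home())
```

Running this in the same shell where the variable was exported is a quick way to confirm that the webserver and scheduler terminals will agree on the same home directory.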
2. Docker Installation
# Download docker-compose file
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
# Create directories
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env
# Initialize and start
docker compose up airflow-init
docker compose up -d
3. Install with Extras
# With specific providers
pip install "apache-airflow[postgres,google,amazon,slack]"
# Common extras
pip install "apache-airflow[celery]"  # Celery executor
pip install "apache-airflow[cncf.kubernetes]"  # Kubernetes executor
pip install "apache-airflow[pandas]"  # Pandas support
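Provider extras such as postgres or amazon pull in separate distributions whose names start with `apache-airflow-providers-`. The sketch below is one way to confirm from Python which of them ended up in the active environment. It uses only the standard library, and the function name `installed_airflow_providers` is an assumption made for this example. The `airflow providers list` command gives a similar view from the CLI.

```python
from importlib import metadata


def installed_airflow_providers():
    """Return sorted names of installed apache-airflow-providers-* distributions."""
    prefix = "apache-airflow-providers-"
    names = set()
    for dist in metadata.distributions():
        # Some broken installs have no Name field, so guard against None.
        name = (dist.metadata.get("Name") or "").lower()
        if name.startswith(prefix):
            names.add(name)
    return sorted(names)


if __name__ == "__main__":
    for provider in installed_airflow_providers():
        print(provider)
```

An empty result simply means no provider packages are installed in the current virtual environment.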
Core Concepts
1. DAG (Directed Acyclic Graph)
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
# Default arguments
default_args = {
    'owner': 'datateam',
    'depends_on_past': False,
    'email': ['alerts@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}
# Define DAG
dag = DAG(
    'myfirstdag',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),  # or '@daily'; Airflow 2.4+ prefers `schedule`
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['tutorial'],
)
# Define tasks
def helloworld():
    print("Hello, World!")
    return "success"
task1 = PythonOperator(
    task_id='hellotask',
    python_callable=helloworld,
    dag=dag,
)
task2 = BashOperator(
    task_id='bashtask',
    bash_command='echo "Hello from Bash"',
    dag=dag,
)
# Set dependencies
task1 >> task2
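To make the `>>` dependency syntax less magical, here is a toy sketch of the idea in plain Python. It is not Airflow's implementation: the `Task` class and `execution_order` function are hypothetical stand-ins, and the ordering relies on the standard-library `graphlib` module (Python 3.9+) rather than the Airflow scheduler.

```python
from graphlib import TopologicalSorter


class Task:
    """Toy stand-in for an Airflow operator; not the real BaseOperator."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()

    def __rshift__(self, other):
        # `a >> b` records that b depends on a, then returns b so chains work.
        other.upstream.add(self.task_id)
        return other


def execution_order(tasks):
    """Return one valid run order; graphlib raises CycleError if there is a cycle."""
    graph = {t.task_id: t.upstream for t in tasks}
    return list(TopologicalSorter(graph).static_order())


hello = Task("hellotask")
bash = Task("bashtask")
hello >> bash
print(execution_order([hello, bash]))  # ['hellotask', 'bashtask']
```

The "acyclic" part of DAG is what makes such an ordering possible: if two tasks depended on each other, no valid run order would exist and the sort would fail.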
2. TaskFlow API (Recommended)
from datetime import datetime
from airflow.decorators import dag, task
@dag(
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['taskflow'],
)
def mytaskflowdag():
    @task()
    def extract():
        """Extract data from source"""
        data = {"orders": [1, 2, 3, 4, 5]}
        return data
    @task()
    def transform(data: dict):
        """Transform the data"""
        total = sum(data["orders"])
        return {"total": total}
    @task()
    def load(result: dict):
        """Load data to destination"""