
Building the Stack — Plugin, StatsD, Sweeper, and Grafana

Step-by-step implementation of an Airflow metrics plugin using Gauges, configuring the StatsD exporter, and deploying a Sweeper DAG to prevent Pushgateway OOMs.

Hi, I'm Chirag! By day, I'm a Staff Engineer designing high-scale distributed platforms and full-stack systems. By night, I’m just a highly curious engineer who loves writing down what I learn, sharing my architectural experiments, and chatting about how things work (and why they sometimes break!). I started this blog because I believe the best engineering happens when we share our blueprints openly, embrace our mistakes, and learn from one another. Grab a cup of coffee and let's explore!

(Part 4 of the Batch Workloads Observability series. Read Part 3: Metric Granularity for the metric classification rationale.)

(Clone the companion repo: TheStaffBlueprint/batch-workloads-observability for the full docker-compose stack, plugins, and Sweeper DAG.)

Part 4 Implementation Explainer

In Parts 2 and 3, we established the architecture and metric classification: StatsD for aggregate counts, Pushgateway with Gauges for per-run state, and structured logs/OLAP for audit data. Today, we're building all of it.

Step 1: StatsD — Zero Custom Code

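Install Airflow's StatsD extra (apache-airflow[statsd]), then add these lines to airflow.cfg (or set the equivalent AIRFLOW__METRICS__* environment variables, such as AIRFLOW__METRICS__STATSD_HOST):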

[metrics]
statsd_on = True
statsd_host = statsd-exporter
statsd_port = 8125
statsd_prefix = airflow

Airflow natively emits task-finish counters (ti.finish.*), DAG run duration timers (dagrun.duration.*), and dozens of other operational metrics over UDP. The StatsD exporter aggregates these and, driven by a mapping config, exposes them to Prometheus as clean, low-cardinality metrics. See statsd_mapping.yml in the companion repo for the full mapping configuration.
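Before writing mappings, it helps to confirm the exporter receives UDP traffic at all. A minimal smoke-test sketch, assuming the exporter is reachable as statsd-exporter:8125 (matching the config above); the metric name is made up but shaped like Airflow's ti.finish.<dag_id>.<task_id>.<state> counters, and the helper names are hypothetical:

```python
import socket


def statsd_counter_packet(name: str, value: int = 1, prefix: str = "airflow") -> bytes:
    """Build one StatsD counter line in the plain '<name>:<value>|c' format."""
    return f"{prefix}.{name}:{value}|c".encode("ascii")


def send_test_counter(name: str, host: str = "statsd-exporter", port: int = 8125) -> None:
    """Fire a single counter increment over UDP (fire-and-forget, no reply expected)."""
    packet = statsd_counter_packet(name)
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(packet, (host, port))


if __name__ == "__main__":
    # Hypothetical DAG/task names, for testing the pipeline only.
    send_test_counter("ti.finish.example_dag.example_task.success")
```

After running it, the exporter's own /metrics endpoint (port 9102 by default) should list a series derived from that name.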

Step 2: The Pushgateway Plugin — V1 to V2 Evolution

The V1 Anti-Pattern (Two Compounding Mistakes)

Mistake 1 — No run isolation. V1 used a coarse grouping key (dag_id + task_id) with no run_id, so every execution of a task overwrites the previous one in the same Pushgateway group. If 10 runs of the same task push at the same time, only the last write survives. This is a race condition caused by the grouping key design, not the metric type; it would happen with Gauges too.

# V1 ANTI-PATTERN: Coarse key — no run_id
def _get_task_group_key(self, ti):
    return {
        'dag_id': ti.dag_id,
        'task_id': ti.task_id,
        'instance': self.instance_name,
    }
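To make the overwrite concrete, here is a toy in-memory model of the Pushgateway's grouping behaviour (not the real Pushgateway code): each distinct grouping key is one slot, and a push replaces same-named metrics inside that slot. The metric name and run IDs are hypothetical.

```python
from typing import Dict, Tuple

Store = Dict[Tuple[Tuple[str, str], ...], Dict[str, float]]


def push(store: Store, group_key: Dict[str, str], metrics: Dict[str, float]) -> None:
    """Toy Pushgateway: one slot per distinct grouping key; a push replaces same-named metrics."""
    slot = tuple(sorted(group_key.items()))
    store.setdefault(slot, {}).update(metrics)


v1_store: Store = {}
v2_store: Store = {}
for i in range(10):  # ten runs of the same task pushing in quick succession
    v1_key = {"dag_id": "etl", "task_id": "load", "instance": "airflow-local"}
    v2_key = {**v1_key, "run_id": f"manual__run_{i}"}  # hypothetical run ids
    push(v1_store, v1_key, {"task_duration_seconds": float(i)})
    push(v2_store, v2_key, {"task_duration_seconds": float(i)})

print(len(v1_store), len(v2_store))  # 1 10
```

With the V1 key, nine of the ten results are gone; the surviving value belongs to whichever push landed last.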

Mistake 2 — Using Counters for state. Even with run_id added, Counters are semantically wrong for state. If a task fails and then succeeds on retry, both failure_total = 1 and success_total = 1 persist for that run, so the failure count is permanently inflated. The problem is semantic, not cardinality: with run_id in the key, Counters and Gauges produce identical cardinality.
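The difference is easiest to see by replaying one fail-then-retry sequence under both semantics. This is a plain-Python sketch of the two behaviours, not prometheus_client code; the metric names follow the ones used above.

```python
from collections import Counter

# One task run: the first attempt fails, the retry succeeds.
attempts = ["failed", "success"]
counter_names = {"failed": "failure_total", "success": "success_total"}

# Counter semantics: every outcome increments a monotonically growing total.
counters = Counter()
for outcome in attempts:
    counters[counter_names[outcome]] += 1

# Gauge semantics: every outcome overwrites one status value (1=success, -1=failed).
task_status = None
for outcome in attempts:
    task_status = 1 if outcome == "success" else -1

print(dict(counters), task_status)  # {'failure_total': 1, 'success_total': 1} 1
```

The Counters still report a failure for a run that ultimately succeeded; the Gauge reports only the final state.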

The V2 Fix (Two Corrections)

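# airflow/plugins/v2_gauge_fix_plugin.py
import logging
import os

from airflow.listeners import hookimpl
from airflow.models.taskinstance import TaskInstance
from airflow.utils.state import TaskInstanceState
from prometheus_client import CollectorRegistry, Gauge, pushadd_to_gateway
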
class PushgatewayV2GaugeListeners:
    def __init__(self):
        self.instance_name = os.environ.get("AIRFLOW_VAR_PROMETHEUS_INSTANCE_NAME", "airflow-local")

    def _push(self, registry, job_name, group_key):
        # THIS is the key: Read from environment to avoid SQLAlchemy session detachment
        enabled_str = os.environ.get("AIRFLOW_VAR_PROMETHEUS_METRICS_ENABLED", "true").lower()
        push_gateway_url = os.environ.get("AIRFLOW_VAR_PUSHGATEWAY_URL", "http://host.docker.internal:9091")
        if enabled_str != "true" or not push_gateway_url:
            return
        try:
            pushadd_to_gateway(push_gateway_url, job=job_name, grouping_key=group_key, registry=registry)
        except Exception as e:
            logging.error(f"Failed to push metric: {e}")

The SQLAlchemy Trap

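We use os.environ.get() instead of Airflow's Variable.get(). When the Scheduler fires a listener hook, it often does so outside an active database session, and calling Variable.get() in that context throws a DetachedInstanceError and crashes the scheduler loop. Reading the environment avoids the metadata database entirely. The AIRFLOW_VAR_ prefix is Airflow's own convention for defining Variables through environment variables, so the same values remain visible to DAG code via Variable.get().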
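A small helper can make that convention explicit. This is a sketch, not part of Airflow's API: env_var and metrics_enabled are hypothetical names, and the only Airflow behaviour relied on is the AIRFLOW_VAR_<KEY> naming (key upper-cased) that Airflow uses when resolving Variables from the environment.

```python
import os
from typing import Mapping, Optional

# Airflow resolves Variable "<key>" from the env var AIRFLOW_VAR_<KEY>.
VAR_ENV_PREFIX = "AIRFLOW_VAR_"


def env_var(key: str, default: Optional[str] = None,
            environ: Mapping[str, str] = os.environ) -> Optional[str]:
    """Read an Airflow Variable straight from the environment; no DB session involved."""
    return environ.get(VAR_ENV_PREFIX + key.upper(), default)


def metrics_enabled(environ: Mapping[str, str] = os.environ) -> bool:
    # Design choice of this sketch: accept a few truthy spellings, not only "true".
    value = env_var("prometheus_metrics_enabled", "true", environ)
    return value.strip().lower() in {"true", "1", "yes"}
```

Passing environ explicitly keeps the helpers testable without mutating the real process environment.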

Fix 1 — Run Isolation via Grouping Key

    def _get_task_group_key(self, ti):
        key = {
            'dag_id': ti.dag_id,
            'task_id': ti.task_id,
            'run_id': ti.run_id,        # Isolates each DAG run
            'instance': self.instance_name,
        }
        map_index = getattr(ti, 'map_index', -1)
        if map_index >= 0:
            key['map_index'] = str(map_index)  # Isolates mapped tasks
        return key
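Each distinct grouping key becomes a distinct URL path on the Pushgateway, which is why adding run_id isolates runs. A sketch of that mapping, following the Pushgateway's /metrics/job/<job>/<label>/<value> scheme and its <label>@base64 escape for values containing '/'. grouping_key_path is a hypothetical helper (prometheus_client builds this path for you inside pushadd_to_gateway), and the job name in the tests is illustrative.

```python
import base64
from typing import Dict
from urllib.parse import quote


def grouping_key_path(job: str, group_key: Dict[str, str]) -> str:
    """Build the Pushgateway path for a job plus grouping key."""
    parts = ["metrics", "job", quote(job, safe="")]
    for label, value in group_key.items():
        if "/" in value:
            # Values containing '/' go through the URL-safe base64 escape.
            encoded = base64.urlsafe_b64encode(value.encode()).decode()
            parts += [f"{label}@base64", encoded]
        else:
            # Everything else is percent-encoded (':' and '+' are common in run_ids).
            parts += [label, quote(value, safe="")]
    return "/" + "/".join(parts)
```

Two runs of the same task therefore write to two different paths, and a mapped task adds a map_index segment on top.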

Fix 2 — Gauges for Semantic Correctness

Gauges represent state as an absolute value. If an attempt fails (task_status = -1) and the retry succeeds, the success hook overwrites the same series with 1, so only the final state is visible:

    @hookimpl
    def on_task_instance_success(self, previous_state: TaskInstanceState, task_instance: TaskInstance):
        registry = CollectorRegistry()
        group_key = self._get_task_group_key(task_instance)
        
        g = Gauge('task_status', 'Task state snapshot (1=success, -1=failed)', registry=registry)
        g.set(1)  # 1 = success (overwrites any previous -1 from a failed attempt)
        
        self._push(registry, "airflow_task_status", group_key)  # job name is illustrative

Step 3: The Sweeper DAG — Preventing OOM Crashes

Pushgateway has no native TTL: every run_id creates a metric group that stays in memory until it is explicitly deleted. The Sweeper runs once a day and deletes every group whose last push is older than the threshold (max_age_mins=1440, i.e. 24 hours):

Pushgateway Sweeper DAG
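# airflow/dags/pushgateway_sweeper.py
import base64
import os
from datetime import datetime, timezone

import requests
from airflow.decorators import dag, task


def _b64(value: str) -> str:
    # Pushgateway accepts URL-safe base64 label values via the "<label>@base64" form,
    # which keeps run_ids containing ':', '+' or '/' intact in the DELETE path.
    return base64.urlsafe_b64encode(value.encode()).decode()


@task
def clean_stale_metrics(max_age_mins: int):
    gateway_url = os.environ.get("AIRFLOW_VAR_PUSHGATEWAY_URL", "http://host.docker.internal:9091")
    response = requests.get(f"{gateway_url}/api/v1/metrics", timeout=30)
    response.raise_for_status()
    data = response.json()
    now = datetime.now(timezone.utc).timestamp()
    max_age_secs = max_age_mins * 60

    for group in data.get('data', []):
        samples = group.get('push_time_seconds', {}).get('metrics') or [{}]
        push_time = samples[0].get('value')
        if push_time is None:
            continue  # No push timestamp recorded; skip rather than crash on float(None)
        if now - float(push_time) > max_age_secs:
            labels = dict(group.get('labels', {}))
            delete_url = f"{gateway_url}/metrics/job/{labels.pop('job', 'unknown_job')}"
            for k, v in labels.items():
                if v:
                    delete_url += f"/{k}@base64/{_b64(v)}"
            requests.delete(delete_url, timeout=30).raise_for_status()


@dag(schedule="0 0 * * *", start_date=datetime(2026, 1, 1), catchup=False)
def pushgateway_sweeper():
    clean_stale_metrics(max_age_mins=1440)


sweeper_dag = pushgateway_sweeper()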

The Sweeper is mandatory for V2 deployments with run_id in the grouping key.
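Because the Sweeper deletes data, it is worth unit-testing its selection logic against a canned /api/v1/metrics payload before pointing it at a real gateway. A sketch that pulls the decision into a pure function; stale_group_paths is a hypothetical name, and it assumes the same response shape the Sweeper already relies on (data[].labels and data[].push_time_seconds.metrics[0].value).

```python
import base64
from typing import Dict, List


def stale_group_paths(payload: Dict, now: float, max_age_secs: float) -> List[str]:
    """Return DELETE paths for groups whose last push is older than max_age_secs."""
    paths = []
    for group in payload.get("data", []):
        samples = group.get("push_time_seconds", {}).get("metrics") or [{}]
        push_time = samples[0].get("value")
        if push_time is None or now - float(push_time) <= max_age_secs:
            continue  # fresh group, or no timestamp to judge by
        labels = dict(group.get("labels", {}))
        path = f"/metrics/job/{labels.pop('job', 'unknown_job')}"
        for name, value in labels.items():
            if value:
                encoded = base64.urlsafe_b64encode(value.encode()).decode()
                path += f"/{name}@base64/{encoded}"
        paths.append(path)
    return paths
```

The Airflow task then only has to fetch the payload, call this function, and issue one DELETE per returned path.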

Step 4: V3 — The Production-Grade Plugin

V2 works for small systems, but at scale (thousands of runs per day or more), run_id in the grouping key causes severe series churn: every run creates new series that Prometheus must index and then abandon. V3 removes run_id entirely, giving you bounded cardinality with no Sweeper:

# airflow/plugins/v3_low_cardinality_plugin.py
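class PushgatewayV3LowCardinalityListeners:
    # __init__ (sets self.instance_name), _push(), and the hooks follow the V2 plugin;
    # only the grouping key changes.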
    def _get_task_group_key(self, ti):
        """LOW CARDINALITY: No run_id. Each (dag, task) pair has exactly one slot.
        Latest execution overwrites previous — shows current state only."""
        return {
            'dag_id': ti.dag_id,
            'task_id': ti.task_id,
            'instance': self.instance_name,
        }

The key difference: V3's grouping key has no run_id. Each task has exactly one slot in the Pushgateway. The latest execution overwrites the previous one. This means:

  • ✅ Bounded cardinality — no series churn, no Sweeper needed

  • ✅ Retry safe — Gauge overwrites reflect final state

  • ✅ Fast dashboards — Prometheus queries scan a small, fixed set of series

  • ❌ No per-run history — only the latest execution is visible
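The cardinality difference is easy to quantify with a toy count of distinct grouping keys for one DAG. The workload numbers (100 runs/day, 20 tasks, one week) are arbitrary and chosen only for illustration; pushgateway_groups is a hypothetical helper.

```python
def pushgateway_groups(runs_per_day: int, tasks_per_dag: int, days: int,
                       include_run_id: bool) -> int:
    """Count distinct grouping keys after `days` of runs (toy model, one instance label)."""
    keys = set()
    for day in range(days):
        for run in range(runs_per_day):
            run_id = f"scheduled__d{day}_r{run}"  # hypothetical run ids
            for task in range(tasks_per_dag):
                key = ("etl", f"task_{task}", "airflow-local")
                keys.add(key + (run_id,) if include_run_id else key)
    return len(keys)


print(pushgateway_groups(100, 20, 7, include_run_id=True))   # 14000 (V2)
print(pushgateway_groups(100, 20, 7, include_run_id=False))  # 20 (V3)
```

Without a Sweeper, the V2 count keeps growing with every run, while V3 stays fixed at one slot per task.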

For per-run history, emit structured JSON logs to an OLAP engine or log aggregator. This is covered in Part 5.
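As a preview, a per-run record can be as simple as one JSON line per task execution, where high-cardinality fields like run_id are harmless. A minimal standard-library sketch; the field names and logger name are hypothetical, and shipping the lines to an OLAP engine or log aggregator is left to whatever log pipeline you already run.

```python
import json
import logging
from datetime import datetime, timezone

logger = logging.getLogger("task_audit")  # hypothetical logger name


def task_run_record(dag_id: str, task_id: str, run_id: str,
                    try_number: int, state: str) -> str:
    """Serialize one task execution as a single JSON line (field names are illustrative)."""
    return json.dumps({
        "ts": datetime.now(timezone.utc).isoformat(),
        "dag_id": dag_id,
        "task_id": task_id,
        "run_id": run_id,
        "try_number": try_number,
        "state": state,
    }, sort_keys=True)


logger.info(task_run_record("etl", "load", "scheduled__2026-01-01T00:00:00+00:00", 2, "success"))
```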

The Plugin Evolution

| Plugin | run_id in key | Cardinality | Sweeper needed | Use case |
|---|---|---|---|---|
| V1 | No | Low | No | ❌ Anti-pattern (Counters + race condition) |
| V2 | Yes | High | Yes | ⚠️ Small systems only |
| V3 | No | Low | No | ✅ Production-grade |

The V3 Production Dashboard

We've added a dedicated dashboard for the V3 plugin: airflow_v3_low_cardinality.json.

Grafana V3 Low Cardinality Dashboard

Unlike the V2 dashboard, this one:

  • Removes the run_id filter: Since metrics are no longer partitioned by run, the dashboard shows the global fleet state.

  • Focuses on "Latest State": The panels show the status of the most recent execution of every task.

  • Improved Performance: Because the number of series is bounded, the dashboard stays fast no matter how many runs have executed; each run updates an existing series instead of creating a new one.

Key Takeaways

  • Use os.environ.get() in Airflow listener plugins, never Variable.get().

  • V1 had two problems: missing run_id (race condition) AND Counters (semantic mismatch).

  • V2 fixes both but creates high cardinality — acceptable for small systems, not production-grade at scale.

  • V3 is the production recommendation — removes run_id, bounded cardinality, no Sweeper.

  • 🚨 Golden Rule: NEVER inject run_id into Prometheus at scale. Per-run data belongs in OLAP or structured logs.
