Building the Stack — Plugin, StatsD, Sweeper, and Grafana
Step-by-step implementation of an Airflow metrics plugin using Gauges, configuring StatsD exporter, and deploying a Sweeper DAG to prevent Pushgateway OOMs.

(Part 4 of the Batch Workloads Observability series. Read Part 3: Metric Granularity for the metric classification rationale.)
(Clone the companion repo: TheStaffBlueprint/batch-workloads-observability for the full docker-compose stack, plugins, and Sweeper DAG.)
In Parts 2 and 3, we established the architecture and metric classification: StatsD for aggregate counts, Pushgateway with Gauges for per-run state, and structured logs/OLAP for audit data. Today, we're building all of it.
Step 1: StatsD — Zero Custom Code
Add these lines to your airflow.cfg (or set the equivalent AIRFLOW__METRICS__* environment variables):
[metrics]
statsd_on = True
statsd_host = statsd-exporter
statsd_port = 8125
statsd_prefix = airflow
Airflow natively pushes ti.finish counts, dag.duration timers, and dozens of other operational metrics over UDP. The StatsD exporter aggregates these and exposes them to Prometheus as clean, low-cardinality metrics. See statsd_mapping.yml in the companion repo for the full mapping configuration.
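To make the aggregation step concrete, here is a minimal Python sketch of what a glob-style mapping rule does to a single StatsD line. The sample metric name, the pattern `airflow.ti.finish.*.*.*`, and the label names are illustrative assumptions, not the contents of statsd_mapping.yml. In the real stack the StatsD exporter does this translation, not Python code.

```python
from typing import Dict, Optional, Tuple


def parse_statsd_line(line: str) -> Tuple[str, float, str]:
    """Split a StatsD datagram of the form 'name:value|type' into its parts."""
    name, rest = line.split(":", 1)
    value, metric_type = rest.split("|", 1)
    return name, float(value), metric_type


def apply_glob_rule(
    name: str, pattern: str, label_names: Tuple[str, ...]
) -> Optional[Dict[str, str]]:
    """Match a dotted name against a pattern where '*' captures one segment.

    Returns the captured segments as labels, or None if the name does not match.
    """
    name_parts = name.split(".")
    pattern_parts = pattern.split(".")
    if len(name_parts) != len(pattern_parts):
        return None
    captured = []
    for got, want in zip(name_parts, pattern_parts):
        if want == "*":
            captured.append(got)
        elif got != want:
            return None
    return dict(zip(label_names, captured))


# Hypothetical datagram, as it might arrive over UDP for a finished task.
name, value, kind = parse_statsd_line("airflow.ti.finish.etl_daily.load.success:1|c")
labels = apply_glob_rule(name, "airflow.ti.finish.*.*.*", ("dag_id", "task_id", "state"))
print(labels, value, kind)
```

The point of the mapping is that the dotted segments become labels on one metric family instead of one metric name per task. Note that this toy matcher, like any segment-based rule, assumes the DAG and task IDs contain no dots.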
Step 2: The Pushgateway Plugin — V1 to V2 Evolution
The V1 Anti-Pattern (Two Compounding Mistakes)
Mistake 1: No run isolation. V1 used a coarse grouping key (dag_id + task_id) with no run_id, so every execution of the same task overwrites the previous one. If 10 runs of the same task push concurrently, only the last write survives. This is a race condition caused by the grouping key design, not the metric type; it would happen with Gauges too.
# V1 ANTI-PATTERN: coarse key, no run_id
def _get_task_group_key(self, ti):
    return {
        'dag_id': ti.dag_id,
        'task_id': ti.task_id,
        'instance': self.instance_name,
    }
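The overwrite is easy to reproduce without a running Pushgateway. The sketch below models the gateway as a plain dictionary keyed by the grouping key, which is a simplification of the real service; the `rows_loaded` metric and the run IDs are hypothetical.

```python
from typing import Dict, FrozenSet, Tuple

GroupKey = FrozenSet[Tuple[str, str]]
Store = Dict[GroupKey, Dict[str, float]]


def push(store: Store, grouping_key: Dict[str, str], metrics: Dict[str, float]) -> None:
    """Toy model of a push: one slot per grouping key, same-named metrics replaced."""
    group = store.setdefault(frozenset(grouping_key.items()), {})
    group.update(metrics)


coarse_store: Store = {}    # V1-style key: no run_id
isolated_store: Store = {}  # key that includes run_id

for n in range(10):
    run_id = f"run_{n}"
    push(coarse_store, {"dag_id": "etl", "task_id": "load"}, {"rows_loaded": float(n)})
    push(
        isolated_store,
        {"dag_id": "etl", "task_id": "load", "run_id": run_id},
        {"rows_loaded": float(n)},
    )

# With the coarse key, ten pushes collapse into one group holding the last value.
print(len(coarse_store), len(isolated_store))
```

Which of the ten values ends up in the coarse slot depends purely on arrival order, which is why concurrent runs make the result nondeterministic.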
Mistake 2: Using Counters for state. Even with run_id added, Counters are semantically wrong for a state snapshot. If a task fails and then retries successfully, both failure_total = 1 and success_total = 1 persist in the same group, so the failure count stays permanently inflated. Cardinality is not the differentiator here: with run_id in the grouping key, Counters and Gauges produce the same number of series.
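A fail-then-retry sequence shows the difference. This is a toy model, assuming only that a push replaces same-named metrics within a group and leaves other metric names untouched; the `task_status` name is used here purely for illustration.

```python
from typing import Dict


def pushadd(group: Dict[str, float], metrics: Dict[str, float]) -> None:
    """Toy model of pushadd: replace same-named metrics, keep everything else."""
    group.update(metrics)


# Counter-style: one metric name per outcome.
counter_group: Dict[str, float] = {}
pushadd(counter_group, {"failure_total": 1.0})  # attempt 1 fails
pushadd(counter_group, {"success_total": 1.0})  # attempt 2 succeeds

# Gauge-style: a single metric name holding the current state.
gauge_group: Dict[str, float] = {}
pushadd(gauge_group, {"task_status": -1.0})  # attempt 1 fails
pushadd(gauge_group, {"task_status": 1.0})   # attempt 2 succeeds

print(counter_group)  # both outcomes remain visible
print(gauge_group)    # only the final state remains
```

The stale failure counter is never cleared because nothing ever pushes a new value under that name for this run.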
The V2 Fix (Two Corrections)
# airflow/plugins/v2_gauge_fix_plugin.py
import logging
import os

from prometheus_client import CollectorRegistry, Gauge, pushadd_to_gateway


class PushgatewayV2GaugeListeners:
    def __init__(self):
        self.instance_name = os.environ.get("AIRFLOW_VAR_PROMETHEUS_INSTANCE_NAME", "airflow-local")

    def _push(self, registry, job_name, group_key):
        # THIS is the key: read from the environment to avoid SQLAlchemy session detachment
        enabled_str = os.environ.get("AIRFLOW_VAR_PROMETHEUS_METRICS_ENABLED", "true").lower()
        push_gateway_url = os.environ.get("AIRFLOW_VAR_PUSHGATEWAY_URL", "http://host.docker.internal:9091")
        if enabled_str != "true" or not push_gateway_url:
            return
        try:
            pushadd_to_gateway(push_gateway_url, job=job_name, grouping_key=group_key, registry=registry)
        except Exception as e:
            # A metrics failure must never break the listener hook
            logging.error(f"Failed to push metric: {e}")
The SQLAlchemy Trap
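We use os.environ.get instead of Airflow's Variable.get(). When the Scheduler fires a listener hook, it often does so outside an active database session. Calling Variable.get() there can raise a DetachedInstanceError and crash the scheduler loop. The AIRFLOW_VAR_ prefix keeps the values compatible with Airflow's own convention for defining Variables through environment variables.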
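One way to keep these lookups in a single place is a small config loader that touches only the process environment. This is a sketch, not the companion repo's code: `PushConfig` and `load_push_config` are hypothetical names, and the defaults simply mirror the ones used in the plugin above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class PushConfig:
    enabled: bool
    gateway_url: str
    instance_name: str


def load_push_config(env: Optional[Mapping[str, str]] = None) -> PushConfig:
    """Read plugin settings from environment variables only (no database access)."""
    env = os.environ if env is None else env
    enabled_str = env.get("AIRFLOW_VAR_PROMETHEUS_METRICS_ENABLED", "true")
    return PushConfig(
        enabled=enabled_str.strip().lower() == "true",
        gateway_url=env.get("AIRFLOW_VAR_PUSHGATEWAY_URL", "http://host.docker.internal:9091"),
        instance_name=env.get("AIRFLOW_VAR_PROMETHEUS_INSTANCE_NAME", "airflow-local"),
    )


# Passing an explicit mapping makes the loader easy to exercise in tests.
defaults = load_push_config({})
disabled = load_push_config({"AIRFLOW_VAR_PROMETHEUS_METRICS_ENABLED": "False"})
print(defaults.enabled, disabled.enabled)
```

Accepting the environment as a parameter is a design choice for testability; in the plugin itself you would call `load_push_config()` with no arguments.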
Fix 1 — Run Isolation via Grouping Key
def _get_task_group_key(self, ti):
    key = {
        'dag_id': ti.dag_id,
        'task_id': ti.task_id,
        'run_id': ti.run_id,  # Isolates each DAG run
        'instance': self.instance_name,
    }
    map_index = getattr(ti, 'map_index', -1)
    if map_index >= 0:
        key['map_index'] = str(map_index)  # Isolates mapped tasks
    return key
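To see what the key looks like for a plain task versus a mapped one, the same logic can be exercised as a standalone function. The task instances here are `SimpleNamespace` stand-ins with made-up IDs, not real Airflow `TaskInstance` objects.

```python
from types import SimpleNamespace
from typing import Any, Dict


def task_group_key(ti: Any, instance_name: str) -> Dict[str, str]:
    """Standalone version of the V2 grouping key logic, for illustration."""
    key = {
        "dag_id": ti.dag_id,
        "task_id": ti.task_id,
        "run_id": ti.run_id,
        "instance": instance_name,
    }
    map_index = getattr(ti, "map_index", -1)
    if map_index >= 0:
        key["map_index"] = str(map_index)  # only mapped tasks get this label
    return key


# Hypothetical task instances: one unmapped, one expanded via dynamic task mapping.
plain = SimpleNamespace(dag_id="etl", task_id="load", run_id="manual__2026-01-01", map_index=-1)
mapped = SimpleNamespace(dag_id="etl", task_id="load", run_id="manual__2026-01-01", map_index=3)

plain_key = task_group_key(plain, "airflow-local")
mapped_key = task_group_key(mapped, "airflow-local")
print(plain_key)
print(mapped_key)
```

Without the `map_index` label, every expanded instance of a mapped task would share one group and overwrite its siblings, which is the same collision as Mistake 1 at a smaller scale.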
Fix 2 — Gauges for Semantic Correctness
Gauges represent state as an absolute value. On retry, the Gauge overwrites from -1 to 1 — only the final state is visible:
@hookimpl
def on_task_instance_success(self, previous_state: TaskInstanceState, task_instance: TaskInstance):
    registry = CollectorRegistry()
    group_key = self._get_task_group_key(task_instance)
    g = Gauge('task_status', 'Task state snapshot (1=success, -1=failed)', registry=registry)
    g.set(1)  # 1 = success (overwrites any previous -1 from a failed attempt)
    # _push expects (registry, job_name, group_key); this job name is a placeholder
    self._push(registry, 'airflow_task_status', group_key)
Step 3: The Sweeper DAG — Preventing OOM Crashes
Pushgateway has no native TTL. Every run_id creates a metric group held in memory forever. The Sweeper runs every 24 hours and deletes anything older than the threshold:
# airflow/dags/pushgateway_sweeper.py
from datetime import datetime, timezone

import requests
from airflow.decorators import dag, task


@task
def clean_stale_metrics(max_age_mins: int):
    gateway_url = "http://host.docker.internal:9091"
    response = requests.get(f"{gateway_url}/api/v1/metrics", timeout=30)
    response.raise_for_status()
    data = response.json()
    now = datetime.now(timezone.utc).timestamp()
    max_age_secs = max_age_mins * 60
    for group in data.get('data', []):
        samples = group.get('push_time_seconds', {}).get('metrics') or [{}]
        push_time = samples[0].get('value')
        if push_time is None:
            continue  # No push timestamp reported; skip instead of crashing on float(None)
        if now - float(push_time) > max_age_secs:
            labels = dict(group.get('labels', {}))
            # Note: label values containing "/" cannot be placed in the path as-is
            delete_url = f"{gateway_url}/metrics/job/{labels.pop('job', 'unknown_job')}"
            for k, v in labels.items():
                if v:
                    delete_url += f"/{k}/{v}"
            requests.delete(delete_url, timeout=30)


@dag(schedule="0 0 * * *", start_date=datetime(2026, 1, 1), catchup=False)
def pushgateway_sweeper():
    clean_stale_metrics(max_age_mins=1440)


sweeper_dag = pushgateway_sweeper()
The Sweeper is mandatory for V2 deployments with run_id in the grouping key.
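The staleness check and the delete URL are the two parts of the Sweeper worth testing in isolation. Below is a sketch of both as pure functions. The helper names are hypothetical; the URL builder percent-encodes label values and switches to Pushgateway's `@base64` path form when a value contains a slash, which the inline loop above does not handle.

```python
import base64
from typing import Dict, Optional
from urllib.parse import quote


def is_stale(push_time: Optional[str], now: float, max_age_secs: float) -> bool:
    """A group is stale only if it reports a push time older than the threshold."""
    if push_time is None:
        return False
    return now - float(push_time) > max_age_secs


def _path_segment(name: str, value: str) -> str:
    """Encode one label as a URL path segment pair."""
    if "/" in value:
        # Slashes cannot appear in a path segment; use the URL-safe base64 form.
        encoded = base64.urlsafe_b64encode(value.encode("utf-8")).decode("ascii")
        return f"/{name}@base64/{encoded}"
    return f"/{name}/{quote(value, safe='')}"


def build_delete_url(gateway_url: str, labels: Dict[str, str]) -> str:
    """Build the DELETE URL for one metric group from its grouping labels."""
    remaining = dict(labels)  # copy so the caller's dict is not mutated
    job = remaining.pop("job", "unknown_job")
    url = gateway_url.rstrip("/") + "/metrics" + _path_segment("job", job)
    for name, value in remaining.items():
        if value:
            url += _path_segment(name, value)
    return url


# Hypothetical group labels; the run_id contains a slash on purpose.
url = build_delete_url(
    "http://pg:9091", {"job": "airflow", "dag_id": "etl", "run_id": "a/b"}
)
print(url)
```

Keeping these as pure functions means the DAG task is reduced to two HTTP calls, and the edge cases (missing timestamps, awkward label values) can be covered by unit tests without a gateway.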
Step 4: V3 — The Production-Grade Plugin
V2 works for small systems, but at scale (thousands+ runs/day), run_id in the grouping key causes severe series churn. V3 removes run_id entirely, giving you bounded cardinality with no Sweeper:
# airflow/plugins/v3_low_cardinality_plugin.py
class PushgatewayV3LowCardinalityListeners:
    def _get_task_group_key(self, ti):
        """LOW CARDINALITY: No run_id. Each (dag, task) pair has exactly one slot.
        Latest execution overwrites previous; shows current state only."""
        return {
            'dag_id': ti.dag_id,
            'task_id': ti.task_id,
            'instance': self.instance_name,
        }
The key difference: V3's grouping key has no run_id. Each task has exactly one slot in the Pushgateway. The latest execution overwrites the previous one. This means:
✅ Bounded cardinality — no series churn, no Sweeper needed
✅ Retry safe — Gauge overwrites reflect final state
✅ Fast dashboards — Prometheus queries scan a small, fixed set of series
❌ No per-run history — only the latest execution is visible
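The cardinality difference can be estimated with simple counting. The sketch below uses made-up workload numbers (3 tasks, 100 runs per day, 7 days) and assumes nothing is swept in the meantime; it counts distinct grouping keys, not Prometheus series.

```python
from typing import List, Set, Tuple


def count_groups(
    dag_tasks: List[Tuple[str, str]], runs_per_day: int, days: int, include_run_id: bool
) -> int:
    """Count distinct grouping keys produced by a workload."""
    groups: Set[Tuple[str, ...]] = set()
    for day in range(days):
        for run in range(runs_per_day):
            run_id = f"day{day}_run{run}"
            for dag_id, task_id in dag_tasks:
                key = (dag_id, task_id, run_id) if include_run_id else (dag_id, task_id)
                groups.add(key)
    return len(groups)


# Hypothetical workload.
dag_tasks = [("etl", "extract"), ("etl", "load"), ("report", "render")]
v2_groups = count_groups(dag_tasks, runs_per_day=100, days=7, include_run_id=True)
v3_groups = count_groups(dag_tasks, runs_per_day=100, days=7, include_run_id=False)
print(v2_groups, v3_groups)
```

With run_id in the key, the group count grows linearly with the number of runs; without it, the count is fixed by the number of (dag, task) pairs regardless of how long the system has been running.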
For per-run history, emit structured JSON logs to an OLAP engine or log aggregator. This is covered in Part 5.
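As a preview of the idea, a per-run event can be as simple as one JSON object per line. This is a minimal sketch with assumed field names; the actual schema used in Part 5 may differ.

```python
import json
from datetime import datetime, timezone


def task_run_event(dag_id: str, task_id: str, run_id: str, state: str, try_number: int) -> str:
    """Serialize one task outcome as a single JSON log line."""
    record = {
        "event": "task_finished",  # hypothetical event name
        "dag_id": dag_id,
        "task_id": task_id,
        "run_id": run_id,  # high-cardinality field: fine in logs, not in Prometheus labels
        "state": state,
        "try_number": try_number,
        "emitted_at": datetime.now(timezone.utc).isoformat(),
    }
    return json.dumps(record, sort_keys=True)


line = task_run_event("etl", "load", "manual__2026-01-01", "success", 2)
print(line)
```

Because run_id lives in the log record rather than in a label, it adds no series to Prometheus while still being queryable in the log store.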
The Plugin Evolution
| Plugin | run_id | Cardinality | Sweeper | Use Case |
|---|---|---|---|---|
| V1 | No | Low | No | ❌ Anti-pattern (Counters + race condition) |
| V2 | Yes | High | Yes | ⚠️ Small systems only |
| V3 | No | Low | No | ✅ Production-grade |
The V3 Production Dashboard
We've added a dedicated dashboard for the V3 plugin: airflow_v3_low_cardinality.json.
Unlike the V2 dashboard, this one:
Removes the run_id filter: Since metrics are no longer partitioned by run, the dashboard shows the global fleet state.
Focuses on "Latest State": The panels show the status of the most recent execution of every task.
Improved Performance: Because the number of series is bounded, the dashboard loads instantly even with thousands of historic runs in Prometheus.
Key Takeaways
Use os.environ.get() in Airflow listener plugins, never Variable.get().
V1 had two problems: missing run_id (race condition) AND Counters (semantic mismatch).
V2 fixes both but creates high cardinality: acceptable for small systems, not production-grade at scale.
V3 is the production recommendation: removes run_id, bounded cardinality, no Sweeper.
🚨 Golden Rule: NEVER inject run_id into Prometheus at scale. Per-run data belongs in OLAP or structured logs.




