Blog
Data Observability: How to Know When Your Pipeline Is Lying to You
Ryan Kirsch · November 28, 2025 · 9 min read
A pipeline that fails loudly is the best kind of problem. You get an alert, you fix it, you move on. The dangerous pipeline is the one that succeeds silently while delivering wrong data. Data observability is the discipline of catching that second type before your finance team builds a quarterly report on corrupted numbers.
What Data Observability Actually Means
The term borrows from software observability (logs, metrics, traces) but applies to data assets instead of services. Where software observability asks “is the system behaving correctly,” data observability asks “is the data trustworthy.”
The five pillars that cover most production data problems:
- Freshness -- Is the data as recent as it should be? A table that was supposed to refresh at 6 AM and still shows yesterday's max timestamp at 9 AM is stale.
- Volume -- Did the expected amount of data arrive? A table that normally receives 50K rows per day suddenly receiving 500 or 5 million is a signal something changed upstream.
- Schema -- Did the shape of the data change? A column that disappeared or changed type will break downstream models silently if you do not catch it at ingestion.
- Distribution -- Are values within expected ranges? Null rates spiking, numeric columns going negative, categorical fields gaining unexpected values -- all signs of upstream data quality degradation.
- Lineage -- Which tables depend on which? When something breaks, lineage tells you what downstream assets are affected without manually tracing every dependency.
Most data quality tests cover schema and distribution. Most data teams neglect freshness and volume. Lineage is often absent entirely. A complete observability setup covers all five.
Freshness Monitoring with dbt and Python
dbt has built-in freshness checks for source tables. You declare an expected freshness threshold, and dbt warns or errors when the source timestamp exceeds it:
# sources.yml
sources:
- name: raw_orders
database: raw
schema: public
tables:
- name: orders
loaded_at_field: _ingested_at
freshness:
warn_after: {count: 6, period: hour}
error_after: {count: 12, period: hour}
- name: payments
loaded_at_field: updated_at
freshness:
warn_after: {count: 1, period: hour}
error_after: {count: 4, period: hour}Run dbt source freshness on a schedule and pipe the results to your alerting channel. For custom freshness logic -- checking multiple timestamp columns, or applying different thresholds by business day vs. weekend -- a Python check gives you more control:
import snowflake.connector
from datetime import datetime, timedelta, timezone
import os
def check_table_freshness(
table: str,
timestamp_col: str,
warn_hours: int,
error_hours: int,
) -> dict:
conn = snowflake.connector.connect(
account=os.environ["SNOWFLAKE_ACCOUNT"],
user=os.environ["SNOWFLAKE_USER"],
password=os.environ["SNOWFLAKE_PASSWORD"],
database="ANALYTICS",
schema="PUBLIC",
)
cursor = conn.cursor()
cursor.execute(
f"SELECT MAX({timestamp_col}) FROM {table}"
)
max_ts = cursor.fetchone()[0]
conn.close()
if max_ts is None:
return {"status": "error", "message": f"{table} has no rows"}
now = datetime.now(timezone.utc)
age_hours = (now - max_ts).total_seconds() / 3600
if age_hours > error_hours:
return {
"status": "error",
"table": table,
"age_hours": round(age_hours, 1),
"message": f"Data is {age_hours:.1f}h old (error threshold: {error_hours}h)"
}
elif age_hours > warn_hours:
return {
"status": "warn",
"table": table,
"age_hours": round(age_hours, 1),
"message": f"Data is {age_hours:.1f}h old (warn threshold: {warn_hours}h)"
}
return {"status": "ok", "table": table, "age_hours": round(age_hours, 1)}Volume Anomaly Detection
Fixed thresholds break quickly as business grows. A table that receives 50K rows per day in January might receive 200K per day in December. The better approach is statistical anomaly detection against a rolling baseline.
import numpy as np
from typing import List
def detect_volume_anomaly(
recent_counts: List[int],
today_count: int,
z_threshold: float = 3.0,
) -> dict:
"""
Flag today's row count if it's more than z_threshold
standard deviations from the rolling mean.
recent_counts: list of daily row counts (last 14-30 days)
today_count: today's row count
z_threshold: how many std devs = anomaly (3.0 = ~0.3% false positive rate)
"""
mean = np.mean(recent_counts)
std = np.std(recent_counts)
if std == 0:
# No variance in history -- exact match check
is_anomaly = today_count != mean
return {
"is_anomaly": is_anomaly,
"today": today_count,
"expected_mean": mean,
"z_score": None,
}
z_score = (today_count - mean) / std
is_anomaly = abs(z_score) > z_threshold
direction = "high" if z_score > 0 else "low"
return {
"is_anomaly": is_anomaly,
"today": today_count,
"expected_mean": round(mean),
"std": round(std),
"z_score": round(z_score, 2),
"direction": direction if is_anomaly else None,
"message": (
f"Volume anomaly: {today_count:,} rows ({direction} by {abs(z_score):.1f} std devs)"
if is_anomaly else "Volume normal"
),
}Integrate this into a Dagster asset that runs after each pipeline load, reads the last 30 days of row counts from a metadata table, and fires a Slack alert on anomaly. The key parameters to tune are window size (30 days works for stable tables, 7 days for volatile ones) and the Z threshold (3.0 is conservative -- 2.0 is more sensitive but noisier).
Schema Change Detection at Ingestion
Schema changes from upstream sources are one of the most common causes of silent data failures. A source API removes a field, a database column gets renamed, a JSON payload adds a new nested object -- if you do not catch these at ingestion, they propagate through your entire transformation layer.
The pattern is to store expected schemas in a registry and diff against actual schemas at ingestion time:
from pydantic import BaseModel
from typing import Optional, Dict, Any
import json
class ColumnSpec(BaseModel):
name: str
dtype: str
nullable: bool = True
class SchemaRegistry:
"""Stores expected schemas and diffs against incoming data."""
def __init__(self, schema_store_path: str):
self.path = schema_store_path
self._load()
def _load(self):
try:
with open(self.path) as f:
self.store: Dict[str, list] = json.load(f)
except FileNotFoundError:
self.store = {}
def register(self, table: str, columns: list[ColumnSpec]):
self.store[table] = [c.model_dump() for c in columns]
with open(self.path, "w") as f:
json.dump(self.store, f, indent=2)
def diff(self, table: str, actual_columns: list[str]) -> dict:
if table not in self.store:
return {"status": "unregistered", "table": table}
expected = {c["name"] for c in self.store[table]}
actual = set(actual_columns)
added = actual - expected
removed = expected - actual
if added or removed:
return {
"status": "schema_changed",
"table": table,
"added": sorted(added),
"removed": sorted(removed),
}
return {"status": "ok", "table": table}Run this diff before every ingestion job. On schema change, pause ingestion and alert -- do not attempt to load data with an unknown schema into a typed destination. The cost of a missed load is a pipeline delay. The cost of loading malformed data is corrupted downstream models that are much harder to identify and remediate.
Lineage: Knowing What Breaks When Something Breaks
dbt builds a lineage graph automatically from ref() and source() calls. This is one of the most underutilized features in a dbt project. The lineage graph tells you, for any given table, every downstream model that depends on it.
When an upstream source fails, instead of scrambling to identify impact, you query the lineage:
# Query dbt's manifest.json for downstream dependencies
import json
from pathlib import Path
def find_downstream(
manifest_path: str,
target_node: str, # e.g. "source.myproject.raw_orders.orders"
) -> list[str]:
with open(manifest_path) as f:
manifest = json.load(f)
# Build a parent -> children index
children: dict[str, list[str]] = {}
for node_id, node in manifest.get("nodes", {}).items():
for parent in node.get("depends_on", {}).get("nodes", []):
children.setdefault(parent, []).append(node_id)
# BFS from target node
visited = set()
queue = [target_node]
while queue:
current = queue.pop(0)
if current in visited:
continue
visited.add(current)
for child in children.get(current, []):
queue.append(child)
visited.discard(target_node)
return sorted(visited)
# Usage
affected = find_downstream(
"target/manifest.json",
"source.myproject.raw_orders.orders"
)
print(f"{len(affected)} downstream models affected:"
for m in affected:
print(f" {m}")Integrate this into your incident response. When a source fails, automatically compute the blast radius and include it in your alert. “Source raw_orders.orders is stale -- 14 downstream models affected, including gold.monthly_revenue” is far more actionable than “pipeline failure.”
Choosing a Data Observability Tool
The commercial observability landscape has matured significantly. The main options:
- Monte Carlo -- The most established platform. ML-based anomaly detection, automated lineage, strong dbt and Snowflake integration. Expensive at scale. Best for larger teams with a data reliability mandate.
- Anomalo -- Strong on automatic anomaly detection without manual threshold configuration. Good fit for teams that want ML-driven quality without full Monte Carlo pricing.
- Elementary (open source) -- dbt-native observability. Runs entirely within your dbt project, stores results in your warehouse, and generates an HTML report. Zero additional infrastructure for teams already on dbt.
- Great Expectations -- Explicit, code-defined expectations rather than automatic anomaly detection. Higher engineering effort but more precise control. Best when you have specific domain knowledge to encode.
- dbt tests + custom Python -- The DIY path. Sufficient for smaller data platforms, expensive to maintain at scale. Good starting point before committing to a platform.
The right choice depends on team size and budget. Under 5 data engineers: Elementary + dbt tests covers most ground at zero cost. 5-20 engineers: Anomalo or Elementary with custom Python layers. 20+ engineers or a data reliability product mandate: Monte Carlo.
The Practical Starting Point
If you have no observability today, do not try to implement all five pillars at once. Start with the two that catch the most common production failures:
First: Add freshness checks to your top 10 most-queried tables. Configure dbt source freshness or write simple Python checks. Set error thresholds at 2x the expected latency. Run them on a schedule before business hours so problems surface before stakeholders open dashboards.
Second: Add volume tracking to your fact tables. Store daily row counts in a metadata table, compute a 30-day rolling baseline, and alert when today's count is outside 3 standard deviations. This catches the majority of upstream data issues -- missed loads, truncated feeds, duplicate ingestion -- without requiring ML infrastructure.
Schema change detection and distribution checks come next. Lineage is often already available from dbt -- the work is wiring it into your alerting so it surfaces automatically.
The goal is not a perfect observability system on day one. The goal is to stop finding out about data problems from stakeholders. Even partial observability -- freshness and volume alone -- cuts the time-to-detection dramatically. Build from there.
Ryan Kirsch
Senior Data Engineer with experience building production pipelines at scale. Works with dbt, Snowflake, and Dagster, and writes about data engineering patterns from production experience. See his full portfolio.