Testing and Operating the Telemetry Pipeline

Telemetry is production code. In our application so far, the agent can run, create traces, attach sessions and users, record scores, use managed prompts, and run dataset experiments. That is useful, but all of it is still easy to break with an ordinary refactor.

This chapter creates regression tests for the telemetry contract. The goal is not to test Langfuse itself. The goal is to protect the behavior our application depends on: parent-child span shape, bounded attributes, privacy defaults, Langfuse correlation fields, score names, and the dataset runner.

What we will change

This chapter updates the test suite, not the runtime agent:

File                      What to do
tests/conftest.py         Create a shared in-memory OpenTelemetry exporter fixture.
tests/test_graph.py       Create the parent-child span shape assertions.
tests/test_privacy.py     Create the content-capture invariants.
tests/test_telemetry.py   Create the Langfuse context assertions.
tests/test_scores.py      Create the score contract assertions.

The tests should not call configure_tracing(). That function installs the production OTLP exporter and tries to set the global OpenTelemetry provider. In tests, install one in-memory provider for the whole pytest process and clear its spans between tests.

Create the test tracer fixture

Create tests/conftest.py with this content:

from collections.abc import Iterator

import pytest
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

from agent_observability.telemetry import LangfuseBaggageSpanProcessor


@pytest.fixture(scope="session")
def telemetry_exporter() -> Iterator[InMemorySpanExporter]:
    # One provider for the whole pytest process: the global provider can only be set once.
    exporter = InMemorySpanExporter()
    provider = TracerProvider()
    # Copies Langfuse baggage values onto span attributes, as in production.
    provider.add_span_processor(LangfuseBaggageSpanProcessor())
    # SimpleSpanProcessor exports synchronously, so spans are readable as soon as they end.
    provider.add_span_processor(SimpleSpanProcessor(exporter))
    trace.set_tracer_provider(provider)
    yield exporter
    provider.shutdown()


@pytest.fixture
def span_exporter(telemetry_exporter: InMemorySpanExporter) -> Iterator[InMemorySpanExporter]:
    # Start and finish every test with an empty exporter.
    telemetry_exporter.clear()
    yield telemetry_exporter
    telemetry_exporter.clear()

This fixture gives every test access to finished spans without sending anything to the Collector or Langfuse. The LangfuseBaggageSpanProcessor is included because Chapters 18 and 19 rely on baggage values becoming span attributes.

If you see the warning "Overriding of current TracerProvider is not allowed", one of the tests or imported modules configured tracing before this fixture ran. OpenTelemetry then keeps the earlier provider, so spans never reach the in-memory exporter. In this demo, keep configure_tracing() in manual scripts and out of unit tests.

Create the graph structure test

Open tests/test_graph.py and add this test:

from typing import Any

import agent_observability.graph as graph_module


def test_order_status_trace_keeps_parent_child_shape(
    span_exporter,
    monkeypatch,
) -> None:
    def fake_prompt(
        context: dict[str, Any],
        *,
        label: str = "production",
    ) -> tuple[list[dict[str, str]], dict[str, Any]]:
        return (
            [{"role": "user", "content": context["query"]}],
            {
                "prompt_name": "order-status-answer",
                "prompt_version": 1,
                "prompt_label": label,
                "prompt_source": "test",
            },
        )

    def fake_generate_answer(
        *,
        instructions: str,
        input_items: list[dict[str, str]],
        prompt_metadata: dict[str, Any],
    ) -> str:
        return "Your order is in transit."

    monkeypatch.setattr(graph_module, "get_order_status_prompt", fake_prompt)
    monkeypatch.setattr(graph_module, "generate_answer", fake_generate_answer)

    result = graph_module.run_agent(
        {
            "query": "Where is my delayed EU order?",
            "conversation_id": "conv_test_001",
            "order_reference": "ORDER-123EU",
            "region": "eu",
        }
    )

    spans = span_exporter.get_finished_spans()
    by_name = {span.name: span for span in spans}

    root = by_name["invoke_agent order-status"]
    fetch_node = by_name["workflow.node fetch_order"]
    tool = by_name["execute_tool fetch_order_status"]

    assert result["outcome"] == "answer"
    assert fetch_node.context.trace_id == root.context.trace_id
    assert fetch_node.parent.span_id == root.context.span_id
    assert tool.context.trace_id == root.context.trace_id
    assert tool.parent.span_id == fetch_node.context.span_id
    assert tool.attributes["gen_ai.operation.name"] == "execute_tool"
    assert tool.attributes["app.tool.side_effect"] == "read"
    assert tool.attributes["app.tool.authorization"] == "allowed"

The model call and prompt fetch are stubbed because this test is about trace shape, not OpenAI or Langfuse availability. A unit test that reaches the network is usually a flaky integration test in disguise.
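The parentage assertions in the test generalize to a small helper. The following is a minimal sketch and not part of the demo project: SpanRecord and assert_chain are hypothetical names, and the record type only mimics the fields the test reads from a finished span (its name, its own IDs, and the parent span ID).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SpanRecord:
    """Hypothetical stand-in for the fields the test reads from a finished span."""

    name: str
    trace_id: int
    span_id: int
    parent_span_id: Optional[int] = None


def assert_chain(spans: list[SpanRecord], names: list[str]) -> None:
    """Assert that `names` form a root -> child -> grandchild chain in one trace."""
    by_name = {span.name: span for span in spans}
    chain = [by_name[name] for name in names]  # a KeyError means a span is missing
    for parent, child in zip(chain, chain[1:]):
        assert child.trace_id == parent.trace_id, f"{child.name} left the trace"
        assert child.parent_span_id == parent.span_id, (
            f"{child.name} is not a direct child of {parent.name}"
        )


EXPECTED_CHAIN = [
    "invoke_agent order-status",
    "workflow.node fetch_order",
    "execute_tool fetch_order_status",
]

spans = [
    SpanRecord("invoke_agent order-status", trace_id=1, span_id=10),
    SpanRecord("workflow.node fetch_order", trace_id=1, span_id=11, parent_span_id=10),
    SpanRecord("execute_tool fetch_order_status", trace_id=1, span_id=12, parent_span_id=11),
]
assert_chain(spans, EXPECTED_CHAIN)
```

Keying spans by name keeps only the last span for each name, so this style of lookup works only while every span name appears once per trace. If a node can run twice, group by name into lists instead.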

Create the privacy invariant test

Open tests/test_privacy.py and add these tests:

from agent_observability.capture import CapturePolicy, project_content_for_telemetry


FORBIDDEN = {
    "person@example.invalid",
    "ORDER-CANARY-0001",
}


def test_metadata_only_capture_does_not_export_content() -> None:
    projected = project_content_for_telemetry(
        value="Email person@example.invalid about ORDER-CANARY-0001",
        policy=CapturePolicy(
            mode="metadata_only",
            policy_version="capture-policy-1",
        ),
    )

    serialized = repr(projected)

    assert projected == {
        "capture_mode": "metadata_only",
        "capture_policy_version": "capture-policy-1",
    }
    for value in FORBIDDEN:
        assert value not in serialized


def test_redacted_capture_removes_forbidden_patterns() -> None:
    projected = project_content_for_telemetry(
        value="Email person@example.invalid about ORDER-CANARY-0001",
        policy=CapturePolicy(
            mode="redacted",
            policy_version="capture-policy-1",
        ),
    )

    serialized = repr(projected)

    assert projected["capture_mode"] == "redacted"
    assert "redacted_preview" in projected
    for value in FORBIDDEN:
        assert value not in serialized
    assert "ORDER-" not in serialized

These are small tests, but they protect important rules from Chapters 8, 14, 15, and 17: content is not exported by default, and redacted mode removes forbidden patterns. They also keep the test independent of Langfuse and the Collector.
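This chapter does not show the body of project_content_for_telemetry, so the following is only a sketch of what a redacting projection could look like. The function name redact_preview, the two regular expressions, and the placeholder strings are assumptions for illustration; the demo's real rules live in agent_observability.capture and may differ.

```python
import re

# Assumed patterns for illustration only.
EMAIL = re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")
ORDER_REF = re.compile(r"\bORDER-[A-Z0-9-]+\b")


def redact_preview(value: str, max_chars: int = 80) -> str:
    """Replace forbidden patterns first, then truncate the result."""
    redacted = EMAIL.sub("[email]", value)
    redacted = ORDER_REF.sub("[order_ref]", redacted)
    return redacted[:max_chars]


preview = redact_preview("Email person@example.invalid about ORDER-CANARY-0001")
print(preview)  # Email [email] about [order_ref]
```

The order of operations matters. Truncating before redacting could cut a forbidden value in half, leaving a fragment that no longer matches the pattern and leaks into the preview.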

Create the telemetry test

Open tests/test_telemetry.py and add this test:

from agent_observability.langfuse_context import langfuse_trace_context
from agent_observability.telemetry import agent_task_span


def test_langfuse_context_reaches_root_span(span_exporter) -> None:
    with langfuse_trace_context(
        session_id="conv_test_001",
        user_id="usr_test_001",
        trace_name="manual-prompt-management",
        version="test",
        tags=("order-status", "test"),
        metadata={
            "environment": "development",
            "workflow": "order-status",
            "region": "eu",
        },
    ):
        with agent_task_span("order-status", "conv_test_001"):
            pass

    root = next(
        span
        for span in span_exporter.get_finished_spans()
        if span.name == "invoke_agent order-status"
    )

    assert root.attributes["langfuse.session.id"] == "conv_test_001"
    assert root.attributes["langfuse.user.id"] == "usr_test_001"
    assert root.attributes["langfuse.trace.name"] == "manual-prompt-management"
    assert root.attributes["langfuse.version"] == "test"
    assert root.attributes["langfuse.trace.metadata.workflow"] == "order-status"
    assert tuple(root.attributes["langfuse.trace.tags"]) == ("order-status", "test")

The tuple conversion is intentional. The OpenTelemetry SDK can store sequence attributes as tuples on finished spans even when the application originally set a list.
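The reason is ordinary Python equality, which this small standalone check illustrates without any OpenTelemetry dependency:

```python
expected = ("order-status", "test")
as_list = ["order-status", "test"]

# A list never compares equal to a tuple, even with identical items.
assert as_list != expected

# Normalizing to a tuple makes the comparison independent of the stored type.
assert tuple(as_list) == expected

# The conversion is harmless when the value is already a tuple.
assert tuple(expected) == expected
```

Converting on the assertion side keeps the test stable whichever sequence type the SDK hands back.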

This test protects the work from Chapters 18 and 19. If it fails, Langfuse can still receive spans, but the product views for Sessions, Users, prompt versions, and trace tags will become harder to use.

Create the score contract tests

Create tests/test_scores.py with this content:

from agent_observability import scores
from agent_observability.feedback import build_user_feedback_event


class FakeLangfuse:
    def __init__(self) -> None:
        self.calls: list[dict[str, object]] = []

    def create_score(self, **kwargs: object) -> None:
        self.calls.append(kwargs)


def test_user_feedback_score_is_bounded(monkeypatch) -> None:
    fake = FakeLangfuse()
    monkeypatch.setattr(scores, "get_langfuse", lambda: fake)
    event = build_user_feedback_event(
        interaction_id="interaction_demo_001",
        value="negative",
        agent_version="local",
    )

    scores.record_user_feedback_score(trace_id="trace_123", event=event)

    assert fake.calls == [
        {
            "name": "user_feedback",
            "value": "negative",
            "trace_id": "trace_123",
            "score_id": "trace_123:user_feedback:interaction_demo_001",
            "data_type": "CATEGORICAL",
            "comment": "rubric=user-feedback-v1",
            "environment": scores.settings.deployment_environment,
            "metadata": {
                "interaction_id": "interaction_demo_001",
                "feedback_type": "user_rating",
                "agent_version": "local",
            },
        }
    ]


def test_boolean_score_uses_numeric_value(monkeypatch) -> None:
    fake = FakeLangfuse()
    monkeypatch.setattr(scores, "get_langfuse", lambda: fake)

    scores.record_answer_correctness_score(trace_id="trace_123", is_correct=False)

    assert fake.calls[0]["name"] == "answer_correctness"
    assert fake.calls[0]["value"] == 0
    assert fake.calls[0]["data_type"] == "BOOLEAN"
    assert fake.calls[0]["environment"] == scores.settings.deployment_environment


def test_session_resolution_rejects_unknown_values() -> None:
    try:
        scores.record_session_resolution_score(
            session_id="session_123",
            outcome="kind_of_resolved",
        )
    except ValueError as exc:
        assert str(exc) == "invalid session outcome"
    else:
        raise AssertionError("expected invalid session outcome")

When you add a new score name, add its allowed values and metadata expectations to tests/test_scores.py before trusting the Langfuse Scores view.
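One way to keep score names and allowed values in a single place is a small contract table that both the recording functions and the tests read. This is a hedged sketch: ScoreContract, SCORE_CONTRACTS, and validate_score are hypothetical names, and the allowed values listed here are illustrative assumptions, not the demo project's actual definitions.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ScoreContract:
    """Hypothetical description of one score: its data type and allowed values."""

    data_type: str
    allowed: frozenset


# Illustrative values only; replace them with the project's real contracts.
SCORE_CONTRACTS = {
    "user_feedback": ScoreContract("CATEGORICAL", frozenset({"positive", "negative"})),
    "answer_correctness": ScoreContract("BOOLEAN", frozenset({0, 1})),
    "session_resolution": ScoreContract("CATEGORICAL", frozenset({"resolved", "unresolved"})),
}


def validate_score(name: str, value: object) -> ScoreContract:
    """Return the contract for a known score and value, or raise ValueError."""
    contract = SCORE_CONTRACTS.get(name)
    if contract is None:
        raise ValueError(f"unknown score name: {name}")
    if value not in contract.allowed:
        raise ValueError(f"invalid value for {name}: {value!r}")
    return contract
```

With a table like this, adding a score means adding one entry plus one test, and an unknown name or value fails before anything is sent to Langfuse.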

Run the test suite

Run the tests from the demo project:

PYTHONPATH=src pytest -q

Expected result:

.......                                                                                [100%]
7 passed in 0.61s

The seven tests come from one graph structure test, two privacy tests, one Langfuse context test, and three score contract tests. Now the test suite fails when telemetry parentage, privacy defaults, Langfuse context, or score contracts drift. Regression tests like these are a cheap way to catch accidental breakage of the telemetry pipeline before it reaches production.

Validating the Collector configuration and end-to-end path

The unit tests above never touch the Collector or Langfuse. That is intentional: they should be fast and deterministic. After they pass, run one end-to-end check through the same local path used by the manual scenarios.

First validate the Collector configuration from the demo project root:

docker run --rm \
  -v "$PWD/collector-config.yaml:/etc/otelcol/config.yaml:ro" \
  otel/opentelemetry-collector-contrib:0.151.0 \
  validate --config=/etc/otelcol/config.yaml

Expected result:

Config validation succeeded

Then start the local Collector:

docker compose up -d otel-collector
docker compose logs -f otel-collector

Keep the logs open in one terminal. In another terminal, send one real trace through the pipeline:

PYTHONPATH=src python -m agent_observability.manual_scenarios prompt-management

The command should print:

answer

In the Collector logs, look for a received/exported trace entry from the debug exporter. In Langfuse, open Tracing and find the latest manual-prompt-management trace.

Check these fields on the trace:

Field         Expected value
Trace name    manual-prompt-management
Session ID    starts with conv_
User ID       usr_pseudo_prompt_demo
Environment   development
Tags          order-status, prompt-management, manual-scenario

Then inspect the graph. It should include the order-status workflow nodes and the OpenAI generation span. It should not include raw memory values, retrieved policy text, or full graph state as span attributes.
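The same expectation can be approximated in a unit test against in-memory spans by flagging attribute values that are suspiciously large. This is a sketch under assumptions: oversized_attributes is a hypothetical helper, and the 256-character budget is an arbitrary example limit rather than a value taken from the demo project.

```python
from collections.abc import Mapping

MAX_ATTRIBUTE_CHARS = 256  # assumed budget; choose a limit that fits your schema


def oversized_attributes(
    attributes: Mapping[str, object],
    limit: int = MAX_ATTRIBUTE_CHARS,
) -> list[str]:
    """Return the keys whose string values, or string items, exceed the limit."""
    offenders = []
    for key, value in attributes.items():
        # Treat scalar values as one-item sequences so both shapes share one check.
        items = value if isinstance(value, (list, tuple)) else (value,)
        if any(isinstance(item, str) and len(item) > limit for item in items):
            offenders.append(key)
    return sorted(offenders)


sample = {
    "app.tool.side_effect": "read",
    "app.debug.state": "x" * 300,  # hypothetical attribute that dumps too much
}
print(oversized_attributes(sample))  # ['app.debug.state']
```

A length check will not prove that content is absent, but it tends to catch the common failure where a whole document or state object is serialized into one attribute.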

If the command prints answer but no trace appears in Langfuse, debug the path in this order:

  1. docker compose ps confirms otel-collector is running.
  2. .env contains LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, and LANGFUSE_COLLECTOR_BASE_URL.
  3. OTEL_EXPORTER_OTLP_TRACES_ENDPOINT points to http://localhost:4318/v1/traces.
  4. Collector logs do not show authentication or exporter errors.
  5. Langfuse is filtering the same environment and time window as the new trace.
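Steps 2 and 3 of this checklist can be scripted. The sketch below assumes the variables have already been loaded into a mapping such as os.environ; parsing the .env file is left out, and the function name export_path_problems is hypothetical.

```python
from collections.abc import Mapping

REQUIRED = (
    "LANGFUSE_PUBLIC_KEY",
    "LANGFUSE_SECRET_KEY",
    "LANGFUSE_COLLECTOR_BASE_URL",
)
EXPECTED_ENDPOINT = "http://localhost:4318/v1/traces"


def export_path_problems(env: Mapping[str, str]) -> list[str]:
    """Return human-readable problems with the export settings (steps 2 and 3)."""
    problems = [f"{name} is missing or empty" for name in REQUIRED if not env.get(name)]
    endpoint = env.get("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", "")
    if endpoint.rstrip("/") != EXPECTED_ENDPOINT:
        problems.append(
            f"OTEL_EXPORTER_OTLP_TRACES_ENDPOINT is {endpoint!r}, "
            f"expected {EXPECTED_ENDPOINT!r}"
        )
    return problems


if __name__ == "__main__":
    import os

    for problem in export_path_problems(os.environ):
        print(problem)
```

Passing the environment in as an argument keeps the check testable without touching the real process environment. An empty result only means the settings look right; the Collector and Langfuse checks in steps 1, 4, and 5 still need to be done by hand.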

Monitor the monitoring path

The local check proves one trace can move through the path. In production, the telemetry pipeline needs its own health checks. At minimum, track:

  • SDK export failures and queue saturation.
  • Collector accepted, refused, dropped, retried, and exported spans.
  • Collector memory limiter events.
  • Backend ingestion latency and errors.
  • The difference between billable provider calls and observed model-call spans.

That last check catches silent instrumentation gaps. If OpenAI usage reports show 1,000 successful calls and Langfuse shows 700 inference spans for the same window, treat the missing 30% as a telemetry incident until a known sampling rate or an identified loss explains it.
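The comparison is simple arithmetic once sampling is taken into account. The following sketch uses a hypothetical function name and assumes a single, uniform sampling rate for the window being compared.

```python
def unexplained_span_gap(
    provider_calls: int,
    observed_spans: int,
    sample_rate: float = 1.0,
) -> float:
    """Fraction of expected spans that are missing after accounting for sampling."""
    if not 0.0 < sample_rate <= 1.0:
        raise ValueError("sample_rate must be in (0, 1]")
    if provider_calls <= 0:
        return 0.0
    expected_spans = provider_calls * sample_rate
    # Clamp at zero: more spans than expected is not a loss.
    return max(0.0, (expected_spans - observed_spans) / expected_spans)


# No sampling configured: 300 of 1,000 calls have no span.
print(f"{unexplained_span_gap(1000, 700):.0%}")  # 30%

# A 70% sampling rate fully explains the same numbers.
print(f"{unexplained_span_gap(1000, 700, sample_rate=0.7):.0%}")  # 0%
```

An alert on this value needs some tolerance, because provider usage reports and span timestamps rarely align on exactly the same window boundaries.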

What should exist before we go to Chapter 24

At this point the demo should have:

  • tests/conftest.py with an in-memory OpenTelemetry exporter;
  • real tests in tests/test_graph.py, tests/test_privacy.py, and tests/test_telemetry.py;
  • score contract tests in tests/test_scores.py;
  • a passing PYTHONPATH=src pytest -q;
  • a validated Collector configuration;
  • one manual Langfuse run used to confirm the end-to-end export path.

Chapter 24 turns these tested signals into dashboards, alerts, release gates, and runbooks.
