Testing and Operating the Telemetry Pipeline
Telemetry is production code. In our application so far, the agent can run, create traces, attach sessions and users, record scores, use managed prompts, and run dataset experiments. That is useful, but it is still easy to break with a normal refactor.
This chapter creates regression tests for the telemetry contract. The goal is not to test Langfuse itself. The goal is to protect the behavior our application depends on: parent-child span shape, bounded attributes, privacy defaults, Langfuse correlation fields, score names, and the dataset runner.
What we will change
This chapter updates the test suite, not the runtime agent:
| File | What to do |
|---|---|
tests/conftest.py | Create a shared in-memory OpenTelemetry exporter fixture. |
tests/test_graph.py | Create the parent-child span shape assertions. |
tests/test_privacy.py | Create the content-capture invariants. |
tests/test_telemetry.py | Create the Langfuse context assertions. |
tests/test_scores.py | Create the score contract assertions. |
The tests should not call configure_tracing(). That function installs the production OTLP exporter and tries to set the global OpenTelemetry provider. In tests, install one in-memory provider for the whole pytest process and clear its spans between tests.
Create the test tracer fixture
Create tests/conftest.py with this content:
import pytest
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from agent_observability.telemetry import LangfuseBaggageSpanProcessor
@pytest.fixture(scope="session")
def telemetry_exporter() -> InMemorySpanExporter:
exporter = InMemorySpanExporter()
provider = TracerProvider()
provider.add_span_processor(LangfuseBaggageSpanProcessor())
provider.add_span_processor(SimpleSpanProcessor(exporter))
trace.set_tracer_provider(provider)
yield exporter
provider.shutdown()
@pytest.fixture
def span_exporter(telemetry_exporter: InMemorySpanExporter) -> InMemorySpanExporter:
telemetry_exporter.clear()
yield telemetry_exporter
telemetry_exporter.clear()
This fixture gives every test access to finished spans without sending anything to the Collector or Langfuse. The LangfuseBaggageSpanProcessor is included because Chapters 18 and 19 rely on baggage values becoming span attributes.
If you see Overriding of current TracerProvider is not allowed, one of the tests or imported modules configured tracing before this fixture ran. In this demo, keep configure_tracing() in manual scripts and out of unit tests.
Create the graph structure test
Open tests/test_graph.py and add this test:
from typing import Any
import agent_observability.graph as graph_module
def test_order_status_trace_keeps_parent_child_shape(
span_exporter,
monkeypatch,
) -> None:
def fake_prompt(
context: dict[str, Any],
*,
label: str = "production",
) -> tuple[list[dict[str, str]], dict[str, Any]]:
return (
[{"role": "user", "content": context["query"]}],
{
"prompt_name": "order-status-answer",
"prompt_version": 1,
"prompt_label": label,
"prompt_source": "test",
},
)
def fake_generate_answer(
*,
instructions: str,
input_items: list[dict[str, str]],
prompt_metadata: dict[str, Any],
) -> str:
return "Your order is in transit."
monkeypatch.setattr(graph_module, "get_order_status_prompt", fake_prompt)
monkeypatch.setattr(graph_module, "generate_answer", fake_generate_answer)
result = graph_module.run_agent(
{
"query": "Where is my delayed EU order?",
"conversation_id": "conv_test_001",
"order_reference": "ORDER-123EU",
"region": "eu",
}
)
spans = span_exporter.get_finished_spans()
by_name = {span.name: span for span in spans}
root = by_name["invoke_agent order-status"]
fetch_node = by_name["workflow.node fetch_order"]
tool = by_name["execute_tool fetch_order_status"]
assert result["outcome"] == "answer"
assert fetch_node.context.trace_id == root.context.trace_id
assert fetch_node.parent.span_id == root.context.span_id
assert tool.context.trace_id == root.context.trace_id
assert tool.parent.span_id == fetch_node.context.span_id
assert tool.attributes["gen_ai.operation.name"] == "execute_tool"
assert tool.attributes["app.tool.side_effect"] == "read"
assert tool.attributes["app.tool.authorization"] == "allowed"
The model call and prompt fetch are stubbed because this test is about trace shape, not OpenAI or Langfuse availability. A unit test that reaches the network is usually a flaky integration test in disguise.
Create the privacy invariant test
Open tests/test_privacy.py and add these tests:
from agent_observability.capture import CapturePolicy, project_content_for_telemetry
FORBIDDEN = {
"person@example.invalid",
"ORDER-CANARY-0001",
}
def test_metadata_only_capture_does_not_export_content() -> None:
projected = project_content_for_telemetry(
value="Email person@example.invalid about ORDER-CANARY-0001",
policy=CapturePolicy(
mode="metadata_only",
policy_version="capture-policy-1",
),
)
serialized = repr(projected)
assert projected == {
"capture_mode": "metadata_only",
"capture_policy_version": "capture-policy-1",
}
for value in FORBIDDEN:
assert value not in serialized
def test_redacted_capture_removes_forbidden_patterns() -> None:
projected = project_content_for_telemetry(
value="Email person@example.invalid about ORDER-CANARY-0001",
policy=CapturePolicy(
mode="redacted",
policy_version="capture-policy-1",
),
)
serialized = repr(projected)
assert projected["capture_mode"] == "redacted"
assert "redacted_preview" in projected
for value in FORBIDDEN:
assert value not in serialized
assert "ORDER-" not in serialized
These are small tests, but they protect important rules from Chapters 8, 14, 15, and 17: content is not exported by default, and redacted mode removes forbidden patterns. They also keep the test independent of Langfuse and the Collector.
Create the telemetry test
Open tests/test_telemetry.py and add this test:
from agent_observability.langfuse_context import langfuse_trace_context
from agent_observability.telemetry import agent_task_span
def test_langfuse_context_reaches_root_span(span_exporter) -> None:
with langfuse_trace_context(
session_id="conv_test_001",
user_id="usr_test_001",
trace_name="manual-prompt-management",
version="test",
tags=("order-status", "test"),
metadata={
"environment": "development",
"workflow": "order-status",
"region": "eu",
},
):
with agent_task_span("order-status", "conv_test_001"):
pass
root = next(
span
for span in span_exporter.get_finished_spans()
if span.name == "invoke_agent order-status"
)
assert root.attributes["langfuse.session.id"] == "conv_test_001"
assert root.attributes["langfuse.user.id"] == "usr_test_001"
assert root.attributes["langfuse.trace.name"] == "manual-prompt-management"
assert root.attributes["langfuse.version"] == "test"
assert root.attributes["langfuse.trace.metadata.workflow"] == "order-status"
assert tuple(root.attributes["langfuse.trace.tags"]) == ("order-status", "test")
The tuple conversion is intentional. The OpenTelemetry SDK can store sequence attributes as tuples on finished spans even when the application originally set a list.
This test protects the work from Chapters 18 and 19. If it fails, Langfuse can still receive spans, but the product views for Sessions, Users, prompt versions, and trace tags will become harder to use.
Create the score contract tests
Create tests/test_scores.py with this content:
from agent_observability import scores
from agent_observability.feedback import build_user_feedback_event
class FakeLangfuse:
def __init__(self) -> None:
self.calls: list[dict[str, object]] = []
def create_score(self, **kwargs: object) -> None:
self.calls.append(kwargs)
def test_user_feedback_score_is_bounded(monkeypatch) -> None:
fake = FakeLangfuse()
monkeypatch.setattr(scores, "get_langfuse", lambda: fake)
event = build_user_feedback_event(
interaction_id="interaction_demo_001",
value="negative",
agent_version="local",
)
scores.record_user_feedback_score(trace_id="trace_123", event=event)
assert fake.calls == [
{
"name": "user_feedback",
"value": "negative",
"trace_id": "trace_123",
"score_id": "trace_123:user_feedback:interaction_demo_001",
"data_type": "CATEGORICAL",
"comment": "rubric=user-feedback-v1",
"environment": scores.settings.deployment_environment,
"metadata": {
"interaction_id": "interaction_demo_001",
"feedback_type": "user_rating",
"agent_version": "local",
},
}
]
def test_boolean_score_uses_numeric_value(monkeypatch) -> None:
fake = FakeLangfuse()
monkeypatch.setattr(scores, "get_langfuse", lambda: fake)
scores.record_answer_correctness_score(trace_id="trace_123", is_correct=False)
assert fake.calls[0]["name"] == "answer_correctness"
assert fake.calls[0]["value"] == 0
assert fake.calls[0]["data_type"] == "BOOLEAN"
assert fake.calls[0]["environment"] == scores.settings.deployment_environment
def test_session_resolution_rejects_unknown_values() -> None:
try:
scores.record_session_resolution_score(
session_id="session_123",
outcome="kind_of_resolved",
)
except ValueError as exc:
assert str(exc) == "invalid session outcome"
else:
raise AssertionError("expected invalid session outcome")
When you add a new score name, add the allowed values and metadata expectations there before trusting the Langfuse Scores view.
Run the test suite
Run the tests from the demo project:
PYTHONPATH=src pytest -q
Expected result:
....... [100%]
7 passed in 0.61s
The seven tests come from one graph structure test, two privacy tests, one Langfuse context test, and three score contract tests. Now the test suite fails when telemetry parentage, privacy defaults, Langfuse context, or score contracts drift. This kind of regression test is the only way to protect the telemetry pipeline from accidental breakage.
Validating the Collector configuration and end-to-end path
The unit tests above never touch the Collector or Langfuse. That is intentional: they should be fast and deterministic. After they pass, run one end-to-end check through the same local path used by the manual scenarios.
First validate the Collector configuration from the demo project root:
docker run --rm \
-v "$PWD/collector-config.yaml:/etc/otelcol/config.yaml:ro" \
otel/opentelemetry-collector-contrib:0.151.0 \
validate --config=/etc/otelcol/config.yaml
Expected result:
Config validation succeeded
Then start the local Collector:
docker compose up -d otel-collector
docker compose logs -f otel-collector
Keep the logs open in one terminal. In another terminal, send one real trace through the pipeline:
PYTHONPATH=src python -m agent_observability.manual_scenarios prompt-management
The command should print:
answer
In the Collector logs, look for a received/exported trace entry from the debug exporter. In Langfuse, open Tracing and find the latest manual-prompt-management trace.
Check these fields on the trace:
| Field | Expected value |
|---|---|
| Trace name | manual-prompt-management |
| Session ID | starts with conv_ |
| User ID | usr_pseudo_prompt_demo |
| Environment | development |
| Tags | order-status, prompt-management, manual-scenario |
Then inspect the graph. It should include the order-status workflow nodes and the OpenAI generation span. It should not include raw memory values, retrieved policy text, or full graph state as span attributes.
If the command prints answer but no trace appears in Langfuse, debug the path in this order:
docker compose psconfirmsotel-collectoris running..envcontainsLANGFUSE_PUBLIC_KEY,LANGFUSE_SECRET_KEY, andLANGFUSE_COLLECTOR_BASE_URL.OTEL_EXPORTER_OTLP_TRACES_ENDPOINTpoints tohttp://localhost:4318/v1/traces.- Collector logs do not show authentication or exporter errors.
- Langfuse is filtering the same environment and time window as the new trace.
Monitor the monitoring path
The local check proves one trace can move through the path. In production, the telemetry pipeline needs its own health checks. At minimum, track:
- SDK export failures and queue saturation.
- Collector accepted, refused, dropped, retried, and exported spans.
- Collector memory limiter events.
- Backend ingestion latency and errors.
- The difference between billable provider calls and observed model-call spans.
That last check catches silent instrumentation gaps. If OpenAI usage reports show 1,000 successful calls and Langfuse shows 700 inference spans for the same window, treat it as a telemetry incident until sampling or loss explains the gap.
What should exist before we go to Chapter 24
At this point the demo should have:
tests/conftest.pywith an in-memory OpenTelemetry exporter;- real tests in
tests/test_graph.py,tests/test_privacy.py, andtests/test_telemetry.py; - score contract tests in
tests/test_scores.py; - a passing
PYTHONPATH=src pytest -q; - a validated Collector configuration;
- one manual Langfuse run used to confirm the end-to-end export path.
Chapter 24 turns these tested signals into dashboards, alerts, release gates, and runbooks.
References
- OpenTelemetry Python trace export
- OpenTelemetry Collector internal telemetry
- OpenTelemetry Collector resiliency
- OWASP Logging Cheat Sheet verification
Next up: Ch 24 - Dashboards, Alerts, Release Gates, and Runbooks turns the telemetry into operational decisions.