Building the OpenTelemetry Pipeline
Build the telemetry pipeline before instrumenting model calls, tools, retrieval, and workflow branches. A single smoke-test span should travel from the Python process to the OpenTelemetry Collector and then to Langfuse. If that path is not proven first, every later failure is ambiguous.
This chapter creates the first end-to-end path:
Python app
-> OpenTelemetry SDK
-> OTLP HTTP
-> OpenTelemetry Collector
-> Langfuse OTLP endpoint
The application knows only the local Collector endpoint. Langfuse credentials stay in the Collector configuration. That boundary matters later when we add filtering, sampling, content controls, retries, and backend-specific exporters.
What we will change
Work in the demo project:
cd agent-observability-demo
This chapter touches five files:
| File | What to do |
|---|---|
| .env | Add Langfuse keys and host/container base URLs. |
| src/agent_observability/telemetry.py | Create OpenTelemetry SDK setup and a root task span helper. |
| collector-config.yaml | Create the Collector pipeline that exports to Langfuse. |
| compose.yaml | Add the local OpenTelemetry Collector service. |
| src/agent_observability/main.py | Create a smoke-test trace runner. |
Add Collector settings to the local environment
The demo .env already contains the OpenTelemetry endpoint used by the Python SDK:
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4318/v1/traces
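A quick sanity check on this value can catch the most common mistakes before any span is created. The sketch below is illustrative only: check_traces_endpoint is a hypothetical helper, not part of the demo project, and it assumes the tutorial's local setup (host localhost or 127.0.0.1, port 4318). It also assumes that the signal-specific endpoint is used as given, so the /v1/traces path has to be part of the value.

```python
from urllib.parse import urlsplit

# Value taken from the demo .env shown above.
DEFAULT_TRACES_ENDPOINT = "http://localhost:4318/v1/traces"


def check_traces_endpoint(url: str) -> list[str]:
    """Return a list of problems; an empty list means the URL looks right.

    Hypothetical helper: the rules encode this tutorial's local setup only.
    """
    problems = []
    parts = urlsplit(url)
    if parts.scheme not in ("http", "https"):
        problems.append(f"unexpected scheme: {parts.scheme!r}")
    if parts.hostname not in ("localhost", "127.0.0.1"):
        problems.append(f"host is not local: {parts.hostname!r}")
    if parts.port != 4318:
        problems.append(f"port is not 4318: {parts.port!r}")
    if parts.path != "/v1/traces":
        problems.append(f"path is not /v1/traces: {parts.path!r}")
    return problems


if __name__ == "__main__":
    print(check_traces_endpoint(DEFAULT_TRACES_ENDPOINT))
```

An empty list means the application is pointed at the local Collector rather than at a backend.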
Add the Langfuse project keys shown by the Langfuse UI to the same local .env file. Keep the keys as shown, but adapt the base URL for the Collector container:
LANGFUSE_SECRET_KEY="replace-with-langfuse-secret-key"
LANGFUSE_PUBLIC_KEY="replace-with-langfuse-public-key"
LANGFUSE_BASE_URL="http://localhost:3000"
LANGFUSE_COLLECTOR_BASE_URL="http://host.docker.internal:3000"
LANGFUSE_BASE_URL is for Python code running on the host, which needs localhost. LANGFUSE_COLLECTOR_BASE_URL is for the Collector running inside Docker, where localhost would point to the Collector container itself. host.docker.internal points from the Collector container back to Langfuse exposed on the host.
The Collector derives the OTLP endpoint and Basic auth from these variables. We do not need a separate base64-encoded LANGFUSE_AUTH variable.
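To make the derivation concrete, the following sketch reproduces in Python what the Collector configuration is expected to produce: an OTLP traces URL built from the base URL, and a Basic auth header built from the public key (username) and secret key (password). The function names are hypothetical and exist only for illustration; the Collector does this internally, so the demo never needs this code.

```python
import base64


def langfuse_otlp_traces_url(base_url: str) -> str:
    """Build the traces URL from a Langfuse base URL (illustrative helper).

    Assumes the Langfuse OTLP base path is /api/public/otel and that the
    OTLP HTTP exporter appends /v1/traces for trace data.
    """
    return base_url.rstrip("/") + "/api/public/otel/v1/traces"


def basic_auth_header(public_key: str, secret_key: str) -> str:
    """Return the Authorization header value for HTTP Basic auth."""
    raw = f"{public_key}:{secret_key}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")


if __name__ == "__main__":
    print(langfuse_otlp_traces_url("http://host.docker.internal:3000"))
    print(basic_auth_header("replace-with-public-key", "replace-with-secret-key"))
```

This is the same value a hand-built LANGFUSE_AUTH variable would carry, which is why keeping the two keys separate is enough.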
Configure the Python SDK
Create src/agent_observability/telemetry.py:
from collections.abc import Iterator
from contextlib import contextmanager

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import Span

from .config import settings


def configure_tracing() -> TracerProvider:
    # Resource attributes identify the service on every exported span.
    resource = Resource.create(
        {
            "service.name": settings.otel_service_name,
            "service.version": settings.agent_version,
            "deployment.environment.name": settings.deployment_environment,
        }
    )
    provider = TracerProvider(resource=resource)
    # Export over OTLP HTTP to the local Collector, not to Langfuse directly.
    exporter = OTLPSpanExporter(
        endpoint=settings.otel_exporter_otlp_traces_endpoint,
    )
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)
    return provider


# Acquired at import time; it resolves to the configured provider once one is set.
tracer = trace.get_tracer("agent_observability", settings.agent_version)


@contextmanager
def agent_task_span(task_type: str, conversation_id: str) -> Iterator[Span]:
    # Root span for one agent task: metadata only, no prompt or payload content.
    with tracer.start_as_current_span(f"invoke_agent {task_type}") as span:
        span.set_attribute("gen_ai.operation.name", "invoke_agent")
        span.set_attribute("gen_ai.agent.name", "order-support")
        span.set_attribute("gen_ai.agent.version", settings.agent_version)
        span.set_attribute("gen_ai.conversation.id", conversation_id)
        span.set_attribute("app.task.type", task_type)
        yield span
Call trace.set_tracer_provider once during process startup. Application modules may acquire tracers, but they should not try to replace the provider. The Python API keeps the first provider and logs a warning on later calls, so setting it from libraries or tests makes it unclear which processors and exporters a span actually goes through.
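One way to make the once-at-startup rule hard to break is a small guard around the setup function. The sketch below is an assumption, not part of the demo project: configure_once is a hypothetical name, and the factory argument stands in for configure_tracing so the example runs without the OpenTelemetry packages.

```python
import threading
from typing import Callable, Optional

_lock = threading.Lock()
_provider: Optional[object] = None


def configure_once(factory: Callable[[], object]) -> object:
    """Run the setup factory on the first call and reuse its result afterwards.

    Hypothetical guard: in the demo, the factory would be configure_tracing.
    """
    global _provider
    with _lock:
        if _provider is None:
            _provider = factory()
        return _provider


if __name__ == "__main__":
    calls = []

    def fake_configure() -> object:
        # Stand-in for configure_tracing(); records how often setup ran.
        calls.append("configured")
        return object()

    first = configure_once(fake_configure)
    second = configure_once(fake_configure)
    print(first is second, len(calls))
```

Entry points such as main.py can then call the guard freely, and a second call returns the existing provider instead of building a new exporter.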
The smoke-test span uses metadata only. It does not export prompts, tool payloads, retrieved text, user input, or model output.
Configure the Collector
Create collector-config.yaml in the demo project root:
extensions:
  basicauth/langfuse:
    client_auth:
      username: ${env:LANGFUSE_PUBLIC_KEY}
      password: ${env:LANGFUSE_SECRET_KEY}

receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:4318

processors:
  memory_limiter:
    check_interval: 1s
    limit_mib: 256
    spike_limit_mib: 64
  batch:
    send_batch_size: 512
    timeout: 5s

exporters:
  debug:
    verbosity: basic
  otlp_http/langfuse:
    endpoint: ${env:LANGFUSE_COLLECTOR_BASE_URL}/api/public/otel
    auth:
      authenticator: basicauth/langfuse
    sending_queue:
      enabled: true
      queue_size: 2048
    retry_on_failure:
      enabled: true

service:
  extensions: [basicauth/langfuse]
  telemetry:
    logs:
      level: info
  pipelines:
    traces:
      receivers: [otlp]
      processors: [memory_limiter, batch]
      exporters: [debug, otlp_http/langfuse]
This pipeline has six components:
| Component | Role |
|---|---|
| otlp receiver | Accepts spans from the Python SDK over OTLP HTTP. |
| memory_limiter | Protects the Collector process under pressure. |
| batch | Groups spans before export. |
| debug exporter | Prints local evidence that spans reached the Collector. |
| basicauth/langfuse extension | Turns the Langfuse public and secret keys into Basic auth for the exporter. |
| otlp_http/langfuse exporter | Sends spans to Langfuse using the project credentials. |
The debug exporter is for local verification. At higher verbosity levels it prints span names and attributes into container logs. Reduce or remove it when the pipeline starts carrying production-like data.
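The batch settings are easier to reason about with a toy model. The sketch below is not the Collector's implementation: ToyBatcher is a hypothetical class that only mirrors the two rules implied by send_batch_size and timeout, namely flush when enough spans are pending, or when the oldest pending span has waited long enough. Time is passed in explicitly so the behavior is deterministic.

```python
from typing import Optional


class ToyBatcher:
    """Toy model of size-or-timeout batching (illustrative, not Collector code)."""

    def __init__(self, send_batch_size: int = 512, timeout_s: float = 5.0) -> None:
        self.send_batch_size = send_batch_size
        self.timeout_s = timeout_s
        self._pending: list = []
        self._first_at: Optional[float] = None

    def add(self, span: object, now: float) -> Optional[list]:
        """Queue a span; return a batch if the size threshold is reached."""
        if not self._pending:
            self._first_at = now
        self._pending.append(span)
        if len(self._pending) >= self.send_batch_size:
            return self._flush()
        return None

    def tick(self, now: float) -> Optional[list]:
        """Return a batch if the oldest pending span has waited timeout_s."""
        if self._pending and now - self._first_at >= self.timeout_s:
            return self._flush()
        return None

    def _flush(self) -> list:
        batch, self._pending = self._pending, []
        self._first_at = None
        return batch


if __name__ == "__main__":
    batcher = ToyBatcher(send_batch_size=3, timeout_s=5.0)
    print(batcher.add("a", now=0.0), batcher.add("b", now=1.0), batcher.add("c", now=2.0))
```

For the smoke test this means a single span is likely to leave the Collector on the timeout rule rather than the size rule, so a delay of a few seconds before it reaches Langfuse is expected.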
Run the Collector
Create compose.yaml in the demo project root:
services:
  otel-collector:
    image: otel/opentelemetry-collector-contrib:0.151.0
    command: ["--config=/etc/otelcol/config.yaml"]
    env_file:
      - .env
    extra_hosts:
      - "host.docker.internal:host-gateway"
    ports:
      - "127.0.0.1:4318:4318"
    volumes:
      - ./collector-config.yaml:/etc/otelcol/config.yaml:ro
The port binding is intentionally local:
127.0.0.1:4318:4318
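Only processes on your machine should send traces to this Collector during the tutorial. The receiver endpoint 0.0.0.0:4318 in collector-config.yaml applies inside the container, where it is needed so Docker can forward the published port; the 127.0.0.1 prefix in compose.yaml is what limits access on the host. Do not publish the port on 0.0.0.0 unless there is a specific reason.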
Start it:
docker compose up -d
docker compose logs -f otel-collector
The logs should show the Collector starting without exporter configuration errors. If LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY, or LANGFUSE_COLLECTOR_BASE_URL is missing, fix .env before running the smoke test.
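Before running the smoke test, it can help to confirm from the host that something is accepting connections on the published port. The sketch below is a hypothetical helper, not part of the demo project. It only proves that a TCP connection to 127.0.0.1:4318 succeeds; it does not prove that the OTLP receiver behind it is healthy, so the Collector logs remain the real evidence.

```python
import socket


def collector_reachable(host: str = "127.0.0.1", port: int = 4318, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    if collector_reachable():
        print("Port 4318 accepts connections")
    else:
        print("Nothing is listening on 127.0.0.1:4318; check docker compose ps")
```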
Export a smoke-test trace
Create src/agent_observability/main.py:
from uuid import uuid4

from .telemetry import agent_task_span, configure_tracing


def main() -> None:
    provider = configure_tracing()
    with agent_task_span("order-status", f"conv_{uuid4().hex}") as span:
        span.set_attribute("app.task.outcome", "smoke_test")
    provider.force_flush(timeout_millis=5000)
    provider.shutdown()


if __name__ == "__main__":
    main()
Run it from the demo project root:
PYTHONPATH=src python -m agent_observability.main
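This command should exit without exporter errors. The Collector logs should show the span arriving. With verbosity: basic the debug exporter prints only a summary with span counts; set verbosity: detailed temporarily to see the root span name, which looks like: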
invoke_agent order-status
If you go to the Langfuse UI, you should see the same trace under the correct project.
Verify Langfuse
Check four layers, in this order:
- The Python command exits successfully.
- Collector logs show the span arriving through the OTLP receiver.
- Collector logs show no export errors from the otlp_http/langfuse exporter.
- Langfuse shows the trace under the correct project.
In Langfuse, inspect the trace and confirm:
| Expected field | Why it matters |
|---|---|
| service.name = order-support-agent | Resource attribution works. |
| service.version = local | Release or agent version is attached. |
| deployment.environment.name = development | Environment segmentation works. |
| gen_ai.operation.name = invoke_agent | The root operation is queryable. |
| gen_ai.agent.name = order-support | Agent identity is present. |
| gen_ai.conversation.id = conv_... | Conversation correlation is present for the smoke test. |
| app.task.type = order-status | Project task taxonomy is present. |
| app.task.outcome = smoke_test | Outcome field is present and bounded. |
No prompt, response, tool argument, tool result, retrieved text, or user input should appear in the trace.
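The same checklist can be expressed as a small function over a flat dictionary of attribute names, for example one copied by hand from the Langfuse trace view. Everything here is a sketch under stated assumptions: check_smoke_trace is a hypothetical helper, the expected keys come from the table above, and the list of content-like name fragments is a heuristic guess rather than a complete definition of raw content.

```python
# Keys taken from the verification table above.
EXPECTED_KEYS = (
    "service.name",
    "service.version",
    "deployment.environment.name",
    "gen_ai.operation.name",
    "gen_ai.agent.name",
    "gen_ai.conversation.id",
    "app.task.type",
    "app.task.outcome",
)

# Assumption: attribute names containing these fragments probably carry content.
CONTENT_FRAGMENTS = (
    "prompt", "completion", "messages", "instructions",
    "arguments", "result", "content", "input", "output",
)


def check_smoke_trace(attributes: dict) -> dict:
    """Report expected keys that are missing and keys that look like raw content."""
    missing = sorted(key for key in EXPECTED_KEYS if key not in attributes)
    suspicious = sorted(
        key for key in attributes
        if any(fragment in key.lower() for fragment in CONTENT_FRAGMENTS)
    )
    return {"missing": missing, "suspicious": suspicious}


if __name__ == "__main__":
    sample = {key: "placeholder" for key in EXPECTED_KEYS}
    print(check_smoke_trace(sample))
```

A name-based heuristic like this can miss content stored under an unexpected key, so it complements a manual look at the trace rather than replacing it.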
Test the failure path
Test one failure before adding real instrumentation. Change LANGFUSE_SECRET_KEY in .env to an invalid value and restart the Collector:
docker compose up -d
docker compose logs -f otel-collector
Run the smoke test again:
PYTHONPATH=src python -m agent_observability.main
The expected behavior is:
- the Python process can still export to the local Collector;
- the Collector reports Langfuse export failures;
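- the Collector retries according to retry_on_failure when the failure is retryable; a rejected credential (typically HTTP 401) is normally classified as permanent, so the batch may be dropped without a retry;
- no Langfuse trace appears for the bad credential.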
Restore LANGFUSE_SECRET_KEY, restart the Collector, and verify that export recovers.
A pipeline tested only on the success path is not ready for model and tool spans. Backend credentials fail, networks fail, backends restart, and queues fill.
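The difference between a failure that is retried and one that is dropped can be sketched in a few lines. This is a toy model, not the Collector's code. It assumes the OTLP/HTTP convention that 429, 502, 503, and 504 are retryable, and it uses backoff values (5 s initial interval, 1.5 multiplier, 30 s maximum interval, 300 s maximum elapsed time) that are assumed defaults for retry_on_failure; check the Collector documentation for the version in use. Jitter is deliberately left out to keep the schedule deterministic.

```python
# Assumption: status codes treated as retryable for OTLP over HTTP.
RETRYABLE_STATUS = frozenset({429, 502, 503, 504})


def is_retryable(status: int) -> bool:
    """Return True if a failed export with this HTTP status would be retried."""
    return status in RETRYABLE_STATUS


def backoff_schedule(
    initial_s: float = 5.0,
    multiplier: float = 1.5,
    max_interval_s: float = 30.0,
    max_elapsed_s: float = 300.0,
) -> list[float]:
    """List the waits between attempts until the elapsed-time budget is used up."""
    delays: list[float] = []
    elapsed = 0.0
    delay = initial_s
    while elapsed + delay <= max_elapsed_s:
        delays.append(delay)
        elapsed += delay
        delay = min(delay * multiplier, max_interval_s)
    return delays


if __name__ == "__main__":
    print("401 retryable:", is_retryable(401))
    print("503 retryable:", is_retryable(503))
    print("first waits:", backoff_schedule()[:4])
```

Under these assumptions, the bad-credential test exercises the error-reporting path, while stopping Langfuse briefly would be the way to exercise the retry and queue path.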
Protect backend credentials
The Python application should not receive Langfuse credentials. It should export telemetry only to the local Collector:
http://localhost:4318/v1/traces
The Collector receives those spans and then exports them to Langfuse using the Langfuse keys from .env:
Python app
-> local Collector endpoint
-> Collector adds Langfuse authentication
-> Langfuse OTLP endpoint
This separation keeps backend credentials out of application code. It also lets the Collector own:
- Langfuse endpoint;
- Langfuse Basic auth header;
- batching;
- retry behavior;
- future filtering and sampling;
- future content controls.
For production, replace local .env with the platform secret store, use TLS, restrict network access, and use separate credentials per environment. The local setup is for learning the pipeline mechanics.
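The credential boundary can also be checked from inside the application process. The sketch below is a hypothetical startup guard, not part of the demo project: it flags environment variables that start with LANGFUSE_ and look like secrets, while leaving LANGFUSE_BASE_URL alone. The marker list is an assumption. In the local demo, where one .env file is shared, the guard may flag the keys if the whole file is loaded into the process environment.

```python
import os
from collections.abc import Mapping


def leaked_backend_secrets(
    environ: Mapping[str, str],
    prefixes: tuple[str, ...] = ("LANGFUSE_",),
    markers: tuple[str, ...] = ("SECRET", "KEY", "TOKEN", "AUTH"),
) -> list[str]:
    """Return names of backend credential variables visible to this process."""
    return sorted(
        name for name in environ
        if name.startswith(prefixes) and any(marker in name for marker in markers)
    )


if __name__ == "__main__":
    leaked = leaked_backend_secrets(os.environ)
    if leaked:
        print("Backend credentials visible to the app:", ", ".join(leaked))
    else:
        print("No Langfuse credentials in the application environment")
```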
Troubleshooting
| Symptom | Likely cause | Check |
|---|---|---|
| Python logs a connection refused export error | Collector is not listening on localhost:4318. | docker compose ps and docker compose logs otel-collector. |
| Collector starts but exports fail | Bad Langfuse key or endpoint. | Check .env and Collector logs. |
| Langfuse shows no trace | Wrong project keys, wrong endpoint, or Collector export failure. | Confirm project keys and inspect Collector logs. |
| Collector cannot reach local Langfuse | Wrong host from inside Docker. | Use http://host.docker.internal:3000/api/public/otel. |
| Trace appears without resource fields | SDK provider was not configured before span creation. | Ensure configure_tracing() runs at process startup. |
| Trace contains raw content | Instrumentation captured too much. | Keep CAPTURE_CONTENT=false and inspect span attributes. |
What should exist before we go to Chapter 14
Before adding model-call instrumentation, the project should have:
- telemetry.py configuring the OpenTelemetry SDK.
- collector-config.yaml receiving OTLP and exporting to Langfuse.
- compose.yaml running the Collector locally.
- .env containing LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY, LANGFUSE_BASE_URL, and LANGFUSE_COLLECTOR_BASE_URL.
- A smoke-test trace visible in Langfuse.
- A verified failure path for bad backend credentials.
- No raw content in the smoke-test trace.
Chapter 14 adds model-call spans, usage, errors, streaming timing, and retries on top of this pipeline.
Next up: Ch 14 - Instrumenting OpenAI Model Calls adds inference spans, usage, errors, streaming timing, and retries.