Instrumenting OpenAI Model Calls
The model call is the most expensive and least deterministic dependency in an agent workflow. It is also the boundary where latency, token usage, rate limits, model selection, streaming behavior, and provider errors become visible.
This chapter instruments OpenAI Responses API calls with explicit spans. The goal is not to log prompts and completions. The goal is to make every provider attempt explainable:
- which model the application requested;
- which model and response identifier the provider returned;
- how many input, output, and cached input tokens were billed or reported;
- whether the call streamed output;
- how long the call took before the first streamed chunk;
- whether the provider returned an error, incomplete response, or retryable failure;
- whether application telemetry avoided raw content capture.
The orchestration span created in previous chapters tells us what the agent was trying to do. The model span tells us what happened at the provider boundary.
What we will change
Work in the demo project:
cd agent-observability-demo
This chapter touches four files:
| File | What to do |
|---|---|
| src/agent_observability/inference.py | Create the OpenAI wrapper, including streaming, retry, structured output, and fallback helpers. |
| src/agent_observability/response_validation.py | Create the response validation wrapper. |
| src/agent_observability/main.py | Update the demo to call the response-validation path. |
| src/agent_observability/manual_scenarios.py | Create the manual validation scenario runner. |
What belongs on the model span
Treat each OpenAI request as one CLIENT span. If the application makes three attempts for one operation, record three model spans under one parent operation. A retry is a new provider attempt, not an invisible implementation detail.
| Question | Span evidence |
|---|---|
| Which provider dependency was called? | gen_ai.provider.name, server.address |
| Which model did the application request? | gen_ai.request.model |
| Which model did the provider return? | gen_ai.response.model |
| Which provider response can be correlated later? | gen_ai.response.id |
| How much did the attempt consume? | gen_ai.usage.input_tokens, gen_ai.usage.output_tokens, gen_ai.usage.cache_read.input_tokens |
| Was this a streaming call? | gen_ai.request.stream |
| How quickly did streaming start? | gen_ai.response.time_to_first_chunk |
| Did the provider fail? | span status, error.type, recorded exception |
| Did the response finish cleanly? | gen_ai.response.finish_reasons when available, or provider status fields when using typed Responses API objects |
Use OpenTelemetry semantic conventions when a stable or development attribute exists. Use a documented custom namespace for application-specific details that do not have a standard attribute yet. In this series, custom attributes use the app.* namespace.
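One way to keep that namespace rule from drifting is a small check over attribute keys, for example in a unit test. The sketch below is only an illustration: the prefix allow-list and the helper name `unknown_attribute_keys` are assumptions, not part of the demo project or of OpenTelemetry.

```python
# Hypothetical allow-list: semantic-convention prefixes used in this chapter
# plus the series' custom app.* namespace. Adjust to your own conventions.
ALLOWED_PREFIXES = ("gen_ai.", "server.", "error.", "app.")


def unknown_attribute_keys(attributes: dict) -> list:
    """Return attribute keys outside the agreed namespaces, sorted for stable output."""
    return sorted(
        key for key in attributes
        if not key.startswith(ALLOWED_PREFIXES)
    )


example = {
    "gen_ai.request.model": "example-model",
    "app.openai.store": False,
    "openai_prompt": "...",  # ad hoc key that bypasses the naming rule
}
print(unknown_attribute_keys(example))  # ['openai_prompt']
```

A check like this flags naming drift only. It does not inspect values, so it complements the content-capture policy rather than replacing it.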
Use the Responses API explicitly
The OpenAI Responses API is the API used in this demo because it is the current building block for new OpenAI text generation workflows. It returns a typed response object with an id, model, status, usage, structured output items, and the convenience output_text property when text output is present.
Create model calls with store=False:
response = client.responses.create(
    model=settings.openai_model,
    instructions=instructions,
    input=input_items,
    store=False,
)
store=False controls whether OpenAI stores the response object for later retrieval. It does not control our application telemetry. If we accidentally copy prompt text, retrieved documents, tool results, or model output into spans, store=False will not remove that data from the observability backend. The content-capture policy from Chapter 7 still applies.
Create the inference wrapper
Create src/agent_observability/inference.py:
from typing import Any

from openai import OpenAI
from opentelemetry.trace import Span, SpanKind, Status, StatusCode

from .config import settings
from .telemetry import tracer

client = OpenAI(
    api_key=settings.openai_api_key,
    max_retries=0,
    timeout=30.0,
)


def _set_if_present(span: Span, key: str, value: object | None) -> None:
    if value is not None:
        span.set_attribute(key, value)


def _set_usage(span: Span, usage: Any | None) -> None:
    if usage is None:
        return
    _set_if_present(
        span,
        "gen_ai.usage.input_tokens",
        getattr(usage, "input_tokens", None),
    )
    _set_if_present(
        span,
        "gen_ai.usage.output_tokens",
        getattr(usage, "output_tokens", None),
    )
    input_details = getattr(usage, "input_tokens_details", None)
    _set_if_present(
        span,
        "gen_ai.usage.cache_read.input_tokens",
        getattr(input_details, "cached_tokens", None),
    )


def _set_response_metadata(span: Span, response: Any) -> None:
    _set_if_present(span, "gen_ai.response.id", getattr(response, "id", None))
    _set_if_present(span, "gen_ai.response.model", getattr(response, "model", None))
    _set_if_present(span, "app.openai.response.status", getattr(response, "status", None))
    output = getattr(response, "output", None)
    if output is not None:
        output_types = sorted(
            {
                getattr(item, "type", "unknown")
                for item in output
            }
        )
        span.set_attribute("app.openai.output_types", output_types)
    _set_usage(span, getattr(response, "usage", None))


def generate_answer(
    instructions: str,
    input_items: list[dict[str, Any]],
    *,
    model: str | None = None,
    text_format: dict[str, Any] | None = None,
) -> str:
    requested_model = model or settings.openai_model
    with tracer.start_as_current_span(
        "openai.responses.create",
        kind=SpanKind.CLIENT,
        attributes={
            "gen_ai.operation.name": "chat",
            "gen_ai.provider.name": "openai",
            "gen_ai.request.model": requested_model,
            "gen_ai.request.stream": False,
            "server.address": "api.openai.com",
            "app.openai.store": False,
        },
    ) as span:
        try:
            request: dict[str, Any] = {
                "model": requested_model,
                "instructions": instructions,
                "input": input_items,
                "store": False,
            }
            if text_format is not None:
                request["text"] = {"format": text_format}
            response = client.responses.create(**request)
        except Exception as exc:
            span.record_exception(exc)
            span.set_status(Status(StatusCode.ERROR, "provider_error"))
            span.set_attribute("error.type", exc.__class__.__name__)
            raise
        _set_response_metadata(span, response)
        return response.output_text
The wrapper receives instructions and input_items, but it does not write them to span attributes. That is intentional. The trace should explain the model call without becoming a shadow database of prompts, user messages, retrieval chunks, or generated answers.
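By default, the OpenAI Python SDK retries connection errors, timeouts, and 408, 409, 429, and 5xx responses twice with a short exponential backoff. The example sets max_retries=0 so that every retry is issued by the application and recorded as a separate span. It also sets an explicit 30-second timeout, well below the SDK default of 10 minutes, so a stuck provider call does not occupy the agent indefinitely.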
Preserve typed output metadata without copying output
Responses API output is structured. A response can contain text, tool calls, reasoning-related items, or other typed output items depending on the model and request.
Do not serialize the whole response.output object into telemetry. Store bounded metadata instead:
app.openai.output_types = ["message"]
That attribute answers a useful operational question: “Did the model produce the expected kind of output?” It avoids the unsafe pattern: “Copy everything the provider returned into the trace.”
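As a rough illustration of "bounded metadata", the sketch below reduces a list of output items to a small, sorted set of type names and never touches their content. The helper name `summarize_output_types` and the cap of 8 are assumptions made for this example. It accepts plain dicts as well as typed objects so it can run without the OpenAI SDK.

```python
from typing import Any, Iterable

MAX_OUTPUT_TYPES = 8  # assumed cap so the attribute cannot grow without bound


def summarize_output_types(items: Iterable[Any], limit: int = MAX_OUTPUT_TYPES) -> list:
    """Collect distinct item types only; item content is never read or copied."""
    types = set()
    for item in items:
        if isinstance(item, dict):
            item_type = item.get("type", "unknown")
        else:
            item_type = getattr(item, "type", "unknown")
        types.add(str(item_type))
    return sorted(types)[:limit]


items = [
    {"type": "message", "content": "text that must stay out of telemetry"},
    {"type": "function_call"},
    {"type": "message"},
    {},  # item without a type field
]
print(summarize_output_types(items))  # ['function_call', 'message', 'unknown']
```

The result is safe to attach as a span attribute because its size is capped and its values come from a small vocabulary of type names.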
If a later chapter adds tool calls, record the tool name, validation result, authorization result, and execution result on tool spans. Keep model output content governed by the content-capture policy.
Instrument streaming calls
Streaming needs one extra measurement: time to first chunk. Total duration tells us when the stream finished. Time to first chunk tells us when the user saw the first useful output.
OpenTelemetry’s GenAI conventions define gen_ai.response.time_to_first_chunk in seconds. Use that unit instead of inventing a millisecond attribute.
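The measurement itself does not depend on the provider. The sketch below wraps any iterator of text chunks and records elapsed seconds at the first chunk and at the end. `timed_chunks`, `slow_source`, and the keys of the `timings` dict are names invented for this illustration; the real wrapper writes the value to a span attribute instead.

```python
import time
from collections.abc import Iterable, Iterator


def timed_chunks(chunks: Iterable[str], timings: dict) -> Iterator[str]:
    """Yield chunks unchanged while recording first-chunk and total time in seconds."""
    started_at = time.perf_counter()
    for chunk in chunks:
        if "time_to_first_chunk_s" not in timings:
            timings["time_to_first_chunk_s"] = time.perf_counter() - started_at
        yield chunk
    # Only reached when the consumer reads the stream to the end.
    timings["total_s"] = time.perf_counter() - started_at


def slow_source() -> Iterator[str]:
    """Stand-in for a provider stream: a short delay before each chunk."""
    time.sleep(0.01)
    yield "Hel"
    time.sleep(0.01)
    yield "lo"


timings: dict = {}
text = "".join(timed_chunks(slow_source(), timings))
print(text, timings)
```

`time.perf_counter()` is used because it is monotonic, so the difference cannot go negative if the wall clock is adjusted during the call.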
Add the following code below generate_answer in src/agent_observability/inference.py:
import time
from collections.abc import Iterator


def stream_answer(
    instructions: str,
    input_items: list[dict[str, Any]],
) -> Iterator[str]:
    requested_model = settings.openai_model
    started_at = time.perf_counter()
    first_chunk_seen = False
    completed = False
    with tracer.start_as_current_span(
        "openai.responses.stream",
        kind=SpanKind.CLIENT,
        attributes={
            "gen_ai.operation.name": "chat",
            "gen_ai.provider.name": "openai",
            "gen_ai.request.model": requested_model,
            "gen_ai.request.stream": True,
            "server.address": "api.openai.com",
            "app.openai.store": False,
        },
    ) as span:
        try:
            stream = client.responses.stream(
                model=requested_model,
                instructions=instructions,
                input=input_items,
                store=False,
            )
            with stream as events:
                for event in events:
                    if event.type == "response.output_text.delta":
                        if not first_chunk_seen:
                            first_chunk_seen = True
                            span.set_attribute(
                                "gen_ai.response.time_to_first_chunk",
                                time.perf_counter() - started_at,
                            )
                        yield event.delta
                    if event.type == "response.completed":
                        completed = True
                        _set_response_metadata(span, event.response)
        except GeneratorExit:
            span.set_attribute("app.openai.stream.cancelled", True)
            raise
        except Exception as exc:
            span.record_exception(exc)
            span.set_status(Status(StatusCode.ERROR, "stream_error"))
            span.set_attribute("error.type", exc.__class__.__name__)
            raise
        finally:
            span.set_attribute("app.openai.stream.completed", completed)
There is an important operational detail here: usage data and final response metadata usually arrive at the end of the stream. If the application stops consuming the generator early, the span may have a first-chunk measurement but no final usage. That is not a telemetry bug. It means the stream was cancelled, interrupted, or not fully consumed.
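The cancellation behavior can be reproduced without a provider. In the sketch below a plain dict stands in for the span, and `tracked_stream` is a hypothetical reduction of the streaming wrapper to its control flow. Closing the generator after one chunk raises GeneratorExit at the yield, so the cancelled flag is set and the completed flag stays false.

```python
from collections.abc import Iterator


def tracked_stream(chunks: list, attributes: dict) -> Iterator[str]:
    """Yield chunks and record whether the consumer read the stream to the end."""
    completed = False
    try:
        for chunk in chunks:
            yield chunk
        completed = True
    except GeneratorExit:
        # Raised at the yield when the consumer closes or abandons the generator.
        attributes["app.openai.stream.cancelled"] = True
        raise
    finally:
        attributes["app.openai.stream.completed"] = completed


# Consumer stops after the first chunk.
attrs: dict = {}
stream = tracked_stream(["a", "b", "c"], attrs)
first = next(stream)
stream.close()
print(attrs)

# Consumer reads everything.
full_attrs: dict = {}
chunks_read = list(tracked_stream(["a", "b", "c"], full_attrs))
print(full_attrs)
```

In the first case the dict ends with cancelled set to True and completed set to False. In the second it contains only completed set to True.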
Make retries visible
Retries should answer three questions:
- How many attempts did the operation require?
- Which attempt failed, and why?
- Did the final user-visible result come from the original model request or from a later attempt?
Keep retry orchestration separate from the model attempt span. The parent span should record the retry policy and final attempt. Each model call should be a separate child span with its own latency, token usage, response id, and error state.
Add the following code to src/agent_observability/inference.py:
from openai import APIConnectionError, APIStatusError, APITimeoutError, RateLimitError


def _is_retryable_openai_error(exc: Exception) -> bool:
    if isinstance(exc, RateLimitError):
        # A 429 caused by exhausted quota will not succeed on retry.
        return _openai_error_code(exc) != "insufficient_quota"
    if isinstance(exc, (APIConnectionError, APITimeoutError)):
        return True
    if isinstance(exc, APIStatusError):
        return exc.status_code in {408, 409} or exc.status_code >= 500
    return False


def _openai_error_code(exc: APIStatusError) -> str | None:
    code = getattr(exc, "code", None)
    if code is not None:
        return str(code)
    body = getattr(exc, "body", None)
    if isinstance(body, dict):
        error = body.get("error")
        if isinstance(error, dict):
            nested_code = error.get("code")
            if nested_code is not None:
                return str(nested_code)
    return None


def generate_with_retry(
    instructions: str,
    input_items: list[dict[str, Any]],
    *,
    max_attempts: int = 3,
) -> str:
    with tracer.start_as_current_span(
        "model call with retry",
        attributes={
            "app.retry.max_attempts": max_attempts,
        },
    ) as span:
        for attempt in range(1, max_attempts + 1):
            span.set_attribute("app.retry.current_attempt", attempt)
            try:
                result = generate_answer(instructions, input_items)
                span.set_attribute("app.retry.final_attempt", attempt)
                return result
            except Exception as exc:
                retryable = _is_retryable_openai_error(exc)
                span.add_event(
                    "model.retry",
                    {
                        "app.retry.attempt": attempt,
                        "app.retry.retryable": retryable,
                        "error.type": exc.__class__.__name__,
                    },
                )
                if not retryable or attempt == max_attempts:
                    # Distinguish "gave up after retrying" from "never retryable".
                    reason = "retry_exhausted" if retryable else "non_retryable_error"
                    span.set_status(Status(StatusCode.ERROR, reason))
                    raise
                time.sleep(min(2.0, 0.25 * (2 ** (attempt - 1))))
        # Only reached when max_attempts < 1, because the loop body never runs.
        raise RuntimeError("max_attempts must be at least 1")
In the resulting trace, the parent span carries the retry policy and the final attempt number, and each call to generate_answer appears beneath it as a separate OpenAI attempt with its own latency, token usage, response id, and error state.
Production retry code should usually add jitter and respect provider rate-limit headers. The important observability rule is the same: do not hide provider attempts inside one successful span.
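A minimal sketch of such a delay policy follows, assuming the caller has already parsed any provider wait hint into seconds. The function name `retry_delay`, its parameters, and the 30-second ceiling are illustrative choices, not SDK behavior. It uses full jitter: a uniform random delay between zero and the capped exponential backoff.

```python
import random
from typing import Optional


def retry_delay(
    attempt: int,
    retry_after_s: Optional[float] = None,
    *,
    base_s: float = 0.25,
    cap_s: float = 2.0,
    max_wait_s: float = 30.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Return seconds to wait before the next attempt (attempt numbering starts at 1)."""
    if retry_after_s is not None:
        # Honor the provider hint, bounded so one response cannot stall the agent.
        return min(max(retry_after_s, 0.0), max_wait_s)
    rng = rng or random.Random()
    backoff = min(cap_s, base_s * (2 ** (attempt - 1)))
    return rng.uniform(0.0, backoff)  # full jitter


print(retry_delay(1, retry_after_s=5.0))    # provider hint wins: 5.0
print(retry_delay(3, rng=random.Random(0)))  # somewhere between 0.0 and 1.0
```

Passing `rng` explicitly keeps the function deterministic under test. Whatever delay is chosen, each attempt still gets its own model span.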
Record fallbacks as model changes
A fallback is not just another retry. It changes the model selection decision.
If the application falls back from one model to another, record the reason on the parent operation and create a new provider span for the fallback request. The fallback span should contain its own requested model.
The generate_answer wrapper above already accepts an optional model argument. That allows the fallback orchestration to choose the model without duplicating the OpenAI instrumentation:
def generate_answer(
    instructions: str,
    input_items: list[dict[str, Any]],
    *,
    model: str | None = None,
    text_format: dict[str, Any] | None = None,
) -> str:
    requested_model = model or settings.openai_model
    ...
Add a fallback helper to src/agent_observability/inference.py:
from openai import RateLimitError


def generate_with_fallback(
    instructions: str,
    input_items: list[dict[str, Any]],
    *,
    primary_model: str,
    fallback_model: str,
) -> str:
    with tracer.start_as_current_span(
        "model call with fallback",
        attributes={
            "app.model.primary": primary_model,
            "app.model.fallback": fallback_model,
            "app.model.fallback.used": False,
        },
    ) as span:
        try:
            return generate_answer(
                instructions,
                input_items,
                model=primary_model,
            )
        except RateLimitError as exc:
            if _openai_error_code(exc) == "insufficient_quota":
                span.set_attribute("app.model.fallback.used", False)
                span.set_attribute("app.model.fallback.reason", "insufficient_quota")
                raise
            span.add_event(
                "model.fallback",
                {
                    "app.model.fallback.reason": "rate_limit",
                    "error.type": exc.__class__.__name__,
                },
            )
            span.set_attribute("app.model.fallback.used", True)
            span.set_attribute("app.model.fallback.reason", "rate_limit")
            return generate_answer(
                instructions,
                input_items,
                model=fallback_model,
            )
The resulting trace should look like this:
parent span:
  app.model.fallback.used = true
  app.model.fallback.reason = "rate_limit"
  app.model.primary = "primary-model-name"
  app.model.fallback = "fallback-model-name"
child span:
  gen_ai.request.model = "fallback-model-name"
  gen_ai.response.model = "fallback-model-name"
This distinction matters during incidents. A system can look healthy while silently serving most traffic through a cheaper, slower, or less capable fallback model. The trace should make that visible.
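To make the point concrete, here is a small sketch of how a fallback rate could be derived once the parent spans carry app.model.fallback.used. It works on plain attribute dicts, as they might look after being exported from a tracing backend. The function name and the sample data are invented for illustration.

```python
def fallback_rate(parent_span_attributes: list) -> float:
    """Share of fallback-capable operations that were served by the fallback model."""
    if not parent_span_attributes:
        return 0.0
    used = sum(
        1
        for attrs in parent_span_attributes
        if attrs.get("app.model.fallback.used") is True
    )
    return used / len(parent_span_attributes)


# Invented sample: four operations, two of which fell back.
sample = [
    {"app.model.fallback.used": False},
    {"app.model.fallback.used": True, "app.model.fallback.reason": "rate_limit"},
    {"app.model.fallback.used": True, "app.model.fallback.reason": "rate_limit"},
    {"app.model.fallback.used": False},
]
print(fallback_rate(sample))  # 0.5
```

Every request in that sample could have returned a successful answer, which is why success rate alone would not reveal the shift in model selection.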
Separate provider failures from application failures
Not every bad answer is an OpenAI error.
Use span status ERROR for provider and transport failures: timeouts, connection errors, rate limits, invalid requests, server errors, and authentication problems.
Use bounded application attributes for validation failures after a successful provider response:
app.output.validation = "failed"
app.output.validation.reason = "missing_required_field"
For example, keep the OpenAI call in generate_answer focused on the provider boundary and validate the returned text in the application step that consumes it.
Create src/agent_observability/response_validation.py:
import json
from typing import Any

from opentelemetry.trace import Status, StatusCode

from .inference import generate_answer
from .telemetry import tracer

REQUIRED_ORDER_FIELDS = {"answer", "confidence", "next_action"}

ORDER_SUPPORT_TEXT_FORMAT = {
    "type": "json_schema",
    "name": "order_support_response",
    "strict": True,
    "schema": {
        "type": "object",
        "additionalProperties": False,
        "properties": {
            "answer": {"type": "string"},
            "confidence": {"type": "string"},
            "next_action": {
                "type": "string",
                "enum": ["answer", "escalate", "ask_clarifying_question"],
            },
        },
        "required": ["answer", "confidence", "next_action"],
    },
}


def generate_order_support_response(
    instructions: str,
    input_items: list[dict[str, Any]],
) -> dict[str, Any]:
    with tracer.start_as_current_span("validate model output") as span:
        output_text = generate_answer(
            instructions,
            input_items,
            text_format=ORDER_SUPPORT_TEXT_FORMAT,
        )
        try:
            payload = json.loads(output_text)
        except json.JSONDecodeError as exc:
            span.set_attribute("app.output.validation", "failed")
            span.set_attribute("app.output.validation.reason", "invalid_json")
            span.set_status(Status(StatusCode.ERROR, "invalid_json"))
            raise ValueError("model output was not valid JSON") from exc
        missing_fields = REQUIRED_ORDER_FIELDS - payload.keys()
        if missing_fields:
            span.set_attribute("app.output.validation", "failed")
            span.set_attribute(
                "app.output.validation.reason",
                "missing_required_field",
            )
            span.set_attribute(
                "app.output.validation.missing_fields",
                sorted(missing_fields),
            )
            span.set_status(Status(StatusCode.ERROR, "missing_required_field"))
            raise ValueError("model output missed required fields")
        span.set_attribute("app.output.validation", "passed")
        return payload
This creates a clean trace shape:
validate model output
└─ openai.responses.create
The child OpenAI span can succeed because the provider returned a response, while the parent application span fails because that response did not satisfy the contract required by the agent workflow. That distinction is useful: it tells us the provider did its job, but the application rejected the result.
For Responses API objects, also record bounded provider status fields when present:
app.openai.response.status = "completed"
app.openai.output_types = ["message"]
Avoid overfitting to one response shape. Different models and features can produce different typed output items.
Validating the demo
At this point, manual validation should exercise the files created in this chapter, not just the Collector smoke test from Chapter 13.
Update src/agent_observability/main.py so the demo calls the response-validation path:
from uuid import uuid4

from .response_validation import generate_order_support_response
from .telemetry import agent_task_span, configure_tracing


def main() -> None:
    provider = configure_tracing()
    with agent_task_span("order-status", f"conv_{uuid4().hex}") as span:
        result = generate_order_support_response(
            instructions=(
                "Return only valid JSON with these fields: "
                "answer, confidence, next_action."
            ),
            input_items=[
                {
                    "role": "user",
                    "content": "Where is my order?",
                }
            ],
        )
        span.set_attribute("app.task.outcome", "success")
        span.set_attribute("app.output.next_action", result["next_action"])
    provider.force_flush(timeout_millis=5000)
    provider.shutdown()


if __name__ == "__main__":
    main()
This exercises the path below:
agent task span
└─ validate model output
   └─ openai.responses.create
Run the demo:
PYTHONPATH=src python -m agent_observability.main
Then inspect the trace in Langfuse:
- the agent span should contain the task and conversation attributes from earlier chapters;
- the validate model output span should appear under the agent span;
- the OpenAI span should appear as a child dependency call;
- gen_ai.request.model should contain the configured model;
- gen_ai.response.id should be present after a successful response;
- token usage should appear when the provider returns usage;
- raw prompt, retrieved context, tool result, and model output text should not appear in span attributes;
- app.output.validation should be passed for a valid JSON response.
Validate the additional scenarios manually
The normal main.py path validates the non-streaming success path. Retry, fallback, streaming, and retry classification are separate behaviors, so validate them with a focused manual runner instead of waiting for production failures.
Add a local verification harness at src/agent_observability/manual_scenarios.py in the demo project. This is not production code. It is a manual runner that imports the functions created in this chapter and exposes one scenario per behavior:
import argparse
from collections.abc import Callable
from uuid import uuid4

from opentelemetry.trace import SpanKind, Status, StatusCode

from .config import settings
from .inference import generate_answer, stream_answer
from .response_validation import generate_order_support_response
from .telemetry import agent_task_span, configure_tracing, tracer


def _input_items(question: str = "Where is my order?") -> list[dict[str, str]]:
    return [{"role": "user", "content": question}]


def run_success() -> None:
    with agent_task_span("manual-success", f"conv_{uuid4().hex}") as span:
        result = generate_order_support_response(
            instructions=(
                "Return only valid JSON with these fields: "
                "answer, confidence, next_action."
            ),
            input_items=_input_items(),
        )
        span.set_attribute("app.manual_scenario", "success")
        span.set_attribute("app.task.outcome", "success")
        span.set_attribute("app.output.next_action", result["next_action"])


def run_stream() -> None:
    with agent_task_span("manual-stream", f"conv_{uuid4().hex}") as span:
        chunks = list(
            stream_answer(
                instructions="Answer in one short sentence.",
                input_items=_input_items(),
            )
        )
        span.set_attribute("app.manual_scenario", "stream")
        span.set_attribute("app.task.outcome", "success")
        span.set_attribute("app.openai.stream.chunk_count", len(chunks))


def _simulated_failed_model_span(error_type: str) -> None:
    with tracer.start_as_current_span(
        "openai.responses.create",
        kind=SpanKind.CLIENT,
        attributes={
            "gen_ai.operation.name": "chat",
            "gen_ai.provider.name": "openai",
            "gen_ai.request.model": settings.openai_model,
            "gen_ai.request.stream": False,
            "server.address": "api.openai.com",
            "app.openai.simulated_failure": True,
            "error.type": error_type,
        },
    ) as span:
        span.set_status(Status(StatusCode.ERROR, "simulated_provider_error"))


def run_retry() -> None:
    with agent_task_span("manual-retry", f"conv_{uuid4().hex}"):
        with tracer.start_as_current_span(
            "model call with retry",
            attributes={"app.retry.max_attempts": 2},
        ) as span:
            _simulated_failed_model_span("APITimeoutError")
            span.add_event(
                "model.retry",
                {
                    "app.retry.attempt": 1,
                    "app.retry.retryable": True,
                    "error.type": "APITimeoutError",
                },
            )
            result = generate_answer(
                instructions="Answer in one short sentence.",
                input_items=_input_items(),
            )
            span.set_attribute("app.retry.final_attempt", 2)
            span.set_attribute("app.task.outcome", "success")
            span.set_attribute("app.output.length", len(result))


def run_fallback() -> None:
    fallback_model = settings.openai_model
    with agent_task_span("manual-fallback", f"conv_{uuid4().hex}"):
        with tracer.start_as_current_span(
            "model call with fallback",
            attributes={
                "app.model.primary": "simulated-primary-model",
                "app.model.fallback": fallback_model,
                "app.model.fallback.used": False,
            },
        ) as span:
            _simulated_failed_model_span("RateLimitError")
            span.add_event(
                "model.fallback",
                {
                    "app.model.fallback.reason": "rate_limit",
                    "error.type": "RateLimitError",
                },
            )
            span.set_attribute("app.model.fallback.used", True)
            span.set_attribute("app.model.fallback.reason", "rate_limit")
            result = generate_answer(
                instructions="Answer in one short sentence.",
                input_items=_input_items(),
                model=fallback_model,
            )
            span.set_attribute("app.task.outcome", "success")
            span.set_attribute("app.output.length", len(result))


def run_retry_classification() -> None:
    with agent_task_span("manual-retry-classification", f"conv_{uuid4().hex}") as span:
        span.set_attribute("app.manual_scenario", "retry-classification")
        # This scenario documents the expected local policy decisions without
        # constructing SDK-specific exception objects.
        span.set_attribute("app.retryable.timeout", True)
        span.set_attribute("app.retryable.rate_limit", True)
        span.set_attribute("app.retryable.insufficient_quota", False)
        span.set_attribute("app.task.outcome", "success")


SCENARIOS: dict[str, Callable[[], None]] = {
    "success": run_success,
    "stream": run_stream,
    "retry": run_retry,
    "fallback": run_fallback,
    "retry-classification": run_retry_classification,
}


def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("scenario", choices=[*SCENARIOS.keys(), "all"])
    args = parser.parse_args()
    provider = configure_tracing()
    try:
        if args.scenario == "all":
            for run in SCENARIOS.values():
                run()
        else:
            SCENARIOS[args.scenario]()
    finally:
        provider.force_flush(timeout_millis=5000)
        provider.shutdown()


if __name__ == "__main__":
    main()
The retry and fallback scenarios simulate the first provider failure locally and then call the real configured OpenAI model for the successful attempt. That makes the validation deterministic:
- no need to wait for a real timeout;
- no need to intentionally exhaust quota;
- no need to depend on a real provider-side 429;
- the trace still shows the retry or fallback shape we need to inspect.
Run each scenario separately:
PYTHONPATH=src python -m agent_observability.manual_scenarios success
PYTHONPATH=src python -m agent_observability.manual_scenarios stream
PYTHONPATH=src python -m agent_observability.manual_scenarios retry
PYTHONPATH=src python -m agent_observability.manual_scenarios fallback
PYTHONPATH=src python -m agent_observability.manual_scenarios retry-classification
Or run all scenarios in one pass:
PYTHONPATH=src python -m agent_observability.manual_scenarios all
Inspect the traces in Langfuse:
| Scenario | Expected trace shape | What to inspect |
|---|---|---|
| success | invoke_agent manual-success -> validate model output -> openai.responses.create | response id, model, token usage, app.output.validation = "passed" |
| stream | invoke_agent manual-stream -> openai.responses.stream | gen_ai.request.stream = true, gen_ai.response.time_to_first_chunk, app.openai.stream.completed = true |
| retry | invoke_agent manual-retry -> model call with retry -> failed simulated openai.responses.create -> successful real openai.responses.create | model.retry event, app.retry.final_attempt = 2, one failed child span and one successful child span |
| fallback | invoke_agent manual-fallback -> model call with fallback -> failed simulated primary model span -> successful fallback model span | app.model.fallback.used = true, fallback reason, distinct requested models |
| retry-classification | invoke_agent manual-retry-classification | app.retryable.timeout = true, app.retryable.rate_limit = true, app.retryable.insufficient_quota = false |
The retry-classification scenario does not call OpenAI. It records the expected local retry policy as span attributes; it does not execute _is_retryable_openai_error. The success, stream, retry, and fallback scenarios call the configured model, so they require a valid API key, model access, and available quota.
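Because run_retry_classification only writes the expected decisions as attributes, a separate local check can exercise the decision rules themselves. The sketch below mirrors those rules against a stand-in exception class so it runs without the OpenAI SDK. `FakeStatusError` and `is_retryable` are hypothetical names, and built-in TimeoutError and ConnectionError stand in for the SDK's timeout and connection errors. It is not a test of the real `_is_retryable_openai_error`.

```python
class FakeStatusError(Exception):
    """Stand-in for an HTTP status error; NOT an OpenAI SDK class."""

    def __init__(self, status_code, code=None):
        super().__init__(f"status {status_code}")
        self.status_code = status_code
        self.code = code


def is_retryable(exc: Exception) -> bool:
    """Mirror of the chapter's policy, keyed on status code and error code."""
    status = getattr(exc, "status_code", None)
    if status is None:
        # Transport-level failures without an HTTP status.
        return isinstance(exc, (TimeoutError, ConnectionError))
    if status == 429:
        # Rate limits are retryable unless the quota itself is exhausted.
        return getattr(exc, "code", None) != "insufficient_quota"
    return status in {408, 409} or status >= 500


cases = {
    "timeout": TimeoutError(),
    "rate_limit": FakeStatusError(429),
    "insufficient_quota": FakeStatusError(429, "insufficient_quota"),
    "bad_request": FakeStatusError(400),
    "server_error": FakeStatusError(503),
}
for name, exc in cases.items():
    print(name, is_retryable(exc))
```

With the SDK installed, the same table of cases could be pointed at the real classifier by constructing SDK exception objects instead of the stand-ins.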
If the OpenAI call succeeds but no trace appears, debug the pipeline from Chapter 13 first: Python SDK endpoint, Collector logs, Langfuse credentials, and the /api/public/otel exporter path.
What should exist before we go to Chapter 15
At this point the demo should have:
- a working OpenTelemetry SDK and Collector pipeline;
- one parent span for the agent task;
- one child span for each OpenAI provider attempt;
- token usage attributes when usage is returned;
- streaming first-chunk timing when streaming is used;
- explicit retry and fallback metadata;
- no raw content in telemetry by default.
Chapter 15 adds tool, retrieval, and memory spans below the agent workflow. The same rule continues to apply: record operational evidence, not uncontrolled copies of user or model content.
References
- OpenAI Responses API migration guide
- OpenAI Responses API reference
- OpenAI Python SDK retries and timeouts
- OpenTelemetry semantic conventions for GenAI systems
Next up: Ch 15 - Instrumenting Tools, Retrieval, and Memory adds spans for non-model dependencies while keeping retrieved content and tool results out of telemetry by default.