Runtime Hardening and Feedback
Chapter 16 made the workflow visible: graph nodes, routes, checkpoints, subagents, and child operations now show up in one trace. Runtime hardening is the next step because visibility alone does not stop a graph from looping, spending too much, bypassing approval, or exporting content after a policy failure.
Observability describes behavior; runtime controls constrain it. Production hardening connects the two so every termination, approval, policy block, and fallback is both enforced and observable.
This chapter continues from the src/agent_observability/graph.py file created in Chapter 16. Some code goes directly into that file because it changes graph execution. Other code is better kept in small support modules so the graph does not become a pile of policy, approval, and feedback helpers.
The practical goal is to give the demo the same boundaries a production agent needs:
- budget checks before repeated or expensive work;
- loop detection based on bounded fingerprints;
- content-capture policy before exporting values;
- approval binding for write actions;
- feedback that can be connected to traces without exposing backend IDs.
What we will change
Work in the demo project:
cd agent-observability-demo
This chapter touches these files:
| File | What to do |
|---|---|
| .env and .env.example | Add bounded exporter and batch processor settings. |
| src/agent_observability/config.py | Add telemetry tuning and approval digest settings. |
| src/agent_observability/runtime.py | Create budgets, progress tracking, and fallback decision types. |
| src/agent_observability/capture.py | Create the content-capture and redaction helper. |
| src/agent_observability/approvals.py | Create the approval digest helper. |
| src/agent_observability/feedback.py | Create bounded feedback event helpers. |
| src/agent_observability/signals.py | Create negative signal helpers. |
| src/agent_observability/graph.py | Update state, budgets, progress handling, and fallback recording. |
| src/agent_observability/telemetry.py | Apply bounded exporter and batch settings. |
| src/agent_observability/manual_scenarios.py | Add feedback and negative-signal scenarios. |
| src/agent_observability/main.py | Update the hardened graph runner. |
Centralize execution budgets
Budgets should live close to the workflow, not inside a model wrapper hidden three layers down. The graph knows when an execution starts, when it enters a node, and when it is about to call a model. That is the right place to count iterations and model calls.
Create a new file named src/agent_observability/runtime.py. This file will hold runtime controls that are shared by the graph and by later tests:
```python
from dataclasses import dataclass


@dataclass
class ExecutionBudget:
    max_iterations: int
    max_model_calls: int
    iterations: int = 0
    model_calls: int = 0

    def before_iteration(self) -> None:
        if self.iterations >= self.max_iterations:
            raise BudgetExceeded("iterations", self.max_iterations)
        self.iterations += 1

    def before_model_call(self) -> None:
        if self.model_calls >= self.max_model_calls:
            raise BudgetExceeded("model_calls", self.max_model_calls)
        self.model_calls += 1


class BudgetExceeded(RuntimeError):
    def __init__(self, budget_type: str, limit: int) -> None:
        super().__init__(f"{budget_type} budget exhausted")
        self.budget_type = budget_type
        self.limit = limit
```
The check must happen before the operation that would exceed the budget. A span that reports a budget stop after the provider call already happened is only a receipt.
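The following self-contained sketch shows why the check has to come first. It copies the model-call part of ExecutionBudget and wraps a stand-in provider call. The names fake_provider_call, guarded_call, and the provider_calls counter are hypothetical and exist only for illustration. The third call is refused before it ever reaches the provider.

```python
from dataclasses import dataclass


class BudgetExceeded(RuntimeError):
    def __init__(self, budget_type: str, limit: int) -> None:
        super().__init__(f"{budget_type} budget exhausted")
        self.budget_type = budget_type
        self.limit = limit


@dataclass
class ExecutionBudget:
    max_iterations: int
    max_model_calls: int
    iterations: int = 0
    model_calls: int = 0

    def before_model_call(self) -> None:
        # Refuse the call before it happens, so no tokens are spent on it.
        if self.model_calls >= self.max_model_calls:
            raise BudgetExceeded("model_calls", self.max_model_calls)
        self.model_calls += 1


provider_calls = 0  # hypothetical counter standing in for real provider traffic


def fake_provider_call() -> str:
    global provider_calls
    provider_calls += 1
    return "ok"


def guarded_call(budget: ExecutionBudget) -> str:
    budget.before_model_call()  # check first...
    return fake_provider_call()  # ...then spend


budget = ExecutionBudget(max_iterations=8, max_model_calls=2)
results = [guarded_call(budget), guarded_call(budget)]
try:
    guarded_call(budget)
except BudgetExceeded as exc:
    stopped_on = exc.budget_type

print(provider_calls, stopped_on)  # prints: 2 model_calls
```

If the check ran after fake_provider_call, the counter would read 3. The exception would then only record a spend that had already happened.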
Now update src/agent_observability/graph.py. Add budget and budget-stop fields to the existing AgentState from Chapter 16:
```python
from .runtime import BudgetExceeded, ExecutionBudget


class AgentState(TypedDict, total=False):
    query: str
    conversation_id: str
    order_reference: str
    region: str
    policy_document_ids: list[str]
    memory_record_types: list[str]
    order_state: str
    answer: str
    outcome: str
    iterations: int
    budget: ExecutionBudget
    budget_type: str
    budget_limit: int
```
Next, replace the traced_node function in src/agent_observability/graph.py with this version. The wrapper is the best place to count graph iterations because every node passes through it:
```python
def traced_node(name: str) -> Callable[[Node], Node]:
    def decorate(function: Node) -> Node:
        @wraps(function)
        def wrapped(state: AgentState) -> dict[str, Any]:
            with tracer.start_as_current_span(f"workflow.node {name}") as span:
                span.set_attribute("app.workflow.node.name", name)
                budget = state.get("budget")
                if budget is not None:
                    budget.before_iteration()
                iteration = state.get("iterations", 0) + 1
                span.set_attribute("app.workflow.iteration", iteration)
                result = function(state)
                result.setdefault("iterations", iteration)
                span.set_attribute(
                    "app.workflow.updated_fields",
                    sorted(result.keys()),
                )
                return result

        return wrapped

    return decorate
```
Then update compose_answer_node in the same file. The model budget belongs immediately before generate_answer:
```python
@traced_node("compose_answer")
def compose_answer_node(state: AgentState) -> dict[str, Any]:
    budget = state.get("budget")
    if budget is not None:
        budget.before_model_call()
    answer = generate_answer(
        instructions="Answer using only the provided order and policy context.",
        input_items=[
            {
                "role": "user",
                "content": state["query"],
            },
        ],
    )
    return {"answer": answer}
```
Catch BudgetExceeded at the workflow boundary, not inside every node. Replace run_agent in src/agent_observability/graph.py with this version:
```python
from .runtime import BudgetExceeded, ExecutionBudget  # already imported above


def run_agent(initial_state: AgentState) -> AgentState:
    state = dict(initial_state)
    state.setdefault(
        "budget",
        ExecutionBudget(max_iterations=8, max_model_calls=2),
    )
    with agent_task_span(
        "order-status",
        state["conversation_id"],
    ) as span:
        try:
            result = graph.invoke(state)
        except BudgetExceeded as exc:
            result = {
                **state,
                "outcome": "budget_stopped",
                "budget_type": exc.budget_type,
                "budget_limit": exc.limit,
            }
        span.set_attribute("app.task.outcome", result["outcome"])
        span.set_attribute(
            "app.workflow.iterations",
            result.get("iterations", 0),
        )
        if result["outcome"] == "budget_stopped":
            span.set_attribute("app.budget.type", result["budget_type"])
            span.set_attribute("app.budget.limit", result["budget_limit"])
            span.set_attribute("app.budget.action", "stop")
    return result
```
In Langfuse, a budget stop should appear as a completed trace with a bounded terminal outcome, not as an unhandled exception:
app.task.outcome = "budget_stopped"
app.budget.type = "model_calls"
app.budget.limit = 2
app.budget.action = "stop"
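You can test this mapping without an exporter. One option is to compute the root-span attributes as a plain dictionary first, then call span.set_attribute once per entry. The function name terminal_attributes is hypothetical. Its keys mirror the ones run_agent sets.

```python
from typing import Any


def terminal_attributes(result: dict[str, Any]) -> dict[str, Any]:
    """Project a terminal workflow result onto bounded root-span attributes."""
    attributes: dict[str, Any] = {
        "app.task.outcome": result.get("outcome", "unknown"),
        "app.workflow.iterations": result.get("iterations", 0),
    }
    if attributes["app.task.outcome"] == "budget_stopped":
        attributes["app.budget.type"] = result["budget_type"]
        attributes["app.budget.limit"] = result["budget_limit"]
        attributes["app.budget.action"] = "stop"
    # Only approved, bounded fields are copied; the raw query never appears here.
    return attributes


attrs = terminal_attributes(
    {
        "query": "Where is my order?",
        "outcome": "budget_stopped",
        "budget_type": "model_calls",
        "budget_limit": 2,
        "iterations": 5,
    }
)
# Inside run_agent this would become:
#     for key, value in terminal_attributes(result).items():
#         span.set_attribute(key, value)
```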
Detect lack of progress
A maximum iteration count is the final safety net. It prevents runaway work, but it does not explain why the agent is stuck. Add a small progress tracker so repeated states can stop earlier and produce a useful trace.
Add this code to src/agent_observability/runtime.py, below the budget classes:
```python
from hashlib import sha256


@dataclass
class ProgressTracker:
    max_repeated_fingerprints: int
    seen: dict[str, int]

    def record(self, fingerprint: str) -> None:
        count = self.seen.get(fingerprint, 0) + 1
        self.seen[fingerprint] = count
        if count > self.max_repeated_fingerprints:
            raise LackOfProgress(fingerprint)


class LackOfProgress(RuntimeError):
    def __init__(self, fingerprint: str) -> None:
        super().__init__("workflow stopped making progress")
        self.fingerprint = fingerprint


def build_progress_fingerprint(state: dict[str, object]) -> str:
    approved_fields = {
        "outcome": state.get("outcome", "unknown"),
        "order_state": state.get("order_state", "unknown"),
        "policy_document_ids": state.get("policy_document_ids", []),
        "memory_record_types": state.get("memory_record_types", []),
    }
    canonical = repr(sorted(approved_fields.items())).encode("utf-8")
    return sha256(canonical).hexdigest()
```
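To check the tracker in isolation, here is a self-contained copy with a short usage run. One small difference from the runtime.py version: seen defaults to an empty dict through field(default_factory=dict), so callers do not have to pass seen={}. Two states that differ only in unapproved fields, such as the raw query, produce the same fingerprint. With max_repeated_fingerprints=2, the third identical record stops the workflow.

```python
from dataclasses import dataclass, field
from hashlib import sha256


class LackOfProgress(RuntimeError):
    def __init__(self, fingerprint: str) -> None:
        super().__init__("workflow stopped making progress")
        self.fingerprint = fingerprint


@dataclass
class ProgressTracker:
    max_repeated_fingerprints: int
    seen: dict[str, int] = field(default_factory=dict)

    def record(self, fingerprint: str) -> None:
        count = self.seen.get(fingerprint, 0) + 1
        self.seen[fingerprint] = count
        if count > self.max_repeated_fingerprints:
            raise LackOfProgress(fingerprint)


def build_progress_fingerprint(state: dict[str, object]) -> str:
    approved_fields = {
        "outcome": state.get("outcome", "unknown"),
        "order_state": state.get("order_state", "unknown"),
        "policy_document_ids": state.get("policy_document_ids", []),
        "memory_record_types": state.get("memory_record_types", []),
    }
    canonical = repr(sorted(approved_fields.items())).encode("utf-8")
    return sha256(canonical).hexdigest()


# The raw query is not an approved field, so it does not affect the fingerprint.
a = build_progress_fingerprint({"query": "Where is my order?", "order_state": "shipped"})
b = build_progress_fingerprint({"query": "Where's it now??", "order_state": "shipped"})

tracker = ProgressTracker(max_repeated_fingerprints=2)
tracker.record(a)
tracker.record(b)  # same fingerprint, second time: still allowed
try:
    tracker.record(a)  # third time: exceeds the limit
except LackOfProgress as exc:
    stopped_fingerprint = exc.fingerprint[:12]
```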
Then update the imports and state fields in src/agent_observability/graph.py:
```python
from .runtime import (
    BudgetExceeded,
    ExecutionBudget,
    LackOfProgress,
    ProgressTracker,
    build_progress_fingerprint,
)


class AgentState(TypedDict, total=False):
    ...
    progress: ProgressTracker
    progress_fingerprint: str
```
The ... means “keep the fields you already added earlier in this file.” Do not paste the ellipsis into Python code.
Now replace classify_outcome in src/agent_observability/graph.py. This node has enough state to decide whether the workflow has actually progressed:
```python
@traced_node("classify_outcome")
def classify_outcome(state: AgentState) -> dict[str, Any]:
    outcome = "escalate" if state.get("order_state") == "missing" else "answer"
    next_state = {**state, "outcome": outcome}
    fingerprint = build_progress_fingerprint(next_state)
    progress = state.get("progress")
    if progress is not None:
        progress.record(fingerprint)
    return {
        "outcome": outcome,
        "progress_fingerprint": fingerprint[:12],
    }
```
Finally, extend run_agent so a lack-of-progress stop becomes a normal terminal outcome:
```python
def run_agent(initial_state: AgentState) -> AgentState:
    state = dict(initial_state)
    state.setdefault(
        "budget",
        ExecutionBudget(max_iterations=8, max_model_calls=2),
    )
    state.setdefault(
        "progress",
        ProgressTracker(max_repeated_fingerprints=2, seen={}),
    )
    with agent_task_span(
        "order-status",
        state["conversation_id"],
    ) as span:
        try:
            result = graph.invoke(state)
        except BudgetExceeded as exc:
            result = {
                **state,
                "outcome": "budget_stopped",
                "budget_type": exc.budget_type,
                "budget_limit": exc.limit,
            }
        except LackOfProgress as exc:
            result = {
                **state,
                "outcome": "no_progress_stopped",
                "progress_fingerprint": exc.fingerprint[:12],
            }
        span.set_attribute("app.task.outcome", result["outcome"])
        span.set_attribute(
            "app.workflow.iterations",
            result.get("iterations", 0),
        )
        if result["outcome"] == "budget_stopped":
            span.set_attribute("app.budget.type", result["budget_type"])
            span.set_attribute("app.budget.limit", result["budget_limit"])
            span.set_attribute("app.budget.action", "stop")
        if "progress_fingerprint" in result:
            span.set_attribute(
                "app.workflow.progress_fingerprint",
                result["progress_fingerprint"],
            )
    return result
```
Fingerprints should be built from approved fields only. Good candidates for repetition signals include:
- same workflow node sequence repeats;
- same tool and canonical argument digest repeats;
- retrieval returns the same document set;
- state version changes but no business field progresses;
- cost increases without new external evidence.
Do not hash raw arguments with an unkeyed public hash and assume privacy. Canonicalize only approved fields or compute the digest inside an authorized boundary with a protected key.
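Here is a minimal sketch of the keyed option. It assumes the key is loaded from a secret store at runtime. APPROVED_ARGUMENT_KEYS and keyed_argument_digest are hypothetical names. The sketch projects only the approved argument names, canonicalizes them as sorted compact JSON, and computes an HMAC-SHA256 digest. Without the key, nobody can brute-force the digest back to the arguments.

```python
import hmac
import json
from hashlib import sha256

# Hypothetical allow-list: only these argument names participate in the digest.
APPROVED_ARGUMENT_KEYS = {"order_reference", "region"}


def keyed_argument_digest(tool: str, arguments: dict[str, object], key: bytes) -> str:
    """Return a keyed digest of a tool call, usable as a loop-detection fingerprint."""
    projected = {k: arguments[k] for k in sorted(arguments) if k in APPROVED_ARGUMENT_KEYS}
    canonical = json.dumps(
        {"tool": tool, "arguments": projected},
        separators=(",", ":"),
        sort_keys=True,
    ).encode("utf-8")
    return hmac.new(key, canonical, sha256).hexdigest()


key = b"load-me-from-a-secret-store"  # placeholder; never hardcode in production
first = keyed_argument_digest(
    "fetch_order", {"region": "eu", "order_reference": "ORDER-924"}, key
)
# Argument order and unapproved fields ("note") do not change the digest.
again = keyed_argument_digest(
    "fetch_order", {"order_reference": "ORDER-924", "region": "eu", "note": "x"}, key
)
```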
Gate content capture
The demo has intentionally avoided exporting raw prompts, raw tool arguments, and retrieved content. This section makes that decision explicit so future code does not accidentally bypass it.
Create a new file named src/agent_observability/capture.py:
```python
import re
from dataclasses import dataclass

# Demo patterns only; a real deployment would use a dedicated PII and secret detector.
EMAIL_PATTERN = re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+")
ORDER_ID_PATTERN = re.compile(r"ORDER-\d+")


@dataclass(frozen=True)
class CapturePolicy:
    mode: str
    policy_version: str

    def permits_content(self) -> bool:
        return self.mode in {"redacted", "secure_reference"}


class ContentCaptureBlocked(RuntimeError):
    pass


def redact_text(value: str) -> str:
    # Replace whole identifiers, not just a prefix or a single character:
    # "ORDER-924" must not leak as "[ORDER_ID]924", and an email must not
    # survive as "user[at]example.com".
    redacted = EMAIL_PATTERN.sub("[EMAIL]", value)
    return ORDER_ID_PATTERN.sub("[ORDER_ID]", redacted)


def project_content_for_telemetry(
    *,
    value: str,
    policy: CapturePolicy,
) -> dict[str, str]:
    if not policy.permits_content():
        return {
            "capture_mode": policy.mode,
            "capture_policy_version": policy.policy_version,
        }
    redacted = redact_text(value)
    # Validate independently of the redactor and fail closed on any leftover.
    if "@" in redacted or "ORDER-" in redacted:
        raise ContentCaptureBlocked("redaction did not remove forbidden content")
    return {
        "capture_mode": policy.mode,
        "capture_policy_version": policy.policy_version,
        "redacted_preview": redacted[:160],
    }
```
Use this helper only at explicit capture points. For example, if you later decide to add a redacted preview to compose_answer_node, place it after the model call and before returning node metadata:
```python
from .capture import CapturePolicy, project_content_for_telemetry


@traced_node("compose_answer")
def compose_answer_node(state: AgentState) -> dict[str, Any]:
    budget = state.get("budget")
    if budget is not None:
        budget.before_model_call()
    answer = generate_answer(
        instructions="Answer using only the provided order and policy context.",
        input_items=[
            {
                "role": "user",
                "content": state["query"],
            },
        ],
    )
    capture = project_content_for_telemetry(
        value=answer,
        policy=CapturePolicy(
            mode="metadata_only",
            policy_version="capture-policy-1",
        ),
    )
    # Declare capture_mode and capture_policy_version in AgentState if you keep
    # this version, so LangGraph carries them in the workflow state.
    return {
        "answer": answer,
        "capture_mode": capture["capture_mode"],
        "capture_policy_version": capture["capture_policy_version"],
    }
```
With metadata_only, no content preview is exported. That is the default I would keep for this demo. If a later chapter or experiment uses redacted, the helper gives the reader one controlled place to add and test redaction.
The capture pipeline should follow this order:
- Project only approved fields.
- Detect secrets and PII.
- Apply domain redaction.
- Validate that forbidden patterns are absent.
- Export redacted content or an opaque secure-store reference.
- Record policy and redactor versions.
Fail closed if the redaction service is unavailable for a path that requires redaction.
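Here is a sketch of that fail-closed rule. It assumes a hypothetical redactor callable that raises ConnectionError when a remote redaction service is down. capture_redacted, local_redactor, and unavailable_redactor are illustrative names. If redaction fails or leaves a forbidden pattern behind, nothing is exported, and the caller gets ContentCaptureBlocked instead of raw text. The returned record carries both the policy version and the redactor version.

```python
import re
from typing import Callable

FORBIDDEN_PATTERNS = [
    re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+"),  # email-like strings
    re.compile(r"ORDER-\d+"),  # demo order references
]


class ContentCaptureBlocked(RuntimeError):
    pass


def capture_redacted(
    value: str,
    redactor: Callable[[str], str],
    *,
    policy_version: str,
    redactor_version: str,
) -> dict[str, str]:
    try:
        redacted = redactor(value)
    except ConnectionError as exc:
        # Fail closed: an unavailable redactor never falls back to raw content.
        raise ContentCaptureBlocked("redaction service unavailable") from exc
    # Validate the redactor's output rather than trusting it.
    if any(pattern.search(redacted) for pattern in FORBIDDEN_PATTERNS):
        raise ContentCaptureBlocked("redaction did not remove forbidden content")
    return {
        "capture_mode": "redacted",
        "capture_policy_version": policy_version,
        "redactor_version": redactor_version,
        "redacted_preview": redacted[:160],
    }


def local_redactor(text: str) -> str:
    for pattern, label in zip(FORBIDDEN_PATTERNS, ["[EMAIL]", "[ORDER_ID]"]):
        text = pattern.sub(label, text)
    return text


def unavailable_redactor(text: str) -> str:
    raise ConnectionError("redaction service down")
```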
Bind approvals to actions
Approvals belong to write actions. The current demo reads order status, but Chapter 15 already discussed write-tool attributes for operations such as issue_refund. This section prepares that code path without adding a real refund integration.
First add one setting to src/agent_observability/config.py so the digest key is not hardcoded in the approval module:
```python
approval_digest_key: str = "demo-only-replace-with-secret-key"
```
Keep the demo value only in local development. Production deployments should provide this value from a secret store and rotate it with the same care as other signing keys.
Then create a new file named src/agent_observability/approvals.py:
```python
import hmac
import json
from dataclasses import dataclass
from hashlib import sha256
from time import monotonic

from .config import settings
from .telemetry import tracer

DEMO_APPROVAL_DIGEST_KEY = "demo-only-replace-with-secret-key"


def _approval_digest_key() -> bytes:
    # Refuse the well-known demo key anywhere except local development.
    if (
        settings.deployment_environment != "development"
        and settings.approval_digest_key == DEMO_APPROVAL_DIGEST_KEY
    ):
        raise RuntimeError("approval digest key must come from a secret store")
    return settings.approval_digest_key.encode("utf-8")


@dataclass(frozen=True)
class CanonicalAction:
    tool: str
    schema_version: str
    resource_scope: str
    amount_minor: int
    currency: str

    def to_bytes(self) -> bytes:
        payload = {
            "amount_minor": self.amount_minor,
            "currency": self.currency,
            "resource_scope": self.resource_scope,
            "schema_version": self.schema_version,
            "tool": self.tool,
        }
        # Sorted keys and compact separators give one byte form per action.
        return json.dumps(
            payload,
            separators=(",", ":"),
            sort_keys=True,
        ).encode("utf-8")


def approval_digest(action: CanonicalAction) -> str:
    return hmac.new(
        _approval_digest_key(),
        action.to_bytes(),
        sha256,
    ).hexdigest()


def require_approval(action: CanonicalAction) -> str:
    start = monotonic()
    digest = approval_digest(action)
    # Simulated approval: a real implementation would wait here for a human or
    # policy engine, so latency_ms would measure that wait, not the digest.
    with tracer.start_as_current_span(f"approval.require {action.tool}") as span:
        span.set_attribute("app.approval.required", True)
        span.set_attribute("app.approval.decision", "approved")
        span.set_attribute("app.approval.digest_algorithm", "hmac-sha256")
        span.set_attribute("app.approval.latency_ms", int((monotonic() - start) * 1000))
        span.set_attribute("app.approval.action", action.tool)
        span.set_attribute("app.approval.resource_scope", action.resource_scope)
    return digest
```
The code above simulates approval and returns a digest. In a real app, require_approval would create an approval request, wait for a human or policy engine, and store the approval record. The trace still uses the same attributes.
When you later implement a write tool, call require_approval immediately before the tool execution span:
```python
from .approvals import CanonicalAction, require_approval


def prepare_refund_approval() -> str:
    action = CanonicalAction(
        tool="issue_refund",
        schema_version="3",
        resource_scope="order-owned-by-subject",
        amount_minor=2599,
        currency="EUR",
    )
    return require_approval(action)
```
An approval should authorize one canonical action:
{
"tool": "issue_refund",
"schema_version": "3",
"resource_scope": "order-owned-by-subject",
"amount_minor": 2599,
"currency": "EUR"
}
Generate a protected digest of the canonical representation and store it with the approval and execution record. If arguments change, request approval again.
Record approval latency separately from agent execution latency. A long human wait should not be attributed to model performance.
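The binding check at execution time could look like the sketch below. It assumes the approval record stores the digest that require_approval returned. verify_before_execution and ApprovalMismatch are hypothetical names, and the key is passed in as bytes for simplicity. The sketch recomputes the digest from the action that is about to run. It compares the two digests with hmac.compare_digest, so the comparison time does not depend on where they differ.

```python
import hmac
import json
from dataclasses import asdict, dataclass, replace
from hashlib import sha256


@dataclass(frozen=True)
class CanonicalAction:
    tool: str
    schema_version: str
    resource_scope: str
    amount_minor: int
    currency: str

    def to_bytes(self) -> bytes:
        return json.dumps(asdict(self), separators=(",", ":"), sort_keys=True).encode("utf-8")


class ApprovalMismatch(RuntimeError):
    pass


def digest(action: CanonicalAction, key: bytes) -> str:
    return hmac.new(key, action.to_bytes(), sha256).hexdigest()


def verify_before_execution(approved_digest: str, action: CanonicalAction, key: bytes) -> None:
    # Any change to the canonical arguments breaks the match.
    if not hmac.compare_digest(approved_digest, digest(action, key)):
        raise ApprovalMismatch("action differs from the approved action; request approval again")


key = b"demo-only-key"  # placeholder; load the real key from a secret store
approved = CanonicalAction("issue_refund", "3", "order-owned-by-subject", 2599, "EUR")
approved_digest = digest(approved, key)
verify_before_execution(approved_digest, approved, key)  # passes silently

changed = replace(approved, amount_minor=9999)  # would need a new approval
```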
Record both sides of the boundary on spans:
app.approval.required = true
app.approval.decision = "approved"
app.approval.digest_algorithm = "hmac-sha256"
app.approval.latency_ms = 18420
Do not put the digest input in telemetry. The digest proves that the approved canonical action and the executed canonical action match.
Fallbacks must preserve safety posture
A fallback can change model, provider, tool, or workflow path. The mistake I see most often is treating fallback as a reliability feature only. For agents, fallback is also a safety boundary because it can accidentally skip validation, use a different prompt, or route to a less restricted tool.
In this demo, there is no fallback path yet. So the practical work in this section is small:
- Add a fallback decision type to src/agent_observability/runtime.py.
- Add a helper to src/agent_observability/graph.py that knows how to record fallback attributes.
- Do not call the helper yet unless you add a real fallback branch.
Put this class at the bottom of src/agent_observability/runtime.py, below ProgressTracker and build_progress_fingerprint:
```python
@dataclass(frozen=True)
class FallbackDecision:
    source: str
    target: str
    reason: str
    preserves_safety_posture: bool
```
Then add this helper to src/agent_observability/graph.py, near the other small workflow helpers such as route_outcome. You will need to merge the imports with the existing imports at the top of the file:
```python
from opentelemetry import trace

from .runtime import FallbackDecision


def record_fallback(decision: FallbackDecision) -> None:
    span = trace.get_current_span()
    span.set_attribute("app.fallback.source", decision.source)
    span.set_attribute("app.fallback.target", decision.target)
    span.set_attribute("app.fallback.reason", decision.reason)
    span.set_attribute(
        "app.fallback.preserves_safety_posture",
        decision.preserves_safety_posture,
    )
```
At this point, nothing else changes in graph.py. The helper is intentionally unused in the current chapter because the demo still has one model path and one workflow path. If you do not add a fallback branch, do not call record_fallback.
When you later add a real fallback, call record_fallback only inside the fallback branch. For example, if the primary answer generation fails and you route to a smaller model, the call sits inside the except block that handles that failure.
Do not paste the following try block into the demo now unless you also create ModelUnavailable and generate_answer_with_fallback_model. It only shows where the fallback telemetry belongs once that branch exists:
```python
try:
    answer = generate_answer(
        instructions="Answer using only the provided order and policy context.",
        input_items=[
            {
                "role": "user",
                "content": state["query"],
            },
        ],
    )
except ModelUnavailable:
    record_fallback(
        FallbackDecision(
            source="primary-model-name",
            target="fallback-model-name",
            reason="primary_model_unavailable",
            preserves_safety_posture=True,
        ),
    )
    answer = generate_answer_with_fallback_model(
        instructions="Answer using only the provided order and policy context.",
        input_items=[
            {
                "role": "user",
                "content": state["query"],
            },
        ],
    )
```
A fallback must not silently weaken:
- Authorization.
- Content filtering.
- Output schema validation.
- Budget enforcement.
- Tenant isolation.
- Audit logging.
Record fallback source, target, reason, and whether the fallback preserved the safety posture. If it cannot preserve the posture, the safer outcome is usually a controlled stop or escalation.
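The sketch below shows one way to compute preserves_safety_posture instead of setting it by hand. The control names, PathProfile, and decide_fallback are assumptions for illustration. Each path declares the controls it enforces. A fallback preserves the posture only if the target enforces at least every control the source does; otherwise the helper chooses escalation.

```python
from dataclasses import dataclass

REQUIRED_CONTROLS = frozenset(
    {
        "authorization",
        "content_filtering",
        "output_schema_validation",
        "budget_enforcement",
        "tenant_isolation",
        "audit_logging",
    }
)


@dataclass(frozen=True)
class PathProfile:
    name: str
    controls: frozenset[str]


@dataclass(frozen=True)
class FallbackDecision:
    source: str
    target: str
    reason: str
    preserves_safety_posture: bool


def decide_fallback(
    source: PathProfile, target: PathProfile, reason: str
) -> tuple[FallbackDecision, str]:
    # Posture is preserved only if the target enforces every control the source does.
    preserves = source.controls <= target.controls
    decision = FallbackDecision(source.name, target.name, reason, preserves)
    # Prefer a controlled stop over a silent downgrade.
    next_step = "use_fallback" if preserves else "escalate"
    return decision, next_step
```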
Connect feedback to the execution
Feedback is not the same thing as evaluation. It is a product signal that needs to be mapped back to a trace or session, validated, and later turned into a Langfuse score in Chapter 20.
In this chapter, implement only the application-side feedback event builder, and do not attach it to Langfuse yet.
Create a new file named src/agent_observability/feedback.py and put the full code below in that file:
```python
from dataclasses import dataclass
from datetime import UTC, datetime

from .telemetry import tracer

ALLOWED_FEEDBACK_VALUES = {"positive", "negative"}


@dataclass(frozen=True)
class FeedbackEvent:
    interaction_id: str
    feedback_type: str
    bounded_value: str
    timestamp: str
    agent_version: str


def build_user_feedback_event(
    *,
    interaction_id: str,
    value: str,
    agent_version: str,
) -> FeedbackEvent:
    if value not in ALLOWED_FEEDBACK_VALUES:
        raise ValueError("invalid feedback value")
    return FeedbackEvent(
        interaction_id=interaction_id,
        feedback_type="user_rating",
        bounded_value=value,
        timestamp=datetime.now(UTC).isoformat(),
        agent_version=agent_version,
    )


def record_feedback_event(event: FeedbackEvent) -> None:
    with tracer.start_as_current_span("feedback.record user_rating") as span:
        span.set_attribute("app.feedback.type", event.feedback_type)
        span.set_attribute("app.feedback.value", event.bounded_value)
        span.set_attribute("app.feedback.source", "conversation_footer")
        span.set_attribute("app.feedback.agent_version", event.agent_version)
```
This file intentionally does not accept free-text feedback. Free text is content and needs the capture policy from this chapter before it can be stored or exported.
Our demo app does not have a user interface or HTTP API at this point. That is intentional. So do not create an API file, do not invent a product surface, and do not call feedback from run_agent. Feedback is a post-response signal; run_agent is still the agent execution boundary.
To verify the feedback event shape in the demo we already have, add a manual scenario to src/agent_observability/manual_scenarios.py. First, merge this import with the existing imports at the top of the file:
from .feedback import build_user_feedback_event, record_feedback_event
Then add this function near the other run_* scenario functions:
```python
def run_feedback() -> None:
    with agent_task_span("manual-feedback", f"conv_{uuid4().hex}") as span:
        event = build_user_feedback_event(
            interaction_id="interaction_demo_001",
            value="negative",
            agent_version=settings.agent_version,
        )
        record_feedback_event(event)
        span.set_attribute("app.manual_scenario", "feedback")
        span.set_attribute("app.task.outcome", "success")
```
Finally, register the scenario in the existing SCENARIOS dictionary in the same file:
```python
SCENARIOS: dict[str, Callable[[], None]] = {
    "success": run_success,
    "stream": run_stream,
    "retry": run_retry,
    "fallback": run_fallback,
    "retry-classification": run_retry_classification,
    "feedback": run_feedback,
}
```
Run it from the demo project:
PYTHONPATH=src python -m agent_observability.manual_scenarios feedback
This does not model a real product feedback button. It only validates that feedback is bounded, traceable, and ready to be turned into a Langfuse score in Chapter 20.
The equivalent production boundary would sit outside the agent graph. It would receive an interaction_id from a product surface, validate tenant ownership, build the bounded event, and then write the Langfuse score. Leaving out the tenant check and the score write (Chapter 20), the code shape of that boundary is:
```python
from .feedback import build_user_feedback_event, record_feedback_event


def submit_feedback(
    *,
    interaction_id: str,
    value: str,
    agent_version: str,
) -> None:
    # Resolve interaction_id and check tenant ownership before this point.
    event = build_user_feedback_event(
        interaction_id=interaction_id,
        value=value,
        agent_version=agent_version,
    )
    record_feedback_event(event)
```
The interaction_id must be created by the application layer when it returns the agent response to the client. It should map to the internal trace ID inside a trusted service. Do not expose backend trace URLs or credentials.
Feedback events should include:
- interaction_id
- feedback_type
- bounded_value
- timestamp
- agent_version
Attach feedback to a trace or conversation in Langfuse after validating the identifier and tenant boundary. Chapter 20 shows the Langfuse score write. This chapter only prepares the bounded application event.
For thumbs-up/down style feedback, keep the telemetry value bounded:
app.feedback.type = "user_rating"
app.feedback.value = "negative"
app.feedback.source = "conversation_footer"
The public interaction_id should map to the internal trace ID in an application database. Treat it like an untrusted lookup key until tenant ownership has been checked.
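Here is a minimal in-memory sketch of that mapping, with a dictionary standing in for the application database. InteractionStore, issue, resolve, and InteractionNotFound are hypothetical names. The public ID is random and carries no trace information. resolve returns the internal trace ID only when the caller's tenant owns the interaction, and it raises the same error for unknown and foreign IDs, so a caller cannot probe which IDs exist.

```python
import secrets
from dataclasses import dataclass, field


class InteractionNotFound(LookupError):
    pass


@dataclass
class InteractionStore:
    # public interaction_id -> (tenant_id, internal trace_id)
    _records: dict[str, tuple[str, str]] = field(default_factory=dict)

    def issue(self, tenant_id: str, trace_id: str) -> str:
        """Create an opaque public ID when the response is returned to the client."""
        interaction_id = f"int_{secrets.token_urlsafe(16)}"
        self._records[interaction_id] = (tenant_id, trace_id)
        return interaction_id

    def resolve(self, interaction_id: str, tenant_id: str) -> str:
        """Treat interaction_id as untrusted: check ownership before returning the trace ID."""
        record = self._records.get(interaction_id)
        if record is None or record[0] != tenant_id:
            raise InteractionNotFound("unknown interaction")
        return record[1]
```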
Negative signals are ambiguous
Negative behavior is useful for triage, but it is not automatic ground truth. Add these signals as selectors for review and evaluation queues, not as direct correctness labels.
Open or create src/agent_observability/signals.py. The file defines a bounded negative signal event and a helper to record it in telemetry:
```python
from dataclasses import dataclass
from datetime import UTC, datetime

from .telemetry import tracer

ALLOWED_NEGATIVE_SIGNALS = {
    "repeated_question",
    "user_reformulated_request",
    "conversation_abandoned",
    "human_takeover",
}


@dataclass(frozen=True)
class NegativeSignalEvent:
    interaction_id: str
    signal_name: str
    source: str
    timestamp: str
    agent_version: str


def build_negative_signal_event(
    *,
    interaction_id: str,
    signal_name: str,
    source: str,
    agent_version: str,
) -> NegativeSignalEvent:
    if signal_name not in ALLOWED_NEGATIVE_SIGNALS:
        raise ValueError("invalid negative signal")
    return NegativeSignalEvent(
        interaction_id=interaction_id,
        signal_name=signal_name,
        source=source,
        timestamp=datetime.now(UTC).isoformat(),
        agent_version=agent_version,
    )


def record_negative_signal_event(event: NegativeSignalEvent) -> None:
    with tracer.start_as_current_span(
        f"signal.record {event.signal_name}",
    ) as span:
        span.set_attribute("app.signal.name", event.signal_name)
        span.set_attribute("app.signal.source", event.source)
        span.set_attribute("app.signal.agent_version", event.agent_version)
        span.set_attribute("app.signal.used_for", "triage_selector")
        span.set_attribute("app.interaction.id", event.interaction_id)
```
Then add a manual scenario to src/agent_observability/manual_scenarios.py. First, merge this import with the imports at the top of the file:
from .signals import build_negative_signal_event, record_negative_signal_event
Add this function near the other run_* scenario functions:
```python
def run_negative_signal() -> None:
    with agent_task_span("manual-negative-signal", f"conv_{uuid4().hex}") as span:
        event = build_negative_signal_event(
            interaction_id="interaction_demo_002",
            signal_name="repeated_question",
            source="manual_scenario",
            agent_version=settings.agent_version,
        )
        record_negative_signal_event(event)
        span.set_attribute("app.manual_scenario", "negative-signal")
        span.set_attribute("app.task.outcome", "success")
```
Register it in the existing SCENARIOS dictionary:
```python
SCENARIOS: dict[str, Callable[[], None]] = {
    "success": run_success,
    "stream": run_stream,
    "retry": run_retry,
    "fallback": run_fallback,
    "retry-classification": run_retry_classification,
    "feedback": run_feedback,
    "negative-signal": run_negative_signal,
}
```
Run it from the demo project:
PYTHONPATH=src python -m agent_observability.manual_scenarios negative-signal
In Langfuse, the trace should contain a child span named signal.record repeated_question with bounded attributes:
app.signal.name = "repeated_question"
app.signal.source = "manual_scenario"
app.signal.used_for = "triage_selector"
That span does not mean the answer was wrong. It means the trace is a useful candidate for review, scoring, or an annotation queue later.
These signals are useful selectors:
- User asks the same question again.
- User edits or reformulates a request.
- User abandons the conversation.
- Human takeover occurs.
- A response is copied or ignored.
Correlate them with task type, latency, tool outcome, and evaluation before drawing conclusions.
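Before drawing conclusions, compare signal rates per task type against traffic instead of counting raw signals. The sketch below covers only the task-type dimension. It assumes the interactions and signals have been exported from traces as simple tuples, and the function name and tuple shapes are illustrative. Latency, tool outcome, and evaluation scores would be joined the same way.

```python
from collections import Counter


def signal_rate_by_task(
    interactions: list[tuple[str, str]],
    signals: list[tuple[str, str]],
) -> dict[str, float]:
    """Share of interactions per task type with at least one negative signal.

    interactions: (interaction_id, task_type)
    signals: (interaction_id, signal_name)
    """
    task_of = dict(interactions)
    totals = Counter(task_of.values())
    # Count each flagged interaction once, however many signals it produced.
    flagged = {interaction_id for interaction_id, _ in signals if interaction_id in task_of}
    flagged_per_task = Counter(task_of[interaction_id] for interaction_id in flagged)
    return {task: flagged_per_task[task] / total for task, total in totals.items()}
```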
Degraded telemetry behavior
The agent should normally continue when the telemetry backend is unavailable, but it should not buffer forever or silently lose required audit records. This is mostly configuration, but the decision belongs in the code comments and runbook because it affects incident response.
In the demo, a comment alone is not enough. We need bounded exporter and batch processor settings so a broken Collector or Langfuse endpoint does not cause unbounded waiting or memory pressure.
Add these values to the .env file in the demo repository, and add the same keys to .env.example:
OTEL_EXPORTER_OTLP_TIMEOUT_SECONDS=5
OTEL_BSP_MAX_QUEUE_SIZE=512
OTEL_BSP_SCHEDULE_DELAY_MILLIS=1000
OTEL_BSP_MAX_EXPORT_BATCH_SIZE=128
OTEL_BSP_EXPORT_TIMEOUT_MILLIS=5000
Then add the matching fields to src/agent_observability/config.py:
otel_exporter_otlp_timeout_seconds: float = Field(default=5.0, ge=1.0, le=30.0)
otel_bsp_max_queue_size: int = Field(default=512, ge=64, le=4096)
otel_bsp_schedule_delay_millis: int = Field(default=1000, ge=100, le=10000)
otel_bsp_max_export_batch_size: int = Field(default=128, ge=1, le=512)
otel_bsp_export_timeout_millis: int = Field(default=5000, ge=1000, le=30000)
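The per-field bounds above do not stop a configuration where the batch size exceeds the queue size. To our understanding, the OpenTelemetry Python SDK's BatchSpanProcessor rejects that combination with a ValueError during setup. The sketch below moves the check to settings load time so the error message names the environment variables. BatchExportSettings is a hypothetical stdlib stand-in; in the demo, the same rule could live in a validator on the settings class.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BatchExportSettings:
    max_queue_size: int
    schedule_delay_millis: int
    max_export_batch_size: int
    export_timeout_millis: int

    def __post_init__(self) -> None:
        # Fail at startup with a clear message instead of deep inside SDK setup.
        if self.max_export_batch_size > self.max_queue_size:
            raise ValueError(
                "OTEL_BSP_MAX_EXPORT_BATCH_SIZE must not exceed OTEL_BSP_MAX_QUEUE_SIZE"
            )
```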
Finally, update src/agent_observability/telemetry.py so those values are actually used by the OTLP exporter and the BatchSpanProcessor:
```python
def configure_tracing() -> TracerProvider:
    # Runtime policy:
    # - Read-only support actions continue if telemetry export fails.
    # - High-risk write actions should fail closed if their required audit record
    #   cannot be persisted by the application.
    # - Export queues and timeouts stay bounded by configuration.
    resource = Resource.create(
        {
            "service.name": settings.otel_service_name,
            "service.version": settings.agent_version,
            "deployment.environment.name": settings.deployment_environment,
        }
    )
    provider = TracerProvider(resource=resource)
    exporter = OTLPSpanExporter(
        endpoint=settings.otel_exporter_otlp_traces_endpoint,
        timeout=settings.otel_exporter_otlp_timeout_seconds,
    )
    provider.add_span_processor(
        BatchSpanProcessor(
            exporter,
            max_queue_size=settings.otel_bsp_max_queue_size,
            schedule_delay_millis=settings.otel_bsp_schedule_delay_millis,
            max_export_batch_size=settings.otel_bsp_max_export_batch_size,
            export_timeout_millis=settings.otel_bsp_export_timeout_millis,
        )
    )
    trace.set_tracer_provider(provider)
    return provider
```
This does not guarantee that every span is delivered when the backend is down. It makes the failure mode explicit: the read-only demo keeps running, export attempts are bounded, and the process is not allowed to wait forever on telemetry.
Security audit requirements may differ: a high-risk action can be configured to fail closed if its required audit record cannot be written. Document this decision per action class.
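A sketch of recording that per-class decision in code. It assumes two hypothetical action classes and an audit writer that raises OSError (which includes ConnectionError) when persistence fails. A read-only action degrades and continues. A high-risk write refuses to execute unless its audit record was persisted first.

```python
from enum import Enum
from typing import Callable


class ActionClass(Enum):
    READ_ONLY = "read_only"
    HIGH_RISK_WRITE = "high_risk_write"


class AuditRecordUnavailable(RuntimeError):
    pass


def run_with_audit_policy(
    action_class: ActionClass,
    write_audit_record: Callable[[], None],
    execute: Callable[[], str],
) -> str:
    try:
        write_audit_record()
    except OSError as exc:
        if action_class is ActionClass.HIGH_RISK_WRITE:
            # Fail closed: no audit record, no write.
            raise AuditRecordUnavailable("audit record could not be persisted") from exc
        # Read-only work degrades and continues without the record.
    return execute()
```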
Run the hardened graph
At this point the chapter changed .env, .env.example, config.py, runtime.py, capture.py, approvals.py, feedback.py, signals.py, graph.py, telemetry.py, and manual_scenarios.py. The easiest way to validate the behavior is to run the same local graph execution from Chapter 16.
Update src/agent_observability/main.py so it exercises the hardened LangGraph path directly:
```python
from .graph import run_agent
from .telemetry import configure_tracing


def main() -> None:
    provider = configure_tracing()
    try:
        result = run_agent(
            {
                "query": "Where is my order?",
                "conversation_id": "conv_82d9535867844b4c8cdff",
                "order_reference": "ORDER-924",
                "region": "eu",
            },
        )
        print(result["outcome"])
    finally:
        provider.force_flush(timeout_millis=5000)
        provider.shutdown()


if __name__ == "__main__":
    main()
```
Then run the module from the demo repository:
PYTHONPATH=src python -m agent_observability.main
The configure_tracing() call is what installs the OpenTelemetry provider and OTLP exporter. The force_flush() call is what gives the BatchSpanProcessor a chance to send spans before the local process exits. If either one is missing, the terminal can still print answer, but Langfuse may receive no new trace.
For the normal path, expect answer or escalate, depending on the order fixture used by your tool stub. To test budget behavior, temporarily set max_iterations=1 in run_agent. Because run_agent uses setdefault, you can instead pass a "budget" entry with max_iterations=1 in the initial state. The output should become budget_stopped, and the root span should include app.budget.type, app.budget.limit, and app.task.outcome.
Inspect the result in Langfuse
Open the trace created by the manual run and check the root span first. It should show the final task outcome and bounded runtime-control attributes. Then inspect child spans:
invoke_agent order-status
workflow.node retrieve_policy
workflow.node read_memory
workflow.node fetch_order
workflow.node classify_outcome
workflow.route outcome
workflow.node compose_answer
For a budget stop, the trace should still be readable:
invoke_agent order-status
app.task.outcome = "budget_stopped"
app.budget.type = "iterations"
app.budget.limit = 1
For a lack-of-progress stop, you should see:
app.task.outcome = "no_progress_stopped"
app.workflow.progress_fingerprint = "..."
Do not expect Langfuse to show raw prompt text, raw order references, or free-text feedback from this chapter. The point of the chapter is to harden runtime behavior while keeping the trace safe to inspect.
What should exist before we go to Chapter 18
At this point the demo should have:
- budget checks before repeated graph iterations and model calls;
- terminal outcomes for budget stops, policy blocks, validation failures, and fallbacks;
- loop-detection fingerprints based on approved bounded fields;
- a capture policy that fails closed when redaction is required but unavailable;
- approval records bound to canonical action digests;
- feedback events that use bounded values and trusted trace mapping;
- explicit behavior for telemetry backend failure by action class.
Chapter 18 turns those expectations into Langfuse product views: sessions, users, prompt versions, scores, evaluators, and datasets. Chapter 23 turns the full contract into tests.
Next up: Ch 18 - Langfuse Sessions, Users, and Trace Context turns individual traces into conversation, user, release, and product views.