Instrumenting LangGraph and Multi-Agent Workflows
Chapter 15 created spans for model-adjacent operations: tools, retrieval, and memory. LangGraph is where those operations become an execution path. The graph decides which operation runs, which state fields change, when work branches, and where the task stops.
Framework instrumentation can expose graph activity, but the application still owns business semantics. A node named call_model does not explain whether it routed a request, verified an outcome, or prepared a write action. Add spans at boundaries that matter during an incident.
What we will change
Work in the demo project:
cd agent-observability-demo
This chapter touches one file:
| File | What to do |
|---|---|
| src/agent_observability/graph.py | Create graph state, traced nodes, routing, the graph builder, and run_agent(). |
Define observable state without exporting it
The first step is to define the graph state contract. LangGraph state is the object that moves from node to node, so it will contain user input, conversation identifiers, retrieved evidence references, tool results, model output, and routing decisions.
That does not mean the whole state should become telemetry. In this section, the goal is to separate fields the workflow needs from fields the observability backend may safely see. The graph will keep full values in memory, while spans record only selected metadata such as updated field names, iteration count, route outcome, document IDs, and bounded task outcomes.
This contract also prepares the tests in Chapter 23. If a node adds a new state field later, the trace should show that the field changed without leaking the value.
Create src/agent_observability/graph.py:
from typing import TypedDict

from langgraph.graph import END, START, StateGraph


class AgentState(TypedDict, total=False):
    query: str
    conversation_id: str
    order_reference: str
    region: str
    policy_document_ids: list[str]
    memory_record_types: list[str]
    order_state: str
    answer: str
    outcome: str
    iterations: int
The state contains content and identifiers that should not be copied wholesale into telemetry. Instrument selected metadata at each node.
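One way to make that separation explicit is an allowlist that projects state into span-safe metadata. The sketch below is an illustration, not part of the demo project: the helper name telemetry_view, the two allowlists, and the app.state.* attribute keys are all hypothetical.

```python
from typing import Any

# Hypothetical allowlists: only these fields may have their values exported.
SAFE_SCALAR_FIELDS = ("iterations", "outcome")
SAFE_ID_LIST_FIELDS = ("policy_document_ids",)


def telemetry_view(state: dict[str, Any]) -> dict[str, Any]:
    """Project graph state into span-safe attributes (names, IDs, bounded values)."""
    # Field names are always exported; they describe shape, not content.
    view: dict[str, Any] = {"app.workflow.state_fields": sorted(state)}
    for field in SAFE_SCALAR_FIELDS:
        if field in state:
            view[f"app.state.{field}"] = state[field]
    for field in SAFE_ID_LIST_FIELDS:
        if field in state:
            view[f"app.state.{field}"] = list(state[field])
    return view


state = {
    "query": "Where is my order?",
    "conversation_id": "conv_ch16_demo",
    "policy_document_ids": ["policy-eu-returns"],
    "outcome": "answer",
}
attributes = telemetry_view(state)
# The query text and conversation ID appear only as field names, never as values.
print(attributes)
```

Anything not on an allowlist stays in memory. Adding a new state field then requires a deliberate decision before its value can reach the backend.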
Wrap nodes with operation spans
After the state contract is explicit, wrap each graph node with an operation span. The node span should answer a workflow question: “Which node ran, in which iteration, and which state fields did it update?”
This wrapper is intentionally small. It does not inspect the full state, serialize arguments, or copy return values into telemetry. It records the node name, the current iteration, and the list of updated field names. The retrieval, memory, tool, and model calls inside the node will create their own child spans with their own operational details.
The result is a trace that shows both layers: workflow movement at the node level and external work at the child-operation level.
Add this code to src/agent_observability/graph.py, below the AgentState definition:
from collections.abc import Callable
from functools import wraps
from typing import Any

from .telemetry import tracer

Node = Callable[[AgentState], dict[str, Any]]


def traced_node(name: str) -> Callable[[Node], Node]:
    def decorate(function: Node) -> Node:
        @wraps(function)
        def wrapped(state: AgentState) -> dict[str, Any]:
            with tracer.start_as_current_span(f"workflow.node {name}") as span:
                span.set_attribute("app.workflow.node.name", name)
                span.set_attribute(
                    "app.workflow.iteration",
                    state.get("iterations", 0),
                )
                result = function(state)
                # Record field names only; the values stay in graph state.
                span.set_attribute(
                    "app.workflow.updated_fields",
                    sorted(result.keys()),
                )
                return result

        return wrapped

    return decorate
The updated_fields attribute reveals state shape, not values. Keep the set of possible field names bounded so the attribute remains queryable.
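A minimal sketch of one way to bound that catalog follows. It assumes the known names are the fields declared on AgentState in this chapter; the helper name bounded_field_names and the "other" bucket are hypothetical.

```python
# Assumed catalog: the field names declared on AgentState in this chapter.
KNOWN_STATE_FIELDS = frozenset({
    "query", "conversation_id", "order_reference", "region",
    "policy_document_ids", "memory_record_types", "order_state",
    "answer", "outcome", "iterations",
})


def bounded_field_names(result: dict[str, object]) -> list[str]:
    """Return updated field names, collapsing unknown names into one bucket."""
    names = {name if name in KNOWN_STATE_FIELDS else "other" for name in result}
    return sorted(names)


# A node that returns an unexpected key cannot add a new attribute value.
print(bounded_field_names({"order_state": "shipped", "debug_blob_17": "..."}))
# ['order_state', 'other']
```

If a sketch like this were adopted, the wrapper could pass its result through the helper before setting app.workflow.updated_fields.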
Turn operations into graph nodes
The wrappers from Chapters 14 and 15 should run inside graph nodes. This is where the standalone instrumentation becomes an agent workflow: one node retrieves policy evidence, one node reads memory categories, one node calls the order-status tool, and one node composes the answer.
The node span explains state movement. The child operation span explains the external or capability-specific work. For example, workflow.node fetch_order tells us that the graph reached the order lookup step; execute_tool fetch_order_status tells us whether the tool was authorized and what state the order system returned.
Add these imports and node functions to src/agent_observability/graph.py, below traced_node:
from .inference import generate_answer
from .memory import read_customer_memory
from .retrieval import retrieve_policy
from .tools import fetch_order_status


@traced_node("retrieve_policy")
def retrieve_policy_node(state: AgentState) -> dict[str, Any]:
    documents = retrieve_policy(
        query=state["query"],
        region=state["region"],
    )
    return {
        "policy_document_ids": [
            document.document_id for document in documents
        ],
    }


@traced_node("read_memory")
def read_memory_node(state: AgentState) -> dict[str, Any]:
    records = read_customer_memory(
        subject_id=state["conversation_id"],
        purpose="order-status-support",
    )
    return {
        "memory_record_types": sorted(
            {record.memory_type for record in records}
        ),
    }


@traced_node("fetch_order")
def fetch_order_node(state: AgentState) -> dict[str, Any]:
    order = fetch_order_status(
        order_reference=state["order_reference"],
        subject_id=state["conversation_id"],
    )
    return {"order_state": order.state}


@traced_node("compose_answer")
def compose_answer_node(state: AgentState) -> dict[str, Any]:
    answer = generate_answer(
        instructions="Answer using only the provided order and policy context.",
        input_items=[
            {
                "role": "user",
                "content": state["query"],
            },
        ],
    )
    return {"answer": answer}
The example uses conversation_id as the subject identifier to keep the chapter small. In a real application, use the authenticated subject or tenant-scoped principal, not an arbitrary conversation handle.
Instrument branches as decisions
After the graph can run work, add the decision point that chooses the next path. In this demo, the branch is intentionally small: if the order state is missing, route to escalation; otherwise, compose an answer.
The important observability rule is that route values must come from a bounded set. A route named answer, escalate, or retry is useful for filtering traces. A route value copied from a model explanation is not.
Add this code to src/agent_observability/graph.py, below the node functions from the previous section:
@traced_node("classify_outcome")
def classify_outcome(state: AgentState) -> dict[str, Any]:
    if state.get("order_state") == "missing":
        return {"outcome": "escalate"}
    return {"outcome": "answer"}


def route_outcome(state: AgentState) -> str:
    outcome = state["outcome"]
    with tracer.start_as_current_span("workflow.route outcome") as span:
        span.set_attribute("app.workflow.route", outcome)
    return outcome


@traced_node("escalate")
def escalate_node(state: AgentState) -> dict[str, Any]:
    return {"outcome": "escalated"}
Record the selected route from a bounded set. Do not capture an unrestricted model explanation as a route attribute.
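The rule can be enforced at the point where the attribute is set. The sketch below is one possible guard, assuming the three route names mentioned above; the helper name bounded_route and the "unknown" fallback value are hypothetical.

```python
# Routes named in this section; anything else is outside the bounded set.
ALLOWED_ROUTES = frozenset({"answer", "escalate", "retry"})


def bounded_route(candidate: str) -> str:
    """Map a route candidate onto the allowed set; anything else becomes 'unknown'."""
    normalized = candidate.strip().lower()
    return normalized if normalized in ALLOWED_ROUTES else "unknown"


print(bounded_route("escalate"))
# escalate
print(bounded_route("I think the customer seems upset, so escalate"))
# unknown
```

How the graph itself handles an unknown route is a separate policy decision. The guard only ensures that the span attribute stays filterable.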
Build the graph
Now assemble the nodes and edges. This is the first complete LangGraph workflow in the demo, so keep it sequential. A simple graph makes it easier to confirm parent-child span relationships before introducing parallelism, durable resume, or subagents.
Add this builder block to src/agent_observability/graph.py, below the branch functions:
builder = StateGraph(AgentState)

builder.add_node("retrieve_policy", retrieve_policy_node)
builder.add_node("read_memory", read_memory_node)
builder.add_node("fetch_order", fetch_order_node)
builder.add_node("classify_outcome", classify_outcome)
builder.add_node("compose_answer", compose_answer_node)
builder.add_node("escalate", escalate_node)

builder.add_edge(START, "retrieve_policy")
builder.add_edge("retrieve_policy", "read_memory")
builder.add_edge("read_memory", "fetch_order")
builder.add_edge("fetch_order", "classify_outcome")
builder.add_conditional_edges(
    "classify_outcome",
    route_outcome,
    {"answer": "compose_answer", "escalate": "escalate"},
)
builder.add_edge("compose_answer", END)
builder.add_edge("escalate", END)

graph = builder.compile()
This first graph is sequential so its trace is easy to validate. Parallelize retrieval and order lookup only after tests prove that state merging and parent context remain correct.
Create one root span per active execution
The node spans need a parent task span. Without a root span, Langfuse can still show individual observations, but the trace will not clearly answer “what user task was this graph trying to complete?”
This function is the public entrypoint for the chapter’s graph. It creates one task span, invokes the compiled graph, and records the terminal outcome. Later chapters add sessions, users, prompt versions, and scores around this same boundary.
Add this function to src/agent_observability/graph.py, below graph = builder.compile():
from .telemetry import agent_task_span


def run_agent(initial_state: AgentState) -> AgentState:
    with agent_task_span(
        "order-status",
        initial_state["conversation_id"],
    ) as span:
        result = graph.invoke(initial_state)
        span.set_attribute("app.task.outcome", result["outcome"])
        span.set_attribute(
            "app.workflow.iterations",
            result.get("iterations", 0),
        )
        return result
The root span describes the task segment. Child spans describe graph nodes and the model, retrieval, and tool operations inside them.
Run the graph
Run one local graph execution before moving on. This validates that the compiled graph, root task span, route span, and operation spans are connected.
From the demo project root, run:
PYTHONPATH=src python - <<'PY'
from agent_observability.graph import run_agent
from agent_observability.telemetry import configure_tracing

provider = configure_tracing()
try:
    result = run_agent(
        {
            "query": "Where is my order?",
            "conversation_id": "conv_ch16_demo",
            "order_reference": "ORDER-924",
            "region": "eu",
        }
    )
    print(result["outcome"])
finally:
    provider.force_flush(timeout_millis=5000)
    provider.shutdown()
PY
The terminal should print:
answer
A healthy sequential trace now has this shape:
invoke_agent order-status
  workflow.node retrieve_policy
    retrieval support-policy
  workflow.node read_memory
    memory.read customer-profile
  workflow.node fetch_order
    execute_tool fetch_order_status
      HTTP GET order service
  workflow.node classify_outcome
  workflow.route outcome
  workflow.node compose_answer
    openai.responses.create
That structure is what Chapter 23 will test. If the trace only shows framework nodes, it is hard to explain behavior. If it only shows HTTP and model spans, it is hard to explain the workflow decision that caused them.
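As a rough preview of what a shape check might look like, the sketch below validates two properties: there is exactly one root span, and every workflow.node span is a direct child of it. The record format is a hypothetical simplification. Real spans are matched by span ID, not by name, and this is not the Chapter 23 test code.

```python
# Hypothetical flattened export: (span name, parent span name or None for the root).
SPANS = [
    ("invoke_agent order-status", None),
    ("workflow.node retrieve_policy", "invoke_agent order-status"),
    ("retrieval support-policy", "workflow.node retrieve_policy"),
    ("workflow.node fetch_order", "invoke_agent order-status"),
    ("execute_tool fetch_order_status", "workflow.node fetch_order"),
    ("workflow.route outcome", "invoke_agent order-status"),
]


def shape_problems(spans):
    """Return a list of human-readable problems; an empty list means the shape is healthy."""
    roots = [name for name, parent in spans if parent is None]
    if len(roots) != 1:
        return [f"expected one root span, found {len(roots)}"]
    root = roots[0]
    problems = []
    for name, parent in spans:
        # Node spans must hang directly off the task span.
        if name.startswith("workflow.node ") and parent != root:
            problems.append(f"{name} is not a direct child of {root}")
    return problems


print(shape_problems(SPANS))
# []
```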
Parallel nodes
Do not parallelize this chapter’s implementation yet. This section is a design note for the next iteration after the sequential trace is validated.
When independent nodes run concurrently, their spans should overlap and share the correct causal parent. For the order-status workflow, retrieval and order lookup are candidates for parallel execution because neither needs the other’s result. The compose step still needs both.
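Use this as the target shape when you later refactor the graph: the retrieval and order lookup node spans overlap in time under the same root task span, and the compose step starts only after both have finished. No code change is required in this part of the chapter.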
Do not derive end-to-end latency by summing parallel child spans.
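A small arithmetic sketch shows why. The timings below are invented for illustration, and the helper name elapsed_ms is hypothetical; in practice the parent span's own duration is the number to read.

```python
def elapsed_ms(intervals):
    """Elapsed time from the earliest start to the latest end of (start_ms, end_ms) spans."""
    starts = [start for start, _ in intervals]
    ends = [end for _, end in intervals]
    return max(ends) - min(starts)


# Hypothetical timings: retrieval and order lookup run concurrently.
retrieval = (0, 120)
order_lookup = (10, 200)
children = [retrieval, order_lookup]

summed = sum(end - start for start, end in children)
print(summed)
# 310
print(elapsed_ms(children))
# 200
```

Summing reports 310 ms of work, but the parallel section occupied only 200 ms of wall-clock time.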
Checkpoints and durable resume
LangGraph checkpointers persist state for durable execution. They are useful when a graph can pause for human approval, queue work, or resume after process restart.
This chapter does not configure a checkpointer yet. When you add one, record checkpoint metadata in the checkpoint hook or immediately around the pause/resume boundary in src/agent_observability/graph.py. Do not export checkpoint state content.
Use attributes like these on the relevant workflow or task span:
app.workflow.checkpoint = "created"
app.workflow.checkpoint.sequence = 4
app.workflow.resume = true
app.workflow.resume_reason = "human_approval"
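One way to keep these attributes consistent is to build them in small helpers that accept only metadata. This is a sketch under assumptions: the helper names are hypothetical, and only "human_approval" comes from the list above; the other two reason values are invented to mirror the pause cases described earlier.

```python
# "human_approval" appears in the attribute list; the other values are assumed.
ALLOWED_RESUME_REASONS = frozenset({"human_approval", "queued_work", "process_restart"})


def checkpoint_created_attributes(sequence: int) -> dict[str, object]:
    """Attributes for a span that records a checkpoint being written."""
    if sequence < 0:
        raise ValueError("checkpoint sequence must be non-negative")
    return {
        "app.workflow.checkpoint": "created",
        "app.workflow.checkpoint.sequence": sequence,
    }


def resume_attributes(reason: str) -> dict[str, object]:
    """Attributes for a span that records a resume; unknown reasons are bucketed."""
    bounded = reason if reason in ALLOWED_RESUME_REASONS else "other"
    return {
        "app.workflow.resume": True,
        "app.workflow.resume_reason": bounded,
    }


print(checkpoint_created_attributes(4))
print(resume_attributes("human_approval"))
```

Neither helper takes the checkpoint state as an argument, so state content cannot reach a span through them.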
For a long pause, start a new trace segment and link it to the trace that requested approval. Preserve task and conversation identifiers according to policy.
Subgraphs and subagents
Subgraphs and subagents introduce an ownership boundary inside the workflow. A subgraph is still part of the same local graph. A subagent usually represents a distinct role, policy, model configuration, or worker.
This chapter does not add a subagent implementation. Chapter 25 implements the advanced version with local subgraphs, synchronous subagent spans, and independently scheduled handoff traces. For now, keep the sequential graph working and use this section as the design rule for the trace relationships you will add later.
A subgraph executed synchronously can remain under the parent trace. Use an invoke_agent span for a distinct agent role and record its name and version.
For independently scheduled delegation:
- End the delegation operation in the parent trace.
- Inject trace context into the task message where the messaging model supports it.
- Start a consumer or task span in the worker.
- Use a span link when the work should not be a direct child.
- Preserve a stable custom task ID for queries.
Avoid creating one trace that remains open for days only to represent organizational grouping.
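The data flow behind the delegation steps can be sketched with the standard library alone. The example assumes the W3C traceparent header format (version, 32-hex trace ID, 16-hex span ID, 2-hex flags) and uses hypothetical helper names and message fields. In a real worker, OpenTelemetry's propagators and span links would do this work; the sketch only shows what travels in the message.

```python
import re
import secrets

# W3C traceparent: version 00, trace ID, parent span ID, trace flags.
TRACEPARENT = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def build_task_message(task_id: str, trace_id: str, span_id: str) -> dict[str, str]:
    """Producer side: carry the delegating span's context in the task message."""
    return {
        "task_id": task_id,  # stable custom ID that survives across traces
        "traceparent": f"00-{trace_id}-{span_id}-01",
    }


def link_from_message(message: dict[str, str]):
    """Worker side: recover the producer context for use as a span link."""
    match = TRACEPARENT.match(message.get("traceparent", ""))
    if match is None:
        return None  # start an unlinked trace rather than guess
    trace_id, span_id, _flags = match.groups()
    return {"linked_trace_id": trace_id, "linked_span_id": span_id}


# Randomly generated identifiers stand in for a real span context.
trace_id = secrets.token_hex(16)
span_id = secrets.token_hex(8)
message = build_task_message("task-924", trace_id, span_id)
link = link_from_message(message)
print(link["linked_trace_id"] == trace_id)
# True
```

The worker starts its own trace and attaches the link, so the parent trace can close as soon as the delegation operation ends.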
Streaming graph updates
LangGraph can stream values, updates, messages, custom events, checkpoints, and tasks. This is useful for UI updates and debugging, but it is also an easy way to leak full state into telemetry.
Chapter 26 implements streaming graph updates as a separate advanced topic. If you add streaming later, keep the streaming code near the graph invocation boundary, usually beside run_agent or in the API route that calls it. Do not export the values stream to telemetry: it contains complete graph state. Prefer lifecycle metadata from updates, tasks, or custom events with an allowlist.
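A minimal sketch of such an allowlist follows. It assumes that each chunk from the updates stream is a mapping from node name to that node's partial state update; the helper name update_metadata is hypothetical, and this is not the Chapter 26 implementation.

```python
def update_metadata(chunk: dict[str, object]) -> list[dict[str, object]]:
    """Reduce one updates-stream chunk to node names and updated field names."""
    events = []
    for node_name, update in chunk.items():
        # Non-dict payloads (for example, a node that returned nothing) yield no fields.
        fields = sorted(update) if isinstance(update, dict) else []
        events.append({
            "app.workflow.node.name": node_name,
            "app.workflow.updated_fields": fields,
        })
    return events


# Assumed chunk shape: {node_name: partial_state_update}.
chunk = {"fetch_order": {"order_state": "shipped"}}
print(update_metadata(chunk))
# [{'app.workflow.node.name': 'fetch_order', 'app.workflow.updated_fields': ['order_state']}]
```

The field value "shipped" never reaches the result, which matches the rule already applied by the node wrapper.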
What should exist before we go to Chapter 17
At this point the demo should have:
- one root task span around graph.invoke;
- one workflow-node span for each meaningful graph node;
- child spans for retrieval, memory, tool execution, downstream service calls, and model calls;
- bounded route attributes for graph decisions;
- state field names in telemetry, not raw state values;
- a documented trace shape for the sequential workflow before adding parallelism or durable resume.
Chapter 17 adds the controls that keep this graph from running forever, weakening safety on fallback, or exporting content when a policy service is unavailable.
Next up: Ch 17 - Runtime Hardening and Feedback adds budgets, loop controls, redaction, approvals, and trace-linked feedback.