Subgraphs, Subagents, and Handoff Traces

The graph from Chapter 16 is intentionally small: retrieve policy, read memory, fetch order, classify outcome, compose answer or escalate. Real agents rarely stay that tidy. A workflow grows a specialist subgraph, a coordinator invokes another agent, or a queue worker takes over work later.

Those boundaries are where traces often become misleading. A child span says “this happened inside the same execution.” A span link or stable task ID says “this later work was caused by that earlier work, but it was not the same execution anymore.”

This chapter makes one runnable change to the demo: replace the simple escalate_node with a local LangGraph subgraph. It also adds a reusable subagent span helper and a handoff contract pattern for the next time the demo grows a real specialist or worker.

What we will change

Work in the demo project:

cd agent-observability-demo

This chapter touches these files:

FileWhat to do
src/agent_observability/graph.pyAdd escalation fields to AgentState, create an escalation subgraph, and replace the old escalation node with an adapter node.
src/agent_observability/subagents.pyCreate a helper for synchronous subagent spans.
No queue file yetKeep queue-backed handoff code as a contract pattern until the demo has a real worker.

The local subgraph is runnable now. The subagent helper is infrastructure for the next specialist agent. The queue handoff example is intentionally not wired into the demo yet because the demo has no queue, worker, or external job runner.

Choose the relationship first

Pick the trace relationship from the execution model, not from how you want the graph picture to look.

Execution patternTrace relationshipUse when
Subgraph called inside the same processChild workflow spans inside the same traceThe parent graph waits and state remains local.
Synchronous subagentChild invoke_agent span inside the same traceA distinct agent role runs during the same request.
Queue-backed handoffNew consumer trace linked to the producerWork is scheduled independently or resumes later.
Long human-owned handoffNew trace segment plus stable task IDKeeping one trace open would distort timing and retention.

For the order-status demo, escalation is still local and synchronous. That means a subgraph belongs in the same trace.

Add escalation fields to AgentState

Open src/agent_observability/graph.py. In the existing AgentState class, add these fields near outcome:

class AgentState(TypedDict, total=False):
    query: str
    conversation_id: str
    order_reference: str
    region: str
    policy_document_ids: list[str]
    memory_record_types: list[str]
    order_state: str
    answer: str
    outcome: str
    escalation_reason: str
    escalation_channel: str
    capture_mode: str
    capture_policy_version: str

The new fields are bounded operational values. They are safe to expose as telemetry attributes if needed. Do not add raw escalation notes, user messages, retrieved text, or memory values here just because the subgraph could carry them.

Replace the simple escalation node with a subgraph

In src/agent_observability/graph.py, find the current escalate_node:

@traced_node("escalate")
def escalate_node(state: AgentState) -> dict[str, Any]:
    return {"outcome": "escalated"}

Replace it with the full block below. Put it in the same location, before the builder = StateGraph(AgentState) section.

class EscalationState(TypedDict, total=False):
    query: str
    order_state: str
    escalation_reason: str
    escalation_channel: str


@traced_node("classify_escalation_reason")
def classify_escalation_reason(state: EscalationState) -> dict[str, str]:
    if state.get("order_state") == "missing":
        return {"escalation_reason": "missing_order_state"}
    return {"escalation_reason": "policy_review"}


@traced_node("select_escalation_channel")
def select_escalation_channel(state: EscalationState) -> dict[str, str]:
    if state.get("escalation_reason") == "missing_order_state":
        return {"escalation_channel": "support_ticket"}
    return {"escalation_channel": "policy_queue"}


escalation_builder = StateGraph(EscalationState)
escalation_builder.add_node(
    "classify_escalation_reason",
    classify_escalation_reason,
)
escalation_builder.add_node(
    "select_escalation_channel",
    select_escalation_channel,
)
escalation_builder.add_edge(START, "classify_escalation_reason")
escalation_builder.add_edge(
    "classify_escalation_reason",
    "select_escalation_channel",
)
escalation_builder.add_edge("select_escalation_channel", END)
escalation_graph = escalation_builder.compile()


@traced_node("escalate")
def escalate_node(state: AgentState) -> dict[str, Any]:
    result = escalation_graph.invoke(
        {
            "query": state["query"],
            "order_state": state.get("order_state", "unknown"),
        }
    )
    return {
        "outcome": "escalated",
        "escalation_reason": result["escalation_reason"],
        "escalation_channel": result["escalation_channel"],
    }

The adapter node is deliberate. The parent graph uses AgentState; the escalation subgraph uses EscalationState. The adapter keeps that boundary explicit and avoids passing the full parent state into a smaller specialist graph.

Keep the existing builder line:

builder.add_node("escalate", escalate_node)

Do not replace it with builder.add_node("escalate", escalation_graph) in this version of the demo. Direct subgraph-as-node is fine when the parent and subgraph share state keys intentionally. Here the adapter is clearer because the subgraph has its own state contract.

Add bounded subagent spans

Create src/agent_observability/subagents.py:

from collections.abc import Callable
from typing import Any

from opentelemetry.trace import SpanKind

from .telemetry import tracer


Subagent = Callable[[dict[str, Any]], dict[str, Any]]


def invoke_subagent(
    *,
    agent_name: str,
    agent_version: str,
    delegated_scope: str,
    payload: dict[str, Any],
    run: Subagent,
) -> dict[str, Any]:
    with tracer.start_as_current_span(
        f"invoke_agent {agent_name}",
        kind=SpanKind.INTERNAL,
        attributes={
            "gen_ai.operation.name": "invoke_agent",
            "gen_ai.agent.name": agent_name,
            "gen_ai.agent.version": agent_version,
            "app.handoff.type": "synchronous_subagent",
            "app.handoff.delegated_scope": delegated_scope,
        },
    ) as span:
        result = run(payload)
        span.set_attribute(
            "app.handoff.result",
            result.get("outcome", "unknown"),
        )
        return result

This file is not called by the order-status graph yet. Add it now so the trace contract exists before the demo grows a real specialist. When you later add a refund, fraud, or policy specialist, call it through invoke_subagent() instead of hiding the delegation inside a normal Python function.

The payload is intentionally not written to telemetry. It can contain user text, retrieved evidence, tool output, or memory. Record only bounded delegation metadata.

Use a bounded handoff contract

Before a coordinator delegates work, define what is allowed to cross the boundary:

{
  "target_agent": "refund-specialist",
  "delegated_scope": "refund_eligibility_review",
  "allowed_tools": ["read_order", "read_refund_policy"],
  "forbidden_tools": ["issue_refund"],
  "policy_version": "refund-handoff-v2"
}

Record the contract, not the handoff message:

app.handoff.type = "synchronous_subagent"
app.handoff.target = "refund-specialist"
app.handoff.delegated_scope = "refund_eligibility_review"
app.handoff.policy.version = "refund-handoff-v2"

This prevents a real failure mode: the parent agent delegates a narrow task, while the child agent receives broad authority because the boundary was never modeled.

Model independently scheduled handoffs separately

Do not add this code to the demo yet. The demo does not have a queue-backed worker. Use this pattern when you introduce one.

Producer side:

from opentelemetry.propagate import inject


def enqueue_handoff(task: dict[str, str]) -> None:
    carrier: dict[str, str] = {}
    with tracer.start_as_current_span("handoff.enqueue refund-specialist") as span:
        span.set_attribute("app.handoff.type", "delegate")
        span.set_attribute("app.handoff.target", "refund-specialist")
        span.set_attribute("app.handoff.task_id", task["task_id"])
        inject(carrier)
        queue.send({"headers": carrier, "body": task})

Consumer side when the worker processes the message as part of the same active execution and parentage is still accurate:

from opentelemetry import context
from opentelemetry.propagate import extract


def handle_handoff(message: dict[str, object]) -> None:
    parent_context = extract(message["headers"])
    token = context.attach(parent_context)
    try:
        with tracer.start_as_current_span("handoff.consume refund-specialist") as span:
            span.set_attribute("app.handoff.type", "delegate")
            span.set_attribute("app.handoff.target", "refund-specialist")
            process_refund_specialist_task(message["body"])
    finally:
        context.detach(token)

When parentage would be misleading because the consumer runs later, use a span link and app.handoff.task_id. The invariant is simple: during an incident, the producer and consumer work must be findable from the same handoff ID.

For a delayed independent worker, keep the producer context as a link instead of making it the parent:

from opentelemetry.context import Context
from opentelemetry.propagate import extract
from opentelemetry.trace import Link, SpanKind, get_current_span


def handle_delayed_handoff(message: dict[str, object]) -> None:
    linked_context = extract(message["headers"])
    linked_span_context = get_current_span(linked_context).get_span_context()
    links = [Link(linked_span_context)] if linked_span_context.is_valid else []

    with tracer.start_as_current_span(
        "handoff.consume refund-specialist",
        context=Context(),
        kind=SpanKind.CONSUMER,
        links=links,
    ) as span:
        body = message["body"]
        span.set_attribute("app.handoff.type", "delegate")
        span.set_attribute("app.handoff.target", "refund-specialist")
        span.set_attribute("app.handoff.task_id", body["task_id"])
        process_refund_specialist_task(body)

Use the parent-child consumer shape only when the consumer really continues the same execution. Use the linked shape when the worker owns a new execution segment, runs later, or may be sampled and retained independently.

Run the subgraph path

Use an order reference that the demo service treats as missing:

PYTHONPATH=src python - <<'PY'
from uuid import uuid4

from agent_observability.graph import run_agent
from agent_observability.langfuse_context import langfuse_trace_context
from agent_observability.telemetry import configure_tracing

provider = configure_tracing()
conversation_id = f"conv_{uuid4().hex}"

try:
    with langfuse_trace_context(
        session_id=conversation_id,
        user_id="usr_pseudo_subgraph_demo",
        trace_name="manual-subgraph-escalation",
        version="local",
        tags=("order-status", "subgraph", "manual-scenario"),
        metadata={
            "environment": "development",
            "workflow": "order-status",
            "region": "eu",
        },
    ):
        result = run_agent(
            {
                "query": "Where is my missing order?",
                "conversation_id": conversation_id,
                "order_reference": "ORDER-404",
                "region": "eu",
            }
        )
        print(result["outcome"])
        print(result["escalation_reason"])
        print(result["escalation_channel"])
finally:
    provider.force_flush(timeout_millis=5000)
    provider.shutdown()
PY

Expected terminal output:

escalated
missing_order_state
support_ticket

Inspect in Langfuse

Open Tracing and find manual-subgraph-escalation.

For the local subgraph path, expect one trace:

invoke_agent order-status
  workflow.node retrieve_policy
  workflow.node read_memory
  workflow.node fetch_order
    execute_tool fetch_order_status
  workflow.node classify_outcome
  workflow.route outcome
  workflow.node escalate
    workflow.node classify_escalation_reason
    workflow.node select_escalation_channel

The useful evidence is node-level: the order was missing, the outcome became escalate, the escalation reason became missing_order_state, and the selected channel became support_ticket. You should not see the full graph state or raw handoff payloads in span attributes.

For independent handoffs later, expect two trace segments connected by propagated context, a span link, or app.handoff.task_id:

trace A
  invoke_agent order-status
    handoff.enqueue refund-specialist

trace B
  handoff.consume refund-specialist
    invoke_agent refund-specialist

If both traces cannot be found from the handoff task ID, the incident workflow is incomplete.

What should exist before we go to Chapter 26

At this point the demo should have:

  • AgentState fields for escalation_reason and escalation_channel;
  • a local escalation subgraph inside src/agent_observability/graph.py;
  • an adapter escalate_node that invokes the subgraph without passing the full parent state;
  • src/agent_observability/subagents.py with invoke_subagent();
  • no raw handoff payloads in telemetry by default;
  • a Langfuse trace named manual-subgraph-escalation that shows the subgraph nodes inside the parent trace.

Chapter 26 applies the same boundary discipline to streaming graph updates.

References


Next up: Ch 26 - Streaming Graph Updates Without Leaking State exposes live workflow progress while keeping graph state and content out of telemetry.