Streaming Graph Updates Without Leaking State

Streaming is another output path. It can improve the user experience, but it can also leak graph state through a browser event, log line, or span attribute before anyone notices.

By this point in the series, the trace shape is stable, content capture is restricted, Langfuse context is present, prompt identity is tracked, scores exist, datasets can gate releases, and Chapter 25 introduced a subgraph boundary. Now it is safe to add graph streaming.

This chapter adds a streaming entrypoint to the demo and a manual scenario that prints sanitized graph progress while Langfuse records only stream lifecycle metadata.

What we will change

Work in the demo project:

cd agent-observability-demo

This chapter touches three files:

FileWhat to do
src/agent_observability/graph.pyAdd a sanitized stream_agent_updates() entrypoint beside run_agent().
src/agent_observability/streaming.pyCreate an observed streaming wrapper with bounded span attributes.
src/agent_observability/manual_scenarios.pyAdd a graph-updates-stream manual scenario.

The existing manual_scenarios stream scenario streams model tokens. This chapter adds graph update streaming. Keep those concepts separate.

Pick the stream mode by consumer

LangGraph supports several stream modes. Do not use the same stream for every consumer.

Stream modeGood forRisk
updatesUI progress by node and workflow lifecycleCan reveal updated field names and partial outputs.
valuesLocal debugging with full state snapshotsUsually too much data for telemetry or browser clients.
messagesToken streaming from model callsUser-visible content, needs the content policy from earlier chapters.
customExplicit allowlisted eventsSafe only when the event schema is controlled.
checkpointsDurable execution visibilityCan reveal persistence timing and state metadata.
tasksTask lifecycle and schedulingUseful for operations, but task IDs need governance.
debugDevelopment troubleshootingDo not export in production by default.

My default for production UI progress is updates plus a small number of custom events. I avoid values outside local debugging because it contains graph state.

Add a streaming entrypoint to graph.py

Open src/agent_observability/graph.py. Update the existing collections.abc import so it includes Iterator:

from collections.abc import Callable, Iterator

Then add this code at the bottom of the file, directly after run_agent():

SAFE_UPDATE_FIELDS = {
    "policy_document_ids",
    "memory_record_types",
    "order_state",
    "outcome",
    "escalation_reason",
    "escalation_channel",
    "prompt_name",
    "prompt_version",
    "prompt_label",
    "prompt_source",
    "capture_mode",
    "capture_policy_version",
}


def sanitize_graph_update(event: dict[str, Any]) -> dict[str, Any]:
    sanitized: dict[str, Any] = {}
    for node_name, update in event.items():
        if not isinstance(update, dict):
            continue
        allowed_update = {
            key: value
            for key, value in update.items()
            if key in SAFE_UPDATE_FIELDS
        }
        if allowed_update:
            sanitized[node_name] = allowed_update
    return sanitized


def stream_agent_updates(initial_state: AgentState) -> Iterator[dict[str, Any]]:
    state = dict(initial_state)
    state.setdefault(
        "budget",
        ExecutionBudget(max_iterations=8, max_model_calls=2),
    )
    state.setdefault(
        "progress",
        ProgressTracker(max_repeated_fingerprints=2, seen={}),
    )

    for event in graph.stream(state, stream_mode="updates"):
        sanitized = sanitize_graph_update(event)
        if sanitized:
            yield sanitized

Keep this near the graph boundary. Do not sprinkle streaming logic inside every node. The sanitizer is the boundary between internal graph state and anything that can reach a browser, log, or telemetry backend.

Add a regression test for that boundary in tests/test_streaming.py:

from agent_observability.graph import sanitize_graph_update


def test_stream_update_sanitizer_drops_sensitive_state() -> None:
    sanitized = sanitize_graph_update(
        {
            "compose_answer": {
                "answer": "Raw answer text",
                "query": "Where is ORDER-CANARY-0001?",
                "order_state": "missing",
                "outcome": "escalated",
            }
        }
    )

    assert sanitized == {
        "compose_answer": {
            "order_state": "missing",
            "outcome": "escalated",
        }
    }
    assert "ORDER-CANARY-0001" not in repr(sanitized)

Create the observed streaming wrapper

Create src/agent_observability/streaming.py:

from collections.abc import Iterator
from typing import Any

from opentelemetry.trace import SpanKind, Status, StatusCode

from .graph import AgentState, stream_agent_updates
from .telemetry import tracer


def observed_stream_agent_updates(
    initial_state: AgentState,
) -> Iterator[dict[str, Any]]:
    event_count = 0
    completed = False

    with tracer.start_as_current_span(
        "workflow.stream updates",
        kind=SpanKind.INTERNAL,
        attributes={
            "app.workflow.stream.mode": "updates",
        },
    ) as span:
        try:
            for event in stream_agent_updates(initial_state):
                event_count += 1
                yield event
            completed = True
        except Exception as exc:
            span.record_exception(exc)
            span.set_attribute("error.type", exc.__class__.__name__)
            span.set_status(Status(StatusCode.ERROR, "stream_error"))
            raise
        finally:
            span.set_attribute("app.workflow.stream.event_count", event_count)
            span.set_attribute("app.workflow.stream.completed", completed)

This records stream behavior without storing stream content. That distinction matters. The terminal or UI receives sanitized progress events; Langfuse receives count, mode, and completion status.

Do not export full stream events to spans

Avoid this pattern:

span.set_attribute("app.workflow.stream.event", repr(event))

A stream event can contain user input, retrieved text, memory values, model output, tool payloads, or internal state. If telemetry needs streaming visibility, record lifecycle metadata:

app.workflow.stream.mode = "updates"
app.workflow.stream.event_count = 7
app.workflow.stream.completed = true
app.workflow.stream.dropped_events = 0

Put those attributes on a dedicated workflow.stream span, as this chapter does, or on the root task span if the stream wrapper is too small to justify its own span.

Add a manual graph streaming scenario

Open src/agent_observability/manual_scenarios.py.

Add this import near the other local imports:

from .streaming import observed_stream_agent_updates

Then add this function after the existing run_stream() function. It uses ORDER-404 on purpose so the graph takes the escalation path and does not need a model answer to demonstrate progress streaming.

def run_graph_updates_stream() -> None:
    conversation_id = f"conv_{uuid4().hex}"
    with langfuse_trace_context(
        session_id=conversation_id,
        user_id="usr_pseudo_stream_demo",
        trace_name="manual-graph-updates-stream",
        version=settings.agent_version,
        tags=("order-status", "streaming", "manual-scenario"),
        metadata={
            "environment": settings.deployment_environment,
            "workflow": "order-status",
            "region": "eu",
        },
    ):
        with agent_task_span("order-status-stream", conversation_id) as span:
            for event in observed_stream_agent_updates(
                {
                    "query": "Where is my missing order?",
                    "conversation_id": conversation_id,
                    "order_reference": "ORDER-404",
                    "region": "eu",
                }
            ):
                print(event)

            span.set_attribute("app.manual_scenario", "graph-updates-stream")
            span.set_attribute("app.task.outcome", "streamed")

Finally, add it to the SCENARIOS dictionary:

SCENARIOS: dict[str, Callable[[], None]] = {
    "success": run_success,
    "stream": run_stream,
    "graph-updates-stream": run_graph_updates_stream,
    "retry": run_retry,
    "fallback": run_fallback,
    "retry-classification": run_retry_classification,
    "feedback": run_feedback,
    "negative-signal": run_negative_signal,
    "prompt-management": run_prompt_management,
    "prompt-ab-test": run_prompt_ab_test,
    "prompt-cache-fallback": run_prompt_cache_fallback,
}

Run the streaming scenario

Run:

PYTHONPATH=src python -m agent_observability.manual_scenarios graph-updates-stream

Expected terminal output should be a sequence of sanitized updates. The exact ordering can vary as the graph evolves, but it should look like this:

{'retrieve_policy': {'policy_document_ids': ['policy-eu-shipping-v14', 'policy-eu-escalation-v3']}}
{'read_memory': {'memory_record_types': ['support_preference']}}
{'fetch_order': {'order_state': 'missing'}}
{'classify_outcome': {'outcome': 'escalate'}}
{'escalate': {'outcome': 'escalated', 'escalation_reason': 'missing_order_state', 'escalation_channel': 'support_ticket'}}

You should not see the raw user query, raw memory value, retrieved policy text, tool payload, or full graph state in the terminal output.

Inspect in Langfuse

Open Tracing and find manual-graph-updates-stream.

Expect a trace shaped like:

invoke_agent order-status-stream
  workflow.stream updates
    workflow.node retrieve_policy
    workflow.node read_memory
    workflow.node fetch_order
    workflow.node classify_outcome
    workflow.node escalate

If Chapter 25 has been applied, the escalation subgraph nodes also appear under the escalation path:

workflow.stream updates
  workflow.node escalate
    workflow.node classify_escalation_reason
    workflow.node select_escalation_channel

Inspect workflow.stream updates. It should contain:

app.workflow.stream.mode = "updates"
app.workflow.stream.event_count = <number>
app.workflow.stream.completed = true

It should not contain raw stream events, user query text, full graph state, retrieved text, memory values, or token deltas.

Stream model messages separately

Graph progress streaming and token streaming are different paths.

The existing manual_scenarios stream scenario demonstrates token streaming from the model. It is useful for latency and user experience checks, but it should not be mixed with graph progress events.

For model stream telemetry, prefer timing and completion metadata:

gen_ai.request.stream = true
gen_ai.response.time_to_first_chunk = 0.84
app.openai.stream.completed = true
app.openai.stream.chunk_count = 42

Do not copy token deltas to trace attributes. Chapter 14 already captures safer model-call metadata.

Streaming subgraphs

When subgraphs are involved, progress output can include updates from the parent graph and subgraph. That is useful for UI progress, but it can look noisy if labels are ambiguous.

Use stable display labels in the UI layer:

parent.retrieve_policy
parent.fetch_order
parent.classify_outcome
escalation.classify_escalation_reason
escalation.select_escalation_channel

For telemetry, keep the trace authoritative. The stream is for live consumption; the trace is for incident analysis.

What should exist before the appendix

At this point the demo should have:

  • a sanitized stream_agent_updates() entrypoint in src/agent_observability/graph.py;
  • src/agent_observability/streaming.py with observed_stream_agent_updates();
  • a graph-updates-stream manual scenario;
  • a workflow.stream updates span with bounded lifecycle metadata;
  • a streaming sanitizer regression test in tests/test_streaming.py;
  • no full graph state exported to telemetry;
  • separate handling for model token streaming and graph progress streaming;
  • a Langfuse trace named manual-graph-updates-stream that remains readable.

The appendix collects the terminology and references used throughout the series.

References


Appendix: Glossary and References collects terminology, standards, implementation documentation, and research used throughout the series.