Streaming Graph Updates Without Leaking State
Streaming is another output path. It can improve the user experience, but it can also leak graph state through a browser event, log line, or span attribute before anyone notices.
By this point in the series, the trace shape is stable, content capture is restricted, Langfuse context is present, prompt identity is tracked, scores exist, datasets can gate releases, and Chapter 25 introduced a subgraph boundary. Now it is safe to add graph streaming.
This chapter adds a streaming entrypoint to the demo and a manual scenario that prints sanitized graph progress while Langfuse records only stream lifecycle metadata.
What we will change
Work in the demo project:
cd agent-observability-demo
This chapter touches three source files and adds one test file:
| File | What to do |
|---|---|
| src/agent_observability/graph.py | Add a sanitized stream_agent_updates() entrypoint beside run_agent(). |
| src/agent_observability/streaming.py | Create an observed streaming wrapper with bounded span attributes. |
| src/agent_observability/manual_scenarios.py | Add a graph-updates-stream manual scenario. |
| tests/test_streaming.py | Add a regression test for the stream sanitizer. |
The existing manual_scenarios stream scenario streams model tokens. This chapter adds graph update streaming. Keep those concepts separate.
Pick the stream mode by consumer
LangGraph supports several stream modes. Do not use the same stream for every consumer.
| Stream mode | Good for | Risk |
|---|---|---|
| updates | UI progress by node and workflow lifecycle | Can reveal updated field names and partial outputs. |
| values | Local debugging with full state snapshots | Usually too much data for telemetry or browser clients. |
| messages | Token streaming from model calls | User-visible content, needs the content policy from earlier chapters. |
| custom | Explicit allowlisted events | Safe only when the event schema is controlled. |
| checkpoints | Durable execution visibility | Can reveal persistence timing and state metadata. |
| tasks | Task lifecycle and scheduling | Useful for operations, but task IDs need governance. |
| debug | Development troubleshooting | Do not export in production by default. |
My default for production UI progress is updates plus a small number of custom events. I avoid values outside local debugging because every values event carries a full snapshot of graph state.
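One way to make the per-consumer choice explicit is a small lookup that fails closed. This is only a sketch: the consumer names (`browser_ui`, `ops_dashboard`, `local_debug`) and the `stream_modes_for()` helper are hypothetical and not part of the demo project.

```python
# Hypothetical consumer-to-mode policy; the consumer names are illustrative only.
ALLOWED_STREAM_MODES: dict[str, tuple[str, ...]] = {
    "browser_ui": ("updates", "custom"),
    "ops_dashboard": ("tasks", "checkpoints"),
    "local_debug": ("values", "debug"),
}


def stream_modes_for(consumer: str) -> tuple[str, ...]:
    """Return the stream modes a consumer may receive, failing closed."""
    try:
        return ALLOWED_STREAM_MODES[consumer]
    except KeyError:
        # Unknown consumers get an error rather than a permissive default.
        raise ValueError(f"no stream policy for consumer {consumer!r}") from None
```

Raising on unknown consumers means a new client has to be added to the policy deliberately before it can receive any stream.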
Add a streaming entrypoint to graph.py
Open src/agent_observability/graph.py. Update the existing collections.abc import so it includes Iterator:
from collections.abc import Callable, Iterator
Then add this code at the bottom of the file, directly after run_agent():
SAFE_UPDATE_FIELDS = {
    "policy_document_ids",
    "memory_record_types",
    "order_state",
    "outcome",
    "escalation_reason",
    "escalation_channel",
    "prompt_name",
    "prompt_version",
    "prompt_label",
    "prompt_source",
    "capture_mode",
    "capture_policy_version",
}


def sanitize_graph_update(event: dict[str, Any]) -> dict[str, Any]:
    sanitized: dict[str, Any] = {}
    for node_name, update in event.items():
        if not isinstance(update, dict):
            continue
        allowed_update = {
            key: value
            for key, value in update.items()
            if key in SAFE_UPDATE_FIELDS
        }
        if allowed_update:
            sanitized[node_name] = allowed_update
    return sanitized


def stream_agent_updates(initial_state: AgentState) -> Iterator[dict[str, Any]]:
    state = dict(initial_state)
    state.setdefault(
        "budget",
        ExecutionBudget(max_iterations=8, max_model_calls=2),
    )
    state.setdefault(
        "progress",
        ProgressTracker(max_repeated_fingerprints=2, seen={}),
    )
    for event in graph.stream(state, stream_mode="updates"):
        sanitized = sanitize_graph_update(event)
        if sanitized:
            yield sanitized
Keep this near the graph boundary. Do not sprinkle streaming logic inside every node. The sanitizer is the boundary between internal graph state and anything that can reach a browser, log, or telemetry backend.
Add a regression test for that boundary in tests/test_streaming.py:
from agent_observability.graph import sanitize_graph_update


def test_stream_update_sanitizer_drops_sensitive_state() -> None:
    sanitized = sanitize_graph_update(
        {
            "compose_answer": {
                "answer": "Raw answer text",
                "query": "Where is ORDER-CANARY-0001?",
                "order_state": "missing",
                "outcome": "escalated",
            }
        }
    )
    assert sanitized == {
        "compose_answer": {
            "order_state": "missing",
            "outcome": "escalated",
        }
    }
    assert "ORDER-CANARY-0001" not in repr(sanitized)
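The same boundary can be pinned from the other direction: events that carry nothing safe should disappear entirely. The sketch below inlines a reduced copy of the sanitizer so that it runs on its own; in the demo, the tests would import `sanitize_graph_update` from `agent_observability.graph` instead. The `__interrupt__` key is an assumption about what a non-dict update might look like.

```python
from typing import Any

# Reduced stand-in for the allowlist in graph.py, inlined so the sketch runs alone.
SAFE_UPDATE_FIELDS = {"order_state", "outcome"}


def sanitize_graph_update(event: dict[str, Any]) -> dict[str, Any]:
    sanitized: dict[str, Any] = {}
    for node_name, update in event.items():
        if not isinstance(update, dict):
            continue  # non-dict payloads are dropped wholesale
        allowed = {k: v for k, v in update.items() if k in SAFE_UPDATE_FIELDS}
        if allowed:
            sanitized[node_name] = allowed
    return sanitized


def test_non_dict_updates_are_dropped() -> None:
    # Assumed shape: a node key whose value is a tuple rather than a dict.
    event = {"__interrupt__": ("Where is ORDER-CANARY-0001?",)}
    assert sanitize_graph_update(event) == {}


def test_nodes_without_safe_fields_disappear() -> None:
    event = {"compose_answer": {"answer": "Raw answer text"}}
    assert sanitize_graph_update(event) == {}
```

An empty result is what lets `stream_agent_updates()` skip the event instead of yielding an empty dictionary.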
Create the observed streaming wrapper
Create src/agent_observability/streaming.py:
from collections.abc import Iterator
from typing import Any

from opentelemetry.trace import SpanKind, Status, StatusCode

from .graph import AgentState, stream_agent_updates
from .telemetry import tracer


def observed_stream_agent_updates(
    initial_state: AgentState,
) -> Iterator[dict[str, Any]]:
    event_count = 0
    completed = False
    with tracer.start_as_current_span(
        "workflow.stream updates",
        kind=SpanKind.INTERNAL,
        attributes={
            "app.workflow.stream.mode": "updates",
        },
    ) as span:
        try:
            for event in stream_agent_updates(initial_state):
                event_count += 1
                yield event
            completed = True
        except Exception as exc:
            span.record_exception(exc)
            span.set_attribute("error.type", exc.__class__.__name__)
            span.set_status(Status(StatusCode.ERROR, "stream_error"))
            raise
        finally:
            span.set_attribute("app.workflow.stream.event_count", event_count)
            span.set_attribute("app.workflow.stream.completed", completed)
This records stream behavior without storing stream content. That distinction matters. The terminal or UI receives sanitized progress events; Langfuse receives count, mode, and completion status.
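One detail of the wrapper worth checking is what happens when the consumer stops reading early, for example when a browser disconnects. The sketch below swaps the span for a plain dictionary so that it runs without OpenTelemetry; `observed` and `recorded` are stand-in names, not demo code.

```python
from collections.abc import Iterable, Iterator
from typing import Any


def observed(events: Iterable[Any], recorded: dict[str, Any]) -> Iterator[Any]:
    """Mirror the wrapper's counting logic, writing to a dict instead of a span."""
    event_count = 0
    completed = False
    try:
        for event in events:
            event_count += 1
            yield event
        completed = True
    finally:
        # Runs on normal exhaustion, on errors, and when the consumer closes early.
        recorded["event_count"] = event_count
        recorded["completed"] = completed


recorded: dict[str, Any] = {}
stream = observed(["a", "b", "c"], recorded)
next(stream)    # the consumer reads one event...
stream.close()  # ...then disconnects

print(recorded)  # {'event_count': 1, 'completed': False}
```

Closing a generator raises `GeneratorExit` at the paused `yield`. Because `GeneratorExit` is not a subclass of `Exception`, the wrapper's `except Exception` branch is skipped and only the `finally` block runs, so an abandoned stream ends with `completed` set to false. How the span's final status looks in that case depends on the OpenTelemetry SDK's own exception handling, which this sketch does not model.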
Do not export full stream events to spans
Avoid this pattern:
span.set_attribute("app.workflow.stream.event", repr(event))
A stream event can contain user input, retrieved text, memory values, model output, tool payloads, or internal state. If telemetry needs streaming visibility, record lifecycle metadata:
app.workflow.stream.mode = "updates"
app.workflow.stream.event_count = 7
app.workflow.stream.completed = true
app.workflow.stream.dropped_events = 0
Put those attributes on a dedicated workflow.stream span, as this chapter does, or on the root task span if the stream wrapper is too small to justify its own span.
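The wrapper in this chapter does not record `app.workflow.stream.dropped_events`. One possible way to produce that number is to count raw events that the sanitizer reduces to nothing. The sketch below takes that interpretation of "dropped" as an assumption; `sanitize_and_count` and `toy_sanitize` are hypothetical helpers, and the sanitizer is only assumed to follow the same contract as `sanitize_graph_update()`.

```python
from collections.abc import Callable, Iterable, Iterator
from typing import Any

Event = dict[str, Any]


def sanitize_and_count(
    raw_events: Iterable[Event],
    sanitize: Callable[[Event], Event],
    counters: dict[str, int],
) -> Iterator[Event]:
    """Yield sanitized events and count the ones that sanitize to nothing."""
    counters.setdefault("dropped_events", 0)
    for raw in raw_events:
        sanitized = sanitize(raw)
        if sanitized:
            yield sanitized
        else:
            counters["dropped_events"] += 1


def toy_sanitize(event: Event) -> Event:
    # Stand-in sanitizer: keeps only the "outcome" field of each node update.
    return {
        node: {"outcome": update["outcome"]}
        for node, update in event.items()
        if isinstance(update, dict) and "outcome" in update
    }


counters: dict[str, int] = {}
events = [
    {"escalate": {"outcome": "escalated"}},
    {"compose_answer": {"answer": "raw text"}},
]
kept = list(sanitize_and_count(events, toy_sanitize, counters))
```

Only the counter would reach telemetry; the dropped events themselves are never stored.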
Add a manual graph streaming scenario
Open src/agent_observability/manual_scenarios.py.
Add this import near the other local imports:
from .streaming import observed_stream_agent_updates
Then add this function after the existing run_stream() function. It uses ORDER-404 on purpose so the graph takes the escalation path and does not need a model answer to demonstrate progress streaming.
def run_graph_updates_stream() -> None:
    conversation_id = f"conv_{uuid4().hex}"
    with langfuse_trace_context(
        session_id=conversation_id,
        user_id="usr_pseudo_stream_demo",
        trace_name="manual-graph-updates-stream",
        version=settings.agent_version,
        tags=("order-status", "streaming", "manual-scenario"),
        metadata={
            "environment": settings.deployment_environment,
            "workflow": "order-status",
            "region": "eu",
        },
    ):
        with agent_task_span("order-status-stream", conversation_id) as span:
            for event in observed_stream_agent_updates(
                {
                    "query": "Where is my missing order?",
                    "conversation_id": conversation_id,
                    "order_reference": "ORDER-404",
                    "region": "eu",
                }
            ):
                print(event)
            span.set_attribute("app.manual_scenario", "graph-updates-stream")
            span.set_attribute("app.task.outcome", "streamed")
Finally, add it to the SCENARIOS dictionary:
SCENARIOS: dict[str, Callable[[], None]] = {
    "success": run_success,
    "stream": run_stream,
    "graph-updates-stream": run_graph_updates_stream,
    "retry": run_retry,
    "fallback": run_fallback,
    "retry-classification": run_retry_classification,
    "feedback": run_feedback,
    "negative-signal": run_negative_signal,
    "prompt-management": run_prompt_management,
    "prompt-ab-test": run_prompt_ab_test,
    "prompt-cache-fallback": run_prompt_cache_fallback,
}
Run the streaming scenario
Run:
PYTHONPATH=src python -m agent_observability.manual_scenarios graph-updates-stream
The terminal output should be a sequence of sanitized updates. The exact ordering can vary as the graph evolves, but it should look like this:
{'retrieve_policy': {'policy_document_ids': ['policy-eu-shipping-v14', 'policy-eu-escalation-v3']}}
{'read_memory': {'memory_record_types': ['support_preference']}}
{'fetch_order': {'order_state': 'missing'}}
{'classify_outcome': {'outcome': 'escalate'}}
{'escalate': {'outcome': 'escalated', 'escalation_reason': 'missing_order_state', 'escalation_channel': 'support_ticket'}}
You should not see the raw user query, raw memory value, retrieved policy text, tool payload, or full graph state in the terminal output.
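That expectation can be turned into an automated check. A minimal sketch: `find_leaks` is a hypothetical helper that scans the `repr` of each streamed event for canary strings that must never leave the graph.

```python
from collections.abc import Iterable
from typing import Any


def find_leaks(events: Iterable[Any], canaries: Iterable[str]) -> list[str]:
    """Return the canary strings that appear anywhere in the streamed events."""
    rendered = "\n".join(repr(event) for event in events)
    return [canary for canary in canaries if canary in rendered]


events = [
    {"fetch_order": {"order_state": "missing"}},
    {"escalate": {"outcome": "escalated", "escalation_channel": "support_ticket"}},
]
canaries = ["Where is my missing order?", "ORDER-404"]
assert find_leaks(events, canaries) == []
```

Matching on `repr` is approximate, since characters such as quotes or newlines are escaped in the rendered form, so canaries made of plain letters, digits, and hyphens are the most reliable.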
Inspect in Langfuse
Open Tracing and find manual-graph-updates-stream.
Expect a trace shaped like:
invoke_agent order-status-stream
  workflow.stream updates
    workflow.node retrieve_policy
    workflow.node read_memory
    workflow.node fetch_order
    workflow.node classify_outcome
    workflow.node escalate
If Chapter 25 has been applied, the escalation subgraph nodes also appear under the escalation path:
workflow.stream updates
  workflow.node escalate
    workflow.node classify_escalation_reason
    workflow.node select_escalation_channel
Inspect workflow.stream updates. It should contain:
app.workflow.stream.mode = "updates"
app.workflow.stream.event_count = <number>
app.workflow.stream.completed = true
It should not contain raw stream events, user query text, full graph state, retrieved text, memory values, or token deltas.
Stream model messages separately
Graph progress streaming and token streaming are different paths.
The existing manual_scenarios stream scenario demonstrates token streaming from the model. It is useful for latency and user experience checks, but it should not be mixed with graph progress events.
For model stream telemetry, prefer timing and completion metadata:
gen_ai.request.stream = true
gen_ai.response.time_to_first_chunk = 0.84
app.openai.stream.completed = true
app.openai.stream.chunk_count = 42
Do not copy token deltas to trace attributes. Chapter 14 already captures safer model-call metadata.
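As a sketch of how such metadata could be collected without touching the content, the wrapper below times the first chunk and counts chunks while passing them through unchanged. `measure_stream` and the `stats` keys are hypothetical, and the injectable `clock` parameter exists only to make the example deterministic.

```python
import time
from collections.abc import Callable, Iterable, Iterator
from typing import Any


def measure_stream(
    chunks: Iterable[Any],
    stats: dict[str, Any],
    clock: Callable[[], float] = time.monotonic,
) -> Iterator[Any]:
    """Pass chunks through while recording timing and counts, never content."""
    # The generator body starts on the first read, so timing begins when the
    # consumer starts iterating.
    started = clock()
    stats.update(chunk_count=0, time_to_first_chunk=None, completed=False)
    for chunk in chunks:
        if stats["chunk_count"] == 0:
            stats["time_to_first_chunk"] = clock() - started
        stats["chunk_count"] += 1
        yield chunk
    stats["completed"] = True
```

After the stream ends, the values in `stats` could be copied onto the model-call span as attributes like the ones listed above; the chunks themselves never enter `stats`.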
Streaming subgraphs
When subgraphs are involved, progress output can include updates from the parent graph and subgraph. That is useful for UI progress, but it can look noisy if labels are ambiguous.
Use stable display labels in the UI layer:
parent.retrieve_policy
parent.fetch_order
parent.classify_outcome
escalation.classify_escalation_reason
escalation.select_escalation_channel
For telemetry, keep the trace authoritative. The stream is for live consumption; the trace is for incident analysis.
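A sketch of one way to derive such labels. It assumes the graph is streamed with `subgraphs=True`, in which case LangGraph yields `(namespace, update)` pairs whose namespace is a tuple of `node_name:task_id` strings, empty for the parent graph. The `SUBGRAPH_LABELS` mapping and `display_label()` are hypothetical and not part of the demo.

```python
# Hypothetical mapping from the parent node that hosts a subgraph to a UI prefix.
SUBGRAPH_LABELS = {"escalate": "escalation"}


def display_label(namespace: tuple[str, ...], node_name: str) -> str:
    """Build a stable UI label from a stream namespace and a node name."""
    if not namespace:
        return f"parent.{node_name}"
    # Keep only the node name of the innermost namespace entry; the task ID
    # after the colon changes per run and would make labels unstable.
    host_node = namespace[-1].split(":", 1)[0]
    prefix = SUBGRAPH_LABELS.get(host_node, host_node)
    return f"{prefix}.{node_name}"
```

Dropping the task ID keeps the label identical across runs, which is what a progress UI needs; the trace still carries the run-specific identifiers.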
What should exist before the appendix
At this point the demo should have:
- a sanitized stream_agent_updates() entrypoint in src/agent_observability/graph.py;
- src/agent_observability/streaming.py with observed_stream_agent_updates();
- a graph-updates-stream manual scenario;
- a workflow.stream updates span with bounded lifecycle metadata;
- a streaming sanitizer regression test in tests/test_streaming.py;
- no full graph state exported to telemetry;
- separate handling for model token streaming and graph progress streaming;
- a Langfuse trace named manual-graph-updates-stream that remains readable.
The appendix collects the terminology and references used throughout the series.
References
Appendix: Glossary and References collects terminology, standards, implementation documentation, and research used throughout the series.