Skip to content

Streaming

Agiwo is streaming-first. All LLM responses flow through the same streaming pipeline, whether you use run(), run_stream(), start(), or scheduler route_root_input().

async for event in agent.run_stream("Explain recursion in one sentence."):
if event.type == "step_delta" and event.delta.content:
print(event.delta.content, end="", flush=True)
handle = agent.start("Write a three-line release note.")
async for event in handle.stream():
if event.type == "step_delta" and event.delta.content:
print(event.delta.content, end="", flush=True)
result = await handle.wait()
from agiwo.scheduler import Scheduler
async with Scheduler() as scheduler:
route = await scheduler.route_root_input(
"Research topic X",
agent=agent,
persistent=False,
)
assert route.stream is not None
async for event in route.stream:
if event.type == "step_delta" and event.delta.content:
print(event.delta.content, end="", flush=True)

For an existing root:

route = await scheduler.route_root_input(
"Continue the analysis",
agent=agent,
state_id=existing_state_id,
)
assert route.stream is not None
async for event in route.stream:
process(event)

Only one live stream subscriber is allowed per root state_id. If you steer a currently RUNNING root, RouteResult.stream is None because the existing subscriber keeps consuming that root’s stream.

AgentStreamItem can represent:

  • run_started
  • step_delta
  • step_completed
  • messages_rebuilt
  • compaction_applied
  • compaction_failed
  • retrospect_applied
  • termination_decided
  • run_rolled_back
  • run_completed
  • run_failed

This lets one stream consumer work across direct agent runs and scheduler-managed runs.