Streaming
Agiwo is streaming-first. All LLM responses flow through the same streaming
pipeline, whether you use run(), run_stream(), start(), or the scheduler’s
route_root_input().
Use run_stream() for the simple path
```python
async for event in agent.run_stream("Explain recursion in one sentence."):
    if event.type == "step_delta" and event.delta.content:
        print(event.delta.content, end="", flush=True)
```
Use start() when you need control
```python
handle = agent.start("Write a three-line release note.")

async for event in handle.stream():
    if event.type == "step_delta" and event.delta.content:
        print(event.delta.content, end="", flush=True)

result = await handle.wait()
```
Stream from the scheduler
```python
from agiwo.scheduler import Scheduler

async with Scheduler() as scheduler:
    route = await scheduler.route_root_input(
        "Research topic X",
        agent=agent,
        persistent=False,
    )
    assert route.stream is not None

    async for event in route.stream:
        if event.type == "step_delta" and event.delta.content:
            print(event.delta.content, end="", flush=True)
```
For an existing root:
```python
route = await scheduler.route_root_input(
    "Continue the analysis",
    agent=agent,
    state_id=existing_state_id,
)
assert route.stream is not None

async for event in route.stream:
    process(event)
```
Only one live stream subscriber is allowed per root state_id. If you steer a
currently RUNNING root, RouteResult.stream is None because the existing
subscriber keeps consuming that root’s stream.
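Because RouteResult.stream can be None, a caller that may be steering a running root probably wants to check before iterating instead of asserting. The sketch below shows one way to do that. It does not import Agiwo: FakeRoute and fake_events are hypothetical stand-ins that model only the stream attribute described above, and consume_if_subscribed is an illustrative helper name, not part of the library.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Optional


@dataclass
class FakeRoute:
    """Hypothetical stand-in for RouteResult; only `stream` is modeled."""
    stream: Optional[AsyncIterator[Any]]


async def consume_if_subscribed(route: Any) -> Optional[list]:
    """Drain route.stream when this caller owns it; return None otherwise."""
    if route.stream is None:
        # The root was already RUNNING, so the existing subscriber
        # keeps consuming its stream and there is nothing to read here.
        return None
    return [event async for event in route.stream]


async def fake_events() -> AsyncIterator[str]:
    """Hypothetical event source; real streams yield event objects, not strings."""
    for name in ("run_started", "run_completed"):
        yield name


def demo() -> tuple:
    owned = asyncio.run(consume_if_subscribed(FakeRoute(stream=fake_events())))
    steered = asyncio.run(consume_if_subscribed(FakeRoute(stream=None)))
    return owned, steered
```

With a real scheduler, the same check would apply to the value returned by route_root_input(); the None branch is where a caller might log that the input was delivered to an already-subscribed root.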
Event shapes
AgentStreamItem can represent:
- run_started
- step_delta
- step_completed
- messages_rebuilt
- compaction_applied
- compaction_failed
- retrospect_applied
- termination_decided
- run_rolled_back
- run_completed
- run_failed
This lets one stream consumer work across direct agent runs and scheduler-managed runs.
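As a rough illustration of such a shared consumer, the sketch below dispatches on event.type, collecting text from step_delta events and recording whether the run ended in run_completed or run_failed. It runs against a fake stream built from SimpleNamespace objects. The only event fields assumed are type and delta.content, as used in the examples above; collect_text and fake_stream are hypothetical names, and other event payloads are deliberately left untouched.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, AsyncIterator


async def collect_text(stream: AsyncIterator[Any]) -> tuple[str, str]:
    """Concatenate streamed content and report the last terminal event seen."""
    chunks: list[str] = []
    outcome = "unknown"
    async for event in stream:
        if event.type == "step_delta":
            # Deltas may carry no content, so skip empty ones.
            if event.delta.content:
                chunks.append(event.delta.content)
        elif event.type in ("run_completed", "run_failed"):
            outcome = event.type
        # All other event types are ignored in this sketch.
    return "".join(chunks), outcome


async def fake_stream() -> AsyncIterator[Any]:
    """Hypothetical stand-in for agent.run_stream(...), handle.stream(), or route.stream."""
    yield SimpleNamespace(type="run_started")
    yield SimpleNamespace(type="step_delta", delta=SimpleNamespace(content="Hello, "))
    yield SimpleNamespace(type="step_delta", delta=SimpleNamespace(content=None))
    yield SimpleNamespace(type="step_delta", delta=SimpleNamespace(content="world"))
    yield SimpleNamespace(type="run_completed")


text, outcome = asyncio.run(collect_text(fake_stream()))
```

The function takes any async iterator, so the same consumer could in principle be handed a direct agent stream or a scheduler route stream without changes.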