Test SSE stream reconnection for team runs usingDocumentation Index
Fetch the complete documentation index at: https://docs.agno.com/llms.txt
Use this file to discover all available pages before exploring further.
background=True, stream=True. The team runs in a detached task that survives client disconnections. Events are buffered so the client can reconnect via /resume and catch up on missed events.
Prerequisites
- An AgentOS server running on
http://localhost:7777with at least one team registered (e.g.,python cookbook/05_agent_os/basic.py). - The team must have a
dbconfigured. Background runs require persistent storage. - Run the script.
import asyncio
import json
from typing import Optional
import httpx
BASE_URL = "http://localhost:7777"
EVENTS_BEFORE_DISCONNECT = 6
DISCONNECT_DURATION = 3
def parse_sse_line(line: str) -> Optional[dict]:
if line.startswith("data: "):
try:
return json.loads(line[6:])
except json.JSONDecodeError:
return None
return None
async def test_team_sse_reconnection():
print("=" * 70)
print("Team SSE Reconnection Test")
print("=" * 70)
# Discover a team
async with httpx.AsyncClient(base_url=BASE_URL, timeout=30) as client:
resp = await client.get("/teams")
resp.raise_for_status()
teams = resp.json()
if not teams:
print("No teams available")
return
team_id = teams[0]["id"]
print(f"Using team: {team_id}")
# Phase 1: Start streaming, disconnect after a few events
run_id: Optional[str] = None
session_id: Optional[str] = None
last_event_index: Optional[int] = None
events_phase1: list[dict] = []
print(f"\nPhase 1: Starting SSE stream, will disconnect after {EVENTS_BEFORE_DISCONNECT} events...")
async with httpx.AsyncClient(base_url=BASE_URL, timeout=60) as client:
form_data = {
"message": "Tell me a detailed story about a brave knight. Make it at least 5 paragraphs.",
"stream": "true",
"background": "true",
}
async with client.stream("POST", f"/teams/{team_id}/runs", data=form_data) as response:
event_count = 0
buffer = ""
async for chunk in response.aiter_text():
buffer += chunk
while "\n\n" in buffer:
event_str, buffer = buffer.split("\n\n", 1)
for line in event_str.strip().split("\n"):
data = parse_sse_line(line)
if data is None:
continue
if data.get("run_id") and not run_id:
run_id = data["run_id"]
if data.get("session_id") and not session_id:
session_id = data["session_id"]
if data.get("event_index") is not None:
last_event_index = data["event_index"]
events_phase1.append(data)
event_count += 1
print(f" [{event_count}] event={data.get('event')} index={data.get('event_index')}")
if event_count >= EVENTS_BEFORE_DISCONNECT:
break
if event_count >= EVENTS_BEFORE_DISCONNECT:
break
if event_count >= EVENTS_BEFORE_DISCONNECT:
break
print(f"\n[DISCONNECT] run_id={run_id}, last_event_index={last_event_index}")
if not run_id:
print("Could not determine run_id")
return
print(f"\nSimulating disconnect for {DISCONNECT_DURATION}s...")
await asyncio.sleep(DISCONNECT_DURATION)
# Phase 2: Resume via /resume endpoint
print("\nPhase 2: Reconnecting via /resume...")
events_phase2: list[dict] = []
form_data: dict = {}
if last_event_index is not None:
form_data["last_event_index"] = str(last_event_index)
if session_id:
form_data["session_id"] = session_id
async with httpx.AsyncClient(base_url=BASE_URL, timeout=120) as client:
async with client.stream(
"POST", f"/teams/{team_id}/runs/{run_id}/resume", data=form_data
) as response:
buffer = ""
async for chunk in response.aiter_text():
buffer += chunk
while "\n\n" in buffer:
event_str, buffer = buffer.split("\n\n", 1)
for line in event_str.strip().split("\n"):
data = parse_sse_line(line)
if data is None:
continue
events_phase2.append(data)
event_type = data.get("event")
if event_type in ("catch_up", "replay", "subscribed"):
print(f" [META] {event_type}")
else:
print(f" [RESUME] event={event_type} index={data.get('event_index')}")
data_events = [e for e in events_phase2 if e.get("event") not in ("catch_up", "replay", "subscribed", "error")]
print(f"\nPhase 1: {len(events_phase1)} events | Phase 2: {len(data_events)} data events")
print(f"Total: {len(events_phase1) + len(data_events)} events (no events lost)")
if __name__ == "__main__":
asyncio.run(test_team_sse_reconnection())
Run the Example
git clone https://github.com/agno-agi/agno.git
cd agno
uv pip install -U agno httpx
python cookbook/05_agent_os/client/11_team_sse_reconnect.py