Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.agno.com/llms.txt

Use this file to discover all available pages before exploring further.

Test SSE stream reconnection for team runs using background=True, stream=True. The team runs in a detached task that survives client disconnections. Events are buffered so the client can reconnect via /resume and catch up on missed events.

Prerequisites

  1. An AgentOS server running on http://localhost:7777 with at least one team registered (e.g., python cookbook/05_agent_os/basic.py).
  2. The team must have a db configured. Background runs require persistent storage.
  3. Run the script.
import asyncio
import json
from typing import Optional

import httpx

# Root URL of the AgentOS server; passed as base_url to every httpx client in this script.
BASE_URL = "http://localhost:7777"
# Number of SSE events to read in phase 1 before breaking out of the stream.
EVENTS_BEFORE_DISCONNECT = 6
# Seconds to sleep between dropping the stream and calling the /resume endpoint.
DISCONNECT_DURATION = 3


def parse_sse_line(line: str) -> Optional[dict]:
    """Decode the JSON object carried by a single SSE ``data`` line.

    Args:
        line: One line of an SSE event block, without its trailing newline.

    Returns:
        The decoded JSON object, or ``None`` when the line is not a ``data``
        field, its payload is not valid JSON, or the payload is valid JSON
        but not an object.
    """
    prefix = "data:"
    if not line.startswith(prefix):
        return None
    # The SSE format makes the space after the colon optional, so accept both
    # "data: {...}" and "data:{...}"; json.loads skips leading whitespace.
    try:
        payload = json.loads(line[len(prefix):])
    except json.JSONDecodeError:
        return None
    # Callers use dict methods on the result, so reject arrays and scalars
    # instead of handing back something that breaks the declared return type.
    return payload if isinstance(payload, dict) else None


def _split_sse_events(buffer: str) -> "tuple[list[dict], str]":
    """Parse every complete SSE event block currently held in *buffer*.

    Event blocks are delimited by a blank line. Each line of a complete block
    is passed to ``parse_sse_line``; lines it rejects are skipped.

    Returns:
        A pair ``(payloads, remainder)``: the decoded payloads in arrival
        order, and the trailing text that does not yet form a complete block.
    """
    payloads: list[dict] = []
    while "\n\n" in buffer:
        block, buffer = buffer.split("\n\n", 1)
        for line in block.strip().split("\n"):
            data = parse_sse_line(line)
            if data is not None:
                payloads.append(data)
    return payloads, buffer


def _missing_event_indices(events: "list[dict]") -> "Optional[list[int]]":
    """Return the integer ``event_index`` values absent between the lowest and highest seen.

    Returns ``None`` when no event carries an integer ``event_index``, i.e.
    when continuity cannot be checked at all.
    """
    seen = {e["event_index"] for e in events if isinstance(e.get("event_index"), int)}
    if not seen:
        return None
    return sorted(set(range(min(seen), max(seen) + 1)) - seen)


async def test_team_sse_reconnection():
    """Start a background team run, drop the SSE stream early, then resume it.

    Steps:
      1. GET /teams and pick the first team.
      2. POST /teams/{team_id}/runs with stream and background enabled, and
         stop reading after EVENTS_BEFORE_DISCONNECT events.
      3. Sleep for DISCONNECT_DURATION seconds.
      4. POST /teams/{team_id}/runs/{run_id}/resume with the last seen
         event_index (and session_id when known) and read the stream to the end.
      5. Print a summary of the events received in both phases.

    Raises:
        httpx.HTTPStatusError: if any of the three requests returns an error
            status.
    """
    print("=" * 70)
    print("Team SSE Reconnection Test")
    print("=" * 70)

    # Discover a team
    async with httpx.AsyncClient(base_url=BASE_URL, timeout=30) as client:
        resp = await client.get("/teams")
        resp.raise_for_status()
        teams = resp.json()
        if not teams:
            print("No teams available")
            return
        team_id = teams[0]["id"]
        print(f"Using team: {team_id}")

    # Phase 1: Start streaming, disconnect after a few events
    run_id: Optional[str] = None
    session_id: Optional[str] = None
    last_event_index: Optional[int] = None
    events_phase1: list[dict] = []

    print(f"\nPhase 1: Starting SSE stream, will disconnect after {EVENTS_BEFORE_DISCONNECT} events...")

    async with httpx.AsyncClient(base_url=BASE_URL, timeout=60) as client:
        form_data = {
            "message": "Tell me a detailed story about a brave knight. Make it at least 5 paragraphs.",
            "stream": "true",
            "background": "true",
        }
        async with client.stream("POST", f"/teams/{team_id}/runs", data=form_data) as response:
            # Fail loudly on an HTTP error instead of reading an error body as
            # if it were an (empty) event stream.
            response.raise_for_status()
            buffer = ""
            disconnect = False
            async for chunk in response.aiter_text():
                payloads, buffer = _split_sse_events(buffer + chunk)
                for data in payloads:
                    # Remember the first run/session id and the latest index;
                    # these identify where /resume should pick up.
                    if data.get("run_id") and not run_id:
                        run_id = data["run_id"]
                    if data.get("session_id") and not session_id:
                        session_id = data["session_id"]
                    if data.get("event_index") is not None:
                        last_event_index = data["event_index"]

                    events_phase1.append(data)
                    print(f"  [{len(events_phase1)}] event={data.get('event')} index={data.get('event_index')}")

                    if len(events_phase1) >= EVENTS_BEFORE_DISCONNECT:
                        # Events already parsed from this chunk but not yet
                        # handled are dropped on purpose: they count as missed.
                        disconnect = True
                        break
                if disconnect:
                    break

    print(f"\n[DISCONNECT] run_id={run_id}, last_event_index={last_event_index}")

    if not run_id:
        print("Could not determine run_id")
        return

    print(f"\nSimulating disconnect for {DISCONNECT_DURATION}s...")
    await asyncio.sleep(DISCONNECT_DURATION)

    # Phase 2: Resume via /resume endpoint
    print("\nPhase 2: Reconnecting via /resume...")
    events_phase2: list[dict] = []
    meta_events = ("catch_up", "replay", "subscribed")

    resume_form: dict = {}
    if last_event_index is not None:
        resume_form["last_event_index"] = str(last_event_index)
    if session_id:
        resume_form["session_id"] = session_id

    async with httpx.AsyncClient(base_url=BASE_URL, timeout=120) as client:
        async with client.stream(
            "POST", f"/teams/{team_id}/runs/{run_id}/resume", data=resume_form
        ) as response:
            response.raise_for_status()
            buffer = ""
            async for chunk in response.aiter_text():
                payloads, buffer = _split_sse_events(buffer + chunk)
                for data in payloads:
                    events_phase2.append(data)
                    event_type = data.get("event")

                    if event_type in meta_events:
                        print(f"  [META] {event_type}")
                    else:
                        print(f"  [RESUME] event={event_type} index={data.get('event_index')}")

    data_events = [e for e in events_phase2 if e.get("event") not in meta_events + ("error",)]
    total = len(events_phase1) + len(data_events)
    print(f"\nPhase 1: {len(events_phase1)} events | Phase 2: {len(data_events)} data events")

    # Only claim "no events lost" when the indices actually back it up.
    # NOTE(review): assumes the server numbers events with consecutive
    # integers in event_index -- confirm against the AgentOS implementation.
    missing = _missing_event_indices(events_phase1 + data_events)
    if missing is None:
        print(f"Total: {total} events (no event_index values seen; continuity not verified)")
    elif missing:
        print(f"Total: {total} events (WARNING: missing event_index values: {missing})")
    else:
        print(f"Total: {total} events (no events lost)")


# Run the reconnection scenario only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(test_team_sse_reconnection())

Run the Example

git clone https://github.com/agno-agi/agno.git
cd agno

uv pip install -U agno httpx

python cookbook/05_agent_os/client/11_team_sse_reconnect.py