Run agents, teams, and workflows in the background by passing background=True to .arun(). Execution continues even if the client disconnects. The behavior depends on whether you also set stream=True.

Execution Modes

background | stream | Behavior
-----------|--------|---------
False | True | Default streaming. Runs inline. Client disconnect cancels the run.
False | False | Non-streaming. Returns full response.
True | False | Fire-and-forget. Returns PENDING immediately. Poll for results.
True | True | Resumable streaming. Runs in a detached task. Events are buffered. Reconnect via /resume.
Background execution requires a database (db) on the agent, team, or workflow for persisting run state.
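
If you do not have Postgres running, any Agno database backend satisfies this. A minimal sketch using the SQLite backend (assuming SqliteDb is available in your installed version):

from agno.db.sqlite import SqliteDb

# File-backed database; satisfies the run-state persistence requirement without Postgres
db = SqliteDb(db_file="tmp/background_runs.db")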

Fire-and-Forget

Start a background run and poll for the result. Works identically for agents, teams, and workflows.
import asyncio

from agno.agent import Agent
from agno.db.postgres import PostgresDb
from agno.models.openai import OpenAIResponses
from agno.run.base import RunStatus

db = PostgresDb(
    db_url="postgresql+psycopg://ai:ai@localhost:5532/ai",
    session_table="background_exec_sessions",
)

agent = Agent(
    name="BackgroundAgent",
    model=OpenAIResponses(id="gpt-5-mini"),
    db=db,
)

async def main():
    # Returns immediately with PENDING status
    run_output = await agent.arun(
        "Write a short analysis of quantum computing trends.",
        background=True,
    )
    print(f"Run ID: {run_output.run_id}, Status: {run_output.status}")

    # Poll until complete
    for _ in range(60):
        await asyncio.sleep(1)
        result = await agent.aget_run_output(
            run_id=run_output.run_id,
            session_id=run_output.session_id,
        )
        if result and result.status == RunStatus.completed:
            print(f"Done: {result.content}")
            break

asyncio.run(main())
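
The same call works on a team; only the object you construct differs. A hedged sketch, assuming the Team class with a single illustrative member:

import asyncio

from agno.agent import Agent
from agno.db.postgres import PostgresDb
from agno.models.openai import OpenAIResponses
from agno.team import Team

db = PostgresDb(db_url="postgresql+psycopg://ai:ai@localhost:5532/ai")

team = Team(
    name="BackgroundTeam",
    members=[Agent(name="Researcher", model=OpenAIResponses(id="gpt-5-mini"))],
    db=db,  # background execution needs the database here too
)

async def main():
    # Fire-and-forget on a team: returns immediately with PENDING, poll as above
    run_output = await team.arun("Summarize this week's AI news.", background=True)
    print(f"Run ID: {run_output.run_id}, Status: {run_output.status}")

asyncio.run(main())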

Resumable Streaming (SSE)

Combine background=True with stream=True for resumable SSE streaming. The run executes in a detached asyncio.Task that survives client disconnects. Events are buffered with sequential event_index values so clients can reconnect without losing events.

How It Works

Client connects     → StreamingResponse reads from queue    ← Background task runs
Client disconnects  → StreamingResponse cancelled           ← Background task keeps running
Client reconnects   → /resume reads from subscriber queue   ← Background task still publishing
  1. The run is persisted with RUNNING status in the database
  2. A detached asyncio.Task executes and publishes events to an in-memory buffer
  3. The client receives SSE events, each containing an event_index and run_id
  4. On disconnect, the client records last_event_index
  5. On reconnect, the client calls /resume with last_event_index to catch up on missed events
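
Conceptually, the buffer is an append-only list of indexed events plus a queue per connected subscriber. This is not Agno's internal code, just a minimal sketch of the pattern with illustrative names:

import asyncio
from typing import AsyncIterator

class EventBuffer:
    """Append-only event log with live subscribers (illustrative sketch)."""

    def __init__(self) -> None:
        self.events: list[dict] = []                 # replay buffer; position == event_index
        self.subscribers: list[asyncio.Queue] = []

    def publish(self, event: dict) -> None:
        event["event_index"] = len(self.events)
        self.events.append(event)                    # kept for catch-up replays
        for queue in self.subscribers:
            queue.put_nowait(event)                  # live tail for connected clients

    async def subscribe(self, last_event_index: int = -1) -> AsyncIterator[dict]:
        # Replay everything the client missed...
        for event in self.events[last_event_index + 1:]:
            yield event
        # ...then attach a queue for live events. (A real implementation closes
        # the gap between replay and attach atomically; this sketch does not.)
        queue: asyncio.Queue = asyncio.Queue()
        self.subscribers.append(queue)
        while True:
            yield await queue.get()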

Starting a Resumable Stream

Resumable streaming requires a running AgentOS server. Pass background=true and stream=true in the request. The pattern is the same for agents, teams, and workflows. Only the URL path differs.
Workflows also support WebSocket-based reconnection. See the WebSocket reconnect example.
import asyncio
import json
import httpx

BASE_URL = "http://localhost:7777"

async def start_resumable_stream():
    async with httpx.AsyncClient(base_url=BASE_URL, timeout=60) as client:
        # Use /agents, /teams, or /workflows
        agents = (await client.get("/agents")).json()
        agent_id = agents[0]["id"]

        form_data = {
            "message": "Write a detailed story about a brave knight.",
            "stream": "true",
            "background": "true",
        }

        run_id = None
        session_id = None
        last_event_index = None

        async with client.stream("POST", f"/agents/{agent_id}/runs", data=form_data) as response:
            buffer = ""
            async for chunk in response.aiter_text():
                buffer += chunk
                while "\n\n" in buffer:
                    event_str, buffer = buffer.split("\n\n", 1)
                    for line in event_str.strip().split("\n"):
                        if not line.startswith("data: "):
                            continue
                        data = json.loads(line[6:])

                        # Track identifiers for reconnection
                        if data.get("run_id") and not run_id:
                            run_id = data["run_id"]
                        if data.get("session_id") and not session_id:
                            session_id = data["session_id"]
                        if data.get("event_index") is not None:
                            last_event_index = data["event_index"]

                        print(f"[{data.get('event_index')}] {data.get('event')}: {str(data.get('content', ''))[:60]}")

        return run_id, session_id, last_event_index

asyncio.run(start_resumable_stream())
Each SSE event includes:
  • event_index: Sequential integer for ordering and resumption
  • run_id: The run identifier for reconnection
  • session_id: The session identifier
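
For example, a single frame might look like this (values and the event name are illustrative; exact event names depend on the run type):

data: {"event": "RunContent", "event_index": 12, "run_id": "run-abc123", "session_id": "session-xyz789", "content": "Once upon a time"}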

Reconnecting via /resume

On disconnect (page refresh, network loss), reconnect to /resume with the last event_index:
async def resume_stream(agent_id: str, run_id: str, session_id: str, last_event_index: int):
    form_data = {"last_event_index": str(last_event_index)}
    if session_id:
        form_data["session_id"] = session_id

    async with httpx.AsyncClient(base_url=BASE_URL, timeout=120) as client:
        async with client.stream(
            "POST", f"/agents/{agent_id}/runs/{run_id}/resume", data=form_data
        ) as response:
            buffer = ""
            async for chunk in response.aiter_text():
                buffer += chunk
                while "\n\n" in buffer:
                    event_str, buffer = buffer.split("\n\n", 1)
                    for line in event_str.strip().split("\n"):
                        if not line.startswith("data: "):
                            continue
                        data = json.loads(line[6:])
                        event_type = data.get("event")

                        if event_type in ("catch_up", "replay", "subscribed"):
                            print(f"[META] {event_type}: {data}")
                        else:
                            print(f"[{data.get('event_index')}] {event_type}: {str(data.get('content', ''))[:60]}")

Resume Endpoints

The resume endpoint follows the same pattern for agents, teams, and workflows:
POST /agents/{agent_id}/runs/{run_id}/resume
POST /teams/{team_id}/runs/{run_id}/resume
POST /workflows/{workflow_id}/runs/{run_id}/resume

Content-Type: multipart/form-data
Form fields: last_event_index=N, session_id=S (session_id is optional)
Resume behavior depends on run state:
Scenario | Condition | Behavior
---------|-----------|---------
Catch up + live | Run still active in buffer | Replays missed events, then streams live events
Replay | Run completed, still in buffer | Replays all missed events
DB fallback | Buffer expired (30 min) | Falls back to database

Meta Events

The /resume stream includes meta events before data events:
Event | Meaning
------|--------
catch_up | Run still active. Missed events follow, then live events.
replay | Run already completed. All missed events follow.
subscribed | Catch-up complete. Now receiving live events.
error | Run not found or other issue.

Multi-Container Deployments

The detached task and event buffer live in-process on the instance that started the run. In a multi-replica setup, a /resume request that lands on a different instance misses the buffer and falls back to the database (no live tail until the run completes). Route /resume requests by run_id from the URL path (sticky session / consistent hashing at the load balancer) so they reach the originating instance. Only /resume needs affinity. The initial run-start request can hit any instance.
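
The routing rule itself is simple: derive the target replica from the run_id in the path so every /resume for a given run lands on the same instance. A Python sketch of the idea (hypothetical helper, not part of Agno; in practice this lives in your load balancer config, ideally as consistent hashing so that changing the replica set remaps as few runs as possible):

import hashlib

REPLICAS = ["http://app1:7777", "http://app2:7777", "http://app3:7777"]  # assumed replica addresses

def route_resume(run_id: str) -> str:
    # Stable hash: the same run_id always maps to the same replica.
    # (Simple modulo hashing shown here; true consistent hashing also
    # minimizes remapping when replicas are added or removed.)
    digest = hashlib.sha256(run_id.encode()).digest()
    return REPLICAS[int.from_bytes(digest[:8], "big") % len(REPLICAS)]

print(route_resume("run-abc123"))  # deterministic target for this run's /resume calls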

Developer Resources