A single agent.run(files=[File(...)]) handles one document. Production workloads arrive as folders, queues, and nightly drops. Three primitives cover those shapes: concurrent runs for ad-hoc batches, background runs for long jobs, and scheduled runs for nightly intake.

Concurrent batch over a list

The simplest batch is a folder of files. agent.arun is async, so a semaphore plus asyncio.gather is enough.
import asyncio
from pathlib import Path

from agno.agent import Agent
from agno.media import File
from agno.models.openai import OpenAIResponses
from pydantic import BaseModel

from your_schemas import Invoice  # define your output schema once


agent = Agent(
    model=OpenAIResponses(id="gpt-5.5"),
    instructions="Extract invoice fields and line items. Null for missing.",
    output_schema=Invoice,
)


async def extract_one(path: Path, sem: asyncio.Semaphore) -> Invoice:
    async with sem:
        run = await agent.arun(
            "Extract this invoice.",
            files=[File(filepath=str(path))],
        )
        return run.content


async def extract_folder(folder: Path, concurrency: int = 8) -> list[Invoice]:
    sem = asyncio.Semaphore(concurrency)
    paths = sorted(folder.glob("*.pdf"))
    return await asyncio.gather(*(extract_one(p, sem) for p in paths))


invoices = asyncio.run(extract_folder(Path("./incoming-invoices")))
# [Invoice(invoice_number='1042', ...), Invoice(invoice_number='1043', ...), ...]
A semaphore is the smallest concurrency control. It keeps you under the provider’s rate limit and bounds memory. Set the concurrency to the tightest of your constraints: rate quota, DB write throughput, or memory budget.
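One thing to watch in the pattern above: by default asyncio.gather raises the first exception it sees, so a single unreadable PDF fails the whole call. Below is a minimal sketch of one way to isolate failures with return_exceptions=True. The names bounded_gather and fake_extract are hypothetical; fake_extract stands in for extract_one so the snippet runs without a model call.

```python
import asyncio


async def bounded_gather(items, worker, concurrency=8):
    """Run worker(item) for every item with at most `concurrency` in flight.

    The result list is aligned with `items`. A failed item yields its
    exception object instead of a result, so one bad file does not
    sink the rest of the batch.
    """
    sem = asyncio.Semaphore(concurrency)

    async def guarded(item):
        async with sem:
            return await worker(item)

    return await asyncio.gather(
        *(guarded(item) for item in items),
        return_exceptions=True,
    )


async def fake_extract(name):
    # Hypothetical stand-in for extract_one: fails on one input.
    await asyncio.sleep(0.01)
    if name == "corrupt.pdf":
        raise ValueError(f"cannot parse {name}")
    return name.upper()


results = asyncio.run(
    bounded_gather(["a.pdf", "corrupt.pdf", "b.pdf"], fake_extract, concurrency=2)
)
succeeded = [r for r in results if not isinstance(r, Exception)]
failed = [r for r in results if isinstance(r, Exception)]
```

In the invoice example, worker would be a small coroutine wrapping agent.arun. The failed entries can then be logged or queued for a retry.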

Background runs for long jobs

Sync runs hold an HTTP connection until the model returns. For multi-page contracts or scanned PDFs that take minutes, start the run in the background and poll.
import asyncio

from agno.agent import Agent
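from agno.db.postgres import PostgresDb
from agno.media import File
from agno.models.openai import OpenAIResponses
from agno.run.base import RunStatus

from your_schemas import Invoice  # same output schema as the batch example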

db = PostgresDb(db_url="postgresql+psycopg://ai:ai@localhost:5532/ai")
agent = Agent(model=OpenAIResponses(id="gpt-5.5"), db=db, output_schema=Invoice)


async def extract_long(file_url: str) -> Invoice:
    started = await agent.arun(
        "Extract this invoice.",
        files=[File(url=file_url)],
        background=True,
    )
    # started.status is RunStatus.pending; the work continues in the background.

    while True:
        await asyncio.sleep(2)
        run = await agent.aget_run_output(
            run_id=started.run_id,
            session_id=started.session_id,
        )
        if run is None:
            continue
        if run.status == RunStatus.completed:
            return run.content
        if run.status == RunStatus.error:
            raise RuntimeError(f"Run {started.run_id} failed")
The background run is persisted in db. The agent process can restart and a different process can poll the same run_id. That is the durability property: state lives in the database, not in the calling process.
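The polling loop above exits only on completed or error, so a run that stalls in another state would be polled forever. Below is a minimal sketch of a generic poll helper with a deadline and exponential backoff. It is plain asyncio, not an Agno API: poll_until_done, fake_fetch, and the parameter names are hypothetical. In the extraction example, fetch would wrap the agent.aget_run_output call shown above.

```python
import asyncio
import time


async def poll_until_done(fetch, is_done, interval=2.0, timeout=600.0, max_interval=30.0):
    """Call `await fetch()` until `is_done(result)` is true.

    Sleeps between attempts, doubling the delay up to `max_interval`.
    Raises TimeoutError if the next sleep would pass the deadline.
    """
    deadline = time.monotonic() + timeout
    delay = interval
    while True:
        result = await fetch()
        # A None result means "not visible yet"; keep polling.
        if result is not None and is_done(result):
            return result
        if time.monotonic() + delay > deadline:
            raise TimeoutError(f"not finished after {timeout} seconds")
        await asyncio.sleep(delay)
        delay = min(delay * 2, max_interval)


# Hypothetical stand-in for a run lookup: missing, then running, then done.
_states = iter([None, "running", "completed"])


async def fake_fetch():
    return next(_states)


final = asyncio.run(
    poll_until_done(fake_fetch, lambda s: s == "completed", interval=0.01, timeout=5)
)
```

A deadline turns a stuck run into an explicit failure the caller can route. The backoff keeps long jobs from hitting the database every two seconds.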

Scheduled batch with retries

For nightly intake (an SFTP drop, a Drive folder, a queue), put an AgentOS in front of your agent and let the scheduler fire the run on cron.
from agno.agent import Agent
from agno.db.postgres import PostgresDb
from agno.models.openai import OpenAIResponses
from agno.os import AgentOS

from your_schemas import Invoice  # same output schema as the batch example

db = PostgresDb(db_url="postgresql+psycopg://ai:ai@localhost:5532/ai")

extractor = Agent(
    id="invoice-extractor",
    model=OpenAIResponses(id="gpt-5.5"),
    db=db,
    output_schema=Invoice,
)

agent_os = AgentOS(
    agents=[extractor],
    db=db,
    scheduler=True,
    scheduler_poll_interval=15,    # check for due jobs every N seconds
)
app = agent_os.get_app()
Then create the schedule in Python. ScheduleManager writes to the same db the AgentOS polls.
from agno.scheduler import ScheduleManager

mgr = ScheduleManager(db)

mgr.create(
    name="nightly-invoice-intake",
    cron="0 2 * * *",                 # 2am every day
    endpoint="/agents/invoice-extractor/runs",
    payload={"message": "Process the overnight invoice drop."},
    timezone="America/New_York",
    max_retries=2,
    retry_delay_seconds=300,
    if_exists="update",
)
if_exists="update" makes the call idempotent. Re-running the bootstrap script does not create duplicates. The scheduler retries on HTTP failure with the configured delay, and every fire writes a row to agno_schedule_runs with status and timing.

Pattern comparison

| Pattern | When to reach for it | Process lifetime |
| --- | --- | --- |
| asyncio.gather over agent.arun | One-time backfill, a fixed list of files | One process, end-to-end |
| agent.arun(background=True) + poll | Single long document, restart-tolerant | State in db, process can restart |
| AgentOS(scheduler=True) + ScheduleManager | Recurring intake (nightly, hourly) | Long-running AgentOS process |
| Workflow with Loop / Parallel steps | Multi-step pipelines per document | Either ad-hoc or scheduled |

The scheduler fires endpoints. Endpoints are agents, teams, or workflows. So a nightly job that ingests a folder, extracts each file, and writes to your warehouse is a workflow exposed at /workflows/<id>/runs, scheduled with the same ScheduleManager.create call. See Workflows.

Observability

Every scheduled fire creates a row in agno_schedule_runs with the schedule id, attempt number, status, and the run_id of the underlying agent run. To inspect the most recent fires of one schedule:
# schedule_id is the id of the schedule created earlier
runs = mgr.get_runs(schedule_id, limit=100)
for r in runs:
    print(r.triggered_at, r.status, r.attempt, r.error or "")
Failed attempts keep their error text. Retries are separate rows with the same schedule_id and an incrementing attempt. That is the audit trail you can hand to ops.
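Below is a minimal sketch of turning those rows into a summary for ops. RunRow is a hypothetical stand-in that carries only the fields printed above. The sample rows are illustrative data, and the "success" status value is an assumption; only "failed" is named on this page.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Optional


@dataclass
class RunRow:
    # Hypothetical stand-in with the fields printed above.
    triggered_at: str
    status: str
    attempt: int
    error: Optional[str] = None


def summarize(rows):
    """Count rows per status and collect the error text of failed attempts."""
    counts = Counter(row.status for row in rows)
    errors = [(row.triggered_at, row.attempt, row.error) for row in rows if row.error]
    return counts, errors


# Illustrative data: two failed attempts, then a retry that went through.
sample = [
    RunRow("2025-01-01T02:00", "failed", 1, "HTTP 503"),
    RunRow("2025-01-01T02:05", "failed", 2, "HTTP 503"),
    RunRow("2025-01-01T02:10", "success", 3),  # assumed status value
]
counts, errors = summarize(sample)
```

With real data, the rows would come from mgr.get_runs and summarize would read the same attributes.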

Production checklist

| Concern | What to add |
| --- | --- |
| Idempotency per document | Pass a deterministic session_id (e.g. document hash) so re-runs upsert. |
| Dead-letter queue | After max_retries, the row stays in agno_schedule_runs with status="failed". Read it and route to a manual queue. |
| Per-provider rate limiting | The asyncio.Semaphore is enough for one provider. For mixed providers, run one semaphore per provider. |
| Storage of inputs | File(url=...) keeps the URL but not the bytes. If retention matters, store the source PDF before extraction. |
| Authoritative cost | RunMetrics.cost is populated when the provider returns it. For exact reconciliation, attach a token-rate table downstream. |
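
For the idempotency item, below is a minimal sketch of deriving a deterministic id from a document's bytes with SHA-256. The function names, the "invoice" prefix, and the 32-character truncation are illustrative choices, not Agno conventions.

```python
import hashlib


def session_id_for_bytes(data: bytes, prefix: str = "invoice") -> str:
    """Return a stable id: the same bytes always map to the same id."""
    return f"{prefix}-{hashlib.sha256(data).hexdigest()[:32]}"


def session_id_for_file(path, prefix: str = "invoice") -> str:
    """Hash a file in 1 MiB chunks so large PDFs are not loaded at once."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(1024 * 1024), b""):
            digest.update(chunk)
    return f"{prefix}-{digest.hexdigest()[:32]}"


same_a = session_id_for_bytes(b"abc")
same_b = session_id_for_bytes(b"abc")
other = session_id_for_bytes(b"abd")
```

The id depends on content and not on the filename, so a file that is dropped again under a new name maps to the same session. The result is what you would pass as the session_id the checklist mentions.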

Next steps

| Task | Guide |
| --- | --- |
| Pause on low-confidence fields | Human routing and eval |
| Compose multiple agents into a pipeline | Workflows |
| See the workflow + scheduler integration | Scheduling |
