A single agent.run(files=[File(...)]) handles one document. Production runs see folders, queues, and nightly drops. Three primitives cover the production shape: concurrent runs for ad-hoc batches, background runs for long jobs, scheduled runs for nightly intake.
Concurrent batch over a list
The simplest batch is a folder of files. agent.arun is async, so a semaphore plus asyncio.gather is enough.
import asyncio
from pathlib import Path

from agno.agent import Agent
from agno.media import File
from agno.models.openai import OpenAIResponses
from pydantic import BaseModel

from your_schemas import Invoice  # define your output schema once

agent = Agent(
    model=OpenAIResponses(id="gpt-5.5"),
    instructions="Extract invoice fields and line items. Null for missing.",
    output_schema=Invoice,
)


async def extract_one(path: Path, sem: asyncio.Semaphore) -> Invoice:
    # The semaphore caps how many extractions are in flight at once.
    async with sem:
        run = await agent.arun(
            "Extract this invoice.",
            files=[File(filepath=str(path))],
        )
        return run.content


async def extract_folder(folder: Path, concurrency: int = 8) -> list[Invoice]:
    sem = asyncio.Semaphore(concurrency)
    paths = sorted(folder.glob("*.pdf"))
    return await asyncio.gather(*(extract_one(p, sem) for p in paths))


invoices = asyncio.run(extract_folder(Path("./incoming-invoices")))
# [Invoice(invoice_number='1042', ...), Invoice(invoice_number='1043', ...), ...]
A semaphore is the simplest concurrency control. It keeps you under the provider’s rate limit and bounds memory. Set the concurrency to the tightest of three constraints: your rate quota, your DB write throughput, and your memory budget.
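One thing a plain asyncio.gather does not cover is partial failure: by default the first exception propagates to the caller, and the results of the other files are lost to it. The following is a minimal sketch of one way to keep per-file outcomes. It assumes a stand-in coroutine in place of the agent.arun call; the names extract_all and fake_extract are illustrative and not part of Agno.

```python
import asyncio
from typing import Awaitable, Callable


async def extract_all(
    paths: list[str],
    extract: Callable[[str], Awaitable[dict]],
    concurrency: int = 8,
) -> tuple[dict[str, dict], dict[str, BaseException]]:
    """Run `extract` over every path with bounded concurrency.

    Returns (successes, failures), both keyed by path.
    """
    sem = asyncio.Semaphore(concurrency)

    async def guarded(path: str) -> dict:
        async with sem:  # at most `concurrency` extractions in flight
            return await extract(path)

    # return_exceptions=True places raised exceptions in the result list
    # instead of propagating the first one to the caller.
    results = await asyncio.gather(
        *(guarded(p) for p in paths), return_exceptions=True
    )

    successes: dict[str, dict] = {}
    failures: dict[str, BaseException] = {}
    for path, result in zip(paths, results):
        if isinstance(result, BaseException):
            failures[path] = result
        else:
            successes[path] = result
    return successes, failures


async def fake_extract(path: str) -> dict:
    """Hypothetical stand-in for the agent.arun call."""
    await asyncio.sleep(0)
    if path.endswith("bad.pdf"):
        raise ValueError(f"could not parse {path}")
    return {"source": path}


ok, failed = asyncio.run(
    extract_all(["a.pdf", "bad.pdf", "c.pdf"], fake_extract, concurrency=2)
)
```

With this shape, a backfill can write the successes and log or requeue the failures rather than aborting the whole batch on one bad file.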
Background runs for long jobs
Sync runs hold an HTTP connection until the model returns. For multi-page contracts or scanned PDFs that take minutes, start the run in the background and poll.
import asyncio

from agno.agent import Agent
from agno.db.postgres import PostgresDb
from agno.media import File
from agno.models.openai import OpenAIResponses
from agno.run.base import RunStatus

from your_schemas import Invoice  # same output schema as above

db = PostgresDb(db_url="postgresql+psycopg://ai:ai@localhost:5532/ai")
agent = Agent(model=OpenAIResponses(id="gpt-5.5"), db=db, output_schema=Invoice)


async def extract_long(file_url: str) -> Invoice:
    started = await agent.arun(
        "Extract this invoice.",
        files=[File(url=file_url)],
        background=True,
    )
    # started.status is RunStatus.pending; the work continues in the background.
    while True:
        await asyncio.sleep(2)
        run = await agent.aget_run_output(
            run_id=started.run_id,
            session_id=started.session_id,
        )
        if run is None:
            # Not visible in the database yet; poll again.
            continue
        if run.status == RunStatus.completed:
            return run.content
        if run.status == RunStatus.error:
            raise RuntimeError(f"Run {started.run_id} failed")
The background run is persisted in db. The agent process can restart and a different process can poll the same run_id. That is the durability property: state lives in the database, not in the calling process.
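A poll loop without a deadline waits forever if the run never reaches a terminal state. The sketch below shows one way to bound the wait, with a deadline and a capped exponential backoff. It is not Agno API: fetch_status is a hypothetical stand-in for a call such as aget_run_output, and the status strings are placeholders for whatever the run status enum exposes.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional

TERMINAL = ("completed", "error", "cancelled")  # placeholder status names


async def poll_until_done(
    fetch_status: Callable[[], Awaitable[Optional[str]]],
    timeout: float = 600.0,
    interval: float = 2.0,
    max_interval: float = 30.0,
) -> str:
    """Poll until a terminal status appears or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    delay = interval
    while True:
        status = await fetch_status()  # None means "no row yet"
        if status in TERMINAL:
            return status
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError("run did not finish before the deadline")
        # Never sleep past the deadline; double the delay up to the cap.
        await asyncio.sleep(min(delay, remaining))
        delay = min(delay * 2, max_interval)


def make_fake_fetch(statuses):
    """Hypothetical stand-in that replays a fixed sequence of statuses."""
    it = iter(statuses)

    async def fetch() -> Optional[str]:
        return next(it)

    return fetch


final = asyncio.run(
    poll_until_done(
        make_fake_fetch([None, "pending", "running", "completed"]),
        timeout=5,
        interval=0.01,
    )
)
```

Because the run state lives in the database, a caller that hits the timeout can give up and let another process resume polling the same run later.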
Scheduled batch with retries
For nightly intake (an SFTP drop, a Drive folder, a queue), put an AgentOS in front of your agent and let the scheduler fire the run on cron.
from agno.agent import Agent
from agno.db.postgres import PostgresDb
from agno.models.openai import OpenAIResponses
from agno.os import AgentOS

from your_schemas import Invoice  # same output schema as above

db = PostgresDb(db_url="postgresql+psycopg://ai:ai@localhost:5532/ai")

extractor = Agent(
    id="invoice-extractor",
    model=OpenAIResponses(id="gpt-5.5"),
    db=db,
    output_schema=Invoice,
)

agent_os = AgentOS(
    agents=[extractor],
    db=db,
    scheduler=True,
    scheduler_poll_interval=15,  # check for due jobs every N seconds
)
app = agent_os.get_app()
Then create the schedule in Python. ScheduleManager writes to the same db the AgentOS polls.
from agno.scheduler import ScheduleManager

mgr = ScheduleManager(db)
mgr.create(
    name="nightly-invoice-intake",
    cron="0 2 * * *",  # 2am every day
    endpoint="/agents/invoice-extractor/runs",
    payload={"message": "Process the overnight invoice drop."},
    timezone="America/New_York",
    max_retries=2,
    retry_delay_seconds=300,
    if_exists="update",
)
if_exists="update" makes the call idempotent. Re-running the bootstrap script does not create duplicates. The scheduler retries on HTTP failure with the configured delay, and every fire writes a row to agno_schedule_runs with status and timing.
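To make the retry settings concrete, here is a small sketch of the worst-case timeline for one fire when every attempt fails. It rests on assumptions the text does not confirm: that attempts are numbered from 1, that the delay between attempts is fixed, and that request duration is negligible. The helper worst_case_attempts is illustrative only and not part of the scheduler.

```python
from datetime import datetime, timedelta


def worst_case_attempts(
    first_fire: datetime, max_retries: int, retry_delay_seconds: int
) -> list[tuple[int, datetime]]:
    """Return (attempt_number, earliest_start) pairs if every attempt fails.

    Assumes a fixed delay between attempts and ignores request duration.
    """
    return [
        (attempt + 1, first_fire + timedelta(seconds=attempt * retry_delay_seconds))
        for attempt in range(max_retries + 1)  # 1 initial try + max_retries
    ]


# Wall-clock time in the schedule's timezone (naive datetime for simplicity).
timeline = worst_case_attempts(
    datetime(2025, 1, 15, 2, 0), max_retries=2, retry_delay_seconds=300
)
for attempt, start in timeline:
    print(attempt, start.strftime("%H:%M"))
# 1 02:00
# 2 02:05
# 3 02:10
```

Under these assumptions, max_retries=2 with a 300-second delay means a persistently failing job is given up on roughly ten minutes after the scheduled time, which is useful when deciding how soon downstream alerts should fire.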
Pattern comparison
| Pattern | When to reach for it | Process lifetime |
|---|---|---|
| asyncio.gather over agent.arun | One-time backfill, a fixed list of files | One process, end-to-end |
| agent.arun(background=True) + poll | Single long document, restart-tolerant | State in db, process can restart |
| AgentOS(scheduler=True) + ScheduleManager | Recurring intake (nightly, hourly) | Long-running AgentOS process |
| Workflow with Loop / Parallel steps | Multi-step pipelines per document | Either ad-hoc or scheduled |
The scheduler fires endpoints. Endpoints are agents, teams, or workflows. So a nightly job that ingests a folder, extracts each file, and writes to your warehouse is a workflow exposed at /workflows/<id>/runs, scheduled with the same ScheduleManager.create call. See Workflows.
Observability
Every scheduled fire creates a row in agno_schedule_runs with the schedule id, attempt number, status, and the run_id of the underlying agent run. To inspect the most recent activity (here, up to the latest 100 rows):
# schedule_id is the id of the schedule created earlier with mgr.create.
runs = mgr.get_runs(schedule_id, limit=100)
for r in runs:
    print(r.triggered_at, r.status, r.attempt, r.error or "")
Failed attempts keep their error text. Retries are separate rows with the same schedule_id and an incrementing attempt. That is the audit trail you can hand to ops.
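For a daily report, the rows can be reduced to a status count plus the list of errors from the last 24 hours. The sketch below uses a hypothetical stand-in dataclass, ScheduleRunRow, carrying the same four fields printed above. It assumes triggered_at is epoch seconds and uses "success" as a placeholder for the non-failed status; check both against the real rows before relying on it.

```python
import time
from collections import Counter
from dataclasses import dataclass
from typing import Optional


@dataclass
class ScheduleRunRow:
    """Hypothetical stand-in for one row returned by mgr.get_runs."""

    triggered_at: int  # assumed: epoch seconds
    status: str
    attempt: int
    error: Optional[str] = None


def summarize_last_day(rows, now=None):
    """Count statuses and collect error text for rows from the last 24 hours."""
    now = time.time() if now is None else now
    cutoff = now - 24 * 60 * 60
    recent = [r for r in rows if r.triggered_at >= cutoff]
    counts = Counter(r.status for r in recent)
    errors = [(r.triggered_at, r.attempt, r.error) for r in recent if r.error]
    return counts, errors


now = 1_700_000_000
rows = [
    ScheduleRunRow(now - 3600, "success", 1),
    ScheduleRunRow(now - 7200, "failed", 1, "HTTP 502"),
    ScheduleRunRow(now - 6900, "success", 2),  # retry of the failed attempt
    ScheduleRunRow(now - 3 * 86400, "failed", 1, "HTTP 500"),  # older than a day
]
counts, errors = summarize_last_day(rows, now=now)
```

Filtering by time rather than by row count keeps the report stable when a schedule fires more or less often than expected.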
Production checklist
| Concern | What to add |
|---|---|
| Idempotency per document | Pass a deterministic session_id (e.g. document hash) so re-runs upsert. |
| Dead-letter queue | After max_retries, the row stays in agno_schedule_runs with status="failed". Read it and route to a manual queue. |
| Per-provider rate limiting | The asyncio.Semaphore is enough for one provider. For mixed providers, run one semaphore per provider. |
| Storage of inputs | File(url=...) keeps the URL but not the bytes. If retention matters, store the source PDF before extraction. |
| Authoritative cost | RunMetrics.cost is populated when the provider returns it. For exact reconciliation, attach a token-rate table downstream. |
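Two rows of the checklist lend themselves to a short sketch: a deterministic session id derived from the document's bytes, and one semaphore per provider. Everything named here (session_id_for, session_id_for_bytes, ProviderLimits, the "invoice" prefix, the provider keys) is hypothetical and only illustrates the idea.

```python
import asyncio
import hashlib
from pathlib import Path


def session_id_for_bytes(data: bytes, prefix: str = "invoice") -> str:
    """Derive a stable id from file contents, so renamed copies still match."""
    digest = hashlib.sha256(data).hexdigest()
    return f"{prefix}-{digest[:32]}"


def session_id_for(path: Path, prefix: str = "invoice") -> str:
    """Same as above, reading the bytes from disk."""
    return session_id_for_bytes(path.read_bytes(), prefix)


class ProviderLimits:
    """One semaphore per provider, created lazily on first use."""

    def __init__(self, limits: dict[str, int], default: int = 4):
        self._limits = limits
        self._default = default
        self._sems: dict[str, asyncio.Semaphore] = {}

    def for_provider(self, provider: str) -> asyncio.Semaphore:
        if provider not in self._sems:
            size = self._limits.get(provider, self._default)
            self._sems[provider] = asyncio.Semaphore(size)
        return self._sems[provider]


async def demo() -> bool:
    limits = ProviderLimits({"openai": 8, "anthropic": 4})
    async with limits.for_provider("openai"):
        pass  # the model call for this provider would go here
    # Repeated lookups return the same semaphore, so the cap is shared.
    return limits.for_provider("openai") is limits.for_provider("openai")


same_semaphore = asyncio.run(demo())
```

The resulting id would then be passed as the session_id argument of the run call, so that extracting the same document twice targets the same session instead of creating a new one.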
Next steps
| Task | Guide |
|---|---|
| Pause on low-confidence fields | Human routing and eval |
| Compose multiple agents into a pipeline | Workflows |
| See the workflow + scheduler integration | Scheduling |