Documentation Index
Fetch the complete documentation index at: https://docs.agno.com/llms.txt
Use this file to discover all available pages before exploring further.
When the built-in providers don’t fit, subclass ContextProvider. The base class handles tool wrapping, name derivation, and error shaping.
Minimal Example
from agno.agent import Agent
from agno.context import Answer, ContextProvider, Status
FAQ = {"pricing": "See agno.com/pricing", "support": "Email help@agno.com"}
class FAQContextProvider(ContextProvider):
def status(self) -> Status:
return Status(ok=True, detail=f"{len(FAQ)} entries")
async def astatus(self) -> Status:
return self.status()
def query(self, question: str, *, run_context=None) -> Answer:
key = next((k for k in FAQ if k in question.lower()), None)
return Answer(text=FAQ[key] if key else "No FAQ entry matches that.")
async def aquery(self, question: str, *, run_context=None) -> Answer:
return self.query(question, run_context=run_context)
faq = FAQContextProvider(id="faq")
agent = Agent(model=..., tools=faq.get_tools())
The agent now has a query_faq tool. Same shape as every built-in provider.
Required Methods
You must implement these four abstract methods:
| Method | Purpose |
|---|
query(question, *, run_context=None) -> Answer | Sync read |
aquery(question, *, run_context=None) -> Answer | Async read |
status() -> Status | Sync health check |
astatus() -> Status | Async health check |
Answer
Answer is what query() returns:
from agno.context import Answer, Document
# Text-only answer
return Answer(text="The weather is sunny.")
# Answer with source documents
return Answer(
text="Found 3 matching policies.",
results=[
Document(id="doc1", name="Refund Policy", uri="/policies/refund.md", snippet="..."),
Document(id="doc2", name="Privacy Policy", uri="/policies/privacy.md", snippet="..."),
]
)
Status
Status reports provider health:
from agno.context import Status
# Healthy
return Status(ok=True, detail="Connected to database")
# Unhealthy
return Status(ok=False, detail="API key invalid")
Optional Methods
Override these to customize behavior:
| Method | Default | Override when |
|---|
update() / aupdate() | Raises NotImplementedError | Provider supports writes |
asetup() | No-op | Need async init (MCP sessions, cache priming) |
aclose() | No-op | Hold long-lived state (watches, connections) |
instructions() | Generic guidance | Want source-specific usage hints |
Adding Write Support
Override update(), aupdate(), and _default_tools() for writable providers:
class NotesContextProvider(ContextProvider):
def __init__(self, id: str, storage: dict):
super().__init__(id)
self.storage = storage
def query(self, question: str, *, run_context=None) -> Answer:
matches = [v for k, v in self.storage.items() if question.lower() in k.lower()]
return Answer(text="\n".join(matches) if matches else "No matching notes.")
async def aquery(self, question: str, *, run_context=None) -> Answer:
return self.query(question, run_context=run_context)
def update(self, instruction: str, *, run_context=None) -> Answer:
if instruction.startswith("save note:"):
parts = instruction[10:].split(" - ", 1)
if len(parts) == 2:
self.storage[parts[0].strip()] = parts[1].strip()
return Answer(text=f"Saved note: {parts[0].strip()}")
return Answer(text="Could not parse instruction. Use: save note: <title> - <content>")
async def aupdate(self, instruction: str, *, run_context=None) -> Answer:
return self.update(instruction, run_context=run_context)
def _default_tools(self) -> list:
return self._read_write_tools() # Exposes both query and update tools
def status(self) -> Status:
return Status(ok=True, detail=f"{len(self.storage)} notes")
async def astatus(self) -> Status:
return self.status()
Now the agent has both query_notes and update_notes tools.
Async Lifecycle
For providers that need setup and teardown:
class StreamingAPIContextProvider(ContextProvider):
def __init__(self, id: str, api_url: str):
super().__init__(id)
self.api_url = api_url
self.session = None
async def asetup(self) -> None:
import aiohttp
self.session = aiohttp.ClientSession()
async def aclose(self) -> None:
if self.session:
await self.session.close()
async def aquery(self, question: str, *, run_context=None) -> Answer:
async with self.session.get(f"{self.api_url}/search", params={"q": question}) as resp:
data = await resp.json()
return Answer(text=data.get("answer", "No answer found."))
# ... implement query, status, astatus
Custom Instructions
Override instructions() to provide source-specific guidance:
def instructions(self) -> str:
return """
Use query_jira for:
- Finding issues by key (e.g., "PROJ-123")
- Searching by assignee, status, or labels
- Getting sprint information
Use update_jira for:
- Changing issue status
- Adding comments
- Updating assignee
"""
Using RunContext
The run_context parameter carries caller state. Use it for per-user behavior:
def query(self, question: str, *, run_context=None) -> Answer:
user_id = run_context.user_id if run_context else None
if user_id:
# Fetch user-specific data
user_docs = self.get_docs_for_user(user_id)
return Answer(text=self.search(question, user_docs))
# Fall back to global search
return Answer(text=self.search(question, self.all_docs))
Available on run_context:
user_id — identifies the caller
session_id — identifies the conversation
metadata — arbitrary dict passed through the call chain
dependencies — values injected via agent’s dependencies parameter
Wrapping External APIs
Pattern for wrapping a REST API:
import httpx
from agno.context import Answer, ContextProvider, Status
class WeatherContextProvider(ContextProvider):
def __init__(self, id: str, api_key: str):
super().__init__(id, write=False) # Read-only
self.api_key = api_key
self.client = httpx.Client()
def query(self, question: str, *, run_context=None) -> Answer:
# Extract city from question (simplified)
city = question.replace("weather in", "").strip()
resp = self.client.get(
"https://api.weather.com/v1/current",
params={"city": city, "key": self.api_key}
)
data = resp.json()
return Answer(text=f"Weather in {city}: {data['temp']}F, {data['condition']}")
async def aquery(self, question: str, *, run_context=None) -> Answer:
# Use async client for async version
async with httpx.AsyncClient() as client:
city = question.replace("weather in", "").strip()
resp = await client.get(
"https://api.weather.com/v1/current",
params={"city": city, "key": self.api_key}
)
data = resp.json()
return Answer(text=f"Weather in {city}: {data['temp']}F, {data['condition']}")
def status(self) -> Status:
try:
self.client.get("https://api.weather.com/health")
return Status(ok=True, detail="API reachable")
except Exception as e:
return Status(ok=False, detail=str(e))
async def astatus(self) -> Status:
return self.status()
Next
Provider Catalog
Browse all built-in providers for inspiration
Runtime Guide
See providers in production context