Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.agno.com/llms.txt

Use this file to discover all available pages before exploring further.

When the built-in providers don’t fit, subclass ContextProvider. The base class handles tool wrapping, name derivation, and error shaping.

Minimal Example

from agno.agent import Agent
from agno.context import Answer, ContextProvider, Status

# Canned answers, keyed by the keyword that is searched for in the question.
FAQ = {"pricing": "See agno.com/pricing", "support": "Email help@agno.com"}

class FAQContextProvider(ContextProvider):
    """Read-only provider that answers from the in-memory ``FAQ`` table."""

    def query(self, question: str, *, run_context=None) -> Answer:
        """Return the entry for the first FAQ keyword found in the question.

        The comparison is done against the lower-cased question. When no
        keyword matches, a fixed "no match" answer is returned instead.
        """
        matched_topic = None
        for topic in FAQ:
            if topic in question.lower():
                matched_topic = topic
                break

        if not matched_topic:
            return Answer(text="No FAQ entry matches that.")
        return Answer(text=FAQ[matched_topic])

    async def aquery(self, question: str, *, run_context=None) -> Answer:
        """Async read; delegates to the synchronous ``query``."""
        answer = self.query(question, run_context=run_context)
        return answer

    def status(self) -> Status:
        """Report a healthy status together with the number of FAQ entries."""
        entry_count = len(FAQ)
        return Status(ok=True, detail=f"{entry_count} entries")

    async def astatus(self) -> Status:
        """Async health check; delegates to the synchronous ``status``."""
        health = self.status()
        return health

# Instantiate the provider and hand the tools it exposes to the agent.
faq = FAQContextProvider(id="faq")
# NOTE: `model=...` looks like a placeholder — presumably replace it with a real model.
agent = Agent(model=..., tools=faq.get_tools())
The agent now has a query_faq tool. Same shape as every built-in provider.

Required Methods

You must implement these four abstract methods:
| Method | Purpose |
| --- | --- |
| query(question, *, run_context=None) -> Answer | Sync read |
| aquery(question, *, run_context=None) -> Answer | Async read |
| status() -> Status | Sync health check |
| astatus() -> Status | Async health check |

Answer

Answer is what query() and aquery() return:
from agno.context import Answer, Document

# Text-only answer: only the `text` field is set.
return Answer(text="The weather is sunny.")

# Answer with source documents: `results` lists the Document entries
# (each with id, name, uri and snippet) that accompany the text.
return Answer(
    text="Found 3 matching policies.",
    results=[
        Document(id="doc1", name="Refund Policy", uri="/policies/refund.md", snippet="..."),
        Document(id="doc2", name="Privacy Policy", uri="/policies/privacy.md", snippet="..."),
    ]
)

Status

Status reports provider health:
from agno.context import Status

# Healthy: ok=True, with a short human-readable detail.
return Status(ok=True, detail="Connected to database")

# Unhealthy: ok=False, with the failure reason carried in `detail`.
return Status(ok=False, detail="API key invalid")

Optional Methods

Override these to customize behavior:
| Method | Default | Override when |
| --- | --- | --- |
| update() / aupdate() | Raises NotImplementedError | Provider supports writes |
| asetup() | No-op | Need async init (MCP sessions, cache priming) |
| aclose() | No-op | Hold long-lived state (watches, connections) |
| instructions() | Generic guidance | Want source-specific usage hints |

Adding Write Support

Override update(), aupdate(), and _default_tools() for writable providers:
class NotesContextProvider(ContextProvider):
    """Writable provider that keeps notes in a caller-supplied dict.

    Keys of ``storage`` are note titles and values are the note contents.
    """

    def __init__(self, id: str, storage: dict):
        super().__init__(id)
        self.storage = storage

    def status(self) -> Status:
        """Report a healthy status together with the number of stored notes."""
        note_count = len(self.storage)
        return Status(ok=True, detail=f"{note_count} notes")

    async def astatus(self) -> Status:
        """Async health check; delegates to the synchronous ``status``."""
        return self.status()

    def _default_tools(self) -> list:
        """Expose both the query tool and the update tool."""
        tools = self._read_write_tools()
        return tools

    def query(self, question: str, *, run_context=None) -> Answer:
        """Return every note whose title contains the question, case-insensitively.

        Matching notes are joined with newlines; a fixed message is returned
        when no title matches.
        """
        hits = []
        for title, content in self.storage.items():
            if question.lower() in title.lower():
                hits.append(content)

        if not hits:
            return Answer(text="No matching notes.")
        return Answer(text="\n".join(hits))

    async def aquery(self, question: str, *, run_context=None) -> Answer:
        """Async read; delegates to the synchronous ``query``."""
        return self.query(question, run_context=run_context)

    def update(self, instruction: str, *, run_context=None) -> Answer:
        """Store a note described as ``save note: <title> - <content>``.

        Anything that does not follow that shape leaves the storage untouched
        and yields an answer explaining the expected format.
        """
        prefix = "save note:"
        usage = "Could not parse instruction. Use: save note: <title> - <content>"

        if not instruction.startswith(prefix):
            return Answer(text=usage)

        # Split on the first " - " only, so the content may itself contain dashes.
        raw_title, separator, raw_content = instruction[len(prefix):].partition(" - ")
        if not separator:
            return Answer(text=usage)

        title = raw_title.strip()
        self.storage[title] = raw_content.strip()
        return Answer(text=f"Saved note: {title}")

    async def aupdate(self, instruction: str, *, run_context=None) -> Answer:
        """Async write; delegates to the synchronous ``update``."""
        return self.update(instruction, run_context=run_context)
Now the agent has both query_notes and update_notes tools.

Async Lifecycle

For providers that need setup and teardown:
class StreamingAPIContextProvider(ContextProvider):
    """Provider that queries a remote search API over a long-lived aiohttp session.

    The session is opened by ``asetup()`` (or lazily by the first ``aquery()``)
    and released by ``aclose()``. After ``aclose()`` the provider can be set up
    again.
    """

    def __init__(self, id: str, api_url: str):
        super().__init__(id)
        self.api_url = api_url
        self.session = None  # created in asetup()

    def _has_live_session(self) -> bool:
        """Return True when a session exists and has not been closed."""
        return self.session is not None and not self.session.closed

    async def asetup(self) -> None:
        """Open the HTTP session. Safe to call more than once."""
        if self._has_live_session():
            # Reuse the live session; replacing it would leak its connections.
            return
        import aiohttp
        self.session = aiohttp.ClientSession()

    async def aclose(self) -> None:
        """Close the HTTP session, if any, and forget it."""
        # Detach first so a closed session is never left on the instance.
        session, self.session = self.session, None
        if session is not None and not session.closed:
            await session.close()

    async def aquery(self, question: str, *, run_context=None) -> Answer:
        """Send the question to ``<api_url>/search`` and return its ``answer`` field.

        Falls back to a fixed message when the response has no ``answer`` key.
        """
        if not self._has_live_session():
            # asetup() was never called (or the session was closed): open one
            # now instead of failing with an AttributeError on None.
            await self.asetup()

        async with self.session.get(f"{self.api_url}/search", params={"q": question}) as resp:
            data = await resp.json()
            return Answer(text=data.get("answer", "No answer found."))

    # ... implement query, status, astatus

Custom Instructions

Override instructions() to provide source-specific guidance:
def instructions(self) -> str:
    """Return source-specific guidance on when to use query_jira and update_jira."""
    guidance = """
    Use query_jira for:
    - Finding issues by key (e.g., "PROJ-123")
    - Searching by assignee, status, or labels
    - Getting sprint information

    Use update_jira for:
    - Changing issue status
    - Adding comments
    - Updating assignee
    """
    return guidance

Using RunContext

The run_context parameter carries caller state. Use it for per-user behavior:
def query(self, question: str, *, run_context=None) -> Answer:
    """Search the caller's own documents when a user is known, else all documents.

    ``run_context`` is optional; without it, or without a user id on it, the
    search runs over ``self.all_docs``.
    """
    caller_id = run_context.user_id if run_context else None

    if not caller_id:
        # Fall back to global search
        return Answer(text=self.search(question, self.all_docs))

    # Fetch user-specific data
    scoped_docs = self.get_docs_for_user(caller_id)
    return Answer(text=self.search(question, scoped_docs))
Available on run_context:
  • user_id — identifies the caller
  • session_id — identifies the conversation
  • metadata — arbitrary dict passed through the call chain
  • dependencies — values injected via agent’s dependencies parameter

Wrapping External APIs

Pattern for wrapping a REST API:
import httpx
from agno.context import Answer, ContextProvider, Status

class WeatherContextProvider(ContextProvider):
    """Read-only provider that answers weather questions from a REST API.

    Sync calls go through one pooled ``httpx.Client`` that lives as long as the
    provider; async calls use a short-lived ``httpx.AsyncClient`` so they never
    block the event loop.
    """

    _CURRENT_URL = "https://api.weather.com/v1/current"
    _HEALTH_URL = "https://api.weather.com/health"

    def __init__(self, id: str, api_key: str):
        super().__init__(id, write=False)  # Read-only
        self.api_key = api_key
        self.client = httpx.Client()

    @staticmethod
    def _extract_city(question: str) -> str:
        """Extract the city from the question (simplified)."""
        return question.replace("weather in", "").strip()

    def _request_params(self, city: str) -> dict:
        """Build the query-string parameters for the current-weather endpoint."""
        return {"city": city, "key": self.api_key}

    @staticmethod
    def _to_answer(city: str, resp) -> Answer:
        """Turn a current-weather response into an Answer.

        Raises:
            httpx.HTTPStatusError: if the API returned a 4xx/5xx status, so an
                error payload is not misread as weather data.
        """
        resp.raise_for_status()
        data = resp.json()
        return Answer(text=f"Weather in {city}: {data['temp']}F, {data['condition']}")

    def query(self, question: str, *, run_context=None) -> Answer:
        """Sync read: fetch the current weather for the city named in the question."""
        city = self._extract_city(question)
        resp = self.client.get(self._CURRENT_URL, params=self._request_params(city))
        return self._to_answer(city, resp)

    async def aquery(self, question: str, *, run_context=None) -> Answer:
        """Async read: same as ``query`` but over an async client."""
        city = self._extract_city(question)
        async with httpx.AsyncClient() as client:
            resp = await client.get(self._CURRENT_URL, params=self._request_params(city))
        # The body is already fully read, so it can be parsed after the client closes.
        return self._to_answer(city, resp)

    def status(self) -> Status:
        """Sync health check: ok only if the health endpoint answers without an error status."""
        try:
            self.client.get(self._HEALTH_URL).raise_for_status()
        except Exception as e:
            # A health check reports failures instead of raising, whatever the cause.
            return Status(ok=False, detail=str(e))
        return Status(ok=True, detail="API reachable")

    async def astatus(self) -> Status:
        """Async health check that does not block the event loop."""
        try:
            async with httpx.AsyncClient() as client:
                resp = await client.get(self._HEALTH_URL)
            resp.raise_for_status()
        except Exception as e:
            # A health check reports failures instead of raising, whatever the cause.
            return Status(ok=False, detail=str(e))
        return Status(ok=True, detail="API reachable")

    async def aclose(self) -> None:
        """Release the connections held by the long-lived sync client."""
        self.client.close()

Next

Provider Catalog

Browse all built-in providers for inspiration

Runtime Guide

See providers in production context