Skip to main content
"""
Example demonstrating how to connect to a remote Agno A2A agent.

This example shows how to use RemoteAgent with the A2A protocol to connect
to an Agno agent that's exposed via the A2A interface.

Prerequisites:
1. Start an Agno A2A server:
   python cookbook/06_agent_os/remote/agno_a2a_server.py

   The server will run on http://localhost:7779

2. Set your OPENAI_API_KEY environment variable
"""

import asyncio

from agno.agent import RemoteAgent

# ---------------------------------------------------------------------------
# Create Example
# ---------------------------------------------------------------------------


async def remote_agno_a2a_agent_example():
    """Call a remote Agno agent exposed via A2A interface."""
    # Connect to remote Agno A2A agent
    # protocol="a2a" tells RemoteAgent to use A2A protocol
    # a2a_protocol="rest" uses REST API (default for Agno A2A servers)
    agent = RemoteAgent(
        base_url="http://localhost:7779/a2a/agents/assistant-agent-2",
        agent_id="assistant-agent-2",  # Agent ID from the A2A server
        protocol="a2a",
        a2a_protocol="rest",
    )

    print("Calling remote Agno A2A agent...")
    response = await agent.arun(
        "What is 15 * 23? Use the calculator tool.",
        user_id="user-123",
        session_id="session-456",
    )
    print(f"Response: {response.content}")


async def remote_agno_a2a_streaming_example():
    """Stream responses from a remote Agno A2A agent."""
    agent = RemoteAgent(
        base_url="http://localhost:7779/a2a/agents/researcher-agent-2",
        agent_id="researcher-agent-2",
        protocol="a2a",
        a2a_protocol="rest",
    )

    print("\nStreaming response from remote Agno A2A agent...")
    async for chunk in agent.arun(
        "Tell me a brief 2-sentence story about space exploration",
        session_id="session-456",
        user_id="user-123",
        stream=True,
        stream_events=True,
    ):
        if hasattr(chunk, "content") and chunk.content:
            print(chunk.content, end="", flush=True)
    print()  # New line after streaming


async def remote_agno_a2a_agent_info_example():
    """Get information about a remote Agno A2A agent."""
    agent = RemoteAgent(
        base_url="http://localhost:7779/a2a/agents/assistant-agent-2",
        agent_id="assistant-agent-2",
        protocol="a2a",
        a2a_protocol="rest",
    )

    print("\nGetting agent information...")
    config = await agent.get_agent_config()
    print(f"Agent ID: {config.id}")
    print(f"Agent Name: {config.name}")
    print(f"Agent Description: {config.description}")


async def main():
    """Run all examples in a single event loop."""
    print("=" * 60)
    print("Remote Agno A2A Agent Examples")
    print("=" * 60)
    print("\nNote: Make sure the Agno A2A server is running on port 7779")
    print("Start it with: python cookbook/06_agent_os/remote/agno_a2a_server.py\n")

    # Run examples
    print("1. Remote Agno A2A Agent Example:")
    await remote_agno_a2a_agent_example()

    print("\n2. Remote Agno A2A Streaming Example:")
    await remote_agno_a2a_streaming_example()

    print("\n3. Remote Agno A2A Agent Info Example:")
    await remote_agno_a2a_agent_info_example()


# ---------------------------------------------------------------------------
# Run Example
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    asyncio.run(main())

Run the Example

# Clone and setup repo
git clone https://github.com/agno-agi/agno.git
cd agno/cookbook/05_agent_os/remote

# Create and activate virtual environment
./scripts/demo_setup.sh
source .venvs/demo/bin/activate

# Export relevant API keys
export OPENAI_API_KEY="***"

python 03_remote_agno_a2a_agent.py