Copy
Ask AI
"""
Streaming A2A Messages with A2AClient
This example demonstrates real-time streaming responses
using the A2A protocol.
Prerequisites:
1. Start an AgentOS server with A2A interface:
python cookbook/06_agent_os/client_a2a/servers/agno_server.py
2. Run this script:
python cookbook/06_agent_os/client_a2a/02_streaming.py
"""
import asyncio
from agno.client.a2a import A2AClient
# ---------------------------------------------------------------------------
# Create Example
# ---------------------------------------------------------------------------
async def basic_streaming():
"""Stream a response from an A2A agent."""
print("=" * 60)
print("Streaming A2A Response")
print("=" * 60)
client = A2AClient("http://localhost:7003/a2a/agents/basic-agent")
print("\nStreaming response from agent...")
print("\nResponse: ", end="", flush=True)
async for event in client.stream_message(
message="Tell me a short joke.",
):
# Print content as it arrives
if event.is_content and event.content:
print(event.content, end="", flush=True)
async def streaming_with_events():
"""Stream with detailed event tracking."""
print("\n" + "=" * 60)
print("Streaming with Event Details")
print("=" * 60)
client = A2AClient("http://localhost:7003/a2a/agents/basic-agent")
print("\nEvent log:")
content_buffer = []
async for event in client.stream_message(
message="What is Python?",
):
if event.content:
content_buffer.append(event.content)
if event.is_final:
print("\nFull response:")
print("".join(content_buffer))
async def main():
await basic_streaming()
await streaming_with_events()
# ---------------------------------------------------------------------------
# Run Example
# ---------------------------------------------------------------------------
if __name__ == "__main__":
asyncio.run(main())
Run the Example
Copy
Ask AI
# Clone and setup repo
git clone https://github.com/agno-agi/agno.git
cd agno/cookbook/05_agent_os/client_a2a
# Create and activate virtual environment
./scripts/demo_setup.sh
source .venvs/demo/bin/activate
python 02_streaming.py