"""
This example demonstrates asynchronous streaming responses from a team.
The team uses specialized agents with Exa web-search tools (configured for
financial news domains) to provide real-time stock information with async
streaming output.
"""
import asyncio
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.team.team import Team
from agno.tools.exa import ExaTools
from agno.utils.pprint import apprint_run_response
# Team member whose role is searching the web for information on a stock,
# using the Exa toolkit.
stock_searcher = Agent(
    name="Stock Searcher",
    role="Searches the web for information on a stock.",
    model=OpenAIChat("gpt-5-mini"),
    tools=[
        ExaTools(
            # NOTE(review): include_domains presumably limits results to
            # these sites — confirm against the ExaTools documentation.
            include_domains=["cnbc.com", "reuters.com", "bloomberg.com", "wsj.com"],
            text=False,
            highlights=False,
            show_results=True,
        ),
    ],
)
# Team member whose role is searching the web for information on a company,
# using the Exa toolkit.
company_info_agent = Agent(
    name="Company Info Searcher",
    role="Searches the web for information on a company.",
    model=OpenAIChat("gpt-5-mini"),
    tools=[
        ExaTools(
            # NOTE(review): include_domains presumably limits results to
            # these sites — confirm against the ExaTools documentation.
            include_domains=["cnbc.com", "reuters.com", "bloomberg.com", "wsj.com"],
            text=False,
            highlights=False,
            show_results=True,
        ),
    ],
)
# Team that groups both search agents; the streaming demos below send their
# prompt to this object.
team = Team(
    name="Stock Research Team",
    members=[stock_searcher, company_info_agent],
    model=OpenAIChat("gpt-5-mini"),
    show_members_responses=True,
    markdown=True,
)
async def streaming_with_arun() -> None:
    """Demonstrate async streaming using the team's ``arun()`` method.

    Calls ``team.arun(..., stream=True)`` with a fixed NVDA stock-price
    question and passes the result to ``apprint_run_response``, which is
    awaited until it completes.
    """
    # NOTE(review): with stream=True, arun() presumably yields run events
    # asynchronously rather than returning one final response — confirm
    # against the agno Team documentation.
    await apprint_run_response(
        team.arun(input="What is the current stock price of NVDA?", stream=True)
    )
async def streaming_with_aprint_response() -> None:
    """Demonstrate async streaming using the team's ``aprint_response()`` method.

    Awaits ``team.aprint_response`` with the same fixed NVDA stock-price
    question and ``stream=True``; printing is left to that method instead of
    a separate ``apprint_run_response`` call.
    """
    await team.aprint_response("What is the current stock price of NVDA?", stream=True)
if __name__ == "__main__":
    # Run the arun()-based demo by default.
    asyncio.run(streaming_with_arun())

    # Alternative demo: uncomment to stream through aprint_response() instead.
    # asyncio.run(streaming_with_aprint_response())
Create a virtual environment
Open the Terminal and create a Python virtual environment.
python3 -m venv .venv
source .venv/bin/activate
Install required libraries
pip install agno exa_py
Set environment variables
export OPENAI_API_KEY=****
export EXA_API_KEY=****
Run the team example
python cookbook/examples/teams/streaming/04_async_team_streaming.py