Copy
Ask AI
"""
Structured Output Streaming
===========================
Demonstrates sync and async streaming with structured team outputs.
"""
import asyncio
from agno.agent import Agent
from agno.models.openai import OpenAIResponses
from agno.team import Team
from agno.tools.yfinance import YFinanceTools
from agno.utils.pprint import apprint_run_response
from pydantic import BaseModel
class StockAnalysis(BaseModel):
symbol: str
company_name: str
analysis: str
class CompanyAnalysis(BaseModel):
company_name: str
analysis: str
class StockReport(BaseModel):
symbol: str
company_name: str
analysis: str
# ---------------------------------------------------------------------------
# Create Members
# ---------------------------------------------------------------------------
stock_searcher = Agent(
name="Stock Searcher",
model=OpenAIResponses(id="gpt-5.2-mini"),
output_schema=StockAnalysis,
role="Searches the web for information on a stock.",
tools=[
YFinanceTools(
include_tools=["get_current_stock_price", "get_analyst_recommendations"],
)
],
)
company_info_agent = Agent(
name="Company Info Searcher",
model=OpenAIResponses(id="gpt-5.2-mini"),
role="Searches the web for information on a stock.",
output_schema=CompanyAnalysis,
tools=[
YFinanceTools(
include_tools=["get_company_info", "get_company_news"],
)
],
)
# ---------------------------------------------------------------------------
# Create Team
# ---------------------------------------------------------------------------
team = Team(
name="Stock Research Team",
model=OpenAIResponses(id="gpt-5.2-mini"),
members=[stock_searcher, company_info_agent],
output_schema=StockReport,
markdown=True,
show_members_responses=True,
)
# ---------------------------------------------------------------------------
# Run Team
# ---------------------------------------------------------------------------
async def test_structured_streaming() -> None:
async_stream = team.arun(
"Give me a stock report for NVDA",
stream=True,
stream_events=True,
)
run_response = None
async for event_or_response in async_stream:
run_response = event_or_response
assert isinstance(run_response.content, StockReport)
print(f"Stock Symbol: {run_response.content.symbol}")
print(f"Company Name: {run_response.content.company_name}")
async def test_structured_streaming_with_arun() -> None:
await apprint_run_response(
team.arun(
input="Give me a stock report for AAPL",
stream=True,
stream_events=True,
)
)
if __name__ == "__main__":
stream_generator = team.run(
"Give me a stock report for NVDA",
stream=True,
stream_events=True,
)
run_response = None
for event_or_response in stream_generator:
run_response = event_or_response
assert isinstance(run_response.content, StockReport)
print(
f"Response content is correctly typed as StockReport: {type(run_response.content)}"
)
print(f"Stock Symbol: {run_response.content.symbol}")
print(f"Company Name: {run_response.content.company_name}")
asyncio.run(test_structured_streaming())
asyncio.run(test_structured_streaming_with_arun())
Run the Example
Copy
Ask AI
# Clone and setup repo
git clone https://github.com/agno-agi/agno.git
cd agno/cookbook/03_teams/structured_input_output
# Create and activate virtual environment
./scripts/demo_setup.sh
source .venvs/demo/bin/activate
python structured_output_streaming.py