Skip to main content
"""
Arize Phoenix Project Routing
=============================

Demonstrates sending traces from different agents to different Phoenix projects.
"""

import asyncio
import os

from agno.agent import Agent
from agno.db.in_memory import InMemoryDb
from agno.models.openai import OpenAIChat
from agno.tools.websearch import WebSearchTools
from agno.tools.yfinance import YFinanceTools
from openinference.instrumentation import dangerously_using_project
from phoenix.otel import register
from pydantic import BaseModel

# ---------------------------------------------------------------------------
# Setup
# ---------------------------------------------------------------------------
# Fail fast with an actionable message when the API key is missing. Copying the
# result of os.getenv() straight back into os.environ raises an opaque
# "TypeError: str expected, not NoneType" whenever the variable is unset.
if not os.getenv("PHOENIX_API_KEY"):
    raise RuntimeError(
        "PHOENIX_API_KEY is not set. Export it before running this example, "
        'e.g. `export PHOENIX_API_KEY="..."`.'
    )

# Keep a collector endpoint that is already exported in the shell; only fall
# back to the Phoenix Cloud base URL when none is configured.
os.environ.setdefault(
    "PHOENIX_COLLECTOR_ENDPOINT",
    "https://app.phoenix.arize.com/",  # Add the suffix for your organization
)

# Register a single tracer provider (project name here is the default).
# The environment must be configured above before this call runs.
tracer_provider = register(
    project_name="default",
    auto_instrument=True,
)


# Structured-output schema handed to the stock agent through `output_schema`.
class StockPrice(BaseModel):
    # Numeric price value; currency and units are not specified by this schema.
    stock_price: float


# Structured-output schema handed to the search agent through `output_schema`.
class SearchResult(BaseModel):
    # Free-text summary of what the search found.
    summary: str
    # References backing the summary (presumably URLs -- confirm against real output).
    sources: list[str]


# ---------------------------------------------------------------------------
# Create Agents
# ---------------------------------------------------------------------------
# Agent 1 - Stock Price Agent
# Equipped with YFinanceTools; its replies are shaped by the StockPrice schema.
stock_agent = Agent(
    # Identity
    name="Stock Price Agent",
    session_id="stock_session",
    # Behaviour
    instructions="You are a stock price agent. Answer questions in the style of a stock analyst.",
    model=OpenAIChat(id="gpt-4o-mini"),
    tools=[YFinanceTools()],
    # Storage and response shape
    db=InMemoryDb(),
    output_schema=StockPrice,
)

# Agent 2 - Search Agent
# Equipped with WebSearchTools; its replies are shaped by the SearchResult schema.
search_agent = Agent(
    # Identity
    name="Search Agent",
    session_id="search_session",
    # Behaviour
    instructions="You are a search agent. Find and summarize information from the web.",
    model=OpenAIChat(id="gpt-4o-mini"),
    tools=[WebSearchTools()],
    # Storage and response shape
    db=InMemoryDb(),
    output_schema=SearchResult,
)


# ---------------------------------------------------------------------------
# Run Example
# ---------------------------------------------------------------------------
async def main() -> None:
    """Run each agent once, sending its traces to its own Phoenix project.

    The stock agent runs first under the "default" project, then the search
    agent runs under the "Testing-agno" project. Each streamed response is
    printed while the matching project override is active.
    """
    # (project name, agent, prompt) -- processed strictly in this order.
    routes = (
        ("default", stock_agent, "What is the current price of Tesla?"),
        ("Testing-agno", search_agent, "What is the latest news about AI?"),
    )

    for project, agent, prompt in routes:
        # The project override applies only while the block is open, so each
        # agent call is awaited inside it.
        with dangerously_using_project(project):
            await agent.aprint_response(prompt, stream=True)


# Script entry point: run the async example to completion when executed directly
# (not when this module is imported).
if __name__ == "__main__":
    asyncio.run(main())

Run the Example

# Clone and setup repo
git clone https://github.com/agno-agi/agno.git
cd agno/cookbook/92_integrations/observability

# Create and activate virtual environment
./scripts/demo_setup.sh
source .venvs/demo/bin/activate

# Export relevant API keys and the Phoenix collector endpoint
export OPENAI_API_KEY="***"
export PHOENIX_API_KEY="***"
export PHOENIX_COLLECTOR_ENDPOINT="***"

python arize_phoenix_moving_traces_to_different_projects.py