# Complete example: a Team tool yields a custom event, which is then picked out of the streamed run.

import asyncio
from dataclasses import dataclass
from typing import Optional

from agno.agent import Agent
from agno.team import Team
from agno.models.openai import OpenAIChat
from agno.run.team import CustomEvent
from agno.tools import tool


# Our custom event, extending the CustomEvent class
@dataclass
class CustomerProfileEvent(CustomEvent):
    """Custom event carrying a customer's contact details.

    Subclasses ``CustomEvent``, so it passes an
    ``isinstance(event, CustomEvent)`` check. Every field declared here is
    optional and defaults to ``None``.
    """

    # Customer's name.
    customer_name: Optional[str] = None
    # Customer's email address.
    customer_email: Optional[str] = None
    # Customer's phone number, kept as a string.
    customer_phone: Optional[str] = None


# Our custom tool
@tool()
async def get_customer_profile():
    """
    Get the customer profile for the customer with ID 123.
    """
    # NOTE(review): the docstring above is kept verbatim on purpose; it is
    # presumably exposed by the `tool` decorator as the tool description, so
    # rewording it could change runtime behavior. Confirm against the decorator.

    # Assemble the hard-coded demo profile first ...
    profile_event = CustomerProfileEvent(
        customer_name="John Doe",
        customer_email="john.doe@example.com",
        customer_phone="1234567890",
    )
    # ... then emit it as the only item this async generator produces.
    yield profile_event


# The single team member: a support-role agent using the "gpt-4o" model id.
agent = Agent(
    model=OpenAIChat(id="gpt-4o"),
    name="Customer Support Agent",
    role="Support agent that handles customer requests.",
)

# Build the Team. The custom tool is attached to the team itself rather than
# to the member agent.
team = Team(
    model=OpenAIChat(id="gpt-4o"),
    instructions="You are a team that handles customer requests.",
    members=[agent],
    tools=[get_customer_profile],
)


async def run_team():
    """Stream one team run and print each custom event found in it.

    Sends a fixed customer-profile request to the team with streaming
    enabled, then prints every streamed item that is a ``CustomEvent``.
    All other streamed items are ignored.
    """
    prompt = "Hello, can you get me the customer profile for customer with ID 123?"
    event_stream = team.arun(prompt, stream=True)

    # The team is expected to call the custom tool, whose yielded event
    # should then appear among the streamed items.
    async for streamed_item in event_stream:
        if not isinstance(streamed_item, CustomEvent):
            continue
        print(f"✅ Custom event emitted: {streamed_item}")


# Script entry point: run the coroutine to completion on a new event loop.
asyncio.run(run_team())