Skip to main content
"""
Running Teams with AgentOSClient

This example demonstrates how to execute team runs using
AgentOSClient, including both streaming and non-streaming responses.

Prerequisites:
1. Start an AgentOS server with a team configured
2. Run this script: python 06_run_teams.py
"""

import asyncio

from agno.client import AgentOSClient

# ---------------------------------------------------------------------------
# Create Example
# ---------------------------------------------------------------------------


async def run_team_non_streaming():
    """Execute a non-streaming team run."""
    print("=" * 60)
    print("Non-Streaming Team Run")
    print("=" * 60)

    client = AgentOSClient(base_url="http://localhost:7777")

    # Get available teams
    config = await client.aget_config()
    if not config.teams:
        print("No teams available")
        return

    team_id = config.teams[0].id
    print(f"Running team: {team_id}")

    # Execute the team
    result = await client.run_team(
        team_id=team_id,
        message="What is the capital of France and what is 15 * 7?",
    )

    print(f"\nRun ID: {result.run_id}")
    print(f"Content: {result.content}")


async def run_team_streaming():
    """Execute a streaming team run."""
    print("\n" + "=" * 60)
    print("Streaming Team Run")
    print("=" * 60)

    client = AgentOSClient(base_url="http://localhost:7777")

    # Get available teams
    config = await client.aget_config()
    if not config.teams:
        print("No teams available")
        return

    team_id = config.teams[0].id
    print(f"Streaming from team: {team_id}")
    print("\nResponse: ", end="", flush=True)

    from agno.run.team import RunCompletedEvent, RunContentEvent

    # Stream the response
    async for event in client.run_team_stream(
        team_id=team_id,
        message="Tell me about Python programming in 2 sentences.",
    ):
        # Handle different event types
        if isinstance(event, RunContentEvent):
            print(event.content, end="", flush=True)
        elif isinstance(event, RunCompletedEvent):
            # Run completed - could access event.run_id here if needed
            pass

    print("\n")


async def main():
    await run_team_non_streaming()
    await run_team_streaming()


# ---------------------------------------------------------------------------
# Run Example
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    asyncio.run(main())

Run the Example

# Clone and setup repo
git clone https://github.com/agno-agi/agno.git
cd agno/cookbook/05_agent_os/client

# Create and activate virtual environment
./scripts/demo_setup.sh
source .venvs/demo/bin/activate

python 06_run_teams.py