Skip to main content
1

Create a Python file

touch run_workflows.py
2

Add the following code to your Python file

run_workflows.py
import asyncio

from agno.client import AgentOSClient


async def run_workflow_non_streaming():
    """Execute a non-streaming workflow run."""
    print("=" * 60)
    print("Non-Streaming Workflow Run")
    print("=" * 60)

    client = AgentOSClient(base_url="http://localhost:7777")

    # Get available workflows
    config = await client.aget_config()
    if not config.workflows:
        print("No workflows available")
        return

    workflow_id = config.workflows[0].id
    print(f"Running workflow: {workflow_id}")

    try:
        result = await client.run_workflow(
            workflow_id=workflow_id,
            message="What are the benefits of using Python for data science?",
        )

        print(f"\nRun ID: {result.run_id}")
        print(f"Content: {result.content}")
    except Exception as e:
        print(f"Error: {e}")


async def run_workflow_streaming():
    """Execute a streaming workflow run."""
    print("\n" + "=" * 60)
    print("Streaming Workflow Run")
    print("=" * 60)

    client = AgentOSClient(base_url="http://localhost:7777")

    # Get available workflows
    config = await client.aget_config()
    if not config.workflows:
        print("No workflows available")
        return

    workflow_id = config.workflows[0].id
    print(f"Streaming from workflow: {workflow_id}")
    print("\nResponse: ", end="", flush=True)

    try:
        async for event in client.run_workflow_stream(
            workflow_id=workflow_id,
            message="Explain machine learning in simple terms.",
        ):
            if event.event == "RunContent" and hasattr(event, "content"):
                print(event.content, end="", flush=True)
            elif event.event == "WorkflowAgentCompleted" and hasattr(event, "content") and event.content:
                print(event.content, end="", flush=True)

        print("\n")
    except Exception as e:
        print(f"\nError: {type(e).__name__}: {e}")


async def main():
    await run_workflow_non_streaming()
    await run_workflow_streaming()


if __name__ == "__main__":
    asyncio.run(main())
3

Create a virtual environment

Open the Terminal and create a python virtual environment.
python3 -m venv .venv
source .venv/bin/activate
4

Install libraries

pip install -U agno openai
5

Export your OpenAI API key

export OPENAI_API_KEY="your_openai_api_key_here"
6

Start an AgentOS Server

Make sure you have an AgentOS server running with workflows configured. See Creating Your First OS for setup instructions.
7

Run the Client

python run_workflows.py