Copy
Ask AI
"""
Running Workflows with AgentOSClient
This example demonstrates how to execute workflow runs using
AgentOSClient, including both streaming and non-streaming responses.
Prerequisites:
1. Start an AgentOS server with a workflow configured
2. Run this script: python 07_run_workflows.py
"""
import asyncio
from agno.client import AgentOSClient
# ---------------------------------------------------------------------------
# Create Example
# ---------------------------------------------------------------------------
async def run_workflow_non_streaming():
    """Execute a non-streaming workflow run and print its result.

    Connects to the server at ``http://localhost:7777``, selects the first
    workflow listed in the server config, runs it with a fixed prompt and
    prints the run ID and content. If the config lists no workflows, a
    notice is printed and the function returns early.

    Any exception raised while running the workflow is printed instead of
    propagated; when the exception carries a response with a text body,
    that body is printed as well.
    """
    print("=" * 60)
    print("Non-Streaming Workflow Run")
    print("=" * 60)
    client = AgentOSClient(base_url="http://localhost:7777")
    # Get available workflows
    config = await client.aget_config()
    if not config.workflows:
        print("No workflows available")
        return
    workflow_id = config.workflows[0].id
    print(f"Running workflow: {workflow_id}")
    try:
        # Execute the workflow
        result = await client.run_workflow(
            workflow_id=workflow_id,
            message="What are the benefits of using Python for data science?",
        )
        print(f"\nRun ID: {result.run_id}")
        print(f"Content: {result.content}")
    except Exception as e:
        print(f"Error: {e}")
        # The exception may have no ``response`` attribute, or expose it as
        # None; look it up defensively so that reporting the error can
        # never raise a second exception that hides the first one.
        response = getattr(e, "response", None)
        body = getattr(response, "text", None)
        if body is not None:
            print(f"Response: {body}")
async def run_workflow_streaming():
    """Execute a streaming workflow run and print content as it arrives.

    Connects to the server at ``http://localhost:7777``, selects the first
    workflow listed in the server config and streams a run with a fixed
    prompt. Content carried by ``RunContent`` events and by non-empty
    ``WorkflowAgentCompleted`` events is written to stdout without a
    trailing newline so the pieces read as one continuous response.

    If the config lists no workflows, a notice is printed and the function
    returns early. Any exception raised while streaming is printed (with
    its type name) instead of propagated.
    """
    print("\n" + "=" * 60)
    print("Streaming Workflow Run")
    print("=" * 60)
    client = AgentOSClient(base_url="http://localhost:7777")
    # Get available workflows
    config = await client.aget_config()
    if not config.workflows:
        print("No workflows available")
        return
    workflow_id = config.workflows[0].id
    print(f"Streaming from workflow: {workflow_id}")
    print("\nResponse: ", end="", flush=True)
    try:
        # Stream the response - returns typed WorkflowRunOutputEvent objects
        # Workflows can emit both workflow events and nested agent events
        async for event in client.run_workflow_stream(
            workflow_id=workflow_id,
            message="Explain machine learning in simple terms.",
        ):
            # Events without a content attribute, or whose content is None,
            # have nothing to display; skipping them keeps the literal text
            # "None" out of the streamed response.
            content = getattr(event, "content", None)
            if content is None:
                continue
            # Handle content from agent events (RunContent) or workflow completion
            if event.event == "RunContent":
                print(content, end="", flush=True)
            elif event.event == "WorkflowAgentCompleted" and content:
                print(content, end="", flush=True)
        print("\n")
    except Exception as e:
        print(f"\nError: {type(e).__name__}: {e}")
async def main():
    """Run the non-streaming example, then the streaming example."""
    # Sequential on purpose: each example prints its own banner and output.
    for example in (run_workflow_non_streaming, run_workflow_streaming):
        await example()
# ---------------------------------------------------------------------------
# Run Example
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Run the examples only when executed as a script, not when imported.
    asyncio.run(main())
## Run the Example

```bash
# Clone and setup repo
git clone https://github.com/agno-agi/agno.git
cd agno/cookbook/05_agent_os/client

# Create and activate virtual environment
./scripts/demo_setup.sh
source .venvs/demo/bin/activate

python 07_run_workflows.py
```