The Workflow.run() function runs the agent and generates a response, either as a WorkflowRunResponse object or a stream of WorkflowRunResponse objects.

Many of our examples use workflow.print_response() which is a helper utility to print the response in the terminal. This uses workflow.run() under the hood.

Running your Workflow

Here’s how to run your workflow. The response is captured in the response.

from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.storage.sqlite import SqliteStorage
from agno.team import Team
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.tools.hackernews import HackerNewsTools
from agno.workflow.v2.step import Step
from agno.workflow.v2.workflow import Workflow
from agno.run.v2.workflow import WorkflowRunResponse
from agno.utils.pprint import pprint_run_response

# Define agents
hackernews_agent = Agent(
    name="Hackernews Agent",
    model=OpenAIChat(id="gpt-4o-mini"),
    tools=[HackerNewsTools()],
    role="Extract key insights and content from Hackernews posts",
)
web_agent = Agent(
    name="Web Agent",
    model=OpenAIChat(id="gpt-4o-mini"),
    tools=[DuckDuckGoTools()],
    role="Search the web for the latest news and trends",
)

# Define research team for complex analysis
research_team = Team(
    name="Research Team",
    mode="coordinate",
    members=[hackernews_agent, web_agent],
    instructions="Research tech topics from Hackernews and the web",
)

content_planner = Agent(
    name="Content Planner",
    model=OpenAIChat(id="gpt-4o"),
    instructions=[
        "Plan a content schedule over 4 weeks for the provided topic and research content",
        "Ensure that I have posts for 3 posts per week",
    ],
)

content_creation_workflow = Workflow(
    name="Content Creation Workflow",
    description="Automated content creation from blog posts to social media",
    storage=SqliteStorage(
        table_name="workflow_v2",
        db_file="tmp/workflow_v2.db",
        mode="workflow_v2",
    ),
    steps=[research_team, content_planner],
)

# Create and use workflow
if __name__ == "__main__":
    response: WorkflowRunResponse = content_creation_workflow.run(
        message="AI trends in 2024",
        markdown=True,
    )

    pprint_run_response(response, markdown=True)

The Workflow.run() function returns a WorkflowRunResponse object when not streaming. Here is detailed documentation for `WorkflowRunResponse.

Async Execution

The Workflow.arun() function is the async version of Workflow.run().

Here is an example of how to use it:

from typing import AsyncIterator
import asyncio

from agno.agent.agent import Agent
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.workflow.v2.condition import Condition
from agno.workflow.v2.step import Step
from agno.workflow.v2.types import StepInput
from agno.workflow.v2.workflow import Workflow
from agno.run.v2.workflow import WorkflowRunResponseEvent, WorkflowRunEvent

# === BASIC AGENTS ===
researcher = Agent(
    name="Researcher",
    instructions="Research the given topic and provide detailed findings.",
    tools=[DuckDuckGoTools()],
)

summarizer = Agent(
    name="Summarizer",
    instructions="Create a clear summary of the research findings.",
)

fact_checker = Agent(
    name="Fact Checker",
    instructions="Verify facts and check for accuracy in the research.",
    tools=[DuckDuckGoTools()],
)

writer = Agent(
    name="Writer",
    instructions="Write a comprehensive article based on all available research and verification.",
)

# === CONDITION EVALUATOR ===
def needs_fact_checking(step_input: StepInput) -> bool:
    """Determine if the research contains claims that need fact-checking"""
    summary = step_input.previous_step_content or ""

    # Look for keywords that suggest factual claims
    fact_indicators = [
        "study shows",
        "breakthroughs",
        "research indicates",
        "according to",
        "statistics",
        "data shows",
        "survey",
        "report",
        "million",
        "billion",
        "percent",
        "%",
        "increase",
        "decrease",
    ]

    return any(indicator in summary.lower() for indicator in fact_indicators)


# === WORKFLOW STEPS ===
research_step = Step(
    name="research",
    description="Research the topic",
    agent=researcher,
)

summarize_step = Step(
    name="summarize",
    description="Summarize research findings",
    agent=summarizer,
)

# Conditional fact-checking step
fact_check_step = Step(
    name="fact_check",
    description="Verify facts and claims",
    agent=fact_checker,
)

write_article = Step(
    name="write_article",
    description="Write final article",
    agent=writer,
)

# === BASIC LINEAR WORKFLOW ===
basic_workflow = Workflow(
    name="Basic Linear Workflow",
    description="Research -> Summarize -> Condition(Fact Check) -> Write Article",
    steps=[
        research_step,
        summarize_step,
        Condition(
            name="fact_check_condition",
            description="Check if fact-checking is needed",
            evaluator=needs_fact_checking,
            steps=[fact_check_step],
        ),
        write_article,
    ],
)

async def main():
    try:
        response: WorkflowRunResponseEvent = await basic_workflow.arun(
            message="Recent breakthroughs in quantum computing",
        )
        pprint_run_response(response, markdown=True)

if __name__ == "__main__":
    asyncio.run(main())

Streaming Responses

To enable streaming, set stream=True when calling run(). This will return an iterator of WorkflowRunResponseEvent objects instead of a single response.

# Define your agents/team
...

content_creation_workflow = Workflow(
    name="Content Creation Workflow",
    description="Automated content creation from blog posts to social media",
    storage=SqliteStorage(
        table_name="workflow_v2",
        db_file="tmp/workflow_v2.db",
        mode="workflow_v2",
    ),
    steps=[research_team, content_planner],
)

# Create and use workflow
if __name__ == "__main__":
    response: Iterator[WorkflowRunResponseEvent] = content_creation_workflow.run(
        message="AI trends in 2024",
        markdown=True,
        stream=True,
    )

    pprint_run_response(response, markdown=True)

Streaming Intermediate Steps

In the case where you put stream_intermediate_steps=False (or not set it at all), we only yield WorkflowStartedEvent, WorkflowCompletedEvent along with all the Agent/Team events.

For even more detailed streaming, you can enable intermediate steps by setting stream_intermediate_steps=True. This will provide real-time updates about each step of the workflow.

from typing import Iterator

from agno.agent.agent import Agent
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.workflow.v2.condition import Condition
from agno.workflow.v2.step import Step
from agno.workflow.v2.types import StepInput
from agno.workflow.v2.workflow import Workflow
from agno.run.v2.workflow import WorkflowRunResponseEvent, WorkflowRunEvent

# === BASIC AGENTS ===
researcher = Agent(
    name="Researcher",
    instructions="Research the given topic and provide detailed findings.",
    tools=[DuckDuckGoTools()],
)

summarizer = Agent(
    name="Summarizer",
    instructions="Create a clear summary of the research findings.",
)

fact_checker = Agent(
    name="Fact Checker",
    instructions="Verify facts and check for accuracy in the research.",
    tools=[DuckDuckGoTools()],
)

writer = Agent(
    name="Writer",
    instructions="Write a comprehensive article based on all available research and verification.",
)

# === CONDITION EVALUATOR ===
def needs_fact_checking(step_input: StepInput) -> bool:
    """Determine if the research contains claims that need fact-checking"""
    summary = step_input.previous_step_content or ""

    # Look for keywords that suggest factual claims
    fact_indicators = [
        "study shows",
        "breakthroughs",
        "research indicates",
        "according to",
        "statistics",
        "data shows",
        "survey",
        "report",
        "million",
        "billion",
        "percent",
        "%",
        "increase",
        "decrease",
    ]

    return any(indicator in summary.lower() for indicator in fact_indicators)


# === WORKFLOW STEPS ===
research_step = Step(
    name="research",
    description="Research the topic",
    agent=researcher,
)

summarize_step = Step(
    name="summarize",
    description="Summarize research findings",
    agent=summarizer,
)

# Conditional fact-checking step
fact_check_step = Step(
    name="fact_check",
    description="Verify facts and claims",
    agent=fact_checker,
)

write_article = Step(
    name="write_article",
    description="Write final article",
    agent=writer,
)

# === BASIC LINEAR WORKFLOW ===
basic_workflow = Workflow(
    name="Basic Linear Workflow",
    description="Research -> Summarize -> Condition(Fact Check) -> Write Article",
    steps=[
        research_step,
        summarize_step,
        Condition(
            name="fact_check_condition",
            description="Check if fact-checking is needed",
            evaluator=needs_fact_checking,
            steps=[fact_check_step],
        ),
        write_article,
    ],
)

if __name__ == "__main__":
    try:
        response: Iterator[WorkflowRunResponseEvent] = basic_workflow.run(
            message="Recent breakthroughs in quantum computing",
            stream=True,
            stream_intermediate_steps=True,
        )
        for event in response:
            if event.event == WorkflowRunEvent.condition_execution_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.condition_execution_completed.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.workflow_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.step_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.step_completed.value:
                print(event)
                print() 
            elif event.event == WorkflowRunEvent.workflow_completed.value:
                print(event)
                print()
    except Exception as e:
        print(f"❌ Error: {e}")
        import traceback

        traceback.print_exc()

Async Streaming

The Workflow.arun(stream=True) returns an async iterator of WorkflowRunResponseEvent objects instead of a single response. So for example, if you want to stream the response, you can do the following:


# Define your workflow
...

async def main():
    try:
        response: AsyncIterator[WorkflowRunResponseEvent] = await basic_workflow.arun(
            message="Recent breakthroughs in quantum computing",
            stream=True,
            stream_intermediate_steps=True,
        )
        async for event in response:
            if event.event == WorkflowRunEvent.condition_execution_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.condition_execution_completed.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.workflow_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.step_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.step_completed.value:
                print(event)
                print() 
            elif event.event == WorkflowRunEvent.workflow_completed.value:
                print(event)
                print()
    except Exception as e:
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()

if __name__ == "__main__":
    asyncio.run(main())

Event Types

The following events are yielded by the Workflow.run() and Workflow.arun() functions depending on the workflow’s configuration:

Core Events

Event TypeDescription
WorkflowStartedIndicates the start of a workflow run
WorkflowCompletedSignals successful completion of the workflow run
WorkflowErrorIndicates an error occurred during the workflow run

Step Events

Event TypeDescription
StepStartedIndicates the start of a step
StepCompletedSignals successful completion of a step
StepErrorIndicates an error occurred during a step

Step Output Events (For custom functions)

Event TypeDescription
StepOutputIndicates the output of a step

Parallel Execution Events

Event TypeDescription
ParallelExecutionStartedIndicates the start of a parallel step
ParallelExecutionCompletedSignals successful completion of a parallel step

Condition Execution Events

Event TypeDescription
ConditionExecutionStartedIndicates the start of a condition
ConditionExecutionCompletedSignals successful completion of a condition

Loop Execution Events

Event TypeDescription
LoopExecutionStartedIndicates the start of a loop
LoopIterationStartedEventIndicates the start of a loop iteration
LoopIterationCompletedEventSignals successful completion of a loop iteration
LoopExecutionCompletedSignals successful completion of a loop

Router Execution Events

Event TypeDescription
RouterExecutionStartedIndicates the start of a router
RouterExecutionCompletedSignals successful completion of a router

Steps Execution Events

Event TypeDescription
StepsExecutionStartedIndicates the start of Steps being executed
StepsExecutionCompletedSignals successful completion of Steps execution

See detailed documentation in the WorkflowRunResponseEvent documentation.