The Workflow.run() function runs the workflow and generates a response, either as a single WorkflowRunResponse object or as a stream of WorkflowRunResponseEvent objects.
Many of our examples use workflow.print_response(), a helper utility that prints the response in the terminal. It uses workflow.run() under the hood.
Running your Workflow
Here’s how to run your workflow. The response is captured in the response variable.
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.storage.sqlite import SqliteStorage
from agno.team import Team
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.tools.hackernews import HackerNewsTools
from agno.workflow.v2.step import Step
from agno.workflow.v2.workflow import Workflow
from agno.run.v2.workflow import WorkflowRunResponse
from agno.utils.pprint import pprint_run_response

# Define agents
hackernews_agent = Agent(
    name="Hackernews Agent",
    model=OpenAIChat(id="gpt-4o-mini"),
    tools=[HackerNewsTools()],
    role="Extract key insights and content from Hackernews posts",
)
web_agent = Agent(
    name="Web Agent",
    model=OpenAIChat(id="gpt-4o-mini"),
    tools=[DuckDuckGoTools()],
    role="Search the web for the latest news and trends",
)

# Define research team for complex analysis
research_team = Team(
    name="Research Team",
    mode="coordinate",
    members=[hackernews_agent, web_agent],
    instructions="Research tech topics from Hackernews and the web",
)

content_planner = Agent(
    name="Content Planner",
    model=OpenAIChat(id="gpt-4o"),
    instructions=[
        "Plan a content schedule over 4 weeks for the provided topic and research content",
        "Ensure that I have 3 posts per week",
    ],
)

# The team and the agent are passed directly as the workflow's steps
content_creation_workflow = Workflow(
    name="Content Creation Workflow",
    description="Automated content creation from blog posts to social media",
    storage=SqliteStorage(
        table_name="workflow_v2",
        db_file="tmp/workflow_v2.db",
        mode="workflow_v2",
    ),
    steps=[research_team, content_planner],
)

# Create and use workflow
if __name__ == "__main__":
    response: WorkflowRunResponse = content_creation_workflow.run(
        message="AI trends in 2024",
        markdown=True,
    )
    pprint_run_response(response, markdown=True)
The Workflow.run() function returns a WorkflowRunResponse object when not streaming.
Here is detailed documentation for WorkflowRunResponse.
Async Execution
The Workflow.arun() function is the async version of Workflow.run().
Here is an example of how to use it:
import asyncio

from agno.agent.agent import Agent
from agno.run.v2.workflow import WorkflowRunResponse
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.utils.pprint import pprint_run_response
from agno.workflow.v2.condition import Condition
from agno.workflow.v2.step import Step
from agno.workflow.v2.types import StepInput
from agno.workflow.v2.workflow import Workflow

# === BASIC AGENTS ===
researcher = Agent(
    name="Researcher",
    instructions="Research the given topic and provide detailed findings.",
    tools=[DuckDuckGoTools()],
)
summarizer = Agent(
    name="Summarizer",
    instructions="Create a clear summary of the research findings.",
)
fact_checker = Agent(
    name="Fact Checker",
    instructions="Verify facts and check for accuracy in the research.",
    tools=[DuckDuckGoTools()],
)
writer = Agent(
    name="Writer",
    instructions="Write a comprehensive article based on all available research and verification.",
)


# === CONDITION EVALUATOR ===
def needs_fact_checking(step_input: StepInput) -> bool:
    """Determine if the research contains claims that need fact-checking"""
    summary = step_input.previous_step_content or ""
    # Look for keywords that suggest factual claims
    fact_indicators = [
        "study shows",
        "breakthroughs",
        "research indicates",
        "according to",
        "statistics",
        "data shows",
        "survey",
        "report",
        "million",
        "billion",
        "percent",
        "%",
        "increase",
        "decrease",
    ]
    return any(indicator in summary.lower() for indicator in fact_indicators)


# === WORKFLOW STEPS ===
research_step = Step(
    name="research",
    description="Research the topic",
    agent=researcher,
)
summarize_step = Step(
    name="summarize",
    description="Summarize research findings",
    agent=summarizer,
)
# Conditional fact-checking step
fact_check_step = Step(
    name="fact_check",
    description="Verify facts and claims",
    agent=fact_checker,
)
write_article = Step(
    name="write_article",
    description="Write final article",
    agent=writer,
)

# === BASIC LINEAR WORKFLOW ===
basic_workflow = Workflow(
    name="Basic Linear Workflow",
    description="Research -> Summarize -> Condition(Fact Check) -> Write Article",
    steps=[
        research_step,
        summarize_step,
        Condition(
            name="fact_check_condition",
            description="Check if fact-checking is needed",
            evaluator=needs_fact_checking,
            steps=[fact_check_step],
        ),
        write_article,
    ],
)


async def main():
    try:
        # Without stream=True, arun() resolves to a single response object
        response: WorkflowRunResponse = await basic_workflow.arun(
            message="Recent breakthroughs in quantum computing",
        )
        pprint_run_response(response, markdown=True)
    except Exception as e:
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    asyncio.run(main())
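The evaluator in the example above decides whether the fact-checking step runs by matching keywords in the previous step's output. To try that logic without any model calls, here is a minimal sketch. FakeStepInput is a hypothetical stand-in for StepInput that carries only the previous_step_content field, and the indicator list is shortened for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeStepInput:
    """Hypothetical stand-in for agno's StepInput (only the field used here)."""
    previous_step_content: Optional[str] = None


# Shortened keyword list; the full example above uses more indicators.
FACT_INDICATORS = ("study shows", "according to", "percent", "%", "survey")


def needs_fact_checking(step_input: FakeStepInput) -> bool:
    """Return True when the previous step's text contains a factual-claim keyword."""
    summary = (step_input.previous_step_content or "").lower()
    return any(indicator in summary for indicator in FACT_INDICATORS)


if __name__ == "__main__":
    print(needs_fact_checking(FakeStepInput("According to a recent survey, usage rose.")))
    print(needs_fact_checking(FakeStepInput("Qubits are fragile.")))
    print(needs_fact_checking(FakeStepInput(None)))
```

Because the match is a plain substring test, short indicators such as "report" or "%" will trigger on many inputs; a stricter evaluator may be preferable when the fact-checking step is expensive.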
Streaming Responses
To enable streaming, set stream=True when calling run(). This returns an iterator of WorkflowRunResponseEvent objects instead of a single response.
from typing import Iterator

from agno.run.v2.workflow import WorkflowRunResponseEvent

# Define your agents/team
...
content_creation_workflow = Workflow(
    name="Content Creation Workflow",
    description="Automated content creation from blog posts to social media",
    storage=SqliteStorage(
        table_name="workflow_v2",
        db_file="tmp/workflow_v2.db",
        mode="workflow_v2",
    ),
    steps=[research_team, content_planner],
)

# Create and use workflow
if __name__ == "__main__":
    # With stream=True, run() returns an iterator of events
    response: Iterator[WorkflowRunResponseEvent] = content_creation_workflow.run(
        message="AI trends in 2024",
        markdown=True,
        stream=True,
    )
    pprint_run_response(response, markdown=True)
If you set stream_intermediate_steps=False (or do not set it at all), only WorkflowStartedEvent and WorkflowCompletedEvent are yielded, along with all the Agent/Team events.
For more detailed streaming, enable intermediate steps by setting stream_intermediate_steps=True. This provides real-time updates about each step of the workflow.
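To make the difference between the two settings concrete, the following toy generator mimics which event names are yielded in each mode. It is a sketch of the behaviour described above, not agno's implementation: fake_workflow_stream is a hypothetical function, and AgentEvent is a placeholder for whatever agent or team events a real run would emit.

```python
from typing import Iterator, List


def fake_workflow_stream(
    step_names: List[str], stream_intermediate_steps: bool = False
) -> Iterator[str]:
    """Toy generator (not agno code) that mimics which event names get yielded."""
    yield "WorkflowStarted"
    for name in step_names:
        if stream_intermediate_steps:
            yield f"StepStarted:{name}"
        # Placeholder for the agent/team events, which are yielded in both modes.
        yield f"AgentEvent:{name}"
        if stream_intermediate_steps:
            yield f"StepCompleted:{name}"
    yield "WorkflowCompleted"


if __name__ == "__main__":
    steps = ["research", "write_article"]
    print(list(fake_workflow_stream(steps)))
    print(list(fake_workflow_stream(steps, stream_intermediate_steps=True)))
```

The full example that follows uses the real API with stream_intermediate_steps=True.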
from typing import Iterator

from agno.agent.agent import Agent
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.workflow.v2.condition import Condition
from agno.workflow.v2.step import Step
from agno.workflow.v2.types import StepInput
from agno.workflow.v2.workflow import Workflow
from agno.run.v2.workflow import WorkflowRunResponseEvent, WorkflowRunEvent

# === BASIC AGENTS ===
researcher = Agent(
    name="Researcher",
    instructions="Research the given topic and provide detailed findings.",
    tools=[DuckDuckGoTools()],
)
summarizer = Agent(
    name="Summarizer",
    instructions="Create a clear summary of the research findings.",
)
fact_checker = Agent(
    name="Fact Checker",
    instructions="Verify facts and check for accuracy in the research.",
    tools=[DuckDuckGoTools()],
)
writer = Agent(
    name="Writer",
    instructions="Write a comprehensive article based on all available research and verification.",
)


# === CONDITION EVALUATOR ===
def needs_fact_checking(step_input: StepInput) -> bool:
    """Determine if the research contains claims that need fact-checking"""
    summary = step_input.previous_step_content or ""
    # Look for keywords that suggest factual claims
    fact_indicators = [
        "study shows",
        "breakthroughs",
        "research indicates",
        "according to",
        "statistics",
        "data shows",
        "survey",
        "report",
        "million",
        "billion",
        "percent",
        "%",
        "increase",
        "decrease",
    ]
    return any(indicator in summary.lower() for indicator in fact_indicators)


# === WORKFLOW STEPS ===
research_step = Step(
    name="research",
    description="Research the topic",
    agent=researcher,
)
summarize_step = Step(
    name="summarize",
    description="Summarize research findings",
    agent=summarizer,
)
# Conditional fact-checking step
fact_check_step = Step(
    name="fact_check",
    description="Verify facts and claims",
    agent=fact_checker,
)
write_article = Step(
    name="write_article",
    description="Write final article",
    agent=writer,
)

# === BASIC LINEAR WORKFLOW ===
basic_workflow = Workflow(
    name="Basic Linear Workflow",
    description="Research -> Summarize -> Condition(Fact Check) -> Write Article",
    steps=[
        research_step,
        summarize_step,
        Condition(
            name="fact_check_condition",
            description="Check if fact-checking is needed",
            evaluator=needs_fact_checking,
            steps=[fact_check_step],
        ),
        write_article,
    ],
)

if __name__ == "__main__":
    try:
        response: Iterator[WorkflowRunResponseEvent] = basic_workflow.run(
            message="Recent breakthroughs in quantum computing",
            stream=True,
            stream_intermediate_steps=True,
        )
        # Print only the workflow, step, and condition lifecycle events
        for event in response:
            if event.event == WorkflowRunEvent.condition_execution_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.condition_execution_completed.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.workflow_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.step_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.step_completed.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.workflow_completed.value:
                print(event)
                print()
    except Exception as e:
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()
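The if/elif chain in the example above prints every matching event the same way, so it can be collapsed into a set-membership test. A minimal sketch follows, assuming the event objects expose an event attribute holding the event name, as in that example. FakeRunEvent and FakeEvent are hypothetical stand-ins so the snippet runs without agno, and the string values are assumed to match the Event Type names listed in the tables on this page.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Iterable, Iterator


class FakeRunEvent(str, Enum):
    """Stand-in for WorkflowRunEvent; member names follow the example above."""
    workflow_started = "WorkflowStarted"
    step_started = "StepStarted"
    step_completed = "StepCompleted"
    workflow_completed = "WorkflowCompleted"


@dataclass
class FakeEvent:
    """Stand-in for a streamed event with an `event` name attribute."""
    event: str


# Event names we want to print; everything else is skipped.
WANTED = {
    FakeRunEvent.workflow_started.value,
    FakeRunEvent.step_started.value,
    FakeRunEvent.step_completed.value,
    FakeRunEvent.workflow_completed.value,
}


def select_events(events: Iterable[FakeEvent]) -> Iterator[FakeEvent]:
    """Yield only the events whose name is in WANTED."""
    for event in events:
        if event.event in WANTED:
            yield event


if __name__ == "__main__":
    stream = [FakeEvent("WorkflowStarted"), FakeEvent("SomethingElse"), FakeEvent("StepStarted")]
    for event in select_events(stream):
        print(event)
        print()
```

Keeping the wanted names in one set means adding or removing an event type is a one-line change, at the cost of losing a per-event branch if different events later need different handling.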
Async Streaming
Calling Workflow.arun(stream=True) returns an async iterator of WorkflowRunResponseEvent objects instead of a single response.
For example, to stream the response asynchronously:
# Define your workflow
...
async def main():
    try:
        response: AsyncIterator[WorkflowRunResponseEvent] = await basic_workflow.arun(
            message="Recent breakthroughs in quantum computing",
            stream=True,
            stream_intermediate_steps=True,
        )
        # Print only the workflow, step, and condition lifecycle events
        async for event in response:
            if event.event == WorkflowRunEvent.condition_execution_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.condition_execution_completed.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.workflow_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.step_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.step_completed.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.workflow_completed.value:
                print(event)
                print()
    except Exception as e:
        print(f"❌ Error: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    asyncio.run(main())
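The async for loop in the example above works on any async iterator. The sketch below shows the same consumption pattern with the standard library only. fake_event_stream is a hypothetical async generator standing in for a streamed workflow run, and it yields plain strings instead of event objects.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_event_stream(names: List[str]) -> AsyncIterator[str]:
    """Hypothetical async generator standing in for a streamed workflow run."""
    for name in names:
        await asyncio.sleep(0)  # give control back to the event loop between events
        yield name


async def collect(names: List[str]) -> List[str]:
    """Consume the stream with `async for` and gather the event names in order."""
    seen: List[str] = []
    async for event_name in fake_event_stream(names):
        seen.append(event_name)
    return seen


if __name__ == "__main__":
    print(asyncio.run(collect(["WorkflowStarted", "StepStarted", "WorkflowCompleted"])))
```

Between events the loop yields control to the event loop, so other tasks can make progress while the stream is being consumed.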
Event Types
The following events are yielded by the Workflow.run() and Workflow.arun() functions, depending on the workflow’s configuration:
Core Events
| Event Type | Description |
| --- | --- |
| WorkflowStarted | Indicates the start of a workflow run |
| WorkflowCompleted | Signals successful completion of the workflow run |
| WorkflowError | Indicates an error occurred during the workflow run |
Step Events
| Event Type | Description |
| --- | --- |
| StepStarted | Indicates the start of a step |
| StepCompleted | Signals successful completion of a step |
| StepError | Indicates an error occurred during a step |
Step Output Events (For custom functions)
| Event Type | Description |
| --- | --- |
| StepOutput | Indicates the output of a step |
Parallel Execution Events
| Event Type | Description |
| --- | --- |
| ParallelExecutionStarted | Indicates the start of a parallel step |
| ParallelExecutionCompleted | Signals successful completion of a parallel step |
Condition Execution Events
| Event Type | Description |
| --- | --- |
| ConditionExecutionStarted | Indicates the start of a condition |
| ConditionExecutionCompleted | Signals successful completion of a condition |
Loop Execution Events
| Event Type | Description |
| --- | --- |
| LoopExecutionStarted | Indicates the start of a loop |
| LoopIterationStartedEvent | Indicates the start of a loop iteration |
| LoopIterationCompletedEvent | Signals successful completion of a loop iteration |
| LoopExecutionCompleted | Signals successful completion of a loop |
Router Execution Events
| Event Type | Description |
| --- | --- |
| RouterExecutionStarted | Indicates the start of a router |
| RouterExecutionCompleted | Signals successful completion of a router |
Steps Execution Events
| Event Type | Description |
| --- | --- |
| StepsExecutionStarted | Indicates the start of Steps being executed |
| StepsExecutionCompleted | Signals successful completion of Steps execution |
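When handling a mixed stream, it can help to look up which group an event belongs to. The sketch below builds such a lookup from a subset of the event names in the tables above. The category labels, EVENT_CATEGORIES, and category_of are illustrative names chosen here, not part of agno.

```python
from typing import Dict, Optional, Tuple

# Event names grouped as in the tables above (a subset, for illustration).
EVENT_CATEGORIES: Dict[str, Tuple[str, ...]] = {
    "Core": ("WorkflowStarted", "WorkflowCompleted", "WorkflowError"),
    "Step": ("StepStarted", "StepCompleted", "StepError"),
    "Parallel": ("ParallelExecutionStarted", "ParallelExecutionCompleted"),
    "Condition": ("ConditionExecutionStarted", "ConditionExecutionCompleted"),
    "Router": ("RouterExecutionStarted", "RouterExecutionCompleted"),
}

# Reverse index: event name -> category label.
CATEGORY_BY_EVENT: Dict[str, str] = {
    name: category
    for category, names in EVENT_CATEGORIES.items()
    for name in names
}


def category_of(event_name: str) -> Optional[str]:
    """Return the category label for an event name, or None if it is not listed."""
    return CATEGORY_BY_EVENT.get(event_name)


if __name__ == "__main__":
    for name in ("StepStarted", "RouterExecutionCompleted", "Unknown"):
        print(name, "->", category_of(name))
```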
See detailed documentation in the WorkflowRunResponseEvent documentation.