# workflow_with_conditional.py
"""
Workflow With Conditional
=========================
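Demonstrates a workflow, served through AgentOS, in which a fact-checking
step runs only when a condition evaluator flags the summarized research.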
"""
from agno.agent.agent import Agent
from agno.db.sqlite import SqliteDb
# ---------------------------------------------------------------------------
# Create Example
# ---------------------------------------------------------------------------
# Import AgentOS, the web search tools, and the workflow building blocks
from agno.os import AgentOS
from agno.tools.websearch import WebSearchTools
from agno.workflow.condition import Condition
from agno.workflow.step import Step
from agno.workflow.types import StepInput
from agno.workflow.workflow import Workflow
# === BASIC AGENTS ===
# Each agent backs exactly one workflow step defined further down.

# Researcher: equipped with web search tools.
researcher = Agent(
    name="Researcher",
    tools=[WebSearchTools()],
    instructions="Research the given topic and provide detailed findings.",
)

# Summarizer: no tools configured.
summarizer = Agent(
    name="Summarizer",
    instructions="Create a clear summary of the research findings.",
)

# Fact checker: gets its own web search tools instance.
fact_checker = Agent(
    name="Fact Checker",
    tools=[WebSearchTools()],
    instructions="Verify facts and check for accuracy in the research.",
)

# Writer: no tools configured.
writer = Agent(
    name="Writer",
    instructions=(
        "Write a comprehensive article based on all available research "
        "and verification."
    ),
)
# === CONDITION EVALUATOR ===
def needs_fact_checking(step_input: StepInput) -> bool:
    """Decide whether the previous step's output should be fact-checked.

    The previous step's content is scanned, case-insensitively, for keywords
    that typically accompany factual or statistical claims.

    Args:
        step_input: Input handed to the condition; only its
            ``previous_step_content`` attribute is read.

    Returns:
        True if at least one indicator keyword occurs in the content,
        False otherwise (including when there is no previous content).
    """
    content = step_input.previous_step_content
    if not content:
        # Nothing to inspect, so nothing to verify.
        return False

    # The content may be a non-string object (e.g. structured output), so
    # coerce it to text instead of failing on ``.lower()``. Lowercase once
    # up front rather than once per indicator.
    text = (content if isinstance(content, str) else str(content)).lower()

    # Keywords that suggest factual claims.
    fact_indicators = (
        "study shows",
        "research indicates",
        "according to",
        "statistics",
        "data shows",
        "survey",
        "report",
        "million",
        "billion",
        "percent",
        "%",
        "increase",
        "decrease",
    )
    return any(indicator in text for indicator in fact_indicators)
# === WORKFLOW STEPS ===
# One Step per agent; the workflow below decides their order.
research_step = Step(
    agent=researcher,
    name="research",
    description="Research the topic",
)

summarize_step = Step(
    agent=summarizer,
    name="summarize",
    description="Summarize research findings",
)

# Wrapped in a Condition by the workflow, so it only runs when needed.
fact_check_step = Step(
    agent=fact_checker,
    name="fact_check",
    description="Verify facts and claims",
)

write_article = Step(
    agent=writer,
    name="write_article",
    description="Write final article",
)
# === WORKFLOW WITH CONDITIONAL FACT-CHECK ===
# Research and summarize first, then fact-check only if the evaluator says
# so, and finally write the article.
basic_workflow = Workflow(
    name="basic-linear-workflow",
    description="Research -> Summarize -> Condition(Fact Check) -> Write Article",
    db=SqliteDb(
        db_file="tmp/workflow.db",
        session_table="workflow_session",
    ),
    steps=[
        research_step,
        summarize_step,
        # Guarded section: its steps run only when the evaluator returns True.
        Condition(
            name="fact_check_condition",
            description="Check if fact-checking is needed",
            evaluator=needs_fact_checking,
            steps=[fact_check_step],
        ),
        write_article,
    ],
)
# Register the workflow with AgentOS and obtain the application object
agent_os = AgentOS(
    workflows=[basic_workflow],
    description="Example OS setup",
)
# Module-level `app` is the target of the "workflow_with_conditional:app"
# import string passed to serve() in the run section.
app = agent_os.get_app()
# ---------------------------------------------------------------------------
# Run Example
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # The app is passed as a "module:attribute" import string, with reload on.
    agent_os.serve(reload=True, app="workflow_with_conditional:app")
Run the Example
# Clone and setup repo
git clone https://github.com/agno-agi/agno.git
cd agno/cookbook/05_agent_os/workflow
# Create and activate virtual environment
./scripts/demo_setup.sh
source .venvs/demo/bin/activate
python workflow_with_conditional.py