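"""Nested workflow with a router.

An inner workflow uses a Router to pick one specialist agent based on keywords
in the input; an outer workflow runs that inner workflow as its first step and
then passes control to an editor agent step.
"""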
from typing import List

from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.workflow import Router
from agno.workflow.step import Step
from agno.workflow.types import StepInput
from agno.workflow.workflow import Workflow


def topic_router(step_input: StepInput) -> List[Step]:
    """Route to a specialist based on keywords in the input.

    The input is lower-cased and scanned for keyword substrings. Technology
    keywords are checked first, then history keywords; if neither group
    matches, the general specialist is used. Matching is by substring, so a
    keyword also matches inside a longer word (e.g. "api" matches "apis").

    Args:
        step_input: The step input whose ``input`` attribute is inspected.
            A missing or empty input is treated as an empty string, and a
            non-string input is converted with ``str()`` before matching.

    Returns:
        A one-element list holding the step for the selected specialist.
    """
    raw_input = step_input.input or ""
    # Non-string inputs have no .lower(); stringify them so routing falls back
    # to keyword matching on their text form instead of raising AttributeError.
    text = (raw_input if isinstance(raw_input, str) else str(raw_input)).lower()

    tech_keywords = ("code", "programming", "software", "api")
    history_keywords = ("history", "war", "ancient", "century")

    # Order matters: a request that mentions both topics goes to tech.
    if any(kw in text for kw in tech_keywords):
        return [Step(name="tech_research", agent=tech_specialist)]
    if any(kw in text for kw in history_keywords):
        return [Step(name="history_research", agent=history_specialist)]
    return [Step(name="general_research", agent=general_specialist)]


# Specialist agents: one per routing outcome. Each gets its own model object
# with the same model id; only the name and instructions differ.
tech_specialist = Agent(
    model=OpenAIChat(id="gpt-4o-mini"),
    name="Tech Specialist",
    instructions=(
        "You are a technology expert. "
        "Provide detailed technical explanations."
    ),
)

history_specialist = Agent(
    model=OpenAIChat(id="gpt-4o-mini"),
    name="History Specialist",
    instructions=(
        "You are a historian. "
        "Provide detailed historical context and analysis."
    ),
)

# Used when the input matches neither the tech nor the history keywords.
general_specialist = Agent(
    model=OpenAIChat(id="gpt-4o-mini"),
    name="General Specialist",
    instructions=(
        "You are a general knowledge expert. "
        "Provide clear, informative answers."
    ),
)

# Inner workflow: a single Router step that selects one specialist step.
inner_workflow = Workflow(
    description="Routes to the right specialist based on the topic",
    name="Routed Research",
    steps=[
        Router(
            # topic_router inspects the step input and returns the step to run.
            selector=topic_router,
            name="specialist_router",
            # Candidate steps; their names mirror the ones topic_router returns.
            choices=[
                Step(agent=tech_specialist, name="tech_research"),
                Step(agent=history_specialist, name="history_research"),
                Step(agent=general_specialist, name="general_research"),
            ],
        ),
    ],
)

# Outer workflow: run the routed-research workflow as a step, then an editor.
editor = Agent(
    model=OpenAIChat(id="gpt-4o-mini"),
    name="Editor",
    instructions=(
        "You are an editor. "
        "Polish and improve the specialist's research into a clear article."
    ),
)

outer_workflow = Workflow(
    description="Routes to the right specialist, then edits the result",
    name="Smart Research and Edit",
    steps=[
        # The first step wraps the whole inner workflow instead of an agent.
        Step(workflow=inner_workflow, name="research_phase"),
        Step(agent=editor, name="editing_phase"),
    ],
)


if __name__ == "__main__":
    # Sample run: the prompt mentions "APIs", one of topic_router's tech keywords.
    outer_workflow.print_response(
        stream=True,
        input="Explain how REST APIs work and best practices for designing them",
    )

# Run the Example
#
#   git clone https://github.com/agno-agi/agno.git
#   cd agno/cookbook/04_workflows/06_advanced_concepts/workflow_as_a_step
#
#   pip install agno openai
#
#   python nested_workflow_with_router.py