Copy
Ask AI
"""
Selector Types
==============
Demonstrates router selector flexibility across string, step object, list, and nested-choice return patterns.
"""
from typing import List, Union
from agno.agent.agent import Agent
from agno.models.openai import OpenAIChat
from agno.workflow.router import Router
from agno.workflow.step import Step
from agno.workflow.types import StepInput
from agno.workflow.workflow import Workflow
# ---------------------------------------------------------------------------
# Create Agents For String Selector
# ---------------------------------------------------------------------------
tech_expert = Agent(
name="tech_expert",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="You are a tech expert. Provide technical analysis.",
)
biz_expert = Agent(
name="biz_expert",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="You are a business expert. Provide business insights.",
)
generalist = Agent(
name="generalist",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="You are a generalist. Provide general information.",
)
tech_step = Step(name="Tech Research", agent=tech_expert)
business_step = Step(name="Business Research", agent=biz_expert)
general_step = Step(name="General Research", agent=generalist)
# ---------------------------------------------------------------------------
# Define Selectors
# ---------------------------------------------------------------------------
def route_by_topic(step_input: StepInput) -> Union[str, Step, List[Step]]:
topic = step_input.input.lower()
if "tech" in topic or "ai" in topic or "software" in topic:
return "Tech Research"
if "business" in topic or "market" in topic or "finance" in topic:
return "Business Research"
return "General Research"
# ---------------------------------------------------------------------------
# Create Workflow (String Selector)
# ---------------------------------------------------------------------------
workflow_string_selector = Workflow(
name="Expert Routing (String Selector)",
steps=[
Router(
name="Topic Router",
selector=route_by_topic,
choices=[tech_step, business_step, general_step],
),
],
)
# ---------------------------------------------------------------------------
# Create Agents For step_choices Selector
# ---------------------------------------------------------------------------
researcher = Agent(
name="researcher",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="You are a researcher.",
)
writer = Agent(
name="writer",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="You are a writer.",
)
reviewer = Agent(
name="reviewer",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="You are a reviewer.",
)
def dynamic_selector(
step_input: StepInput,
step_choices: list,
) -> Union[str, Step, List[Step]]:
user_input = step_input.input.lower()
step_map = {s.name: s for s in step_choices if hasattr(s, "name") and s.name}
print(f"Available steps: {list(step_map.keys())}")
if "research" in user_input:
return "researcher"
if "write" in user_input:
return step_map.get("writer", step_choices[0])
if "full" in user_input:
return [step_map["researcher"], step_map["writer"], step_map["reviewer"]]
return step_choices[0]
# ---------------------------------------------------------------------------
# Create Workflow (step_choices)
# ---------------------------------------------------------------------------
workflow_step_choices = Workflow(
name="Dynamic Routing (step_choices)",
steps=[
Router(
name="Dynamic Router",
selector=dynamic_selector,
choices=[researcher, writer, reviewer],
),
],
)
# ---------------------------------------------------------------------------
# Create Agents For Nested Choices Selector
# ---------------------------------------------------------------------------
step_a = Agent(name="step_a", model=OpenAIChat(id="gpt-4o-mini"), instructions="Step A")
step_b = Agent(name="step_b", model=OpenAIChat(id="gpt-4o-mini"), instructions="Step B")
step_c = Agent(name="step_c", model=OpenAIChat(id="gpt-4o-mini"), instructions="Step C")
def nested_selector(
step_input: StepInput,
step_choices: list,
) -> Union[str, Step, List[Step]]:
user_input = step_input.input.lower()
if "single" in user_input:
return step_choices[0]
return step_choices[1]
# ---------------------------------------------------------------------------
# Create Workflow (Nested Choices)
# ---------------------------------------------------------------------------
workflow_nested = Workflow(
name="Nested Choices Routing",
steps=[
Router(
name="Nested Router",
selector=nested_selector,
choices=[step_a, [step_b, step_c]],
),
],
)
# ---------------------------------------------------------------------------
# Run Workflows
# ---------------------------------------------------------------------------
if __name__ == "__main__":
print("=" * 60)
print("Example 1: String-based selector (returns step name)")
print("=" * 60)
workflow_string_selector.print_response("Tell me about AI trends", stream=True)
print("\n" + "=" * 60)
print("Example 2: step_choices parameter")
print("=" * 60)
workflow_step_choices.print_response("I need to research something", stream=True)
print("\n" + "=" * 60)
print("Example 3: Nested choices")
print("=" * 60)
workflow_nested.print_response("Run the sequence", stream=True)
Run the Example
Copy
Ask AI
# Clone and setup repo
git clone https://github.com/agno-agi/agno.git
cd agno/cookbook/04_workflows/05_conditional_branching
# Create and activate virtual environment
./scripts/demo_setup.sh
source .venvs/demo/bin/activate
python selector_types.py