router_steps_workflow.py
Copy
Ask AI
from typing import List
from agno.agent.agent import Agent
from agno.tools.hackernews import HackerNewsTools
from agno.tools.yfinance import YFinanceTools
from agno.workflow.router import Router
from agno.workflow.step import Step
from agno.workflow.types import StepInput
from agno.workflow.workflow import Workflow
# Define the research agents
hackernews_agent = Agent(
name="HackerNews Researcher",
instructions="You are a researcher specializing in finding the latest tech news and discussions from Hacker News. Focus on startup trends, programming topics, and tech industry insights.",
tools=[HackerNewsTools()],
)
finance_agent = Agent(
name="Finance Researcher",
instructions="You are a finance researcher. Search for stock data, market trends, and financial information to gather detailed insights.",
tools=[YFinanceTools()],
)
content_agent = Agent(
name="Content Publisher",
instructions="You are a content creator who takes research data and creates engaging, well-structured articles. Format the content with proper headings, bullet points, and clear conclusions.",
)
# Create the research steps
research_hackernews = Step(
name="research_hackernews",
agent=hackernews_agent,
description="Research latest tech trends from Hacker News",
)
research_finance = Step(
name="research_finance",
agent=finance_agent,
description="Research financial data and market trends",
)
publish_content = Step(
name="publish_content",
agent=content_agent,
description="Create and format final content for publication",
)
# Now returns Step(s) to execute
def research_router(step_input: StepInput) -> List[Step]:
"""
Decide which research method to use based on the input topic.
Returns a list containing the step(s) to execute.
"""
# Use the original workflow message if this is the first step
topic = step_input.previous_step_content or step_input.input or ""
topic = topic.lower()
# Check if the topic is tech/startup related - use HackerNews
tech_keywords = [
"startup",
"programming",
"ai",
"machine learning",
"software",
"developer",
"coding",
"tech",
"silicon valley",
"venture capital",
"cryptocurrency",
"blockchain",
"open source",
"github",
]
if any(keyword in topic for keyword in tech_keywords):
print(f"Tech topic detected: Using HackerNews research for '{topic}'")
return [research_hackernews]
else:
print(f"Finance topic detected: Using finance research for '{topic}'")
return [research_finance]
workflow = Workflow(
name="Intelligent Research Workflow",
description="Automatically selects the best research method based on topic, then publishes content",
steps=[
Router(
name="research_strategy_router",
selector=research_router,
choices=[research_hackernews, research_finance],
description="Intelligently selects research method based on topic",
),
publish_content,
],
)
if __name__ == "__main__":
workflow.print_response(
"Latest developments in artificial intelligence and machine learning"
)