# condition_and_parallel_steps_stream.py
# Example: conditional research steps run in parallel, then content writing, with streamed output.
import re

from agno.agent import Agent
from agno.tools.hackernews import HackerNewsTools
from agno.tools.yfinance import YFinanceTools
from agno.workflow.condition import Condition
from agno.workflow.parallel import Parallel
from agno.workflow.step import Step
from agno.workflow.types import StepInput
from agno.workflow.workflow import Workflow
# === AGENTS ===
# Researcher backed by the Hacker News toolkit.
hackernews_agent = Agent(
    name="HackerNews Researcher",
    tools=[HackerNewsTools()],
    instructions="Research tech news and trends from Hacker News",
)

# Researcher backed by the YFinance toolkit.
finance_agent = Agent(
    name="Finance Researcher",
    tools=[YFinanceTools()],
    instructions="Research financial data and market trends",
)

# Tool-less agent; it is shared by the input-preparation and writing steps.
content_agent = Agent(
    name="Content Creator",
    instructions="Create well-structured content from research data",
)
# === RESEARCH STEPS ===
# Research step executed by the Hacker News agent.
research_hackernews_step = Step(
    name="ResearchHackerNews",
    agent=hackernews_agent,
    description="Research tech news from Hacker News",
)

# Research step executed by the finance agent.
research_finance_step = Step(
    name="ResearchFinance",
    agent=finance_agent,
    description="Research financial data and trends",
)

# The two steps below both run on content_agent: first organise the research,
# then write the final content.
prepare_input_for_write_step = Step(
    name="PrepareInput",
    agent=content_agent,
    description="Prepare and organize research data for writing",
)

write_step = Step(
    name="WriteContent",
    agent=content_agent,
    description="Write the final content based on research",
)
# === CONDITION EVALUATORS ===
def check_if_we_should_search_hn(step_input: StepInput) -> bool:
    """Decide whether the Hacker News research branch should run.

    The topic is read from ``step_input.input``, falling back to
    ``step_input.previous_step_content`` and finally to an empty string.

    Args:
        step_input: Step input carrying the workflow input and the content
            produced by the previous step.

    Returns:
        True when the topic mentions a tech keyword, False otherwise.
    """
    raw_topic = step_input.input or step_input.previous_step_content or ""
    # NOTE(review): the input is presumably not always a plain string (agno's
    # StepInput appears to allow dicts, lists and models -- confirm), so
    # normalise to text once instead of calling .lower() on it directly.
    topic = str(raw_topic).lower()

    # "ai" has to match as a whole word: as a plain substring it also fires on
    # unrelated words such as "retail", "paid" or "training".
    if re.search(r"\bai\b", topic):
        return True

    # The remaining keywords are long enough to stay substring matches, so
    # forms like "technology", "fintech" or "startups" keep matching.
    tech_keywords = (
        "machine learning",
        "programming",
        "software",
        "tech",
        "startup",
        "coding",
    )
    return any(keyword in topic for keyword in tech_keywords)
def check_if_we_should_search_finance(step_input: StepInput) -> bool:
    """Decide whether the financial research branch should run.

    The topic is read from ``step_input.input``, falling back to
    ``step_input.previous_step_content`` and finally to an empty string.

    Args:
        step_input: Step input carrying the workflow input and the content
            produced by the previous step.

    Returns:
        True when the topic contains a finance keyword, False otherwise.
    """
    raw_topic = step_input.input or step_input.previous_step_content or ""
    # NOTE(review): the input is presumably not always a plain string (agno's
    # StepInput appears to allow dicts, lists and models -- confirm), so
    # normalise to text once instead of calling .lower() on it directly.
    topic = str(raw_topic).lower()

    finance_keywords = ("stock", "market", "finance", "investment", "trading", "price")
    return any(keyword in topic for keyword in finance_keywords)
if __name__ == "__main__":
    # Each research branch is gated by its own keyword evaluator.
    hackernews_condition = Condition(
        name="HackerNewsCondition",
        description="Check if we should search Hacker News for tech topics",
        evaluator=check_if_we_should_search_hn,
        steps=[research_hackernews_step],
    )
    finance_condition = Condition(
        name="FinanceCondition",
        description="Check if we should search for financial data",
        evaluator=check_if_we_should_search_finance,
        steps=[research_finance_step],
    )

    # Both gated branches are grouped into one Parallel step.
    conditional_research = Parallel(
        hackernews_condition,
        finance_condition,
        name="ConditionalResearch",
        description="Run conditional research steps in parallel",
    )

    # Pipeline order: conditional research, input preparation, final writing.
    workflow = Workflow(
        name="Conditional Workflow",
        steps=[
            conditional_research,
            prepare_input_for_write_step,
            write_step,
        ],
    )

    # Top-level demo boundary: report any failure instead of a traceback.
    try:
        workflow.print_response(
            input="Latest AI developments in machine learning",
            stream=True,
        )
    except Exception as exc:
        print(f"Error: {exc}")
    print()