This example demonstrates Workflows 2.0 advanced data flow capabilities
This example demonstrates Workflows 2.0 shows how to:
get_step_content()
)get_all_previous_content()
)from agno.agent.agent import Agent
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.tools.hackernews import HackerNewsTools
from agno.workflow.v2.step import Step
from agno.workflow.v2.types import StepInput, StepOutput
from agno.workflow.v2.workflow import Workflow
# Define the research agents
hackernews_agent = Agent(
name="HackerNews Researcher",
instructions="You are a researcher specializing in finding the latest tech news and discussions from Hacker News. Focus on startup trends, programming topics, and tech industry insights.",
tools=[HackerNewsTools()],
)
web_agent = Agent(
name="Web Researcher",
instructions="You are a comprehensive web researcher. Search across multiple sources including news sites, blogs, and official documentation to gather detailed information.",
tools=[DuckDuckGoTools()],
)
reasoning_agent = Agent(
name="Reasoning Agent",
instructions="You are an expert analyst who creates comprehensive reports by analyzing and synthesizing information from multiple sources. Create well-structured, insightful reports.",
)
# Create the research steps
research_hackernews = Step(
name="research_hackernews",
agent=hackernews_agent,
description="Research latest tech trends from Hacker News",
)
research_web = Step(
name="research_web",
agent=web_agent,
description="Comprehensive web research on the topic",
)
# Custom function step that has access to ALL previous step outputs
def create_comprehensive_report(step_input: StepInput) -> StepOutput:
"""
Custom function that creates a report using data from multiple previous steps.
This function has access to ALL previous step outputs and the original workflow message.
"""
# Access original workflow input
original_topic = step_input.message or ""
print(f"--> Original topic: {original_topic}")
# Access specific step outputs by name
hackernews_data = step_input.get_step_content("research_hackernews") or ""
web_data = step_input.get_step_content("research_web") or ""
# Or access ALL previous content
all_research = step_input.get_all_previous_content()
# Create a comprehensive report combining all sources
report = f"""
# Comprehensive Research Report: {original_topic}
## Executive Summary
Based on research from HackerNews and web sources, here's a comprehensive analysis of {original_topic}.
## HackerNews Insights
{hackernews_data[:500]}...
## Web Research Findings
{web_data[:500]}...
"""
return StepOutput(
step_name="comprehensive_report", content=report.strip(), success=True
)
comprehensive_report_step = Step(
name="comprehensive_report",
executor=create_comprehensive_report,
description="Create comprehensive report from all research sources",
)
# Final reasoning step using reasoning agent
reasoning_step = Step(
name="final_reasoning",
agent=reasoning_agent,
description="Apply reasoning to create final insights and recommendations",
)
workflow = Workflow(
name="Enhanced Research Workflow",
description="Multi-source research with custom data flow and reasoning",
steps=[
research_hackernews,
# research_web,
comprehensive_report_step, # Has access to both previous steps
reasoning_step, # Gets the last step output (comprehensive report)
],
)
if __name__ == "__main__":
workflow.print_response(
"Latest developments in artificial intelligence and machine learning",
stream=True,
stream_intermediate_steps=True,
)