from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.workflow import Parallel
from agno.workflow.step import Step
from agno.workflow.types import StepInput, StepOutput
from agno.workflow.workflow import Workflow
def merge_results(step_input: StepInput) -> StepOutput:
"""Merge content from previous steps."""
prev = step_input.previous_step_content or ""
return StepOutput(content=f"Merged: {prev[:500]}")
# Level 3: Mini-workflows for individual research tasks
data_agent = Agent(
name="Data Agent",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="Gather raw data and statistics on the topic. Be concise (2-3 sentences).",
)
analysis_agent = Agent(
name="Analysis Agent",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="Analyze the data provided. Identify key trends. Be concise (2-3 sentences).",
)
data_workflow = Workflow(
name="Data Collection",
description="Collects and analyzes raw data",
steps=[
Step(name="gather", agent=data_agent),
Step(name="analyze", agent=analysis_agent),
],
)
opinion_agent = Agent(
name="Opinion Agent",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="Provide expert opinion and perspective on the topic. Be concise (2-3 sentences).",
)
opinion_workflow = Workflow(
name="Expert Opinion",
description="Gathers expert perspectives",
steps=[Step(name="opinion", agent=opinion_agent)],
)
# Level 2: Research workflow with parallel Level 3 workflows
level2_workflow = Workflow(
name="Comprehensive Research",
description="Runs data collection and expert opinion in parallel",
steps=[
Parallel(
Step(name="data_branch", workflow=data_workflow),
Step(name="opinion_branch", workflow=opinion_workflow),
name="parallel_research",
),
Step(name="merge", executor=merge_results),
],
)
# Level 1: Outermost workflow
writer = Agent(
name="Writer",
model=OpenAIChat(id="gpt-4o-mini"),
instructions="Write a polished short paragraph synthesizing all research provided.",
)
outer_workflow = Workflow(
name="Full Pipeline",
description="3-level nested workflow: research (parallel mini-workflows) -> write",
steps=[
Step(name="research", workflow=level2_workflow),
Step(name="write", agent=writer),
],
)
if __name__ == "__main__":
outer_workflow.print_response(
input="What is the future of renewable energy?",
stream=True,
)