# advanced_workflow_patterns.py
from agno.workflow import Condition, Loop, Parallel, Router, Step, Workflow
def research_post_processor(step_input) -> StepOutput:
    """Turn the previous step's research text into a Markdown quality report.

    The text is read from ``step_input.previous_step_content`` (a falsy value
    is treated as empty text). The report records:

    * a data-quality label: "High-quality" when the text has more than 200
      whitespace-separated words, "Limited data" otherwise;
    * whether technical and business topics appear, via keyword heuristics;
    * the research text itself.

    Keyword matching is a case-insensitive substring test, except for "ai",
    which must appear as a standalone word. As a bare substring, "ai" occurs
    inside everyday words ("said", "main", "available") and would flag almost
    any English text as technical.

    Args:
        step_input: Object exposing ``previous_step_content``. Presumably the
            step input handed to a custom workflow executor -- confirm.

    Returns:
        StepOutput: ``success=True`` with the report in ``content``. If
        anything raises while building the report (for example, the content
        is not a string), ``success=False`` with the failure message in
        ``content`` and the exception text in ``error``.
    """
    import re  # function-scope import keeps this block self-contained

    research_data = step_input.previous_step_content or ""

    # Deliberately broad: a failure here is reported as a failed step output
    # instead of propagating out of the executor.
    try:
        word_count = len(research_data.split())
        lowered = research_data.lower()  # lower-case once, reuse for every keyword

        has_tech_content = (
            any(keyword in lowered for keyword in ("technology", "software", "tech"))
            # "ai" must be a whole word; see the docstring for why.
            or re.search(r"\bai\b", lowered) is not None
        )
        has_business_content = any(
            keyword in lowered
            for keyword in ("market", "business", "revenue", "strategy")
        )

        quality_label = "✓ High-quality" if word_count > 200 else "⚠ Limited data"
        tech_label = "✓ Completed" if has_tech_content else "✗ Not available"
        business_label = "✓ Completed" if has_business_content else "✗ Not available"

        # strip() also trims trailing whitespace of the research text itself
        # (and the dangling newline when the text is empty).
        enhanced_summary = "\n".join(
            [
                "## Research Analysis Report",
                f"**Data Quality:** {quality_label}",
                "**Content Coverage:**",
                f"- Technical Analysis: {tech_label}",
                f"- Business Analysis: {business_label}",
                "**Research Findings:**",
                research_data,
            ]
        ).strip()

        return StepOutput(
            content=enhanced_summary,
            success=True,
        )
    except Exception as e:
        return StepOutput(
            content=f"Research post-processing failed: {str(e)}",
            success=False,
            error=str(e),
        )
# Multi-pattern pipeline, assembled stage by stage:
# parallel conditional research -> post-processing -> routed content -> review.

# Technical branch: a single research step behind the tech-topic evaluator.
tech_branch = Condition(
    name="Tech Check",
    evaluator=is_tech_topic,
    steps=[Step(name="Tech Research", agent=tech_researcher)],
)

# Business branch: a market-research loop behind the business-topic evaluator.
market_research_loop = Loop(
    name="Deep Business Research",
    steps=[Step(name="Market Research", agent=market_researcher)],
    end_condition=research_quality_check,
    max_iterations=3,
)
business_branch = Condition(
    name="Business Check",
    evaluator=is_business_topic,
    steps=[market_research_loop],
)

# Both branches are grouped under one Parallel stage.
research_phase = Parallel(
    tech_branch,
    business_branch,
    name="Conditional Research Phase",
)

# Custom-function step that consolidates whatever the research phase produced.
post_processing = Step(
    name="Research Post-Processing",
    executor=research_post_processor,
    description="Consolidate and analyze research findings with quality metrics",
)

# The selector chooses among the candidate content steps.
content_router = Router(
    name="Content Type Router",
    selector=content_type_selector,
    choices=[blog_post_step, social_media_step, report_step],
)

final_review = Step(name="Final Review", agent=reviewer)

workflow = Workflow(
    name="Advanced Multi-Pattern Workflow",
    steps=[research_phase, post_processing, content_router, final_review],
)

analysis_prompt = (
    "Create a comprehensive analysis of sustainable technology trends "
    "and their business impact for 2024"
)
workflow.print_response(analysis_prompt, markdown=True)