from agno.workflow import Workflow
from agno.workflow.router import Router
from agno.workflow.step import Step
from agno.workflow.types import StepInput, StepOutput
from agno.db.sqlite import SqliteDb
def quick_analysis(step_input: StepInput) -> StepOutput:
    """Produce the fixed result for the "quick" analysis route.

    Args:
        step_input: Input handed to the step; it is not read by this function.

    Returns:
        A ``StepOutput`` whose content is a constant summary string.
    """
    summary = "Quick analysis: Basic metrics computed"
    return StepOutput(content=summary)
def deep_analysis(step_input: StepInput) -> StepOutput:
    """Produce the fixed result for the "deep" analysis route.

    Args:
        step_input: Input handed to the step; it is not read by this function.

    Returns:
        A ``StepOutput`` whose content is a constant summary string.
    """
    summary = "Deep analysis: Full statistical analysis"
    return StepOutput(content=summary)
def custom_analysis(step_input: StepInput) -> StepOutput:
    """Produce the fixed result for the "custom" analysis route.

    Args:
        step_input: Input handed to the step; it is not read by this function.

    Returns:
        A ``StepOutput`` whose content is a constant summary string.
    """
    summary = "Custom analysis: User-defined parameters"
    return StepOutput(content=summary)
def prepare_data(step_input: StepInput) -> StepOutput:
    """Placeholder executor for the "prepare" step.

    The workflow below references this name, so it must exist before the
    ``Workflow`` is constructed. Replace the body with real data preparation.

    Args:
        step_input: Input handed to the step; it is not read by this placeholder.

    Returns:
        A ``StepOutput`` whose content is a constant status string.
    """
    return StepOutput(content="Data prepared")


def generate_report(step_input: StepInput) -> StepOutput:
    """Placeholder executor for the "report" step.

    The workflow below references this name, so it must exist before the
    ``Workflow`` is constructed. Replace the body with real report generation.

    Args:
        step_input: Input handed to the step; it is not read by this placeholder.

    Returns:
        A ``StepOutput`` whose content is a constant status string.
    """
    return StepOutput(content="Report generated")


# Three-stage workflow: prepare -> analysis route -> report.
# The run state is stored through a SQLite database file named "workflow.db".
workflow = Workflow(
    name="analysis_workflow",
    db=SqliteDb(db_file="workflow.db"),
    steps=[
        Step(name="prepare", executor=prepare_data),
        # The router is configured to require user input and to accept a
        # single selection among the three analysis steps.
        # NOTE(review): presumably the run pauses here until a choice is
        # supplied and the run is continued -- confirm against the agno docs.
        Router(
            name="analysis_router",
            choices=[
                Step(name="quick", description="Fast analysis (2 min)", executor=quick_analysis),
                Step(name="deep", description="Full analysis (10 min)", executor=deep_analysis),
                Step(name="custom", description="Custom parameters", executor=custom_analysis),
            ],
            requires_user_input=True,
            user_input_message="Select analysis type:",
            allow_multiple_selections=False,
        ),
        Step(name="report", executor=generate_report),
    ],
)
# Start the workflow with the request text.
run_output = workflow.run("Analyze Q4 data")

# If the run paused, ask the user for a route for each pending request,
# then continue the same run.
if run_output.is_paused:
    pending_routes = run_output.steps_requiring_route
    for route_request in pending_routes:
        print(f"Router: {route_request.step_name}")
        print(f"Message: {route_request.user_input_message}")
        print(f"Options: {route_request.available_choices}")
        # The raw text typed by the user is passed to select() unchanged.
        route_request.select(input("Select: "))
    run_output = workflow.continue_run(run_output)

# Show the content of whichever run output we ended up with.
print(run_output.content)