from agno.workflow import Workflow
from agno.workflow.step import Step
from agno.workflow.types import StepInput, StepOutput, UserInputField
from agno.db.sqlite import SqliteDb
def process_with_params(step_input: StepInput) -> StepOutput:
    """Build the "process" step output from the user-supplied parameters.

    Looks up the ``"user_input"`` mapping inside ``step_input.additional_data``
    and echoes the ``threshold`` and ``mode`` values found there.

    Args:
        step_input: Input object passed to this executor by the workflow.

    Returns:
        A ``StepOutput`` whose content reports the threshold and mode used.
        ``threshold`` defaults to ``0.5`` and ``mode`` to ``"fast"`` when the
        corresponding key is missing.
    """
    # Guard against additional_data (or its "user_input" entry) being None so
    # the defaults below are used instead of failing with AttributeError.
    additional_data = step_input.additional_data or {}
    user_input = additional_data.get("user_input") or {}

    threshold = user_input.get("threshold", 0.5)
    mode = user_input.get("mode", "fast")
    return StepOutput(content=f"Processed with threshold={threshold}, mode={mode}")
# Fields requested from the user for the "process" step: threshold and mode
# are required, batch_size is optional.
_process_step_fields = [
    UserInputField(
        name="threshold",
        field_type="float",
        description="Processing threshold (0.0-1.0)",
        required=True,
    ),
    UserInputField(
        name="mode",
        field_type="str",
        description="Mode: 'fast' or 'accurate'",
        required=True,
    ),
    UserInputField(
        name="batch_size",
        field_type="int",
        description="Records per batch",
        required=False,
    ),
]

# Three-step pipeline persisted to a local SQLite file: analyze -> process -> report.
workflow = Workflow(
    name="configurable_pipeline",
    db=SqliteDb(db_file="workflow.db"),
    steps=[
        Step(name="analyze", agent=analyze_agent),
        # The middle step is flagged with requires_user_input=True and carries
        # the schema above together with the message shown to the user.
        Step(
            name="process",
            executor=process_with_params,
            requires_user_input=True,
            user_input_message="Configure processing:",
            user_input_schema=_process_step_fields,
        ),
        Step(name="report", agent=report_agent),
    ],
)
def _convert_user_value(raw, field_type):
    """Convert the text typed by the user into the type named by *field_type*.

    Raises:
        ValueError: if *raw* is not a valid literal for an "int" or "float" field.
    """
    if field_type == "int":
        return int(raw)
    if field_type == "float":
        return float(raw)
    if field_type == "bool":
        return raw.lower() in ("true", "yes", "1")
    # Any other field type (e.g. "str") is passed through unchanged.
    return raw


def _ask_for_field(field):
    """Prompt on stdin until a usable value for *field* has been entered.

    Returns:
        The converted value, or ``None`` when an optional field is left blank.
    """
    marker = "*" if field.required else ""
    prompt = f"{field.name}{marker} ({field.field_type}): "
    while True:
        raw = input(prompt)
        if not raw:
            if field.required:
                # The "*" marker promises the field is mandatory, so enforce it
                # here rather than silently omitting the value.
                print(f"{field.name} is required.")
                continue
            return None
        try:
            return _convert_user_value(raw, field.field_type)
        except ValueError:
            # A typo should trigger a re-prompt, not abort the paused run.
            print(f"Invalid {field.field_type} value: {raw!r}. Please try again.")


run_output = workflow.run("Process Q4 data")

if run_output.is_paused:
    for req in run_output.steps_requiring_user_input:
        print(f"Step: {req.step_name}")
        print(f"Message: {req.user_input_message}")

        # Collect one value per schema field; blank optional fields are omitted.
        values = {}
        for field in req.user_input_schema:
            value = _ask_for_field(field)
            if value is not None:
                values[field.name] = value

        req.set_user_input(**values)

    # Resume the workflow once every pending requirement has its input.
    run_output = workflow.continue_run(run_output)

print(run_output.content)