This example demonstrates Workflows 2.0 early termination of a running workflow.
This example shows how to create workflows that can terminate
gracefully when quality conditions aren’t met, preventing downstream processing of
invalid or unsafe data.When to use: When you need safety mechanisms, quality gates, or validation checkpoints
that should prevent downstream processing if conditions aren’t met. Ideal for data
validation pipelines, security checks, quality assurance workflows, or any process where
continuing with invalid inputs could cause problems.
early_stop_workflow_with_agents.py
Copy
Ask AI
from agno.agent import Agentfrom agno.models.openai import OpenAIChatfrom agno.workflow.v2 import Workflowfrom agno.workflow.v2.types import StepInput, StepOutput# Create agents with more specific validation criteriadata_validator = Agent( name="Data Validator", model=OpenAIChat(id="gpt-4o-mini"), instructions=[ "You are a data validator. Analyze the provided data and determine if it's valid.", "For data to be VALID, it must meet these criteria:", "- user_count: Must be a positive number (> 0)", "- revenue: Must be a positive number (> 0)", "- date: Must be in a reasonable date format (YYYY-MM-DD)", "", "Return exactly 'VALID' if all criteria are met.", "Return exactly 'INVALID' if any criteria fail.", "Also briefly explain your reasoning.", ],)data_processor = Agent( name="Data Processor", model=OpenAIChat(id="gpt-4o-mini"), instructions="Process and transform the validated data.",)report_generator = Agent( name="Report Generator", model=OpenAIChat(id="gpt-4o-mini"), instructions="Generate a final report from processed data.",)def early_exit_validator(step_input: StepInput) -> StepOutput: """ Custom function that checks data quality and stops workflow early if invalid """ # Get the validation result from previous step validation_result = step_input.previous_step_content or "" if "INVALID" in validation_result.upper(): return StepOutput( content="❌ Data validation failed. Workflow stopped early to prevent processing invalid data.", stop=True, # Stop the entire workflow here ) else: return StepOutput( content="✅ Data validation passed. Continuing with processing...", stop=False, # Continue normally )# Create workflow with conditional early terminationworkflow = Workflow( name="Data Processing with Early Exit", description="Process data but stop early if validation fails", steps=[ data_validator, # Step 1: Validate data early_exit_validator, # Step 2: Check validation and possibly stop early data_processor, # Step 3: Process data (only if validation passed) report_generator, # Step 4: Generate report (only if processing completed) ],)if __name__ == "__main__": print("\n=== Testing with INVALID data ===") workflow.print_response( message="Process this data: {'user_count': -50, 'revenue': 'invalid_amount', 'date': 'bad_date'}" ) print("=== Testing with VALID data ===") workflow.print_response( message="Process this data: {'user_count': 1000, 'revenue': 50000, 'date': '2024-01-15'}" )