Custom Functions in Workflows

Custom functions provide maximum flexibility by allowing you to define specific logic for step execution. Use them to preprocess inputs, orchestrate agents and teams, and postprocess outputs with complete programmatic control. Key Capabilities
  • Custom Logic: Implement complex business rules and data transformations
  • Agent Integration: Call agents and teams within your custom processing logic
  • Data Flow Control: Transform outputs between steps for optimal data handling
Implementation Pattern Define a Step with a custom function as the executor. The function must accept a StepInput object and return a StepOutput object, ensuring seamless integration with the workflow system.
content_planning_step = Step(
    name="Content Planning Step",
    executor=custom_content_planning_function,
)

def custom_content_planning_function(step_input: StepInput) -> StepOutput:
    """
    Custom function that does intelligent content planning with context awareness
    """
    message = step_input.input
    previous_step_content = step_input.previous_step_content

    # Create intelligent planning prompt
    planning_prompt = f"""
        STRATEGIC CONTENT PLANNING REQUEST:

        Core Topic: {message}

        Research Results: {previous_step_content[:500] if previous_step_content else "No research results"}

        Planning Requirements:
        1. Create a comprehensive content strategy based on the research
        2. Leverage the research findings effectively
        3. Identify content formats and channels
        4. Provide timeline and priority recommendations
        5. Include engagement and distribution strategies

        Please create a detailed, actionable content plan.
    """

    try:
        response = content_planner.run(planning_prompt)

        enhanced_content = f"""
            ## Strategic Content Plan

            **Planning Topic:** {message}

            **Research Integration:** {"✓ Research-based" if previous_step_content else "✗ No research foundation"}

            **Content Strategy:**
            {response.content}

            **Custom Planning Enhancements:**
            - Research Integration: {"High" if previous_step_content else "Baseline"}
            - Strategic Alignment: Optimized for multi-channel distribution
            - Execution Ready: Detailed action items included
        """.strip()

        return StepOutput(content=enhanced_content)

    except Exception as e:
        return StepOutput(
            content=f"Custom content planning failed: {str(e)}",
            success=False,
        )
Standard Pattern All custom functions follow this consistent structure:
def custom_content_planning_function(step_input: StepInput) -> StepOutput:
    # 1. Custom preprocessing
    # 2. Call agents/teams as needed
    # 3. Custom postprocessing
    return StepOutput(content=enhanced_content)
More Examples:

Background Workflow Execution

Execute workflows as non-blocking background tasks by passing background=True to Workflow.arun(). This returns a WorkflowRunOutput object with a run_id for polling the workflow status until completion.
Background execution requires async workflows using .arun(). Poll for results using workflow.get_run(run_id) and check completion status with .has_completed().Ideal for long-running operations like large-scale data processing, multi-step research, or batch operations that shouldn’t block your main application thread.
import asyncio

from agno.agent import Agent
from agno.db.sqlite import SqliteDb
from agno.models.openai import OpenAIChat
from agno.team import Team
from agno.tools.googlesearch import GoogleSearchTools
from agno.tools.hackernews import HackerNewsTools
from agno.utils.pprint import pprint_run_response
from agno.workflow.step import Step
from agno.workflow.workflow import Workflow

# Define agents
hackernews_agent = Agent(
    name="Hackernews Agent",
    model=OpenAIChat(id="gpt-5-mini"),
    tools=[HackerNewsTools()],
    role="Extract key insights and content from Hackernews posts",
)
web_agent = Agent(
    name="Web Agent",
    model=OpenAIChat(id="gpt-5-mini"),
    tools=[GoogleSearchTools()],
    role="Search the web for the latest news and trends",
)

# Define research team for complex analysis
research_team = Team(
    name="Research Team",
    members=[hackernews_agent, web_agent],
    instructions="Research tech topics from Hackernews and the web",
)

content_planner = Agent(
    name="Content Planner",
    model=OpenAIChat(id="gpt-5-mini"),
    instructions=[
        "Plan a content schedule over 4 weeks for the provided topic and research content",
        "Ensure that I have posts for 3 posts per week",
    ],
)

# Define steps
research_step = Step(
    name="Research Step",
    team=research_team,
)

content_planning_step = Step(
    name="Content Planning Step",
    agent=content_planner,
)

content_creation_workflow = Workflow(
    name="Content Creation Workflow",
    description="Automated content creation from blog posts to social media",
    db=SqliteDb(
        session_table="workflow_session",
        db_file="tmp/workflow.db",
    ),
    steps=[research_step, content_planning_step],
)


async def main():
    print("🚀 Starting Async Background Workflow Test")

    # Start background execution (async)
    bg_response = await content_creation_workflow.arun(
        input="AI trends in 2024", background=True
    )
    print(f"✅ Initial Response: {bg_response.status} - {bg_response.content}")
    print(f"📋 Run ID: {bg_response.run_id}")

    # Poll every 5 seconds until completion
    poll_count = 0

    while True:
        poll_count += 1
        print(f"\n🔍 Poll #{poll_count} (every 5s)")

        result = content_creation_workflow.get_run(bg_response.run_id)

        if result is None:
            print("⏳ Workflow not found yet, still waiting...")
            if poll_count > 50:
                print(f"⏰ Timeout after {poll_count} attempts")
                break
            await asyncio.sleep(5)
            continue

        if result.has_completed():
            break

        if poll_count > 200:
            print(f"⏰ Timeout after {poll_count} attempts")
            break

        await asyncio.sleep(5)

    final_result = content_creation_workflow.get_run(bg_response.run_id)

    print("\n📊 Final Result:")
    print("=" * 50)
    pprint_run_response(final_result, markdown=True)


if __name__ == "__main__":
    asyncio.run(main())
You can also use websockets for background workflows. See the Workflow Websocket example.

Workflow Cancellation

Workflows can be cancelled during execution to stop processing immediately and free up resources. This is particularly useful for long-running workflows, background tasks, or when user requirements change mid-execution. The cancellation system provides graceful termination with proper cleanup and event logging.

When to Use Cancellation

  • User-initiated stops: Allow users to cancel long-running processes
  • Resource management: Free up computational resources when workflows are no longer needed
  • Priority changes: Cancel lower-priority workflows to make room for urgent tasks

Cancelling Background Workflows

For workflows running in the background (using background=True), you can cancel them using the run_id:
import asyncio
from agno.workflow import Workflow
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.workflow.step import Step

# Setup workflow
long_running_workflow = Workflow(
    name="Long Running Analysis",
    steps=[
        Step(name="Research", agent=Agent(model=OpenAIChat(id="gpt-5-mini"), instructions="You are a helpful assistant that can research the web.")),
        Step(name="Deep Analysis", agent=Agent(model=OpenAIChat(id="gpt-5-mini"), instructions="You are a helpful assistant that can analyze the web.")),
        Step(name="Report Generation", agent=Agent(model=OpenAIChat(id="gpt-5-mini"), instructions="You are a helpful assistant that can generate a report.")),
    ]
)

async def main():
    # Start background workflow
    bg_response = await long_running_workflow.arun(
        input="Comprehensive market analysis for emerging technologies",
        background=True
    )
    
    print(f"Started workflow with run_id: {bg_response.run_id}")
    
    # Simulate some time passing
    await asyncio.sleep(5)
    
    # Cancel the workflow
    cancellation_result = long_running_workflow.cancel_run(bg_response.run_id)
    
    if cancellation_result:  # cancellation_result is a bool
        print(f"✅ Workflow {bg_response.run_id} cancelled successfully")
    else:
        print(f"❌ Failed to cancel workflow {bg_response.run_id}")

asyncio.run(main())
When a workflow in streaming mode is cancelled, a specific event is triggered: WorkflowRunEvent.workflow_cancelled or simply known as WorkflowCancelledEvent
More Examples:

Early Stopping

Workflows support early termination when specific conditions are met, preventing unnecessary processing and implementing safety gates. Any step can trigger early termination by returning StepOutput(stop=True), immediately halting the entire workflow execution. Workflows early stop diagram
from agno.workflow import Step, Workflow, StepInput, StepOutput

def security_gate(step_input: StepInput) -> StepOutput:
    """Security gate that stops deployment if vulnerabilities found"""
    security_result = step_input.previous_step_content or ""
    
    if "VULNERABLE" in security_result.upper():
        return StepOutput(
            content="🚨 SECURITY ALERT: Critical vulnerabilities detected. Deployment blocked.",
            stop=True  # Stop the entire workflow
        )
    else:
        return StepOutput(
            content="✅ Security check passed. Proceeding with deployment...",
            stop=False
        )

# Secure deployment pipeline
workflow = Workflow(
    name="Secure Deployment Pipeline",
    steps=[
        Step(name="Security Scan", agent=security_scanner),
        Step(name="Security Gate", executor=security_gate),  # May stop here
        Step(name="Deploy Code", agent=code_deployer),       # Only if secure
        Step(name="Setup Monitoring", agent=monitoring_agent), # Only if deployed
    ]
)

# Test with vulnerable code - workflow stops at security gate
workflow.print_response("Scan this code: exec(input('Enter command: '))")
More Examples:

Accessing Multiple Previous Steps

Advanced workflows often require data from multiple previous steps beyond just the immediate predecessor. The StepInput object provides powerful methods to access any previous step’s output by name or retrieve all accumulated content.
def create_comprehensive_report(step_input: StepInput) -> StepOutput:
    """
    Custom function that creates a report using data from multiple previous steps.
    This function has access to ALL previous step outputs and the original workflow message.
    """

    # Access original workflow input
    original_topic = step_input.workflow_message or ""

    # Access specific step outputs by name
    hackernews_data = step_input.get_step_content("research_hackernews") or ""
    web_data = step_input.get_step_content("research_web") or ""

    # Or access ALL previous content
    all_research = step_input.get_all_previous_content()

    # Create a comprehensive report combining all sources
    report = f"""
        # Comprehensive Research Report: {original_topic}

        ## Executive Summary
        Based on research from HackerNews and web sources, here's a comprehensive analysis of {original_topic}.

        ## HackerNews Insights
        {hackernews_data[:500]}...

        ## Web Research Findings  
        {web_data[:500]}...
    """

    return StepOutput(
        step_name="comprehensive_report", 
        content=report.strip(), 
        success=True
    )

# Use in workflow
workflow = Workflow(
    name="Enhanced Research Workflow",
    steps=[
        Step(name="research_hackernews", agent=hackernews_agent),
        Step(name="research_web", agent=web_agent),
        Step(name="comprehensive_report", executor=create_comprehensive_report),  # Accesses both previous steps
        Step(name="final_reasoning", agent=reasoning_agent),
    ],
)
Available Methods
  • step_input.get_step_content("step_name") - Get content from specific step by name
  • step_input.get_all_previous_content() - Get all previous step content combined
  • step_input.workflow_message - Access the original workflow input message
  • step_input.previous_step_content - Get content from immediate previous step
In case of Parallel step, when you do step_input.get_step_content("parallel_step_name"), it will return a dict with each key as individual_step_name for all the outputs from the steps defined in parallel. Example:
parallel_step_output = step_input.get_step_content("parallel_step_name")
parallel_step_output will be a dict with each key as individual_step_name for all the outputs from the steps defined in parallel.
{
    "individual_step_name_1": "output_from_individual_step_1",
    "individual_step_name_2": "output_from_individual_step_2",
}
More Examples:

Event Storage and Management

Workflows can automatically store all execution events for analysis, debugging, and audit purposes. Filter specific event types to reduce noise and storage overhead while maintaining essential execution records. Access stored events via workflow.run_response.events and in the runs column of your workflow’s session database (SQLite, PostgreSQL, etc.).
  • store_events=True: Automatically stores all workflow events in the database
  • events_to_skip=[]: Filter out specific event types to reduce storage and noise
Access all stored events via workflow.run_response.events Available Events to Skip:
from agno.run.workflow import WorkflowRunEvent

# Common events you might want to skip
events_to_skip = [
    WorkflowRunEvent.workflow_started,
    WorkflowRunEvent.workflow_completed,
    WorkflowRunEvent.workflow_cancelled,
    WorkflowRunEvent.step_started,
    WorkflowRunEvent.step_completed,
    WorkflowRunEvent.parallel_execution_started,
    WorkflowRunEvent.parallel_execution_completed,
    WorkflowRunEvent.condition_execution_started,
    WorkflowRunEvent.condition_execution_completed,
    WorkflowRunEvent.loop_execution_started,
    WorkflowRunEvent.loop_execution_completed,
    WorkflowRunEvent.router_execution_started,
    WorkflowRunEvent.router_execution_completed,
]
Use Cases
  • Debugging: Store all events to analyze workflow execution flow
  • Audit Trails: Keep records of all workflow activities for compliance
  • Performance Analysis: Analyze timing and execution patterns
  • Error Investigation: Review event sequences leading to failures
  • Noise Reduction: Skip verbose events like step_started to focus on results
Configuration Examples
# store everything
debug_workflow = Workflow(
    name="Debug Workflow",
    store_events=True,
    steps=[...]
)

# store only important events
production_workflow = Workflow(
    name="Production Workflow", 
    store_events=True,
    events_to_skip=[
        WorkflowRunEvent.step_started,
        WorkflowRunEvent.parallel_execution_started,
        # keep step_completed and workflow_completed
    ],
    steps=[...]
)

# No event storage
fast_workflow = Workflow(
    name="Fast Workflow",
    store_events=False,  
    steps=[...]
)
More Examples:

Additional Data and Metadata

When to Use Pass metadata, configuration, or contextual information to specific steps without cluttering the main workflow message flow. Key Benefits
  • Separation of Concerns: Keep workflow logic separate from metadata
  • Step-Specific Context: Access additional information in custom functions
  • Clean Message Flow: Main message stays focused on content
  • Flexible Configuration: Pass user info, priorities, settings, and more
Access Pattern Use step_input.additional_data for dictionary access to all additional data passed to the workflow.
from agno.workflow import Step, Workflow, StepInput, StepOutput

def custom_content_planning_function(step_input: StepInput) -> StepOutput:
    """Custom function that uses additional_data for enhanced context"""
    
    # Access the main workflow message
    message = step_input.input
    previous_content = step_input.previous_step_content
    
    # Access additional_data that was passed with the workflow
    additional_data = step_input.additional_data or {}
    user_email = additional_data.get("user_email", "No email provided")
    priority = additional_data.get("priority", "normal")
    client_type = additional_data.get("client_type", "standard")
    
    # Create enhanced planning prompt with context
    planning_prompt = f"""
        STRATEGIC CONTENT PLANNING REQUEST:
        
        Core Topic: {message}
        Research Results: {previous_content[:500] if previous_content else "No research results"}
        
        Additional Context:
        - Client Type: {client_type}
        - Priority Level: {priority}
        - Contact Email: {user_email}
        
        {"🚨 HIGH PRIORITY - Expedited delivery required" if priority == "high" else "📝 Standard delivery timeline"}
        
        Please create a detailed, actionable content plan.
    """
    
    response = content_planner.run(planning_prompt)
    
    enhanced_content = f"""
        ## Strategic Content Plan
        
        **Planning Topic:** {message}
        **Client Details:** {client_type} | {priority.upper()} priority | {user_email}
        
        **Content Strategy:**
        {response.content}
    """
    
    return StepOutput(content=enhanced_content)

# Define workflow with steps
workflow = Workflow(
    name="Content Creation Workflow",
    steps=[
        Step(name="Research Step", team=research_team),
        Step(name="Content Planning Step", executor=custom_content_planning_function),
    ]
)

# Run workflow with additional_data
workflow.print_response(
    message="AI trends in 2024",
    additional_data={
        "user_email": "kaustubh@agno.com",
        "priority": "high",
        "client_type": "enterprise",
        "budget": "$50000",
        "deadline": "2024-12-15"
    },
    markdown=True,
    stream=True
)
More Examples:

Structured Inputs with Pydantic

Leverage Pydantic models for type-safe, validated workflow inputs:
from pydantic import BaseModel, Field

class ResearchRequest(BaseModel):
    topic: str = Field(description="Research topic")
    depth: int = Field(description="Research depth (1-10)")
    sources: List[str] = Field(description="Preferred sources")

workflow.print_response(
    message=ResearchRequest(
        topic="AI trends 2024",
        depth=8,
        sources=["academic", "industry"]
    )
)
More Examples:

Structured Input/Output at Step Level

Workflows feature a powerful type-safe data flow system enabling each step to:
  1. Receive structured input (Pydantic models, lists, dicts, or raw strings)
  2. Produce structured output (validated Pydantic models)
  3. Maintain type safety throughout entire workflow execution

Data Flow Between Steps

Input Processing
  • First step receives the workflow’s input message
  • Subsequent steps receive the previous step’s structured output
Output Generation
  • Each Agent processes input using its configured output_schema
  • Output is automatically validated against the defined model
# Define agents with response models
research_agent = Agent(
    name="Research Specialist",
    model=OpenAIChat(id="gpt-4"),
    output_schema=ResearchFindings,  # <-- Set on Agent
)

analysis_agent = Agent(
    name="Analysis Expert", 
    model=OpenAIChat(id="gpt-4"),
    output_schema=AnalysisResults,  # <-- Set on Agent
)

# Steps reference these agents
workflow = Workflow(steps=[
    Step(agent=research_agent),  # Will output ResearchFindings
    Step(agent=analysis_agent)   # Will output AnalysisResults
])

Structured Data Transformation in Custom Functions

Custom functions can access structured output from previous steps via step_input.previous_step_content, preserving original Pydantic model types. Transformation Pattern
  • Type-Check Inputs: Use isinstance(step_input.previous_step_content, ModelName) to verify input structure
  • Modify Data: Extract fields, process them, and construct new Pydantic models
  • Return Typed Output: Wrap the new model in StepOutput(content=new_model) for type safety
Example Implementation
   def transform_data(step_input: StepInput) -> StepOutput:
       research = step_input.previous_step_content  # Type: ResearchFindings
       analysis = AnalysisReport(
           analysis_type="Custom",
           key_findings=[f"Processed: {research.topic}"],
           ...  # Modified fields
       )
       return StepOutput(content=analysis)
More Examples:

Media Input and Processing

Workflows seamlessly handle media artifacts (images, videos, audio) throughout the execution pipeline, enabling rich multimedia processing workflows. Media Flow System
  • Input Support: Media can be provided to Workflow.run() and Workflow.print_response()
  • Step Propagation: Media is passed through to individual steps (Agents, Teams, or Custom Functions)
  • Artifact Accumulation: Each step receives shared media from previous steps and can produce additional outputs
  • Format Compatibility: Automatic conversion between artifact formats ensures seamless integration
  • Complete Preservation: Final WorkflowRunOutput contains all accumulated media from the entire execution chain
Here’s an example of how to pass image as input:
from agno.agent import Agent
from agno.media import Image
from agno.models.openai import OpenAIChat
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.workflow import Step, Workflow
from agno.db.sqlite import SqliteDb

# Define agents
image_analyzer = Agent(
    name="Image Analyzer",
    model=OpenAIChat(id="gpt-5-mini"),
    instructions="Analyze the provided image and extract key details, objects, and context.",
)

news_researcher = Agent(
    name="News Researcher", 
    model=OpenAIChat(id="gpt-5-mini"),
    tools=[DuckDuckGoTools()],
    instructions="Search for latest news and information related to the analyzed image content.",
)

# Define steps
analysis_step = Step(
    name="Image Analysis Step",
    agent=image_analyzer,
)

research_step = Step(
    name="News Research Step", 
    agent=news_researcher,
)

# Create workflow with media input
media_workflow = Workflow(
    name="Image Analysis and Research Workflow",
    description="Analyze an image and research related news",
    steps=[analysis_step, research_step],
    db=SqliteDb(db_file="tmp/workflow.db"),
)

# Run workflow with image input
if __name__ == "__main__":
    media_workflow.print_response(
        message="Please analyze this image and find related news",
        images=[
            Image(url="https://upload.wikimedia.org/wikipedia/commons/0/0c/GoldenGateBridge-001.jpg")
        ],
        markdown=True,
    )
If you are using Workflow.run(), you need to use WorkflowRunOutput to access the images, videos, and audio.
from agno.run.workflow import WorkflowRunOutput

response: WorkflowRunOutput = media_workflow.run(
    message="Please analyze this image and find related news",
    images=[
        Image(url="https://upload.wikimedia.org/wikipedia/commons/0/0c/GoldenGateBridge-001.jpg")
    ],
    markdown=True,
)

print(response.images)
Similarly, you can pass Video and Audio as input. More Examples: