How Custom Functions Work

Custom functions provide flexibility by allowing developers to define specific logic for step execution. They can be used to preprocess inputs, call agents, and postprocess outputs.

  • executor: Step can be defined with a custom execution function that handles the step logic.
  • Integration with Agents and Teams: Custom functions can interact with agents and teams, leveraging their capabilities.

While defining a Step, you can specify a custom function as an executor. This function should accept a StepInput object and return a StepOutput object.

content_planning_step = Step(
    name="Content Planning Step",
    executor=custom_content_planning_function,
)

def custom_content_planning_function(step_input: StepInput) -> StepOutput:
    """
    Custom function that does intelligent content planning with context awareness
    """
    message = step_input.message
    previous_step_content = step_input.previous_step_content

    # Create intelligent planning prompt
    planning_prompt = f"""
        STRATEGIC CONTENT PLANNING REQUEST:

        Core Topic: {message}

        Research Results: {previous_step_content[:500] if previous_step_content else "No research results"}

        Planning Requirements:
        1. Create a comprehensive content strategy based on the research
        2. Leverage the research findings effectively
        3. Identify content formats and channels
        4. Provide timeline and priority recommendations
        5. Include engagement and distribution strategies

        Please create a detailed, actionable content plan.
    """

    try:
        response = content_planner.run(planning_prompt)

        enhanced_content = f"""
            ## Strategic Content Plan

            **Planning Topic:** {message}

            **Research Integration:** {"✓ Research-based" if previous_step_content else "✗ No research foundation"}

            **Content Strategy:**
            {response.content}

            **Custom Planning Enhancements:**
            - Research Integration: {"High" if previous_step_content else "Baseline"}
            - Strategic Alignment: Optimized for multi-channel distribution
            - Execution Ready: Detailed action items included
        """.strip()

        return StepOutput(content=enhanced_content, response=response)

    except Exception as e:
        return StepOutput(
            content=f"Custom content planning failed: {str(e)}",
            success=False,
        )

Just make sure to follow this structure and return the output as a StepOutput object.

def custom_content_planning_function(step_input: StepInput) -> StepOutput:
    # Custom preprocessing
    # Call the agent
    # Custom postprocessing
    return StepOutput(content=enhanced_content)

More Examples:

Early Stopping

Workflows can be terminated early when certain conditions are met, preventing unnecessary processing and ensuring safety gates work properly. Any step can trigger early termination by returning StepOutput(stop=True).

Early Stop Workflows

from agno.workflow.v2 import Step, Workflow
from agno.workflow.v2.types import StepInput, StepOutput

def security_gate(step_input: StepInput) -> StepOutput:
    """Security gate that stops deployment if vulnerabilities found"""
    security_result = step_input.previous_step_content or ""
    
    if "VULNERABLE" in security_result.upper():
        return StepOutput(
            content="🚨 SECURITY ALERT: Critical vulnerabilities detected. Deployment blocked.",
            stop=True  # Stop the entire workflow
        )
    else:
        return StepOutput(
            content="✅ Security check passed. Proceeding with deployment...",
            stop=False
        )

# Secure deployment pipeline
workflow = Workflow(
    name="Secure Deployment Pipeline",
    steps=[
        Step(name="Security Scan", agent=security_scanner),
        Step(name="Security Gate", executor=security_gate),  # May stop here
        Step(name="Deploy Code", agent=code_deployer),       # Only if secure
        Step(name="Setup Monitoring", agent=monitoring_agent), # Only if deployed
    ]
)

# Test with vulnerable code - workflow stops at security gate
workflow.print_response("Scan this code: exec(input('Enter command: '))")

More Examples:

Access Multiple Previous Steps Output

Advanced workflows often need to access data from multiple previous steps, not just the immediate previous step. The StepInput object provides powerful methods to access any previous step’s output by name or get all previous content.

def create_comprehensive_report(step_input: StepInput) -> StepOutput:
    """
    Custom function that creates a report using data from multiple previous steps.
    This function has access to ALL previous step outputs and the original workflow message.
    """

    # Access original workflow input
    original_topic = step_input.workflow_message or ""

    # Access specific step outputs by name
    hackernews_data = step_input.get_step_content("research_hackernews") or ""
    web_data = step_input.get_step_content("research_web") or ""

    # Or access ALL previous content
    all_research = step_input.get_all_previous_content()

    # Create a comprehensive report combining all sources
    report = f"""
        # Comprehensive Research Report: {original_topic}

        ## Executive Summary
        Based on research from HackerNews and web sources, here's a comprehensive analysis of {original_topic}.

        ## HackerNews Insights
        {hackernews_data[:500]}...

        ## Web Research Findings  
        {web_data[:500]}...
    """

    return StepOutput(
        step_name="comprehensive_report", 
        content=report.strip(), 
        success=True
    )

# Use in workflow
workflow = Workflow(
    name="Enhanced Research Workflow",
    steps=[
        Step(name="research_hackernews", agent=hackernews_agent),
        Step(name="research_web", agent=web_agent),
        Step(name="comprehensive_report", executor=create_comprehensive_report),  # Accesses both previous steps
        Step(name="final_reasoning", agent=reasoning_agent),
    ],
)

Key Methods:

  • step_input.get_step_content("step_name") - Get content from specific step by name
  • step_input.get_all_previous_content() - Get all previous step content combined
  • step_input.workflow_message - Access the original workflow input message
  • step_input.previous_step_content - Get content from immediate previous step

In case of Parallel step, when you do step_input.get_step_content("parallel_step_name"), it will return a dict with each key as individual_step_name for all the outputs from the steps defined in parallel. Example:

parallel_step_output = step_input.get_step_content("parallel_step_name")

parallel_step_output will be a dict with each key as individual_step_name for all the outputs from the steps defined in parallel.

{
    "individual_step_name_1": "output_from_individual_step_1",
    "individual_step_name_2": "output_from_individual_step_2",
}

More Examples:

Store Events

Workflows can automatically store all events for later analysis, debugging, or audit purposes. You can also filter out specific event types to reduce noise and storage overhead. You can access these events on the WorkflowRunResponse and in the runs column in your Workflow's Session DB in your configured storage backend (SQLite, PostgreSQL, etc.).

  • store_events=True: Automatically stores all workflow events in the database
  • events_to_skip=[]: Filter out specific event types to reduce storage and noise

Access all stored events via workflow.run_response.events

Available Events to Skip:

from agno.run.v2.workflow import WorkflowRunEvent

# Common events you might want to skip
events_to_skip = [
    WorkflowRunEvent.workflow_started,
    WorkflowRunEvent.workflow_completed,
    WorkflowRunEvent.step_started,
    WorkflowRunEvent.step_completed,
    WorkflowRunEvent.parallel_execution_started,
    WorkflowRunEvent.parallel_execution_completed,
    WorkflowRunEvent.condition_execution_started,
    WorkflowRunEvent.condition_execution_completed,
    WorkflowRunEvent.loop_execution_started,
    WorkflowRunEvent.loop_execution_completed,
    WorkflowRunEvent.router_execution_started,
    WorkflowRunEvent.router_execution_completed,
]

When to use:

  • Debugging: Store all events to analyze workflow execution flow
  • Audit Trails: Keep records of all workflow activities for compliance
  • Performance Analysis: Analyze timing and execution patterns
  • Error Investigation: Review event sequences leading to failures
  • Noise Reduction: Skip verbose events like step_started to focus on results

Example Use Cases:

# store everything
debug_workflow = Workflow(
    name="Debug Workflow",
    store_events=True,
    steps=[...]
)

# store only important events
production_workflow = Workflow(
    name="Production Workflow", 
    store_events=True,
    events_to_skip=[
        WorkflowRunEvent.step_started,
        WorkflowRunEvent.parallel_execution_started,
        # keep step_completed and workflow_completed
    ],
    steps=[...]
)

# No event storage
fast_workflow = Workflow(
    name="Fast Workflow",
    store_events=False,  
    steps=[...]
)

More Examples:

Additional Data

When to use: When you need to pass metadata, configuration, or contextual information to specific steps without it being part of the main workflow message flow.

  • Separation of Concerns: Keep workflow logic separate from metadata
  • Step-Specific Context: Access additional information in custom functions
  • Clean Message Flow: Main message stays focused on content
  • Flexible Configuration: Pass user info, priorities, settings, etc.

Access Pattern: step_input.additional_data provides dictionary access to all additional data

from agno.workflow.v2 import Step, Workflow
from agno.workflow.v2.types import StepInput, StepOutput

def custom_content_planning_function(step_input: StepInput) -> StepOutput:
    """Custom function that uses additional_data for enhanced context"""
    
    # Access the main workflow message
    message = step_input.message
    previous_content = step_input.previous_step_content
    
    # Access additional_data that was passed with the workflow
    additional_data = step_input.additional_data or {}
    user_email = additional_data.get("user_email", "No email provided")
    priority = additional_data.get("priority", "normal")
    client_type = additional_data.get("client_type", "standard")
    
    # Create enhanced planning prompt with context
    planning_prompt = f"""
        STRATEGIC CONTENT PLANNING REQUEST:
        
        Core Topic: {message}
        Research Results: {previous_content[:500] if previous_content else "No research results"}
        
        Additional Context:
        - Client Type: {client_type}
        - Priority Level: {priority}
        - Contact Email: {user_email}
        
        {"🚨 HIGH PRIORITY - Expedited delivery required" if priority == "high" else "📝 Standard delivery timeline"}
        
        Please create a detailed, actionable content plan.
    """
    
    response = content_planner.run(planning_prompt)
    
    enhanced_content = f"""
        ## Strategic Content Plan
        
        **Planning Topic:** {message}
        **Client Details:** {client_type} | {priority.upper()} priority | {user_email}
        
        **Content Strategy:**
        {response.content}
    """
    
    return StepOutput(content=enhanced_content, response=response)

# Define workflow with steps
workflow = Workflow(
    name="Content Creation Workflow",
    steps=[
        Step(name="Research Step", team=research_team),
        Step(name="Content Planning Step", executor=custom_content_planning_function),
    ]
)

# Run workflow with additional_data
workflow.print_response(
    message="AI trends in 2024",
    additional_data={
        "user_email": "kaustubh@agno.com",
        "priority": "high",
        "client_type": "enterprise",
        "budget": "$50000",
        "deadline": "2024-12-15"
    },
    markdown=True,
    stream=True
)

More Examples:

Structured Inputs

Use Pydantic models for type-safe inputs:

from pydantic import BaseModel, Field

class ResearchRequest(BaseModel):
    topic: str = Field(description="Research topic")
    depth: int = Field(description="Research depth (1-10)")
    sources: List[str] = Field(description="Preferred sources")

workflow.print_response(
    message=ResearchRequest(
        topic="AI trends 2024",
        depth=8,
        sources=["academic", "industry"]
    )
)

More Examples:

Structured IO at Each Step Level

Workflows features a powerful type-safe data flow system where each step in your workflow can:

  1. Receive structured input (Pydantic models, lists, dicts, or raw strings)
  2. Produce structured output (validated Pydantic models)
  3. Maintain type safety throughout the entire workflow execution

How Data Flows Between Steps

  1. Input Handling:

    • The first step receives the workflow’s input message
    • Subsequent steps receive the previous step’s structured output
  2. Output Processing:

    • Each Agent processes the input using its response_model
    • The output is automatically validated against the model
# Define agents with response models
research_agent = Agent(
    name="Research Specialist",
    model=OpenAIChat(id="gpt-4"),
    response_model=ResearchFindings,  # <-- Set on Agent
)

analysis_agent = Agent(
    name="Analysis Expert", 
    model=OpenAIChat(id="gpt-4"),
    response_model=AnalysisResults,  # <-- Set on Agent
)

# Steps reference these agents
workflow = Workflow(steps=[
    Step(agent=research_agent),  # Will output ResearchFindings
    Step(agent=analysis_agent)   # Will output AnalysisResults
])

Structured Data Transformation in Custom Functions

Custom functions in workflows can access the structured output of previous steps via step_input.previous_step_content, which preserves the original Pydantic model type (e.g., ResearchFindings). To transform data:

  • Type-Check Inputs: Use isinstance(step_input.previous_step_content, ModelName) to verify the input structure.
  • Modify Data: Extract fields (e.g., step_input.previous_step_content.topic), process them, and construct a new Pydantic model (e.g., AnalysisReport).
  • Return Typed Output: Wrap the new model in StepOutput(content=new_model). This ensures type safety for downstream steps. Example:
   def transform_data(step_input: StepInput) -> StepOutput:
       research = step_input.previous_step_content  # Type: ResearchFindings
       analysis = AnalysisReport(
           analysis_type="Custom",
           key_findings=[f"Processed: {research.topic}"],
           ...  # Modified fields
       )
       return StepOutput(content=analysis)

More Examples:

Media Input

Workflows seamlessly handle media artifacts (images, videos, audio) throughout the execution pipeline. Media can be provided as input for Workflow.run() and Workflow.print_response() and is passed through to individual steps (whether Agent, Team or Custom Function).

During execution, media artifacts accumulate across steps - each step receives shared media from previous steps and can produce additional media outputs. The Step class handles automatic conversion between artifact formats, ensuring compatibility between workflow components and agent/team executors. All media artifacts are preserved in StepOutput and propagated to subsequent steps, creating a comprehensive flow where the final WorkflowRunResponse contains all accumulated images, videos, and audio from the entire execution chain.

Here’s an example of how to pass image as input:

from agno.agent import Agent
from agno.media import Image
from agno.models.openai import OpenAIChat
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.workflow.v2.step import Step
from agno.workflow.v2.workflow import Workflow
from agno.storage.sqlite import SqliteStorage

# Define agents
image_analyzer = Agent(
    name="Image Analyzer",
    model=OpenAIChat(id="gpt-4o"),
    instructions="Analyze the provided image and extract key details, objects, and context.",
)

news_researcher = Agent(
    name="News Researcher", 
    model=OpenAIChat(id="gpt-4o"),
    tools=[DuckDuckGoTools()],
    instructions="Search for latest news and information related to the analyzed image content.",
)

# Define steps
analysis_step = Step(
    name="Image Analysis Step",
    agent=image_analyzer,
)

research_step = Step(
    name="News Research Step", 
    agent=news_researcher,
)

# Create workflow with media input
media_workflow = Workflow(
    name="Image Analysis and Research Workflow",
    description="Analyze an image and research related news",
    steps=[analysis_step, research_step],
    storage=SqliteStorage(
        table_name="workflow_v2",
        db_file="tmp/workflow_v2.db",
        mode="workflow_v2",
    ),
)

# Run workflow with image input
if __name__ == "__main__":
    media_workflow.print_response(
        message="Please analyze this image and find related news",
        images=[
            Image(url="https://upload.wikimedia.org/wikipedia/commons/0/0c/GoldenGateBridge-001.jpg")
        ],
        markdown=True,
    )

If you are using Workflow.run(), you need to use WorkflowRunResponse to access the images, videos, and audio.

from agno.run.v2.workflow import WorkflowRunResponse

response: WorkflowRunResponse = media_workflow.run(
    message="Please analyze this image and find related news",
    images=[
        Image(url="https://upload.wikimedia.org/wikipedia/commons/0/0c/GoldenGateBridge-001.jpg")
    ],
    markdown=True,
)

print(response.images)

Similarly, you can pass Video and Audio as input.

More Examples: