Explore advanced features and concepts in the Agno workflow system, including custom functions, error handling, and streaming capabilities.
Custom functions give you full control over a step's logic: you can preprocess inputs, call agents, and postprocess outputs. When defining a Step, you can specify a custom function as the executor. This function must accept a StepInput object and return a StepOutput object.
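A minimal sketch of a custom function executor, assuming a simple research step; the agent, step name, and instructions are illustrative, and import paths may differ across Agno versions:

```python
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.workflow.v2 import Step, StepInput, StepOutput, Workflow  # import paths may vary by Agno version

research_agent = Agent(
    name="Researcher",
    model=OpenAIChat(id="gpt-4o"),
    instructions="Research the given topic and summarize the key findings.",
)

def research_step(step_input: StepInput) -> StepOutput:
    # Preprocess the incoming workflow message
    topic = str(step_input.message or "").strip()

    # Call an agent as part of the custom logic
    response = research_agent.run(f"Research this topic: {topic}")

    # Postprocess the agent output before handing it to the next step
    summary = f"Research notes on {topic}:\n{response.content}"
    return StepOutput(content=summary)

workflow = Workflow(
    name="Research Workflow",
    steps=[Step(name="research", executor=research_step)],
)
```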
To run a workflow in the background, pass background=True to Workflow.arun(). This returns a WorkflowRunResponse object with a run_id that you can use to poll for the result of the workflow until it is completed. Note that background execution goes through the async .arun() method.
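A minimal sketch of launching a background run and polling it to completion, assuming `workflow` is a Workflow configured as above; the message is illustrative and parameter names may differ by Agno version:

```python
import asyncio

# Assumes `workflow` is the Workflow defined in the previous example.

async def main() -> None:
    # Kick off the workflow in the background; this call returns immediately
    run_response = await workflow.arun(
        message="Summarize the latest AI research",
        background=True,
    )
    print(f"Started background run: {run_response.run_id}")

    # Poll until the run has completed (see the polling notes below)
    while True:
        result = workflow.get_run(run_response.run_id)
        if result is not None and result.has_completed():
            break
        await asyncio.sleep(5)

    print(result.content)

asyncio.run(main())
```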
For long-running workflows, you can poll for the result using result = workflow.get_run(run_id), which returns the updated WorkflowRunResponse. Use .has_completed() to check whether the workflow has finished executing. This is particularly useful for workflows that involve time-consuming operations like large-scale data processing, multi-step research tasks, or batch operations that you don't want to block your main application thread.

To stop a workflow early, return StepOutput(stop=True) from a step; the remaining steps will not run.
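A minimal sketch of early termination from a custom step using the stop flag described above; the validation logic is illustrative:

```python
from agno.workflow.v2 import StepInput, StepOutput  # import path may vary by Agno version

def validate_input(step_input: StepInput) -> StepOutput:
    text = str(step_input.message or "")

    # Stop the workflow early if the input fails a basic sanity check;
    # later steps will not execute.
    if len(text) < 10:
        return StepOutput(
            content="Input too short, stopping the workflow early.",
            stop=True,
        )

    return StepOutput(content=f"Input looks good: {text}")
```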
The StepInput object provides powerful methods to access any previous step's output by name or to get all previous content:

- step_input.get_step_content("step_name") - Get content from a specific step by name
- step_input.get_all_previous_content() - Get all previous step content combined
- step_input.message - Access the original workflow input message
- step_input.previous_step_content - Get content from the immediate previous step

When the named step is a Parallel step, step_input.get_step_content("parallel_step_name") returns a dict with each key being the individual_step_name, covering all the outputs from the steps defined in the parallel block.
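A minimal sketch of a custom step that reads earlier outputs by name, including a Parallel step; the step names "research_phase", "web_research", and "paper_research" are illustrative:

```python
from agno.workflow.v2 import StepInput, StepOutput  # import path may vary by Agno version

def write_report(step_input: StepInput) -> StepOutput:
    # Output of the step that ran immediately before this one
    previous = step_input.previous_step_content

    # Output of a Parallel step, returned as a dict keyed by each
    # individual step name defined inside the parallel block
    parallel_step_output = step_input.get_step_content("research_phase")
    web = parallel_step_output.get("web_research", "")
    papers = parallel_step_output.get("paper_research", "")

    report = (
        f"Original request: {step_input.message}\n\n"
        f"Web research:\n{web}\n\n"
        f"Paper research:\n{papers}\n\n"
        f"Previous step said:\n{previous}"
    )
    return StepOutput(content=report)
```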
Workflow events are stored in the WorkflowRunResponse and in the runs column of your Workflow's Session DB in your configured storage backend (SQLite, PostgreSQL, etc.). Event storage is controlled with the following parameters:
- store_events=True: Automatically stores all workflow events in the database
- events_to_skip=[]: Filter out specific event types to reduce storage and noise

After a run, the stored events are available on workflow.run_response.events.
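A minimal sketch of enabling event storage; the step is illustrative, and the exact event-type values accepted by events_to_skip (and the import paths) depend on your Agno version:

```python
from agno.workflow.v2 import Step, StepInput, StepOutput, Workflow  # import paths may vary by Agno version

def echo(step_input: StepInput) -> StepOutput:
    return StepOutput(content=str(step_input.message))

workflow = Workflow(
    name="Event Logging Demo",
    steps=[Step(name="echo", executor=echo)],
    store_events=True,  # persist all workflow events to the session DB
    events_to_skip=[],  # add event types here (e.g. step_started) to cut noise
)

workflow.print_response(message="hello")

# Stored events are available on the run response after execution
for event in workflow.run_response.events or []:
    print(type(event).__name__)
```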
Available events to skip include step_started; skipping events like this lets you focus on results.

step_input.additional_data provides dictionary access to all additional data passed to the workflow.
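A minimal sketch of reading additional data inside a custom step. Passing additional_data on the run call is an assumption here; the keys, step, and parameter support should be checked against your Agno version:

```python
from agno.workflow.v2 import Step, StepInput, StepOutput, Workflow  # import paths may vary by Agno version

def notify(step_input: StepInput) -> StepOutput:
    # Dictionary access to whatever extra context was passed alongside the message
    extra = step_input.additional_data or {}
    user_email = extra.get("user_email", "unknown")
    priority = extra.get("priority", "normal")
    return StepOutput(content=f"Notifying {user_email} (priority: {priority})")

workflow = Workflow(name="Notify", steps=[Step(name="notify", executor=notify)])

# Assumption: additional_data is accepted by print_response()/run() in your Agno version
workflow.print_response(
    message="Send the weekly summary",
    additional_data={"user_email": "user@example.com", "priority": "high"},
)
```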
When a step's Agent or Team has a response_model, its structured output is passed to the next step as step_input.previous_step_content, which preserves the original Pydantic model type (e.g., ResearchFindings). To transform data:

1. Verify the input structure with isinstance(step_input.previous_step_content, ModelName).
2. Extract the fields you need (e.g., step_input.previous_step_content.topic), process them, and construct a new Pydantic model (e.g., AnalysisReport).
3. Return it with StepOutput(content=new_model). This ensures type safety for downstream steps.
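A minimal sketch of transforming structured output between steps, using the ResearchFindings and AnalysisReport model names mentioned above; the field names other than topic are illustrative:

```python
from pydantic import BaseModel

from agno.workflow.v2 import StepInput, StepOutput  # import path may vary by Agno version

class ResearchFindings(BaseModel):
    topic: str
    key_points: list[str]

class AnalysisReport(BaseModel):
    topic: str
    summary: str

def analyze(step_input: StepInput) -> StepOutput:
    findings = step_input.previous_step_content

    # Verify the structured input before using it
    if not isinstance(findings, ResearchFindings):
        return StepOutput(content="Expected ResearchFindings from the previous step", stop=True)

    # Extract fields, process them, and build a new typed model
    report = AnalysisReport(
        topic=findings.topic,
        summary=f"{len(findings.key_points)} key points found on {findings.topic}",
    )

    # Returning the model keeps type safety for downstream steps
    return StepOutput(content=report)
```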
Media (images, videos, and audio) can be passed as input to Workflow.run() and Workflow.print_response(), and it is passed through to the individual steps (whether Agent, Team, or Custom Function).
During execution, media artifacts accumulate across steps: each step receives shared media from previous steps and can produce additional media outputs. The Step class handles automatic conversion between artifact formats, ensuring compatibility between workflow components and agent/team executors. All media artifacts are preserved in StepOutput and propagated to subsequent steps, so the final WorkflowRunResponse contains all accumulated images, videos, and audio from the entire execution chain.
Here's an example of how to pass an image as input:
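A minimal sketch, assuming an image-capable agent step; the agent, step name, and image URL are illustrative, and import paths and parameter names may differ by Agno version:

```python
from agno.agent import Agent
from agno.media import Image
from agno.models.openai import OpenAIChat
from agno.workflow.v2 import Step, Workflow  # import paths may vary by Agno version

vision_agent = Agent(
    name="Image Analyzer",
    model=OpenAIChat(id="gpt-4o"),
    instructions="Describe the image you are given.",
)

workflow = Workflow(
    name="Image Analysis",
    steps=[Step(name="analyze_image", agent=vision_agent)],
)

# The image is passed through to the steps and accumulated in the run response
workflow.print_response(
    message="What is in this image?",
    images=[Image(url="https://example.com/sample.jpg")],
)
```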
If you use Workflow.run() directly, access the generated images, videos, and audio on the returned WorkflowRunResponse. You can pass Video and Audio as input in the same way.
More Examples: