Master advanced workflow features including custom functions, background execution, cancellation, early stopping, event management, and structured data handling for production-ready automation.
Step
with a custom function as the executor
. The function must accept a StepInput
object and return a StepOutput
object, ensuring seamless integration with the workflow system.
background=True
to Workflow.arun()
. This returns a WorkflowRunOutput
object with a run_id
for polling the workflow status until completion.
.arun()
. Poll for results using workflow.get_run(run_id)
and check completion status with .has_completed()
.Ideal for long-running operations like large-scale data processing, multi-step research, or batch operations that shouldn’t block your main application thread.background=True
), you can cancel them using the run_id
:
WorkflowRunEvent.workflow_cancelled
or simply known as WorkflowCancelledEvent
StepOutput(stop=True)
, immediately halting the entire workflow execution.
StepInput
object provides powerful methods to access any previous step’s output by name or retrieve all accumulated content.
step_input.get_step_content("step_name")
- Get content from specific step by namestep_input.get_all_previous_content()
- Get all previous step content combinedstep_input.workflow_message
- Access the original workflow input messagestep_input.previous_step_content
- Get content from immediate previous stepParallel
step, when you do step_input.get_step_content("parallel_step_name")
, it will return a dict with each key as individual_step_name
for all the outputs from the steps defined in parallel.
Example:parallel_step_output
will be a dict with each key as individual_step_name
for all the outputs from the steps defined in parallel.workflow.run_response.events
and in the runs
column of your workflow’s session database (SQLite, PostgreSQL, etc.).
store_events=True
: Automatically stores all workflow events in the databaseevents_to_skip=[]
: Filter out specific event types to reduce storage and noiseworkflow.run_response.events
Available Events to Skip:
step_started
to focus on resultsstep_input.additional_data
for dictionary access to all additional data passed to the workflow.
output_schema
step_input.previous_step_content
, preserving original Pydantic model types.
Transformation Pattern
isinstance(step_input.previous_step_content, ModelName)
to verify input structureStepOutput(content=new_model)
for type safetyWorkflow.run()
and Workflow.print_response()
WorkflowRunOutput
contains all accumulated media from the entire execution chainWorkflow.run()
, you need to use WorkflowRunOutput
to access the images, videos, and audio.Video
and Audio
as input.
More Examples: