Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | Optional[str] | None | Workflow name |
| id | Optional[str] | None | Workflow ID (autogenerated if not set) |
| description | Optional[str] | None | Workflow description |
| steps | Optional[WorkflowSteps] | None | Workflow steps - can be a callable function, Steps object, or list of steps |
| db | Optional[BaseDb] | None | Database to use for this workflow |
| session_id | Optional[str] | None | Default session_id to use for this workflow (autogenerated if not set) |
| user_id | Optional[str] | None | Default user_id to use for this workflow |
| session_state | Optional[Dict[str, Any]] | None | Default session state (stored in the database to persist across runs) |
| debug_mode | Optional[bool] | False | If True, the workflow runs in debug mode |
| stream | Optional[bool] | None | Stream the response from the Workflow |
| stream_events | bool | False | Stream the intermediate steps from the Workflow |
| stream_executor_events | bool | True | Stream the events emitted by the Step executor (the agent/team events) together with the Workflow events |
| store_events | bool | False | Persist the events on the run response |
| events_to_skip | Optional[List[Union[WorkflowRunEvent, RunEvent, TeamRunEvent]]] | None | Events to skip when persisting the events on the run response |
| store_executor_outputs | bool | True | Control whether to store executor responses (agent/team responses) in flattened runs |
| websocket_handler | Optional[WebSocketHandler] | None | WebSocket handler for real-time communication |
| input_schema | Optional[Type[BaseModel]] | None | Input schema to validate the input to the workflow |
| metadata | Optional[Dict[str, Any]] | None | Metadata stored with this workflow |
| add_workflow_history_to_steps | bool | False | If True, add the workflow history to the steps |
| num_history_runs | int | None | Number of runs to include in the workflow history; if not provided, all history runs are included |
| cache_session | bool | False | If True, cache the current workflow session in memory for faster access |
| telemetry | bool | True | Log minimal telemetry for analytics |
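As a rough illustration of how these parameters combine, the sketch below collects a few of them into a keyword-argument dictionary. The values are made up for the example, `my_steps` is a hypothetical placeholder, and the constructor call in the comment assumes the class is importable as `Workflow`; nothing here runs against the real library.

```python
from typing import Any, Dict

# Illustrative values only; every key is a parameter name from the table above.
workflow_kwargs: Dict[str, Any] = {
    "name": "content-pipeline",
    "description": "Drafts and reviews a short article",
    "session_state": {"drafts_written": 0},
    "stream_events": True,
    "store_events": True,
    "add_workflow_history_to_steps": True,
    "num_history_runs": 3,
    "metadata": {"team": "docs"},
}

# Assumed usage (not executed here):
#   workflow = Workflow(steps=my_steps, **workflow_kwargs)
for key, value in workflow_kwargs.items():
    print(f"{key} = {value!r}")
```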
Functions
run
Execute the workflow synchronously with optional streaming.
Parameters:
input(Optional[Union[str, Dict[str, Any], List[Any], BaseModel]]): The input to send to the workflow
additional_data(Optional[Dict[str, Any]]): Additional data to include with the input
user_id(Optional[str]): User ID to use
session_id(Optional[str]): Session ID to use
session_state(Optional[Dict[str, Any]]): Session state to use
audio(Optional[List[Audio]]): Audio files to include
images(Optional[List[Image]]): Image files to include
videos(Optional[List[Video]]): Video files to include
files(Optional[List[File]]): Files to include
stream(bool): Whether to stream the response
stream_events(Optional[bool]): Whether to stream intermediate steps
Returns:
Union[WorkflowRunOutput, Iterator[WorkflowRunOutputEvent]]: Either a WorkflowRunOutput or an iterator of WorkflowRunOutputEvents, depending on the stream parameter
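Because run returns either a single output or an iterator of events, calling code usually branches on the stream flag. The sketch below shows one way to normalise both shapes into a list. `StubWorkflow` and `run_and_collect` are hypothetical names introduced for illustration; the stub only mimics the documented return shapes and is not the library's class.

```python
from typing import Any, Iterator, List, Union


class StubWorkflow:
    """Hypothetical stand-in that mimics only the documented shape of run()."""

    def run(self, input: Any = None, stream: bool = False) -> Union[str, Iterator[str]]:
        if stream:
            # Streaming mode: hand back an iterator of events.
            return iter([f"started: {input}", f"completed: {input}"])
        # Non-streaming mode: hand back a single output object.
        return f"output: {input}"


def run_and_collect(workflow: Any, input: Any, stream: bool = False) -> List[Any]:
    """Call run() and normalise both return shapes into a list."""
    result = workflow.run(input=input, stream=stream)
    if stream:
        return list(result)  # consume the event iterator
    return [result]  # wrap the single output


print(run_and_collect(StubWorkflow(), "hello"))
print(run_and_collect(StubWorkflow(), "hello", stream=True))
```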
arun
Execute the workflow asynchronously with optional streaming.
Parameters:
input(Optional[Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]]]): The input to send to the workflow
additional_data(Optional[Dict[str, Any]]): Additional data to include with the input
user_id(Optional[str]): User ID to use
session_id(Optional[str]): Session ID to use
session_state(Optional[Dict[str, Any]]): Session state to use
audio(Optional[List[Audio]]): Audio files to include
images(Optional[List[Image]]): Image files to include
videos(Optional[List[Video]]): Video files to include
files(Optional[List[File]]): Files to include
stream(bool): Whether to stream the response
stream_events(Optional[bool]): Whether to stream intermediate steps
background(Optional[bool]): Whether to run in background
websocket(Optional[WebSocket]): WebSocket for real-time communication
Returns:
Union[WorkflowRunOutput, AsyncIterator[WorkflowRunOutputEvent]]: Either a WorkflowRunOutput or an async iterator of WorkflowRunOutputEvents, depending on the stream parameter
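The asynchronous variant can be consumed in a similar way. The sketch below assumes that arun hands back an async iterator directly when stream is True and an awaitable otherwise; that calling convention is an assumption, not something this reference states. `StubAsyncWorkflow` and `arun_and_collect` are hypothetical names used only for the example.

```python
import asyncio
from typing import Any, AsyncIterator, List


class StubAsyncWorkflow:
    """Hypothetical stand-in that mimics the assumed calling convention of arun()."""

    def arun(self, input: Any = None, stream: bool = False) -> Any:
        if stream:
            return self._events(input)  # async iterator, consumed with `async for`
        return self._output(input)  # coroutine, consumed with `await`

    async def _output(self, input: Any) -> str:
        return f"output: {input}"

    async def _events(self, input: Any) -> AsyncIterator[str]:
        for name in ("started", "completed"):
            yield f"{name}: {input}"


async def arun_and_collect(workflow: Any, input: Any, stream: bool = False) -> List[Any]:
    """Normalise both return shapes of arun() into a list."""
    if stream:
        return [event async for event in workflow.arun(input=input, stream=True)]
    return [await workflow.arun(input=input)]


print(asyncio.run(arun_and_collect(StubAsyncWorkflow(), "hello")))
print(asyncio.run(arun_and_collect(StubAsyncWorkflow(), "hello", stream=True)))
```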
print_response
Print workflow execution with rich formatting and optional streaming.
Parameters:
input(Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]]): The input to send to the workflow
additional_data(Optional[Dict[str, Any]]): Additional data to include with the input
user_id(Optional[str]): User ID to use
session_id(Optional[str]): Session ID to use
audio(Optional[List[Audio]]): Audio files to include
images(Optional[List[Image]]): Image files to include
videos(Optional[List[Video]]): Video files to include
files(Optional[List[File]]): Files to include
stream(Optional[bool]): Whether to stream the response content
stream_events(Optional[bool]): Whether to stream intermediate steps
markdown(bool): Whether to render content as markdown
show_time(bool): Whether to show execution time
show_step_details(bool): Whether to show individual step outputs
console(Optional[Any]): Rich console instance (optional)
aprint_response
Asynchronously print workflow execution with rich formatting and optional streaming.
Parameters:
input(Union[str, Dict[str, Any], List[Any], BaseModel, List[Message]]): The input to send to the workflow
additional_data(Optional[Dict[str, Any]]): Additional data to include with the input
user_id(Optional[str]): User ID to use
session_id(Optional[str]): Session ID to use
audio(Optional[List[Audio]]): Audio files to include
images(Optional[List[Image]]): Image files to include
videos(Optional[List[Video]]): Video files to include
files(Optional[List[File]]): Files to include
stream(Optional[bool]): Whether to stream the response content
stream_events(Optional[bool]): Whether to stream intermediate steps
markdown(bool): Whether to render content as markdown
show_time(bool): Whether to show execution time
show_step_details(bool): Whether to show individual step outputs
console(Optional[Any]): Rich console instance (optional)
cancel_run
Cancel a running workflow execution.
Parameters:
run_id(str): The run_id to cancel
Returns:
bool: True if the run was found and marked for cancellation, False otherwise
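Since cancel_run reports success as a boolean, a caller cancelling several runs may want to record which requests took effect. The sketch below is one way to do that. `StubWorkflow` and `cancel_runs` are hypothetical names, and the stub only imitates the documented return value.

```python
from typing import Any, Dict, Iterable


class StubWorkflow:
    """Hypothetical stand-in that tracks a set of active run IDs."""

    def __init__(self, active_run_ids: Iterable[str]) -> None:
        self._active = set(active_run_ids)

    def cancel_run(self, run_id: str) -> bool:
        # True only when the run is known; mirrors the documented return value.
        if run_id in self._active:
            self._active.remove(run_id)
            return True
        return False


def cancel_runs(workflow: Any, run_ids: Iterable[str]) -> Dict[str, bool]:
    """Request cancellation for each run and record whether it was found."""
    return {run_id: workflow.cancel_run(run_id) for run_id in run_ids}


results = cancel_runs(StubWorkflow(["run-1"]), ["run-1", "run-2"])
print(results)
```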
get_run
Get the status and details of a background workflow run.
Parameters:
run_id(str): The run ID to get
Returns:
Optional[WorkflowRunOutput]: The workflow run output if found
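A background run is typically checked by polling get_run until it reports completion. The sketch below leaves the completion test to a caller-supplied predicate, because this reference does not list the status fields of WorkflowRunOutput. `StubWorkflow`, `wait_for_run`, and the `finished` key are hypothetical and exist only for the example.

```python
import time
from typing import Any, Callable, Optional


class StubWorkflow:
    """Hypothetical stand-in: the run reports itself finished on the third lookup."""

    def __init__(self) -> None:
        self._calls = 0

    def get_run(self, run_id: str) -> Optional[dict]:
        self._calls += 1
        return {"run_id": run_id, "finished": self._calls >= 3}


def wait_for_run(
    workflow: Any,
    run_id: str,
    is_done: Callable[[Any], bool],
    timeout: float = 5.0,
    interval: float = 0.01,
) -> Optional[Any]:
    """Poll get_run() until is_done(run) holds; return None on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        run = workflow.get_run(run_id)
        if run is not None and is_done(run):
            return run
        if time.monotonic() >= deadline:
            return None
        time.sleep(interval)


finished = wait_for_run(StubWorkflow(), "run-1", is_done=lambda run: run["finished"])
print(finished)
```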
get_run_output
Get a WorkflowRunOutput from the database.
Parameters:
run_id(str): The run ID
session_id(Optional[str]): Session ID to use
Returns:
Optional[WorkflowRunOutput]: The run output
get_last_run_output
Get the last run response from the database for the given session ID.
Parameters:
session_id(Optional[str]): Session ID to use
Returns:
Optional[WorkflowRunOutput]: The last run output
get_session
Get the session for the given session ID.
Parameters:
session_id(Optional[str]): Session ID to use
Returns:
Optional[WorkflowSession]: The workflow session
get_session_state
Get the session state for the given session ID.
Parameters:
session_id(Optional[str]): Session ID to use
Returns:
Dict[str, Any]: The session state
get_session_name
Get the session name for the given session ID.
Parameters:
session_id(Optional[str]): Session ID to use
Returns:
str: The session name
set_session_name
Set the session name and save to storage.
Parameters:
session_id(Optional[str]): Session ID to use
autogenerate(bool): Whether to autogenerate the name
session_name(Optional[str]): The name to set
Returns:
WorkflowSession: The updated session
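The two naming modes (an explicit session_name versus autogenerate) can be wrapped in a small helper, sketched below. `StubWorkflow` and `name_session` are hypothetical, and the stub's fixed "autogenerated-name" string merely stands in for whatever name the real implementation would produce.

```python
from typing import Any, Dict, Optional


class StubWorkflow:
    """Hypothetical stand-in that keeps session names in a dictionary."""

    def __init__(self) -> None:
        self._names: Dict[Optional[str], str] = {}

    def set_session_name(
        self,
        session_id: Optional[str] = None,
        autogenerate: bool = False,
        session_name: Optional[str] = None,
    ) -> Dict[str, Any]:
        # Placeholder for the real name generation, which is not documented here.
        name = "autogenerated-name" if autogenerate else (session_name or "")
        self._names[session_id] = name
        return {"session_id": session_id, "session_name": name}

    def get_session_name(self, session_id: Optional[str] = None) -> str:
        return self._names.get(session_id, "")


def name_session(workflow: Any, session_id: str, session_name: Optional[str] = None) -> str:
    """Set an explicit name when given, otherwise ask for an autogenerated one."""
    if session_name is None:
        workflow.set_session_name(session_id=session_id, autogenerate=True)
    else:
        workflow.set_session_name(session_id=session_id, session_name=session_name)
    return workflow.get_session_name(session_id=session_id)


stub = StubWorkflow()
print(name_session(stub, "s1", "Weekly report"))
print(name_session(stub, "s2"))
```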
get_session_metrics
Get the session metrics for the given session ID.
Parameters:
session_id(Optional[str]): Session ID to use
Returns:
Optional[Metrics]: The session metrics
delete_session
Delete a session.
Parameters:
session_id(str): Session ID to delete
save_session
Save the WorkflowSession to storage.
Parameters:
session(WorkflowSession): The session to save
to_dict
Convert workflow to dictionary representation.
Returns:
Dict[str, Any]: Dictionary representation of the workflow
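One possible use of to_dict is serialising the workflow definition for logging or inspection. The sketch below passes the dictionary to `json.dumps` with `default=str` as a fallback for values that are not natively JSON-serialisable. `StubWorkflow` and `workflow_to_json` are hypothetical names, and the keys in the stub's dictionary are invented for the example.

```python
import json
from typing import Any, Dict


class StubWorkflow:
    """Hypothetical stand-in that returns a small dictionary from to_dict()."""

    def to_dict(self) -> Dict[str, Any]:
        return {"name": "content-pipeline", "id": "wf-1", "metadata": {"team": "docs"}}


def workflow_to_json(workflow: Any) -> str:
    """Serialise the dictionary form; non-JSON values fall back to str()."""
    return json.dumps(workflow.to_dict(), indent=2, sort_keys=True, default=str)


serialized = workflow_to_json(StubWorkflow())
print(serialized)
```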