Workflows are all about control and flexibility.Your workflow logic is just a python function, so you have full control over the workflow logic. You can:
Validate input before processing
Depending on the input, spawn agents and run them in parallel
Cache results as needed
Correct any intermediate errors
Stream the output
Return a single or multiple outputs
This level of control is critical for reliability.
It is important to understand that when you build a workflow, you are writing a python function, meaning you decide if the function streams the output or not. To stream the output, yield an Iterator[RunResponse] from the run() method of your workflow.
news_report_generator.py
Copy
Ask AI
# Define the workflowclass GenerateNewsReport(Workflow): agent_1: Agent = ... agent_2: Agent = ... agent_3: Agent = ... def run(self, ...) -> Iterator[RunResponse]: # Run agents and gather the response # These can be batch responses, you can also stream intermediate results if you want final_agent_input = ... # Generate the final response from the writer agent agent_3_response_stream: Iterator[RunResponse] = self.agent_3.run(final_agent_input, stream=True) # Yield the response yield agent_3_response_stream# Instantiate the workflowgenerate_news_report = GenerateNewsReport()# Run workflow and get the response as an iterator of RunResponse objectsreport_stream: Iterator[RunResponse] = generate_news_report.run(...)# Print the responsepprint_run_response(report_stream, markdown=True)
Simply return a RunResponse object from the run() method of your workflow to return a single output.
news_report_generator.py
Copy
Ask AI
# Define the workflowclass GenerateNewsReport(Workflow): agent_1: Agent = ... agent_2: Agent = ... agent_3: Agent = ... def run(self, ...) -> RunResponse: # Run agents and gather the response final_agent_input = ... # Generate the final response from the writer agent agent_3_response: RunResponse = self.agent_3.run(final_agent_input) # Return the response return agent_3_response# Instantiate the workflowgenerate_news_report = GenerateNewsReport()# Run workflow and get the response as a RunResponse objectreport: RunResponse = generate_news_report.run(...)# Print the responsepprint_run_response(report, markdown=True)