Workflows are all about control and flexibility.

Your workflow logic is just a Python function, so you have full control over it. You can:

  • Validate input before processing
  • Depending on the input, spawn agents and run them in parallel
  • Cache results as needed
  • Correct any intermediate errors
  • Stream the output
  • Return a single output or multiple outputs

This level of control is critical for reliability.

Streaming

When you build a workflow, you are writing a Python function, so you decide whether it streams its output. To stream, make the run() method of your workflow a generator: annotate its return type as Iterator[RunResponse] and yield RunResponse objects from it.

news_report_generator.py
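from typing import Iterator

# Agent, Workflow, RunResponse and pprint_run_response come from the library (imports not shown)

# Define the workflow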
class GenerateNewsReport(Workflow):
    agent_1: Agent = ...

    agent_2: Agent = ...

    agent_3: Agent = ...

    def run(self, ...) -> Iterator[RunResponse]:
        # Run agents and gather the response
        # These can be batch responses; you can also stream intermediate results if you want
        final_agent_input = ...

        # Generate the final response from the writer agent
        agent_3_response_stream: Iterator[RunResponse] = self.agent_3.run(final_agent_input, stream=True)

        # Yield each RunResponse from the agent's stream
        yield from agent_3_response_stream

# Instantiate the workflow
generate_news_report = GenerateNewsReport()

# Run workflow and get the response as an iterator of RunResponse objects
report_stream: Iterator[RunResponse] = generate_news_report.run(...)

# Print the response
pprint_run_response(report_stream, markdown=True)
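
One detail worth illustrating is how a generator forwards items from another iterator. The sketch below is plain Python and does not use the library: the `RunResponse` dataclass, `agent_stream`, and both `run_with_*` functions are hypothetical stand-ins. It shows that `yield` hands the caller the inner iterator as a single item, while `yield from` hands over each item the inner iterator produces.

```python
from dataclasses import dataclass
from typing import Iterator


@dataclass
class RunResponse:
    """Hypothetical stand-in for the library's response type."""
    content: str


def agent_stream() -> Iterator[RunResponse]:
    # Stand-in for an agent run with stream=True: produces chunks one at a time.
    for chunk in ("Breaking", " news"):
        yield RunResponse(content=chunk)


def run_with_yield() -> Iterator[object]:
    # Yields ONE item: the inner iterator object itself.
    yield agent_stream()


def run_with_yield_from() -> Iterator[RunResponse]:
    # Delegates to the inner iterator and yields each RunResponse in turn.
    yield from agent_stream()


items = list(run_with_yield())
chunks = list(run_with_yield_from())
```

Here `items` holds a single generator object, while `chunks` holds the two `RunResponse` values, which is the shape a consumer expecting `Iterator[RunResponse]` needs.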

Batch

To return a single output, return a RunResponse object from the run() method of your workflow.

news_report_generator.py
# Define the workflow
class GenerateNewsReport(Workflow):
    agent_1: Agent = ...

    agent_2: Agent = ...

    agent_3: Agent = ...

    def run(self, ...) -> RunResponse:
        # Run agents and gather the response
        final_agent_input = ...

        # Generate the final response from the writer agent
        agent_3_response: RunResponse = self.agent_3.run(final_agent_input)

        # Return the response
        return agent_3_response

# Instantiate the workflow
generate_news_report = GenerateNewsReport()

# Run workflow and get the response as a RunResponse object
report: RunResponse = generate_news_report.run(...)

# Print the response
pprint_run_response(report, markdown=True)
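
Since a workflow may return either a single response or a stream, calling code sometimes needs to handle both. The following is a minimal sketch under stated assumptions: `RunResponse` here is a hypothetical stand-in dataclass with a string `content` field, and `collect_text` is an illustrative helper, not part of the library.

```python
from dataclasses import dataclass
from typing import Iterator, Union


@dataclass
class RunResponse:
    """Hypothetical stand-in for the library's response type."""
    content: str


def collect_text(result: Union[RunResponse, Iterator[RunResponse]]) -> str:
    """Return the full text from either a batch response or a stream of chunks."""
    if isinstance(result, RunResponse):
        # Batch mode: a single response carries the whole output.
        return result.content
    # Streaming mode: concatenate the chunks in the order they arrive.
    return "".join(chunk.content for chunk in result)


batch_text = collect_text(RunResponse(content="Full report"))
stream_text = collect_text(iter([RunResponse("Full"), RunResponse(" report")]))
```

Both calls produce the same string, so downstream code does not need to know which mode the workflow used.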