Nested Workflow Events (workflow as a step)
from agno.agent import Agent
from agno.models.anthropic import Claude
from agno.run.workflow import (
    BaseWorkflowRunOutputEvent,
    StepCompletedEvent,
    StepStartedEvent,
    WorkflowCompletedEvent,
    WorkflowStartedEvent,
)
from agno.workflow.step import Step
from agno.workflow.types import StepInput, StepOutput
from agno.workflow.workflow import Workflow


def create_summary(step_input: StepInput) -> StepOutput:
    """Turn the previous step's content into a short summary output.

    Reads the last step's content from ``step_input``. When that content is
    falsy, a fixed placeholder message is returned. Otherwise the content is
    sliced to its first 500 elements (presumably characters of a string --
    confirm against ``StepInput.get_last_step_content``) and placed under a
    "Summary of research:" heading, always followed by "...".
    """
    research_text = step_input.get_last_step_content()

    # Guard clause: nothing upstream to summarize.
    if not research_text:
        return StepOutput(content="No content to summarize")

    excerpt = research_text[:500]
    return StepOutput(content=f"Summary of research:\n{excerpt}...")


# Inner workflow: an agent-backed "research" step followed by the
# function-backed "summary" step (create_summary).
research_agent = Agent(
    model=Claude(id="claude-sonnet-4-20250514"),
    name="Research Agent",
    instructions=(
        "You are a research assistant. "
        "Provide concise, factual information in 2-3 sentences."
    ),
)

inner_workflow = Workflow(
    steps=[
        Step(agent=research_agent, name="research"),
        Step(executor=create_summary, name="summary"),
    ],
    name="Inner Workflow",
    description="A simple workflow that researches a topic",
)

# Outer workflow: runs the whole inner workflow as its first step
# ("research_phase"), then hands over to the writer agent ("writing_phase").
writer_agent = Agent(
    model=Claude(id="claude-sonnet-4-20250514"),
    name="Writer Agent",
    instructions=(
        "You are a professional writer. "
        "Take the research provided and write a short polished paragraph."
    ),
)

outer_workflow = Workflow(
    steps=[
        Step(workflow=inner_workflow, name="research_phase"),
        Step(agent=writer_agent, name="writing_phase"),
    ],
    name="Outer Workflow",
    description="A workflow that researches a topic and then writes about it",
)


def print_event_details(event: BaseWorkflowRunOutputEvent, label: str) -> None:
    """Print a framed, depth-indented summary of a single workflow event.

    Every field is read with ``getattr`` so an event that lacks one of them
    prints ``None`` for it instead of raising. For step-completed and
    workflow-completed events with truthy ``content``, a single-line preview
    of at most 120 characters is added; the trailing "..." is shown only when
    the content was actually truncated.

    Args:
        event: The streamed workflow event to describe.
        label: Heading printed in square brackets at the top of the frame.
    """
    preview_limit = 120
    # Width of the longest field name ("content (preview)") so every ":"
    # lines up in the same column, whichever fields are printed.
    label_width = 17

    # A missing or None ``nested_depth`` counts as depth 0; multiplying a
    # string by None would otherwise raise TypeError.
    indent = "  " * (getattr(event, "nested_depth", 0) or 0)
    rule = f"{indent}{'=' * 60}"

    rows = [
        ("event type", type(event).__name__),
        ("workflow_id", getattr(event, "workflow_id", None)),
        ("workflow_name", getattr(event, "workflow_name", None)),
        ("nested_depth", getattr(event, "nested_depth", None)),
        ("run_id", getattr(event, "run_id", None)),
        ("step_name", getattr(event, "step_name", None)),
    ]

    if isinstance(event, (StepCompletedEvent, WorkflowCompletedEvent)):
        content = getattr(event, "content", None)
        if content:
            text = str(content)
            preview = text[:preview_limit].replace("\n", " ")
            # Only signal truncation when something was really cut off.
            if len(text) > preview_limit:
                preview += "..."
            rows.append(("content (preview)", preview))

    print(f"\n{rule}")
    print(f"{indent}[{label}]")
    for field_name, value in rows:
        print(f"{indent}  {field_name:<{label_width}} : {value}")
    print(rule)


if __name__ == "__main__":
    # Only these event classes are reported; every other item yielded by the
    # stream is skipped.
    EVENT_TYPES = (
        WorkflowStartedEvent,
        WorkflowCompletedEvent,
        StepStartedEvent,
        StepCompletedEvent,
    )

    event_stream = outer_workflow.run(
        input="Tell me about the history of artificial intelligence",
        stream=True,
        stream_events=True,
    )

    for event in event_stream:
        if not isinstance(event, EVENT_TYPES):
            continue

        # The label shows the nesting depth and the emitting workflow's name,
        # falling back to 0 and "?" when the event does not carry them.
        event_kind = type(event).__name__
        depth = getattr(event, "nested_depth", 0)
        source = getattr(event, "workflow_name", "?")
        print_event_details(event, f"depth={depth}, source={source} | {event_kind}")

Run the Example

git clone https://github.com/agno-agi/agno.git
cd agno/cookbook/04_workflows/06_advanced_concepts/workflow_as_a_step

pip install agno anthropic

python nested_workflow_events.py