from agno.agent import Agent
from agno.models.anthropic import Claude
from agno.run.workflow import (
BaseWorkflowRunOutputEvent,
StepCompletedEvent,
StepStartedEvent,
WorkflowCompletedEvent,
WorkflowStartedEvent,
)
from agno.workflow.step import Step
from agno.workflow.types import StepInput, StepOutput
from agno.workflow.workflow import Workflow
def create_summary(step_input: StepInput) -> StepOutput:
previous_content = step_input.get_last_step_content()
summary = (
f"Summary of research:\n{previous_content[:500]}..."
if previous_content
else "No content to summarize"
)
return StepOutput(content=summary)
# Inner workflow
research_agent = Agent(
name="Research Agent",
model=Claude(id="claude-sonnet-4-20250514"),
instructions="You are a research assistant. Provide concise, factual information in 2-3 sentences.",
)
inner_workflow = Workflow(
name="Inner Workflow",
description="A simple workflow that researches a topic",
steps=[
Step(name="research", agent=research_agent),
Step(name="summary", executor=create_summary),
],
)
# Outer workflow
writer_agent = Agent(
name="Writer Agent",
model=Claude(id="claude-sonnet-4-20250514"),
instructions="You are a professional writer. Take the research provided and write a short polished paragraph.",
)
outer_workflow = Workflow(
name="Outer Workflow",
description="A workflow that researches a topic and then writes about it",
steps=[
Step(name="research_phase", workflow=inner_workflow),
Step(name="writing_phase", agent=writer_agent),
],
)
def print_event_details(event: BaseWorkflowRunOutputEvent, label: str) -> None:
indent = " " * getattr(event, "nested_depth", 0)
print(f"\n{indent}{'=' * 60}")
print(f"{indent}[{label}]")
print(f"{indent} event type : {type(event).__name__}")
print(f"{indent} workflow_id : {getattr(event, 'workflow_id', None)}")
print(f"{indent} workflow_name : {getattr(event, 'workflow_name', None)}")
print(f"{indent} nested_depth : {getattr(event, 'nested_depth', None)}")
print(f"{indent} run_id : {getattr(event, 'run_id', None)}")
print(f"{indent} step_name : {getattr(event, 'step_name', None)}")
if isinstance(event, (StepCompletedEvent, WorkflowCompletedEvent)):
content = getattr(event, "content", None)
if content:
preview = str(content)[:120].replace("\n", " ")
print(f"{indent} content (preview) : {preview}...")
print(f"{indent}{'=' * 60}")
if __name__ == "__main__":
EVENT_TYPES = (
WorkflowStartedEvent,
WorkflowCompletedEvent,
StepStartedEvent,
StepCompletedEvent,
)
for event in outer_workflow.run(
input="Tell me about the history of artificial intelligence",
stream=True,
stream_events=True,
):
if isinstance(event, EVENT_TYPES):
depth = getattr(event, "nested_depth", 0)
source = getattr(event, "workflow_name", "?")
label = f"depth={depth}, source={source} | {type(event).__name__}"
print_event_details(event, label)