Skip to main content
"""
Step With Class Executor
========================

Demonstrates class-based step executors with sync and async workflow execution.
"""

import asyncio
from typing import AsyncIterator, Union

from agno.agent import Agent
from agno.db.in_memory import InMemoryDb
from agno.db.sqlite import SqliteDb
from agno.models.openai import OpenAIChat
from agno.run.workflow import WorkflowRunOutputEvent
from agno.team import Team
from agno.tools.hackernews import HackerNewsTools
from agno.tools.websearch import WebSearchTools
from agno.workflow.step import Step, StepInput, StepOutput
from agno.workflow.workflow import Workflow

# ---------------------------------------------------------------------------
# Create Agents
# ---------------------------------------------------------------------------
# Research agent equipped with the HackerNews toolkit.
hackernews_agent = Agent(
    name="Hackernews Agent",
    model=OpenAIChat(id="gpt-4o"),
    tools=[HackerNewsTools()],
    instructions="Extract key insights and content from Hackernews posts",
)

# Research agent equipped with the web-search toolkit.
web_agent = Agent(
    name="Web Agent",
    model=OpenAIChat(id="gpt-4o"),
    tools=[WebSearchTools()],
    instructions="Search the web for the latest news and trends",
)

# Planner invoked by the synchronous executor via `content_planner.run(...)`.
content_planner = Agent(
    name="Content Planner",
    model=OpenAIChat(id="gpt-4o"),
    instructions=[
        "Plan a content schedule over 4 weeks for the provided topic and research content",
        "Ensure that I have posts for 3 posts per week",
    ],
)

# Planner invoked by the asynchronous executor, which streams with
# `arun(...)` and then calls `get_last_run_output()` on this agent.
# Same name and instructions as `content_planner`; the only difference in
# this file is the in-memory db.
# NOTE(review): presumably the db is what lets `get_last_run_output()` find
# the finished run - confirm against the Agent API.
streaming_content_planner = Agent(
    name="Content Planner",
    model=OpenAIChat(id="gpt-4o"),
    instructions=[
        "Plan a content schedule over 4 weeks for the provided topic and research content",
        "Ensure that I have posts for 3 posts per week",
    ],
    db=InMemoryDb(),
)

# ---------------------------------------------------------------------------
# Create Team
# ---------------------------------------------------------------------------
# Team made of the two research agents above; it is the executor of the
# research step defined later in this file.
research_team = Team(
    name="Research Team",
    members=[hackernews_agent, web_agent],
    instructions="Analyze content and create comprehensive social media strategy",
)


# ---------------------------------------------------------------------------
# Define Class Executors
# ---------------------------------------------------------------------------
class CustomContentPlanning:
    """Synchronous class-based step executor that produces a content plan.

    An instance is passed as ``executor`` to a ``Step``. Calling it builds a
    planning prompt from the step input, runs ``content_planner`` on that
    prompt, and wraps the agent's answer in a Markdown report.
    """

    def __call__(self, step_input: StepInput) -> StepOutput:
        """Run the content planner for one workflow step.

        Args:
            step_input: Step input. ``input`` is used as the planning topic
                and ``previous_step_content`` (when truthy) as the research
                results.

        Returns:
            A ``StepOutput`` holding the Markdown plan, or one with
            ``success=False`` and the error text if planning raised.
        """
        message = step_input.input
        previous_step_content = step_input.previous_step_content
        has_research = bool(previous_step_content)

        # Coerce to text before truncating: slicing the raw value would raise
        # (outside the error handler below) for any non-sliceable content and
        # would cut by element count rather than characters for sequences.
        research_excerpt = (
            str(previous_step_content)[:500] if has_research else "No research results"
        )
        planning_prompt = f"""
            STRATEGIC CONTENT PLANNING REQUEST:

            Core Topic: {message}

            Research Results: {research_excerpt}

            Planning Requirements:
            1. Create a comprehensive content strategy based on the research
            2. Leverage the research findings effectively
            3. Identify content formats and channels
            4. Provide timeline and priority recommendations
            5. Include engagement and distribution strategies

            Please create a detailed, actionable content plan.
        """

        try:
            response = content_planner.run(planning_prompt)

            integration_label = (
                "✓ Research-based" if has_research else "✗ No research foundation"
            )
            integration_level = "High" if has_research else "Baseline"

            # The report is assembled from flush-left lines. Lines indented by
            # four or more spaces after a blank line are an indented code
            # block in Markdown, so an indented template would show the bold
            # labels and bullets as raw preformatted text.
            enhanced_content = "\n".join(
                [
                    "## Strategic Content Plan",
                    "",
                    f"**Planning Topic:** {message}",
                    "",
                    f"**Research Integration:** {integration_label}",
                    "",
                    "**Content Strategy:**",
                    f"{response.content}",
                    "",
                    "**Custom Planning Enhancements:**",
                    f"- Research Integration: {integration_level}",
                    "- Strategic Alignment: Optimized for multi-channel distribution",
                    "- Execution Ready: Detailed action items included",
                ]
            )
            return StepOutput(content=enhanced_content)
        except Exception as e:
            # Report the failure as a step result instead of raising.
            return StepOutput(
                content=f"Custom content planning failed: {str(e)}",
                success=False,
            )


class AsyncCustomContentPlanning:
    """Asynchronous, streaming class-based step executor for content planning.

    An instance is passed as ``executor`` to a ``Step``. Calling it returns an
    async generator that forwards every event streamed by
    ``streaming_content_planner`` and finishes with a single ``StepOutput``.
    """

    async def __call__(
        self,
        step_input: StepInput,
    ) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]:
        """Stream the planner run, then yield the final Markdown plan.

        Args:
            step_input: Step input. ``input`` is used as the planning topic
                and ``previous_step_content`` (when truthy) as the research
                results.

        Yields:
            Each event produced by the agent's streaming run, followed by one
            ``StepOutput``: the Markdown plan on success, or an output with
            ``success=False`` and the error text if anything raised.
        """
        message = step_input.input
        previous_step_content = step_input.previous_step_content
        has_research = bool(previous_step_content)

        # Coerce to text before truncating: slicing the raw value would raise
        # (outside the error handler below) for any non-sliceable content and
        # would cut by element count rather than characters for sequences.
        research_excerpt = (
            str(previous_step_content)[:500] if has_research else "No research results"
        )
        planning_prompt = f"""
            STRATEGIC CONTENT PLANNING REQUEST:

            Core Topic: {message}

            Research Results: {research_excerpt}

            Planning Requirements:
            1. Create a comprehensive content strategy based on the research
            2. Leverage the research findings effectively
            3. Identify content formats and channels
            4. Provide timeline and priority recommendations
            5. Include engagement and distribution strategies

            Please create a detailed, actionable content plan.
        """

        try:
            response_iterator = streaming_content_planner.arun(
                planning_prompt,
                stream=True,
                stream_events=True,
            )
            # Relay the agent's streamed events to the caller as they arrive.
            async for event in response_iterator:
                yield event

            # The streamed events are only relayed, so the final content is
            # read back from the agent once the stream is exhausted.
            response = streaming_content_planner.get_last_run_output()
            if response is None:
                # NOTE(review): assumes a missing run is reported as None -
                # confirm against the Agent API. Raising here routes through
                # the same failure output as every other error, with a
                # clearer message than an AttributeError on None.
                raise RuntimeError("no run output available after streaming")

            integration_label = (
                "✓ Research-based" if has_research else "✗ No research foundation"
            )
            integration_level = "High" if has_research else "Baseline"

            # The report is assembled from flush-left lines. Lines indented by
            # four or more spaces after a blank line are an indented code
            # block in Markdown, so an indented template would show the bold
            # labels and bullets as raw preformatted text.
            enhanced_content = "\n".join(
                [
                    "## Strategic Content Plan",
                    "",
                    f"**Planning Topic:** {message}",
                    "",
                    f"**Research Integration:** {integration_label}",
                    "",
                    "**Content Strategy:**",
                    f"{response.content}",
                    "",
                    "**Custom Planning Enhancements:**",
                    f"- Research Integration: {integration_level}",
                    "- Strategic Alignment: Optimized for multi-channel distribution",
                    "- Execution Ready: Detailed action items included",
                ]
            )
            yield StepOutput(content=enhanced_content)
        except Exception as e:
            # Report the failure as a step result instead of raising.
            yield StepOutput(
                content=f"Custom content planning failed: {str(e)}",
                success=False,
            )


# ---------------------------------------------------------------------------
# Define Steps
# ---------------------------------------------------------------------------
# First step of both workflows: executed by the research team.
research_step = Step(
    name="Research Step",
    team=research_team,
)

# Planning step backed by the synchronous class executor.
content_planning_step = Step(
    name="Content Planning Step",
    executor=CustomContentPlanning(),
)

# Planning step backed by the asynchronous streaming class executor.
# It uses the same step name as the synchronous variant.
async_content_planning_step = Step(
    name="Content Planning Step",
    executor=AsyncCustomContentPlanning(),
)

# ---------------------------------------------------------------------------
# Create Workflows
# ---------------------------------------------------------------------------
# Both workflows share the same name and description and list the research
# step before a planning step; they differ only in which planning step is
# used. Each constructs its own SqliteDb object, but both point at the same
# database file and session table.
# NOTE(review): presumably the planning executors receive the research output
# through `step_input.previous_step_content` - confirm against Workflow docs.
content_creation_workflow = Workflow(
    name="Content Creation Workflow",
    description="Automated content creation with custom execution options",
    db=SqliteDb(
        session_table="workflow_session",
        db_file="tmp/workflow.db",
    ),
    steps=[research_step, content_planning_step],
)

async_content_creation_workflow = Workflow(
    name="Content Creation Workflow",
    description="Automated content creation with custom execution options",
    db=SqliteDb(
        session_table="workflow_session",
        db_file="tmp/workflow.db",
    ),
    steps=[research_step, async_content_planning_step],
)

# ---------------------------------------------------------------------------
# Run Workflow
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    divider = "=" * 60

    # Synchronous run: the workflow prints its own response.
    content_creation_workflow.print_response(input="AI trends in 2024", markdown=True)

    # Visual separator between the two demo runs.
    print(f"\n{divider}\n")

    # Asynchronous streaming run, driven to completion by asyncio.
    streaming_run = async_content_creation_workflow.aprint_response(
        input="AI agent frameworks 2025",
        markdown=True,
        stream=True,
    )
    asyncio.run(streaming_run)

Run the Example

# Clone and set up the repo
git clone https://github.com/agno-agi/agno.git
cd agno/cookbook/04_workflows/01_basic_workflows/02_step_with_function

# Create and activate virtual environment
./scripts/demo_setup.sh
source .venvs/demo/bin/activate

python step_with_class.py