# Example: workflow_with_custom_function.py (Agno cookbook, 05_agent_os/workflow)
"""
Workflow With Custom Function Executors
=======================================
Demonstrates AgentOS workflows using both sync and streaming custom function steps.
"""
from typing import AsyncIterator, Union
from agno.agent import Agent
from agno.db.in_memory import InMemoryDb
from agno.db.postgres import PostgresDb
from agno.db.sqlite import SqliteDb
from agno.models.openai import OpenAIChat
from agno.os import AgentOS
from agno.team import Team
from agno.tools.hackernews import HackerNewsTools
from agno.tools.websearch import WebSearchTools
from agno.workflow.step import Step, StepInput, StepOutput, WorkflowRunOutputEvent
from agno.workflow.workflow import Workflow
# ---------------------------------------------------------------------------
# Setup
# ---------------------------------------------------------------------------
# Selects which workflow is registered with AgentOS further down in this file:
# True -> streaming_content_creation_workflow, False -> sync_content_creation_workflow.
USE_STREAMING_WORKFLOW: bool = False
# Connection URL handed to PostgresDb for the sync workflow's storage.
# NOTE(review): points at a local database with demo credentials; adjust for your setup.
db_url: str = "postgresql+psycopg://ai:ai@localhost:5532/ai"
# ---------------------------------------------------------------------------
# Create Agents And Team
# ---------------------------------------------------------------------------
# Research agent equipped with the Hacker News toolkit.
hackernews_agent = Agent(
    name="Hackernews Agent",
    instructions="Extract key insights and content from Hackernews posts",
    model=OpenAIChat(id="gpt-4o"),
    tools=[HackerNewsTools()],
)

# Research agent equipped with the web search toolkit.
web_agent = Agent(
    name="Web Agent",
    instructions="Search the web for the latest news and trends",
    model=OpenAIChat(id="gpt-4o"),
    tools=[WebSearchTools()],
)

# Team that groups both research agents; used as the "Research Step" executor
# of the workflows defined later in this file.
research_team = Team(
    name="Research Team",
    instructions="Analyze content and create comprehensive social media strategy",
    members=[hackernews_agent, web_agent],
)
# Planner invoked by custom_content_planning_function via .run().
sync_content_planner = Agent(
    name="Content Planner",
    instructions=[
        "Plan a content schedule over 4 weeks for the provided topic and research content",
        "Ensure that I have posts for 3 posts per week",
    ],
    model=OpenAIChat(id="gpt-4o"),
)

# Planner invoked by streaming_custom_content_planning_function via .arun().
# Unlike the sync planner it is given an in-memory db; the streaming function
# reads the finished run back with get_last_run_output()
# (presumably the reason the db is attached - confirm against the Agno docs).
streaming_content_planner = Agent(
    name="Content Planner",
    instructions=[
        "Plan a content schedule over 4 weeks for the provided topic and research content",
        "Ensure that I have posts for 3 posts per week",
    ],
    model=OpenAIChat(id="gpt-4o"),
    db=InMemoryDb(),
)
# ---------------------------------------------------------------------------
# Create Custom Functions
# ---------------------------------------------------------------------------
def custom_content_planning_function(step_input: StepInput) -> StepOutput:
    """Create a content plan using prior workflow context.

    Builds a planning prompt from the workflow input and at most the first
    500 characters of the previous step's content, runs the prompt through
    ``sync_content_planner`` and wraps the reply in a Markdown report.

    Args:
        step_input: Step input whose ``input`` is used as the core topic and
            whose ``previous_step_content`` is used as the research results.

    Returns:
        A ``StepOutput`` holding the formatted plan. If the planner run or the
        report formatting raises, a ``StepOutput`` with ``success=False`` and
        the error text is returned instead of propagating the exception.
    """
    message = step_input.input
    previous_step_content = step_input.previous_step_content

    # Coerce to text before truncating. Slicing the raw value only works for
    # sequences; a mapping or model object from the previous step would raise
    # TypeError here, outside the try block below.
    # NOTE(review): assumes previous_step_content is not guaranteed to be a str - confirm.
    if previous_step_content:
        research_excerpt = str(previous_step_content)[:500]
    else:
        research_excerpt = "No research results"

    # String content is kept flush-left so no source indentation leaks into the prompt.
    planning_prompt = f"""
STRATEGIC CONTENT PLANNING REQUEST:
Core Topic: {message}
Research Results: {research_excerpt}
Planning Requirements:
1. Create a comprehensive content strategy based on the research
2. Leverage the research findings effectively
3. Identify content formats and channels
4. Provide timeline and priority recommendations
5. Include engagement and distribution strategies
Please create a detailed, actionable content plan.
"""
    try:
        response = sync_content_planner.run(planning_prompt)
        enhanced_content = f"""
## Strategic Content Plan
**Planning Topic:** {message}
**Research Integration:** {"Research-based" if previous_step_content else "No research foundation"}
**Content Strategy:**
{response.content}
**Custom Planning Enhancements:**
- Research Integration: {"High" if previous_step_content else "Baseline"}
- Strategic Alignment: Optimized for multi-channel distribution
- Execution Ready: Detailed action items included
""".strip()
        return StepOutput(content=enhanced_content)
    except Exception as exc:
        # Deliberate catch-all at the step boundary: report the failure as a
        # step result rather than letting it escape the step function.
        return StepOutput(
            content=f"Custom content planning failed: {exc}", success=False
        )
async def streaming_custom_content_planning_function(
    step_input: StepInput,
) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]:
    """Create a content plan with streamed planner output events.

    Builds the same planning prompt as the sync variant, runs it through
    ``streaming_content_planner.arun(..., stream=True, stream_events=True)``
    and re-yields every event the planner emits. After the stream ends, the
    planner's last run output is read back and wrapped in a Markdown report.

    Args:
        step_input: Step input whose ``input`` is used as the core topic and
            whose ``previous_step_content`` is used as the research results.

    Yields:
        Each event produced by the planner run, followed by exactly one final
        ``StepOutput``: the formatted plan on success, or a ``StepOutput``
        with ``success=False`` describing the failure.
    """
    message = step_input.input
    previous_step_content = step_input.previous_step_content

    # Coerce to text before truncating. Slicing the raw value only works for
    # sequences; a mapping or model object from the previous step would raise
    # TypeError here, outside the try block below.
    # NOTE(review): assumes previous_step_content is not guaranteed to be a str - confirm.
    if previous_step_content:
        research_excerpt = str(previous_step_content)[:500]
    else:
        research_excerpt = "No research results"

    # String content is kept flush-left so no source indentation leaks into the prompt.
    planning_prompt = f"""
STRATEGIC CONTENT PLANNING REQUEST:
Core Topic: {message}
Research Results: {research_excerpt}
Planning Requirements:
1. Create a comprehensive content strategy based on the research
2. Leverage the research findings effectively
3. Identify content formats and channels
4. Provide timeline and priority recommendations
5. Include engagement and distribution strategies
Please create a detailed, actionable content plan.
"""
    try:
        response_iterator = streaming_content_planner.arun(
            planning_prompt,
            stream=True,
            stream_events=True,
        )
        # Forward planner events to the caller as they arrive.
        async for event in response_iterator:
            yield event

        response = streaming_content_planner.get_last_run_output()
        if response is None:
            # Defensive: without this check a missing run output would surface
            # as an opaque "'NoneType' object has no attribute 'content'".
            yield StepOutput(
                content="Custom content planning failed: planner returned no run output",
                success=False,
            )
            return

        enhanced_content = f"""
## Strategic Content Plan
**Planning Topic:** {message}
**Research Integration:** {"Research-based" if previous_step_content else "No research foundation"}
**Content Strategy:**
{response.content}
**Custom Planning Enhancements:**
- Research Integration: {"High" if previous_step_content else "Baseline"}
- Strategic Alignment: Optimized for multi-channel distribution
- Execution Ready: Detailed action items included
""".strip()
        yield StepOutput(content=enhanced_content)
    except Exception as exc:
        # Deliberate catch-all at the step boundary: report the failure as a
        # step result rather than letting it escape the generator.
        yield StepOutput(
            content=f"Custom content planning failed: {exc}", success=False
        )
# ---------------------------------------------------------------------------
# Create Workflows
# ---------------------------------------------------------------------------
# Two-step workflow: the research team runs first, then the sync custom
# planning function. Storage uses Postgres at `db_url`.
sync_content_creation_workflow = Workflow(
    name="Content Creation Workflow",
    description="Automated content creation with custom execution options",
    db=PostgresDb(db_url=db_url, session_table="workflow_session"),
    steps=[
        Step(name="Research Step", team=research_team),
        Step(
            name="Content Planning Step",
            executor=custom_content_planning_function,
        ),
    ],
)
# Two-step workflow: the research team runs first, then the streaming custom
# planning function. Storage uses a SQLite file under tmp/.
streaming_content_creation_workflow = Workflow(
    name="Streaming Content Creation Workflow",
    description="Automated content creation with streaming custom execution functions",
    db=SqliteDb(db_file="tmp/workflow.db", session_table="workflow_session"),
    steps=[
        Step(name="Research Step", team=research_team),
        Step(
            name="Content Planning Step",
            executor=streaming_custom_content_planning_function,
        ),
    ],
)
# ---------------------------------------------------------------------------
# Create AgentOS
# ---------------------------------------------------------------------------
# Exactly one workflow is registered, chosen by the USE_STREAMING_WORKFLOW toggle.
if USE_STREAMING_WORKFLOW:
    _served_workflow = streaming_content_creation_workflow
else:
    _served_workflow = sync_content_creation_workflow

agent_os = AgentOS(
    description="Example app for basic agent with playground capabilities",
    workflows=[_served_workflow],
)

# Module-level application object; the serve() call in the __main__ guard
# refers to it through the "workflow_with_custom_function:app" import string.
app = agent_os.get_app()
# ---------------------------------------------------------------------------
# Run
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Serve only when executed as a script. The app is passed as a
    # "module:attribute" import string together with reload=True.
    agent_os.serve(
        app="workflow_with_custom_function:app",
        reload=True,
    )
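# ---------------------------------------------------------------------------
# Run the Example
# ---------------------------------------------------------------------------
# Run the following shell commands:
#
#   # Clone and setup repo
#   git clone https://github.com/agno-agi/agno.git
#   cd agno/cookbook/05_agent_os/workflow
#
#   # Create and activate virtual environment
#   ./scripts/demo_setup.sh
#   source .venvs/demo/bin/activate
#
#   python workflow_with_custom_function.py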