# step_with_function_streaming_agentos.py
from typing import AsyncIterator, Union
from agno.agent.agent import Agent
from agno.db.in_memory import InMemoryDb
# SQLite-backed database used below to persist workflow sessions
from agno.db.sqlite import SqliteDb
from agno.models.openai.chat import OpenAIChat
from agno.os import AgentOS
from agno.team import Team
from agno.tools.googlesearch import GoogleSearchTools
from agno.tools.hackernews import HackerNewsTools
from agno.workflow.step import Step, StepInput, StepOutput, WorkflowRunOutputEvent
from agno.workflow.workflow import Workflow
# Agent equipped with HackerNewsTools and instructed to extract insights
# from Hackernews posts.
hackernews_agent = Agent(
    name="Hackernews Agent",
    instructions="Extract key insights and content from Hackernews posts",
    tools=[HackerNewsTools()],
    model=OpenAIChat(id="gpt-4o"),
)
# Agent equipped with GoogleSearchTools and instructed to search the web
# for recent news and trends.
web_agent = Agent(
    name="Web Agent",
    instructions="Search the web for the latest news and trends",
    tools=[GoogleSearchTools()],
    model=OpenAIChat(id="gpt-4o"),
)
# Team combining the Hackernews and web agents; used as the executor of the
# research step of the workflow.
research_team = Team(
    name="Research Team",
    instructions="Analyze content and create comprehensive social media strategy",
    members=[hackernews_agent, web_agent],
)
# Planner agent invoked from inside the custom step function; it is given an
# in-memory database.
content_planner = Agent(
    name="Content Planner",
    db=InMemoryDb(),
    model=OpenAIChat(id="gpt-4o"),
    instructions=[
        "Plan a content schedule over 4 weeks for the provided topic and research content",
        "Ensure that I have posts for 3 posts per week",
    ],
)
async def custom_content_planning_function(
    step_input: StepInput,
) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]:
    """Stream a content plan built from the workflow input and prior research.

    Builds a planning prompt from ``step_input.input`` and, when available,
    the first 500 characters of ``step_input.previous_step_content``. It then
    runs ``content_planner.arun(...)`` in streaming mode, re-yielding every
    event unchanged, and finishes with one ``StepOutput`` that wraps the
    planner's final content in a formatted report.

    Note (from the original author): events produced by the inner
    ``content_planner.arun()`` call get workflow context injected by the
    workflow execution system, so no manual handling is needed here.

    Args:
        step_input: Workflow-provided input; ``input`` and
            ``previous_step_content`` are read from it.

    Yields:
        The planner's streaming events, followed by a final ``StepOutput``.
        If planning fails, the final ``StepOutput`` has ``success=False`` and
        a message describing the failure.
    """
    message = step_input.input
    previous_step_content = step_input.previous_step_content

    # The previous step's content may not be a string (e.g. structured
    # output), and slicing such an object would raise before the error
    # handling below is reached. Coerce to text before truncating.
    if previous_step_content:
        research_excerpt = str(previous_step_content)[:500]
    else:
        research_excerpt = "No research results"

    # Create intelligent planning prompt
    planning_prompt = f"""
STRATEGIC CONTENT PLANNING REQUEST:
Core Topic: {message}
Research Results: {research_excerpt}
Planning Requirements:
1. Create a comprehensive content strategy based on the research
2. Leverage the research findings effectively
3. Identify content formats and channels
4. Provide timeline and priority recommendations
5. Include engagement and distribution strategies
Please create a detailed, actionable content plan.
"""
    try:
        response_iterator = content_planner.arun(
            planning_prompt, stream=True, stream_intermediate_steps=True
        )
        # Forward every streamed event to the workflow as it arrives.
        async for event in response_iterator:
            yield event

        response = content_planner.get_last_run_output()
        if response is None:
            # Report the missing run output explicitly instead of failing
            # with an opaque AttributeError on ``response.content``.
            yield StepOutput(
                content=(
                    "Custom content planning failed: "
                    "no run output available from the content planner"
                ),
                success=False,
            )
            return

        enhanced_content = f"""
## Strategic Content Plan
**Planning Topic:** {message}
**Research Integration:** {"✓ Research-based" if previous_step_content else "✗ No research foundation"}
**Content Strategy:**
{response.content}
**Custom Planning Enhancements:**
- Research Integration: {"High" if previous_step_content else "Baseline"}
- Strategic Alignment: Optimized for multi-channel distribution
- Execution Ready: Detailed action items included
""".strip()
        yield StepOutput(content=enhanced_content)
    except Exception as e:
        # Step boundary: surface any planner failure as an unsuccessful
        # StepOutput rather than crashing the whole workflow run.
        yield StepOutput(
            content=f"Custom content planning failed: {str(e)}",
            success=False,
        )
# Workflow steps with two kinds of executor: the first is run by a team, the
# second by the custom async generator function.
research_step = Step(team=research_team, name="Research Step")
content_planning_step = Step(
    executor=custom_content_planning_function,
    name="Content Planning Step",
)
# Two-step workflow: research first, then streamed content planning.
# Sessions are stored in the "workflow_session" table of tmp/workflow.db.
streaming_content_workflow = Workflow(
    name="Streaming Content Creation Workflow",
    description="Automated content creation with streaming custom execution functions",
    steps=[research_step, content_planning_step],
    db=SqliteDb(
        db_file="tmp/workflow.db",
        session_table="workflow_session",
    ),
)
# Initialize the AgentOS with the workflows
agent_os = AgentOS(
    description="Example OS setup",
    workflows=[streaming_content_workflow],
)

# Module-level application object; the import string passed to serve() below
# refers to this name.
app = agent_os.get_app()

if __name__ == "__main__":
    # The import string must be "<this module's name>:app". It previously
    # named a different module ("workflow_with_custom_function_stream"), which
    # cannot be imported when this file is saved as
    # step_with_function_streaming_agentos.py.
    agent_os.serve(app="step_with_function_streaming_agentos:app", reload=True)