# workflow_with_input_schema.py
"""
Workflow With Input Schema
==========================
Demonstrates workflow with input schema.
"""
from typing import List
from agno.agent.agent import Agent
# ---------------------------------------------------------------------------
# Create Example
# ---------------------------------------------------------------------------
# Import the database, model, AgentOS, team, tool, and workflow components
from agno.db.sqlite import SqliteDb
from agno.models.openai.chat import OpenAIChat
from agno.os import AgentOS
from agno.team.team import Team
from agno.tools.hackernews import HackerNewsTools
from agno.tools.websearch import WebSearchTools
from agno.workflow.step import Step
from agno.workflow.workflow import Workflow
from pydantic import BaseModel, Field
class ResearchTopic(BaseModel):
    """Structured research topic with specific requirements"""

    # Subject to research; required, no extra metadata attached.
    topic: str
    # Required list of sub-areas the research should concentrate on.
    focus_areas: List[str] = Field(description="Specific areas to focus on")
    # Required description of the intended readership.
    target_audience: str = Field(description="Who this research is for")
    # Optional source count; falls back to 5 when the caller omits it.
    sources_required: int = Field(default=5, description="Number of sources needed")
# Member agents: each one pairs a gpt-4o-mini model with a single toolkit.

# Agent equipped with the Hacker News toolkit.
hackernews_agent = Agent(
    name="Hackernews Agent",
    role="Extract key insights and content from Hackernews posts",
    model=OpenAIChat(id="gpt-4o-mini"),
    tools=[HackerNewsTools()],
)

# Agent equipped with the web search toolkit.
web_agent = Agent(
    name="Web Agent",
    role="Search the web for the latest news and trends",
    model=OpenAIChat(id="gpt-4o-mini"),
    tools=[WebSearchTools()],
)
# Research team: groups the Hacker News agent and the web agent under one
# gpt-4o-mini model with a shared instruction.
research_team = Team(
    name="Research Team",
    instructions="Research tech topics from Hackernews and the web",
    model=OpenAIChat(id="gpt-4o-mini"),
    members=[hackernews_agent, web_agent],
)
# Planning agent: the only component here that uses gpt-4o rather than
# gpt-4o-mini. It has no tools, only instructions.
# NOTE(review): the second instruction is worded awkwardly ("posts for 3
# posts per week"); it is prompt text, so it is left untouched here.
content_planner = Agent(
    name="Content Planner",
    model=OpenAIChat(id="gpt-4o"),
    instructions=[
        "Plan a content schedule over 4 weeks for the provided topic and research content",
        "Ensure that I have posts for 3 posts per week",
    ],
)
# Workflow steps: one backed by the research team, one by the planner agent.
research_step = Step(name="Research Step", team=research_team)
content_planning_step = Step(name="Content Planning Step", agent=content_planner)
# Workflow definition: research step first, content planning step second,
# with ResearchTopic declared as the input schema and a SQLite file as the
# backing database.
content_creation_workflow = Workflow(
    name="Content Creation Workflow",
    description="Automated content creation from blog posts to social media",
    input_schema=ResearchTopic,
    steps=[research_step, content_planning_step],
    db=SqliteDb(
        db_file="tmp/workflow.db",
        session_table="workflow_session",
    ),
)
# AgentOS instance that registers the content creation workflow.
agent_os = AgentOS(
    workflows=[content_creation_workflow],
    description="Example OS setup",
)

# Module-level application object; the serve() call below refers to it by
# the import string "workflow_with_input_schema:app".
app = agent_os.get_app()
# ---------------------------------------------------------------------------
# Run Example
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # The app is passed as a "module:attribute" import string, so this file
    # must be named workflow_with_input_schema.py for the reference to resolve.
    # reload=True presumably enables auto-reload on code changes -- confirm
    # against the AgentOS.serve documentation.
    agent_os.serve(
        app="workflow_with_input_schema:app",
        reload=True,
    )
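# ---------------------------------------------------------------------------
# How to run the example (shell commands)
# ---------------------------------------------------------------------------
# Clone and set up the repo:
#   git clone https://github.com/agno-agi/agno.git
#   cd agno/cookbook/05_agent_os/workflow
#
# Create and activate the virtual environment:
#   ./scripts/demo_setup.sh
#   source .venvs/demo/bin/activate
#
# Start the example:
#   python workflow_with_input_schema.py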