Skip to main content

Code

streaming_workflow.py
from agno.agent import Agent
from agno.db.sqlite import SqliteDb
from agno.models.openai import OpenAIChat
from agno.os.app import AgentOS
from agno.os.interfaces.telegram import Telegram
from agno.tools.duckduckgo import DuckDuckGoTools
from agno.workflow.step import Step
from agno.workflow.steps import Steps
from agno.workflow.workflow import Workflow

db = SqliteDb(
    session_table="telegram_streaming_wf_sessions",
    db_file="tmp/telegram_streaming_workflow.db",
)

researcher = Agent(
    name="Researcher",
    model=OpenAIChat(id="gpt-4o-mini"),
    tools=[DuckDuckGoTools()],
    instructions=[
        "Research the topic using web search.",
        "Provide bullet-point findings with sources.",
    ],
)

writer = Agent(
    name="Writer",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions=[
        "Write a clear, concise summary from the research.",
        "Use **bold** for key terms and keep it under 300 words.",
        "Suitable for reading on a phone screen.",
    ],
)

research_write_workflow = Workflow(
    name="Research and Write",
    description="Two-step workflow: research a topic, then write a polished summary",
    steps=[
        Steps(
            name="research_and_write",
            description="Research then write",
            steps=[
                Step(
                    name="research", agent=researcher, description="Research the topic"
                ),
                Step(name="write", agent=writer, description="Write the summary"),
            ],
        )
    ],
    db=db,
)

agent_os = AgentOS(
    workflows=[research_write_workflow],
    interfaces=[
        Telegram(
            workflow=research_write_workflow,
            reply_to_mentions_only=False,
            streaming=True,
            start_message="Research bot ready. Send me a topic and I will research and summarize it.",
        )
    ],
)
app = agent_os.get_app()

if __name__ == "__main__":
    agent_os.serve(app="streaming_workflow:app", reload=True)

Usage

1

Set up your virtual environment

uv venv --python 3.12
source .venv/bin/activate
2

Set Environment Variables

export TELEGRAM_TOKEN=your-bot-token-from-botfather
export OPENAI_API_KEY=your-openai-api-key
export APP_ENV=development
3

Install dependencies

uv pip install -U "agno[telegram]" duckduckgo-search
4

Run Example

python streaming_workflow.py

Key Features

  • Live Step Progress: Users see which step is running in real time (e.g., “Running step: research…”)
  • Web Search: Researcher agent uses DuckDuckGo to find current information
  • Streaming Output: Writer output streams token-by-token with live message edits
  • Custom Start Message: Overrides the default /start response