Skip to main content

Code

streaming.py
from agno.agent import Agent
from agno.db.sqlite import SqliteDb
from agno.models.openai import OpenAIChat
from agno.os.app import AgentOS
from agno.os.interfaces.telegram import Telegram

agent_db = SqliteDb(
    session_table="telegram_sessions", db_file="tmp/telegram_streaming.db"
)

telegram_agent = Agent(
    name="Telegram Streaming Bot",
    model=OpenAIChat(id="gpt-4o-mini"),
    db=agent_db,
    instructions=[
        "You are a helpful assistant on Telegram.",
        "Keep responses concise and friendly.",
        "When in a group, you respond only when mentioned with @.",
    ],
    add_history_to_context=True,
    num_history_runs=3,
    add_datetime_to_context=True,
    markdown=True,
)

agent_os = AgentOS(
    agents=[telegram_agent],
    interfaces=[
        Telegram(
            agent=telegram_agent,
            reply_to_mentions_only=True,
            streaming=True,
        )
    ],
)
app = agent_os.get_app()

if __name__ == "__main__":
    agent_os.serve(app="streaming:app", reload=True)

Usage

1

Set up your virtual environment

uv venv --python 3.12
source .venv/bin/activate
2

Set Environment Variables

export TELEGRAM_TOKEN=your-bot-token-from-botfather
export OPENAI_API_KEY=your-openai-api-key
export APP_ENV=development
3

Install dependencies

uv pip install -U "agno[telegram]"
4

Run Example

python streaming.py

Key Features

  • Token-by-Token Streaming: Responses appear incrementally as they are generated
  • Live Message Edits: The bot edits its message in real time, throttled to stay within Telegram rate limits
  • Conversation History: Maintains context with last 3 interactions
  • Persistent Memory: SQLite database for session storage