Skip to main content
"""
Background Execution WebSocket Server
=====================================

Demonstrates running background workflows and streaming workflow events over WebSocket.
"""

import json
import os
from typing import Dict

import uvicorn
from agno.agent import Agent
from agno.db.sqlite import SqliteDb
from agno.models.openai import OpenAIChat
from agno.tools.hackernews import HackerNewsTools
from agno.tools.websearch import WebSearchTools
from agno.workflow.step import Step
from agno.workflow.workflow import Workflow
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

# ---------------------------------------------------------------------------
# Create Configuration
# ---------------------------------------------------------------------------
SECURITY_KEY = os.getenv("SECURITY_KEY", "your-secret-key")  # Set your key here

# ---------------------------------------------------------------------------
# Create Agents
# ---------------------------------------------------------------------------
hackernews_agent = Agent(
    name="HackerNews Researcher",
    model=OpenAIChat(id="gpt-4o-mini"),
    tools=[HackerNewsTools()],
    instructions="Research tech news and trends from HackerNews",
)

search_agent = Agent(
    name="Search Agent",
    model=OpenAIChat(id="gpt-4o-mini"),
    tools=[WebSearchTools()],
    instructions="Search for additional information on the web",
)

# ---------------------------------------------------------------------------
# Create WebSocket App
# ---------------------------------------------------------------------------
app = FastAPI(title="Background Workflow WebSocket Server")

# Store active WebSocket connections and their auth status
active_connections: Dict[str, WebSocket] = {}
authenticated_connections: Dict[str, bool] = {}  # {connection_id: is_authenticated}


# ---------------------------------------------------------------------------
# Define Helpers
# ---------------------------------------------------------------------------
def validate_token(token: str) -> bool:
    """Validate authentication token"""
    # If no security key set, allow all connections
    if not SECURITY_KEY or SECURITY_KEY == "your-secret-key":
        return True
    return token == SECURITY_KEY


@app.get("/")
async def get():
    """API status endpoint"""
    return {
        "status": "running",
        "message": "Background Workflow WebSocket Server",
        "endpoints": {
            "websocket": "/ws",
            "start-workflow": "/workflow/start",
        },
        "connections": len(active_connections),
        "authenticated": len([c for c in authenticated_connections.values() if c]),
    }


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint for background workflow events"""
    await websocket.accept()
    connection_id = f"conn_{len(active_connections)}"
    active_connections[connection_id] = websocket
    authenticated_connections[connection_id] = False  # Start unauthenticated

    print(f"[CONN] Client connected: {connection_id}")

    try:
        # Send connection confirmation
        await websocket.send_text(
            json.dumps(
                {
                    "event": "connected",
                    "connection_id": connection_id,
                    "message": "Connected to workflow events. Please authenticate to continue.",
                    "requires_auth": True,
                }
            )
        )

        # Keep connection alive
        while True:
            try:
                data = await websocket.receive_text()
                message_data = json.loads(data)
                action = message_data.get("action") or message_data.get("type")

                # Handle authentication
                if action == "authenticate":
                    token = message_data.get("token")
                    if not token:
                        await websocket.send_text(
                            json.dumps(
                                {"event": "auth_error", "error": "Token is required"}
                            )
                        )
                        continue

                    if validate_token(token):
                        authenticated_connections[connection_id] = True
                        await websocket.send_text(
                            json.dumps(
                                {
                                    "event": "authenticated",
                                    "message": "Authentication successful. You can now send commands.",
                                }
                            )
                        )
                        print(f"[AUTH] Client authenticated: {connection_id}")
                    else:
                        await websocket.send_text(
                            json.dumps(
                                {"event": "auth_error", "error": "Invalid token"}
                            )
                        )
                    continue

                # Check authentication for other actions
                if not authenticated_connections.get(connection_id, False):
                    await websocket.send_text(
                        json.dumps(
                            {
                                "event": "auth_required",
                                "error": "Authentication required. Send authenticate action with valid token.",
                            }
                        )
                    )
                    continue

                # Handle authenticated actions
                if action == "start-workflow":
                    await handle_start_workflow(websocket, message_data)
                elif action == "ping":
                    await websocket.send_text(json.dumps({"event": "pong"}))
                else:
                    # Echo back for testing
                    await websocket.send_text(
                        json.dumps({"event": "echo", "original_message": message_data})
                    )

            except WebSocketDisconnect:
                break
            except Exception as e:
                await websocket.send_text(
                    json.dumps(
                        {
                            "event": "error",
                            "message": f"Error processing message: {str(e)}",
                        }
                    )
                )

    except WebSocketDisconnect:
        pass
    finally:
        if connection_id in active_connections:
            del active_connections[connection_id]
        if connection_id in authenticated_connections:
            del authenticated_connections[connection_id]
        print(f"[CONN] Client disconnected: {connection_id}")


async def handle_start_workflow(websocket: WebSocket, message_data: dict):
    """Handle workflow start request via WebSocket"""
    message = message_data.get("message", "AI trends 2024")
    session_id = message_data.get("session_id", f"ws-session-{len(active_connections)}")

    workflow = Workflow(
        name="Tech Research Pipeline",
        steps=[
            Step(name="hackernews_research", agent=hackernews_agent),
            Step(name="web_search", agent=search_agent),
        ],
        db=SqliteDb(
            db_file="tmp/workflow_bg.db",
            session_table="workflow_bg",
        ),
    )

    try:
        # Send acknowledgment
        await websocket.send_text(
            json.dumps(
                {
                    "event": "workflow_starting",
                    "message": f"Starting workflow with message: {message}",
                    "session_id": session_id,
                }
            )
        )

        # Execute workflow in background with streaming and WebSocket
        result = await workflow.arun(
            input=message,
            session_id=session_id,
            stream=True,
            stream_events=True,
            background=True,
            websocket=websocket,
        )

        # Send completion notification
        await websocket.send_text(
            json.dumps(
                {
                    "event": "workflow_initiated",
                    "run_id": result.run_id,
                    "session_id": result.session_id,
                    "message": "Background streaming workflow initiated successfully",
                }
            )
        )

    except Exception as e:
        await websocket.send_text(
            json.dumps(
                {
                    "event": "workflow_error",
                    "error": str(e),
                    "message": "Failed to start workflow",
                }
            )
        )


# ---------------------------------------------------------------------------
# Run Server
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    print("[START] Starting Background Workflow WebSocket Server...")
    print("[CONN] WebSocket: ws://localhost:8000/ws")
    print("[API] HTTP API: http://localhost:8000")
    print("[DOCS] API Docs: http://localhost:8000/docs")
    print(f"[AUTH] Security Key: {SECURITY_KEY}")

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
        log_level="info",
    )

Run the Example

# Clone and setup repo
git clone https://github.com/agno-agi/agno.git
cd agno/cookbook/04_workflows/06_advanced_concepts/background_execution

# Create and activate virtual environment
./scripts/demo_setup.sh
source .venvs/demo/bin/activate

# Export relevant API keys
export SECURITY_KEY="***"

python websocket_server.py