Skip to main content
"""
Reasoning Model Stream Deepseek
===============================

Demonstrates this reasoning cookbook example.
"""

import asyncio
import os

from agno.agent import Agent
from agno.models.azure import AzureAIFoundry
from agno.run.agent import RunEvent  # noqa


# ---------------------------------------------------------------------------
# Create Example
# ---------------------------------------------------------------------------
def run_example() -> None:
    async def streaming_reasoning():
        """Test streaming reasoning with a Azure AI Foundry DeepSeek model."""
        # Create an agent with reasoning enabled
        agent = Agent(
            reasoning_model=AzureAIFoundry(
                id="DeepSeek-R1",
                azure_endpoint=os.getenv("AZURE_ENDPOINT"),
                api_key=os.getenv("AZURE_API_KEY"),
            ),
            reasoning=True,
            instructions="Think step by step about the problem.",
        )

        prompt = "What is 25 * 37? Show your reasoning."

        await agent.aprint_response(prompt, stream=True, stream_events=True)

        # Use manual event loop to see all events
        # async for run_output_event in agent.arun(
        #     prompt,
        #     stream=True,
        #     stream_events=True,
        # ):
        #     if run_output_event.event == RunEvent.run_started:
        #         print(f"\nEVENT: {run_output_event.event}")

        #     elif run_output_event.event == RunEvent.reasoning_started:
        #         print(f"\nEVENT: {run_output_event.event}")
        #         print("Reasoning started...\n")

        #     elif run_output_event.event == RunEvent.reasoning_content_delta:
        #         # This is the NEW streaming event for reasoning content
        #         print(run_output_event.reasoning_content, end="", flush=True)

        #     elif run_output_event.event == RunEvent.reasoning_step:
        #         print(f"\nEVENT: {run_output_event.event}")

        #     elif run_output_event.event == RunEvent.reasoning_completed:
        #         print(f"\n\nEVENT: {run_output_event.event}")

        #     elif run_output_event.event == RunEvent.run_content:
        #         if run_output_event.content:
        #             print(run_output_event.content, end="", flush=True)

        #     elif run_output_event.event == RunEvent.run_completed:
        #         print(f"\n\nEVENT: {run_output_event.event}")

    if __name__ == "__main__":
        asyncio.run(streaming_reasoning())


# ---------------------------------------------------------------------------
# Run Example
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    run_example()

Run the Example

# Clone and setup repo
git clone https://github.com/agno-agi/agno.git
cd agno/cookbook/10_reasoning/models/azure_ai_foundry

# Create and activate virtual environment
./scripts/demo_setup.sh
source .venvs/demo/bin/activate

# Export relevant API keys
export AZURE_API_KEY="***"
export AZURE_ENDPOINT="***"

python reasoning_model_stream_deepseek.py