Skip to main content
1

Create a Python file

Create a Python file and add the above code.
touch user_input_required_stream_async.py
2

Add the following code to your Python file

user_input_required_stream_async.py
import asyncio
from typing import List

from agno.agent import Agent
from agno.db.sqlite import SqliteDb
from agno.models.openai import OpenAIChat
from agno.tools import tool
from agno.tools.function import UserInputField


# You can either specify the user_input_fields leave empty for all fields to be provided by the user
@tool(requires_user_input=True, user_input_fields=["to_address"])
def send_email(subject: str, body: str, to_address: str) -> str:
    """
    Send an email.

    Args:
        subject (str): The subject of the email.
        body (str): The body of the email.
        to_address (str): The address to send the email to.
    """
    return f"Sent email to {to_address} with subject {subject} and body {body}"


agent = Agent(
    model=OpenAIChat(id="gpt-4o-mini"),
    tools=[send_email],
    markdown=True,
    db=SqliteDb(session_table="test_session", db_file="tmp/example.db"),
)


async def main():
    async for run_event in agent.arun(
        "Send an email with the subject 'Hello' and the body 'Hello, world!'",
        stream=True,
    ):
        if run_event.is_paused:  # Or agent.run_response.is_paused
            for requirement in run_event.active_requirements:
                if requirement.needs_user_input:
                    input_schema: List[UserInputField] = requirement.user_input_schema  # type: ignore

                    for field in input_schema:
                        # Get user input for each field in the schema
                        field_type = field.field_type
                        field_description = field.description

                        # Display field information to the user
                        print(f"\nField: {field.name}")
                        print(f"Description: {field_description}")
                        print(f"Type: {field_type}")

                        # Get user input
                        if field.value is None:
                            user_value = input(f"Please enter a value for {field.name}: ")
                        else:
                            print(f"Value: {field.value}")
                            user_value = field.value

                        # Update the field value
                        field.value = user_value

            async for resp in agent.acontinue_run(  # type: ignore
                run_id=run_event.run_id,
                updated_tools=run_event.tools,
                stream=True,
            ):
                print(resp.content, end="")

    # Or for simple debug flow
    # agent.aprint_response("Send an email with the subject 'Hello' and the body 'Hello, world!'")


if __name__ == "__main__":
    asyncio.run(main())

3

Create a virtual environment

Open the Terminal and create a python virtual environment.
python3 -m venv .venv
source .venv/bin/activate
4

Install libraries

pip install -U agno openai
5

Export your OpenAI API key

  export OPENAI_API_KEY="your_openai_api_key_here"
6

Run Agent

python user_input_required_stream_async.py
7

Find All Cookbooks

Explore all the available cookbooks in the Agno repository. Click the link below to view the code on GitHub:Agno Cookbooks on GitHub