1
Create a Python file
Create a Python file and add the above code.
Copy
Ask AI
touch user_input_required_stream_async.py
2
Add the following code to your Python file
user_input_required_stream_async.py
Copy
Ask AI
import asyncio
from typing import List
from agno.agent import Agent
from agno.db.sqlite import SqliteDb
from agno.models.openai import OpenAIChat
from agno.tools import tool
from agno.tools.function import UserInputField
# You can either specify the user_input_fields leave empty for all fields to be provided by the user
@tool(requires_user_input=True, user_input_fields=["to_address"])
def send_email(subject: str, body: str, to_address: str) -> str:
"""
Send an email.
Args:
subject (str): The subject of the email.
body (str): The body of the email.
to_address (str): The address to send the email to.
"""
return f"Sent email to {to_address} with subject {subject} and body {body}"
agent = Agent(
model=OpenAIChat(id="gpt-5-mini"),
tools=[send_email],
markdown=True,
db=SqliteDb(session_table="test_session", db_file="tmp/example.db"),
)
async def main():
async for run_event in agent.arun(
"Send an email with the subject 'Hello' and the body 'Hello, world!'",
stream=True,
):
if run_event.is_paused: # Or agent.run_response.is_paused
for tool in run_event.tools_requiring_user_input: # type: ignore
input_schema: List[UserInputField] = tool.user_input_schema # type: ignore
for field in input_schema:
# Get user input for each field in the schema
field_type = field.field_type
field_description = field.description
# Display field information to the user
print(f"\nField: {field.name}")
print(f"Description: {field_description}")
print(f"Type: {field_type}")
# Get user input
if field.value is None:
user_value = input(f"Please enter a value for {field.name}: ")
else:
print(f"Value: {field.value}")
user_value = field.value
# Update the field value
field.value = user_value
async for resp in agent.acontinue_run( # type: ignore
run_id=run_event.run_id,
updated_tools=run_event.tools,
stream=True,
):
print(resp.content, end="")
# Or for simple debug flow
# agent.aprint_response("Send an email with the subject 'Hello' and the body 'Hello, world!'")
if __name__ == "__main__":
asyncio.run(main())
3
Create a virtual environment
Open the
Terminal and create a python virtual environment.Copy
Ask AI
python3 -m venv .venv
source .venv/bin/activate
4
Install libraries
Copy
Ask AI
pip install -U agno openai
5
Export your OpenAI API key
Copy
Ask AI
export OPENAI_API_KEY="your_openai_api_key_here"
6
Run Agent
Copy
Ask AI
python user_input_required_stream_async.py
7
Find All Cookbooks
Explore all the available cookbooks in the Agno repository. Click the link below to view the code on GitHub:Agno Cookbooks on GitHub