Code
cookbook/08_knowledge/vector_db/couchbase_db/async_couchbase_db.py
Copy
Ask AI
import asyncio
import os
import time
from agno.agent import Agent
from agno.knowledge.embedder.openai import OpenAIEmbedder
from agno.knowledge.knowledge import Knowledge
from agno.vectordb.couchbase import CouchbaseSearch
from couchbase.auth import PasswordAuthenticator
from couchbase.management.search import SearchIndex
from couchbase.options import ClusterOptions, KnownConfigProfiles
# Couchbase connection settings
username = os.getenv("COUCHBASE_USER") # Replace with your username
password = os.getenv("COUCHBASE_PASSWORD") # Replace with your password
connection_string = os.getenv("COUCHBASE_CONNECTION_STRING")
# Create cluster options with authentication
auth = PasswordAuthenticator(username, password)
cluster_options = ClusterOptions(auth)
cluster_options.apply_profile(KnownConfigProfiles.WanDevelopment)
# Define the vector search index
search_index = SearchIndex(
name="vector_search",
source_type="gocbcore",
idx_type="fulltext-index",
source_name="recipe_bucket",
plan_params={"index_partitions": 1, "num_replicas": 0},
params={
"doc_config": {
"docid_prefix_delim": "",
"docid_regexp": "",
"mode": "scope.collection.type_field",
"type_field": "type",
},
"mapping": {
"default_analyzer": "standard",
"default_datetime_parser": "dateTimeOptional",
"index_dynamic": True,
"store_dynamic": True,
"default_mapping": {"dynamic": True, "enabled": False},
"types": {
"recipe_scope.recipes": {
"dynamic": False,
"enabled": True,
"properties": {
"content": {
"enabled": True,
"fields": [
{
"docvalues": True,
"include_in_all": False,
"include_term_vectors": False,
"index": True,
"name": "content",
"store": True,
"type": "text",
}
],
},
"embedding": {
"enabled": True,
"dynamic": False,
"fields": [
{
"vector_index_optimized_for": "recall",
"docvalues": True,
"dims": 3072,
"include_in_all": False,
"include_term_vectors": False,
"index": True,
"name": "embedding",
"similarity": "dot_product",
"store": True,
"type": "vector",
}
],
},
"meta": {
"dynamic": True,
"enabled": True,
"properties": {
"name": {
"enabled": True,
"fields": [
{
"docvalues": True,
"include_in_all": False,
"include_term_vectors": False,
"index": True,
"name": "name",
"store": True,
"analyzer": "keyword",
"type": "text",
}
],
}
},
},
},
}
},
},
},
)
knowledge_base = Knowledge(
vector_db=CouchbaseSearch(
bucket_name="recipe_bucket",
scope_name="recipe_scope",
collection_name="recipes",
couchbase_connection_string=connection_string,
cluster_options=cluster_options,
search_index=search_index,
embedder=OpenAIEmbedder(
id="text-embedding-3-large",
dimensions=3072,
api_key=os.getenv("OPENAI_API_KEY"),
),
wait_until_index_ready=60,
overwrite=True,
),
)
# Create and use the agent
agent = Agent(knowledge=knowledge_base)
async def run_agent():
await knowledge_base.ainsert(
url="https://agno-public.s3.amazonaws.com/recipes/ThaiRecipes.pdf"
)
time.sleep(5) # wait for the vector index to be sync with kv
await agent.aprint_response("How to make Thai curry?", markdown=True)
if __name__ == "__main__":
asyncio.run(run_agent())
Usage
1
Set up your virtual environment
Copy
Ask AI
uv venv --python 3.12
source .venv/bin/activate
2
Start Couchbase
Copy
Ask AI
docker run -d --name couchbase-server \
-p 8091-8096:8091-8096 \
-p 11210:11210 \
-e COUCHBASE_ADMINISTRATOR_USERNAME=Administrator \
-e COUCHBASE_ADMINISTRATOR_PASSWORD=password \
couchbase:latest
- Bucket:
recipe_bucket - Scope:
recipe_scope - Collection:
recipes
3
Install dependencies
Copy
Ask AI
uv pip install -U couchbase pypdf openai agno
4
Set environment variables
Copy
Ask AI
export COUCHBASE_USER="Administrator"
export COUCHBASE_PASSWORD="password"
export COUCHBASE_CONNECTION_STRING="couchbase://localhost"
export OPENAI_API_KEY=xxx
5
Run Agent
Copy
Ask AI
python cookbook/08_knowledge/vector_db/couchbase_db/async_couchbase_db.py