In this guide, we will walk you through building a powerful semantic search engine using Couchbase as the backend database, OpenAI as the embedding and LLM provider, and PydanticAI as an agent orchestrator. Semantic search goes beyond simple keyword matching by understanding the context and meaning behind the words in a query, making it an essential tool for applications that require intelligent information retrieval. This tutorial is designed to be beginner-friendly, with clear, step-by-step instructions that will equip you with the knowledge to create a fully functional semantic search system from scratch.
This tutorial is available as a Jupyter Notebook (.ipynb file) that you can run interactively.
You can either download the notebook file and run it on Google Colab or run it on your system by setting up the Python environment.
To get started with Couchbase Capella, create an account and use it to deploy a forever free tier operational cluster. This account provides you with an environment where you can explore and learn about Capella with no time constraint.
To learn more, please follow the instructions.
When running Couchbase using Capella, the following prerequisites need to be met:
Create the database credentials to access the cluster.
Allow access to the cluster from the IP address on which the application is running.
To build our semantic search engine, we need a robust set of tools. The libraries we install handle everything from connecting to databases to performing complex machine learning tasks. Each library has a specific role: Couchbase libraries manage database operations, LangChain handles AI model integrations, and OpenAI provides advanced AI models for generating embeddings and understanding natural language. By setting up these libraries, we ensure our environment is equipped to handle the data-intensive and computationally complex tasks required for semantic search.
%pip install --quiet -U datasets==3.5.0 langchain-couchbase==0.3.0 langchain-openai==0.3.13 python-dotenv==1.1.0 pydantic-ai==0.1.1 ipywidgets==8.1.6
Note: you may need to restart the kernel to use updated packages.
The script starts by importing a series of libraries required for various tasks, including handling JSON, logging, time tracking, Couchbase connections, embedding generation, and dataset loading. These libraries provide essential functions for working with data, managing database connections, and processing machine learning models.
import getpass
import json
import logging
import os
import time
from uuid import uuid4
from datetime import timedelta
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.exceptions import (InternalServerFailureException,
                                  QueryIndexAlreadyExistsException)
from couchbase.management.buckets import CreateBucketSettings
from couchbase.management.search import SearchIndex
from couchbase.options import ClusterOptions
from datasets import load_dataset
from dotenv import load_dotenv
from langchain_couchbase.vectorstores import CouchbaseSearchVectorStore
from langchain_openai import OpenAIEmbeddings
from tqdm import tqdm
from dataclasses import dataclass
from pydantic_ai import Agent, RunContext
Logging is configured to track the progress of the script and capture any errors or warnings. This is crucial for debugging and understanding the flow of execution. The logging output includes timestamps, log levels (e.g., INFO, ERROR), and messages that describe what is happening in the script.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', force=True)
In this section, we prompt the user to input essential configuration settings. These settings include sensitive information like API keys, database credentials, and specific configuration names. Instead of hardcoding these details into the script, we request the user to provide them at runtime, ensuring flexibility and security.
The script also validates that all required inputs are provided, raising an error if any crucial information is missing. This approach ensures that your integration is both secure and correctly configured without hardcoding sensitive information, enhancing the overall security and maintainability of your code.
load_dotenv()
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') or getpass.getpass('Enter your OpenAI API Key: ')
CB_HOST = os.getenv('CB_HOST') or input('Enter your Couchbase host (default: couchbase://localhost): ') or 'couchbase://localhost'
CB_USERNAME = os.getenv('CB_USERNAME') or input('Enter your Couchbase username (default: Administrator): ') or 'Administrator'
CB_PASSWORD = os.getenv('CB_PASSWORD') or getpass.getpass('Enter your Couchbase password (default: password): ') or 'password'
CB_BUCKET_NAME = os.getenv('CB_BUCKET_NAME') or input('Enter your Couchbase bucket name (default: vector-search-testing): ') or 'vector-search-testing'
INDEX_NAME = os.getenv('INDEX_NAME') or input('Enter your index name (default: vector_search_pydantic_ai): ') or 'vector_search_pydantic_ai'
SCOPE_NAME = os.getenv('SCOPE_NAME') or input('Enter your scope name (default: shared): ') or 'shared'
COLLECTION_NAME = os.getenv('COLLECTION_NAME') or input('Enter your collection name (default: pydantic_ai): ') or 'pydantic_ai'
# Check if the variables are correctly loaded
if not OPENAI_API_KEY:
    raise ValueError("Missing OpenAI API Key")

if 'OPENAI_API_KEY' not in os.environ:
    os.environ['OPENAI_API_KEY'] = OPENAI_API_KEY
Connecting to a Couchbase cluster is the foundation of our project. Couchbase will serve as our primary data store, handling all the storage and retrieval operations required for our semantic search engine. By establishing this connection, we enable our application to interact with the database, allowing us to perform operations such as storing embeddings, querying data, and managing collections. This connection is the gateway through which all data will flow, so ensuring it's set up correctly is paramount.
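If you are connecting to a Capella cluster rather than a local installation, the connection typically uses the TLS couchbases:// scheme and the SDK's WAN profile. The sketch below is illustrative only and is not part of the main flow; the endpoint is a placeholder for your cluster's connection string.
# Hedged sketch for Capella connections (not part of the main flow).
# Capella endpoints use the TLS "couchbases://" scheme, and the SDK's
# "wan_development" profile relaxes timeouts for WAN links.
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions

capella_options = ClusterOptions(PasswordAuthenticator(CB_USERNAME, CB_PASSWORD))
capella_options.apply_profile("wan_development")
# Placeholder endpoint -- replace with your Capella connection string
capella_cluster = Cluster("couchbases://cb.your-endpoint.cloud.couchbase.com", capella_options)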
try:
    auth = PasswordAuthenticator(CB_USERNAME, CB_PASSWORD)
    options = ClusterOptions(auth)
    cluster = Cluster(CB_HOST, options)
    cluster.wait_until_ready(timedelta(seconds=5))
    logging.info("Successfully connected to Couchbase")
except Exception as e:
    raise ConnectionError(f"Failed to connect to Couchbase: {str(e)}")
2025-04-11 13:54:19,537 - INFO - Successfully connected to Couchbase
The setup_collection() function handles creating and configuring the hierarchical data organization in Couchbase:
Bucket Creation: checks whether the bucket exists and, if not, creates it with the settings used below (1024 MB RAM quota, no replicas, flush enabled).
Scope Management: checks whether the scope exists within the bucket and creates it if needed (the _default scope is never created).
Collection Setup: checks whether the collection exists within the scope and creates it if it does not.
Additional Tasks: ensures a primary index exists on the collection and clears any existing documents so that we start from a clean state.
def setup_collection(cluster, bucket_name, scope_name, collection_name):
    try:
        # Check if bucket exists, create if it doesn't
        try:
            bucket = cluster.bucket(bucket_name)
            logging.info(f"Bucket '{bucket_name}' exists.")
        except Exception:
            logging.info(f"Bucket '{bucket_name}' does not exist. Creating it...")
            bucket_settings = CreateBucketSettings(
                name=bucket_name,
                bucket_type='couchbase',
                ram_quota_mb=1024,
                flush_enabled=True,
                num_replicas=0
            )
            cluster.buckets().create_bucket(bucket_settings)
            time.sleep(2)  # Wait for bucket creation to complete and become available
            bucket = cluster.bucket(bucket_name)
            logging.info(f"Bucket '{bucket_name}' created successfully.")

        bucket_manager = bucket.collections()

        # Check if scope exists, create if it doesn't
        scopes = bucket_manager.get_all_scopes()
        scope_exists = any(scope.name == scope_name for scope in scopes)
        if not scope_exists and scope_name != "_default":
            logging.info(f"Scope '{scope_name}' does not exist. Creating it...")
            bucket_manager.create_scope(scope_name)
            logging.info(f"Scope '{scope_name}' created successfully.")

        # Check if collection exists, create if it doesn't
        scopes = bucket_manager.get_all_scopes()
        collection_exists = any(
            scope.name == scope_name and collection_name in [col.name for col in scope.collections]
            for scope in scopes
        )
        if not collection_exists:
            logging.info(f"Collection '{collection_name}' does not exist. Creating it...")
            bucket_manager.create_collection(scope_name, collection_name)
            time.sleep(2)
            logging.info(f"Collection '{collection_name}' created successfully.")
        else:
            logging.info(f"Collection '{collection_name}' already exists. Skipping creation.")

        collection = bucket.scope(scope_name).collection(collection_name)
        time.sleep(2)  # Give the collection time to be ready for queries

        # Ensure primary index exists
        try:
            cluster.query(f"CREATE PRIMARY INDEX IF NOT EXISTS ON `{bucket_name}`.`{scope_name}`.`{collection_name}`").execute()
            logging.info("Primary index present or created successfully.")
        except Exception as e:
            logging.warning(f"Error creating primary index: {str(e)}")

        # Clear all documents in the collection
        try:
            query = f"DELETE FROM `{bucket_name}`.`{scope_name}`.`{collection_name}`"
            cluster.query(query).execute()
            logging.info("All documents cleared from the collection.")
        except Exception as e:
            logging.warning(f"Error while clearing documents: {str(e)}. The collection might be empty.")

        return collection
    except Exception as e:
        raise RuntimeError(f"Error setting up collection: {str(e)}")
setup_collection(cluster, CB_BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME)
2025-04-11 13:54:23,668 - INFO - Bucket 'vector-search-testing' does not exist. Creating it...
2025-04-11 13:54:25,721 - INFO - Bucket 'vector-search-testing' created successfully.
2025-04-11 13:54:25,728 - INFO - Scope 'shared' does not exist. Creating it...
2025-04-11 13:54:25,777 - INFO - Scope 'shared' created successfully.
2025-04-11 13:54:25,796 - INFO - Collection 'pydantic_ai' does not exist. Creating it...
2025-04-11 13:54:27,843 - INFO - Collection 'pydantic_ai' created successfully.
2025-04-11 13:54:28,120 - INFO - Primary index present or created successfully.
2025-04-11 13:54:28,133 - INFO - All documents cleared from the collection.
<couchbase.collection.Collection at 0x16febe640>
Semantic search requires an efficient way to retrieve relevant documents based on a user's query. This is where the Couchbase Vector Search Index comes into play. In this step, we load the Vector Search Index definition from a JSON file, which specifies how the index should be structured. This includes the fields to be indexed, the dimensions of the vectors, and other parameters that determine how the search engine processes queries based on vector similarity.
This vector search index configuration requires specific default settings to function properly. This tutorial uses the bucket named vector-search-testing with the scope shared and collection pydantic_ai. The configuration is set up for vectors with exactly 1536 dimensions, using dot product similarity and optimized for recall. If you want to use a different bucket, scope, or collection, you will need to modify the index configuration accordingly.
For more information on creating a vector search index, please follow the instructions.
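The pydantic_ai_index.json file itself is not reproduced in this tutorial. As a rough, hedged illustration of its shape only (your actual file may differ; the field names follow Couchbase's Search index JSON format), it resembles something like:
# Hedged illustration only -- your pydantic_ai_index.json may differ.
# The shape follows Couchbase's Search (FTS) index JSON format, with a
# vector field sized for text-embedding-3-small (1536 dims, dot product).
example_index_definition = {
    "name": "vector_search_pydantic_ai",
    "type": "fulltext-index",
    "sourceType": "gocb",
    "sourceName": "vector-search-testing",
    "params": {
        "doc_config": {"mode": "scope.collection.type_field"},
        "mapping": {
            "types": {
                "shared.pydantic_ai": {  # "<scope>.<collection>"
                    "enabled": True,
                    "properties": {
                        "embedding": {
                            "fields": [{
                                "name": "embedding",
                                "type": "vector",
                                "dims": 1536,
                                "similarity": "dot_product",
                            }]
                        },
                        "text": {
                            "fields": [{"name": "text", "type": "text", "store": True}]
                        },
                    },
                }
            }
        },
    },
}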
# If you are running this script locally (not in Google Colab), uncomment the following line
# and provide the path to your index definition file.
# index_definition_path = '/path_to_your_index_file/pydantic_ai_index.json' # Local setup: specify your file path here
# Version for Google Colab
# def load_index_definition_colab():
#     from google.colab import files
#     print("Upload your index definition file")
#     uploaded = files.upload()
#     index_definition_path = list(uploaded.keys())[0]
#     try:
#         with open(index_definition_path, 'r') as file:
#             index_definition = json.load(file)
#         return index_definition
#     except Exception as e:
#         raise ValueError(f"Error loading index definition from {index_definition_path}: {str(e)}")

# Version for Local Environment
def load_index_definition_local(index_definition_path):
    try:
        with open(index_definition_path, 'r') as file:
            index_definition = json.load(file)
        return index_definition
    except Exception as e:
        raise ValueError(f"Error loading index definition from {index_definition_path}: {str(e)}")

# Usage
# Uncomment the appropriate line based on your environment
# index_definition = load_index_definition_colab()
index_definition = load_index_definition_local('pydantic_ai_index.json')
With the index definition loaded, the next step is to create or update the Vector Search Index in Couchbase. This step is crucial because it optimizes our database for vector similarity search operations, allowing us to perform searches based on the semantic content of documents rather than just keywords. By creating or updating a Vector Search Index, we enable our search engine to handle complex queries that involve finding semantically similar documents using vector embeddings, which is essential for a robust semantic search engine.
try:
    scope_index_manager = cluster.bucket(CB_BUCKET_NAME).scope(SCOPE_NAME).search_indexes()

    # Check if index already exists
    existing_indexes = scope_index_manager.get_all_indexes()
    index_name = index_definition["name"]

    if index_name in [index.name for index in existing_indexes]:
        logging.info(f"Index '{index_name}' found")
    else:
        logging.info(f"Creating new index '{index_name}'...")

    # Create SearchIndex object from JSON definition
    search_index = SearchIndex.from_json(index_definition)

    # Upsert the index (create if not exists, update if exists)
    scope_index_manager.upsert_index(search_index)
    logging.info(f"Index '{index_name}' successfully created/updated.")
except QueryIndexAlreadyExistsException:
    logging.info(f"Index '{index_name}' already exists. Skipping creation/update.")
except InternalServerFailureException as e:
    error_message = str(e)
    logging.error(f"InternalServerFailureException raised: {error_message}")
    try:
        # Accessing the response_body attribute from the context
        error_context = e.context
        response_body = error_context.response_body
        if response_body:
            error_details = json.loads(response_body)
            error_message = error_details.get('error', '')
            if "collection: 'pydantic_ai' doesn't belong to scope: 'shared'" in error_message:
                raise ValueError("Collection 'pydantic_ai' does not belong to scope 'shared'. Please check the collection and scope names.")
    except ValueError as ve:
        logging.error(str(ve))
        raise
    except Exception as json_error:
        logging.error(f"Failed to parse the error message: {json_error}")
        raise RuntimeError(f"Internal server error while creating/updating search index: {error_message}")
2025-04-11 13:54:41,157 - INFO - Creating new index 'vector-search-testing.shared.vector_search_pydantic_ai'...
2025-04-11 13:54:41,316 - INFO - Index 'vector-search-testing.shared.vector_search_pydantic_ai' successfully created/updated.
Embeddings are at the heart of semantic search. They are numerical representations of text that capture the semantic meaning of words and phrases. Unlike traditional keyword-based search, which looks for exact matches, embeddings allow our search engine to understand the context and nuances of language, enabling it to retrieve documents that are semantically similar to the query, even if they don't contain the exact keywords. By creating embeddings using OpenAI, we equip our search engine with the ability to understand and process natural language in a way that's much closer to how humans understand language. This step transforms our raw text data into a format that the search engine can use to find and rank relevant documents.
try:
    embeddings = OpenAIEmbeddings(
        model="text-embedding-3-small",
        api_key=OPENAI_API_KEY,
    )
    logging.info("Successfully created OpenAIEmbeddings")
except Exception as e:
    raise ValueError(f"Error creating OpenAIEmbeddings: {str(e)}")
2025-04-11 13:55:10,426 - INFO - Successfully created OpenAIEmbeddings
The vector store is set up to manage the embeddings created in the previous step. The vector store is essentially a database optimized for storing and retrieving high-dimensional vectors. In this case, the vector store is built on top of Couchbase, allowing the script to store the embeddings in a way that can be efficiently searched.
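Before wiring the embeddings into the vector store, a quick sanity check (a minimal sketch, not part of the original flow) can confirm that the model's output dimension matches the 1536 dims the index definition expects:
# Optional sanity check (a minimal sketch, not part of the original flow):
# the embedding dimension must match the "dims" value in the search index.
sample_vector = embeddings.embed_query("a quick test sentence")
print(f"Embedding dimension: {len(sample_vector)}")  # expected: 1536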
try:
    vector_store = CouchbaseSearchVectorStore(
        cluster=cluster,
        bucket_name=CB_BUCKET_NAME,
        scope_name=SCOPE_NAME,
        collection_name=COLLECTION_NAME,
        embedding=embeddings,
        index_name=INDEX_NAME,
    )
    logging.info("Successfully created vector store")
except Exception as e:
    raise ValueError(f"Failed to create vector store: {str(e)}")
2025-04-11 13:55:12,849 - INFO - Successfully created vector store
To build a search engine, we need data to search through. We use the BBC News dataset from RealTimeData, which provides real-world news articles. This dataset contains news articles from BBC covering various topics and time periods. Loading the dataset is a crucial step because it provides the raw material that our search engine will work with. The quality and diversity of the news articles make it an excellent choice for testing and refining our search engine, ensuring it can handle real-world news content effectively.
The BBC News dataset allows us to work with authentic news articles, enabling us to build and test a search engine that can effectively process and retrieve relevant news content. The dataset is loaded using the Hugging Face datasets library, specifically accessing the "RealTimeData/bbc_news_alltime" dataset with the "2024-12" version.
try:
    news_dataset = load_dataset(
        "RealTimeData/bbc_news_alltime", "2024-12", split="train"
    )
    print(f"Loaded the BBC News dataset with {len(news_dataset)} rows")
    logging.info(f"Successfully loaded the BBC News dataset with {len(news_dataset)} rows.")
except Exception as e:
    raise ValueError(f"Error loading the BBC News dataset: {str(e)}")
2025-04-11 13:55:22,967 - INFO - Successfully loaded the BBC News dataset with 2687 rows.
Loaded the BBC News dataset with 2687 rows
We will use the content of the news articles for our RAG system.
The dataset contains a few duplicate records. We are removing them to avoid duplicate results in the retrieval stage of our RAG system.
news_articles = news_dataset["content"]
unique_articles = set()
for article in news_articles:
    if article:
        unique_articles.add(article)
unique_news_articles = list(unique_articles)

print(f"We have {len(unique_news_articles)} unique articles in our database.")
We have 1749 unique articles in our database.
With the vector store set up, the next step is to populate it with data. We save the BBC News articles to the vector store, generating embeddings for each article with LangChain for use in semantic search. One of the articles is longer than the maximum number of tokens our embedding model accepts. If we wanted to ingest it, we could split the document into chunks and ingest them in parts (a sketch of this approach follows below). However, since it is only a single document, for simplicity we exclude it from the ingestion process.
# Save the current logging level
current_logging_level = logging.getLogger().getEffectiveLevel()

# Set logging level to CRITICAL to suppress lower level logs
logging.getLogger().setLevel(logging.CRITICAL)

articles = [article for article in unique_news_articles if article and len(article) <= 50000]

try:
    vector_store.add_texts(texts=articles)
except Exception as e:
    raise ValueError(f"Failed to save documents to vector store: {str(e)}")

# Restore the original logging level
logging.getLogger().setLevel(current_logging_level)
From PydanticAI's website:
PydanticAI is a Python agent framework designed to make it less painful to build production grade applications with Generative AI.
PydanticAI allows us to define agents and tools easily to create Gen-AI apps in an innovative and painless manner. Some of its features are:
Built by the Pydantic Team: Built by the team behind Pydantic (the validation layer of the OpenAI SDK, the Anthropic SDK, LangChain, LlamaIndex, AutoGPT, Transformers, CrewAI, Instructor and many more).
Model-agnostic: Supports OpenAI, Anthropic, Gemini, Deepseek, Ollama, Groq, Cohere, and Mistral, and there is a simple interface to implement support for other models.
Type-safe: Designed to make type checking as powerful and informative as possible for you.
Python-centric Design: Leverages Python's familiar control flow and agent composition to build your AI-driven projects, making it easy to apply standard Python best practices you'd use in any other (non-AI) project.
Structured Responses: Harnesses the power of Pydantic to validate and structure model outputs, ensuring responses are consistent across runs (a short sketch follows this list).
Dependency Injection System: Offers an optional dependency injection system to provide data and services to your agent's system prompts, tools and result validators. This is useful for testing and eval-driven iterative development.
Streamed Responses: Provides the ability to stream LLM outputs continuously, with immediate validation, ensuring rapid and accurate results.
Graph Support: Pydantic Graph provides a powerful way to define graphs using type hints; this is useful in complex applications where standard control flow can degrade into spaghetti code.
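As a quick illustration of the structured-responses point above, here is a minimal sketch. The NewsSummary model is hypothetical and not part of this tutorial's pipeline; the result_type parameter and .data attribute match the pydantic-ai 0.1.x release pinned earlier:
# Minimal sketch of structured responses (hypothetical NewsSummary model,
# not part of this tutorial's pipeline). result_type / .data match the
# pydantic-ai 0.1.x API pinned above.
from pydantic import BaseModel
from pydantic_ai import Agent

class NewsSummary(BaseModel):
    headline: str
    summary: str

summary_agent = Agent("openai:gpt-4o", result_type=NewsSummary)
# result = summary_agent.run_sync("Summarize: <article text>")
# result.data is then a validated NewsSummary instance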
PydanticAI makes heavy use of dependency injection to provide data and services to your agent's system prompts and tools. We define dependencies using a dataclass, which serves as a container for our dependencies.
In our case, the only dependency our agent needs is the CouchbaseSearchVectorStore instance. However, we still use a dataclass, as it is good practice: if we wish to add more dependencies later, we can simply add more fields to the Deps dataclass.
We also initialize an agent backed by a GPT-4o model. PydanticAI supports many different LLM providers, including Anthropic, Google, Cohere, etc., which can also be used. While initializing the agent, we also pass the type of the dependencies. This is used mainly for type checking and is not actually used at runtime.
@dataclass
class Deps:
    vector_store: CouchbaseSearchVectorStore

agent = Agent("openai:gpt-4o", deps_type=Deps)
PydanticAI has the concept of function tools, which are functions that can be called by LLMs to retrieve extra information that can help form a better response.
We can perform RAG by creating a tool which retrieves documents that are semantically similar to the query, and allowing the agent to call the tool when required. We can add the function as a tool using the @agent.tool decorator.
Notice that we also add the context parameter, which contains the dependencies that are passed to the tool (in this case, the only dependency is the vector store).
@agent.tool
async def retrieve(context: RunContext[Deps], search_query: str) -> str:
    """Retrieve news data based on a search query.

    Args:
        context: The call context
        search_query: The search query
    """
    search_results = context.deps.vector_store.similarity_search_with_score(search_query, k=5)
    return "\n\n".join(
        f"# Documents:\n{doc.page_content}"
        for doc, score in search_results
    )
Finally, we create a function that allows us to define our dependencies and run our agent.
async def run_agent(question: str):
    deps = Deps(
        vector_store=vector_store,
    )
    answer = await agent.run(question, deps=deps)
    return answer
We have now finished setting up our vector store and agent! The system is now ready to accept queries.
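Note that the cell below relies on Jupyter's support for top-level await. If you run this as a plain Python script instead, drive the coroutine with asyncio.run; a minimal sketch (the question string is illustrative):
# Minimal sketch for running outside a notebook (plain Python script).
import asyncio

result = asyncio.run(run_agent("What was the latest news about the UK economy?"))
print(result.data)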
query = "What was manchester city manager pep guardiola's reaction to the team's current form?"
output = await run_agent(query)
print("=" * 20, "Agent Output", "=" * 20)
print(output.data)
2025-04-11 13:56:53,839 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-04-11 13:56:54,485 - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2025-04-11 13:57:01,928 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
==================== Agent Output ====================
Pep Guardiola has expressed a mix of determination and concern regarding Manchester City's current form. He acknowledged the personal impact of the team's downturn, admitting that the situation has affected his sleep and diet due to the worst run of results he has ever faced in his managerial career. Guardiola described his state of mind as "ugly," noting the team's precarious position in competitions and the need to defend better and avoid mistakes.
Despite these challenges, Guardiola remains committed to finding solutions, emphasizing the need to improve defensive concepts and restore the team's intensity and form. He acknowledged the errors from some of the best players in the world and expressed a need for the team to stay positive and for players to have the necessary support to overcome their current struggles.
Moreover, Guardiola expressed a pragmatic view of the situation, accepting that the team must "survive" the season and acknowledging a potential need for a significant rebuild to address the challenges they're facing. As a testament to his commitment, he noted his intention to continue shaping the club during his newly extended contract period. Throughout, he reiterated his belief in the team and emphasized the need to find a way forward.
We can use the all_messages() method on the output object to observe how the agent and tools work.
In the cell below, we see a detailed list of all the model's messages and tool calls, which happen step by step:
The user's query is sent to the agent as a UserPromptPart.
The model responds by calling the retrieve tool in a ToolCallPart message, which includes the search_query argument. Couchbase uses this search_query to perform semantic search over all the ingested news articles.
The retrieve tool returns a ToolReturnPart object with all the context required for the model to answer the user's query. The retrieved documents are truncated in the output below because a large amount of context was retrieved.
from pprint import pprint
for idx, message in enumerate(output.all_messages(), start=1):
    print(f"Step {idx}:")
    pprint(message.__repr__())
    print("=" * 50)
Step 1:
('ModelRequest(parts=[UserPromptPart(content="What was manchester city manager '
'pep guardiola\'s reaction to the team\'s current form?", '
'timestamp=datetime.datetime(2025, 4, 11, 8, 26, 52, 836357, '
"tzinfo=datetime.timezone.utc), part_kind='user-prompt')], kind='request')")
==================================================
Step 2:
("ModelResponse(parts=[ToolCallPart(tool_name='retrieve', "
'args=\'{"search_query":"Pep Guardiola reaction to Manchester City current '
'form"}\', tool_call_id=\'call_oo4Jjn93VkRJ3q9PnAwkt3xm\', '
"part_kind='tool-call')], model_name='gpt-4o-2024-08-06', "
'timestamp=datetime.datetime(2025, 4, 11, 8, 26, 53, '
"tzinfo=datetime.timezone.utc), kind='response')")
==================================================
Step 3:
("ModelRequest(parts=[ToolReturnPart(tool_name='retrieve', content='# "
'Documents:\\nManchester City boss Pep Guardiola has won 18 trophies since he '
'arrived at the club in 2016\\n\\nManchester City boss Pep Guardiola says he '
'is "fine" despite admitting his sleep and diet are being affected by the '
'worst run of results in his entire managerial career. In an interview with '
'former Italy international Luca Toni for Amazon Prime Sport before '
"Wednesday\\'s Champions League defeat by Juventus, Guardiola touched on the "
"personal impact City\\'s sudden downturn in form has had. Guardiola said his "
'state of mind was "ugly", that his sleep was "worse" and he was eating '
"lighter as his digestion had suffered. City go into Sunday\\'s derby against "
... (output truncated for brevity)