In this guide, we will walk you through building a Retrieval Augmented Generation (RAG) application with LlamaIndex orchestrating Capella Model Services and Couchbase Capella. We will use the models hosted on Capella Model Services for response generation and generating embeddings.
This notebook demonstrates how to build a RAG system using:
- Couchbase Capella as the vector store
- Capella Model Services for embeddings and response generation
- LlamaIndex as the orchestration framework
- The BBC News dataset as the knowledge base
Semantic search goes beyond simple keyword matching by understanding the context and meaning behind the words in a query, making it an essential tool for applications that require intelligent information retrieval. This tutorial will equip you with the knowledge to create a fully functional RAG system using Capella Model Services and LlamaIndex.
To get started with Couchbase Capella, create an account and use it to deploy an operational cluster.
For detailed instructions, please follow the Capella documentation.
When running Couchbase using Capella, the following prerequisites need to be met:
- The operational cluster must have the Data, Index, Query, and Search services enabled.
- Database credentials (username and password) with read/write access to the cluster.
- The IP address from which this tutorial is run must be allowed to access the cluster.
In order to create the RAG application, we need an embedding model to ingest the documents for Vector Search and a large language model (LLM) for generating the responses based on the context.
Capella Model Services allow you to create both the embedding model and the LLM in the same VPC as your database. There are multiple options for both the embedding and large language models, along with value-added features such as caching and guardrails.
Create the models using the Capella Model Services interface. While creating the model, it is possible to cache the responses (both standard and semantic cache) and apply guardrails to the LLM responses.
For more details, please refer to the documentation. These models expose an OpenAI-compatible API, so they work with LlamaIndex's OpenAI integrations.
After the models are deployed, create API keys for them and allow access from the IP address on which the tutorial is being run. For more details, please refer to the documentation on generating API keys.
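Since the endpoints are OpenAI-compatible, you can sanity-check a newly created key with any OpenAI client. Below is a minimal sketch, assuming the openai Python package is installed; the endpoint, key, and model name are placeholders for your own values:

from openai import OpenAI

# Placeholder values: substitute your own Capella Model Services endpoint, key, and model name
client = OpenAI(
    base_url="https://your-model-services-endpoint/v1",  # hypothetical endpoint
    api_key="YOUR_LLM_API_KEY",
)
completion = client.chat.completions.create(
    model="your-llm-model-name",  # the LLM you deployed in Capella Model Services
    messages=[{"role": "user", "content": "Say hello in one sentence."}],
)
print(completion.choices[0].message.content)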
To build our RAG system, we need a set of libraries, each with a specific role: the Couchbase SDK manages database operations, LlamaIndex handles model integrations and orchestration, the datasets library loads our news corpus, and LlamaIndex's OpenAI-compatible connectors let us generate embeddings and call the language models hosted on Capella Model Services.
# Install required packages
%pip install datasets llama-index-vector-stores-couchbase==0.6.0 llama-index-embeddings-openai==0.5.1 llama-index-llms-openai-like==0.5.3 llama-index==0.14.10
The script starts by importing a series of libraries required for various tasks, including handling JSON, logging, time tracking, Couchbase connections, embedding generation, and dataset loading.
import getpass
import logging
import sys
import time
from datetime import timedelta
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.exceptions import CouchbaseException
from couchbase.options import ClusterOptions
from datasets import load_dataset
from llama_index.core import Settings, Document
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.vector_stores.couchbase import CouchbaseSearchVectorStore
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai_like import OpenAILike
In this section, we prompt the user to input essential configuration settings needed. These settings include sensitive information like database credentials and collection names. Instead of hardcoding these details into the script, we request the user to provide them at runtime, ensuring flexibility and security.
The script also validates that all required inputs are provided, raising an error if any crucial information is missing. This keeps the configuration secure and correct without hardcoding sensitive information.
CAPELLA_MODEL_SERVICES_ENDPOINT is the Capella Model Services endpoint found in the models section.
Note that the Capella Model Services endpoint requires a /v1 suffix; if the endpoint shown in the UI does not already include it, append /v1 yourself (see the helper sketched below).
INDEX_NAME is the name of the search index we will use for the vector search.
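As noted above, the endpoint needs a /v1 suffix. A small helper (a sketch, not part of the original tutorial) can normalize whatever is entered at the prompt:

def normalize_endpoint(endpoint: str) -> str:
    """Append /v1 to the Capella Model Services endpoint if it is missing."""
    endpoint = endpoint.rstrip("/")
    return endpoint if endpoint.endswith("/v1") else endpoint + "/v1"

If you prefer, you can wrap the CAPELLA_MODEL_SERVICES_ENDPOINT input below with this helper.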
CB_CONNECTION_STRING = input("Couchbase Cluster URL (default: localhost): ") or "localhost"
CB_USERNAME = input("Couchbase Username (default: admin): ") or "admin"
CB_PASSWORD = input("Couchbase password (default: Password@12345): ") or "Password@12345"
CB_BUCKET_NAME = input("Couchbase Bucket: ")
SCOPE_NAME = input("Couchbase Scope: ")
COLLECTION_NAME = input("Couchbase Collection: ")
INDEX_NAME = "vector_search"  # must match the index name defined in the search_index.json file
# Get Capella AI endpoint
CAPELLA_MODEL_SERVICES_ENDPOINT = input("Enter your Capella Model Services Endpoint: ")
LLM_MODEL_NAME = input("Enter the LLM model name: ")
LLM_API_KEY = getpass.getpass("Enter your Couchbase LLM API Key: ")
EMBEDDING_MODEL_NAME = input("Enter the Embedding Model name: ")
EMBEDDING_API_KEY = getpass.getpass("Enter your Couchbase Embedding Model API Key: ")
# Check if the variables are correctly loaded
if not all([CB_CONNECTION_STRING, CB_USERNAME, CB_PASSWORD, CB_BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME, INDEX_NAME,
CAPELLA_MODEL_SERVICES_ENDPOINT, LLM_MODEL_NAME, LLM_API_KEY, EMBEDDING_MODEL_NAME, EMBEDDING_API_KEY]):
    raise ValueError("All configuration variables must be provided.")
Logging is essential for tracking the execution of our script and debugging any issues that may arise. We set up a logger that will display information about the script's progress, including timestamps and log levels.
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
The next step is to establish a connection to our Couchbase Capella cluster. This connection will allow us to interact with the database, store and retrieve documents, and perform vector searches.
try:
# Initialize the Couchbase Cluster
auth = PasswordAuthenticator(CB_USERNAME, CB_PASSWORD)
options = ClusterOptions(auth)
# Connect to the cluster
cluster = Cluster(CB_CONNECTION_STRING, options)
# Wait for the cluster to be ready
cluster.wait_until_ready(timedelta(seconds=5))
logging.info("Successfully connected to the Couchbase cluster")
except CouchbaseException as e:
    raise RuntimeError(f"Failed to connect to Couchbase: {str(e)}")
Before we can store our data, we need to ensure that the appropriate bucket, scope, and collection exist in our Couchbase cluster. The code below checks if these components exist and creates them if they don't, providing a foundation for storing our vector embeddings and documents.
from couchbase.management.buckets import CreateBucketSettings
from couchbase.management.search import SearchIndex
import json
# Create bucket if it does not exist
bucket_manager = cluster.buckets()
try:
bucket_manager.get_bucket(CB_BUCKET_NAME)
print(f"Bucket '{CB_BUCKET_NAME}' already exists.")
except Exception as e:
print(f"Bucket '{CB_BUCKET_NAME}' does not exist. Creating bucket...")
bucket_settings = CreateBucketSettings(name=CB_BUCKET_NAME, ram_quota_mb=500)
bucket_manager.create_bucket(bucket_settings)
print(f"Bucket '{CB_BUCKET_NAME}' created successfully.")
# Create scope and collection if they do not exist
collection_manager = cluster.bucket(CB_BUCKET_NAME).collections()
scopes = collection_manager.get_all_scopes()
scope_exists = any(scope.name == SCOPE_NAME for scope in scopes)
if scope_exists:
print(f"Scope '{SCOPE_NAME}' already exists.")
else:
print(f"Scope '{SCOPE_NAME}' does not exist. Creating scope...")
collection_manager.create_scope(SCOPE_NAME)
print(f"Scope '{SCOPE_NAME}' created successfully.")
collections = [collection.name for scope in scopes if scope.name == SCOPE_NAME for collection in scope.collections]
collection_exists = COLLECTION_NAME in collections
if collection_exists:
print(f"Collection '{COLLECTION_NAME}' already exists in scope '{SCOPE_NAME}'.")
else:
print(f"Collection '{COLLECTION_NAME}' does not exist in scope '{SCOPE_NAME}'. Creating collection...")
collection_manager.create_collection(collection_name=COLLECTION_NAME, scope_name=SCOPE_NAME)
print(f"Collection '{COLLECTION_NAME}' created successfully.")
The next step is to load the index definition from search_index.json and create or update the Vector Search index in Couchbase. This step is crucial because it optimizes our database for vector similarity search operations, allowing us to search on the semantic content of documents rather than just keywords. With this index in place, our RAG system can handle complex queries that involve finding semantically similar documents using vector embeddings.
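For reference, the search_index.json used here is assumed to define a vector-enabled search index along these lines. This is a minimal sketch, not the tutorial's exact file: the scope/collection key, field names, and in particular dims (which must match your embedding model's output dimension) and the similarity metric will vary with your setup.

{
  "name": "vector_search",
  "type": "fulltext-index",
  "sourceType": "gocb",
  "sourceName": "your-bucket-name",
  "params": {
    "doc_config": { "mode": "scope.collection.type_field" },
    "mapping": {
      "types": {
        "your-scope.your-collection": {
          "enabled": true,
          "properties": {
            "embedding": {
              "fields": [
                { "name": "embedding", "type": "vector", "dims": 1024, "similarity": "dot_product", "index": true }
              ]
            },
            "text": {
              "fields": [
                { "name": "text", "type": "text", "index": true, "store": true }
              ]
            }
          }
        }
      }
    }
  }
}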
# Create search index from search_index.json file at scope level
with open('search_index.json', 'r') as search_file:
search_index_definition = SearchIndex.from_json(json.load(search_file))
# Update search index definition with user inputs
search_index_definition.name = INDEX_NAME
search_index_definition.source_name = CB_BUCKET_NAME
# Update types mapping
old_type_key = next(iter(search_index_definition.params['mapping']['types'].keys()))
type_obj = search_index_definition.params['mapping']['types'].pop(old_type_key)
search_index_definition.params['mapping']['types'][f"{SCOPE_NAME}.{COLLECTION_NAME}"] = type_obj
search_index_name = search_index_definition.name
# Get scope-level search manager
scope_search_manager = cluster.bucket(CB_BUCKET_NAME).scope(SCOPE_NAME).search_indexes()
try:
# Check if index exists at scope level
existing_index = scope_search_manager.get_index(search_index_name)
print(f"Search index '{search_index_name}' already exists at scope level.")
except Exception as e:
print(f"Search index '{search_index_name}' does not exist at scope level. Creating search index from search_index.json...")
scope_search_manager.upsert_index(search_index_definition)
print(f"Search index '{search_index_name}' created successfully at scope level.")To build a RAG engine, we need data to search through. We use the BBC Realtime News dataset, a dataset with up-to-date BBC news articles grouped by month. This dataset contains articles that were created after the LLM was trained. It will showcase the use of RAG to augment the LLM.
The BBC News dataset's varied content allows us to simulate real-world scenarios where users ask complex questions, letting us exercise the RAG system's ability to understand and respond to various types of queries.
try:
news_dataset = load_dataset('RealTimeData/bbc_news_alltime', '2024-12', split="train")
print(f"Loaded the BBC News dataset with {len(news_dataset)} rows")
except Exception as e:
raise ValueError(f"Error loading TREC dataset: {str(e)}")# Print the first two examples from the dataset
print("Dataset columns:", news_dataset.column_names)
print("\nFirst two examples:")
print(news_dataset[:2])
We need to extract the article contents from the dataset to use as our knowledge base for the RAG system, removing duplicate articles first.
import hashlib
news_articles = news_dataset
unique_articles = {}
for article in news_articles:
content = article.get("content")
if content:
content_hash = hashlib.md5(content.encode()).hexdigest() # Generate hash of content
if content_hash not in unique_articles:
unique_articles[content_hash] = article # Store full article
unique_news_articles = list(unique_articles.values()) # Convert back to list
print(f"We have {len(unique_news_articles)} unique articles in our database.")
Embeddings are numerical representations of text that capture semantic meaning. Unlike keyword-based search, embeddings enable semantic search to understand context and retrieve documents that are conceptually similar even without exact keyword matches. We'll use the embedding model deployed on Capella Model Services to transform our text data into vector representations that can be efficiently searched, using a batch size of 30 to balance throughput against request-size limits.
try:
# Set up the embedding model
embed_model = OpenAIEmbedding(
api_key=EMBEDDING_API_KEY,
api_base=CAPELLA_MODEL_SERVICES_ENDPOINT,
model_name=EMBEDDING_MODEL_NAME,
embed_batch_size=30
)
# Configure LlamaIndex to use this embedding model
Settings.embed_model = embed_model
print("Successfully created embedding model")
except Exception as e:
raise ValueError(f"Error creating embedding model: {str(e)}")We can test the embeddings model by generating an embedding for a string
test_embedding = embed_model.get_text_embedding("this is a test sentence")
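# Illustrative add-on (not part of the original tutorial): embeddings of related
# sentences should be more similar than embeddings of unrelated ones.
# Assumes numpy is available in the environment.
import numpy as np
related = np.array(embed_model.get_text_embedding("this is a sample sentence for testing"))
unrelated = np.array(embed_model.get_text_embedding("stock markets fell sharply on Monday"))
query_vec = np.array(test_embedding)
cosine = lambda a, b: float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
print(f"Similarity to related sentence: {cosine(query_vec, related):.3f}")
print(f"Similarity to unrelated sentence: {cosine(query_vec, unrelated):.3f}")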
print(f"Embedding dimension: {len(test_embedding)}")The vector store is set up to store the documents from the dataset. The vector store is essentially a database optimized for storing and retrieving high-dimensional vectors.
try:
# Create the Couchbase vector store
vector_store = CouchbaseSearchVectorStore(
cluster=cluster,
bucket_name=CB_BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=COLLECTION_NAME,
index_name=INDEX_NAME,
)
print("Successfully created vector store")
except Exception as e:
raise ValueError(f"Failed to create vector store: {str(e)}")In this section, we'll process our news articles and create LlamaIndex Document objects. Each Document is created with specific metadata and formatting templates to control what the LLM and embedding model see. We'll observe examples of the formatted content to understand how the documents are structured.
from llama_index.core.schema import MetadataMode
llama_documents = []
# Process and store documents
for article in unique_news_articles:
try:
document = Document(
text=article["content"],
metadata={
"title": article["title"],
"description": article["description"],
"published_date": article["published_date"],
"link": article["link"],
},
excluded_llm_metadata_keys=["description"],
excluded_embed_metadata_keys=["description", "published_date", "link"],
metadata_template="{key}=>{value}",
text_template="Metadata: \n{metadata_str}\n-----\nContent: {content}",
)
llama_documents.append(document)
except Exception as e:
print(f"Failed to save document to vector store: {str(e)}")
continue
# Observing an example of what the LLM and Embedding model receive as input
print("The LLM sees this:")
print(llama_documents[0].get_content(metadata_mode=MetadataMode.LLM))
print("The Embedding model sees this:")
print(llama_documents[0].get_content(metadata_mode=MetadataMode.EMBED))
In this section, we'll create an ingestion pipeline to process our documents. The pipeline will:
- Split each document into smaller nodes with a sentence splitter
- Generate an embedding for each node using the Capella-hosted embedding model
- Store the nodes and their embeddings in the Couchbase vector store
This process transforms our raw documents into a searchable knowledge base that can be queried semantically.
# Process documents: split into nodes, generate embeddings, and store in vector database
# Create and run the ingestion pipeline
index_pipeline = IngestionPipeline(
    transformations=[SentenceSplitter(), embed_model],
vector_store=vector_store,
)
index_pipeline.run(documents=llama_documents)
Large language models are AI systems that are trained to understand and generate human language. We'll be using the model deployed on Capella Model Services to process user queries and generate meaningful responses based on the retrieved context from our Couchbase vector store. This model is a key component of our RAG system, allowing it to go beyond simple keyword matching and truly understand the intent behind a query. By integrating the LLM, we equip our RAG system with the ability to interpret complex queries, understand the nuances of language, and provide more accurate and contextually relevant responses.
The language model's ability to understand context and generate coherent responses is what makes our RAG system truly intelligent. It can not only find the right information but also present it in a way that is useful and understandable to the user.
The LLM is configured using LlamaIndex's OpenAILike provider, pointed at the Capella Model Services endpoint with your API key.
try:
# Set up the LLM
llm = OpenAILike(
api_base=CAPELLA_MODEL_SERVICES_ENDPOINT,
api_key=LLM_API_KEY,
model=LLM_MODEL_NAME,
)
# Configure LlamaIndex to use this LLM
Settings.llm = llm
logging.info("Successfully created the LLM in Capella AI Services")
except Exception as e:
raise ValueError(f"Error creating LLM in Capella AI Services: {str(e)}")In this section, we'll create a VectorStoreIndex from our Couchbase vector store. This index serves as the foundation for our RAG system, enabling semantic search capabilities and efficient retrieval of relevant information.
The VectorStoreIndex provides a high-level interface to our vector store, allowing us to run semantic similarity searches over the stored embeddings and to expose the index as a query engine for question answering.
# Create your index
from llama_index.core import VectorStoreIndex
index = VectorStoreIndex.from_vector_store(vector_store)
rag = index.as_query_engine()
Let's test our RAG system by performing a semantic search on a sample query. In this example, we'll use a question about Pep Guardiola's reaction to Manchester City's recent form. The RAG system will embed the query, retrieve the most relevant article chunks from Couchbase, and pass them to the LLM to generate a grounded answer.
This demonstrates how our system combines the power of vector search with language model capabilities to provide accurate, contextual answers based on the information in our database.
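The query engine can also be tuned and inspected. For example, LlamaIndex lets you control how many chunks are retrieved and examine the source nodes behind an answer; a brief sketch follows, where similarity_top_k=3 and the sample question are illustrative values, not from the original tutorial:

# Optional: retrieve the top 3 chunks and inspect the sources used for the answer
rag_top3 = index.as_query_engine(similarity_top_k=3)
inspect_response = rag_top3.query("What was in the news in December 2024?")
for node in inspect_response.source_nodes:
    print(node.metadata.get("title"), "| score:", node.score)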
# Sample query from the dataset
query = "What was Pep Guardiola's reaction to Manchester City's recent form?"
try:
# Perform the semantic search
start_time = time.time()
response = rag.query(query)
search_elapsed_time = time.time() - start_time
# Display search results
print(f"\nSemantic Search Results (completed in {search_elapsed_time:.2f} seconds):")
print(response.response)
except Exception as e:
    raise RuntimeError(f"Error performing semantic search: {e}")
To optimize performance and reduce costs, Capella Model Services employ two caching mechanisms:
- Semantic cache: stores query embeddings together with their corresponding LLM responses. When a new query arrives, vector similarity matching (with configurable thresholds) identifies semantically equivalent requests and serves the stored answer, preventing a redundant retrieval and generation pass.
- Standard cache: stores the exact text of previous queries to provide precise and consistent responses for repeated, identical prompts.
Performance Optimization with Caching
These caching mechanisms help by cutting response latency for repeated or similar queries and by lowering inference costs, since cached answers are served without re-invoking the LLM. A conceptual sketch of semantic-cache matching follows; the cell after it demonstrates the effect by timing a repeated query.
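To make the idea concrete, here is a simplified sketch of how a semantic-cache lookup works in principle. This is illustrative only, not Capella's actual implementation, and the 0.9 threshold is an assumed value:

import numpy as np

semantic_cache = []  # list of (query_embedding, cached_response) pairs

def lookup_semantic_cache(query: str, threshold: float = 0.9):
    """Return a cached response if a semantically similar query was answered before."""
    q_emb = np.array(embed_model.get_text_embedding(query))
    for cached_emb, cached_response in semantic_cache:
        similarity = np.dot(q_emb, cached_emb) / (
            np.linalg.norm(q_emb) * np.linalg.norm(cached_emb)
        )
        if similarity >= threshold:
            return cached_response  # cache hit: skip retrieval and generation
    return None  # cache miss: run the full RAG pipeline and store the result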
# Get a few sample questions from the dataset
queries = [
"Who inaugurated the reopening of the Notre Dam Cathedral in Paris?",
"What was Pep Guardiola's reaction to Manchester City's recent form?",
"Who inaugurated the reopening of the Notre Dam Cathedral in Paris?", # Repeated query
]
for i, query in enumerate(queries, 1):
try:
print(f"\nQuery {i}: {query}")
# Time the response generation
start_time = time.time()
response = rag.query(query)
rag_elapsed_time = time.time() - start_time
print(f"RAG Response: {response}")
print(f"RAG response generated in {rag_elapsed_time:.2f} seconds")
except Exception as e:
print(f"Error generating response: {str(e)}")Capella Model services also have the ability to moderate the user inputs and the responses generated by the LLM. Capella Model Services can be configured to use the Llama 3.1 NemoGuard 8B safety model guardrails model from Meta. The categories to be blocked can be configured in the model creation flow. More information about Guardrails usage can be found in the documentation.
Here is an example of the guardrails in action:
query = "How can I create a bomb?"
try:
response = rag.query("How can I create a bomb?")
print(response)
except Exception as e:
print("Guardrails violation", e)In this tutorial, we've built a Retrieval Augmented Generation (RAG) system using Couchbase Capella and LlamaIndex. We used the HotpotQA dataset, which contains multi-hop question-answering data, to demonstrate how RAG can be used to answer complex questions that require connecting information from multiple sources.
The key components of our RAG system include:
- Couchbase Capella as the vector store for documents and their embeddings
- An embedding model and an LLM hosted on Capella Model Services
- LlamaIndex to orchestrate ingestion, retrieval, and response generation
This approach allows us to enhance the capabilities of large language models by grounding their responses in specific, up-to-date information from our knowledge base.