In this guide, we will walk you through building a Retrieval Augmented Generation (RAG) application with Haystack orchestrating Capella Model Services and Couchbase Capella. We will use the models hosted on Capella Model Services for response generation and generating embeddings.
This notebook demonstrates how to build a RAG system using:

- Couchbase Capella as the operational database and vector store, with Hyperscale and Composite Vector Indexes for semantic search
- Capella Model Services for generating embeddings and LLM responses
- Haystack as the orchestration framework that ties the pipeline together
We leverage Couchbase's Hyperscale and Composite Vector Indexes to enable efficient semantic search at scale. Hyperscale indexes prioritize high-throughput vector similarity across billions of vectors with a compact on-disk footprint, while Composite indexes blend scalar predicates with a vector column to narrow candidate sets before similarity search. For a deeper dive into how these indexes work, see the overview of Capella vector indexes.
Semantic search goes beyond simple keyword matching by understanding the context and meaning behind the words in a query, making it an essential tool for applications that require intelligent information retrieval. This tutorial shows how to combine Capella Model Services and Haystack with Couchbase's Hyperscale and Composite Vector Indexes to deliver a production-ready RAG workflow.
To get started with Couchbase Capella, create an account and use it to deploy an operational cluster.
To know more, please follow the instructions.
When running Couchbase using Capella, the following prerequisites need to be met:

- An operational cluster deployed in Couchbase Capella with the Data, Query, and Index services enabled
- Database access credentials (username and password) for the cluster
- The IP address of the machine running this tutorial added to the cluster's allowed IP list
In order to create the RAG application, we need an embedding model to ingest the documents for Vector Search and a large language model (LLM) for generating the responses based on the context.
Capella Model Services allows you to create both the embedding model and the LLM in the same VPC as your database. There are multiple options for both the embedding and large language models, along with value-added features for each model.
Create the models using the Capella Model Services interface. While creating a model, you can enable response caching (both standard and semantic) and apply guardrails to the LLM responses.
For more details, please refer to the documentation. These models are compatible with the Haystack OpenAI integration.
After the models are deployed, create the API keys for them and allow access from the IP address on which this tutorial is run. For more details, please refer to the documentation on generating the API keys.
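Because Capella Model Services exposes an OpenAI-compatible API, you can optionally sanity-check your endpoint and key with a minimal OpenAI SDK call before wiring anything into Haystack. The snippet below is only a sketch: the endpoint, key, and model name are placeholders for the values you create in Capella (note the trailing /v1 on the endpoint).
# Optional sanity check of the Capella Model Services endpoint using the OpenAI SDK.
from openai import OpenAI

# Placeholders: replace with your Capella Model Services endpoint, API key, and model name.
client = OpenAI(
    base_url="https://<your-capella-model-services-endpoint>/v1",
    api_key="<your-embedding-model-api-key>",
)
response = client.embeddings.create(
    model="<your-embedding-model-name>",
    input="connectivity check",
)
print(f"Received an embedding with {len(response.data[0].embedding)} dimensions")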
To build our RAG system, we need a set of libraries that handle everything from connecting to the database to performing AI tasks. Each library has a specific role: the Couchbase libraries manage database operations, Haystack handles AI model integrations and pipeline management, and Haystack's OpenAI-compatible components (backed by the OpenAI SDK and pointed at Capella Model Services) generate embeddings and call the language models.
# Install required packages
%pip install -r requirements.txt

The script starts by importing a series of libraries required for various tasks, including handling JSON, logging, time tracking, Couchbase connections, Haystack components for the RAG pipeline, embedding generation, and dataset loading.
import getpass
import logging
import sys
import time
import pandas as pd
from datetime import timedelta
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.exceptions import CouchbaseException
from couchbase.options import ClusterOptions, KnownConfigProfiles, QueryOptions
from datasets import load_dataset
from haystack import Pipeline, Document, GeneratedAnswer
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.builders.answer_builder import AnswerBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.preprocessors import DocumentCleaner
from haystack.components.writers import DocumentWriter
from haystack.utils import Secret
from haystack.components.builders import PromptBuilder
from couchbase_haystack import (
CouchbaseQueryDocumentStore,
CouchbaseQueryEmbeddingRetriever,
QueryVectorSearchType,
QueryVectorSearchSimilarity,
CouchbasePasswordAuthenticator,
CouchbaseClusterOptions
)
In this section, we prompt the user to input the essential configuration settings, including sensitive information such as database credentials, collection names, and API keys. Instead of hardcoding these details into the script, we request them at runtime, keeping the setup flexible and the secrets out of the code.
The script also validates that all required inputs are provided, raising an error if any crucial value is missing. This ensures the integration is correctly configured without compromising security or maintainability.
CAPELLA_MODEL_SERVICES_ENDPOINT is the Capella Model Services endpoint found in the models section.
Note that the Capella Model Services endpoint requires /v1 appended to the value shown in the UI if it is not already included.
INDEX_NAME is the name of the Hyperscale Index we will create for vector search operations.
CB_CONNECTION_STRING = input("Couchbase Cluster URL (default: localhost): ") or "couchbase://localhost"
CB_USERNAME = input("Couchbase Username (default: admin): ") or "admin"
CB_PASSWORD = getpass.getpass("Couchbase password (default: Password@12345): ") or "Password@12345"
CB_BUCKET_NAME = input("Couchbase Bucket: ")
SCOPE_NAME = input("Couchbase Scope: ")
COLLECTION_NAME = input("Couchbase Collection: ")
INDEX_NAME = input("Vector Search Index: ")
# Get Capella AI endpoint
CAPELLA_MODEL_SERVICES_ENDPOINT = input("Enter your Capella Model Services Endpoint: ")
LLM_MODEL_NAME = input("Enter the LLM name: ")
LLM_API_KEY = getpass.getpass("Enter your Capella Model Services LLM API Key: ")
EMBEDDING_MODEL_NAME = input("Enter the Embedding Model name: ")
EMBEDDING_API_KEY = getpass.getpass("Enter your Capella Model Services Embedding Model API Key: ")
EMBEDDING_DIMENSION = input("Enter the Embedding Dimension (e.g. 3072, 4096): ") or "3072"
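# The OpenAI-compatible components used later expect the endpoint to end with /v1
# (see the note above). If the value copied from the Capella UI does not include it,
# this optional normalization step appends it.
if not CAPELLA_MODEL_SERVICES_ENDPOINT.rstrip("/").endswith("/v1"):
    CAPELLA_MODEL_SERVICES_ENDPOINT = CAPELLA_MODEL_SERVICES_ENDPOINT.rstrip("/") + "/v1"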
# Check if the variables are correctly loaded
if not all([CB_CONNECTION_STRING, CB_USERNAME, CB_PASSWORD, CB_BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME, INDEX_NAME, CAPELLA_MODEL_SERVICES_ENDPOINT, LLM_MODEL_NAME, LLM_API_KEY, EMBEDDING_MODEL_NAME, EMBEDDING_API_KEY]):
raise ValueError("All configuration variables must be provided.")Logging is essential for tracking the execution of our script and debugging any issues that may arise. We set up a logger that will display information about the script's progress, including timestamps and log levels.
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)

The next step is to establish a connection to our Couchbase Capella cluster. This connection will allow us to interact with the database, store and retrieve documents, and perform vector searches.
try:
# Initialize the Couchbase Cluster
auth = PasswordAuthenticator(CB_USERNAME, CB_PASSWORD)
options = ClusterOptions(auth)
options.apply_profile(KnownConfigProfiles.WanDevelopment)
# Connect to the cluster
cluster = Cluster(CB_CONNECTION_STRING, options)
# Wait for the cluster to be ready
cluster.wait_until_ready(timedelta(seconds=5))
logging.info("Successfully connected to the Couchbase cluster")
except CouchbaseException as e:
raise RuntimeError(f"Failed to connect to Couchbase: {str(e)}")Before we can store our data, we need to ensure that the appropriate bucket, scope, and collection exist in our Couchbase cluster. The code below checks if these components exist and creates them if they don't, providing a foundation for storing our vector embeddings and documents.
from couchbase.management.buckets import CreateBucketSettings
import json
# Create bucket if it does not exist
bucket_manager = cluster.buckets()
try:
bucket_manager.get_bucket(CB_BUCKET_NAME)
print(f"Bucket '{CB_BUCKET_NAME}' already exists.")
except Exception as e:
print(f"Bucket '{CB_BUCKET_NAME}' does not exist. Creating bucket...")
bucket_settings = CreateBucketSettings(name=CB_BUCKET_NAME, ram_quota_mb=500)
bucket_manager.create_bucket(bucket_settings)
print(f"Bucket '{CB_BUCKET_NAME}' created successfully.")
# Create scope and collection if they do not exist
collection_manager = cluster.bucket(CB_BUCKET_NAME).collections()
scopes = collection_manager.get_all_scopes()
scope_exists = any(scope.name == SCOPE_NAME for scope in scopes)
if scope_exists:
print(f"Scope '{SCOPE_NAME}' already exists.")
else:
print(f"Scope '{SCOPE_NAME}' does not exist. Creating scope...")
collection_manager.create_scope(SCOPE_NAME)
print(f"Scope '{SCOPE_NAME}' created successfully.")
collections = [collection.name for scope in scopes if scope.name == SCOPE_NAME for collection in scope.collections]
collection_exists = COLLECTION_NAME in collections
if collection_exists:
print(f"Collection '{COLLECTION_NAME}' already exists in scope '{SCOPE_NAME}'.")
else:
print(f"Collection '{COLLECTION_NAME}' does not exist in scope '{SCOPE_NAME}'. Creating collection...")
collection_manager.create_collection(collection_name=COLLECTION_NAME, scope_name=SCOPE_NAME)
print(f"Collection '{COLLECTION_NAME}' created successfully.")
To build a RAG engine, we need data to search through. We use the BBC RealTime News dataset, which contains up-to-date BBC news articles grouped by month. These articles were published after the LLM was trained, which makes them a good showcase for how RAG augments the LLM with fresh information.
The BBC News dataset's varied content allows us to simulate real-world scenarios where users ask complex questions, helping us tune the RAG system's ability to understand and respond to different types of queries.
try:
news_dataset = load_dataset('RealTimeData/bbc_news_alltime', '2024-12', split="train")
print(f"Loaded the BBC News dataset with {len(news_dataset)} rows")
except Exception as e:
raise ValueError(f"Error loading BBC News dataset: {str(e)}")# Print the first two examples from the dataset
print("Dataset columns:", news_dataset.column_names)
print("\nFirst two examples:")
print(news_dataset[:2])

Next, we deduplicate the articles by hashing their content so that each unique article is stored only once in the knowledge base for our RAG system.
import hashlib
news_articles = news_dataset
unique_articles = {}
for article in news_articles:
content = article.get("content")
if content:
content_hash = hashlib.md5(content.encode()).hexdigest() # Generate hash of content
if content_hash not in unique_articles:
unique_articles[content_hash] = article # Store full article
unique_news_articles = list(unique_articles.values()) # Convert back to list
print(f"We have {len(unique_news_articles)} unique articles in our database.")
Embeddings are numerical representations of text that capture semantic meaning. Unlike keyword-based search, embeddings enable semantic search to understand context and retrieve documents that are conceptually similar even without exact keyword matches. We'll use the model deployed on Capella Model Services to create high-quality embeddings. This model transforms our text data into vector representations that can be efficiently searched using Haystack's OpenAI document embedder (configured to point to Capella).
try:
# Set up the document embedder for processing documents
document_embedder = OpenAIDocumentEmbedder(
api_base_url=CAPELLA_MODEL_SERVICES_ENDPOINT,
api_key=Secret.from_token(EMBEDDING_API_KEY),
model=EMBEDDING_MODEL_NAME
)
# Set up the text embedder for query processing
rag_embedder = OpenAITextEmbedder(
api_base_url=CAPELLA_MODEL_SERVICES_ENDPOINT,
api_key=Secret.from_token(EMBEDDING_API_KEY),
model=EMBEDDING_MODEL_NAME
)
print("Successfully created embedding models")
except Exception as e:
raise ValueError(f"Error creating embedding models: {str(e)}")We can test the text embeddings model by generating an embedding for a string
test_result = rag_embedder.run(text="this is a test sentence")
test_embedding = test_result["embedding"]
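# Optional sanity check: the reported dimension should match the EMBEDDING_DIMENSION
# value entered during configuration.
assert len(test_embedding) == int(EMBEDDING_DIMENSION), (
    f"Expected {EMBEDDING_DIMENSION} dimensions, got {len(test_embedding)}"
)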
print(f"Embedding dimension: {len(test_embedding)}")The CouchbaseQueryDocumentStore from the couchbase_haystack package provides seamless integration with Couchbase, supporting both Hyperscale and Composite Vector Indexes.
try:
# Create the Couchbase vector document store
document_store = CouchbaseQueryDocumentStore(
cluster_connection_string=Secret.from_token(CB_CONNECTION_STRING),
authenticator=CouchbasePasswordAuthenticator(
username=Secret.from_token(CB_USERNAME),
password=Secret.from_token(CB_PASSWORD)
),
cluster_options=CouchbaseClusterOptions(
profile=KnownConfigProfiles.WanDevelopment,
),
bucket=CB_BUCKET_NAME,
scope=SCOPE_NAME,
collection=COLLECTION_NAME,
search_type=QueryVectorSearchType.ANN,
similarity=QueryVectorSearchSimilarity.COSINE
)
print("Successfully created Couchbase vector document store")
except Exception as e:
raise ValueError(f"Failed to create Couchbase vector document store: {str(e)}")In this section, we'll process our news articles and create Haystack Document objects. Each Document is created with specific metadata that will be used for retrieval and generation. We'll observe examples of the document content to understand how the documents are structured.
haystack_documents = []
# Process and store documents
for article in unique_news_articles: # Process all unique articles
try:
document = Document(
content=article["content"],
meta={
"title": article["title"],
"description": article["description"],
"published_date": article["published_date"],
"link": article["link"],
}
)
haystack_documents.append(document)
except Exception as e:
print(f"Failed to create document: {str(e)}")
continue
# Observing an example of the document content
print("Document content preview:")
print(f"Content: {haystack_documents[0].content[:200]}...")
print(f"Metadata: {haystack_documents[0].meta}")
print(f"Created {len(haystack_documents)} documents")
In this section, we'll create an indexing pipeline to process our documents. The pipeline will:

- Clean the documents with DocumentCleaner
- Generate embeddings for each document using the Capella-hosted embedding model
- Write the embedded documents into the Couchbase document store
This transforms raw news articles into searchable vector representations stored in Couchbase for later semantic retrieval in the RAG system.
# Process documents: clean them, generate embeddings, and store them in the document store
# Create indexing pipeline
indexing_pipeline = Pipeline()
indexing_pipeline.add_component("cleaner", DocumentCleaner())
indexing_pipeline.add_component("embedder", document_embedder)
indexing_pipeline.add_component("writer", DocumentWriter(document_store=document_store))
indexing_pipeline.connect("cleaner.documents", "embedder.documents")
indexing_pipeline.connect("embedder.documents", "writer.documents")
Execute the pipeline for processing and indexing BBC news documents:
# Run the indexing pipeline
if haystack_documents:
result = indexing_pipeline.run({"cleaner": {"documents": haystack_documents[:1200]}})
print(f"Indexed {result['writer']['documents_written']} document chunks")
else:
print("No documents created. Skipping indexing.")
Large language models are AI systems that are trained to understand and generate human language. We'll be using the model deployed on Capella Model Services to process user queries and generate meaningful responses based on the retrieved context from our Couchbase document store. This model is a key component of our RAG system, allowing it to go beyond simple keyword matching and truly understand the intent behind a query. By integrating the LLM, we equip our RAG system with the ability to interpret complex queries, understand the nuances of language, and provide more accurate and contextually relevant responses.
The language model's ability to understand context and generate coherent responses is what makes our RAG system truly intelligent. It can not only find the right information but also present it in a way that is useful and understandable to the user.
The LLM is configured using Haystack's OpenAI generator component with your Capella Model Services API key for seamless integration.
try:
# Set up the LLM generator
generator = OpenAIGenerator(
api_base_url=CAPELLA_MODEL_SERVICES_ENDPOINT,
api_key=Secret.from_token(LLM_API_KEY),
model=LLM_MODEL_NAME
)
logging.info("Successfully created the generator")
except Exception as e:
raise ValueError(f"Error creating generator: {str(e)}")In this section, we'll create a RAG pipeline using Haystack components. This pipeline serves as the foundation for our RAG system, enabling semantic search capabilities and efficient retrieval of relevant information.
The RAG pipeline provides a complete workflow that allows us to:

- Embed the user's query with the Capella-hosted embedding model
- Retrieve the most relevant documents from Couchbase using vector similarity
- Build a prompt that combines the retrieved context with the question
- Generate a grounded answer with the LLM and package it together with its sources
# Define RAG prompt template
prompt_template = """
Given these documents, answer the question.\nDocuments:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
\nQuestion: {{question}}
\nAnswer:
"""
# Create the RAG pipeline
rag_pipeline = Pipeline()
# Add components to the pipeline
rag_pipeline.add_component(
"query_embedder",
rag_embedder,
)
rag_pipeline.add_component("retriever", CouchbaseQueryEmbeddingRetriever(document_store=document_store))
rag_pipeline.add_component("prompt_builder", PromptBuilder(template=prompt_template))
rag_pipeline.add_component("llm",generator)
rag_pipeline.add_component("answer_builder", AnswerBuilder())
# Connect RAG components
rag_pipeline.connect("query_embedder", "retriever.query_embedding")
rag_pipeline.connect("retriever.documents", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder.prompt", "llm.prompt")
rag_pipeline.connect("llm.replies", "answer_builder.replies")
rag_pipeline.connect("llm.meta", "answer_builder.meta")
rag_pipeline.connect("retriever", "answer_builder.documents")
print("Successfully created RAG pipeline")Let's test our RAG system by performing a semantic search on a sample query. In this example, we'll use a question about Pep Guardiola's reaction to Manchester City's recent form. The RAG system will:
This demonstrates how our system combines the power of vector search with language model capabilities to provide accurate, contextual answers based on the information in our database.
Note: By default, without any Hyperscale or Composite Vector Index, Couchbase falls back to linear brute-force search that compares the query vector against every document in the collection. This works for small datasets but can become slow as the dataset grows.
# Sample query from the dataset
query = "What was Pep Guardiola's reaction to Manchester City's current form?"
try:
# Perform the semantic search using the RAG pipeline
start_time = time.time()
result = rag_pipeline.run({
"query_embedder": {"text": query},
"retriever": {"top_k": 5},
"prompt_builder": {"question": query},
"answer_builder": {"query": query},
},
include_outputs_from={"retriever", "query_embedder"}
)
search_elapsed_time = time.time() - start_time
# Get the generated answer
answer: GeneratedAnswer = result["answer_builder"]["answers"][0]
# Print retrieved documents
print("=== Retrieved Documents ===")
retrieved_docs = result["retriever"]["documents"]
for idx, doc in enumerate(retrieved_docs, start=1):
print(f"Id: {doc.id} Title: {doc.meta['title']}")
# Print final results
print("\n=== Final Answer ===")
print(f"Question: {answer.query}")
print(f"Answer: {answer.data}")
print("\nSources:")
for doc in answer.documents:
print(f"-> {doc.meta['title']}")
# Display search results
print(f"\nLinear Vector Search Results (completed in {search_elapsed_time:.2f} seconds):")
#print(result["generator"]["replies"][0])
except Exception as e:
raise RuntimeError(f"Error performing RAG search: {e}")While the above RAG system works effectively, you can significantly improve query performance by enabling Couchbase Capella's Hyperscale or Composite Vector Indexes.
Hyperscale Vector Indexes: Use this type of index when you primarily query vector values and need low-latency similarity search at scale. In general, Hyperscale Vector Indexes are the best starting point for most vector search workloads.
Composite Vector Indexes: Use this type of index when you want to perform searches that blend scalar predicates and vector similarity so that the scalar filters tighten the candidate set.
For an in-depth comparison and tuning guidance, review the Couchbase vector index documentation and the overview of Capella vector indexes.
The index_description parameter controls how Couchbase optimizes vector storage and search performance through centroids and quantization:
Format: 'IVF[<centroids>],{PQ|SQ}<settings>'
Centroids (IVF - Inverted File):

- If the centroid count is not specified (e.g. IVF,SQ8), Couchbase auto-selects it based on dataset size

Quantization Options:

- Scalar quantization: SQ4, SQ6, SQ8 (4, 6, or 8 bits per dimension)
- Product quantization: PQ<subquantizers>x<bits> (e.g., PQ32x8)

Common Examples:

- IVF,SQ8 – Auto centroids, 8-bit scalar quantization (good default)
- IVF1000,SQ6 – 1000 centroids, 6-bit scalar quantization
- IVF,PQ32x8 – Auto centroids, 32 subquantizers with 8 bits

For detailed configuration options, see the Quantization & Centroid Settings.
In the code below, we demonstrate creating a Hyperscale index for optimal performance. You can adapt the same flow to create a COMPOSITE index by replacing the index type and options.
# Create a Hyperscale Vector Index for optimized vector search
try:
hyperscale_index_name = f"{INDEX_NAME}_hyperscale"
# Use the cluster connection to create the Hyperscale index
scope = cluster.bucket(CB_BUCKET_NAME).scope(SCOPE_NAME)
options = {
"dimension": int(EMBEDDING_DIMENSION), # dimension based on the model
"similarity": "cosine",
"description": "IVF,PQ32x8",
"scan_nprobes": 3,
}
scope.query(
f"""
CREATE VECTOR INDEX {hyperscale_index_name}
ON {COLLECTION_NAME} (embedding VECTOR)
WITH {json.dumps(options)}
""",
QueryOptions(
timeout=timedelta(seconds=300)
)).execute()
print(f"Successfully created Hyperscale index: {hyperscale_index_name}")
except Exception as e:
print(f"Hyperscale index may already exist or error occurred: {str(e)}")
The example below runs the same RAG query, but now uses the Hyperscale index created above. You should notice improved performance as the index efficiently retrieves the candidate documents. If you create a Composite index instead, the workflow is identical; the query engine applies any scalar filters before performing the vector similarity search.
# Test the optimized Hyperscale vector search
query = "What was Pep Guardiola's reaction to Manchester City's current form?"
try:
# The RAG pipeline will automatically use the optimized Hyperscale index
# Perform the semantic search with Hyperscale optimization
start_time = time.time()
result = rag_pipeline.run({
"query_embedder": {"text": query},
"retriever": {"top_k": 4},
"prompt_builder": {"question": query},
"answer_builder": {"query": query},
},
include_outputs_from={"retriever", "query_embedder"}
)
search_elapsed_time = time.time() - start_time
# Get the generated answer
answer: GeneratedAnswer = result["answer_builder"]["answers"][0]
# Print retrieved documents
print("=== Retrieved Documents ===")
retrieved_docs = result["retriever"]["documents"]
for idx, doc in enumerate(retrieved_docs, start=0):
print(f"Id: {doc.id} Title: {doc.meta['title']}")
# Print final results
print("\n=== Final Answer ===")
print(f"Question: {answer.query}")
print(f"Answer: {answer.data}")
print("\nSources:")
for doc in answer.documents:
print(f"-> {doc.meta['title']}")
# Display search results
print(f"\nOptimized Hyperscale Vector Search Results (completed in {search_elapsed_time:.2f} seconds):")
#print(result["generator"]["replies"][0])
except Exception as e:
raise RuntimeError(f"Error performing optimized semantic search: {e}")
To optimize performance and reduce costs, Capella Model Services employ two caching mechanisms:
Semantic Cache
Capella Model Services' semantic caching system stores both query embeddings and their corresponding LLM responses. When new queries arrive, it uses vector similarity matching (with configurable thresholds) to identify semantically equivalent requests. This prevents redundant processing by:

- Returning the cached response for queries that are semantically similar to earlier ones, instead of invoking the LLM again
- Reducing response latency and inference costs for repeated or near-duplicate questions
Standard Cache
Stores the exact text of previous queries to provide precise and consistent responses for repetitive, identical prompts.
Performance Optimization with Caching
These caching mechanisms help in:

- Reducing response times for repeated or semantically similar queries
- Lowering inference costs by avoiding unnecessary LLM calls
- Providing consistent answers for identical prompts

In the cell below, the first and last queries are identical, so the repeated query should return noticeably faster once the cache is warm.
import time
queries = [
"Why are car manufacturers like Ford and Stellantis unhappy with the UK government’s current rules designed to promote electric vehicles?",
"Who inaugurated the reopening of the Notre Dam Cathedral in Paris?",
"What was Pep Guardiola's reaction to Manchester City's recent form?",
"Why are car manufacturers like Ford and Stellantis unhappy with the UK government’s current rules designed to promote electric vehicles?",
]
for i, query in enumerate(queries, 1):
try:
print(f"\nQuery {i}: {query}")
start_time = time.time()
result = rag_pipeline.run({
"query_embedder": {"text": query},
"retriever": {"top_k": 4},
"prompt_builder": {"question": query},
"answer_builder": {"query": query},
})
elapsed_time = time.time() - start_time
answer: GeneratedAnswer = result["answer_builder"]["answers"][0]
print(f"Response: {answer.data}")
print(f"Time taken: {elapsed_time:.2f} seconds")
except Exception as e:
print(f"Error generating RAG response: {str(e)}")
continue

Capella Model Services can also moderate the user inputs and the responses generated by the LLM. Capella Model Services can be configured to use the Llama 3.1 NemoGuard 8B content-safety guardrails model. The categories to be blocked can be configured in the model creation flow. More information about Guardrails usage can be found in the documentation.
Here is an example of the Guardrails in action:
query = "How can I create a bomb?"
try:
start_time = time.time()
response = rag_pipeline.run({
"query_embedder": {"text": query},
"retriever": {"top_k": 4},
"prompt_builder": {"question": query},
"answer_builder": {"query": query},
})
rag_elapsed_time = time.time() - start_time
answer: GeneratedAnswer = response["answer_builder"]["answers"][0]
print(f"RAG Response: {answer.data}")
print(f"RAG response generated in {rag_elapsed_time:.2f} seconds")
except Exception as e:
print("Guardrails violation", e)In this tutorial, we've built a Retrieval Augmented Generation (RAG) system using Haystack with Capella Model Services and Couchbase Capella's Hyperscale and Composite Vector Indexes. Using the BBC News dataset, we demonstrated how modern vector indexes make it possible to answer up-to-date questions that extend beyond an LLM's original training data.
The key components of our RAG system include:

- Couchbase Capella as the operational database and vector store, with Hyperscale and Composite Vector Indexes for efficient semantic search
- Capella Model Services hosting the embedding model and the LLM, with caching and guardrails
- Haystack pipelines orchestrating document indexing, retrieval, prompt building, and answer generation
This approach grounds LLM responses in specific, current information from our knowledge base while taking advantage of Couchbase's advanced vector index options for performance and scale. Haystack's modular pipeline model keeps the solution extensible as you layer in additional data sources or services.