This notebook demonstrates how to build a Retrieval Augmented Generation (RAG) system using Couchbase Capella, OpenAI, and Haystack.
The system allows users to ask questions about movies and get AI-generated answers grounded in movie overviews from the TMDB dataset.
To build our RAG system, we need a set of libraries. The libraries we install handle everything from connecting to databases to performing AI tasks. Each library has a specific role: Couchbase libraries manage database operations, Haystack handles AI model integrations and pipeline management, and we will use the OpenAI SDK for generating embeddings and calling OpenAI's language models.
%pip install datasets haystack-ai couchbase-haystack openai pandas
The script starts by importing a series of libraries required for various tasks, including handling JSON, logging, time tracking, Couchbase connections, Haystack components for the RAG pipeline, embedding generation, and dataset loading.
import getpass
import base64
import logging
import sys
import time
import pandas as pd
from datetime import timedelta
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.exceptions import CouchbaseException
from couchbase.options import ClusterOptions
from datasets import load_dataset
from haystack import Pipeline, GeneratedAnswer
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.preprocessors import DocumentCleaner
from haystack.components.writers import DocumentWriter
from haystack.components.builders.answer_builder import AnswerBuilder
from haystack.components.builders.prompt_builder import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.utils import Secret
from haystack.dataclasses import Document
from couchbase_haystack import (
    CouchbaseSearchDocumentStore,
    CouchbasePasswordAuthenticator,
    CouchbaseClusterOptions,
    CouchbaseSearchEmbeddingRetriever,
)
from couchbase.options import KnownConfigProfiles
# Configure logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
To get started with Couchbase Capella, create an account and use it to deploy an operational cluster.
To learn more, please follow the instructions.
When running Couchbase using Capella, the following prerequisites need to be met:
In order to create the RAG application, we need an embedding model to ingest the documents for Vector Search and a large language model (LLM) for generating the responses based on the context.
For this implementation, we'll use OpenAI's models which provide state-of-the-art performance for both embeddings and text generation:
Embedding Model: We'll use OpenAI's text-embedding-3-large model, which provides high-quality embeddings with 3,072 dimensions for semantic search capabilities.
Large Language Model: We'll use OpenAI's gpt-4o model for generating responses based on the retrieved context. This model offers excellent reasoning capabilities and can handle complex queries effectively.
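To make the role of the embedding model more concrete, here is a minimal sketch of how similarity between embeddings can drive retrieval. It uses made-up 3-dimensional vectors and cosine similarity as one common choice of metric; the function names and toy data are hypothetical, and the metric actually applied in Couchbase is determined by the search index definition, not by this code.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # treat a zero vector as having no similarity
    return dot / (norm_a * norm_b)


def top_k(query_vec, doc_vecs, k=2):
    """Return the ids of the k documents most similar to the query, best first."""
    scored = [(cosine_similarity(query_vec, vec), doc_id)
              for doc_id, vec in doc_vecs.items()]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [doc_id for _, doc_id in scored[:k]]


# Toy "embeddings", invented for illustration only.
# Real text-embedding-3-large vectors have 3,072 dimensions.
toy_docs = {
    "heist_movie": [0.9, 0.1, 0.0],
    "romantic_comedy": [0.1, 0.9, 0.1],
    "space_documentary": [0.0, 0.2, 0.9],
}
toy_query = [0.8, 0.2, 0.1]

ranked = top_k(toy_query, toy_docs, k=2)
print(ranked)
```

In the notebook itself this ranking is not done in Python; the query embedding is sent to Couchbase, which performs the vector search against the index.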
Prerequisites for OpenAI Integration:
For more details about OpenAI's models and pricing, please refer to the OpenAI documentation.
Enter your Couchbase and OpenAI credentials:
OPENAI_API_KEY is your OpenAI API key which can be obtained from your OpenAI dashboard at platform.openai.com.
INDEX_NAME is the name of the Search Vector Index used for vector search operations.
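For non-interactive runs, the same settings could be read from environment variables instead of typed prompts. The following is a minimal sketch under that assumption: the helper `load_settings` and the use of the notebook's variable names as environment variable names are hypothetical, and the example values are placeholders.

```python
import os

# Setting names mirror the notebook's variables; treating them as
# environment variable names is an assumption made for this sketch.
SETTING_DEFAULTS = {
    "CB_CONNECTION_STRING": "localhost",
    "CB_USERNAME": "admin",
    "CB_PASSWORD": None,
    "CB_BUCKET_NAME": None,
    "CB_SCOPE_NAME": None,
    "CB_COLLECTION_NAME": None,
    "CB_INDEX_NAME": None,
    "OPENAI_API_KEY": None,
}


def load_settings(env=None):
    """Collect settings from a mapping (os.environ by default), applying defaults."""
    env = os.environ if env is None else env
    settings = {}
    missing = []
    for name, default in SETTING_DEFAULTS.items():
        value = env.get(name) or default
        if value:
            settings[name] = value
        else:
            missing.append(name)
    if missing:
        raise ValueError("Missing configuration variables: " + ", ".join(missing))
    return settings


# Example with an explicit mapping of placeholder values,
# so nothing is read from the real environment here.
example_env = {
    "CB_PASSWORD": "placeholder-password",
    "CB_BUCKET_NAME": "b",
    "CB_SCOPE_NAME": "s",
    "CB_COLLECTION_NAME": "c",
    "CB_INDEX_NAME": "vector_search",
    "OPENAI_API_KEY": "placeholder-key",
}
settings = load_settings(example_env)
print(settings["CB_CONNECTION_STRING"])
```

The notebook itself collects these values interactively: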
CB_CONNECTION_STRING = input("Couchbase Cluster URL (default: localhost): ") or "localhost"
CB_USERNAME = input("Couchbase Username (default: admin): ") or "admin"
CB_PASSWORD = getpass.getpass("Couchbase password (default: Password@12345): ") or "Password@12345"
CB_BUCKET_NAME = input("Couchbase Bucket: ")
CB_SCOPE_NAME = input("Couchbase Scope: ")
CB_COLLECTION_NAME = input("Couchbase Collection: ")
CB_INDEX_NAME = input("Vector Search Index: ")
OPENAI_API_KEY = getpass.getpass("OpenAI API Key: ")
# Check if the variables are correctly loaded
if not all([CB_CONNECTION_STRING, CB_USERNAME, CB_PASSWORD, CB_BUCKET_NAME, CB_SCOPE_NAME, CB_COLLECTION_NAME, CB_INDEX_NAME, OPENAI_API_KEY]):
    raise ValueError("All configuration variables must be provided.")
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions
from couchbase.auth import PasswordAuthenticator
from couchbase.management.buckets import CreateBucketSettings
from couchbase.management.collections import CollectionSpec
from couchbase.management.search import SearchIndex
import json
# Connect to Couchbase cluster
cluster = Cluster(CB_CONNECTION_STRING, ClusterOptions(
    PasswordAuthenticator(CB_USERNAME, CB_PASSWORD)))
# Create bucket if it does not exist
bucket_manager = cluster.buckets()
try:
    bucket_manager.get_bucket(CB_BUCKET_NAME)
    print(f"Bucket '{CB_BUCKET_NAME}' already exists.")
except Exception as e:
    print(f"Bucket '{CB_BUCKET_NAME}' does not exist. Creating bucket...")
    bucket_settings = CreateBucketSettings(name=CB_BUCKET_NAME, ram_quota_mb=500)
    bucket_manager.create_bucket(bucket_settings)
    print(f"Bucket '{CB_BUCKET_NAME}' created successfully.")
# Create scope and collection if they do not exist
collection_manager = cluster.bucket(CB_BUCKET_NAME).collections()
scopes = collection_manager.get_all_scopes()
scope_exists = any(scope.name == CB_SCOPE_NAME for scope in scopes)
if scope_exists:
    print(f"Scope '{CB_SCOPE_NAME}' already exists.")
else:
    print(f"Scope '{CB_SCOPE_NAME}' does not exist. Creating scope...")
    collection_manager.create_scope(CB_SCOPE_NAME)
    print(f"Scope '{CB_SCOPE_NAME}' created successfully.")
collections = [collection.name for scope in scopes if scope.name == CB_SCOPE_NAME for collection in scope.collections]
collection_exists = CB_COLLECTION_NAME in collections
if collection_exists:
    print(f"Collection '{CB_COLLECTION_NAME}' already exists in scope '{CB_SCOPE_NAME}'.")
else:
    print(f"Collection '{CB_COLLECTION_NAME}' does not exist in scope '{CB_SCOPE_NAME}'. Creating collection...")
    collection_manager.create_collection(collection_name=CB_COLLECTION_NAME, scope_name=CB_SCOPE_NAME)
    print(f"Collection '{CB_COLLECTION_NAME}' created successfully.")
# Create Search Vector Index from search_vector_index.json file at scope level
with open('search_vector_index.json', 'r') as search_file:
    search_index_definition = SearchIndex.from_json(json.load(search_file))
# Update search index definition with user inputs
search_index_definition.name = CB_INDEX_NAME
search_index_definition.source_name = CB_BUCKET_NAME
# Update types mapping
old_type_key = next(iter(search_index_definition.params['mapping']['types'].keys()))
type_obj = search_index_definition.params['mapping']['types'].pop(old_type_key)
search_index_definition.params['mapping']['types'][f"{CB_SCOPE_NAME}.{CB_COLLECTION_NAME}"] = type_obj
search_index_name = search_index_definition.name
# Get scope-level search manager
scope_search_manager = cluster.bucket(CB_BUCKET_NAME).scope(CB_SCOPE_NAME).search_indexes()
try:
    # Check if index exists at scope level
    existing_index = scope_search_manager.get_index(search_index_name)
    print(f"Search Vector Index '{search_index_name}' already exists at scope level.")
except Exception as e:
    print(f"Search Vector Index '{search_index_name}' does not exist at scope level. Creating index from search_vector_index.json...")
    # The definition was already loaded above, so the file does not need to be reopened
    scope_search_manager.upsert_index(search_index_definition)
    print(f"Search Vector Index '{search_index_name}' created successfully at scope level.")
Bucket 'b' already exists.
Scope 's' already exists.
Collection 'c' already exists in scope 's'.
Search Vector Index 'vector_search' already exists at scope level.
Load the TMDB movie dataset and prepare documents for indexing:
# Load TMDB dataset
print("Loading TMDB dataset...")
dataset = load_dataset("AiresPucrs/tmdb-5000-movies")
movies_df = pd.DataFrame(dataset['train'])
print(f"Total movies found: {len(movies_df)}")
# Create documents from movie data
docs_data = []
for _, row in movies_df.iterrows():
    # Skip movies without an overview
    if pd.isna(row['overview']):
        continue
    try:
        docs_data.append({
            'id': str(row["id"]),
            # 'genres' is a JSON-encoded string, so parse it with json.loads rather than eval
            'content': f"Title: {row['title']}\nGenres: {', '.join([genre['name'] for genre in json.loads(row['genres'])])}\nOverview: {row['overview']}",
            'metadata': {
                'title': row['title'],
                'genres': row['genres'],
                'original_language': row['original_language'],
                'popularity': float(row['popularity']),
                'release_date': row['release_date'],
                'vote_average': float(row['vote_average']),
                'vote_count': int(row['vote_count']),
                'budget': int(row['budget']),
                'revenue': int(row['revenue'])
            }
        })
    except Exception as e:
        logger.error(f"Error processing movie {row['title']}: {e}")
print(f"Created {len(docs_data)} documents with valid overviews")
documents = [Document(id=doc['id'], content=doc['content'], meta=doc['metadata'])
             for doc in docs_data]
Loading TMDB dataset...
Generating train split: 100%|██████████| 4803/4803 [00:00<00:00, 123144.70 examples/s]
Total movies found: 4803
Created 4800 documents with valid overviews
Set up the Couchbase document store for storing movie data and embeddings:
# Initialize document store
document_store = CouchbaseSearchDocumentStore(
    cluster_connection_string=Secret.from_token(CB_CONNECTION_STRING),
    authenticator=CouchbasePasswordAuthenticator(
        username=Secret.from_token(CB_USERNAME),
        password=Secret.from_token(CB_PASSWORD)
    ),
    cluster_options=CouchbaseClusterOptions(
        profile=KnownConfigProfiles.WanDevelopment,
    ),
    bucket=CB_BUCKET_NAME,
    scope=CB_SCOPE_NAME,
    collection=CB_COLLECTION_NAME,
    vector_search_index=CB_INDEX_NAME,
)
print("Couchbase document store initialized successfully.")
Couchbase document store initialized successfully.
Configure the document and query embedders using OpenAI's text-embedding-3-large model. The document embedder generates an embedding for each movie document to enable semantic search, and the text embedder embeds the user's question at query time.
embedder = OpenAIDocumentEmbedder(
    api_key=Secret.from_token(OPENAI_API_KEY),
    model="text-embedding-3-large",
)
rag_embedder = OpenAITextEmbedder(
    api_key=Secret.from_token(OPENAI_API_KEY),
    model="text-embedding-3-large",
)
Configure the LLM generator using OpenAI's gpt-4o model. This component will generate natural language responses based on the retrieved documents.
llm = OpenAIGenerator(
    api_key=Secret.from_token(OPENAI_API_KEY),
    model="gpt-4o",
)
Build the pipeline for processing and indexing movie documents:
# Create indexing pipeline
index_pipeline = Pipeline()
index_pipeline.add_component("cleaner", DocumentCleaner())
index_pipeline.add_component("embedder", embedder)
index_pipeline.add_component("writer", DocumentWriter(document_store=document_store))
# Connect indexing components
index_pipeline.connect("cleaner.documents", "embedder.documents")
index_pipeline.connect("embedder.documents", "writer.documents")
<haystack.core.pipeline.pipeline.Pipeline object at 0x124121550>
🚅 Components
- cleaner: DocumentCleaner
- embedder: OpenAIDocumentEmbedder
- writer: DocumentWriter
🛤️ Connections
- cleaner.documents -> embedder.documents (list[Document])
- embedder.documents -> writer.documents (list[Document])
Execute the pipeline for processing and indexing movie documents:
# Run indexing pipeline
if documents:
    # Process documents in batches for better performance
    batch_size = 100
    # Only the first 200 documents are indexed in this run
    total_docs = len(documents[:200])
    for i in range(0, total_docs, batch_size):
        batch = documents[i:i + batch_size]
        result = index_pipeline.run({"cleaner": {"documents": batch}})
        print(f"Processed batch {i//batch_size + 1}: {len(batch)} documents")
    print(f"\nSuccessfully processed {total_docs} documents")
    print(f"Sample document metadata: {documents[0].meta}")
else:
    print("No documents created. Skipping indexing.")
Calculating embeddings: 4it [00:06, 1.73s/it]
Processed batch 1: 100 documents
Calculating embeddings: 4it [00:06, 1.66s/it]
Processed batch 2: 100 documents
Successfully processed 200 documents
Sample document metadata: {'title': 'Four Rooms', 'genres': '[{"id": 80, "name": "Crime"}, {"id": 35, "name": "Comedy"}]', 'original_language': 'en', 'popularity': 22.87623, 'release_date': '1995-12-09', 'vote_average': 6.5, 'vote_count': 530, 'budget': 4000000, 'revenue': 4300000}
Set up the Retrieval Augmented Generation pipeline for answering questions about movies:
# Define RAG prompt template
prompt_template = """
Given these documents, answer the question.\nDocuments:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
\nQuestion: {{question}}
\nAnswer:
"""
# Create RAG pipeline
rag_pipeline = Pipeline()
# Add components
rag_pipeline.add_component(
    "query_embedder",
    rag_embedder,
)
rag_pipeline.add_component("retriever", CouchbaseSearchEmbeddingRetriever(document_store=document_store))
rag_pipeline.add_component("prompt_builder", PromptBuilder(template=prompt_template))
rag_pipeline.add_component("llm", llm)
rag_pipeline.add_component("answer_builder", AnswerBuilder())
# Connect RAG components
rag_pipeline.connect("query_embedder", "retriever.query_embedding")
rag_pipeline.connect("retriever.documents", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder.prompt", "llm.prompt")
rag_pipeline.connect("llm.replies", "answer_builder.replies")
rag_pipeline.connect("llm.meta", "answer_builder.meta")
rag_pipeline.connect("retriever", "answer_builder.documents")
print("RAG pipeline created successfully.")
PromptBuilder has 2 prompt variables, but `required_variables` is not set. By default, all prompt variables are treated as optional, which may lead to unintended behavior in multi-branch pipelines. To avoid unexpected execution, ensure that variables intended to be required are explicitly set in `required_variables`.
RAG pipeline created successfully.
Use the RAG pipeline to ask questions about movies and get AI-generated answers:
# Example question
question = "Why did Manni call Lolla?"
# Run the RAG pipeline
result = rag_pipeline.run(
    {
        "query_embedder": {"text": question},
        "retriever": {"top_k": 5},
        "prompt_builder": {"question": question},
        "answer_builder": {"query": question},
    },
    include_outputs_from={"retriever", "query_embedder"}
)
# Get the generated answer
answer: GeneratedAnswer = result["answer_builder"]["answers"][0]
# Print retrieved documents
print("=== Retrieved Documents ===")
retrieved_docs = result["retriever"]["documents"]
for idx, doc in enumerate(retrieved_docs, start=1):
    print(f"Id: {doc.id} Title: {doc.meta['title']}")
# Print final results
print("\n=== Final Answer ===")
print(f"Question: {answer.query}")
print(f"Answer: {answer.data}")
print("\nSources:")
for doc in answer.documents:
    print(f"-> {doc.meta['title']}")
=== Retrieved Documents ===
Id: 006b97c08110cb1b9b58e03943c91fa9412cfe7a2a22830ba5b9e3eb0c342344 Title: Run Lola Run
Id: 33543dab4c048c9467d632f319e02bca94da6f178250c14d26eabfb30911a823 Title: Mambo Italiano
Id: 94c55246e02c290767531f6359b5f44145191e3f2d62a3a64ed4718a666be9f2 Title: Good bye, Lenin!
Id: 00b4d1f455e45fbffa39f72be6de635bdcdb6b8a04289ba4aea41061700b9096 Title: Mean Streets
Id: 9241f819303fe61a25e05469856c01a8843d53a6ce7cec340bf0def848ddb470 Title: Magnolia
=== Final Answer ===
Question: Why did Manni call Lolla?
Answer: Manni called Lola because he lost 100,000 DM in a subway train that belongs to a very bad guy, and he needs her help to raise the money within 20 minutes to prevent him from having to rob a store to get the money.
Sources:
-> Run Lola Run
-> Mambo Italiano
-> Good bye, Lenin!
-> Mean Streets
-> Magnolia
In this tutorial, we built a Retrieval-Augmented Generation (RAG) system using Couchbase Capella, OpenAI, and Haystack with the TMDB movie dataset. This demonstrates how to combine a Couchbase Search Vector Index with large language models to answer questions about movies, using the retrieved documents as context.
The key components include Couchbase Capella for document storage and vector search, Haystack for the indexing and RAG pipelines, and OpenAI models for embeddings (text-embedding-3-large) and text generation (gpt-4o).
This approach enables AI applications to access and reason over information that extends beyond the LLM's training data, making responses more accurate and relevant for real-world use cases.
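As a recap of how retrieval and generation fit together, the sketch below approximates in plain Python what the notebook's prompt template does: it folds the retrieved document contents and the question into a single prompt string. The function `build_prompt` and the sample document text are hypothetical and only illustrate the idea; this is not how Haystack's PromptBuilder is implemented, and whitespace will differ from the Jinja-rendered prompt.

```python
def build_prompt(question, contents):
    """Assemble a RAG prompt from a question and retrieved document contents."""
    lines = ["Given these documents, answer the question.", "Documents:"]
    lines.extend(contents)  # one entry per retrieved document
    lines.append(f"Question: {question}")
    lines.append("Answer:")
    return "\n".join(lines)


# Placeholder document content, invented for illustration
example_contents = [
    "Title: Example Movie\nOverview: A placeholder overview.",
]
example_prompt = build_prompt("What is Example Movie about?", example_contents)
print(example_prompt)
```

Because the LLM only sees what is placed in this prompt, the quality of the answer depends directly on which documents the retriever returns.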