Back to the Couchbase homepageCouchbase logo
Couchbase Developer

  • Docs

    • Integrations
    • SDKs
    • Mobile SDKs

    • AI Developer
    • Backend
    • Full-stack
    • Mobile
    • Ops / DBA

    • Data Modeling
    • Scalability

  • Tutorials

    • Developer Community
    • Ambassador Program
  • Sign In
  • Try Free

RAG with OpenAI, Haystack, and Couchbase Search Vector Index

  • Learn how to build a semantic search engine using the Couchbase Search Vector Index.
  • This tutorial demonstrates how Haystack integrates Couchbase Search Vector Index with embeddings generated by OpenAI services.
  • Perform Retrieval-Augmented Generation (RAG) using Haystack with Couchbase and OpenAI services.

View Source

BBC News Dataset RAG Pipeline with Haystack, Couchbase Search Vector Index, and OpenAI

This notebook demonstrates how to build a Retrieval Augmented Generation (RAG) system using:

  • The BBC News dataset containing real-time news articles
  • Couchbase Capella Search Vector Index for low-latency vector retrieval
  • Haystack framework for the RAG pipeline
  • OpenAI for embeddings and text generation

The system allows users to ask questions about current events and get AI-generated answers based on the latest news articles.

Installing Necessary Libraries

To build our RAG system, we need a set of libraries. The libraries we install handle everything from connecting to databases to performing AI tasks. Each library has a specific role: Couchbase libraries manage database operations, Haystack handles AI model integrations and pipeline management, and we will use the OpenAI SDK for generating embeddings and calling OpenAI's language models.

%pip install datasets haystack-ai couchbase-haystack openai pandas
[Output too long, omitted for brevity]

Importing Necessary Libraries

The script starts by importing a series of libraries required for various tasks, including handling JSON, logging, time tracking, Couchbase connections, Haystack components for RAG pipeline, embedding generation, and dataset loading.

import getpass
import base64
import logging
import sys
import time
import pandas as pd
from datetime import timedelta

from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.exceptions import CouchbaseException
from couchbase.options import ClusterOptions
from datasets import load_dataset
from haystack import Pipeline, GeneratedAnswer
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.preprocessors import DocumentCleaner
from haystack.components.writers import DocumentWriter
from haystack.components.builders.answer_builder import AnswerBuilder
from haystack.components.builders.prompt_builder import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.utils import Secret
from haystack.dataclasses import Document

from couchbase_haystack import (
    CouchbaseSearchDocumentStore,
    CouchbasePasswordAuthenticator,
    CouchbaseClusterOptions,
    CouchbaseSearchEmbeddingRetriever,
)
from couchbase.options import KnownConfigProfiles

# Configure logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
/Users/viraj.agarwal/Tasks/Task16.5/.venv/lib/python3.13/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html
  from .autonotebook import tqdm as notebook_tqdm

Prerequisites

Create and Deploy Your Operational cluster on Capella

To get started with Couchbase Capella, create an account and use it to deploy an operational cluster.

To know more, please follow the instructions.

Couchbase Capella Configuration

When running Couchbase using Capella, the following prerequisites need to be met:

  • Have a multi-node Capella cluster running the Data, Query, Index, and Search services.
  • Create the database credentials to access the travel-sample bucket (Read and Write) used in the application.
  • Allow access to the Cluster from the IP on which the application is running.

OpenAI Models Setup

In order to create the RAG application, we need an embedding model to ingest the documents for Vector Search and a large language model (LLM) for generating the responses based on the context.

For this implementation, we'll use OpenAI's models which provide state-of-the-art performance for both embeddings and text generation:

Embedding Model: We'll use OpenAI's text-embedding-3-large model, which provides high-quality embeddings with 3,072 dimensions for semantic search capabilities.

Large Language Model: We'll use OpenAI's gpt-4o model for generating responses based on the retrieved context. This model offers excellent reasoning capabilities and can handle complex queries effectively.

Prerequisites for OpenAI Integration:

  • Create an OpenAI account at platform.openai.com
  • Generate an API key from your OpenAI dashboard
  • Ensure you have sufficient credits or a valid payment method set up
  • Set up your API key as an environment variable or input it securely in the notebook

For more details about OpenAI's models and pricing, please refer to the OpenAI documentation.

Configure Couchbase Credentials

Enter your Couchbase and OpenAI credentials:

OPENAI_API_KEY is your OpenAI API key which can be obtained from your OpenAI dashboard at platform.openai.com.

INDEX_NAME is the name of the Search Vector Index used for vector search operations.

CB_CONNECTION_STRING = input("Couchbase Cluster URL (default: localhost): ") or "localhost"
CB_USERNAME = input("Couchbase Username (default: admin): ") or "admin"
CB_PASSWORD = input("Couchbase password (default: Password@12345): ") or "Password@12345"
CB_BUCKET_NAME = input("Couchbase Bucket: ")
CB_SCOPE_NAME = input("Couchbase Scope: ")
CB_COLLECTION_NAME = input("Couchbase Collection: ")
CB_INDEX_NAME = input("Vector Search Index: ")
OPENAI_API_KEY = input("OpenAI API Key: ")

# Check if the variables are correctly loaded
if not all([CB_CONNECTION_STRING, CB_USERNAME, CB_PASSWORD, CB_BUCKET_NAME, CB_SCOPE_NAME, CB_COLLECTION_NAME, CB_INDEX_NAME, OPENAI_API_KEY]):
    raise ValueError("All configuration variables must be provided.")
from couchbase.cluster import Cluster 
from couchbase.options import ClusterOptions
from couchbase.auth import PasswordAuthenticator
from couchbase.management.buckets import CreateBucketSettings
from couchbase.management.collections import CollectionSpec
from couchbase.management.search import SearchIndex
import json

# Connect to Couchbase cluster
cluster = Cluster(CB_CONNECTION_STRING, ClusterOptions(
    PasswordAuthenticator(CB_USERNAME, CB_PASSWORD)))

# Create bucket if it does not exist
bucket_manager = cluster.buckets()
try:
    bucket_manager.get_bucket(CB_BUCKET_NAME)
    print(f"Bucket '{CB_BUCKET_NAME}' already exists.")
except Exception as e:
    print(f"Bucket '{CB_BUCKET_NAME}' does not exist. Creating bucket...")
    bucket_settings = CreateBucketSettings(name=CB_BUCKET_NAME, ram_quota_mb=500)
    bucket_manager.create_bucket(bucket_settings)
    print(f"Bucket '{CB_BUCKET_NAME}' created successfully.")

# Create scope and collection if they do not exist
collection_manager = cluster.bucket(CB_BUCKET_NAME).collections()
scopes = collection_manager.get_all_scopes()
scope_exists = any(scope.name == CB_SCOPE_NAME for scope in scopes)

if scope_exists:
    print(f"Scope '{CB_SCOPE_NAME}' already exists.")
else:
    print(f"Scope '{CB_SCOPE_NAME}' does not exist. Creating scope...")
    collection_manager.create_scope(CB_SCOPE_NAME)
    print(f"Scope '{CB_SCOPE_NAME}' created successfully.")

collections = [collection.name for scope in scopes if scope.name == CB_SCOPE_NAME for collection in scope.collections]
collection_exists = CB_COLLECTION_NAME in collections

if collection_exists:
    print(f"Collection '{CB_COLLECTION_NAME}' already exists in scope '{CB_SCOPE_NAME}'.")
else:
    print(f"Collection '{CB_COLLECTION_NAME}' does not exist in scope '{CB_SCOPE_NAME}'. Creating collection...")
    collection_manager.create_collection(collection_name=CB_COLLECTION_NAME, scope_name=CB_SCOPE_NAME)
    print(f"Collection '{CB_COLLECTION_NAME}' created successfully.")

# Create Search Vector Index from search_vector_index.json file at scope level
with open('search_vector_index.json', 'r') as search_file:
    search_index_definition = SearchIndex.from_json(json.load(search_file))
    
       # Update search index definition with user inputs
    search_index_definition.name = CB_INDEX_NAME
    search_index_definition.source_name = CB_BUCKET_NAME
    
    # Update types mapping
    old_type_key = next(iter(search_index_definition.params['mapping']['types'].keys()))
    type_obj = search_index_definition.params['mapping']['types'].pop(old_type_key)
    search_index_definition.params['mapping']['types'][f"{CB_SCOPE_NAME}.{CB_COLLECTION_NAME}"] = type_obj
    
    search_index_name = search_index_definition.name
    
    # Get scope-level search manager
    scope_search_manager = cluster.bucket(CB_BUCKET_NAME).scope(CB_SCOPE_NAME).search_indexes()
    
    try:
        # Check if index exists at scope level
        existing_index = scope_search_manager.get_index(search_index_name)
        print(f"Search Vector Index '{search_index_name}' already exists at scope level.")
    except Exception as e:
        print(f"Search Vector Index '{search_index_name}' does not exist at scope level. Creating index from search_vector_index.json...")
        with open('search_vector_index.json', 'r') as search_file:
            scope_search_manager.upsert_index(search_index_definition)
            print(f"Search Vector Index '{search_index_name}' created successfully at scope level.")
Bucket 'b' already exists.
Scope 's' already exists.
Collection 'c' already exists in scope 's'.
Search Vector Index 'vector_search' already exists at scope level.

Load and Process Movie Dataset

Load the TMDB movie dataset and prepare documents for indexing:

# Load TMDB dataset
print("Loading TMDB dataset...")
dataset = load_dataset("AiresPucrs/tmdb-5000-movies")
movies_df = pd.DataFrame(dataset['train'])
print(f"Total movies found: {len(movies_df)}")

# Create documents from movie data
docs_data = []
for _, row in movies_df.iterrows():
    if pd.isna(row['overview']):
        continue
        
    try:
        docs_data.append({
            'id': str(row["id"]),
            'content': f"Title: {row['title']}\nGenres: {', '.join([genre['name'] for genre in eval(row['genres'])])}\nOverview: {row['overview']}",
            'metadata': {
                'title': row['title'],
                'genres': row['genres'],
                'original_language': row['original_language'],
                'popularity': float(row['popularity']),
                'release_date': row['release_date'],
                'vote_average': float(row['vote_average']),
                'vote_count': int(row['vote_count']),
                'budget': int(row['budget']),
                'revenue': int(row['revenue'])
            }
        })
    except Exception as e:
        logger.error(f"Error processing movie {row['title']}: {e}")

print(f"Created {len(docs_data)} documents with valid overviews")
documents = [Document(id=doc['id'], content=doc['content'], meta=doc['metadata']) 
            for doc in docs_data]
Loading TMDB dataset...


Generating train split: 100%|██████████| 4803/4803 [00:00<00:00, 123144.70 examples/s]


Total movies found: 4803
Created 4800 documents with valid overviews

Initialize Document Store

Set up the Couchbase document store for storing movie data and embeddings:

# Initialize document store
document_store = CouchbaseSearchDocumentStore(
    cluster_connection_string=Secret.from_token(CB_CONNECTION_STRING),
    authenticator=CouchbasePasswordAuthenticator(
        username=Secret.from_token(CB_USERNAME),
        password=Secret.from_token(CB_PASSWORD)
    ),
    cluster_options=CouchbaseClusterOptions(
        profile=KnownConfigProfiles.WanDevelopment,
    ),
    bucket=CB_BUCKET_NAME,
    scope=CB_SCOPE_NAME,
    collection=CB_COLLECTION_NAME,
    vector_search_index=CB_INDEX_NAME,
)

print("Couchbase document store initialized successfully.")
Couchbase document store initialized successfully.

Initialize Embedder for Document Embedding

Configure the document embedder using Capella AI's endpoint and the E5 Mistral model. This component will generate embeddings for each movie overview to enable semantic search

embedder = OpenAIDocumentEmbedder(
    api_key=Secret.from_token(OPENAI_API_KEY),
    model="text-embedding-3-large",
)

rag_embedder = OpenAITextEmbedder(
    api_key=Secret.from_token(OPENAI_API_KEY),
    model="text-embedding-3-large",
)

Initialize LLM Generator

Configure the LLM generator using Capella AI's endpoint and Llama 3.1 model. This component will generate natural language responses based on the retrieved documents.

llm = OpenAIGenerator(
    api_key=Secret.from_token(OPENAI_API_KEY),
    model="gpt-4o",
)

Create Indexing Pipeline

Build the pipeline for processing and indexing movie documents:

# Create indexing pipeline
index_pipeline = Pipeline()
index_pipeline.add_component("cleaner", DocumentCleaner())
index_pipeline.add_component("embedder", embedder)
index_pipeline.add_component("writer", DocumentWriter(document_store=document_store))

# Connect indexing components
index_pipeline.connect("cleaner.documents", "embedder.documents")
index_pipeline.connect("embedder.documents", "writer.documents")
<haystack.core.pipeline.pipeline.Pipeline object at 0x124121550>
🚅 Components
  - cleaner: DocumentCleaner
  - embedder: OpenAIDocumentEmbedder
  - writer: DocumentWriter
🛤️ Connections
  - cleaner.documents -> embedder.documents (list[Document])
  - embedder.documents -> writer.documents (list[Document])

Run Indexing Pipeline

Execute the pipeline for processing and indexing movie documents:

# Run indexing pipeline

if documents:
    # Process documents in batches for better performance
    batch_size = 100
    total_docs = len(documents[:200])
    
    for i in range(0, total_docs, batch_size):
        batch = documents[i:i + batch_size]
        result = index_pipeline.run({"cleaner": {"documents": batch}})
        print(f"Processed batch {i//batch_size + 1}: {len(batch)} documents")
    
    print(f"\nSuccessfully processed {total_docs} documents")
    print(f"Sample document metadata: {documents[0].meta}")
else:
    print("No documents created. Skipping indexing.")
Calculating embeddings: 4it [00:06,  1.73s/it]


Processed batch 1: 100 documents


Calculating embeddings: 4it [00:06,  1.66s/it]

Processed batch 2: 100 documents

Successfully processed 200 documents
Sample document metadata: {'title': 'Four Rooms', 'genres': '[{"id": 80, "name": "Crime"}, {"id": 35, "name": "Comedy"}]', 'original_language': 'en', 'popularity': 22.87623, 'release_date': '1995-12-09', 'vote_average': 6.5, 'vote_count': 530, 'budget': 4000000, 'revenue': 4300000}

Create RAG Pipeline

Set up the Retrieval Augmented Generation pipeline for answering questions about movies:

# Define RAG prompt template
prompt_template = """
Given these documents, answer the question.\nDocuments:
{% for doc in documents %}
    {{ doc.content }}
{% endfor %}

\nQuestion: {{question}}
\nAnswer:
"""

# Create RAG pipeline
rag_pipeline = Pipeline()

# Add components
rag_pipeline.add_component(
    "query_embedder",
    rag_embedder,
)
rag_pipeline.add_component("retriever", CouchbaseSearchEmbeddingRetriever(document_store=document_store))
rag_pipeline.add_component("prompt_builder", PromptBuilder(template=prompt_template))
rag_pipeline.add_component("llm",llm)
rag_pipeline.add_component("answer_builder", AnswerBuilder())

# Connect RAG components
rag_pipeline.connect("query_embedder", "retriever.query_embedding")
rag_pipeline.connect("retriever.documents", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder.prompt", "llm.prompt")
rag_pipeline.connect("llm.replies", "answer_builder.replies")
rag_pipeline.connect("llm.meta", "answer_builder.meta")
rag_pipeline.connect("retriever", "answer_builder.documents")

print("RAG pipeline created successfully.")
PromptBuilder has 2 prompt variables, but `required_variables` is not set. By default, all prompt variables are treated as optional, which may lead to unintended behavior in multi-branch pipelines. To avoid unexpected execution, ensure that variables intended to be required are explicitly set in `required_variables`.


RAG pipeline created successfully.

Ask Questions About Movies

Use the RAG pipeline to ask questions about movies and get AI-generated answers:

# Example question
question = "Why did Manni call Lolla?"

# Run the RAG pipeline
result = rag_pipeline.run(
    {
        "query_embedder": {"text": question},
        "retriever": {"top_k": 5},
        "prompt_builder": {"question": question},
        "answer_builder": {"query": question},
    },
    include_outputs_from={"retriever", "query_embedder"}
)

# Get the generated answer
answer: GeneratedAnswer = result["answer_builder"]["answers"][0]

# Print retrieved documents
print("=== Retrieved Documents ===")
retrieved_docs = result["retriever"]["documents"]
for idx, doc in enumerate(retrieved_docs, start=1):
    print(f"Id: {doc.id} Title: {doc.meta['title']}")

# Print final results
print("\n=== Final Answer ===")
print(f"Question: {answer.query}")
print(f"Answer: {answer.data}")
print("\nSources:")
for doc in answer.documents:
    print(f"-> {doc.meta['title']}")
=== Retrieved Documents ===
Id: 006b97c08110cb1b9b58e03943c91fa9412cfe7a2a22830ba5b9e3eb0c342344 Title: Run Lola Run
Id: 33543dab4c048c9467d632f319e02bca94da6f178250c14d26eabfb30911a823 Title: Mambo Italiano
Id: 94c55246e02c290767531f6359b5f44145191e3f2d62a3a64ed4718a666be9f2 Title: Good bye, Lenin!
Id: 00b4d1f455e45fbffa39f72be6de635bdcdb6b8a04289ba4aea41061700b9096 Title: Mean Streets
Id: 9241f819303fe61a25e05469856c01a8843d53a6ce7cec340bf0def848ddb470 Title: Magnolia

=== Final Answer ===
Question: Why did Manni call Lolla?
Answer: Manni called Lola because he lost 100,000 DM in a subway train that belongs to a very bad guy, and he needs her help to raise the money within 20 minutes to prevent him from having to rob a store to get the money.

Sources:
-> Run Lola Run
-> Mambo Italiano
-> Good bye, Lenin!
-> Mean Streets
-> Magnolia

Conclusion

In this tutorial, we built a Retrieval-Augmented Generation (RAG) system using Couchbase Capella, OpenAI, and Haystack with the BBC News dataset. This demonstrates how to combine Couchbase Search Vector Index with large language models to answer questions about current events using real-time information.

The key components include:

  • Couchbase Capella Search Vector Index for vector storage and retrieval
  • Haystack for pipeline orchestration and component management
  • OpenAI for embeddings (text-embedding-3-large) and text generation (gpt-4o)

This approach enables AI applications to access and reason over current information that extends beyond the LLM's training data, making responses more accurate and relevant for real-world use cases.


This tutorial is part of a Couchbase Learning Path:
Contents
Couchbase home page link

3155 Olsen Drive
Suite 150, San Jose
CA 95117, United States

  • Company
  • About
  • Leadership
  • News & Press
  • Careers
  • Events
  • Legal
  • Contact us
  • Support
  • Developer Portal
  • Documentation
  • Forums
  • Professional Services
  • Support Login
  • Support Policy
  • Training
  • Quicklinks
  • Blog
  • Downloads
  • Online Training
  • Resources
  • Why NoSQL
  • Pricing
  • Follow us
  • Social Media Link for TwitterTwitter
  • Social Media Link for LinkedInLinkedIn
  • Social Media Link for YoutubeYouTube
  • Social Media Link for FacebookFacebook
  • Social Media Link for GitHubGitHub
  • Social Media Link for Stack OverflowStack Overflow
  • Social Media Link for DiscordDiscord

© 2026 Couchbase, Inc. Couchbase and the Couchbase logo are registered trademarks of Couchbase, Inc. All third party trademarks (including logos and icons) referenced by Couchbase, Inc. remain the property of their respective owners.

  • Terms of Use
  • Privacy Policy
  • Cookie Policy
  • Support Policy
  • Do Not Sell My Personal Information
  • Marketing Preference Center
  • Trust Center