This notebook demonstrates how to implement a custom storage backend for CrewAI's memory system using Couchbase and a Search Vector Index. If you would rather perform semantic search using a Hyperscale or Composite Vector Index, please take a look at this tutorial instead.
Here's a breakdown of each section:
This tutorial is available as a Jupyter Notebook (.ipynb file) that you can run interactively. You can access the original notebook here.
You can either:
Create and Deploy Your Free Tier Operational cluster on Capella
Couchbase Capella Configuration

When running Couchbase using Capella, the following prerequisites need to be met:
Memory in AI agents is a crucial capability that allows them to retain and utilize information across interactions, making them more effective and contextually aware. Without memory, agents would be limited to processing only the immediate input, lacking the ability to build upon past experiences or maintain continuity in conversations.
Note: This section on memory types and functionality is adapted from the CrewAI documentation.
Memory in AI agents typically involves:
In the CrewAI example, the CouchbaseStorage class implements:
In CrewAI, memory serves several important functions:
The vector-based approach (using embeddings) is particularly powerful because it allows for semantic search - finding memories that are conceptually related to the current context, not just exact keyword matches.
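To make this concrete, here is a minimal, self-contained sketch of the idea. The 3-dimensional vectors below are toy stand-ins for real embeddings (the numbers are illustrative, not produced by any model): a query vector lands closest to the memory it is conceptually related to, even though the texts share no keywords.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two vectors: closer to 1.0 means more similar."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

# Toy embeddings: conceptually similar texts get nearby vectors.
memories = {
    "Guardiola praised the team's form": [0.9, 0.1, 0.2],
    "The recipe needs two cups of flour": [0.1, 0.9, 0.3],
}
query_vector = [0.85, 0.15, 0.25]  # imagine this embeds "What did the manager say?"

# The football memory wins despite sharing no keywords with the query.
best = max(memories, key=lambda text: cosine_similarity(memories[text], query_vector))
print(best)
```

A real vector store performs the same nearest-neighbor comparison, just at scale and over model-generated embeddings rather than hand-written vectors.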
By implementing custom storage like Couchbase, you gain additional benefits like persistence, scalability, and the ability to leverage enterprise-grade database features for your agent memory systems.
This section installs the necessary Python packages:
- crewai: The main CrewAI framework
- langchain-couchbase: LangChain integration for Couchbase
- langchain-openai: LangChain integration for OpenAI
- python-dotenv: For loading environment variables

%pip install --quiet crewai==0.186.1 langchain-couchbase==1.0.1 langchain-openai python-dotenv==1.1.1

Note: you may need to restart the kernel to use updated packages.

The script starts by importing the libraries required for the tasks ahead, including JSON handling, logging, time tracking, Couchbase connections, and embedding generation.
from typing import Any, Dict, List, Optional
import os
import logging
from datetime import timedelta
from dotenv import load_dotenv
from crewai.memory.storage.rag_storage import RAGStorage
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai import Agent, Crew, Task, Process
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions
from couchbase.auth import PasswordAuthenticator
from couchbase.diagnostics import PingState, ServiceType
from langchain_couchbase.vectorstores import CouchbaseSearchVectorStore
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
import time
import json
import uuid
# Configure logging (disabled)
logging.basicConfig(level=logging.CRITICAL)
logger = logging.getLogger(__name__)

In this section, we load the essential configuration settings the notebook needs. These include sensitive details such as database credentials and index names. Rather than hardcoding these values into the script, we read them from the environment at runtime, which keeps the notebook flexible and secure.
The script uses environment variables to store sensitive information, enhancing the overall security and maintainability of your code by avoiding hardcoded values.
Note: This implementation reads configuration parameters from environment variables. Before running this notebook, you need to set the following environment variables:
- OPENAI_API_KEY: Your OpenAI API key for generating embeddings
- CB_HOST: Couchbase cluster connection string (e.g., "couchbases://cb.example.com")
- CB_USERNAME: Username for Couchbase authentication
- CB_PASSWORD: Password for Couchbase authentication
- CB_BUCKET_NAME (optional): Bucket name (defaults to "vector-search-testing")
- CB_SCOPE_NAME (optional): Scope name (defaults to "shared")
- CB_COLLECTION_NAME (optional): Collection name (defaults to "crew")
- CB_INDEX_NAME (optional): Vector search index name (defaults to "vector_search_crew")

You can set these variables in a .env file in the same directory as this notebook, or set them directly in your environment.
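For example, a .env file might look like the following (the values are placeholders; substitute your own credentials and connection string):

```shell
OPENAI_API_KEY=sk-your-openai-key
CB_HOST=couchbases://cb.example.com
CB_USERNAME=your-username
CB_PASSWORD=your-password
CB_BUCKET_NAME=vector-search-testing
CB_SCOPE_NAME=shared
CB_COLLECTION_NAME=crew
CB_INDEX_NAME=vector_search_crew
```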
If running in Google Colab, use the Secrets feature:
OPENAI_API_KEY, CB_HOST, CB_USERNAME, CB_PASSWORD

from google.colab import userdata
import os
os.environ['OPENAI_API_KEY'] = userdata.get('OPENAI_API_KEY')
os.environ['CB_HOST'] = userdata.get('CB_HOST')
os.environ['CB_USERNAME'] = userdata.get('CB_USERNAME')
os.environ['CB_PASSWORD'] = userdata.get('CB_PASSWORD')

load_dotenv("./.env")
# Verify environment variables
required_vars = ['OPENAI_API_KEY', 'CB_HOST', 'CB_USERNAME', 'CB_PASSWORD']
for var in required_vars:
if not os.getenv(var):
        raise ValueError(f"{var} environment variable is required")

This section demonstrates the implementation of a custom vector storage solution using Couchbase with a Search Vector Index:
Note on Implementation: This example uses the LangChain Couchbase integration (langchain_couchbase) for simplicity and to demonstrate integration with the broader LangChain ecosystem. In production environments, you may want to use the Couchbase SDK directly for better performance and more control.
For more information on using the Couchbase SDK directly and Search Vector Indexes, refer to:
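Before the class itself, one implementation detail is worth isolating: agent outputs often arrive as "Thought: ... Final Answer: ..." strings, and the save() method below keeps only the text after "Final Answer:" so that the scaffolding is not embedded alongside the actual content. A standalone sketch of that clean-up (mirroring the logic in the class):

```python
def extract_final_answer(raw: str) -> str:
    """Strip agent scaffolding, keeping only the text after 'Final Answer:'."""
    if "Final Answer:" in raw:
        # Split once so any later occurrences of the phrase survive intact
        return raw.split("Final Answer:", 1)[1].strip()
    return raw  # no scaffolding present: store the value as-is

raw_output = "Thought: I should summarise the quote.\nFinal Answer: City are in excellent form."
print(extract_final_answer(raw_output))  # -> City are in excellent form.
```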
class CouchbaseStorage(RAGStorage):
"""
Extends RAGStorage to handle embeddings for memory entries using Couchbase.
"""
def __init__(self, type: str, allow_reset: bool = True, embedder_config: Optional[Dict[str, Any]] = None, crew: Optional[Any] = None):
"""Initialize CouchbaseStorage with configuration."""
super().__init__(type, allow_reset, embedder_config, crew)
self._initialize_app()
def search(
self,
query: str,
limit: int = 3,
filter: Optional[dict] = None,
score_threshold: float = 0,
) -> List[Dict[str, Any]]:
"""
Search memory entries using vector similarity.
"""
try:
# Add type filter
search_filter = {"memory_type": self.type}
if filter:
search_filter.update(filter)
# Execute search
results = self.vector_store.similarity_search_with_score(
query,
k=limit,
filter=search_filter
)
# Format results and deduplicate by content
seen_contents = set()
formatted_results = []
for i, (doc, score) in enumerate(results):
if score >= score_threshold:
content = doc.page_content
if content not in seen_contents:
seen_contents.add(content)
formatted_results.append({
"id": doc.metadata.get("memory_id", str(i)),
"metadata": doc.metadata,
"context": content,
"score": float(score)
})
logger.info(f"Found {len(formatted_results)} unique results for query: {query}")
return formatted_results
except Exception as e:
logger.error(f"Search failed: {str(e)}")
return []
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
"""
Save a memory entry with metadata.
"""
try:
# Generate unique ID
memory_id = str(uuid.uuid4())
timestamp = int(time.time() * 1000)
# Prepare metadata (create a copy to avoid modifying references)
if not metadata:
metadata = {}
else:
metadata = metadata.copy() # Create a copy to avoid modifying references
# Process agent-specific information if present
agent_name = metadata.get('agent', 'unknown')
            # Clean up value if it has the typical LLM response format.
            # This also covers "Thought: ... Final Answer: ..." outputs, since
            # they contain the "Final Answer:" marker as well.
            value_str = str(value)
            if "Final Answer:" in value_str:
                # Keep only the actual content - everything after "Final Answer:"
                value = value_str.split("Final Answer:", 1)[1].strip()
                logger.info(f"Cleaned up response format for agent: {agent_name}")
# Update metadata
metadata.update({
"memory_id": memory_id,
"memory_type": self.type,
"timestamp": timestamp,
"source": "crewai"
})
# Log memory information for debugging
value_preview = str(value)[:100] + "..." if len(str(value)) > 100 else str(value)
metadata_preview = {k: v for k, v in metadata.items() if k != "embedding"}
logger.info(f"Saving memory for Agent: {agent_name}")
logger.info(f"Memory value preview: {value_preview}")
logger.info(f"Memory metadata: {metadata_preview}")
# Convert value to string if needed
if isinstance(value, (dict, list)):
value = json.dumps(value)
elif not isinstance(value, str):
value = str(value)
# Save to vector store
self.vector_store.add_texts(
texts=[value],
metadatas=[metadata],
ids=[memory_id]
)
logger.info(f"Saved memory {memory_id}: {value[:100]}...")
except Exception as e:
logger.error(f"Save failed: {str(e)}")
raise
def reset(self) -> None:
"""Reset the memory storage if allowed."""
if not self.allow_reset:
return
try:
# Delete documents of this memory type
self.cluster.query(
f"DELETE FROM `{self.bucket_name}`.`{self.scope_name}`.`{self.collection_name}` WHERE memory_type = $type",
type=self.type
).execute()
logger.info(f"Reset memory type: {self.type}")
except Exception as e:
logger.error(f"Reset failed: {str(e)}")
raise
def _initialize_app(self):
"""Initialize Couchbase connection and vector store."""
try:
# Initialize embeddings
if self.embedder_config and self.embedder_config.get("provider") == "openai":
self.embeddings = OpenAIEmbeddings(
openai_api_key=os.getenv('OPENAI_API_KEY'),
model=self.embedder_config.get("config", {}).get("model", "text-embedding-3-small")
)
else:
self.embeddings = OpenAIEmbeddings(
openai_api_key=os.getenv('OPENAI_API_KEY'),
model="text-embedding-3-small"
)
# Connect to Couchbase
auth = PasswordAuthenticator(
os.getenv('CB_USERNAME', ''),
os.getenv('CB_PASSWORD', '')
)
options = ClusterOptions(auth)
# Initialize cluster connection
self.cluster = Cluster(os.getenv('CB_HOST', ''), options)
self.cluster.wait_until_ready(timedelta(seconds=5))
# Check search service
ping_result = self.cluster.ping()
search_available = False
for service_type, endpoints in ping_result.endpoints.items():
if service_type == ServiceType.Search:
for endpoint in endpoints:
if endpoint.state == PingState.OK:
search_available = True
logger.info(f"Search service is responding at: {endpoint.remote}")
break
break
if not search_available:
raise RuntimeError("Search service not found or not responding")
# Set up storage configuration
self.bucket_name = os.getenv('CB_BUCKET_NAME', 'vector-search-testing')
self.scope_name = os.getenv('CB_SCOPE_NAME', 'shared')
self.collection_name = os.getenv('CB_COLLECTION_NAME', 'crew')
self.index_name = os.getenv('CB_INDEX_NAME', 'vector_search_crew')
# Initialize vector store
self.vector_store = CouchbaseSearchVectorStore(
cluster=self.cluster,
bucket_name=self.bucket_name,
scope_name=self.scope_name,
collection_name=self.collection_name,
embedding=self.embeddings,
index_name=self.index_name
)
logger.info(f"Initialized CouchbaseStorage for type: {self.type}")
except Exception as e:
logger.error(f"Initialization failed: {str(e)}")
            raise

Test storing and retrieving a simple memory:
# Initialize storage
storage = CouchbaseStorage(
type="short_term",
embedder_config={
"provider": "openai",
"config": {"model": "text-embedding-3-small"}
}
)
# Reset storage
storage.reset()
# Test storage
test_memory = "Pep Guardiola praised Manchester City's current form, saying 'The team is playing well, we are in a good moment. The way we are training, the way we are playing - I am really pleased.'"
test_metadata = {"category": "sports", "test": "initial_memory"}
storage.save(test_memory, test_metadata)
# Test search
results = storage.search("What did Guardiola say about Manchester City?", limit=1)
for result in results:
    print(f"Found: {result['context']}\nScore: {result['score']}\nMetadata: {result['metadata']}")

Create agents and tasks to test memory retention:
# Initialize ShortTermMemory with our storage
memory = ShortTermMemory(storage=storage)
# Initialize language model
llm = ChatOpenAI(
model="gpt-4o",
temperature=0.7
)
# Create agents with memory
sports_analyst = Agent(
role='Sports Analyst',
goal='Analyze Manchester City performance',
backstory='Expert at analyzing football teams and providing insights on their performance',
llm=llm,
memory=True,
memory_storage=memory
)
journalist = Agent(
role='Sports Journalist',
goal='Create engaging football articles',
backstory='Experienced sports journalist who specializes in Premier League coverage',
llm=llm,
memory=True,
memory_storage=memory
)
# Create tasks
analysis_task = Task(
description='Analyze Manchester City\'s recent performance based on Pep Guardiola\'s comments: "The team is playing well, we are in a good moment. The way we are training, the way we are playing - I am really pleased."',
agent=sports_analyst,
expected_output="A comprehensive analysis of Manchester City's current form based on Guardiola's comments."
)
writing_task = Task(
description='Write a sports article about Manchester City\'s form using the analysis and Guardiola\'s comments.',
agent=journalist,
context=[analysis_task],
expected_output="An engaging sports article about Manchester City's current form and Guardiola's perspective."
)
# Create crew with memory
crew = Crew(
agents=[sports_analyst, journalist],
tasks=[analysis_task, writing_task],
process=Process.sequential,
memory=True,
short_term_memory=memory, # Explicitly pass our memory implementation
verbose=True
)
# Run the crew
result = crew.kickoff()
print("\nCrew Result:")
print("-" * 80)
print(result)
print("-" * 80)

╭──────────────────────────────────────────── Crew Execution Started ─────────────────────────────────────────────╮ │ │ │ Crew Execution Started │ │ Name: crew │ │ ID: 7ac56ae1-b62f-4b07-952c-104a7243edb0 │ │ Tool Args: │ │ │ │ │ ╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
/Users/viraj.agarwal/Tasks/Task10/.venv/lib/python3.13/site-packages/rich/live.py:256: UserWarning: install
"ipywidgets" for Jupyter support
warnings.warn('install "ipywidgets" for Jupyter support')
╭────────────────────────────────────────────── 🧠 Retrieved Memory ──────────────────────────────────────────────╮ │ │ │ Historical Data: │ │ - Ensure that the analysis contains specific examples or statistics to support the claims made about team │ │ performance. │ │ - Include insights from other sources or viewpoints to provide a well-rounded analysis. │ │ - Provide a comparison with past performance to highlight improvements or consistencies. │ │ - Include player-specific analysis if individual performance is hinted at in the comments. │ │ Entities: │ │ - Pep Guardiola(Football Manager): The current manager of Manchester City, known fo... │ │ │ ╰─────────────────────────────────────────── Retrieval Time: 1384.18ms ───────────────────────────────────────────╯
╭─────────────────────────────────────────────── 🤖 Agent Started ────────────────────────────────────────────────╮ │ │ │ Agent: Sports Analyst │ │ │ │ Task: Analyze Manchester City's recent performance based on Pep Guardiola's comments: "The team is playing │ │ well, we are in a good moment. The way we are training, the way we are playing - I am really pleased." │ │ │ ╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
╭──────────────────────────────────────────────── Task Completion ────────────────────────────────────────────────╮ │ │ │ Task Completed │ │ Name: 721d99b2-ac47-4976-8862-364bb668075e │ │ Agent: Sports Analyst │ │ Tool Args: │ │ │ │ │ ╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
╭────────────────────────────────────────────── 🧠 Retrieved Memory ──────────────────────────────────────────────╮ │ │ │ Historical Data: │ │ - Include specific quotes from Guardiola to enhance credibility. │ │ - Incorporate statistical data or match results to provide more depth. │ │ - Discuss recent matches or events in more detail. │ │ - Add perspectives from players or other analysts for a more rounded view. │ │ - Include potential future challenges for Manchester City. │ │ Entities: │ │ - Pep Guardiola(Individual): The manager of Manchester City, known for his tactical acumen and positive │ │ remarks about the team's performance. │ │ - Manch... │ │ │ ╰─────────────────────────────────────────── Retrieval Time: 991.13ms ────────────────────────────────────────────╯
╭─────────────────────────────────────────────── 🤖 Agent Started ────────────────────────────────────────────────╮ │ │ │ Agent: Sports Journalist │ │ │ │ Task: Write a sports article about Manchester City's form using the analysis and Guardiola's comments. │ │ │ ╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
╭──────────────────────────────────────────────── Task Completion ────────────────────────────────────────────────╮ │ │ │ Task Completed │ │ Name: 4fac1a2b-0fd1-484e-afe6-a4d4af236bd4 │ │ Agent: Sports Journalist │ │ Tool Args: │ │ │ │ │ ╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
Crew Result:
--------------------------------------------------------------------------------
**Manchester City's Resilient Form Under Guardiola: A Symphony of Strategy and Skill**
In the ever-competitive landscape of the Premier League, Manchester City continues to set the benchmark for excellence, guided by the strategic genius of Pep Guardiola. Reflecting on their current form, Guardiola's satisfaction is palpable: "The team is playing well, we are in a good moment. The way we are training, the way we are playing - I am really pleased." These words not only highlight the team's current high morale but also underline the effectiveness of their training routines and the cohesive unit that Guardiola has meticulously crafted.
Historically, Manchester City has been a juggernaut in English football, and their recent performances are a testament to their sustained dominance. Their consistency in maintaining high possession rates and crafting scoring opportunities is unparalleled. Statistically, City often leads in metrics such as ball possession and pass accuracy, with figures regularly surpassing 60% possession in matches, illustrating their control and domination on the pitch.
Key to their success has been the stellar performances of individual players. Kevin De Bruyne's vision and precise passing have been instrumental in creating goal-scoring chances, while Erling Haaland's formidable goal-scoring abilities add a lethal edge to City's attack. Phil Foden's adaptability and technical prowess offer Guardiola the flexibility to shuffle tactics seamlessly. This trident of talent epitomizes the blend of skill and strategy that City embodies.
Defensively, Manchester City has shown marked improvement, a testament to Guardiola's focus on fortifying the backline. Their defensive solidity, coupled with an attacking flair, makes them a daunting adversary for any team. Guardiola's ability to adapt tactics to counter various styles of play is a hallmark of his tenure, ensuring City remains at the pinnacle of competition both domestically and on the European stage.
Analysts and pundits echo Guardiola's sentiments, praising Manchester City's ability to maintain elite standards and adapt to challenges with finesse. This holistic approach—encompassing rigorous training, strategic gameplay, and individual brilliance—cements Manchester City's status as leaders in football excellence.
However, the journey is far from over. As they navigate the rigors of the Premier League and European competitions, potential challenges loom. Sustaining fitness levels, managing squad rotations, and countering tactical innovations from rivals will be pivotal. Yet, with Guardiola at the helm, Manchester City is well-equipped to tackle these challenges head-on.
In conclusion, Manchester City's current form is a shining example of Guardiola's managerial prowess and the team's harmonious performance. Their continued success is a blend of strategic training, tactical adaptability, and outstanding individual contributions, positioning them as formidable contenders in any arena. As the season unfolds, fans and analysts alike will watch with bated breath to see how this footballing symphony continues to play out.
--------------------------------------------------------------------------------

Query the stored memories to verify retention:
# Wait for memories to be stored
time.sleep(2)
# List all documents in the collection
try:
# Query to fetch all documents of this memory type
query_str = f"SELECT META().id, * FROM `{storage.bucket_name}`.`{storage.scope_name}`.`{storage.collection_name}` WHERE memory_type = $type"
query_result = storage.cluster.query(query_str, type=storage.type)
print(f"\nAll memory entries in Couchbase:")
print("-" * 80)
for i, row in enumerate(query_result, 1):
doc_id = row.get('id')
memory_id = row.get(storage.collection_name, {}).get('memory_id', 'unknown')
content = row.get(storage.collection_name, {}).get('text', '')[:100] + "..." # Truncate for readability
print(f"Entry {i}:")
print(f"ID: {doc_id}")
print(f"Memory ID: {memory_id}")
print(f"Content: {content}")
print("-" * 80)
except Exception as e:
print(f"Failed to list memory entries: {str(e)}")
# Test memory retention
memory_query = "What is Manchester City's current form according to Guardiola?"
memory_results = storage.search(
query=memory_query,
limit=5, # Increased to see more results
score_threshold=0.0 # Lower threshold to see all results
)
print("\nMemory Search Results:")
print("-" * 80)
for result in memory_results:
print(f"Context: {result['context']}")
print(f"Score: {result['score']}")
print("-" * 80)
# Try a more specific query to find agent interactions
interaction_query = "Manchester City playing style analysis tactical"
interaction_results = storage.search(
query=interaction_query,
limit=5,
score_threshold=0.0
)
print("\nAgent Interaction Memory Results:")
print("-" * 80)
for result in interaction_results:
print(f"Context: {result['context'][:200]}...") # Limit output size
print(f"Score: {result['score']}")
print("-" * 80)
All memory entries in Couchbase:
--------------------------------------------------------------------------------
Memory Search Results:
--------------------------------------------------------------------------------
Agent Interaction Memory Results:
--------------------------------------------------------------------------------