Building Intelligent Agents with Amazon Bedrock (Lambda)

  • Learn how to build intelligent agents using Amazon Bedrock Agents with AWS Lambda and Couchbase as the vector store.
  • This tutorial demonstrates how to create specialized agents that can process documents and interact with external APIs using serverless Lambda functions.
  • You'll understand how to implement secure multi-agent architectures using Amazon Bedrock's agent capabilities with a Lambda-based approach.

AWS Bedrock Agents with Couchbase Vector Search - Lambda Approach

This notebook demonstrates the Lambda approach for implementing AWS Bedrock agents with Couchbase Vector Search. In this approach, the agent invokes AWS Lambda functions to execute operations.

We'll implement a multi-agent architecture with specialized agents for different tasks:

  • Researcher Agent: Searches for relevant documents in the vector store
  • Writer Agent: Formats and presents the research findings

Alternative Approaches

This notebook demonstrates the Lambda Approach for AWS Bedrock Agents. For comparison, you might also want to check out the Custom Control Approach, which handles agent tools directly in your application code instead of using AWS Lambda functions.

The Custom Control approach offers simpler setup and more direct control, but may not scale as well. You can find that implementation here: Custom Control Approach Notebook

Note: If the link above doesn't work in your Jupyter environment, you can navigate to the file manually in the awsbedrock-agents/custom-control-approach/ directory.

Overview

The Lambda approach for AWS Bedrock Agents delegates the execution of an agent's defined functions (tools) to backend AWS Lambda functions. When the agent decides to use a tool, Bedrock directly invokes the corresponding Lambda function that you've specified in the agent's action group configuration. This Lambda function receives the parameters from the agent, executes the necessary logic (e.g., querying a Couchbase vector store, calling other APIs, performing computations), and then returns the result to the Bedrock Agent. The agent can then use this result to continue its reasoning process or formulate a final response to the user. This architecture promotes a clean separation of concerns, allows tool logic to be developed and scaled independently, and leverages the serverless capabilities of AWS Lambda.
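
To make this request/response cycle concrete, here is a minimal sketch of a Lambda handler for an action group defined with a function schema. It assumes the event shape that AWS documents for function-schema action groups (actionGroup, function, and a parameters list of name/type/value entries). run_search is a hypothetical placeholder for the Couchbase vector search and is not the notebook's actual implementation.

```python
import json


def run_search(query, k):
    """Hypothetical stand-in for the Couchbase vector similarity search."""
    return [f"result {i + 1} for '{query}'" for i in range(k)]


def lambda_handler(event, context):
    # Bedrock passes parameters as a list of {"name", "type", "value"} dicts.
    params = {p["name"]: p["value"] for p in event.get("parameters", [])}
    query = params.get("query", "")
    k = int(params.get("k", 3))  # parameter values arrive as strings

    results = run_search(query, k)

    # Response envelope for function-schema action groups.
    return {
        "messageVersion": "1.0",
        "response": {
            "actionGroup": event["actionGroup"],
            "function": event["function"],
            "functionResponse": {
                "responseBody": {"TEXT": {"body": json.dumps(results)}}
            },
        },
    }
```

The handler echoes the actionGroup and function values back so that Bedrock can match the result to the call it made; the tool output itself travels as a string in the TEXT body.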

Key Steps & Concepts

  1. Define Agent Instructions & Tool Schema:

    • Instructions: Craft a clear prompt that tells the agent its purpose, capabilities, and how it should behave (e.g., "You are a research assistant that uses the SearchAndFormat tool...").
    • Function Schema: Define the structure of the tool(s) the agent can use. In this notebook, we define a single tool (e.g., searchAndFormatDocuments) that the agent will call. This schema specifies the function name, description, and its input parameters (e.g., query, k, style). This schema acts as the contract between the agent and the Lambda function.
  2. Implement Lambda Handler Function:

    • Create an AWS Lambda function (e.g., bedrock_agent_search_and_format.py) that contains the actual Python code to execute the tool's logic.
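    • Event Handling: The Lambda handler receives an event payload from Bedrock. For an action group defined with a function schema, as in this notebook, the payload includes the action group name, the function name, and the parameters supplied by the agent. (An API path and HTTP method are included instead when the action group is defined with an OpenAPI schema.)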
    • Business Logic: Inside the Lambda, parse the incoming event, extract parameters, and perform the required actions. For this notebook, this involves:
      • Connecting to Couchbase.
      • Initializing the Bedrock Embeddings client.
      • Performing a vector similarity search using the provided query and k value.
      • Optionally, formatting the search results based on the style parameter (though in this specific example, the formatting is largely illustrative and the LLM does the heavy lifting of presentation).
    • Response Structure: The Lambda must return a JSON response in the specific format that Bedrock expects. For a function-schema action group, this includes the actionGroup, the function name, and a functionResponse whose responseBody contains the result of the tool execution (e.g., the search results as a string). Action groups defined with an OpenAPI schema instead return apiPath, httpMethod, httpStatusCode, and a responseBody.
    • Deployment: Package the Lambda function with its dependencies (e.g., requirements.txt) into a .zip file. This notebook includes helper functions to automate packaging (using a Makefile) and deployment, including uploading to S3 if the package is large. The Lambda also needs an IAM role with permissions to run, write logs, and interact with Bedrock and any other required AWS services.
    • Environment Variables: The Lambda function is configured with environment variables (e.g., Couchbase connection details, Bedrock model IDs) to allow it to connect to necessary services without hardcoding credentials. These are set during the Lambda creation/update process in the notebook.
  3. Create Agent in AWS Bedrock:

    • Use the bedrock_agent_client.create_agent SDK call. Provide the agent name, the ARN of the IAM role it will assume, the foundation model ID (e.g., Claude Sonnet), and the instructions defined in step 1.
  4. Create Agent Action Group (Linking to Lambda):

    • Use bedrock_agent_client.create_agent_action_group.
    • actionGroupExecutor: This is the crucial part for the Lambda approach. Set it to {'lambda': 'arn:aws:lambda:<region>:<account_id>:function:<lambda_function_name>'}. This tells Bedrock to invoke your specific Lambda function when this action group is triggered.
    • functionSchema: Provide the function schema defined in step 1. This allows the agent to understand how to call the Lambda function (i.e., what parameters to send).
    • Give the action group a name (e.g., SearchAndFormatActionGroup).
  5. Prepare Agent:

    • Call bedrock_agent_client.prepare_agent with the agentId. This makes the DRAFT version of the agent (with its newly configured action group) ready for use. The notebook includes a custom waiter to poll until the agent status is PREPARED.
  6. Create or Update Agent Alias:

    • An alias (e.g., prod) is used to invoke a specific version of the agent. The notebook checks if an alias exists and creates one if not, pointing to the latest prepared (DRAFT) version. Use bedrock_agent_client.create_agent_alias or update_agent_alias.
  7. Invoke Agent:

    • Use bedrock_agent_runtime_client.invoke_agent, providing the agentId, agentAliasId, a unique sessionId, and the user's inputText (prompt).
    • Bedrock takes over: when the agent decides to use the tool from the action group, Bedrock transparently calls the configured Lambda function with the necessary parameters.
    • The Lambda executes, returns its result to the agent, and the agent uses this result to generate its final response.
    • Your application code simply waits for and processes the final streaming response from the invoke_agent call. Unlike the Custom Control approach, there's no returnControl event for the application to handle for tool execution; Bedrock manages the Lambda invocation directly.
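
Steps 1 and 4 can be sketched as a small helper that assembles the arguments for create_agent_action_group. This is an illustration only: the helper name build_action_group_request is hypothetical, and the parameter descriptions and required flags are assumptions rather than the notebook's actual schema file.

```python
def build_action_group_request(agent_id, lambda_arn):
    """Assemble keyword arguments for create_agent_action_group (sketch)."""
    # Tool contract: function name, description, and input parameters.
    function_schema = {
        "functions": [
            {
                "name": "searchAndFormatDocuments",
                "description": "Search the vector store and format the results.",
                "parameters": {
                    "query": {"type": "string", "description": "Search query text.", "required": True},
                    "k": {"type": "integer", "description": "Number of results to return.", "required": False},
                    "style": {"type": "string", "description": "Formatting style for the results.", "required": False},
                },
            }
        ]
    }
    return {
        "agentId": agent_id,
        "agentVersion": "DRAFT",
        "actionGroupName": "SearchAndFormatActionGroup",
        # The Lambda executor is what distinguishes this approach.
        "actionGroupExecutor": {"lambda": lambda_arn},
        "functionSchema": function_schema,
    }
```

With a real client, the result could be passed as bedrock_agent_client.create_agent_action_group(**build_action_group_request(agent_id, lambda_arn)). Keeping the request in one dictionary makes it easy to inspect the schema before any AWS call is made.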

Pros

  • Decoupling & Modularity: Tool execution logic is encapsulated within Lambda functions, separate from the main application code. This allows for independent development, deployment, and scaling of tools.
  • Scalability & Serverless: Leverages the inherent scalability, concurrency, and pay-per-use benefits of AWS Lambda for tool execution.
  • Managed Execution Environment: AWS manages the underlying infrastructure, runtime environment, and invocation mechanism for the Lambda functions.
  • Simpler Application-Level Code: The application that invokes the Bedrock agent doesn't need to implement the tool's logic itself or handle returnControl events for function execution, as Bedrock orchestrates the call to Lambda directly.

Cons

  • Deployment & Configuration Overhead: Requires setting up, packaging, configuring dependencies, and deploying separate Lambda functions. IAM roles and permissions for Lambdas also need careful management.
  • State Management: If tools need to share state or complex context with the Lambda function, this must be explicitly passed, often via environment variables or by including necessary lookup logic within the Lambda itself.
  • Cold Starts: AWS Lambda cold starts can introduce latency the first time a function is invoked after a period of inactivity, potentially affecting agent response time.
  • Debugging Complexity: Troubleshooting can be more involved as it spans across the Bedrock Agent service, the Lambda service, and potentially other services the Lambda interacts with (like Couchbase). Centralized logging (e.g., CloudWatch Logs for Lambda) is essential.
  • Cost: Incurs costs associated with Lambda invocations, execution duration, and any resources used by the Lambda (e.g., data transfer, provisioned concurrency if used).

1. Imports

This section imports all necessary Python libraries. These include:

  • Standard libraries: json for data handling, logging for progress and error messages, os for interacting with the operating system (e.g., file paths), subprocess for running external commands (like make for Lambda packaging), time for delays, traceback for detailed error reporting, uuid for generating unique identifiers, shutil for file operations, and timedelta (from datetime) for specifying timeouts.
  • boto3 and botocore: The AWS SDK for Python, used to interact with AWS services like Bedrock, IAM, Lambda, and S3. Specific configurations (Config) and waiters are also imported for robust client interactions.
  • couchbase: The official Couchbase SDK for Python, used for connecting to and interacting with the Couchbase cluster, including managing buckets, collections, and search indexes. Specific exception classes are imported for error handling.
  • dotenv: For loading environment variables from a .env file, which helps manage configuration settings like API keys and connection strings securely.
  • langchain_aws and langchain_couchbase: Libraries from the LangChain ecosystem. BedrockEmbeddings is used to generate text embeddings via Amazon Bedrock, and CouchbaseSearchVectorStore provides an interface for using Couchbase as a vector store in LangChain applications.
import json
import logging
import os
import shutil
import subprocess
import time
import traceback
import uuid
from datetime import timedelta

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
from botocore.waiter import WaiterModel, create_waiter_with_client
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.exceptions import (BucketNotFoundException, CouchbaseException,
                                  QueryIndexAlreadyExistsException)
from couchbase.management.buckets import BucketSettings, BucketType
from couchbase.management.collections import CollectionSpec
from couchbase.management.search import SearchIndex
from couchbase.options import ClusterOptions
from dotenv import load_dotenv
from langchain_aws import BedrockEmbeddings
from langchain_couchbase.vectorstores import CouchbaseSearchVectorStore

2. Configuration

This section handles the initial setup of essential configurations for the notebook:

  • Logging: Configures the logging module to output messages with a specific format (timestamp, level, message), which helps in tracking the script's execution and diagnosing issues.
  • Environment Variables: Attempts to load environment variables from a .env file located either in the current directory or the parent directory. This is a common practice to keep sensitive information like credentials and hostnames out of the codebase. If the .env file is not found, the script will rely on variables already set in the execution environment.
  • Couchbase Settings: Defines variables for connecting to Couchbase, including the host, username, password, and the names for the bucket, scope, collection, and search index that will be used for this experiment. Default values are provided if specific environment variables are not set.
  • AWS Settings: Defines variables for AWS configuration, such as the region, access key ID, secret access key, and AWS account ID. These are crucial for boto3 to interact with AWS services.
  • Bedrock Model IDs: Specifies the model identifiers for the Amazon Bedrock text embedding model (e.g., amazon.titan-embed-text-v2:0) and the foundation model to be used by the agent (e.g., anthropic.claude-3-sonnet-20240229-v1:0).
  • File Paths: Sets up variables for various file paths used throughout the notebook, such as the directory for schemas, the path to the Couchbase search index JSON definition, and the path to the JSON file containing documents to be loaded into the vector store. Using os.getcwd() makes these paths relative to the notebook's current working directory.
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Load environment variables from a .env file in the current directory (or, failing that, its parent).
# In a notebook, '__file__' is not defined, so paths are built from the current working directory.
# This assumes the notebook is run from the 'lambda-approach' directory.
dotenv_path = os.path.join(os.getcwd(), '.env')  # Or specify the full path if needed
logger.info(f"Attempting to load .env file from: {dotenv_path}")
if os.path.exists(dotenv_path):
    load_dotenv(dotenv_path=dotenv_path)
    logger.info(".env file loaded successfully.")
else:
    # Try loading from parent directory if not found in current
    parent_dotenv_path = os.path.join(os.path.dirname(os.getcwd()), '.env')
    if os.path.exists(parent_dotenv_path):
        load_dotenv(dotenv_path=parent_dotenv_path)
        logger.info(f".env file loaded successfully from parent directory: {parent_dotenv_path}")
    else:
        logger.warning(f".env file not found at {dotenv_path} or {parent_dotenv_path}. Relying on environment variables.")


# Couchbase Configuration
CB_HOST = os.getenv("CB_HOST", "couchbase://localhost")
CB_USERNAME = os.getenv("CB_USERNAME", "Administrator")
CB_PASSWORD = os.getenv("CB_PASSWORD", "password")
# Using a new bucket/scope/collection for experiments to avoid conflicts
CB_BUCKET_NAME = os.getenv("CB_BUCKET_NAME", "vector-search-exp")
SCOPE_NAME = os.getenv("SCOPE_NAME", "bedrock_exp")
COLLECTION_NAME = os.getenv("COLLECTION_NAME", "docs_exp")
INDEX_NAME = os.getenv("INDEX_NAME", "vector_search_bedrock_exp")

# AWS Configuration
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
AWS_ACCOUNT_ID = os.getenv("AWS_ACCOUNT_ID")

# Bedrock Model IDs
EMBEDDING_MODEL_ID = "amazon.titan-embed-text-v2:0"
AGENT_MODEL_ID = "anthropic.claude-3-sonnet-20240229-v1:0" # Using Sonnet for the agent

# Paths (relative to the notebook's execution directory)
SCRIPT_DIR = os.getcwd()  # Use the current working directory in a notebook context
SCHEMAS_DIR = os.path.join(SCRIPT_DIR, 'schemas')
SEARCH_FORMAT_SCHEMA_PATH = os.path.join(SCHEMAS_DIR, 'search_and_format_schema.json')
INDEX_JSON_PATH = os.path.join(SCRIPT_DIR, 'aws_index.json')
DOCS_JSON_PATH = os.path.join(SCRIPT_DIR, 'documents.json')
2025-06-09 13:39:41,393 - INFO - Attempting to load .env file from: /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/.env
2025-06-09 13:39:41,395 - INFO - .env file loaded successfully.

3. Helper Functions

This section defines a comprehensive suite of helper functions to modularize the various operations required throughout the notebook. These functions encapsulate specific tasks, making the main execution flow cleaner and easier to understand. The categories of helper functions include:

  • Environment and Client Initialization: Checking for necessary environment variables and setting up AWS SDK (boto3) clients for services like IAM, Lambda, Bedrock, and S3.
  • Couchbase Interaction: Connecting to the Couchbase cluster, and robustly setting up buckets, scopes, collections, and search indexes. Includes functions to clear data from collections for clean experimental runs.
  • IAM Role Management: Creating or retrieving the necessary IAM roles with appropriate trust policies and permissions that allow Bedrock Agents and Lambda functions to operate and interact with other AWS services securely.
  • Lambda Function Deployment: A set of functions to manage the lifecycle of the Lambda function that the agent will invoke. This includes packaging the Lambda code and its dependencies (using a Makefile), uploading the deployment package (to S3 if it's large), creating or updating the Lambda function in AWS, and deleting it for cleanup.
  • Bedrock Agent Resource Management: Functions for creating the Bedrock Agent itself, defining its action groups (which link the agent to the Lambda function via its ARN and define the tool schema), preparing the agent to make it invocable, and managing agent aliases. Also includes functions to delete these agent resources for cleanup.
  • Agent Invocation: A function to test the fully configured agent by sending it a prompt and processing its streamed response, including any trace information for debugging.
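
As a rough illustration of the agent invocation step, the sketch below calls invoke_agent and joins the streamed chunks into a single string. The function name collect_agent_reply is hypothetical and this is not the notebook's own helper; it assumes the client is a boto3 bedrock-agent-runtime client, whose response exposes an event stream under the completion key.

```python
import uuid


def collect_agent_reply(runtime_client, agent_id, alias_id, prompt):
    """Invoke the agent and join the streamed chunks into one string (sketch)."""
    response = runtime_client.invoke_agent(
        agentId=agent_id,
        agentAliasId=alias_id,
        sessionId=str(uuid.uuid4()),  # a fresh session for each call
        inputText=prompt,
        enableTrace=True,
    )
    parts = []
    trace_count = 0
    for event in response["completion"]:
        if "chunk" in event:
            # Text arrives as UTF-8 encoded bytes.
            parts.append(event["chunk"]["bytes"].decode("utf-8"))
        elif "trace" in event:
            # Trace events carry reasoning and tool-call details for debugging.
            trace_count += 1
    return "".join(parts), trace_count
```

Because the Lambda is invoked by Bedrock itself, the loop only needs to handle chunk and trace events; there is no tool-execution branch on the application side.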

3.1 check_environment_variables

This function verifies that all critical environment variables required for the script to run (e.g., AWS credentials, Couchbase password, AWS Account ID) are set. It logs an error and returns False if any are missing, otherwise logs success and returns True.

def check_environment_variables():
    """Check if required environment variables are set."""
    required_vars = ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_ACCOUNT_ID", "CB_PASSWORD"]
    missing_vars = [var for var in required_vars if not os.getenv(var)]
    if missing_vars:
        logger.error(f"Missing required environment variables: {', '.join(missing_vars)}")
        logger.error("Please set these variables in your environment or .env file")
        return False
    logger.info("All required environment variables are set.")
    return True

3.2 initialize_aws_clients

This function sets up and returns the necessary AWS SDK (boto3) clients for interacting with various AWS services. It initializes clients for Bedrock Runtime (for model invocation, such as generating embeddings), IAM (for managing roles and policies), Lambda (for deploying and managing Lambda functions), Bedrock Agent (for creating and managing agents), and Bedrock Agent Runtime (for invoking agents). It uses the credentials and region from the environment configuration. The two agent clients also receive a custom configuration (agent_config) with longer timeouts and adaptive retries, which matters for Bedrock Agent operations that can take more time, like agent preparation.

def initialize_aws_clients():
    """Initialize required AWS clients."""
    try:
        logger.info(f"Initializing AWS clients in region: {AWS_REGION}")
        session = boto3.Session(
            aws_access_key_id=AWS_ACCESS_KEY_ID,
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
            region_name=AWS_REGION
        )
        # Use a config with longer timeouts for agent operations
        agent_config = Config(
            connect_timeout=120,
            read_timeout=600, # Agent preparation can take time
            retries={'max_attempts': 5, 'mode': 'adaptive'}
        )
        bedrock_runtime = session.client('bedrock-runtime', region_name=AWS_REGION)
        iam_client = session.client('iam', region_name=AWS_REGION)
        lambda_client = session.client('lambda', region_name=AWS_REGION)
        # The agent clients use the longer timeouts defined in agent_config
        bedrock_agent_client = session.client('bedrock-agent', region_name=AWS_REGION, config=agent_config)
        bedrock_agent_runtime_client = session.client('bedrock-agent-runtime', region_name=AWS_REGION, config=agent_config)
        logger.info("AWS clients initialized successfully.")
        return bedrock_runtime, iam_client, lambda_client, bedrock_agent_client, bedrock_agent_runtime_client
    except Exception as e:
        logger.error(f"Error initializing AWS clients: {e}")
        raise

3.3 connect_couchbase

This function establishes a connection to the Couchbase cluster using the connection string (CB_HOST), username, and password from the environment configuration. It uses PasswordAuthenticator for authentication and passes it through ClusterOptions, which is also where connection parameters such as timeouts could be customized (the example sets none). It then waits up to 10 seconds for the cluster to be ready before returning the Cluster object, so that subsequent operations can be performed reliably.

def connect_couchbase():
    """Connect to Couchbase cluster."""
    try:
        logger.info(f"Connecting to Couchbase cluster at {CB_HOST}...")
        auth = PasswordAuthenticator(CB_USERNAME, CB_PASSWORD)
        # Only the authenticator is passed here; timeouts could also be set via ClusterOptions
        options = ClusterOptions(auth)
        cluster = Cluster(CB_HOST, options)
        cluster.wait_until_ready(timedelta(seconds=10))  # Increase if the cluster is slow to respond
        logger.info("Successfully connected to Couchbase.")
        return cluster
    except CouchbaseException as e:
        logger.error(f"Couchbase connection error: {e}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error connecting to Couchbase: {e}")
        raise

3.4 setup_collection

This comprehensive function is responsible for ensuring that the required Couchbase bucket, scope, and collection are available for the agent's vector store. It performs the following steps idempotently:

  • Checks if the specified bucket (bucket_name) exists. If not, it creates the bucket with defined settings (e.g., RAM quota, flush enabled). It includes a pause to allow the bucket to become ready.
  • Checks if the specified scope (scope_name) exists within the bucket. If not, it creates the scope and includes a brief pause.
  • Checks if the specified collection (collection_name) exists within the scope. If not, it creates the collection using a CollectionSpec and pauses.
  • Ensures that a primary N1QL index exists on the collection, creating it if it's missing. This is often useful for administrative queries or simpler lookups, though not strictly for vector search itself.

Finally, it returns a Collection object representing the target collection for further operations.

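Note: Bucket creation will not work on Capella. If you are using Capella, create the bucket beforehand so that this function finds it and skips the creation step.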

def setup_collection(cluster, bucket_name, scope_name, collection_name):
    """Set up the Couchbase bucket, scope, and collection, creating any that are missing."""
    logger.info(f"Setting up collection: {bucket_name}/{scope_name}/{collection_name}")
    try:
        # Check if bucket exists, create if it doesn't
        try:
            bucket = cluster.bucket(bucket_name)
            logger.info(f"Bucket '{bucket_name}' exists.")
        except BucketNotFoundException:
            logger.info(f"Bucket '{bucket_name}' does not exist. Creating it...")
            # Use a modest RAM quota, which is enough for this experiment
            bucket_settings = BucketSettings(
                name=bucket_name,
                bucket_type=BucketType.COUCHBASE,
                ram_quota_mb=256,
                flush_enabled=True,
                num_replicas=0
            )
            try:
                cluster.buckets().create_bucket(bucket_settings)
                # Give the new bucket time to become ready
                logger.info(f"Bucket '{bucket_name}' created. Waiting for ready state (10s)...")
                time.sleep(10)
                bucket = cluster.bucket(bucket_name)  # Re-assign bucket object
            except Exception as create_e:
                logger.error(f"Failed to create bucket '{bucket_name}': {create_e}")
                raise
        except Exception as e:
            logger.error(f"Error getting bucket '{bucket_name}': {e}")
            raise

        bucket_manager = bucket.collections()

        # Check if scope exists, create if it doesn't
        scopes = bucket_manager.get_all_scopes()
        scope_exists = any(s.name == scope_name for s in scopes)

        if not scope_exists:
            logger.info(f"Scope '{scope_name}' does not exist. Creating it...")
            try:
                bucket_manager.create_scope(scope_name)
                logger.info(f"Scope '{scope_name}' created. Waiting (2s)...")
                time.sleep(2)
            except CouchbaseException as e:
                # Tolerate a race where the scope was created concurrently
                if "already exists" in str(e).lower() or "scope_exists" in str(e).lower():
                    logger.info(f"Scope '{scope_name}' likely already exists (caught during creation attempt).")
                else:
                    logger.error(f"Failed to create scope '{scope_name}': {e}")
                    raise
        else:
            logger.info(f"Scope '{scope_name}' already exists.")

        # Check if collection exists, create if it doesn't
        # Re-fetch scopes in case the scope was just created
        scopes = bucket_manager.get_all_scopes()
        collection_exists = any(
            s.name == scope_name and any(c.name == collection_name for c in s.collections)
            for s in scopes
        )

        if not collection_exists:
            logger.info(f"Collection '{collection_name}' does not exist in scope '{scope_name}'. Creating it...")
            try:
                collection_spec = CollectionSpec(collection_name, scope_name)
                bucket_manager.create_collection(collection_spec)
                logger.info(f"Collection '{collection_name}' created. Waiting (2s)...")
                time.sleep(2)
            except CouchbaseException as e:
                # Tolerate a race where the collection was created concurrently
                if "already exists" in str(e).lower() or "collection_exists" in str(e).lower():
                    logger.info(f"Collection '{collection_name}' likely already exists (caught during creation attempt).")
                else:
                    logger.error(f"Failed to create collection '{collection_name}': {e}")
                    raise
        else:
            logger.info(f"Collection '{collection_name}' already exists.")

        # Ensure primary index exists
        try:
            logger.info(f"Ensuring primary index exists on `{bucket_name}`.`{scope_name}`.`{collection_name}`...")
            cluster.query(f"CREATE PRIMARY INDEX IF NOT EXISTS ON `{bucket_name}`.`{scope_name}`.`{collection_name}`").execute()
            logger.info("Primary index present or created successfully.")
        except Exception as e:
            logger.error(f"Error creating primary index: {str(e)}")
            # Decide if this is fatal

        logger.info("Collection setup complete.")
        # Return the collection object for use
        return cluster.bucket(bucket_name).scope(scope_name).collection(collection_name)

    except Exception as e:
        logger.error(f"Error setting up collection: {str(e)}")
        logger.error(traceback.format_exc())
        raise
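
The fixed time.sleep pauses above are simple but either wait longer than needed or not long enough. One alternative is to poll for the condition with a timeout. The helper below is a hypothetical sketch (wait_until is not part of the notebook or the Couchbase SDK) and shows only the polling pattern.

```python
import time


def wait_until(predicate, timeout=30.0, interval=1.0):
    """Poll predicate() until it returns True or timeout seconds elapse.

    Returns True if the condition was met, False if the timeout expired.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

For example, after create_scope one could call wait_until(lambda: any(s.name == scope_name for s in bucket_manager.get_all_scopes())) and raise an error if it returns False, instead of sleeping for a fixed two seconds.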

3.5 setup_search_index

This function is responsible for creating or updating the Couchbase Search (FTS) index required for vector similarity search. Key operations include:

  • Loading the index definition from a specified JSON file (index_definition_path).
  • Dynamically updating the loaded index definition to use the correct index_name and sourceName (bucket name) provided as arguments. This allows for a template index definition file to be reused.
  • Using the SearchIndexManager (obtained from the cluster object) to upsert_index. Upserting means the index will be created if it doesn't exist, or updated if an index with the same name already exists. This makes the operation idempotent.
  • After submitting the upsert operation, it includes a pause (time.sleep) to allow Couchbase some time to start the indexing process in the background.

Important Note: This function overrides only the index name and sourceName (the bucket name) in the loaded definition; the provided aws_index.json file otherwise has hardcoded references to the bucket, scope, and collection names. If you have used different names for your bucket, scope, or collection than the defaults specified in this notebook or your .env file, you must modify the aws_index.json file to reflect your custom names before running the next cell.

def setup_search_index(cluster, index_name, bucket_name, scope_name, collection_name, index_definition_path):
    """Create or update the Couchbase Search index from a JSON definition file."""
    try:
        logger.info(f"Looking for index definition at: {index_definition_path}")
        if not os.path.exists(index_definition_path):
            logger.error(f"Index definition file not found: {index_definition_path}")
            raise FileNotFoundError(f"Index definition file not found: {index_definition_path}")

        with open(index_definition_path, 'r') as file:
            index_definition = json.load(file)
            index_definition['name'] = index_name
            index_definition['sourceName'] = bucket_name
            logger.info(f"Loaded index definition from {index_definition_path}, ensuring name is '{index_name}' and source is '{bucket_name}'.")

    except Exception as e:
        logger.error(f"Error loading index definition: {str(e)}")
        raise

    try:
        # Use the SearchIndexManager from the Cluster object for cluster-level indexes
        # Or use scope-level if the index JSON is structured for that
        # Assuming cluster level based on original script structure for upsert
        search_index_manager = cluster.search_indexes()

        # Create SearchIndex object from potentially modified JSON definition
        search_index = SearchIndex.from_json(index_definition)

        # Upsert the index (create if not exists, update if exists)
        logger.info(f"Upserting search index '{index_name}'...")
        search_index_manager.upsert_index(search_index)

        # Wait for indexing
        logger.info(f"Index '{index_name}' upsert operation submitted. Waiting for indexing (10s)...")
        time.sleep(10)

        logger.info(f"Search index '{index_name}' setup complete.")

    except QueryIndexAlreadyExistsException:
        # This exception might not be correct for SearchIndexManager
        # Upsert should handle exists cases, but log potential specific errors
        logger.warning(f"Search index '{index_name}' likely already existed (caught QueryIndexAlreadyExistsException, check if applicable). Upsert attempted.")
    except CouchbaseException as e:
        logger.error(f"Couchbase error during search index setup for '{index_name}': {e}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error during search index setup for '{index_name}': {e}")
        raise
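
Because the function patches only the index name and sourceName, a quick check of the definition before upserting can catch scope or collection names that do not match your configuration. The sketch below assumes that the index definition keeps its type mappings under params.mapping.types, keyed as "<scope>.<collection>"; that layout is an assumption about aws_index.json, and find_unexpected_types is a hypothetical helper.

```python
def find_unexpected_types(index_definition, scope_name, collection_name):
    """Return mapping type keys that do not match '<scope>.<collection>'.

    Assumes type mappings live under params.mapping.types (an assumption
    about the index definition layout); missing keys yield an empty list.
    """
    types = (
        index_definition.get("params", {})
        .get("mapping", {})
        .get("types", {})
    )
    expected = f"{scope_name}.{collection_name}"
    return [key for key in types if key != expected]
```

An empty list means every mapped type targets the expected scope and collection; any returned keys point at entries in the JSON file that would need to be renamed by hand.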

3.6 clear_collection

This utility function is used to delete all documents from a specified Couchbase collection. It constructs and executes a N1QL DELETE query targeting the given bucket, scope, and collection. This is useful for ensuring a clean state before loading new data for an experiment, preventing interference from previous runs. It also attempts to log the number of mutations (deleted documents) if the query metrics are available.

def clear_collection(cluster, bucket_name, scope_name, collection_name):
    """Delete all documents from the specified collection."""
    try:
        logger.warning(f"Attempting to clear all documents from `{bucket_name}`.`{scope_name}`.`{collection_name}`...")
        query = f"DELETE FROM `{bucket_name}`.`{scope_name}`.`{collection_name}`"
        result = cluster.query(query).execute()
        # Try to get mutation count, handle if not available
        mutation_count = 0
        try:
             metrics_data = result.meta_data().metrics()
             if metrics_data:
                  mutation_count = metrics_data.mutation_count()
        except Exception as metrics_e:
             logger.warning(f"Could not retrieve mutation count after delete: {metrics_e}")
        logger.info(f"Successfully cleared documents from the collection (approx. {mutation_count} mutations).")
    except Exception as e:
        logger.error(f"Error clearing documents from collection: {e}. Collection might be empty or index not ready.")
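
Because the DELETE statement is assembled with an f-string, a bucket, scope, or collection name that contains a backtick would break the identifier quoting. A minimal sketch of a guard, assuming a hypothetical helper named build_delete_statement that is not part of the notebook:

```python
def build_delete_statement(bucket_name, scope_name, collection_name):
    """Return a DELETE statement for one keyspace (hypothetical helper)."""
    parts = (bucket_name, scope_name, collection_name)
    for part in parts:
        # An empty name is invalid, and a backtick would end the quoting early.
        if not part or "`" in part:
            raise ValueError(f"Invalid keyspace component: {part!r}")
    keyspace = ".".join(f"`{part}`" for part in parts)
    return f"DELETE FROM {keyspace}"


print(build_delete_statement("vector-search-exp", "bedrock_exp", "docs_exp"))
```

The returned string could be passed to cluster.query in place of the inline f-string.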

3.7 create_agent_role

This function creates or updates the necessary IAM (Identity and Access Management) role that the Bedrock Agent and its associated Lambda function will assume. The role needs permissions to interact with AWS services on your behalf. Key aspects of this function are:

  • Assume Role Policy: Defines which AWS services (principals) are allowed to assume this role. In this case, it allows both lambda.amazonaws.com (for the Lambda function execution) and bedrock.amazonaws.com (for the Bedrock Agent service itself).
  • Idempotency: It first checks if a role with the specified role_name already exists.
    • If it exists, the function retrieves its ARN and updates its trust policy to ensure it matches the required configuration.
    • If it doesn't exist, it creates a new IAM role with the defined assume role policy and description.
  • Permissions Policies:
    • Attaches the AWS managed policy AWSLambdaBasicExecutionRole, which grants the Lambda function permissions to write logs to CloudWatch.
    • Creates and attaches an inline policy (LambdaBasicLoggingPermissions) that allows creating log groups and log streams and writing log events, scoped to log groups under /aws/lambda/ in the given account and region.
    • Creates and attaches an inline policy (BedrockAgentPermissions) granting broad bedrock:* permissions. For production, these permissions should be scoped down to the minimum required.
  • Propagation Delays: Includes time.sleep calls after creating the role and after attaching policies to allow time for the changes to propagate within AWS, which helps prevent subsequent operations from failing due to eventual consistency issues.

It returns the ARN (Amazon Resource Name) of the created or updated IAM role, which is then used when creating the Bedrock Agent and the Lambda function.

def create_agent_role(iam_client, role_name, aws_account_id):
    """Creates or gets the IAM role for the Bedrock Agent Lambda functions."""
    logger.info(f"Checking/Creating IAM role: {role_name}")
    assume_role_policy_document = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": [
                        "lambda.amazonaws.com",
                        "bedrock.amazonaws.com" 
                    ]
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }

    role_arn = None
    try:
        # Check if role exists
        get_role_response = iam_client.get_role(RoleName=role_name)
        role_arn = get_role_response['Role']['Arn']
        logger.info(f"IAM role '{role_name}' already exists with ARN: {role_arn}")
        
        # Ensure trust policy is up-to-date
        logger.info(f"Updating trust policy for existing role '{role_name}'...")
        iam_client.update_assume_role_policy(
            RoleName=role_name,
            PolicyDocument=json.dumps(assume_role_policy_document)
        )
        logger.info(f"Trust policy updated for role '{role_name}'.")

    except iam_client.exceptions.NoSuchEntityException:
        logger.info(f"IAM role '{role_name}' not found. Creating...")
        try:
            create_role_response = iam_client.create_role(
                RoleName=role_name,
                AssumeRolePolicyDocument=json.dumps(assume_role_policy_document),
                Description='IAM role for Bedrock Agent Lambda functions (Experiment)',
                MaxSessionDuration=3600
            )
            role_arn = create_role_response['Role']['Arn']
            logger.info(f"Successfully created IAM role '{role_name}' with ARN: {role_arn}")
            # Wait after role creation before attaching policies
            logger.info("Waiting 15s for role creation propagation...")
            time.sleep(15)
        except ClientError as e:
            logger.error(f"Error creating IAM role '{role_name}': {e}")
            raise
            
    except ClientError as e:
        logger.error(f"Error getting/updating IAM role '{role_name}': {e}")
        raise
        
    # Attach basic execution policy (idempotent)
    try:
        logger.info(f"Attaching basic Lambda execution policy to role '{role_name}'...")
        iam_client.attach_role_policy(
            RoleName=role_name,
            PolicyArn='arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
        )
        logger.info("Attached basic Lambda execution policy.")
    except ClientError as e:
        logger.error(f"Error attaching basic Lambda execution policy: {e}")
        # Don't necessarily raise, might already be attached or other issue
        
    # Add minimal inline policy for logging (can be expanded later if needed)
    basic_inline_policy_name = "LambdaBasicLoggingPermissions"
    basic_inline_policy_doc = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents"
                ],
                "Resource": f"arn:aws:logs:{AWS_REGION}:{aws_account_id}:log-group:/aws/lambda/*:*" # Scope down logs if possible
            }
            # Add S3 permissions here ONLY if Lambda code explicitly needs it
        ]
    }
    
    # Add Bedrock permissions policy
    bedrock_policy_name = "BedrockAgentPermissions"
    bedrock_policy_doc = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "bedrock:*"
                ],
                "Resource": "*"  # You can scope this down to specific agents/models if needed
            }
        ]
    }
    try:
        logger.info(f"Putting basic inline policy '{basic_inline_policy_name}' for role '{role_name}'...")
        iam_client.put_role_policy(
            RoleName=role_name,
            PolicyName=basic_inline_policy_name,
            PolicyDocument=json.dumps(basic_inline_policy_doc)
        )
        logger.info(f"Successfully put inline policy '{basic_inline_policy_name}'.")
        
        # Add Bedrock permissions policy
        logger.info(f"Putting Bedrock permissions policy '{bedrock_policy_name}' for role '{role_name}'...")
        iam_client.put_role_policy(
            RoleName=role_name,
            PolicyName=bedrock_policy_name,
            PolicyDocument=json.dumps(bedrock_policy_doc)
        )
        logger.info(f"Successfully put inline policy '{bedrock_policy_name}'.")
        
        logger.info("Waiting 10s for policy changes to propagate...")
        time.sleep(10)
    except ClientError as e:
        logger.error(f"Error putting inline policy: {e}")
        # Decide if this is fatal
        
    if not role_arn:
         raise Exception(f"Failed to create or retrieve ARN for role {role_name}")
         
    return role_arn
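
The BedrockAgentPermissions policy above grants bedrock:* on every resource. A minimal sketch of a narrower policy document, assuming the role only needs to invoke specific foundation models. The helper name build_scoped_bedrock_policy, the single bedrock:InvokeModel action, and the example model ID are assumptions; check them against what your Lambda and agent actually call.

```python
import json


def build_scoped_bedrock_policy(region, model_ids):
    """Build an IAM policy limited to invoking the given models (hypothetical helper)."""
    # Foundation-model ARNs leave the account field empty.
    resources = [
        f"arn:aws:bedrock:{region}::foundation-model/{model_id}"
        for model_id in model_ids
    ]
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["bedrock:InvokeModel"],
                "Resource": resources,
            }
        ],
    }


# Example model ID chosen for illustration only.
policy = build_scoped_bedrock_policy("us-east-1", ["amazon.titan-embed-text-v2:0"])
print(json.dumps(policy, indent=2))
```

The resulting dictionary can be serialized with json.dumps and passed as the PolicyDocument argument of put_role_policy.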

3.8 Lambda Deployment Functions

This subsection groups together several helper functions dedicated to managing the deployment lifecycle of the AWS Lambda function that will serve as the tool executor for the Bedrock Agent. These functions handle packaging the Lambda code, managing its dependencies, deploying it to AWS, and cleaning up resources.

3.8.1 delete_lambda_function

This function is designed to safely delete an AWS Lambda function. Before attempting to delete the function itself, it tries to remove any permissions associated with it (specifically, the permission allowing Bedrock to invoke it, using a predictable statement ID). It then checks if the function exists and, if so, proceeds with the deletion. The function includes a brief pause after initiating deletion, as the process is asynchronous. It returns True if deletion was attempted/occurred and False if the function didn't exist or if an error occurred during the process.

def delete_lambda_function(lambda_client, function_name):
    """Delete Lambda function if it exists, attempting to remove permissions first."""
    logger.info(f"Attempting to delete Lambda function: {function_name}...")
    try:
        # Use a predictable statement ID added by create_lambda_function
        statement_id = f"AllowBedrockInvokeBasic-{function_name}"
        try:
            logger.info(f"Attempting to remove permission {statement_id} from {function_name}...")
            lambda_client.remove_permission(
                FunctionName=function_name,
                StatementId=statement_id
            )
            logger.info(f"Successfully removed permission {statement_id} from {function_name}.")
            time.sleep(2) # Allow time for permission removal
        except lambda_client.exceptions.ResourceNotFoundException:
            logger.info(f"Permission {statement_id} not found on {function_name}. Skipping removal.")
        except ClientError as perm_e:
            # Log error but continue with deletion attempt
            logger.warning(f"Error removing permission {statement_id} from {function_name}: {str(perm_e)}")

        # Check if function exists before attempting deletion
        lambda_client.get_function(FunctionName=function_name)
        logger.info(f"Function {function_name} exists. Deleting...")
        lambda_client.delete_function(FunctionName=function_name)

        # Pause briefly after the delete call (a fixed delay, not a boto3 waiter)
        logger.info(f"Waiting for {function_name} to be deleted...")
        time.sleep(10) # Simple delay after delete call
        logger.info(f"Function {function_name} deletion initiated.")

        return True # Indicates deletion was attempted/occurred

    except lambda_client.exceptions.ResourceNotFoundException:
        logger.info(f"Lambda function '{function_name}' does not exist. No need to delete.")
        return False # Indicates function didn't exist
    except Exception as e:
        logger.error(f"Error during deletion process for Lambda function '{function_name}': {str(e)}")
        # Depending on severity, might want to raise or just return False
        return False # Indicates an error occurred beyond not found
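
The fixed 10-second sleep is a guess at how long deletion takes. One alternative is to poll get_function until it raises ResourceNotFoundException. A minimal sketch, assuming a hypothetical helper named wait_until_function_deleted; it accepts any client object that exposes get_function and exceptions.ResourceNotFoundException, as the boto3 Lambda client does:

```python
import time


def wait_until_function_deleted(lambda_client, function_name, timeout=60, interval=2):
    """Poll until get_function reports the function is gone (hypothetical helper).

    Returns True if the function disappeared within `timeout` seconds, else False.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            lambda_client.get_function(FunctionName=function_name)
        except lambda_client.exceptions.ResourceNotFoundException:
            return True  # The function no longer exists.
        if time.monotonic() >= deadline:
            return False  # Still present when the timeout expired.
        time.sleep(interval)
```

Calling this after delete_function, in place of time.sleep(10), would return as soon as the function is gone rather than always waiting the full delay.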

3.8.2 upload_to_s3

This function handles uploading a Lambda deployment package (a .zip file) to Amazon S3. This is necessary when the package size exceeds the direct upload limit for Lambda. Key features include:

  • Bucket Management: It generates a unique S3 bucket name (prefixed with lambda-deployment-) using the AWS account ID and a timestamp, or a fallback UUID if the account ID isn't available. It checks if this bucket exists and creates it if not, ensuring the correct region is specified for bucket creation. It also uses a waiter to ensure the bucket is available before proceeding.
  • S3 Key Generation: Creates a unique S3 key (object path) for the uploaded file, incorporating the original filename and a UUID to prevent collisions.
  • Multipart Upload: For files larger than 100MB, it uses boto3.s3.transfer.S3Transfer for multipart uploads; smaller files go through a single put_object call. This threshold is separate from the deployment decision: Lambda accepts direct zip uploads up to 50MB (and 250MB unzipped, including layers), so this notebook routes any package over 45MB through S3.
  • Retry Configuration: Initializes the S3 and STS clients with a configuration that includes increased timeouts and retries for better resilience.

It returns a dictionary containing the S3Bucket and S3Key of the uploaded package, which create_lambda_function then uses as the code location.

def upload_to_s3(zip_file, region, bucket_name=None):
    """Upload zip file to S3 with retry logic and return S3 location."""
    logger.info(f"Preparing to upload {zip_file} to S3 in region {region}...")
    # Configure the client with increased timeouts
    config = Config(
        connect_timeout=60,
        read_timeout=300,
        retries={'max_attempts': 3, 'mode': 'adaptive'}
    )

    s3_client = boto3.client('s3', region_name=region, config=config)
    sts_client = boto3.client('sts', region_name=region, config=config)

    # Determine bucket name
    if bucket_name is None:
        try:
            account_id = sts_client.get_caller_identity().get('Account')
            timestamp = int(time.time())
            bucket_name = f"lambda-deployment-{account_id}-{timestamp}"
            logger.info(f"Generated unique S3 bucket name: {bucket_name}")
        except Exception as e:
            fallback_id = uuid.uuid4().hex[:12]
            bucket_name = f"lambda-deployment-{fallback_id}"
            logger.warning(f"Error getting account ID ({e}). Using fallback bucket name: {bucket_name}")

    # Create bucket if needed
    try:
        s3_client.head_bucket(Bucket=bucket_name)
        logger.info(f"Using existing S3 bucket: {bucket_name}")
    except ClientError as e:
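        # The error code is a string such as '404' or 'NoSuchBucket'; int() would fail on the latter
        error_code = e.response['Error']['Code']
        if error_code in ('404', 'NoSuchBucket'):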
            logger.info(f"Creating S3 bucket: {bucket_name}...")
            try:
                if region == 'us-east-1':
                    s3_client.create_bucket(Bucket=bucket_name)
                else:
                    s3_client.create_bucket(
                        Bucket=bucket_name,
                        CreateBucketConfiguration={'LocationConstraint': region}
                    )
                logger.info(f"Created S3 bucket: {bucket_name}. Waiting for availability...")
                waiter = s3_client.get_waiter('bucket_exists')
                waiter.wait(Bucket=bucket_name, WaiterConfig={'Delay': 5, 'MaxAttempts': 12})
                logger.info(f"Bucket {bucket_name} is available.")
            except Exception as create_e:
                logger.error(f"Error creating bucket '{bucket_name}': {create_e}")
                raise
        else:
            logger.error(f"Error checking bucket '{bucket_name}': {e}")
            raise

    # Upload file
    s3_key = f"lambda/{os.path.basename(zip_file)}-{uuid.uuid4().hex[:8]}"
    try:
        logger.info(f"Uploading {zip_file} to s3://{bucket_name}/{s3_key}...")
        file_size = os.path.getsize(zip_file)
        if file_size > 100 * 1024 * 1024:  # Use multipart for files > 100MB
            logger.info("Using multipart upload for large file...")
            transfer_config = boto3.s3.transfer.TransferConfig(
                multipart_threshold=10 * 1024 * 1024, max_concurrency=10,
                multipart_chunksize=10 * 1024 * 1024, use_threads=True
            )
            s3_transfer = boto3.s3.transfer.S3Transfer(client=s3_client, config=transfer_config)
            s3_transfer.upload_file(zip_file, bucket_name, s3_key)
        else:
            with open(zip_file, 'rb') as f:
                s3_client.put_object(Bucket=bucket_name, Key=s3_key, Body=f)

        logger.info(f"Successfully uploaded to s3://{bucket_name}/{s3_key}")
        return {'S3Bucket': bucket_name, 'S3Key': s3_key}

    except Exception as upload_e:
        logger.error(f"S3 upload failed: {upload_e}")
        raise
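
Two size thresholds are in play in this notebook: create_lambda_function sends packages over 45MB through S3, and upload_to_s3 switches to multipart transfers above 100MB. A minimal sketch that makes the combined decision explicit, assuming a hypothetical helper named choose_upload_path (the threshold values are the ones used in the notebook code):

```python
MB = 1024 * 1024
DIRECT_UPLOAD_LIMIT = 45 * MB   # above this, create_lambda_function uses S3
MULTIPART_THRESHOLD = 100 * MB  # above this, upload_to_s3 uses multipart


def choose_upload_path(size_bytes):
    """Classify a deployment package by size (hypothetical helper)."""
    if size_bytes <= DIRECT_UPLOAD_LIMIT:
        return "direct"        # pass the bytes to Lambda as ZipFile
    if size_bytes <= MULTIPART_THRESHOLD:
        return "s3-put"        # single put_object call
    return "s3-multipart"      # S3Transfer with a TransferConfig


for size in (10 * MB, 60 * MB, 150 * MB):
    print(size // MB, "MB ->", choose_upload_path(size))
```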

3.8.3 package_function

This function automates the process of packaging the Lambda function code and its dependencies into a .zip file, ready for deployment. It relies on a Makefile located in the source_dir (which is lambda_functions in this notebook). The steps are:

  1. Path Setup: Defines various paths for source files, the temporary packaging directory, the Makefile, and the final output .zip file.
  2. File Preparation: It copies the specific Lambda handler script (e.g., bedrock_agent_search_and_format.py) to lambda_function.py within the source_dir because the Makefile is likely configured to look for a generic lambda_function.py.
  3. Execute Makefile: It runs make clean package as a subprocess, with source_dir as the working directory, and raises an error if make exits with a non-zero status. The Makefile is responsible for creating a virtual environment, installing dependencies from requirements.txt into a temporary package_dir, and then zipping the contents of this directory along with lambda_function.py into lambda_package.zip within the source_dir.
  4. Output Handling: After the make command successfully completes, it moves and renames the generated lambda_package.zip from the source_dir to the specified build_dir (the notebook's current directory in this case) with a name like function_name.zip.
  5. Cleanup: In a finally block, it cleans up the temporary lambda_function.py copied earlier and any intermediate lambda_package.zip left in the source_dir (e.g., if the rename/move failed).

The function returns the path to the final .zip file.

def package_function(function_name, source_dir, build_dir):
    """Package Lambda function using Makefile found in source_dir."""
    # source_dir is where the .py, requirements.txt, Makefile live and where make runs (e.g., lambda_functions)
    # build_dir is where the final zip ends up (e.g., lambda-experiments)
    makefile_path = os.path.join(source_dir, 'Makefile')
    # Temp build dir inside source_dir, as Makefile expects relative paths
    temp_package_dir = os.path.join(source_dir, 'package_dir') 
    # Requirements file is in source_dir
    source_req_path = os.path.join(source_dir, 'requirements.txt') 
    # Target requirements path inside source_dir (needed for Makefile)
    # target_req_path = os.path.join(source_dir, 'requirements.txt') # No copy needed if running make in source_dir
    source_func_script_path = os.path.join(source_dir, f'{function_name}.py')
    # Target function script path inside source_dir, renamed for Makefile install_deps copy
    target_func_script_path = os.path.join(source_dir, 'lambda_function.py') 
    # Make output zip is created inside source_dir
    make_output_zip = os.path.join(source_dir, 'lambda_package.zip') 
    # Final zip path is in the build_dir (one level up from source_dir)
    final_zip_path = os.path.join(build_dir, f'{function_name}.zip')

    logger.info(f"--- Packaging function {function_name} --- ")
    logger.info(f"Source Dir (Makefile location & make cwd): {source_dir}")
    logger.info(f"Build Dir (Final zip location): {build_dir}")

    if not os.path.exists(source_func_script_path):
        raise FileNotFoundError(f"Source function script not found: {source_func_script_path}")
    if not os.path.exists(source_req_path):
        raise FileNotFoundError(f"Source requirements file not found: {source_req_path}")
    if not os.path.exists(makefile_path):
        raise FileNotFoundError(f"Makefile not found at: {makefile_path}")

    # Ensure no leftover target script from previous failed run
    if os.path.exists(target_func_script_path):
        logger.warning(f"Removing existing target script: {target_func_script_path}")
        os.remove(target_func_script_path)

    try:
        # 1. No need to create lambda subdir in build_dir

        # 2. Copy source function script to source_dir as lambda_function.py
        logger.info(f"Copying {source_func_script_path} to {target_func_script_path}")
        shutil.copy(source_func_script_path, target_func_script_path)
        # Requirements file is already in source_dir, no copy needed.

        # 3. Run make command (execute from source_dir where Makefile is)
        make_command = [
            'make',
            '-f', makefile_path, # Still specify Makefile path explicitly
            'clean', # Clean first
            'package',
            # 'PYTHON_VERSION=python3.9' # Let Makefile use its default or system default
        ]
        logger.info(f"Running make command: {' '.join(make_command)} (in {source_dir})")
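        # Run make from source_dir; relative paths in Makefile should now work.
        # subprocess.run(check=True) raises CalledProcessError like check_call does,
        # but also attaches the captured stderr to the exception (check_call leaves it None).
        subprocess.run(make_command, cwd=source_dir, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, check=True)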
        logger.info("Make command completed successfully.")

        # 4. Check for output zip in source_dir and rename/move to build_dir
        if not os.path.exists(make_output_zip):
            raise FileNotFoundError(f"Makefile did not produce expected output: {make_output_zip}")

        logger.info(f"Moving and renaming {make_output_zip} to {final_zip_path}")
        if os.path.exists(final_zip_path):
             logger.warning(f"Removing existing final zip: {final_zip_path}")
             os.remove(final_zip_path)
        # Use shutil.move for cross-filesystem safety if needed, os.rename is fine here
        os.rename(make_output_zip, final_zip_path) 
        logger.info(f"Zip file ready: {final_zip_path}")

        return final_zip_path

    except subprocess.CalledProcessError as e:
        logger.error(f"Error running Makefile for {function_name}: {e}")
        stderr_output = "(No stderr captured)"
        if e.stderr:
             try:
                  stderr_output = e.stderr.decode()
             except Exception:
                  stderr_output = "(Could not decode stderr)"
        logger.error(f"Make stderr: {stderr_output}")
        raise
    except Exception as e:
        logger.error(f"Error packaging function {function_name} using Makefile: {str(e)}")
        logger.error(traceback.format_exc())
        raise
    finally:
        # 5. Clean up intermediate files in source_dir
        if os.path.exists(target_func_script_path):
            logger.info(f"Cleaning up temporary script: {target_func_script_path}")
            os.remove(target_func_script_path)
        if os.path.exists(make_output_zip): # If rename failed
            logger.warning(f"Cleaning up intermediate zip in source dir: {make_output_zip}")
            os.remove(make_output_zip)
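
When a build step fails, the useful detail is usually on stderr. A minimal sketch of running a command and surfacing its stderr on failure. It uses subprocess.run with check=True, which raises CalledProcessError with the captured stderr attached. The helper name run_and_capture is hypothetical, and the failing command is a stand-in for make:

```python
import subprocess
import sys


def run_and_capture(command, cwd=None):
    """Run a command; return (returncode, stderr text) instead of raising (hypothetical helper)."""
    try:
        subprocess.run(
            command,
            cwd=cwd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            check=True,  # raise CalledProcessError on a non-zero exit status
        )
        return 0, ""
    except subprocess.CalledProcessError as e:
        # e.stderr holds the bytes captured from the child's stderr.
        return e.returncode, e.stderr.decode(errors="replace")


# Stand-in for a failing build: a child process that writes to stderr and exits 2.
code, err = run_and_capture(
    [sys.executable, "-c", "import sys; sys.stderr.write('missing target'); sys.exit(2)"]
)
print(code, err)
```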

3.8.4 create_lambda_function

This is a key function that handles the creation or update of the AWS Lambda function. It incorporates several important aspects for robustness and proper configuration:

  • Package Handling: It checks the size of the deployment .zip file. If the file is larger than 45MB (a margin below Lambda's 50MB limit for direct zip uploads), it calls upload_to_s3 and deploys from the resulting S3 location. Otherwise, it reads the .zip file content and uploads it directly.
  • Configuration: Defines common arguments for Lambda creation/update, including the function name, runtime (python3.9), IAM role ARN, handler name, timeout, memory size, and crucial environment variables (Couchbase details, Bedrock model IDs) that the Lambda will need at runtime.
  • Idempotency & Retry Logic: It first attempts to create the Lambda function.
    • If it encounters a ResourceConflictException (meaning the function already exists), it then attempts to update the function's code and configuration.
    • It includes a retry loop for both creation and update operations to handle potential throttling or other transient AWS issues, with an exponential backoff strategy.
  • Permissions: After successfully creating or updating the Lambda, it adds a resource-based policy (permission) to the Lambda function. This permission specifically allows the Bedrock service (bedrock.amazonaws.com) to invoke this Lambda function. It uses a predictable StatementId and handles potential conflicts if the permission already exists.
  • Waiters: It uses boto3 waiters (function_active_v2 after creation, function_updated_v2 after update) to pause execution until the Lambda function becomes fully active and ready, preventing issues where subsequent operations might target a Lambda that isn't fully initialized.

The function returns the ARN of the successfully created or updated Lambda function.
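
The retry loop in create_lambda_function waits base_delay * 2 ** (attempt - 1) seconds between attempts, and the throttling path adds a few seconds of jitter on top. A minimal sketch of the jitter-free schedule, assuming a hypothetical helper named backoff_delays with defaults matching the notebook's max_retries = 3 and base_delay = 10:

```python
def backoff_delays(max_retries=3, base_delay=10):
    """Return the waits in seconds between consecutive attempts, without jitter (hypothetical helper)."""
    # No wait follows the final attempt, so there are max_retries - 1 delays.
    return [base_delay * (2 ** (attempt - 1)) for attempt in range(1, max_retries)]


print(backoff_delays())      # [10, 20]
print(backoff_delays(5, 2))  # [2, 4, 8, 16]
```

With the notebook's defaults, a deployment that fails every attempt therefore spends about 30 seconds waiting, in addition to the time taken by the API calls themselves.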

def create_lambda_function(lambda_client, function_name, handler, role_arn, zip_file, region):
    """Create or update Lambda function with retry logic."""
    logger.info(f"Deploying Lambda function {function_name} from {zip_file}...")

    # Configure the client with increased timeouts for potentially long creation
    config = Config(
        connect_timeout=120,
        read_timeout=300,
        retries={'max_attempts': 5, 'mode': 'adaptive'}
    )
    lambda_client_local = boto3.client('lambda', region_name=region, config=config)

    # Check zip file size
    zip_size_mb = 0
    try:
        zip_size_bytes = os.path.getsize(zip_file)
        zip_size_mb = zip_size_bytes / (1024 * 1024)
        logger.info(f"Zip file size: {zip_size_mb:.2f} MB")
    except OSError as e:
         logger.error(f"Could not get size of zip file {zip_file}: {e}")
         raise # Cannot proceed without zip file

    use_s3 = zip_size_mb > 45 # Use S3 for packages over ~45MB
    s3_location = None
    zip_content = None

    if use_s3:
        logger.info(f"Package size ({zip_size_mb:.2f} MB) requires S3 deployment.")
        s3_location = upload_to_s3(zip_file, region)
        if not s3_location:
             raise Exception("Failed to upload Lambda package to S3.")
    else:
         logger.info("Deploying package directly.")
         try:
             with open(zip_file, 'rb') as f:
                 zip_content = f.read()
         except OSError as e:
              logger.error(f"Could not read zip file {zip_file}: {e}")
              raise

    # Define common create/update args
    common_args = {
        'FunctionName': function_name,
        'Runtime': 'python3.9',
        'Role': role_arn,
        'Handler': handler,
        'Timeout': 180,
        'MemorySize': 1536, # Adjust as needed
        # Env vars loaded from main script env or .env
        'Environment': {
            'Variables': {
                'CB_HOST': os.getenv('CB_HOST', 'couchbase://localhost'),
                'CB_USERNAME': os.getenv('CB_USERNAME', 'Administrator'),
                'CB_PASSWORD': os.getenv('CB_PASSWORD', 'password'),
                'CB_BUCKET_NAME': os.getenv('CB_BUCKET_NAME', 'vector-search-exp'),
                'SCOPE_NAME': os.getenv('SCOPE_NAME', 'bedrock_exp'),
                'COLLECTION_NAME': os.getenv('COLLECTION_NAME', 'docs_exp'),
                'INDEX_NAME': os.getenv('INDEX_NAME', 'vector_search_bedrock_exp'),
                'EMBEDDING_MODEL_ID': os.getenv('EMBEDDING_MODEL_ID', EMBEDDING_MODEL_ID),
                'AGENT_MODEL_ID': os.getenv('AGENT_MODEL_ID', AGENT_MODEL_ID)
            }
        }
    }

    if use_s3:
        code_arg = {'S3Bucket': s3_location['S3Bucket'], 'S3Key': s3_location['S3Key']}
    else:
        code_arg = {'ZipFile': zip_content}

    max_retries = 3
    base_delay = 10
    for attempt in range(1, max_retries + 1):
        try:
            logger.info(f"Creating function '{function_name}' (attempt {attempt}/{max_retries})...")
            create_args = common_args.copy()
            create_args['Code'] = code_arg
            create_args['Publish'] = True # Publish a version

            create_response = lambda_client_local.create_function(**create_args)
            function_arn = create_response['FunctionArn']
            logger.info(f"Successfully created function '{function_name}' with ARN: {function_arn}")

            # Add basic invoke permission after creation
            time.sleep(5) # Give function time to be fully created before adding policy
            statement_id = f"AllowBedrockInvokeBasic-{function_name}"
            try:
                logger.info(f"Adding basic invoke permission ({statement_id}) to {function_name}...")
                lambda_client_local.add_permission(
                    FunctionName=function_name,
                    StatementId=statement_id,
                    Action='lambda:InvokeFunction',
                    Principal='bedrock.amazonaws.com'
                )
                logger.info(f"Successfully added basic invoke permission {statement_id}.")
            except lambda_client_local.exceptions.ResourceConflictException:
                 logger.info(f"Permission {statement_id} already exists for {function_name}. Skipping add.")
            except ClientError as perm_e:
                logger.warning(f"Failed to add basic invoke permission {statement_id} to {function_name}: {perm_e}")

            # Wait for function to be Active
            logger.info(f"Waiting for function '{function_name}' to become active...")
            waiter = lambda_client_local.get_waiter('function_active_v2')
            waiter.wait(FunctionName=function_name, WaiterConfig={'Delay': 5, 'MaxAttempts': 24})
            logger.info(f"Function '{function_name}' is active.")

            return function_arn # Return ARN upon successful creation

        except lambda_client_local.exceptions.ResourceConflictException:
             logger.warning(f"Function '{function_name}' already exists. Attempting to update code...")
             try:
                 if use_s3:
                     update_response = lambda_client_local.update_function_code(
                         FunctionName=function_name,
                         S3Bucket=s3_location['S3Bucket'],
                         S3Key=s3_location['S3Key'],
                         Publish=True
                     )
                 else:
                     update_response = lambda_client_local.update_function_code(
                         FunctionName=function_name,
                         ZipFile=zip_content,
                         Publish=True
                     )
                 function_arn = update_response['FunctionArn']
                 logger.info(f"Successfully updated function code for '{function_name}'. New version ARN: {function_arn}")
                 
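                 # Wait for the code update to finish first: Lambda rejects a configuration
                 # update with ResourceConflictException while another update is in progress
                 lambda_client_local.get_waiter('function_updated_v2').wait(
                     FunctionName=function_name, WaiterConfig={'Delay': 5, 'MaxAttempts': 24}
                 )

                 # Also update configuration just in case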
                 try:
                      logger.info(f"Updating configuration for '{function_name}'...")
                      lambda_client_local.update_function_configuration(**common_args)
                      logger.info(f"Configuration updated for '{function_name}'.")
                 except ClientError as conf_e:
                      logger.warning(f"Could not update configuration for '{function_name}': {conf_e}")
                 
                 # Re-verify invoke permission after update
                 time.sleep(5)
                 statement_id = f"AllowBedrockInvokeBasic-{function_name}"
                 try:
                     logger.info(f"Verifying/Adding basic invoke permission ({statement_id}) after update...")
                     lambda_client_local.add_permission(
                         FunctionName=function_name,
                         StatementId=statement_id,
                         Action='lambda:InvokeFunction',
                         Principal='bedrock.amazonaws.com'
                     )
                     logger.info(f"Successfully added/verified basic invoke permission {statement_id}.")
                 except lambda_client_local.exceptions.ResourceConflictException:
                     logger.info(f"Permission {statement_id} already exists for {function_name}. Skipping add.")
                 except ClientError as perm_e:
                     logger.warning(f"Failed to add/verify basic invoke permission {statement_id} after update: {perm_e}")

                 # Wait for function to be Active after update
                 logger.info(f"Waiting for function '{function_name}' update to complete...")
                 waiter = lambda_client_local.get_waiter('function_updated_v2')
                 waiter.wait(FunctionName=function_name, WaiterConfig={'Delay': 5, 'MaxAttempts': 24})
                 logger.info(f"Function '{function_name}' update complete.")
                 
                 return function_arn # Return ARN after successful update

             except ClientError as update_e:
                 logger.error(f"Failed to update function '{function_name}': {update_e}")
                 if attempt < max_retries:
                      delay = base_delay * (2 ** (attempt - 1))
                      logger.info(f"Retrying update in {delay} seconds...")
                      time.sleep(delay)
                 else:
                      logger.error("Maximum update retries reached. Deployment failed.")
                      raise update_e
                      
        except ClientError as e:
            # Handle throttling or other retryable errors
            error_code = e.response.get('Error', {}).get('Code')
            if error_code in ['ThrottlingException', 'ProvisionedConcurrencyConfigNotFoundException', 'EC2ThrottledException'] or 'Rate exceeded' in str(e):
                logger.warning(f"Retryable error on attempt {attempt}: {e}")
                if attempt < max_retries:
                    delay = base_delay * (2 ** (attempt - 1)) + (uuid.uuid4().int % 5)
                    logger.info(f"Retrying in {delay} seconds...")
                    time.sleep(delay)
                else:
                    logger.error("Maximum retries reached after retryable error. Deployment failed.")
                    raise e
            else:
                logger.error(f"Error creating/updating Lambda '{function_name}': {e}")
                logger.error(traceback.format_exc()) # Log full traceback for unexpected errors
                raise e # Re-raise non-retryable or unexpected errors
        except Exception as e:
             logger.error(f"Unexpected error during Lambda deployment: {e}")
             logger.error(traceback.format_exc())
             raise e
             
    # If loop completes without returning, something went wrong
    raise Exception(f"Failed to deploy Lambda function {function_name} after {max_retries} attempts.")
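
The retry delays in the deployment function above follow an exponential backoff schedule, with a few seconds of jitter added only on the throttling path. The following is a minimal sketch of that arithmetic, not the notebook's code: backoff_delay is a hypothetical helper, base_delay=2 is an assumed value (the real one is set earlier in the function), and random.randint stands in for the uuid-based jitter.

```python
import random

def backoff_delay(attempt, base_delay=2, jitter_max=0):
    """Seconds to wait after a failed 1-based attempt (illustrative helper)."""
    # The delay doubles with each attempt: base, 2*base, 4*base, ...
    delay = base_delay * (2 ** (attempt - 1))
    # Optional jitter spreads out retries from concurrent callers.
    if jitter_max > 0:
        delay += random.randint(0, jitter_max)
    return delay

# Schedule without jitter for four attempts (assumed base_delay of 2 seconds).
print([backoff_delay(a, base_delay=2) for a in range(1, 5)])  # [2, 4, 8, 16]
```

With jitter_max=4 the third retry would wait between 8 and 12 seconds, which matches the 0 to 4 second range produced by uuid.uuid4().int % 5.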

3.9 Agent Resource Deletion Functions

This subsection provides helper functions to manage the cleanup of AWS Bedrock Agent resources. Creating agents, action groups, and aliases results in persistent configurations in AWS. These functions are essential for maintaining a clean environment, especially during experimentation and development, by allowing for the removal of these resources when they are no longer needed or before recreating them in a subsequent run.

3.9.1 get_agent_by_name

This utility function searches for an existing Bedrock Agent by its name. Since the AWS SDK's get_agent requires an agentId, and you often work with human-readable names, this function bridges that gap. It uses the list_agents operation (with a paginator to handle potentially many agents in an account) and iterates through the summaries, comparing the agentName field. If a match is found, it returns the corresponding agentId. If no agent with the given name is found or an error occurs during listing, it returns None.

def get_agent_by_name(agent_client, agent_name):
    """Find an agent ID by its name using list_agents."""
    logger.info(f"Attempting to find agent by name: {agent_name}")
    try:
        paginator = agent_client.get_paginator('list_agents')
        for page in paginator.paginate():
            for agent_summary in page.get('agentSummaries', []):
                if agent_summary.get('agentName') == agent_name:
                    agent_id = agent_summary.get('agentId')
                    logger.info(f"Found agent '{agent_name}' with ID: {agent_id}")
                    return agent_id
        logger.info(f"Agent '{agent_name}' not found.")
        return None
    except ClientError as e:
        logger.error(f"Error listing agents to find '{agent_name}': {e}")
        return None # Treat as not found if error occurs

3.9.2 delete_action_group

This function deletes a specific action group associated with a Bedrock Agent. Action groups are always tied to the DRAFT version of an agent, so it calls delete_agent_action_group with the agentId, agentVersion='DRAFT', and the actionGroupId. It passes skipResourceInUseCheck=True to force deletion, which can be useful if the agent is in a state (like PREPARING) that might otherwise prevent immediate deletion. If a ConflictException occurs (e.g., the agent is busy), it retries once after a 15-second delay. It returns True if the deletion succeeded, either on the first attempt or on the retry, and False if the action group was not found or another error occurred.

def delete_action_group(agent_client, agent_id, action_group_id):
    """Deletes a specific action group for an agent."""
    logger.info(f"Attempting to delete action group {action_group_id} for agent {agent_id}...")
    try:
        agent_client.delete_agent_action_group(
            agentId=agent_id,
            agentVersion='DRAFT', # Action groups are tied to the DRAFT version
            actionGroupId=action_group_id,
            skipResourceInUseCheck=True # Force deletion even if in use (e.g., during prepare)
        )
        logger.info(f"Successfully deleted action group {action_group_id} for agent {agent_id}.")
        time.sleep(5) # Short pause after deletion
        return True
    except agent_client.exceptions.ResourceNotFoundException:
        logger.info(f"Action group {action_group_id} not found for agent {agent_id}. Skipping deletion.")
        return False
    except ClientError as e:
        # Handle potential throttling or conflict if prepare is happening
        error_code = e.response.get('Error', {}).get('Code')
        if error_code == 'ConflictException':
            logger.warning(f"Conflict deleting action group {action_group_id} (agent might be preparing/busy). Retrying once after delay...")
            time.sleep(15)
            try:
                agent_client.delete_agent_action_group(
                    agentId=agent_id, agentVersion='DRAFT', actionGroupId=action_group_id, skipResourceInUseCheck=True
                )
                logger.info(f"Successfully deleted action group {action_group_id} after retry.")
                return True
            except Exception as retry_e:
                 logger.error(f"Error deleting action group {action_group_id} on retry: {retry_e}")
                 return False
        else:
            logger.error(f"Error deleting action group {action_group_id} for agent {agent_id}: {e}")
            return False

3.9.3 delete_agent_and_resources

This function orchestrates the complete cleanup of a Bedrock Agent and its associated components. Its process is:

  1. Find Agent: It first calls get_agent_by_name to retrieve the agentId for the specified agent_name. If the agent isn't found, it exits gracefully.
  2. Delete Action Groups: It lists all action groups associated with the DRAFT version of the agent. For each action group found, it calls delete_action_group to remove it.
  3. Delete Agent: After attempting to delete all action groups, it proceeds to delete the agent itself using delete_agent with skipResourceInUseCheck=True to force the deletion.
  4. Wait for Deletion: It includes a custom polling loop to wait for the agent to be fully deleted by repeatedly calling get_agent and checking for a ResourceNotFoundException. This ensures that subsequent operations (like recreating an agent with the same name) are less likely to encounter conflicts.

def delete_agent_and_resources(agent_client, agent_name):
    """Deletes the agent and its associated action groups."""
    agent_id = get_agent_by_name(agent_client, agent_name)
    if not agent_id:
        logger.info(f"Agent '{agent_name}' not found, no deletion needed.")
        return

    logger.warning(f"--- Deleting Agent Resources for '{agent_name}' (ID: {agent_id}) ---")

    # 1. Delete Action Groups
    try:
        logger.info(f"Listing action groups for agent {agent_id}...")
        action_groups = agent_client.list_agent_action_groups(
            agentId=agent_id,
            agentVersion='DRAFT' # List groups for the DRAFT version
        ).get('actionGroupSummaries', [])

        if action_groups:
            logger.info(f"Found {len(action_groups)} action groups to delete.")
            for ag in action_groups:
                delete_action_group(agent_client, agent_id, ag['actionGroupId'])
        else:
            logger.info("No action groups found to delete.")

    except ClientError as e:
        logger.error(f"Error listing action groups for agent {agent_id}: {e}")
        # Continue to agent deletion attempt even if listing fails

    # 2. Delete the Agent
    try:
        logger.info(f"Attempting to delete agent {agent_id} ('{agent_name}')...")
        agent_client.delete_agent(agentId=agent_id, skipResourceInUseCheck=True) # Force delete

        # Wait for agent deletion (custom waiter logic might be needed if no standard waiter)
        logger.info(f"Waiting up to 2 minutes for agent {agent_id} deletion...")
        deleted = False
        for _ in range(24): # Check every 5 seconds for 2 minutes
            try:
                agent_client.get_agent(agentId=agent_id)
                time.sleep(5)
            except agent_client.exceptions.ResourceNotFoundException:
                logger.info(f"Agent {agent_id} successfully deleted.")
                deleted = True
                break
            except ClientError as e:
                 # Handle potential throttling during check
                 error_code = e.response.get('Error', {}).get('Code')
                 if error_code == 'ThrottlingException':
                     logger.warning("Throttled while checking agent deletion status, continuing wait...")
                     time.sleep(10)
                 else:
                     logger.error(f"Error checking agent deletion status: {e}")
                     # Break checking loop on unexpected error
                     break
        if not deleted:
             logger.warning(f"Agent {agent_id} deletion confirmation timed out.")

    except agent_client.exceptions.ResourceNotFoundException:
        logger.info(f"Agent {agent_id} ('{agent_name}') already deleted or not found.")
    except ClientError as e:
        logger.error(f"Error deleting agent {agent_id}: {e}")

    logger.info(f"--- Agent Resource Deletion Complete for '{agent_name}' ---")
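
The deletion wait above is an instance of a general "poll until the resource is gone" pattern. The sketch below isolates that pattern so it can be tried without AWS access. Every name in it is hypothetical: NotFound stands in for agent_client.exceptions.ResourceNotFoundException, and fake_get_agent simulates get_agent for a resource that disappears on the third check.

```python
import time

class NotFound(Exception):
    """Stand-in for agent_client.exceptions.ResourceNotFoundException."""

def wait_until_gone(fetch, attempts=24, delay=5.0, sleep=time.sleep):
    """Return True once fetch() raises NotFound, or False after all attempts."""
    for _ in range(attempts):
        try:
            fetch()  # Resource still exists if this returns normally
        except NotFound:
            return True
        sleep(delay)
    return False

# Simulated resource that disappears on the third status check.
calls = {"n": 0}

def fake_get_agent():
    calls["n"] += 1
    if calls["n"] >= 3:
        raise NotFound()
    return {"agent": {"agentStatus": "DELETING"}}

print(wait_until_gone(fake_get_agent, delay=0))  # True
```

Passing the sleep function as a parameter is a design choice made here for testability; the notebook calls time.sleep directly.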

3.10 Agent Creation Functions

This subsection contains functions dedicated to the setup and configuration of the Bedrock Agent itself, including its core definition, action groups that link it to tools (Lambda functions), and the preparation process that makes it ready for invocation.

3.10.1 create_agent

This function creates a new Bedrock Agent. It takes the desired agent_name, the agent_role_arn (obtained from create_agent_role), and the foundation_model_id (e.g., the model ID of a Claude Sonnet model) as input. Key configurations include:

  • Instruction: A detailed prompt that defines the agent's persona, capabilities, and how it should use its tools. The instruction in this notebook guides the agent to use a single "SearchAndFormat" tool and present results directly.
  • idleSessionTTLInSeconds: Sets a timeout for how long an agent session can remain idle.
  • Description: A brief description for the agent.

After calling create_agent, the function logs the initial response details (ID, ARN, status). It then enters a polling loop to wait until the agent's status moves out of the CREATING state, typically to NOT_PREPARED. If the agent creation fails and enters a FAILED state, it raises an exception. It returns the agent_id and agent_arn upon successful initiation of creation.

def create_agent(agent_client, agent_name, agent_role_arn, foundation_model_id):
    """Creates a new Bedrock Agent."""
    logger.info(f"--- Creating Agent: {agent_name} ---")
    try:
        # Updated Instruction for single tool
        instruction = (
            "You are a helpful research assistant. Your primary function is to use the SearchAndFormat tool "
            "to find relevant documents based on user queries and format them. "
            "Use the user's query for the search, and specify a formatting style if requested, otherwise use the default. "
            "Present the formatted results returned by the tool directly to the user. "  # Trailing space keeps the sentences separated
            "Only use the tool provided. Do not add your own knowledge."
        )

        response = agent_client.create_agent(
            agentName=agent_name,
            agentResourceRoleArn=agent_role_arn,
            foundationModel=foundation_model_id,
            instruction=instruction,
            idleSessionTTLInSeconds=1800, # 30 minutes
            description=f"Experimental agent for Couchbase search and content formatting ({foundation_model_id})"
            # promptOverrideConfiguration={} # Optional: Add later if needed
        )
        agent_info = response.get('agent')
        agent_id = agent_info.get('agentId')
        agent_arn = agent_info.get('agentArn')
        agent_status = agent_info.get('agentStatus')
        logger.info(f"Agent creation initiated. Name: {agent_name}, ID: {agent_id}, ARN: {agent_arn}, Status: {agent_status}")

        # Wait for agent to become NOT_PREPARED (initial state after creation)
        # Using custom waiter logic as there might not be a standard one for this transition
        logger.info(f"Waiting for agent {agent_id} to reach initial state...")
        for _ in range(12): # Check for up to 1 minute
             current_status = agent_client.get_agent(agentId=agent_id)['agent']['agentStatus']
             logger.info(f"Agent {agent_id} status: {current_status}")
             if current_status != 'CREATING': # Expect NOT_PREPARED or FAILED
                  break
             time.sleep(5)

        final_status = agent_client.get_agent(agentId=agent_id)['agent']['agentStatus']
        if final_status == 'FAILED':
            logger.error(f"Agent {agent_id} creation failed.")
            # Optionally retrieve failure reasons if API provides them
            raise Exception(f"Agent creation failed for {agent_name}")
        else:
             logger.info(f"Agent {agent_id} successfully created (Status: {final_status}).")
             
        return agent_id, agent_arn

    except ClientError as e:
        logger.error(f"Error creating agent '{agent_name}': {e}")
        raise

3.10.2 create_action_group

This function creates or updates an action group for the specified agent. Action groups define the tools an agent can use. In this Lambda-based approach, the action group links the agent to the Lambda function that implements the tool. Key steps include:

  • Function Schema Definition: It programmatically defines a function_schema_details dictionary. This schema describes the tool (searchAndFormatDocuments) that the Lambda function provides, including its name, description, and expected input parameters (query, k, style) with their types and whether they are required. This schema is what the agent uses to understand how to invoke the tool.
  • Idempotency: It first checks if an action group with the given action_group_name already exists for the DRAFT version of the agent.
    • If it exists, it attempts to update the existing action group using update_agent_action_group, ensuring the actionGroupExecutor points to the correct Lambda ARN and that it uses the functionSchema (for defining the tool via its signature) rather than an OpenAPI schema.
    • If it doesn't exist, it creates a new action group using create_agent_action_group.
  • actionGroupExecutor: This is set to {'lambda': function_arn}, where function_arn is the ARN of the deployed Lambda function. This tells Bedrock to invoke this Lambda when the agent decides to use a tool from this action group.
  • functionSchema Parameter: The functionSchema (containing the function_schema_details) is provided to the create_agent_action_group or update_agent_action_group call. This method of defining tools is simpler for single functions compared to providing a full OpenAPI schema, which is also an option for more complex APIs.
  • State: The action group is explicitly set to ENABLED.

A brief pause is added after a new action group is created to allow changes to propagate; the update path returns immediately without pausing. In both cases the function returns the actionGroupId.

def create_action_group(agent_client, agent_id, action_group_name, function_arn, schema_path=None):
    """Creates an action group for the agent using Define with function details."""
    logger.info(f"--- Creating/Updating Action Group (Function Details): {action_group_name} for Agent: {agent_id} ---")
    logger.info(f"Lambda ARN: {function_arn}")

    # Define function schema details (for functionSchema parameter)
    function_schema_details = {
        'functions': [
            {
                'name': 'searchAndFormatDocuments', # Function name agent will call
                'description': 'Performs vector search based on query, retrieves documents, and formats results using specified style.',
                'parameters': {
                    'query': {
                        'description': 'The search query text.',
                        'type': 'string',
                        'required': True
                    },
                    'k': {
                        'description': 'The maximum number of documents to retrieve.',
                        'type': 'integer',
                        'required': False # Making optional as Lambda has default
                    },
                    'style': {
                        'description': 'The desired formatting style for the results (e.g., \'bullet points\', \'paragraph\', \'summary\').',
                        'type': 'string',
                        'required': False # Making optional as Lambda has default
                    }
                }
            }
        ]
    }

    try:
        # Check if Action Group already exists for the DRAFT version
        try:
             logger.info(f"Checking if action group '{action_group_name}' already exists for agent {agent_id} DRAFT version...")
             paginator = agent_client.get_paginator('list_agent_action_groups')
             existing_group = None
             for page in paginator.paginate(agentId=agent_id, agentVersion='DRAFT'):
                 for ag_summary in page.get('actionGroupSummaries', []):
                      if ag_summary.get('actionGroupName') == action_group_name:
                           existing_group = ag_summary
                           break
                 if existing_group:
                      break
             
             if existing_group:
                 ag_id = existing_group['actionGroupId']
                 logger.warning(f"Action Group '{action_group_name}' (ID: {ag_id}) already exists for agent {agent_id} DRAFT. Attempting update to Function Details.")
                 # Update existing action group - REMOVE apiSchema, ADD functionSchema
                 response = agent_client.update_agent_action_group(
                     agentId=agent_id,
                     agentVersion='DRAFT',
                     actionGroupId=ag_id,
                     actionGroupName=action_group_name,
                     actionGroupExecutor={'lambda': function_arn},
                     functionSchema={ # Use functionSchema
                         'functions': function_schema_details['functions'] # Pass the list with the correct key
                     },
                     actionGroupState='ENABLED'
                 )
                 ag_info = response.get('agentActionGroup')
                 logger.info(f"Successfully updated Action Group '{action_group_name}' (ID: {ag_info.get('actionGroupId')}) to use Function Details.")
                 return ag_info.get('actionGroupId')
             else:
                  logger.info(f"Action group '{action_group_name}' does not exist. Creating new with Function Details.")

        except ClientError as e:
             logger.error(f"Error checking for existing action group '{action_group_name}': {e}. Proceeding with creation attempt.")


        # Create new action group if not found or update failed implicitly
        response = agent_client.create_agent_action_group(
            agentId=agent_id,
            agentVersion='DRAFT', 
            actionGroupName=action_group_name,
            actionGroupExecutor={
                'lambda': function_arn 
            },
            functionSchema={ # Use functionSchema
                 'functions': function_schema_details['functions'] # Pass the list with the correct key
            },
            actionGroupState='ENABLED' 
        )
        ag_info = response.get('agentActionGroup')
        ag_id = ag_info.get('actionGroupId')
        logger.info(f"Successfully created Action Group '{action_group_name}' with ID: {ag_id} using Function Details.")
        time.sleep(5) # Pause after creation/update
        return ag_id

    except ClientError as e:
        logger.error(f"Error creating/updating action group '{action_group_name}' using Function Details: {e}")
        raise
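
For context on what the functionSchema implies on the Lambda side, here is a hedged sketch of a handler for a function-details action group. It is not the notebook's Lambda code: the placeholder body, the default values for k and style, and the sample event are assumptions made for illustration. The event and response shapes follow the format Bedrock documents for action groups defined with function details, where parameters arrive as a list of name/type/value entries with string values.

```python
import json

def lambda_handler(event, context):
    """Illustrative handler for a function-details action group."""
    # Bedrock passes parameters as a list of {"name", "type", "value"} dicts.
    params = {p["name"]: p["value"] for p in event.get("parameters", [])}
    query = params.get("query", "")
    k = int(params.get("k", 3))                    # Assumed default
    style = params.get("style", "bullet points")   # Assumed default

    # Placeholder for the real vector search and formatting logic.
    body = f"Searched for '{query}' (k={k}, style={style})"

    # Echo the action group and function names back so Bedrock can match the result.
    return {
        "messageVersion": "1.0",
        "response": {
            "actionGroup": event.get("actionGroup"),
            "function": event.get("function"),
            "functionResponse": {
                "responseBody": {"TEXT": {"body": body}}
            },
        },
    }

# Hypothetical event resembling what the agent would send for this schema.
sample_event = {
    "messageVersion": "1.0",
    "actionGroup": "SearchAndFormatActionGroup",
    "function": "searchAndFormatDocuments",
    "parameters": [
        {"name": "query", "type": "string", "value": "vector search"},
        {"name": "k", "type": "integer", "value": "2"},
    ],
}
print(json.dumps(lambda_handler(sample_event, None), indent=2))
```

Because optional parameters such as k and style may be absent from the event, the handler needs its own defaults, which is why the schema marks them as not required.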

3.10.3 prepare_agent

This function initiates the preparation of the DRAFT version of the Bedrock Agent and waits for this process to complete. Preparation involves Bedrock compiling the agent's configuration (instructions, action groups, model settings) and making it ready for invocation.

  • It calls bedrock_agent_client.prepare_agent(agentId=agent_id).
  • Custom Waiter: It then uses a custom-defined boto3 waiter (AgentPrepared) to poll the agent's status. The waiter configuration specifies:
    • delay: How often to check (e.g., every 30 seconds).
    • operation: The SDK call to make for checking (GetAgent).
    • maxAttempts: How many times to check before timing out (e.g., 20 attempts, for a total of up to 10 minutes).
    • acceptors: Conditions that determine success, failure, or retry. The waiter succeeds if agent.agentStatus becomes PREPARED, fails if it becomes FAILED, and explicitly retries if it is UPDATING. The more typical intermediate state, PREPARING, matches no acceptor, so the waiter simply polls again until an acceptor matches or maxAttempts is reached.

If the waiter times out or the agent preparation results in a FAILED status, an exception is raised. This step is crucial because an agent cannot be invoked (or an alias reliably pointed to its version) until it is successfully prepared.

def prepare_agent(agent_client, agent_id):
    """Prepares the DRAFT version of the agent."""
    logger.info(f"--- Preparing Agent: {agent_id} ---")
    try:
        response = agent_client.prepare_agent(agentId=agent_id)
        agent_version = response.get('agentVersion') # Should be DRAFT
        prepared_at = response.get('preparedAt')
        status = response.get('agentStatus') # Should be PREPARING
        logger.info(f"Agent preparation initiated for version '{agent_version}'. Status: {status}. Prepared At: {prepared_at}")

        # Wait for preparation to complete (PREPARED or FAILED)
        logger.info(f"Waiting for agent {agent_id} preparation to complete (up to 10 minutes)...")
        # Define a simple waiter config
        waiter_config = {
            'version': 2,
            'waiters': {
                'AgentPrepared': {
                    'delay': 30, # Check every 30 seconds
                    'operation': 'GetAgent',
                    'maxAttempts': 20, # Max 10 minutes
                    'acceptors': [
                        {
                            'matcher': 'path',
                            'expected': 'PREPARED',
                            'argument': 'agent.agentStatus',
                            'state': 'success'
                        },
                        {
                            'matcher': 'path',
                            'expected': 'FAILED',
                            'argument': 'agent.agentStatus',
                            'state': 'failure'
                        },
                        {
                            'matcher': 'path',
                            'expected': 'UPDATING', # Can happen during prep? Treat as retryable
                            'argument': 'agent.agentStatus',
                            'state': 'retry'
                        }
                    ]
                }
            }
        }
        waiter_model = WaiterModel(waiter_config)
        custom_waiter = create_waiter_with_client('AgentPrepared', waiter_model, agent_client)

        try: # Wait for the agent to reach PREPARED (or fail / time out)
            custom_waiter.wait(agentId=agent_id)
            logger.info(f"Agent {agent_id} successfully prepared.")

        except Exception as e: # Catches waiter failures and timeouts
            logger.error(f"Agent {agent_id} preparation failed or timed out: {e}")
            # Check final status if possible
            try:
                final_status = agent_client.get_agent(agentId=agent_id)['agent']['agentStatus']
                logger.error(f"Final agent status: {final_status}")
            except Exception as get_e:
                logger.error(f"Could not retrieve final agent status after wait failure: {get_e}")
            raise Exception(f"Agent preparation failed for {agent_id}") from e

    except Exception as e:
        logger.error(f"Error preparing agent {agent_id}: {e}")
        # Handle error, maybe exit
        raise e # Re-raise the exception
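
To make the acceptor list easier to reason about, the following is a simplified model of how a list of 'path' acceptors maps an agent status to a waiter state. It is a sketch, not botocore's implementation, and it assumes that when no acceptor matches a successful response the waiter stays in a waiting state and polls again. The helper name resolve_state is hypothetical.

```python
def resolve_state(status, acceptors):
    """Map an agent status to a waiter state using the first matching acceptor."""
    for acceptor in acceptors:
        if acceptor["expected"] == status:
            return acceptor["state"]
    # No acceptor matched: the waiter keeps polling (assumed behavior).
    return "waiting"

# Same expectations as the AgentPrepared waiter config above.
acceptors = [
    {"expected": "PREPARED", "state": "success"},
    {"expected": "FAILED", "state": "failure"},
    {"expected": "UPDATING", "state": "retry"},
]

for status in ("PREPARING", "UPDATING", "PREPARED", "FAILED"):
    print(status, "->", resolve_state(status, acceptors))
```

Under this model PREPARING and UPDATING both lead to another poll, so the explicit UPDATING acceptor is a safeguard rather than a requirement.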

3.11 Agent Invocation Function

This subsection provides the function used to interact with the prepared and aliased Bedrock Agent, sending it a prompt and processing its response.

3.11.1 test_agent_invocation

This function is responsible for invoking the configured Bedrock Agent and handling its response. Key operations include:

  • Invocation: Calls bedrock_agent_runtime_client.invoke_agent with the agentId, agentAliasId, a unique sessionId (generated for each invocation in this script), the user's prompt (inputText), and enableTrace=True to get detailed trace information for debugging.
  • Stream Processing: The agent's response is a stream. The function iterates through the events in this stream (response.get('completion', [])).
    • chunk events: These contain parts of the agent's textual response. The function decodes these byte chunks (UTF-8) and concatenates them to form the completion_text.
    • trace events: Because enableTrace is set to True, these events provide detailed insight into the agent's internal operations, such as which foundation model was called, the input to the model, the agent's rationale, and any tool invocations. In the Lambda approach, Bedrock itself calls the Lambda function, so the trace may show the agent's decision to call the tool and the result it returned rather than the tool's execution. The function collects these trace parts.
  • Logging: It logs the final combined completion_text and a summary of the trace events, which can be very helpful for understanding the agent's decision-making process and debugging any issues with tool invocation or response generation.

It returns the final textual response from the agent.

def test_agent_invocation(agent_runtime_client, agent_id, agent_alias_id, session_id, prompt):
    """Invokes the agent, logs the response and a trace summary, and returns the completion text."""
    logger.info(f"--- Testing Agent Invocation (Agent ID: {agent_id}, Alias: {agent_alias_id}) ---")
    logger.info(f"Session ID: {session_id}")
    logger.info(f"Prompt: \"{prompt}\"")

    try:
        response = agent_runtime_client.invoke_agent(
            agentId=agent_id,
            agentAliasId=agent_alias_id,
            sessionId=session_id,
            inputText=prompt,
            enableTrace=True # Enable trace for debugging
        )

        logger.info("Agent invocation successful. Processing response...")
        completion_text = ""
        trace_events = []

        # The response is a stream. Iterate through the chunks.
        for event in response.get('completion', []):
            if 'chunk' in event:
                data = event['chunk'].get('bytes', b'')
                decoded_chunk = data.decode('utf-8')
                completion_text += decoded_chunk
            elif 'trace' in event:
                trace_part = event['trace'].get('trace')
                if trace_part:
                     trace_events.append(trace_part)
            else:
                 logger.warning(f"Unhandled event type in stream: {event}")

        # Log final combined response
        logger.info(f"--- Agent Final Response ---{completion_text}")
        
        # Keep trace summary log (optional, can be removed if too verbose)
        if trace_events:
             logger.info("--- Invocation Trace Summary ---")
             for i, trace in enumerate(trace_events):
                  trace_type = trace.get('type')
                  step_type = trace.get('orchestration', {}).get('stepType')
                  model_invocation_input = trace.get('modelInvocationInput')
                  if model_invocation_input:
                      fm_input = model_invocation_input.get('text',
                          json.dumps(model_invocation_input.get('invocationInput',{}).get('toolConfiguration',{})) # Handle tool input
                      )
                  log_line = f"Trace {i+1}: Type={trace_type}, Step={step_type}"
                  rationale = trace.get('rationale', {}).get('text')
                  if rationale: log_line += f", Rationale=\"{rationale[:100]}...\""
                  logger.info(log_line) # Log summary line

        return completion_text

    except ClientError as e:
        logger.error(f"Error invoking agent: {e}")
        logger.error(traceback.format_exc())
        return None
    except Exception as e:
         logger.error(f"Unexpected error during agent invocation: {e}")
         logger.error(traceback.format_exc())
         return None
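
The stream-handling logic can be exercised without calling Bedrock by feeding it a hand-built list of events. The sketch below does that with a hypothetical helper, collect_completion, and a fake stream whose contents are invented for illustration; only the chunk and trace event shapes mirror what the function above reads.

```python
def collect_completion(events):
    """Split a stream of agent events into response text and trace parts."""
    text_parts, traces = [], []
    for event in events:
        if "chunk" in event:
            # Chunk payloads are UTF-8 encoded bytes.
            text_parts.append(event["chunk"].get("bytes", b"").decode("utf-8"))
        elif "trace" in event:
            trace_part = event["trace"].get("trace")
            if trace_part:
                traces.append(trace_part)
    return "".join(text_parts), traces

# Invented events standing in for response['completion'].
fake_stream = [
    {"trace": {"trace": {"orchestrationTrace": {"rationale": {"text": "Search first."}}}}},
    {"chunk": {"bytes": "Here are ".encode("utf-8")}},
    {"chunk": {"bytes": "the results.".encode("utf-8")}},
]

text, traces = collect_completion(fake_stream)
print(text)         # Here are the results.
print(len(traces))  # 1
```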

4. Main Execution Flow

This is the primary section of the notebook where all the previously defined helper functions are called in sequence to set up the complete Bedrock Agent environment with a Lambda-backed tool, and then test its invocation. The flow is designed to be largely idempotent where possible, meaning it can often be re-run, and it will attempt to clean up or reuse existing resources before creating new ones (e.g., IAM roles, Lambda functions, agents). The major steps are outlined below:

4.1 Initial Setup

This first step in the main execution flow performs essential preliminary tasks:

  1. Logs a starting message for the script execution.
  2. Calls check_environment_variables() to ensure all required environment variables (AWS credentials, Couchbase password, etc.) are set. If not, it raises an EnvironmentError to halt execution, as the subsequent steps depend on these variables.
  3. Calls initialize_aws_clients() to get the necessary boto3 client objects for Bedrock, IAM, Lambda, etc.
  4. Calls connect_couchbase() to establish a connection to the Couchbase cluster.

If any of these critical initialization steps fail, an exception is raised to stop the notebook's execution, preventing errors in later stages.

logger.info("--- Starting Bedrock Agent Experiment Script ---")

if not check_environment_variables():
    # In a notebook, raising an exception might be better than exit(1)
    raise EnvironmentError("Missing required environment variables. Check logs.")

# Initialize all clients, including the agent client
try:
    bedrock_runtime_client, iam_client, lambda_client, bedrock_agent_client, bedrock_agent_runtime_client = initialize_aws_clients()
    cb_cluster = connect_couchbase()
    logger.info("AWS clients and Couchbase connection initialized.")
except Exception as e:
    logger.error(f"Initialization failed: {e}")
    raise # Re-raise the exception to stop execution

2025-06-09 13:39:41,643 - INFO - --- Starting Bedrock Agent Experiment Script ---
2025-06-09 13:39:41,644 - INFO - All required environment variables are set.
2025-06-09 13:39:41,644 - INFO - Initializing AWS clients in region: us-east-1
2025-06-09 13:39:42,002 - INFO - AWS clients initialized successfully.
2025-06-09 13:39:42,002 - INFO - Connecting to Couchbase cluster at couchbases://cb.hlcup4o4jmjr55yf.cloud.couchbase.com...
2025-06-09 13:39:44,131 - INFO - Successfully connected to Couchbase.
2025-06-09 13:39:44,132 - INFO - AWS clients and Couchbase connection initialized.

4.2 Couchbase Setup

This block focuses on preparing the Couchbase environment to serve as the vector store for the agent. It involves:

  1. Calling setup_collection(): This helper function ensures that the target Couchbase bucket, scope, and collection (defined by CB_BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME) are created if they don't already exist. It also ensures a primary index is present on the collection.
  2. Calling setup_search_index(): This creates or updates the Couchbase Full-Text Search (FTS) index (named by INDEX_NAME) using the definition from INDEX_JSON_PATH. This search index is crucial for performing vector similarity searches.
  3. Calling clear_collection(): This function deletes all existing documents from the target collection. This step ensures that each run of the notebook starts with a clean slate, preventing data from previous experiments from interfering with the current one.

If any part of this Couchbase setup fails, an exception is logged and re-raised to stop further execution.
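
The log output for this step notes that the index definition is loaded from a file and that its name and source are then forced to match the configured values. A minimal sketch of that idea follows. It assumes the definition is a JSON object that uses the `name` and `sourceName` keys; the function name load_index_definition is hypothetical and is not the notebook's setup_search_index() helper.

```python
import json
from pathlib import Path
from typing import Any, Dict


def load_index_definition(path: str, index_name: str, bucket_name: str) -> Dict[str, Any]:
    """Load a search index definition and pin its name and source bucket.

    Assumption: the file holds a single JSON object with `name` and
    `sourceName` keys, as in a typical Couchbase search index definition.
    """
    definition = json.loads(Path(path).read_text(encoding="utf-8"))
    if not isinstance(definition, dict):
        raise ValueError(f"Index definition in {path} must be a JSON object")
    definition["name"] = index_name          # index name the vector store will query
    definition["sourceName"] = bucket_name   # bucket the index reads from
    return definition
```

Overriding these two fields at load time means the same definition file can be reused across buckets without editing it by hand.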

try:
    # Use the setup functions with the script's config variables
    cb_collection = setup_collection(cb_cluster, CB_BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME)
    logger.info(f"Couchbase collection '{CB_BUCKET_NAME}.{SCOPE_NAME}.{COLLECTION_NAME}' setup complete.")

    # Pass required args to setup_search_index
    setup_search_index(cb_cluster, INDEX_NAME, CB_BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME, INDEX_JSON_PATH)
    logger.info(f"Couchbase search index '{INDEX_NAME}' setup complete.")

    # Clear any existing documents from previous runs
    clear_collection(cb_cluster, CB_BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME)
    logger.info("Cleared any existing documents from the collection.")
except Exception as e:
    logger.error(f"Couchbase setup failed: {e}")
    raise
2025-06-09 13:39:44,140 - INFO - Setting up collection: vector-search-testing/shared/bedrock
2025-06-09 13:39:45,245 - INFO - Bucket 'vector-search-testing' exists.
2025-06-09 13:39:46,219 - INFO - Scope 'shared' already exists.
2025-06-09 13:39:47,149 - INFO - Collection 'bedrock' already exists.
2025-06-09 13:39:47,152 - INFO - Ensuring primary index exists on `vector-search-testing`.`shared`.`bedrock`...
2025-06-09 13:39:48,185 - INFO - Primary index present or created successfully.
2025-06-09 13:39:48,186 - INFO - Collection setup complete.
2025-06-09 13:39:48,187 - INFO - Couchbase collection 'vector-search-testing.shared.bedrock' setup complete.
2025-06-09 13:39:48,187 - INFO - Looking for index definition at: /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/aws_index.json
2025-06-09 13:39:48,192 - INFO - Loaded index definition from /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/aws_index.json, ensuring name is 'vector_search_bedrock' and source is 'vector-search-testing'.
2025-06-09 13:39:48,193 - INFO - Upserting search index 'vector_search_bedrock'...
2025-06-09 13:39:48,880 - WARNING - Search index 'vector_search_bedrock' likely already existed (caught QueryIndexAlreadyExistsException, check if applicable). Upsert attempted.
2025-06-09 13:39:48,881 - INFO - Couchbase search index 'vector_search_bedrock' setup complete.
2025-06-09 13:39:48,881 - WARNING - Attempting to clear all documents from `vector-search-testing`.`shared`.`bedrock`...
2025-06-09 13:39:49,141 - WARNING - Could not retrieve mutation count after delete: 'list' object has no attribute 'meta_data'
2025-06-09 13:39:49,142 - INFO - Successfully cleared documents from the collection (approx. 0 mutations).
2025-06-09 13:39:49,143 - INFO - Cleared any existing documents from the collection.

4.3 Vector Store Initialization and Data Loading

With the Couchbase infrastructure in place, this section prepares the LangChain vector store and populates it with data:

  1. Initialize BedrockEmbeddings: Creates an instance of the BedrockEmbeddings client, specifying the EMBEDDING_MODEL_ID (e.g., Amazon Titan Text Embeddings V2). This client will be used by the vector store to convert text documents into numerical embeddings for similarity searching.
  2. Initialize CouchbaseSearchVectorStore: Creates an instance of CouchbaseSearchVectorStore. This LangChain component acts as an abstraction layer over the Couchbase collection and search index, providing methods for adding documents and performing similarity searches. It's configured with the Couchbase cluster connection, bucket/scope/collection names, the embeddings client, and the search index name.
  3. Load Documents from JSON: Reads document data from the DOCS_JSON_PATH file. This file is expected to contain a list of documents, each with text and metadata fields.
  4. Add Documents to Vector Store: If documents are loaded, their texts and metadatas are extracted. The vector_store.add_texts() method is then called to process these documents: each document's text is converted into an embedding (using the BedrockEmbeddings client), and both the text and its embedding (along with metadata) are stored in the Couchbase collection. The search index (INDEX_NAME) is then updated to include these new vectors, making them searchable.

Error handling is included to catch issues like file not found or problems during embedding generation or data insertion.

Note: documents.json contains the documents that we want to load into our vector store. As an example, we have added a few documents to the file from https://cline.bot/
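
Judging from the loading code in this section, documents.json holds a top-level `documents` list whose entries carry `text` and `metadata` fields. The sample below is a hypothetical illustration of that shape (the values are invented, not taken from the actual file), together with a compact helper that splits it into the two parallel lists passed to add_texts().

```python
import json
from typing import Any, Dict, List, Tuple

# Hypothetical sample; the real documents.json content is not reproduced here.
SAMPLE_DATA: Dict[str, Any] = {
    "documents": [
        {"text": "First example passage.", "metadata": {"source": "example-a"}},
        # Metadata may also arrive as a JSON-encoded string.
        {"text": "Second example passage.", "metadata": '{"source": "example-b"}'},
        # A missing metadata field falls back to an empty dict.
        {"text": "Third example passage."},
    ]
}


def split_documents(data: Dict[str, Any]) -> Tuple[List[str], List[Dict[str, Any]]]:
    """Return (texts, metadatas), coercing each metadata value to a dict."""
    texts: List[str] = []
    metadatas: List[Dict[str, Any]] = []
    for doc in data.get("documents", []):
        raw = doc.get("metadata", {})
        if isinstance(raw, str):
            try:
                raw = json.loads(raw)
            except json.JSONDecodeError:
                raw = {}
        metadatas.append(raw if isinstance(raw, dict) else {})
        texts.append(doc.get("text", ""))
    return texts, metadatas
```

Keeping `texts` and `metadatas` the same length and in the same order matters, because the vector store pairs them by position.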

Let's load the documents from the documents.json file and add them to our vector store:

try:
    logger.info(f"Initializing Bedrock Embeddings client with model: {EMBEDDING_MODEL_ID}")
    embeddings = BedrockEmbeddings(
        client=bedrock_runtime_client,
        model_id=EMBEDDING_MODEL_ID
    )
    logger.info("Successfully created Bedrock embeddings client.")

    logger.info(f"Initializing CouchbaseSearchVectorStore with index: {INDEX_NAME}")
    vector_store = CouchbaseSearchVectorStore(
        cluster=cb_cluster,
        bucket_name=CB_BUCKET_NAME,
        scope_name=SCOPE_NAME,
        collection_name=COLLECTION_NAME,
        embedding=embeddings,
        index_name=INDEX_NAME
    )
    logger.info("Successfully created Couchbase vector store.")

    # Load documents from JSON file
    logger.info(f"Looking for documents at: {DOCS_JSON_PATH}")
    if not os.path.exists(DOCS_JSON_PATH):
        logger.error(f"Documents file not found: {DOCS_JSON_PATH}")
        raise FileNotFoundError(f"Documents file not found: {DOCS_JSON_PATH}")

    with open(DOCS_JSON_PATH, 'r') as f:
        data = json.load(f)
        documents_to_load = data.get('documents', [])
    logger.info(f"Loaded {len(documents_to_load)} documents from {DOCS_JSON_PATH}")

    # Add documents to vector store
    if documents_to_load:
        logger.info(f"Adding {len(documents_to_load)} documents to vector store...")
        texts = [doc.get('text', '') for doc in documents_to_load]
        metadatas = []
        for i, doc in enumerate(documents_to_load):
            metadata_raw = doc.get('metadata', {})
            if isinstance(metadata_raw, str):
                try:
                    metadata = json.loads(metadata_raw)
                    if not isinstance(metadata, dict):
                        logger.warning(f"Metadata for doc {i} parsed from string is not a dict: {metadata}. Using empty dict.")
                        metadata = {}
                except json.JSONDecodeError:
                    logger.warning(f"Could not parse metadata string for doc {i}: {metadata_raw}. Using empty dict.")
                    metadata = {}
            elif isinstance(metadata_raw, dict):
                metadata = metadata_raw
            else:
                logger.warning(f"Metadata for doc {i} is not a string or dict: {metadata_raw}. Using empty dict.")
                metadata = {}
            metadatas.append(metadata)

        inserted_ids = vector_store.add_texts(texts=texts, metadatas=metadatas)
        logger.info(f"Successfully added {len(inserted_ids)} documents to the vector store.")
    else:
        logger.warning("No documents found in the JSON file to add.")

except FileNotFoundError as e:
    logger.error(f"Setup failed: {e}")
    raise
except Exception as e:
    logger.error(f"Error during vector store setup or data loading: {e}")
    logger.error(traceback.format_exc())
    raise 

logger.info("--- Couchbase Setup and Data Loading Complete ---")
2025-06-09 13:39:49,152 - INFO - Initializing Bedrock Embeddings client with model: amazon.titan-embed-text-v2:0
2025-06-09 13:39:49,153 - INFO - Successfully created Bedrock embeddings client.
2025-06-09 13:39:49,153 - INFO - Initializing CouchbaseSearchVectorStore with index: vector_search_bedrock
2025-06-09 13:39:52,549 - INFO - Successfully created Couchbase vector store.
2025-06-09 13:39:52,549 - INFO - Looking for documents at: /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/documents.json
2025-06-09 13:39:52,551 - INFO - Loaded 7 documents from /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/documents.json
2025-06-09 13:39:52,551 - INFO - Adding 7 documents to vector store...
2025-06-09 13:39:56,544 - INFO - Successfully added 7 documents to the vector store.
2025-06-09 13:39:56,545 - INFO - --- Couchbase Setup and Data Loading Complete ---

4.4 Create IAM Role

This step ensures that the necessary IAM (Identity and Access Management) role for the Bedrock Agent and its Lambda function is in place.

  • It defines an agent_role_name (e.g., bedrock_agent_lambda_exp_role).
  • It calls the create_agent_role() helper function. This function (described in section 3.7) either creates a new IAM role with this name or updates an existing one.
  • The role is configured with a trust policy allowing both the Bedrock service and the Lambda service to assume it.
  • It attaches necessary permissions policies, including AWSLambdaBasicExecutionRole for Lambda logging and custom inline policies for Bedrock access and any other required permissions.
  • The AWS Account ID, needed for defining precise resource ARNs in policies, is fetched dynamically using the STS client if not already available as an environment variable.

The ARN of this role (agent_role_arn) is stored, as it's a required parameter for creating both the Bedrock Agent and the AWS Lambda function that the agent will invoke.
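
The shared trust policy described above can be sketched as follows. This is a minimal illustration, not the body of create_agent_role(): the function name build_trust_policy is hypothetical, and the real helper may add conditions (for example, restricting the source account) that are omitted here.

```python
import json
from typing import Any, Dict


def build_trust_policy() -> Dict[str, Any]:
    """Trust policy that lets both Bedrock and Lambda assume the same role.

    Minimal sketch: no Condition block is included, which a production
    policy would likely add to narrow who can assume the role.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": ["bedrock.amazonaws.com", "lambda.amazonaws.com"]
                },
                "Action": "sts:AssumeRole",
            }
        ],
    }


# IAM expects the policy document as a JSON string, not a dict.
trust_policy_json = json.dumps(build_trust_policy())
```

Using one role for both services keeps the experiment simple; separate roles with narrower permissions would be the more conservative choice outside a tutorial.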

agent_role_name = "bedrock_agent_lambda_exp_role"
try:
    # Ensure AWS_ACCOUNT_ID is loaded correctly
    if not AWS_ACCOUNT_ID:
        logger.info("Attempting to fetch AWS Account ID...")
        sts_client = boto3.client('sts', region_name=AWS_REGION)
        AWS_ACCOUNT_ID = sts_client.get_caller_identity().get('Account')
        if not AWS_ACCOUNT_ID:
            raise ValueError("AWS Account ID could not be determined. Please set the AWS_ACCOUNT_ID environment variable.")
        logger.info(f"Fetched AWS Account ID: {AWS_ACCOUNT_ID}")
        
    agent_role_arn = create_agent_role(iam_client, agent_role_name, AWS_ACCOUNT_ID)
    logger.info(f"Agent IAM Role ARN: {agent_role_arn}")
except Exception as e:
    logger.error(f"Failed to create/verify IAM role: {e}")
    logger.error(traceback.format_exc())
    raise
2025-06-09 13:39:56,553 - INFO - Checking/Creating IAM role: bedrock_agent_lambda_exp_role
2025-06-09 13:39:57,454 - INFO - IAM role 'bedrock_agent_lambda_exp_role' already exists with ARN: arn:aws:iam::598307997273:role/bedrock_agent_lambda_exp_role
2025-06-09 13:39:57,454 - INFO - Updating trust policy for existing role 'bedrock_agent_lambda_exp_role'...
2025-06-09 13:39:57,710 - INFO - Trust policy updated for role 'bedrock_agent_lambda_exp_role'.
2025-06-09 13:39:57,710 - INFO - Attaching basic Lambda execution policy to role 'bedrock_agent_lambda_exp_role'...
2025-06-09 13:39:57,973 - INFO - Attached basic Lambda execution policy.
2025-06-09 13:39:57,974 - INFO - Putting basic inline policy 'LambdaBasicLoggingPermissions' for role 'bedrock_agent_lambda_exp_role'...
2025-06-09 13:39:58,240 - INFO - Successfully put inline policy 'LambdaBasicLoggingPermissions'.
2025-06-09 13:39:58,240 - INFO - Putting Bedrock permissions policy 'BedrockAgentPermissions' for role 'bedrock_agent_lambda_exp_role'...
2025-06-09 13:39:58,607 - INFO - Successfully put inline policy 'BedrockAgentPermissions'.
2025-06-09 13:39:58,608 - INFO - Waiting 10s for policy changes to propagate...
2025-06-09 13:40:08,612 - INFO - Agent IAM Role ARN: arn:aws:iam::598307997273:role/bedrock_agent_lambda_exp_role

4.5 Deploy Lambda Function

This section orchestrates the deployment of the AWS Lambda function that will execute the agent's searchAndFormatDocuments tool. The process involves several steps managed by the helper functions:

  1. Define Lambda Details: Specifies the search_format_lambda_name (e.g., bedrock_agent_search_format_exp), the lambda_source_dir (where the Lambda's Python script and Makefile are located), and lambda_build_dir (where the final .zip package will be placed).
  2. Cleanup Old Lambdas (Optional but Recommended): Calls delete_lambda_function for potentially conflicting older Lambda functions (e.g., separate researcher/writer Lambdas from previous experiments or an old version of the current combined Lambda). This ensures a cleaner environment, especially during iterative development.
  3. Package Lambda: Calls package_function(). This helper (described in 3.8.3) uses the Makefile in lambda_source_dir to install dependencies, prepare the handler script (bedrock_agent_search_and_format.py), and create a .zip deployment package (search_format_zip_path).
  4. Create/Update Lambda in AWS: Calls create_lambda_function(). This helper (described in 3.8.4) takes the .zip package and either creates a new Lambda function in AWS or updates an existing one. It handles S3 upload for large packages, sets environment variables (like Couchbase connection info and Bedrock model IDs), configures the IAM role, runtime, handler, timeout, and memory. It also adds permissions for Bedrock to invoke the Lambda and waits for the Lambda to become active.
  5. Cleanup Deployment Package: The local .zip file is removed in a finally block, so it is cleaned up whether or not the deployment succeeds.
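
Step 4 mentions that large packages go through S3. The decision can be sketched as a simple size check. The 50 MB cutoff below is an assumption based on Lambda's documented limit for direct zip uploads; the threshold used inside create_lambda_function() may differ, and needs_s3_upload is a hypothetical name.

```python
import os

# Assumed cutoff: Lambda's documented limit for uploading a zip directly.
DIRECT_UPLOAD_LIMIT_BYTES = 50 * 1024 * 1024


def needs_s3_upload(zip_path: str, limit_bytes: int = DIRECT_UPLOAD_LIMIT_BYTES) -> bool:
    """Return True when the package is too large to send inline with the API call."""
    return os.path.getsize(zip_path) > limit_bytes
```

With this cutoff, a package slightly above 50 MB would take the S3 path, while a small handler-only zip could be sent directly.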

The ARN of the deployed Lambda (search_format_lambda_arn) is stored, as it's needed to link this Lambda to the Bedrock Agent's action group.
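
For orientation, here is a skeleton of what a handler for a function-details action group can look like. It is a sketch, not the contents of bedrock_agent_search_and_format.py: the default values and the placeholder body are assumptions, and the vector search itself is left out.

```python
from typing import Any, Dict


def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Skeleton handler for a Bedrock Agent function-details action group."""
    # Bedrock passes parameters as a list of {"name", "type", "value"} dicts.
    params = {p["name"]: p.get("value") for p in event.get("parameters", [])}
    query = params.get("query", "")
    # Assumed defaults; the real handler may choose differently.
    k = int(params.get("k") or 3)
    style = params.get("style") or "bullet points"

    # Placeholder: the real handler would run the vector search and formatting here.
    body = f"Would search for {query!r} (k={k}) and format as {style}."

    # The response echoes the action group and function names back to the agent.
    return {
        "messageVersion": "1.0",
        "response": {
            "actionGroup": event.get("actionGroup"),
            "function": event.get("function"),
            "functionResponse": {"responseBody": {"TEXT": {"body": body}}},
        },
    }
```

The handler name matches the `lambda_function.lambda_handler` setting used in the deployment code, since the packaging step copies the script to lambda_function.py.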

search_format_lambda_name = "bedrock_agent_search_format_exp"
# Adjust source/build dirs for notebook context if necessary
lambda_source_dir = os.path.join(SCRIPT_DIR, 'lambda_functions') 
lambda_build_dir = SCRIPT_DIR # Final zip ends up in the notebook's directory

logger.info("--- Starting Lambda Deployment (Single Function) --- ")
search_format_lambda_arn = None
search_format_zip_path = None

try:
    # Delete old lambdas if they exist (optional, but good cleanup)
    logger.info("Deleting potentially conflicting old Lambda functions...")
    delete_lambda_function(lambda_client, "bedrock_agent_researcher_exp")
    delete_lambda_function(lambda_client, "bedrock_agent_writer_exp")
    # Delete the new lambda if it exists from a previous run
    delete_lambda_function(lambda_client, search_format_lambda_name)
    logger.info("Old Lambda deletion checks complete.")

    logger.info(f"Packaging Lambda function '{search_format_lambda_name}'...")
    search_format_zip_path = package_function("bedrock_agent_search_and_format", lambda_source_dir, lambda_build_dir)
    logger.info(f"Lambda function packaged at: {search_format_zip_path}")

    logger.info(f"Creating/Updating Lambda function '{search_format_lambda_name}'...")
    search_format_lambda_arn = create_lambda_function(
        lambda_client=lambda_client, function_name=search_format_lambda_name,
        handler='lambda_function.lambda_handler', role_arn=agent_role_arn,
        zip_file=search_format_zip_path, region=AWS_REGION
    )
    logger.info(f"Search/Format Lambda Deployed: {search_format_lambda_arn}")

except FileNotFoundError as e:
    logger.error(f"Lambda packaging failed: Required file not found. {e}")
    raise
except Exception as e:
    logger.error(f"Lambda deployment failed: {e}")
    logger.error(traceback.format_exc())
    raise
finally:
    logger.info("Cleaning up deployment zip file...")
    if search_format_zip_path and os.path.exists(search_format_zip_path):
        try:
            os.remove(search_format_zip_path)
            logger.info(f"Removed zip file: {search_format_zip_path}")
        except OSError as e:
            logger.warning(f"Could not remove zip file {search_format_zip_path}: {e}")

logger.info("--- Lambda Deployment Complete --- ")
2025-06-09 13:40:08,624 - INFO - --- Starting Lambda Deployment (Single Function) --- 
2025-06-09 13:40:08,626 - INFO - Deleting potentially conflicting old Lambda functions...
2025-06-09 13:40:08,626 - INFO - Attempting to delete Lambda function: bedrock_agent_researcher_exp...
2025-06-09 13:40:08,626 - INFO - Attempting to remove permission AllowBedrockInvokeBasic-bedrock_agent_researcher_exp from bedrock_agent_researcher_exp...
2025-06-09 13:40:09,490 - INFO - Permission AllowBedrockInvokeBasic-bedrock_agent_researcher_exp not found on bedrock_agent_researcher_exp. Skipping removal.
2025-06-09 13:40:09,794 - INFO - Lambda function 'bedrock_agent_researcher_exp' does not exist. No need to delete.
2025-06-09 13:40:09,795 - INFO - Attempting to delete Lambda function: bedrock_agent_writer_exp...
2025-06-09 13:40:09,796 - INFO - Attempting to remove permission AllowBedrockInvokeBasic-bedrock_agent_writer_exp from bedrock_agent_writer_exp...
2025-06-09 13:40:10,082 - INFO - Permission AllowBedrockInvokeBasic-bedrock_agent_writer_exp not found on bedrock_agent_writer_exp. Skipping removal.
2025-06-09 13:40:10,387 - INFO - Lambda function 'bedrock_agent_writer_exp' does not exist. No need to delete.
2025-06-09 13:40:10,387 - INFO - Attempting to delete Lambda function: bedrock_agent_search_format_exp...
2025-06-09 13:40:10,387 - INFO - Attempting to remove permission AllowBedrockInvokeBasic-bedrock_agent_search_format_exp from bedrock_agent_search_format_exp...
2025-06-09 13:40:10,686 - INFO - Successfully removed permission AllowBedrockInvokeBasic-bedrock_agent_search_format_exp from bedrock_agent_search_format_exp.
2025-06-09 13:40:13,060 - INFO - Function bedrock_agent_search_format_exp exists. Deleting...
2025-06-09 13:40:13,594 - INFO - Waiting for bedrock_agent_search_format_exp to be deleted...
2025-06-09 13:40:23,596 - INFO - Function bedrock_agent_search_format_exp deletion initiated.
2025-06-09 13:40:23,597 - INFO - Old Lambda deletion checks complete.
2025-06-09 13:40:23,598 - INFO - Packaging Lambda function 'bedrock_agent_search_format_exp'...
2025-06-09 13:40:23,599 - INFO - --- Packaging function bedrock_agent_search_and_format --- 
2025-06-09 13:40:23,602 - INFO - Source Dir (Makefile location & make cwd): /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/lambda_functions
2025-06-09 13:40:23,602 - INFO - Build Dir (Final zip location): /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach
2025-06-09 13:40:23,603 - INFO - Copying /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/lambda_functions/bedrock_agent_search_and_format.py to /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/lambda_functions/lambda_function.py
2025-06-09 13:40:23,605 - INFO - Running make command: make -f /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/lambda_functions/Makefile clean package (in /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/lambda_functions)
2025-06-09 13:40:50,341 - INFO - Make command completed successfully.
2025-06-09 13:40:50,343 - INFO - Moving and renaming /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/lambda_functions/lambda_package.zip to /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/bedrock_agent_search_and_format.zip
2025-06-09 13:40:50,351 - INFO - Zip file ready: /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/bedrock_agent_search_and_format.zip
2025-06-09 13:40:50,362 - INFO - Cleaning up temporary script: /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/lambda_functions/lambda_function.py
2025-06-09 13:40:50,362 - INFO - Lambda function packaged at: /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/bedrock_agent_search_and_format.zip
2025-06-09 13:40:50,363 - INFO - Creating/Updating Lambda function 'bedrock_agent_search_format_exp'...
2025-06-09 13:40:50,363 - INFO - Deploying Lambda function bedrock_agent_search_format_exp from /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/bedrock_agent_search_and_format.zip...
2025-06-09 13:40:50,371 - INFO - Found credentials in environment variables.
2025-06-09 13:40:50,516 - INFO - Zip file size: 50.91 MB
2025-06-09 13:40:50,516 - INFO - Package size (50.91 MB) requires S3 deployment.
2025-06-09 13:40:50,517 - INFO - Preparing to upload /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/bedrock_agent_search_and_format.zip to S3 in region us-east-1...
2025-06-09 13:40:51,552 - INFO - Generated unique S3 bucket name: lambda-deployment-598307997273-1749456651
2025-06-09 13:40:52,277 - INFO - Creating S3 bucket: lambda-deployment-598307997273-1749456651...
2025-06-09 13:40:52,666 - INFO - Created S3 bucket: lambda-deployment-598307997273-1749456651. Waiting for availability...
2025-06-09 13:40:52,929 - INFO - Bucket lambda-deployment-598307997273-1749456651 is available.
2025-06-09 13:40:52,929 - INFO - Uploading /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/bedrock_agent_search_and_format.zip to s3://lambda-deployment-598307997273-1749456651/lambda/bedrock_agent_search_and_format.zip-0b1fc73a...
2025-06-09 13:41:03,991 - INFO - Successfully uploaded to s3://lambda-deployment-598307997273-1749456651/lambda/bedrock_agent_search_and_format.zip-0b1fc73a
2025-06-09 13:41:03,993 - INFO - Creating function 'bedrock_agent_search_format_exp' (attempt 1/3)...
2025-06-09 13:41:06,690 - INFO - Successfully created function 'bedrock_agent_search_format_exp' with ARN: arn:aws:lambda:us-east-1:598307997273:function:bedrock_agent_search_format_exp
2025-06-09 13:41:11,693 - INFO - Adding basic invoke permission (AllowBedrockInvokeBasic-bedrock_agent_search_format_exp) to bedrock_agent_search_format_exp...
2025-06-09 13:41:12,007 - INFO - Successfully added basic invoke permission AllowBedrockInvokeBasic-bedrock_agent_search_format_exp.
2025-06-09 13:41:12,007 - INFO - Waiting for function 'bedrock_agent_search_format_exp' to become active...
2025-06-09 13:41:12,306 - INFO - Function 'bedrock_agent_search_format_exp' is active.
2025-06-09 13:41:12,308 - INFO - Search/Format Lambda Deployed: arn:aws:lambda:us-east-1:598307997273:function:bedrock_agent_search_format_exp
2025-06-09 13:41:12,308 - INFO - Cleaning up deployment zip file...
2025-06-09 13:41:12,310 - INFO - Removed zip file: /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/bedrock_agent_search_and_format.zip
2025-06-09 13:41:12,311 - INFO - --- Lambda Deployment Complete --- 

4.6 Agent Setup

This part of the script focuses on creating the Bedrock Agent itself.

  1. Define Agent Name: An agent_name is defined (e.g., couchbase_search_format_agent_exp).
  2. Cleanup Existing Agent (Idempotency): It calls delete_agent_and_resources() first. This helper function (described in 3.9.3) attempts to find an agent with the same name and, if found, deletes it along with its action groups and aliases. This ensures that each run starts with a clean slate for the agent, preventing conflicts or issues from previous configurations.
  3. Create New Agent: After the cleanup attempt, it calls create_agent(). This helper function (described in 3.10.1) creates a new Bedrock Agent with the specified name, the IAM role ARN (agent_role_arn), the foundation model ID (AGENT_MODEL_ID), and a set of instructions guiding the agent on how to behave and use its tools.

The agent_id and agent_arn returned by create_agent() are stored for subsequent steps like creating action groups and preparing the agent.
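
The "find an agent with the same name" part of the cleanup can be sketched as a paginated lookup. The sketch assumes a boto3 bedrock-agent client, whose list_agents() call returns `agentSummaries` and an optional `nextToken`; the function name find_agent_id_by_name is hypothetical, and delete_agent_and_resources() may implement the lookup differently.

```python
from typing import Any, Optional


def find_agent_id_by_name(agent_client: Any, agent_name: str) -> Optional[str]:
    """Page through list_agents() and return the ID of the first name match."""
    next_token: Optional[str] = None
    while True:
        # Only send nextToken once the service has handed one back.
        kwargs = {"nextToken": next_token} if next_token else {}
        page = agent_client.list_agents(**kwargs)
        for summary in page.get("agentSummaries", []):
            if summary.get("agentName") == agent_name:
                return summary.get("agentId")
        next_token = page.get("nextToken")
        if not next_token:
            return None  # No more pages and no match.
```

Returning None when nothing matches lets the caller skip deletion and go straight to creation, which is what makes the step safe to re-run.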

agent_name = "couchbase_search_format_agent_exp"
agent_id = None
agent_arn = None
alias_name = "prod" # Define alias name here
# agent_alias_id_to_use will be set later after preparation

# 1. Attempt to find and delete existing agent to ensure a clean state
logger.info(f"Checking for and deleting existing agent: {agent_name}")
try:
    delete_agent_and_resources(bedrock_agent_client, agent_name) # Handles finding and deleting
    logger.info(f"Deletion process completed for any existing agent named {agent_name}.")
except Exception as e:
    # Log error during find/delete but proceed to creation attempt
    logger.error(f"Error during agent finding/deletion phase: {e}. Proceeding to creation attempt.")

# 2. Always attempt to create the agent after the delete phase
logger.info(f"--- Creating Agent: {agent_name} ---")
try:
    agent_id, agent_arn = create_agent(
        agent_client=bedrock_agent_client,
        agent_name=agent_name,
        agent_role_arn=agent_role_arn,
        foundation_model_id=AGENT_MODEL_ID
    )
    if not agent_id:
        raise Exception("create_agent function did not return a valid agent ID.")
    logger.info(f"Agent created successfully. ID: {agent_id}, ARN: {agent_arn}")
except Exception as e:
    logger.error(f"Failed to create agent '{agent_name}': {e}")
    logger.error(traceback.format_exc())
    raise
2025-06-09 13:41:12,317 - INFO - Checking for and deleting existing agent: couchbase_search_format_agent_exp
2025-06-09 13:41:12,318 - INFO - Attempting to find agent by name: couchbase_search_format_agent_exp
2025-06-09 13:41:13,172 - INFO - Found agent 'couchbase_search_format_agent_exp' with ID: 8CZXA8LJJH
2025-06-09 13:41:13,172 - WARNING - --- Deleting Agent Resources for 'couchbase_search_format_agent_exp' (ID: 8CZXA8LJJH) ---
2025-06-09 13:41:13,172 - INFO - Listing action groups for agent 8CZXA8LJJH...
2025-06-09 13:41:13,472 - INFO - Found 1 action groups to delete.
2025-06-09 13:41:13,473 - INFO - Attempting to delete action group GKWWTGZVHJ for agent 8CZXA8LJJH...
2025-06-09 13:41:13,794 - INFO - Successfully deleted action group GKWWTGZVHJ for agent 8CZXA8LJJH.
2025-06-09 13:41:18,797 - INFO - Attempting to delete agent 8CZXA8LJJH ('couchbase_search_format_agent_exp')...
2025-06-09 13:41:19,108 - INFO - Waiting up to 2 minutes for agent 8CZXA8LJJH deletion...
2025-06-09 13:41:25,109 - INFO - Agent 8CZXA8LJJH successfully deleted.
2025-06-09 13:41:25,110 - INFO - --- Agent Resource Deletion Complete for 'couchbase_search_format_agent_exp' ---
2025-06-09 13:41:25,111 - INFO - Deletion process completed for any existing agent named couchbase_search_format_agent_exp.
2025-06-09 13:41:25,111 - INFO - --- Creating Agent: couchbase_search_format_agent_exp ---
2025-06-09 13:41:25,112 - INFO - --- Creating Agent: couchbase_search_format_agent_exp ---
2025-06-09 13:41:25,623 - INFO - Agent creation initiated. Name: couchbase_search_format_agent_exp, ID: 7BTR61MXVF, ARN: arn:aws:bedrock:us-east-1:598307997273:agent/7BTR61MXVF, Status: CREATING
2025-06-09 13:41:25,625 - INFO - Waiting for agent 7BTR61MXVF to reach initial state...
2025-06-09 13:41:26,201 - INFO - Agent 7BTR61MXVF status: CREATING
2025-06-09 13:41:31,658 - INFO - Agent 7BTR61MXVF status: NOT_PREPARED
2025-06-09 13:41:32,110 - INFO - Agent 7BTR61MXVF successfully created (Status: NOT_PREPARED).
2025-06-09 13:41:32,111 - INFO - Agent created successfully. ID: 7BTR61MXVF, ARN: arn:aws:bedrock:us-east-1:598307997273:agent/7BTR61MXVF

4.7 Action Group Setup

Once the agent is created and the Lambda function is deployed, this step links them together by creating an Action Group.

  • It defines an action_group_name (e.g., SearchAndFormatActionGroup).
  • It calls the create_action_group() helper function (described in 3.10.2). This function is responsible for:
    • Taking the agent_id and the search_format_lambda_arn (the ARN of the deployed Lambda function) as input.
    • Defining the functionSchema which tells the agent how to use the Lambda function (i.e., the tool name searchAndFormatDocuments and its parameters like query, k, style).
    • Setting the actionGroupExecutor to point to the Lambda ARN, so Bedrock knows which Lambda to invoke.
    • Creating a new action group or updating an existing one with the same name for the DRAFT version of the agent.
  • A 30-second pause (time.sleep(30)) is added after the action group setup. This is a crucial step to give AWS services enough time to propagate the changes and ensure that the agent is aware of the newly configured or updated action group before proceeding to the preparation phase. Without such a delay, the preparation step might fail or not correctly incorporate the action group.
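
The function schema and executor described in the list above can be sketched as plain dictionaries. The parameter names come from the text; the types, descriptions, and required flags are assumptions, and build_action_group_config is a hypothetical name rather than part of create_action_group().

```python
from typing import Any, Dict


def build_action_group_config(lambda_arn: str) -> Dict[str, Any]:
    """Assemble the executor and function schema for the action group.

    Assumed details: parameter types, descriptions, and which parameters
    are required. Only the tool and parameter names come from the tutorial.
    """
    return {
        # Tells Bedrock which Lambda to invoke for this action group.
        "actionGroupExecutor": {"lambda": lambda_arn},
        # Tells the agent what the tool is called and which arguments it takes.
        "functionSchema": {
            "functions": [
                {
                    "name": "searchAndFormatDocuments",
                    "description": "Search the vector store and format the results.",
                    "parameters": {
                        "query": {"type": "string", "description": "Search text.", "required": True},
                        "k": {"type": "integer", "description": "Number of results.", "required": False},
                        "style": {"type": "string", "description": "Output style.", "required": False},
                    },
                }
            ]
        },
    }
```

A dictionary like this could be unpacked into the bedrock-agent client's create_agent_action_group() call alongside agentId, agentVersion="DRAFT", and actionGroupName.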

# --- Action Group Creation/Update (assumes agent_id is valid) ---
action_group_name = "SearchAndFormatActionGroup"
action_group_id = None
try:
    if not agent_id:
        raise ValueError("Agent ID is not set. Cannot create action group.")
    if not search_format_lambda_arn:
        raise ValueError("Lambda ARN is not set. Cannot create action group.")
        
    logger.info(f"Creating/Updating Action Group '{action_group_name}' for agent {agent_id}...")
    action_group_id = create_action_group(
        agent_client=bedrock_agent_client,
        agent_id=agent_id,
        action_group_name=action_group_name,
        function_arn=search_format_lambda_arn,
        # schema_path=None # No longer needed explicitly if default is None
    )
    if not action_group_id:
        raise Exception("create_action_group did not return a valid ID.")
    logger.info(f"Action Group '{action_group_name}' created/updated with ID: {action_group_id}")

    # Add a slightly longer wait after action group modification/creation
    logger.info("Waiting 30s after action group setup before preparing agent...")
    time.sleep(30)
except Exception as e:
    logger.error(f"Failed to set up action group: {e}")
    logger.error(traceback.format_exc())
    raise
2025-06-09 13:41:32,119 - INFO - Creating/Updating Action Group 'SearchAndFormatActionGroup' for agent 7BTR61MXVF...
2025-06-09 13:41:32,120 - INFO - --- Creating/Updating Action Group (Function Details): SearchAndFormatActionGroup for Agent: 7BTR61MXVF ---
2025-06-09 13:41:32,121 - INFO - Lambda ARN: arn:aws:lambda:us-east-1:598307997273:function:bedrock_agent_search_format_exp
2025-06-09 13:41:32,122 - INFO - Checking if action group 'SearchAndFormatActionGroup' already exists for agent 7BTR61MXVF DRAFT version...
2025-06-09 13:41:32,412 - INFO - Action group 'SearchAndFormatActionGroup' does not exist. Creating new with Function Details.
2025-06-09 13:41:32,806 - INFO - Successfully created Action Group 'SearchAndFormatActionGroup' with ID: 7XTTI9XFOX using Function Details.
2025-06-09 13:41:37,812 - INFO - Action Group 'SearchAndFormatActionGroup' created/updated with ID: 7XTTI9XFOX
2025-06-09 13:41:37,812 - INFO - Waiting 30s after action group setup before preparing agent...
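
The log above shows the action group being created against the DRAFT version "using Function Details". As a rough sketch of what such a request could look like, the helper below builds the keyword arguments for the bedrock-agent create_agent_action_group call. The helper name build_action_group_request, the function description, and the single query parameter are assumptions for illustration; the notebook's own create_action_group helper may define the schema differently.

```python
def build_action_group_request(agent_id, action_group_name, lambda_arn):
    """Build kwargs for bedrock-agent create_agent_action_group (illustrative sketch)."""
    return {
        "agentId": agent_id,
        "agentVersion": "DRAFT",  # action groups are attached to the working draft
        "actionGroupName": action_group_name,
        # Bedrock invokes this Lambda when the agent decides to use the tool
        "actionGroupExecutor": {"lambda": lambda_arn},
        "actionGroupState": "ENABLED",
        "functionSchema": {
            "functions": [
                {
                    "name": "searchAndFormatDocuments",
                    # Description and parameters below are hypothetical placeholders
                    "description": "Search the vector store and format the results.",
                    "parameters": {
                        "query": {
                            "type": "string",
                            "description": "Search query text.",
                            "required": True,
                        }
                    },
                }
            ]
        },
    }
```

With a real client, the result would be passed as bedrock_agent_client.create_agent_action_group(**request).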

4.8 Prepare Agent and Handle Alias

After the agent and its action group (linking to the Lambda tool) are defined, this section makes the agent ready for use and assigns an alias to it:

  1. Prepare Agent: It calls the prepare_agent() helper function (described in 3.10.3). This function initiates the preparation process for the DRAFT version of the agent and uses a custom waiter to wait until the agent's status becomes PREPARED. This step is vital as it compiles all agent configurations.
  2. Alias Handling (Create or Update): Once the agent is successfully prepared:
    • An alias_name (e.g., prod) is defined.
    • The code checks if an alias with this name already exists for the agent using list_agent_aliases.
    • If the alias exists, its ID (agent_alias_id_to_use) is retrieved and reused as-is. The notebook does not repoint an existing alias, so it keeps routing to whichever agent version it was created with; logic for updating the alias to a specific version is not shown here.
    • If the alias does not exist, create_agent_alias() is called without a routingConfiguration. The code comments describe this as defaulting to the latest prepared (DRAFT) version; according to the Bedrock documentation, creating an alias this way snapshots the prepared DRAFT into a new numbered agent version and points the alias at that version.
    • A brief pause (time.sleep(10)) is added to allow the alias changes to propagate.

The agent_alias_id_to_use is now ready for invoking the agent.
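
The prepare step described above can be approximated with a plain polling loop. This is a simplified sketch, not the notebook's custom waiter: the function name prepare_and_wait, the polling interval, and the injectable sleep argument are assumptions. It relies only on the bedrock-agent prepare_agent and get_agent calls and on the agentStatus field of the get_agent response.

```python
import time


def prepare_and_wait(client, agent_id, timeout_s=600, poll_s=5, sleep=time.sleep):
    """Start preparation of the DRAFT agent and poll until it is PREPARED."""
    client.prepare_agent(agentId=agent_id)
    waited = 0
    while waited <= timeout_s:
        status = client.get_agent(agentId=agent_id)["agent"]["agentStatus"]
        if status == "PREPARED":
            return status
        if status == "FAILED":
            raise RuntimeError(f"Agent {agent_id} preparation failed")
        sleep(poll_s)  # still PREPARING (or another transitional state)
        waited += poll_s
    raise TimeoutError(f"Agent {agent_id} not PREPARED after {timeout_s}s")
```

The 600-second default mirrors the "up to 10 minutes" limit reported in the notebook's logs.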

agent_alias_id_to_use = None # Initialize alias ID
alias_name = "prod" # Make sure alias_name is defined
if agent_id:
    logger.info(f"--- Preparing Agent: {agent_id} ---")
    preparation_successful = False
    try:
        # prepare_agent only prepares the agent; alias handling happens below
        prepare_agent(bedrock_agent_client, agent_id)
        logger.info(f"Agent {agent_id} preparation seems complete (waiter succeeded).")
        preparation_successful = True # Flag success

    except Exception as e: # Catch errors from preparation
        logger.error(f"Error during agent preparation for {agent_id}: {e}")
        logger.error(traceback.format_exc())
        raise

    # --- Alias Handling (runs only if preparation succeeded) ---
    if preparation_successful:
        logger.info(f"--- Setting up Alias '{alias_name}' for Agent {agent_id} ---")
        try:
            # Look for an existing alias with this name; create one if none is found
            logger.info(f"Checking for alias '{alias_name}' for agent {agent_id}...")
            existing_alias = None
            paginator = bedrock_agent_client.get_paginator('list_agent_aliases')
            for page in paginator.paginate(agentId=agent_id):
                for alias_summary in page.get('agentAliasSummaries', []):
                    if alias_summary.get('agentAliasName') == alias_name:
                        existing_alias = alias_summary
                        break
                if existing_alias:
                    break
            
            if existing_alias:
                agent_alias_id_to_use = existing_alias['agentAliasId']
                logger.info(f"Using existing alias '{alias_name}' with ID: {agent_alias_id_to_use}.")
                # Note: an existing alias is reused as-is. It keeps routing to the agent
                # version it was created with and is not repointed here.
            else:
                logger.info(f"Alias '{alias_name}' not found. Creating new alias...")
                create_alias_response = bedrock_agent_client.create_agent_alias(
                    agentId=agent_id,
                    agentAliasName=alias_name
                    # routingConfiguration omitted: Bedrock creates a new agent version from the prepared DRAFT
                )
                agent_alias_id_to_use = create_alias_response.get('agentAlias', {}).get('agentAliasId')
                logger.info(f"Successfully created alias '{alias_name}' with ID: {agent_alias_id_to_use}. (Defaults to latest prepared version - DRAFT)")

            if not agent_alias_id_to_use:
                raise ValueError(f"Failed to get a valid alias ID for '{alias_name}'")

            logger.info(f"Waiting 10s for alias '{alias_name}' changes to propagate...")
            time.sleep(10)
            logger.info(f"Agent {agent_id} preparation and alias '{alias_name}' ({agent_alias_id_to_use}) setup complete.")


        except Exception as alias_e: # Catch errors from alias logic
            logger.error(f"Failed to create/update alias '{alias_name}' for agent {agent_id}: {alias_e}")
            logger.error(traceback.format_exc())
            raise
else:
    logger.error("Agent ID not available, skipping preparation and alias setup.")
2025-06-09 13:42:07,835 - INFO - --- Preparing Agent: 7BTR61MXVF ---
2025-06-09 13:42:07,836 - INFO - --- Preparing Agent: 7BTR61MXVF ---
2025-06-09 13:42:08,338 - INFO - Agent preparation initiated for version 'DRAFT'. Status: PREPARING. Prepared At: 2025-06-09 08:12:08.237735+00:00
2025-06-09 13:42:08,338 - INFO - Waiting for agent 7BTR61MXVF preparation to complete (up to 10 minutes)...
2025-06-09 13:42:39,237 - INFO - Agent 7BTR61MXVF successfully prepared.
2025-06-09 13:42:39,238 - INFO - Agent 7BTR61MXVF preparation seems complete (waiter succeeded).
2025-06-09 13:42:39,238 - INFO - --- Setting up Alias 'prod' for Agent 7BTR61MXVF ---
2025-06-09 13:42:39,239 - INFO - Checking for alias 'prod' for agent 7BTR61MXVF...
2025-06-09 13:42:39,526 - INFO - Alias 'prod' not found. Creating new alias...
2025-06-09 13:42:39,902 - INFO - Successfully created alias 'prod' with ID: Y8YNYUDFFZ. (Defaults to latest prepared version - DRAFT)
2025-06-09 13:42:39,903 - INFO - Waiting 10s for alias 'prod' changes to propagate...
2025-06-09 13:42:49,907 - INFO - Agent 7BTR61MXVF preparation and alias 'prod' (Y8YNYUDFFZ) setup complete.
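
The cell above reuses an existing alias without changing where it routes. If repointing is needed, one possible approach is to wrap the bedrock-agent update_agent_alias call as sketched below. The helper name repoint_alias and its agent_version argument are illustrative and not part of the notebook; the version passed in is assumed to already exist for the agent.

```python
def repoint_alias(client, agent_id, alias_id, alias_name, agent_version):
    """Point an existing alias at a specific agent version (illustrative sketch)."""
    response = client.update_agent_alias(
        agentId=agent_id,
        agentAliasId=alias_id,
        agentAliasName=alias_name,  # required by the API even if the name is unchanged
        routingConfiguration=[{"agentVersion": agent_version}],
    )
    return response.get("agentAlias", {}).get("agentAliasId")
```

As in the notebook, a short pause after the update may be needed before the change is visible to invocations.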

4.9 Test Agent Invocation

This is the final operational step where the fully configured Bedrock Agent is tested.

  • It first checks if both agent_id and agent_alias_id_to_use are available (i.e., the previous setup steps were successful).
  • A unique session_id is generated for this specific interaction.
  • A test_prompt is defined (e.g., "Search for information about Project Chimera and format the results using bullet points."). This prompt is designed to trigger the agent's tool (searchAndFormatDocuments).
  • It then calls the test_agent_invocation() helper function (described in 3.11.1). This function sends the prompt to the Bedrock Agent Runtime using the specified agent ID and alias ID.
  • The test_agent_invocation function handles the streaming response from the agent, concatenates the text chunks, logs trace information for debugging, and prints the agent's final completion.
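
The streaming behavior described in the last bullet can be sketched as follows. This is an assumption about how such a helper might be written, not the notebook's test_agent_invocation: the name collect_completion is hypothetical. It relies on the documented shape of the invoke_agent response, where each event in response["completion"] carries either a chunk with bytes or a trace.

```python
def collect_completion(event_stream):
    """Concatenate text chunks and gather trace parts from an invoke_agent stream."""
    chunks = []
    traces = []
    for event in event_stream:
        if "chunk" in event:
            # Each chunk holds a UTF-8 encoded fragment of the agent's answer
            chunks.append(event["chunk"]["bytes"])
        elif "trace" in event:
            traces.append(event["trace"])
    # Join the bytes first so decoding happens once over the full payload
    return b"".join(chunks).decode("utf-8"), traces
```

With a real client, the stream would come from bedrock_agent_runtime_client.invoke_agent(agentId=..., agentAliasId=..., sessionId=..., inputText=..., enableTrace=True)["completion"].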

This step demonstrates an end-to-end test of the agent: receiving a prompt, deciding to use its Lambda-backed tool, Bedrock invoking the Lambda, the Lambda executing (performing search and formatting), returning results to the agent, and the agent formulating a final response to the user.
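
On the Lambda side of this flow, a function-details action group sends the tool's arguments as a list of name/value parameters and expects a specific response envelope back. The handler below is a minimal sketch of that contract only: the query parameter name and the placeholder body are assumptions, and the notebook's real Lambda performs the Couchbase vector search and formatting at the point where the placeholder is built.

```python
def lambda_handler(event, context):
    """Minimal Bedrock Agent action-group handler (function-details format)."""
    # Parameters arrive as a list of {"name", "type", "value"} entries
    params = {p["name"]: p["value"] for p in event.get("parameters", [])}
    query = params.get("query", "")  # hypothetical parameter name

    # Placeholder: a real handler would run the search and format the results
    body = f"(placeholder) results for: {query}"

    return {
        "messageVersion": event.get("messageVersion", "1.0"),
        "response": {
            "actionGroup": event["actionGroup"],
            "function": event["function"],
            "functionResponse": {
                "responseBody": {"TEXT": {"body": body}}
            },
        },
    }
```

Echoing actionGroup and function back from the incoming event lets Bedrock match the result to the tool call it issued.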

# --- Test Invocation ---
# Agent ID and custom alias ID should be valid here
if agent_id and agent_alias_id_to_use:  # Check both are set
    session_id = str(uuid.uuid4())
    test_prompt = "Search for information about Project Chimera and format the results using bullet points."
    logger.info(f"--- Invoking Agent {agent_id} using Alias '{alias_name}' ({agent_alias_id_to_use}) ---")
    try:
        completion = test_agent_invocation(
            agent_runtime_client=bedrock_agent_runtime_client,
            agent_id=agent_id,
            agent_alias_id=agent_alias_id_to_use,
            session_id=session_id,
            prompt=test_prompt
        )
        if completion is None:
            logger.error("Agent invocation failed.")
    except Exception as e:
        logger.error(f"Error during test invocation: {e}")
        logger.error(traceback.format_exc())
else:
    logger.error("Agent ID or Alias ID not available, skipping invocation test.")
2025-06-09 13:42:49,923 - INFO - --- Invoking Agent 7BTR61MXVF using Alias 'prod' (Y8YNYUDFFZ) ---
2025-06-09 13:42:49,924 - INFO - --- Testing Agent Invocation (Agent ID: 7BTR61MXVF, Alias: Y8YNYUDFFZ) ---
2025-06-09 13:42:49,925 - INFO - Session ID: 6529a5a7-0b58-4c7d-8682-20353a8f09c3
2025-06-09 13:42:49,925 - INFO - Prompt: "Search for information about Project Chimera and format the results using bullet points."
2025-06-09 13:42:50,894 - INFO - Agent invocation successful. Processing response...
2025-06-09 13:43:05,971 - INFO - --- Agent Final Response ---• Project Chimera combines quantum entanglement communication with neural networks for secure, real-time data analysis across distributed nodes. Lead developer: Dr. Aris Thorne.

• Chimera operates in two modes:
    - 'Quantum Sync' for high-fidelity data transfer
    - 'Neural Inference' for localized edge processing based on the synced data.

• A key aspect of Chimera is its "Ephemeral Key Protocol" (EKP), which generates one-time quantum keys for each transmission, ensuring absolute forward secrecy.
2025-06-09 13:43:05,973 - INFO - --- Invocation Trace Summary ---
2025-06-09 13:43:05,975 - INFO - Trace 1: Type=None, Step=None
2025-06-09 13:43:05,975 - INFO - Trace 2: Type=None, Step=None
2025-06-09 13:43:05,976 - INFO - Trace 3: Type=None, Step=None
2025-06-09 13:43:05,976 - INFO - Trace 4: Type=None, Step=None
2025-06-09 13:43:05,977 - INFO - Trace 5: Type=None, Step=None
2025-06-09 13:43:05,977 - INFO - Trace 6: Type=None, Step=None
2025-06-09 13:43:05,978 - INFO - Trace 7: Type=None, Step=None
2025-06-09 13:43:05,978 - INFO - Trace 8: Type=None, Step=None
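
Every trace in the summary above is logged as Type=None, Step=None, which may mean the helper looks for keys that are not present at the level it inspects. In the invoke_agent stream, each trace part wraps a nested trace object whose single key names the trace type (for example orchestrationTrace), and that object's key names the step (for example rationale or observation). A possible way to summarize a trace part, assuming that structure and using the hypothetical name summarize_trace:

```python
def summarize_trace(trace_part):
    """Return (trace_type, step) for one trace part from an invoke_agent stream."""
    inner = trace_part.get("trace", {})
    # The nested trace object is a union: its first key names the trace type
    trace_type = next(iter(inner), None)
    details = inner.get(trace_type, {}) if trace_type else {}
    # Within that type, the first key names the step (rationale, observation, ...)
    step = next(iter(details), None) if isinstance(details, dict) else None
    return trace_type, step
```

Here trace_part is the value stored under the "trace" key of a stream event.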

Conclusion

In this notebook, we've demonstrated the Lambda approach for implementing AWS Bedrock agents with Couchbase Vector Search. In this approach the agent invokes AWS Lambda functions to execute its tools, which keeps tool logic separate from the agent and lets it be deployed and scaled independently of the calling application.

Key components of this implementation include:

  1. Vector Store Setup: We set up a Couchbase vector store to store and search documents using semantic similarity.
  2. Lambda Function Deployment: We deployed Lambda functions that handle the agent's function calls.
  3. Agent Creation: We created two specialized agents - a researcher agent for searching documents and a writer agent for formatting results.
  4. Lambda Integration: We integrated the agents with Lambda functions, allowing them to execute operations in a serverless environment.

This approach is particularly useful for production environments where scalability and separation of concerns are important. The Lambda functions can be deployed independently and can access other AWS services, providing more flexibility and power.

