This notebook demonstrates the Lambda approach for implementing AWS Bedrock Agents with Couchbase Vector Search. In this approach, the agent invokes an AWS Lambda function to execute its tool operations: we'll build an agent whose search-and-format tool is implemented as a Lambda-backed action group.

For comparison, you might also want to check out the Custom Control approach, which handles agent tools directly in your application code instead of using AWS Lambda functions. The Custom Control approach offers simpler setup and more direct control, but may not scale as well. You can find that implementation here: Custom Control Approach Notebook

Note: If the link above doesn't work in your Jupyter environment, you can navigate to the file manually in the awsbedrock-agents/custom-control-approach/ directory.
The Lambda approach for AWS Bedrock Agents delegates the execution of an agent's defined functions (tools) to backend AWS Lambda functions. When the agent decides to use a tool, Bedrock directly invokes the corresponding Lambda function that you've specified in the agent's action group configuration. This Lambda function receives the parameters from the agent, executes the necessary logic (e.g., querying a Couchbase vector store, calling other APIs, performing computations), and then returns the result to the Bedrock Agent. The agent can then use this result to continue its reasoning process or formulate a final response to the user. This architecture promotes a clean separation of concerns, allows tool logic to be developed and scaled independently, and leverages the serverless capabilities of AWS Lambda.
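To make the handler contract concrete, here is a minimal sketch of a Bedrock-invoked Lambda handler. This is an illustrative skeleton, not the notebook's actual bedrock_agent_search_and_format.py: the search logic is a placeholder, and the response shape shown is the one described in step 2 of the list below (actionGroup, apiPath, httpMethod, httpStatusCode, responseBody), which Bedrock uses for action groups defined with an OpenAPI schema.

import json

def lambda_handler(event, context):
    # Parameters chosen by the agent arrive as a list of {name, type, value} dicts.
    params = {p['name']: p['value'] for p in event.get('parameters', [])}
    query = params.get('query', '')
    k = int(params.get('k', 4))

    # Placeholder for the real tool logic (e.g., a Couchbase vector search).
    results = f"Top {k} results for: {query}"

    # Echo the routing fields back so Bedrock can match this result
    # to the action group invocation that produced it.
    return {
        'messageVersion': '1.0',
        'response': {
            'actionGroup': event['actionGroup'],
            'apiPath': event.get('apiPath', ''),
            'httpMethod': event.get('httpMethod', 'POST'),
            'httpStatusCode': 200,
            'responseBody': {
                'application/json': {'body': json.dumps({'results': results})}
            }
        }
    }

Note that when an action group is defined with function details instead of an OpenAPI schema (as this notebook's create_action_group does later), Bedrock expects the result nested under response.functionResponse.responseBody rather than the apiPath/httpMethod fields above.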
The end-to-end flow implemented in this notebook is:

1. Define Agent Instructions & Tool Schema: Write the instructions that govern the agent's behavior, and define the function schema for the tool (searchAndFormatDocuments) that the agent will call. This schema specifies the function name, description, and its input parameters (e.g., query, k, style). The schema acts as the contract between the agent and the Lambda function.

2. Implement Lambda Handler Function: Write the script (bedrock_agent_search_and_format.py) that contains the actual Python code to execute the tool's logic. It performs the vector search with the requested k value and formats the results according to the style parameter (though in this specific example, the formatting is largely illustrative and the LLM does the heavy lifting of presentation). The handler returns a response containing the actionGroup, apiPath, httpMethod, httpStatusCode, and a responseBody with the result of the tool execution (e.g., the search results as a string).

3. Package & Deploy the Lambda Function: Bundle the handler code and its dependencies (requirements.txt) into a .zip file. This notebook includes helper functions to automate packaging (using a Makefile) and deployment, including uploading to S3 if the package is large. The Lambda also needs an IAM role with permissions to run, write logs, and interact with Bedrock and any other required AWS services.

4. Create Agent in AWS Bedrock: Use the bedrock_agent_client.create_agent SDK call. Provide the agent name, the ARN of the IAM role it will assume, the foundation model ID (e.g., Claude Sonnet), and the instructions defined in step 1.

5. Create Agent Action Group (Linking to Lambda): Use bedrock_agent_client.create_agent_action_group with:
   - actionGroupExecutor: This is the crucial part for the Lambda approach. Set it to {'lambda': 'arn:aws:lambda:<region>:<account_id>:function:<lambda_function_name>'}. This tells Bedrock to invoke your specific Lambda function when this action group is triggered.
   - functionSchema: Provide the function schema defined in step 1. This allows the agent to understand how to call the Lambda function (i.e., what parameters to send).
   - A name for the action group (e.g., SearchAndFormatActionGroup).

6. Prepare Agent: Call bedrock_agent_client.prepare_agent with the agentId. This makes the DRAFT version of the agent (with its newly configured action group) ready for use. The notebook includes a custom waiter to poll until the agent status is PREPARED.

7. Create or Update Agent Alias: An alias (e.g., prod) is used to invoke a specific version of the agent. The notebook checks if an alias exists and creates one if not, pointing to the latest prepared (DRAFT) version. Use bedrock_agent_client.create_agent_alias or update_agent_alias.

8. Invoke Agent: Call bedrock_agent_runtime_client.invoke_agent, providing the agentId, agentAliasId, a unique sessionId, and the user's inputText (prompt). The agent's response is streamed back in the invoke_agent call. Unlike the Custom Control approach, the application never handles returnControl events for function execution: Bedrock orchestrates the call to Lambda directly. A minimal invocation sketch follows this list.
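As a minimal sketch of step 8, using the bedrock_agent_runtime_client initialized later in this notebook (the agent and alias IDs here are hypothetical placeholders produced by the earlier steps):

import uuid

response = bedrock_agent_runtime_client.invoke_agent(
    agentId='AGENT12345',        # hypothetical: returned by create_agent
    agentAliasId='ALIAS67890',   # hypothetical: returned by the alias helper
    sessionId=str(uuid.uuid4()), # unique per conversation
    inputText='Find documents about vector search and list them as bullet points.'
)

# The completion arrives as an event stream; 'chunk' events carry the answer text.
answer = ''
for event in response['completion']:
    if 'chunk' in event:
        answer += event['chunk']['bytes'].decode('utf-8')
print(answer)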
This section imports all necessary Python libraries. These include:

- Standard library modules: json for data handling, logging for progress and error messages, os for interacting with the operating system (e.g., file paths), subprocess for running external commands (like make for Lambda packaging), time for delays, traceback for detailed error reporting, uuid for generating unique identifiers, and shutil for file operations.
- boto3 and botocore: The AWS SDK for Python, used to interact with AWS services like Bedrock, IAM, Lambda, and S3. Specific configurations (Config) and waiters are also imported for robust client interactions.
- couchbase: The official Couchbase SDK for Python, used for connecting to and interacting with the Couchbase cluster, including managing buckets, collections, and search indexes. Specific exception classes are imported for error handling.
- dotenv: For loading environment variables from a .env file, which helps manage configuration settings like API keys and connection strings securely.
- langchain_aws and langchain_couchbase: Libraries from the LangChain ecosystem. BedrockEmbeddings is used to generate text embeddings via Amazon Bedrock, and CouchbaseSearchVectorStore provides an interface for using Couchbase as a vector store in LangChain applications.

import json
import logging
import os
import shutil
import subprocess
import time
import traceback
import uuid
from datetime import timedelta
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
from botocore.waiter import WaiterModel, create_waiter_with_client
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.exceptions import (BucketNotFoundException, CouchbaseException,
QueryIndexAlreadyExistsException)
from couchbase.management.buckets import BucketSettings, BucketType
from couchbase.management.collections import CollectionSpec
from couchbase.management.search import SearchIndex
from couchbase.options import ClusterOptions
from dotenv import load_dotenv
from langchain_aws import BedrockEmbeddings
from langchain_couchbase.vectorstores import CouchbaseSearchVectorStore
This section handles the initial setup of essential configurations for the notebook:

- Logging: Configures the logging module to output messages with a specific format (timestamp, level, message), which helps in tracking the script's execution and diagnosing issues.
- Environment variables: Loads settings from a .env file located either in the current directory or the parent directory. This is a common practice to keep sensitive information like credentials and hostnames out of the codebase. If the .env file is not found, the script will rely on variables already set in the execution environment.
- AWS configuration: Credentials and region used by boto3 to interact with AWS services.
- Bedrock model IDs: The embedding model (amazon.titan-embed-text-v2:0) and the foundation model to be used by the agent (e.g., anthropic.claude-3-sonnet-20240229-v1:0).
- Paths: Locations of the tool schema, search index definition, and sample documents; os.getcwd() makes these paths relative to the notebook's current working directory.

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Load environment variables from project root .env
# In a notebook environment, '__file__' is not defined. Use a relative path or absolute path directly.
# Assuming the notebook is run from the 'lambda-experiments' directory
dotenv_path = os.path.join(os.getcwd(), '.env') # Or specify the full path if needed
logger.info(f"Attempting to load .env file from: {dotenv_path}")
if os.path.exists(dotenv_path):
load_dotenv(dotenv_path=dotenv_path)
logger.info(".env file loaded successfully.")
else:
# Try loading from parent directory if not found in current
parent_dotenv_path = os.path.join(os.path.dirname(os.getcwd()), '.env')
if os.path.exists(parent_dotenv_path):
load_dotenv(dotenv_path=parent_dotenv_path)
logger.info(f".env file loaded successfully from parent directory: {parent_dotenv_path}")
else:
logger.warning(f".env file not found at {dotenv_path} or {parent_dotenv_path}. Relying on environment variables.")
# Couchbase Configuration
CB_HOST = os.getenv("CB_HOST", "couchbase://localhost")
CB_USERNAME = os.getenv("CB_USERNAME", "Administrator")
CB_PASSWORD = os.getenv("CB_PASSWORD", "password")
# Using a new bucket/scope/collection for experiments to avoid conflicts
CB_BUCKET_NAME = os.getenv("CB_BUCKET_NAME", "vector-search-exp")
SCOPE_NAME = os.getenv("SCOPE_NAME", "bedrock_exp")
COLLECTION_NAME = os.getenv("COLLECTION_NAME", "docs_exp")
INDEX_NAME = os.getenv("INDEX_NAME", "vector_search_bedrock_exp")
# AWS Configuration
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
AWS_ACCOUNT_ID = os.getenv("AWS_ACCOUNT_ID")
# Bedrock Model IDs
EMBEDDING_MODEL_ID = "amazon.titan-embed-text-v2:0"
AGENT_MODEL_ID = "anthropic.claude-3-sonnet-20240229-v1:0" # Using Sonnet for the agent
# Paths (relative to the notebook's execution directory)
SCRIPT_DIR = os.getcwd() # Use current working directory for notebook context
SCHEMAS_DIR = os.path.join(SCRIPT_DIR, 'schemas') # New Schemas Dir
SEARCH_FORMAT_SCHEMA_PATH = os.path.join(SCHEMAS_DIR, 'search_and_format_schema.json') # Added
INDEX_JSON_PATH = os.path.join(SCRIPT_DIR, 'aws_index.json') # Keep
DOCS_JSON_PATH = os.path.join(SCRIPT_DIR, 'documents.json') # Changed to load from script's directory
2025-06-09 13:39:41,393 - INFO - Attempting to load .env file from: /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/.env
2025-06-09 13:39:41,395 - INFO - .env file loaded successfully.
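For reference, a .env file for this notebook might look like the following; the variable names match the os.getenv calls above, and every value is a placeholder to replace with your own:

CB_HOST=couchbase://localhost
CB_USERNAME=Administrator
CB_PASSWORD=your-password
CB_BUCKET_NAME=vector-search-exp
SCOPE_NAME=bedrock_exp
COLLECTION_NAME=docs_exp
INDEX_NAME=vector_search_bedrock_exp
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=your-access-key-id
AWS_SECRET_ACCESS_KEY=your-secret-access-key
AWS_ACCOUNT_ID=123456789012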
This section defines a comprehensive suite of helper functions to modularize the various operations required throughout the notebook. These functions encapsulate specific tasks, making the main execution flow cleaner and easier to understand. The categories of helper functions include:

- Initializing AWS SDK (boto3) clients for services like IAM, Lambda, Bedrock, and S3.
- Managing the Lambda lifecycle: packaging the code (via a Makefile), uploading the deployment package (to S3 if it's large), creating or updating the Lambda function in AWS, and deleting it for cleanup.

The first helper, check_environment_variables, verifies that all critical environment variables required for the script to run (e.g., AWS credentials, Couchbase password, AWS Account ID) are set. It logs an error and returns False if any are missing; otherwise it logs success and returns True.
def check_environment_variables():
"""Check if required environment variables are set."""
required_vars = ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_ACCOUNT_ID", "CB_PASSWORD"]
missing_vars = [var for var in required_vars if not os.getenv(var)]
if missing_vars:
logger.error(f"Missing required environment variables: {', '.join(missing_vars)}")
logger.error("Please set these variables in your environment or .env file")
return False
logger.info("All required environment variables are set.")
return True
This function sets up and returns the necessary AWS SDK (boto3) clients for interacting with various AWS services. It initializes clients for Bedrock Runtime (for embeddings and agent invocation), IAM (for managing roles and policies), Lambda (for deploying and managing Lambda functions), Bedrock Agent (for creating and managing agents), and Bedrock Agent Runtime (for invoking agents). It uses credentials and region from the environment configuration and includes a custom configuration (agent_config) with longer timeouts and retries, which is particularly important for Bedrock Agent operations that can take more time, like agent preparation.
def initialize_aws_clients():
"""Initialize required AWS clients."""
try:
logger.info(f"Initializing AWS clients in region: {AWS_REGION}")
session = boto3.Session(
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
region_name=AWS_REGION
)
# Use a config with longer timeouts for agent operations
agent_config = Config(
connect_timeout=120,
read_timeout=600, # Agent preparation can take time
retries={'max_attempts': 5, 'mode': 'adaptive'}
)
bedrock_runtime = session.client('bedrock-runtime', region_name=AWS_REGION)
iam_client = session.client('iam', region_name=AWS_REGION)
lambda_client = session.client('lambda', region_name=AWS_REGION)
bedrock_agent_client = session.client('bedrock-agent', region_name=AWS_REGION, config=agent_config) # Add agent client
bedrock_agent_runtime_client = session.client('bedrock-agent-runtime', region_name=AWS_REGION, config=agent_config) # Add agent runtime client
logger.info("AWS clients initialized successfully.")
return bedrock_runtime, iam_client, lambda_client, bedrock_agent_client, bedrock_agent_runtime_client # Return agent runtime client
except Exception as e:
logger.error(f"Error initializing AWS clients: {e}")
raise
This function establishes a connection to the Couchbase cluster using the connection string (CB_HOST), username, and password from the environment configuration. It uses PasswordAuthenticator for authentication and ClusterOptions for customizing connection parameters (the example sticks with the defaults, but this is where custom timeouts could be set). It waits for the cluster to be ready before returning the Cluster object, ensuring that subsequent operations can be performed reliably.
def connect_couchbase():
"""Connect to Couchbase cluster."""
try:
logger.info(f"Connecting to Couchbase cluster at {CB_HOST}...")
auth = PasswordAuthenticator(CB_USERNAME, CB_PASSWORD)
# Use robust options
options = ClusterOptions(
auth,
)
cluster = Cluster(CB_HOST, options)
cluster.wait_until_ready(timedelta(seconds=10)) # Wait longer if needed
logger.info("Successfully connected to Couchbase.")
return cluster
except CouchbaseException as e:
logger.error(f"Couchbase connection error: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error connecting to Couchbase: {e}")
raise
This comprehensive function is responsible for ensuring that the required Couchbase bucket, scope, and collection are available for the agent's vector store. It performs the following steps idempotently:

1. Checks whether the bucket (bucket_name) exists. If not, it creates the bucket with defined settings (e.g., RAM quota, flush enabled) and pauses to allow the bucket to become ready.
2. Checks whether the scope (scope_name) exists within the bucket. If not, it creates the scope and includes a brief pause.
3. Checks whether the collection (collection_name) exists within the scope. If not, it creates the collection using a CollectionSpec and pauses.
4. Ensures a primary index exists on the collection so that N1QL queries (such as the later DELETE used to clear it) can run.

Finally, it returns a Collection object representing the target collection for further operations.

Note: Bucket creation will not work on Capella.
def setup_collection(cluster, bucket_name, scope_name, collection_name):
"""Set up Couchbase collection (Original Logic from lamda-approach)"""
logger.info(f"Setting up collection: {bucket_name}/{scope_name}/{collection_name}")
try:
# Check if bucket exists, create if it doesn't
try:
bucket = cluster.bucket(bucket_name)
logger.info(f"Bucket '{bucket_name}' exists.")
except BucketNotFoundException:
logger.info(f"Bucket '{bucket_name}' does not exist. Creating it...")
# Use BucketSettings with potentially lower RAM for experiment
bucket_settings = BucketSettings(
name=bucket_name,
bucket_type=BucketType.COUCHBASE,
ram_quota_mb=256, # Adjusted from 1024
flush_enabled=True,
num_replicas=0
)
try:
cluster.buckets().create_bucket(bucket_settings)
# Wait longer after bucket creation
logger.info(f"Bucket '{bucket_name}' created. Waiting for ready state (10s)...")
time.sleep(10)
bucket = cluster.bucket(bucket_name) # Re-assign bucket object
except Exception as create_e:
logger.error(f"Failed to create bucket '{bucket_name}': {create_e}")
raise
except Exception as e:
logger.error(f"Error getting bucket '{bucket_name}': {e}")
raise
bucket_manager = bucket.collections()
# Check if scope exists, create if it doesn't
scopes = bucket_manager.get_all_scopes()
scope_exists = any(s.name == scope_name for s in scopes)
if not scope_exists:
logger.info(f"Scope '{scope_name}' does not exist. Creating it...")
try:
bucket_manager.create_scope(scope_name)
logger.info(f"Scope '{scope_name}' created. Waiting (2s)...")
time.sleep(2)
except CouchbaseException as e:
# Handle potential race condition or already exists error more robustly
if "already exists" in str(e).lower() or "scope_exists" in str(e).lower():
logger.info(f"Scope '{scope_name}' likely already exists (caught during creation attempt).")
else:
logger.error(f"Failed to create scope '{scope_name}': {e}")
raise
else:
logger.info(f"Scope '{scope_name}' already exists.")
# Check if collection exists, create if it doesn't
# Re-fetch scopes in case it was just created
scopes = bucket_manager.get_all_scopes()
collection_exists = False
for s in scopes:
if s.name == scope_name:
if any(c.name == collection_name for c in s.collections):
collection_exists = True
break
if not collection_exists:
logger.info(f"Collection '{collection_name}' does not exist in scope '{scope_name}'. Creating it...")
try:
# Use CollectionSpec
collection_spec = CollectionSpec(collection_name, scope_name)
bucket_manager.create_collection(collection_spec)
logger.info(f"Collection '{collection_name}' created. Waiting (2s)...")
time.sleep(2)
except CouchbaseException as e:
if "already exists" in str(e).lower() or "collection_exists" in str(e).lower():
logger.info(f"Collection '{collection_name}' likely already exists (caught during creation attempt).")
else:
logger.error(f"Failed to create collection '{collection_name}': {e}")
raise
else:
logger.info(f"Collection '{collection_name}' already exists.")
# Ensure primary index exists
try:
logger.info(f"Ensuring primary index exists on `{bucket_name}`.`{scope_name}`.`{collection_name}`...")
cluster.query(f"CREATE PRIMARY INDEX IF NOT EXISTS ON `{bucket_name}`.`{scope_name}`.`{collection_name}`").execute()
logger.info("Primary index present or created successfully.")
except Exception as e:
logger.error(f"Error creating primary index: {str(e)}")
# Decide if this is fatal
logger.info("Collection setup complete.")
# Return the collection object for use
return cluster.bucket(bucket_name).scope(scope_name).collection(collection_name)
except Exception as e:
logger.error(f"Error setting up collection: {str(e)}")
logger.error(traceback.format_exc())
raise
This function is responsible for creating or updating the Couchbase Search (FTS) index required for vector similarity search. Key operations include:

1. Loading the index definition from a JSON file (index_definition_path).
2. Overriding the index_name and sourceName (bucket name) in the loaded definition with the values provided as arguments. This allows a template index definition file to be reused.
3. Using the cluster's SearchIndexManager to upsert_index. Upserting means the index will be created if it doesn't exist, or updated if an index with the same name already exists. This makes the operation idempotent.
4. Pausing briefly (time.sleep) to allow Couchbase some time to start the indexing process in the background.

Important Note: The provided aws_index.json file has hardcoded references for the bucket, scope, and collection names. If you have used different names for your bucket, scope, or collection than the defaults specified in this notebook or your .env file, you must modify the aws_index.json file to reflect your custom names before running the next cell.
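As a rough, hypothetical illustration of the kind of definition such a file contains, a vector-enabled Couchbase Search index generally follows this shape. This trimmed-down sketch is not the actual aws_index.json; the embedding field name, the scope.collection type mapping, and the 1024 dimensions (the default for Titan Text Embeddings V2) are assumptions, so always use the file shipped with the repository:

{
  "type": "fulltext-index",
  "name": "vector_search_bedrock_exp",
  "sourceType": "gocbcore",
  "sourceName": "vector-search-exp",
  "params": {
    "doc_config": { "mode": "scope.collection.type_field" },
    "mapping": {
      "types": {
        "bedrock_exp.docs_exp": {
          "enabled": true,
          "properties": {
            "embedding": {
              "fields": [{ "name": "embedding", "type": "vector", "dims": 1024, "similarity": "dot_product", "index": true }]
            },
            "text": {
              "fields": [{ "name": "text", "type": "text", "index": true, "store": true }]
            }
          }
        }
      }
    }
  }
}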
def setup_search_index(cluster, index_name, bucket_name, scope_name, collection_name, index_definition_path):
"""Set up search indexes (Original Logic, adapted) """
try:
logger.info(f"Looking for index definition at: {index_definition_path}")
if not os.path.exists(index_definition_path):
logger.error(f"Index definition file not found: {index_definition_path}")
raise FileNotFoundError(f"Index definition file not found: {index_definition_path}")
with open(index_definition_path, 'r') as file:
index_definition = json.load(file)
index_definition['name'] = index_name
index_definition['sourceName'] = bucket_name
logger.info(f"Loaded index definition from {index_definition_path}, ensuring name is '{index_name}' and source is '{bucket_name}'.")
except Exception as e:
logger.error(f"Error loading index definition: {str(e)}")
raise
try:
# Use the SearchIndexManager from the Cluster object for cluster-level indexes
# Or use scope-level if the index JSON is structured for that
# Assuming cluster level based on original script structure for upsert
search_index_manager = cluster.search_indexes()
# Create SearchIndex object from potentially modified JSON definition
search_index = SearchIndex.from_json(index_definition)
# Upsert the index (create if not exists, update if exists)
logger.info(f"Upserting search index '{index_name}'...")
search_index_manager.upsert_index(search_index)
# Wait for indexing
logger.info(f"Index '{index_name}' upsert operation submitted. Waiting for indexing (10s)...")
time.sleep(10)
logger.info(f"Search index '{index_name}' setup complete.")
except QueryIndexAlreadyExistsException:
# This exception might not be correct for SearchIndexManager
# Upsert should handle exists cases, but log potential specific errors
logger.warning(f"Search index '{index_name}' likely already existed (caught QueryIndexAlreadyExistsException, check if applicable). Upsert attempted.")
except CouchbaseException as e:
logger.error(f"Couchbase error during search index setup for '{index_name}': {e}")
raise
except Exception as e:
logger.error(f"Unexpected error during search index setup for '{index_name}': {e}")
raise
This utility function is used to delete all documents from a specified Couchbase collection. It constructs and executes a N1QL DELETE
query targeting the given bucket, scope, and collection. This is useful for ensuring a clean state before loading new data for an experiment, preventing interference from previous runs. It also attempts to log the number of mutations (deleted documents) if the query metrics are available.
def clear_collection(cluster, bucket_name, scope_name, collection_name):
"""Delete all documents from the specified collection (Original Logic)."""
try:
logger.warning(f"Attempting to clear all documents from `{bucket_name}`.`{scope_name}`.`{collection_name}`...")
query = f"DELETE FROM `{bucket_name}`.`{scope_name}`.`{collection_name}`"
result = cluster.query(query).execute()
# Try to get mutation count, handle if not available
mutation_count = 0
try:
metrics_data = result.meta_data().metrics()
if metrics_data:
mutation_count = metrics_data.mutation_count()
except Exception as metrics_e:
logger.warning(f"Could not retrieve mutation count after delete: {metrics_e}")
logger.info(f"Successfully cleared documents from the collection (approx. {mutation_count} mutations).")
except Exception as e:
logger.error(f"Error clearing documents from collection: {e}. Collection might be empty or index not ready.")
This function creates or updates the necessary IAM (Identity and Access Management) role that the Bedrock Agent and its associated Lambda function will assume. The role needs permissions to interact with AWS services on your behalf. Key aspects of this function are:

- Trust policy: The role trusts both lambda.amazonaws.com (for the Lambda function execution) and bedrock.amazonaws.com (for the Bedrock Agent service itself) to assume it.
- Idempotency: It first checks whether a role with the given role_name already exists; if so, it updates the trust policy, otherwise it creates the role.
- Attached policies:
  - The AWS-managed AWSLambdaBasicExecutionRole, which grants the Lambda function permissions to write logs to CloudWatch.
  - An inline policy (LambdaBasicLoggingPermissions) for more specific logging permissions if needed, scoped to the Lambda log group.
  - An inline policy (BedrockAgentPermissions) granting broad bedrock:* permissions. For production, these permissions should be scoped down to the minimum required.
- Propagation delays: It includes time.sleep calls after creating the role and after attaching policies to allow time for the changes to propagate within AWS, which helps prevent subsequent operations from failing due to eventual consistency issues.

It returns the ARN (Amazon Resource Name) of the created or updated IAM role, which is then used when creating the Bedrock Agent and the Lambda function.
def create_agent_role(iam_client, role_name, aws_account_id):
"""Creates or gets the IAM role for the Bedrock Agent Lambda functions."""
logger.info(f"Checking/Creating IAM role: {role_name}")
assume_role_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com",
"bedrock.amazonaws.com"
]
},
"Action": "sts:AssumeRole"
}
]
}
role_arn = None
try:
# Check if role exists
get_role_response = iam_client.get_role(RoleName=role_name)
role_arn = get_role_response['Role']['Arn']
logger.info(f"IAM role '{role_name}' already exists with ARN: {role_arn}")
# Ensure trust policy is up-to-date
logger.info(f"Updating trust policy for existing role '{role_name}'...")
iam_client.update_assume_role_policy(
RoleName=role_name,
PolicyDocument=json.dumps(assume_role_policy_document)
)
logger.info(f"Trust policy updated for role '{role_name}'.")
except iam_client.exceptions.NoSuchEntityException:
logger.info(f"IAM role '{role_name}' not found. Creating...")
try:
create_role_response = iam_client.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(assume_role_policy_document),
Description='IAM role for Bedrock Agent Lambda functions (Experiment)',
MaxSessionDuration=3600
)
role_arn = create_role_response['Role']['Arn']
logger.info(f"Successfully created IAM role '{role_name}' with ARN: {role_arn}")
# Wait after role creation before attaching policies
logger.info("Waiting 15s for role creation propagation...")
time.sleep(15)
except ClientError as e:
logger.error(f"Error creating IAM role '{role_name}': {e}")
raise
except ClientError as e:
logger.error(f"Error getting/updating IAM role '{role_name}': {e}")
raise
# Attach basic execution policy (idempotent)
try:
logger.info(f"Attaching basic Lambda execution policy to role '{role_name}'...")
iam_client.attach_role_policy(
RoleName=role_name,
PolicyArn='arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
)
logger.info("Attached basic Lambda execution policy.")
except ClientError as e:
logger.error(f"Error attaching basic Lambda execution policy: {e}")
# Don't necessarily raise, might already be attached or other issue
# Add minimal inline policy for logging (can be expanded later if needed)
basic_inline_policy_name = "LambdaBasicLoggingPermissions"
basic_inline_policy_doc = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": f"arn:aws:logs:{AWS_REGION}:{aws_account_id}:log-group:/aws/lambda/*:*" # Scope down logs if possible
}
# Add S3 permissions here ONLY if Lambda code explicitly needs it
]
}
# Add Bedrock permissions policy
bedrock_policy_name = "BedrockAgentPermissions"
bedrock_policy_doc = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"bedrock:*"
],
"Resource": "*" # You can scope this down to specific agents/models if needed
}
]
}
try:
logger.info(f"Putting basic inline policy '{basic_inline_policy_name}' for role '{role_name}'...")
iam_client.put_role_policy(
RoleName=role_name,
PolicyName=basic_inline_policy_name,
PolicyDocument=json.dumps(basic_inline_policy_doc)
)
logger.info(f"Successfully put inline policy '{basic_inline_policy_name}'.")
# Add Bedrock permissions policy
logger.info(f"Putting Bedrock permissions policy '{bedrock_policy_name}' for role '{role_name}'...")
iam_client.put_role_policy(
RoleName=role_name,
PolicyName=bedrock_policy_name,
PolicyDocument=json.dumps(bedrock_policy_doc)
)
logger.info(f"Successfully put inline policy '{bedrock_policy_name}'.")
logger.info("Waiting 10s for policy changes to propagate...")
time.sleep(10)
except ClientError as e:
logger.error(f"Error putting inline policy: {e}")
# Decide if this is fatal
if not role_arn:
raise Exception(f"Failed to create or retrieve ARN for role {role_name}")
return role_arn
This subsection groups together several helper functions dedicated to managing the deployment lifecycle of the AWS Lambda function that will serve as the tool executor for the Bedrock Agent. These functions handle packaging the Lambda code, managing its dependencies, deploying it to AWS, and cleaning up resources.
This function is designed to safely delete an AWS Lambda function. Before attempting to delete the function itself, it tries to remove any permissions associated with it (specifically, the permission allowing Bedrock to invoke it, using a predictable statement ID). It then checks if the function exists and, if so, proceeds with the deletion. The function includes a brief pause after initiating deletion, as the process is asynchronous. It returns True
if deletion was attempted/occurred and False
if the function didn't exist or if an error occurred during the process.
def delete_lambda_function(lambda_client, function_name):
"""Delete Lambda function if it exists, attempting to remove permissions first."""
logger.info(f"Attempting to delete Lambda function: {function_name}...")
try:
# Use a predictable statement ID added by create_lambda_function
statement_id = f"AllowBedrockInvokeBasic-{function_name}"
try:
logger.info(f"Attempting to remove permission {statement_id} from {function_name}...")
lambda_client.remove_permission(
FunctionName=function_name,
StatementId=statement_id
)
logger.info(f"Successfully removed permission {statement_id} from {function_name}.")
time.sleep(2) # Allow time for permission removal
except lambda_client.exceptions.ResourceNotFoundException:
logger.info(f"Permission {statement_id} not found on {function_name}. Skipping removal.")
except ClientError as perm_e:
# Log error but continue with deletion attempt
logger.warning(f"Error removing permission {statement_id} from {function_name}: {str(perm_e)}")
# Check if function exists before attempting deletion
lambda_client.get_function(FunctionName=function_name)
logger.info(f"Function {function_name} exists. Deleting...")
lambda_client.delete_function(FunctionName=function_name)
# Wait for deletion to complete using a waiter
logger.info(f"Waiting for {function_name} to be deleted...")
time.sleep(10) # Simple delay after delete call
logger.info(f"Function {function_name} deletion initiated.")
return True # Indicates deletion was attempted/occurred
except lambda_client.exceptions.ResourceNotFoundException:
logger.info(f"Lambda function '{function_name}' does not exist. No need to delete.")
return False # Indicates function didn't exist
except Exception as e:
logger.error(f"Error during deletion process for Lambda function '{function_name}': {str(e)}")
# Depending on severity, might want to raise or just return False
return False # Indicates an error occurred beyond not found
This function handles uploading a Lambda deployment package (a .zip file) to Amazon S3. This is necessary when the package size exceeds the direct upload limit for Lambda. Key features include:

- Bucket management: It generates a unique bucket name (prefixed with lambda-deployment-) using the AWS account ID and a timestamp, or a fallback UUID if the account ID isn't available. It checks if this bucket exists and creates it if not, ensuring the correct region is specified for bucket creation. It also uses a waiter to ensure the bucket is available before proceeding.
- Upload strategy: For files over 100 MB, it uses boto3.s3.transfer.S3Transfer for robust multipart uploads. For smaller files, it uses a standard put_object call.

It returns a dictionary containing the S3Bucket and S3Key of the uploaded package, which is then used by create_lambda_function.
def upload_to_s3(zip_file, region, bucket_name=None):
"""Upload zip file to S3 with retry logic and return S3 location."""
logger.info(f"Preparing to upload {zip_file} to S3 in region {region}...")
# Configure the client with increased timeouts
config = Config(
connect_timeout=60,
read_timeout=300,
retries={'max_attempts': 3, 'mode': 'adaptive'}
)
s3_client = boto3.client('s3', region_name=region, config=config)
sts_client = boto3.client('sts', region_name=region, config=config)
# Determine bucket name
if bucket_name is None:
try:
account_id = sts_client.get_caller_identity().get('Account')
timestamp = int(time.time())
bucket_name = f"lambda-deployment-{account_id}-{timestamp}"
logger.info(f"Generated unique S3 bucket name: {bucket_name}")
except Exception as e:
fallback_id = uuid.uuid4().hex[:12]
bucket_name = f"lambda-deployment-{fallback_id}"
logger.warning(f"Error getting account ID ({e}). Using fallback bucket name: {bucket_name}")
# Create bucket if needed
try:
s3_client.head_bucket(Bucket=bucket_name)
logger.info(f"Using existing S3 bucket: {bucket_name}")
except ClientError as e:
error_code = int(e.response['Error']['Code'])
if error_code == 404:
logger.info(f"Creating S3 bucket: {bucket_name}...")
try:
if region == 'us-east-1':
s3_client.create_bucket(Bucket=bucket_name)
else:
s3_client.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={'LocationConstraint': region}
)
logger.info(f"Created S3 bucket: {bucket_name}. Waiting for availability...")
waiter = s3_client.get_waiter('bucket_exists')
waiter.wait(Bucket=bucket_name, WaiterConfig={'Delay': 5, 'MaxAttempts': 12})
logger.info(f"Bucket {bucket_name} is available.")
except Exception as create_e:
logger.error(f"Error creating bucket '{bucket_name}': {create_e}")
raise
else:
logger.error(f"Error checking bucket '{bucket_name}': {e}")
raise
# Upload file
s3_key = f"lambda/{os.path.basename(zip_file)}-{uuid.uuid4().hex[:8]}"
try:
logger.info(f"Uploading {zip_file} to s3://{bucket_name}/{s3_key}...")
file_size = os.path.getsize(zip_file)
if file_size > 100 * 1024 * 1024: # Use multipart for files > 100MB
logger.info("Using multipart upload for large file...")
transfer_config = boto3.s3.transfer.TransferConfig(
multipart_threshold=10 * 1024 * 1024, max_concurrency=10,
multipart_chunksize=10 * 1024 * 1024, use_threads=True
)
s3_transfer = boto3.s3.transfer.S3Transfer(client=s3_client, config=transfer_config)
s3_transfer.upload_file(zip_file, bucket_name, s3_key)
else:
with open(zip_file, 'rb') as f:
s3_client.put_object(Bucket=bucket_name, Key=s3_key, Body=f)
logger.info(f"Successfully uploaded to s3://{bucket_name}/{s3_key}")
return {'S3Bucket': bucket_name, 'S3Key': s3_key}
except Exception as upload_e:
logger.error(f"S3 upload failed: {upload_e}")
raise
This function automates the process of packaging the Lambda function code and its dependencies into a .zip file, ready for deployment. It relies on a Makefile located in the source_dir (which is lambda_functions in this notebook). The steps are:

1. It resolves the paths to the handler script, requirements.txt, the Makefile, and the final output .zip file, and verifies that the inputs exist.
2. It copies the specific Lambda handler script (e.g., bedrock_agent_search_and_format.py) to lambda_function.py within the source_dir, because the Makefile is likely configured to look for a generic lambda_function.py.
3. It runs the make clean package command using subprocess.check_call, with the source_dir as the working directory. The Makefile is responsible for creating a virtual environment, installing dependencies from requirements.txt into a temporary package_dir, and then zipping the contents of this directory along with lambda_function.py into lambda_package.zip within the source_dir.
4. If the make command completes successfully, it moves and renames the generated lambda_package.zip from the source_dir to the specified build_dir (the notebook's current directory in this case) with a name like function_name.zip.
5. In a finally block, it cleans up the temporary lambda_function.py copied earlier and any intermediate lambda_package.zip left in the source_dir (e.g., if the rename/move failed).

The function returns the path to the final .zip file.
def package_function(function_name, source_dir, build_dir):
"""Package Lambda function using Makefile found in source_dir."""
# source_dir is where the .py, requirements.txt, Makefile live (e.g., lambda_functions)
# build_dir is where packaging happens and final zip ends up (e.g., lambda-experiments)
makefile_path = os.path.join(source_dir, 'Makefile')
# Temp build dir inside source_dir, as Makefile expects relative paths
temp_package_dir = os.path.join(source_dir, 'package_dir')
# Requirements file is in source_dir
source_req_path = os.path.join(source_dir, 'requirements.txt')
# Target requirements path inside source_dir (needed for Makefile)
# target_req_path = os.path.join(source_dir, 'requirements.txt') # No copy needed if running make in source_dir
source_func_script_path = os.path.join(source_dir, f'{function_name}.py')
# Target function script path inside source_dir, renamed for Makefile install_deps copy
target_func_script_path = os.path.join(source_dir, 'lambda_function.py')
# Make output zip is created inside source_dir
make_output_zip = os.path.join(source_dir, 'lambda_package.zip')
# Final zip path is in the build_dir (one level up from source_dir)
final_zip_path = os.path.join(build_dir, f'{function_name}.zip')
logger.info(f"--- Packaging function {function_name} --- ")
logger.info(f"Source Dir (Makefile location & make cwd): {source_dir}")
logger.info(f"Build Dir (Final zip location): {build_dir}")
if not os.path.exists(source_func_script_path):
raise FileNotFoundError(f"Source function script not found: {source_func_script_path}")
if not os.path.exists(source_req_path):
raise FileNotFoundError(f"Source requirements file not found: {source_req_path}")
if not os.path.exists(makefile_path):
raise FileNotFoundError(f"Makefile not found at: {makefile_path}")
# Ensure no leftover target script from previous failed run
if os.path.exists(target_func_script_path):
logger.warning(f"Removing existing target script: {target_func_script_path}")
os.remove(target_func_script_path)
try:
# 1. No need to create lambda subdir in build_dir
# 2. Copy source function script to source_dir as lambda_function.py
logger.info(f"Copying {source_func_script_path} to {target_func_script_path}")
shutil.copy(source_func_script_path, target_func_script_path)
# Requirements file is already in source_dir, no copy needed.
# 3. Run make command (execute from source_dir where Makefile is)
make_command = [
'make',
'-f', makefile_path, # Still specify Makefile path explicitly
'clean', # Clean first
'package',
# 'PYTHON_VERSION=python3.9' # Let Makefile use its default or system default
]
logger.info(f"Running make command: {' '.join(make_command)} (in {source_dir})")
# Run make from source_dir; relative paths in Makefile should now work
subprocess.check_call(make_command, cwd=source_dir, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
logger.info("Make command completed successfully.")
# 4. Check for output zip in source_dir and rename/move to build_dir
if not os.path.exists(make_output_zip):
raise FileNotFoundError(f"Makefile did not produce expected output: {make_output_zip}")
logger.info(f"Moving and renaming {make_output_zip} to {final_zip_path}")
if os.path.exists(final_zip_path):
logger.warning(f"Removing existing final zip: {final_zip_path}")
os.remove(final_zip_path)
# Use shutil.move for cross-filesystem safety if needed, os.rename is fine here
os.rename(make_output_zip, final_zip_path)
logger.info(f"Zip file ready: {final_zip_path}")
return final_zip_path
except subprocess.CalledProcessError as e:
logger.error(f"Error running Makefile for {function_name}: {e}")
stderr_output = "(No stderr captured)"
if e.stderr:
try:
stderr_output = e.stderr.decode()
except Exception:
stderr_output = "(Could not decode stderr)"
logger.error(f"Make stderr: {stderr_output}")
raise
except Exception as e:
logger.error(f"Error packaging function {function_name} using Makefile: {str(e)}")
logger.error(traceback.format_exc())
raise
finally:
# 5. Clean up intermediate files in source_dir
if os.path.exists(target_func_script_path):
logger.info(f"Cleaning up temporary script: {target_func_script_path}")
os.remove(target_func_script_path)
if os.path.exists(make_output_zip): # If rename failed
logger.warning(f"Cleaning up intermediate zip in source dir: {make_output_zip}")
os.remove(make_output_zip)
This is a key function that handles the creation or update of the AWS Lambda function. It incorporates several important aspects for robustness and proper configuration:

- Package handling: It checks the deployment package size. If the package is large (over ~45 MB), it calls upload_to_s3 to upload the package to S3 and uses the S3 location for deployment. Otherwise, it reads the .zip file content directly for deployment.
- Configuration: It defines the runtime (python3.9), IAM role ARN, handler name, timeout, memory size, and crucial environment variables (Couchbase details, Bedrock model IDs) that the Lambda will need at runtime.
- Create or update: It first attempts to create the function. If this raises a ResourceConflictException (meaning the function already exists), it then attempts to update the function's code and configuration.
- Invoke permission: It grants the Bedrock service (bedrock.amazonaws.com) permission to invoke this Lambda function. It uses a predictable StatementId and handles potential conflicts if the permission already exists.
- Waiters: It uses boto3 waiters (function_active_v2 after creation, function_updated_v2 after update) to pause execution until the Lambda function becomes fully active and ready, preventing issues where subsequent operations might target a Lambda that isn't fully initialized.

The function returns the ARN of the successfully created or updated Lambda function.
def create_lambda_function(lambda_client, function_name, handler, role_arn, zip_file, region):
"""Create or update Lambda function with retry logic."""
logger.info(f"Deploying Lambda function {function_name} from {zip_file}...")
# Configure the client with increased timeouts for potentially long creation
config = Config(
connect_timeout=120,
read_timeout=300,
retries={'max_attempts': 5, 'mode': 'adaptive'}
)
lambda_client_local = boto3.client('lambda', region_name=region, config=config)
# Check zip file size
zip_size_mb = 0
try:
zip_size_bytes = os.path.getsize(zip_file)
zip_size_mb = zip_size_bytes / (1024 * 1024)
logger.info(f"Zip file size: {zip_size_mb:.2f} MB")
except OSError as e:
logger.error(f"Could not get size of zip file {zip_file}: {e}")
raise # Cannot proceed without zip file
use_s3 = zip_size_mb > 45 # Use S3 for packages over ~45MB
s3_location = None
zip_content = None
if use_s3:
logger.info(f"Package size ({zip_size_mb:.2f} MB) requires S3 deployment.")
s3_location = upload_to_s3(zip_file, region)
if not s3_location:
raise Exception("Failed to upload Lambda package to S3.")
else:
logger.info("Deploying package directly.")
try:
with open(zip_file, 'rb') as f:
zip_content = f.read()
except OSError as e:
logger.error(f"Could not read zip file {zip_file}: {e}")
raise
# Define common create/update args
common_args = {
'FunctionName': function_name,
'Runtime': 'python3.9',
'Role': role_arn,
'Handler': handler,
'Timeout': 180,
'MemorySize': 1536, # Adjust as needed
# Env vars loaded from main script env or .env
'Environment': {
'Variables': {
'CB_HOST': os.getenv('CB_HOST', 'couchbase://localhost'),
'CB_USERNAME': os.getenv('CB_USERNAME', 'Administrator'),
'CB_PASSWORD': os.getenv('CB_PASSWORD', 'password'),
'CB_BUCKET_NAME': os.getenv('CB_BUCKET_NAME', 'vector-search-exp'),
'SCOPE_NAME': os.getenv('SCOPE_NAME', 'bedrock_exp'),
'COLLECTION_NAME': os.getenv('COLLECTION_NAME', 'docs_exp'),
'INDEX_NAME': os.getenv('INDEX_NAME', 'vector_search_bedrock_exp'),
'EMBEDDING_MODEL_ID': os.getenv('EMBEDDING_MODEL_ID', EMBEDDING_MODEL_ID),
'AGENT_MODEL_ID': os.getenv('AGENT_MODEL_ID', AGENT_MODEL_ID)
}
}
}
if use_s3:
code_arg = {'S3Bucket': s3_location['S3Bucket'], 'S3Key': s3_location['S3Key']}
else:
code_arg = {'ZipFile': zip_content}
max_retries = 3
base_delay = 10
for attempt in range(1, max_retries + 1):
try:
logger.info(f"Creating function '{function_name}' (attempt {attempt}/{max_retries})...")
create_args = common_args.copy()
create_args['Code'] = code_arg
create_args['Publish'] = True # Publish a version
create_response = lambda_client_local.create_function(**create_args)
function_arn = create_response['FunctionArn']
logger.info(f"Successfully created function '{function_name}' with ARN: {function_arn}")
# Add basic invoke permission after creation
time.sleep(5) # Give function time to be fully created before adding policy
statement_id = f"AllowBedrockInvokeBasic-{function_name}"
try:
logger.info(f"Adding basic invoke permission ({statement_id}) to {function_name}...")
lambda_client_local.add_permission(
FunctionName=function_name,
StatementId=statement_id,
Action='lambda:InvokeFunction',
Principal='bedrock.amazonaws.com'
)
logger.info(f"Successfully added basic invoke permission {statement_id}.")
except lambda_client_local.exceptions.ResourceConflictException:
logger.info(f"Permission {statement_id} already exists for {function_name}. Skipping add.")
except ClientError as perm_e:
logger.warning(f"Failed to add basic invoke permission {statement_id} to {function_name}: {perm_e}")
# Wait for function to be Active
logger.info(f"Waiting for function '{function_name}' to become active...")
waiter = lambda_client_local.get_waiter('function_active_v2')
waiter.wait(FunctionName=function_name, WaiterConfig={'Delay': 5, 'MaxAttempts': 24})
logger.info(f"Function '{function_name}' is active.")
return function_arn # Return ARN upon successful creation
except lambda_client_local.exceptions.ResourceConflictException:
logger.warning(f"Function '{function_name}' already exists. Attempting to update code...")
try:
if use_s3:
update_response = lambda_client_local.update_function_code(
FunctionName=function_name,
S3Bucket=s3_location['S3Bucket'],
S3Key=s3_location['S3Key'],
Publish=True
)
else:
update_response = lambda_client_local.update_function_code(
FunctionName=function_name,
ZipFile=zip_content,
Publish=True
)
function_arn = update_response['FunctionArn']
logger.info(f"Successfully updated function code for '{function_name}'. New version ARN: {function_arn}")
# Also update configuration just in case
try:
logger.info(f"Updating configuration for '{function_name}'...")
lambda_client_local.update_function_configuration(**common_args)
logger.info(f"Configuration updated for '{function_name}'.")
except ClientError as conf_e:
logger.warning(f"Could not update configuration for '{function_name}': {conf_e}")
# Re-verify invoke permission after update
time.sleep(5)
statement_id = f"AllowBedrockInvokeBasic-{function_name}"
try:
logger.info(f"Verifying/Adding basic invoke permission ({statement_id}) after update...")
lambda_client_local.add_permission(
FunctionName=function_name,
StatementId=statement_id,
Action='lambda:InvokeFunction',
Principal='bedrock.amazonaws.com'
)
logger.info(f"Successfully added/verified basic invoke permission {statement_id}.")
except lambda_client_local.exceptions.ResourceConflictException:
logger.info(f"Permission {statement_id} already exists for {function_name}. Skipping add.")
except ClientError as perm_e:
logger.warning(f"Failed to add/verify basic invoke permission {statement_id} after update: {perm_e}")
# Wait for function to be Active after update
logger.info(f"Waiting for function '{function_name}' update to complete...")
waiter = lambda_client_local.get_waiter('function_updated_v2')
waiter.wait(FunctionName=function_name, WaiterConfig={'Delay': 5, 'MaxAttempts': 24})
logger.info(f"Function '{function_name}' update complete.")
return function_arn # Return ARN after successful update
except ClientError as update_e:
logger.error(f"Failed to update function '{function_name}': {update_e}")
if attempt < max_retries:
delay = base_delay * (2 ** (attempt - 1))
logger.info(f"Retrying update in {delay} seconds...")
time.sleep(delay)
else:
logger.error("Maximum update retries reached. Deployment failed.")
raise update_e
except ClientError as e:
# Handle throttling or other retryable errors
error_code = e.response.get('Error', {}).get('Code')
if error_code in ['ThrottlingException', 'ProvisionedConcurrencyConfigNotFoundException', 'EC2ThrottledException'] or 'Rate exceeded' in str(e):
logger.warning(f"Retryable error on attempt {attempt}: {e}")
if attempt < max_retries:
delay = base_delay * (2 ** (attempt - 1)) + (uuid.uuid4().int % 5)
logger.info(f"Retrying in {delay} seconds...")
time.sleep(delay)
else:
logger.error("Maximum retries reached after retryable error. Deployment failed.")
raise e
else:
logger.error(f"Error creating/updating Lambda '{function_name}': {e}")
logger.error(traceback.format_exc()) # Log full traceback for unexpected errors
raise e # Re-raise non-retryable or unexpected errors
except Exception as e:
logger.error(f"Unexpected error during Lambda deployment: {e}")
logger.error(traceback.format_exc())
raise e
# If loop completes without returning, something went wrong
raise Exception(f"Failed to deploy Lambda function {function_name} after {max_retries} attempts.")
This subsection provides helper functions to manage the cleanup of AWS Bedrock Agent resources. Creating agents, action groups, and aliases results in persistent configurations in AWS. These functions are essential for maintaining a clean environment, especially during experimentation and development, by allowing for the removal of these resources when they are no longer needed or before recreating them in a subsequent run.
This utility function searches for an existing Bedrock Agent by its name. Since the AWS SDK's get_agent
requires an agentId
, and you often work with human-readable names, this function bridges that gap. It uses the list_agents
operation (with a paginator to handle potentially many agents in an account) and iterates through the summaries, comparing the agentName
field. If a match is found, it returns the corresponding agentId
. If no agent with the given name is found or an error occurs during listing, it returns None
.
def get_agent_by_name(agent_client, agent_name):
"""Find an agent ID by its name using list_agents."""
logger.info(f"Attempting to find agent by name: {agent_name}")
try:
paginator = agent_client.get_paginator('list_agents')
for page in paginator.paginate():
for agent_summary in page.get('agentSummaries', []):
if agent_summary.get('agentName') == agent_name:
agent_id = agent_summary.get('agentId')
logger.info(f"Found agent '{agent_name}' with ID: {agent_id}")
return agent_id
logger.info(f"Agent '{agent_name}' not found.")
return None
except ClientError as e:
logger.error(f"Error listing agents to find '{agent_name}': {e}")
return None # Treat as not found if error occurs
This function handles the deletion of a specific action group associated with a Bedrock Agent. Action groups are always tied to the DRAFT
version of an agent. It calls delete_agent_action_group
, providing the agentId
, agentVersion='DRAFT'
, and the actionGroupId
. It uses skipResourceInUseCheck=True
to force deletion, which can be useful if the agent is in a state (like PREPARING
) that might otherwise prevent immediate deletion. The function includes error handling for cases where the action group is not found or if a conflict occurs (e.g., agent is busy), attempting a retry after a delay in case of a conflict. It returns True
if deletion was successful or the group was not found, and False
if an unrecoverable error occurred.
def delete_action_group(agent_client, agent_id, action_group_id):
"""Deletes a specific action group for an agent."""
logger.info(f"Attempting to delete action group {action_group_id} for agent {agent_id}...")
try:
agent_client.delete_agent_action_group(
agentId=agent_id,
agentVersion='DRAFT', # Action groups are tied to the DRAFT version
actionGroupId=action_group_id,
skipResourceInUseCheck=True # Force deletion even if in use (e.g., during prepare)
)
logger.info(f"Successfully deleted action group {action_group_id} for agent {agent_id}.")
time.sleep(5) # Short pause after deletion
return True
except agent_client.exceptions.ResourceNotFoundException:
logger.info(f"Action group {action_group_id} not found for agent {agent_id}. Skipping deletion.")
return False
except ClientError as e:
# Handle potential throttling or conflict if prepare is happening
error_code = e.response.get('Error', {}).get('Code')
if error_code == 'ConflictException':
logger.warning(f"Conflict deleting action group {action_group_id} (agent might be preparing/busy). Retrying once after delay...")
time.sleep(15)
try:
agent_client.delete_agent_action_group(
agentId=agent_id, agentVersion='DRAFT', actionGroupId=action_group_id, skipResourceInUseCheck=True
)
logger.info(f"Successfully deleted action group {action_group_id} after retry.")
return True
except Exception as retry_e:
logger.error(f"Error deleting action group {action_group_id} on retry: {retry_e}")
return False
else:
logger.error(f"Error deleting action group {action_group_id} for agent {agent_id}: {e}")
return False
This function orchestrates the complete cleanup of a Bedrock Agent and its associated components. Its process is:

1. It calls get_agent_by_name to retrieve the agentId for the specified agent_name. If the agent isn't found, it exits gracefully.
2. It lists all action groups attached to the DRAFT version of the agent. For each action group found, it calls delete_action_group to remove it.
3. It calls delete_agent with skipResourceInUseCheck=True to force the deletion.
4. It waits for the deletion to complete by polling get_agent and checking for a ResourceNotFoundException. This ensures that subsequent operations (like recreating an agent with the same name) are less likely to encounter conflicts.

def delete_agent_and_resources(agent_client, agent_name):
"""Deletes the agent and its associated action groups."""
agent_id = get_agent_by_name(agent_client, agent_name)
if not agent_id:
logger.info(f"Agent '{agent_name}' not found, no deletion needed.")
return
logger.warning(f"--- Deleting Agent Resources for '{agent_name}' (ID: {agent_id}) ---")
# 1. Delete Action Groups
try:
logger.info(f"Listing action groups for agent {agent_id}...")
action_groups = agent_client.list_agent_action_groups(
agentId=agent_id,
agentVersion='DRAFT' # List groups for the DRAFT version
).get('actionGroupSummaries', [])
if action_groups:
logger.info(f"Found {len(action_groups)} action groups to delete.")
for ag in action_groups:
delete_action_group(agent_client, agent_id, ag['actionGroupId'])
else:
logger.info("No action groups found to delete.")
except ClientError as e:
logger.error(f"Error listing action groups for agent {agent_id}: {e}")
# Continue to agent deletion attempt even if listing fails
# 2. Delete the Agent
try:
logger.info(f"Attempting to delete agent {agent_id} ('{agent_name}')...")
agent_client.delete_agent(agentId=agent_id, skipResourceInUseCheck=True) # Force delete
# Wait for agent deletion (custom waiter logic might be needed if no standard waiter)
logger.info(f"Waiting up to 2 minutes for agent {agent_id} deletion...")
deleted = False
for _ in range(24): # Check every 5 seconds for 2 minutes
try:
agent_client.get_agent(agentId=agent_id)
time.sleep(5)
except agent_client.exceptions.ResourceNotFoundException:
logger.info(f"Agent {agent_id} successfully deleted.")
deleted = True
break
except ClientError as e:
# Handle potential throttling during check
error_code = e.response.get('Error', {}).get('Code')
if error_code == 'ThrottlingException':
logger.warning("Throttled while checking agent deletion status, continuing wait...")
time.sleep(10)
else:
logger.error(f"Error checking agent deletion status: {e}")
# Break checking loop on unexpected error
break
if not deleted:
logger.warning(f"Agent {agent_id} deletion confirmation timed out.")
except agent_client.exceptions.ResourceNotFoundException:
logger.info(f"Agent {agent_id} ('{agent_name}') already deleted or not found.")
except ClientError as e:
logger.error(f"Error deleting agent {agent_id}: {e}")
logger.info(f"--- Agent Resource Deletion Complete for '{agent_name}' ---")
This subsection contains functions dedicated to the setup and configuration of the Bedrock Agent itself, including its core definition, action groups that link it to tools (Lambda functions), and the preparation process that makes it ready for invocation.
This function creates a new Bedrock Agent. It takes the desired agent_name, the agent_role_arn (obtained from create_agent_role), and the foundation_model_id (e.g., for Claude Sonnet) as input. Key configurations include:

- instruction: The prompt that tells the agent to act as a research assistant and rely solely on the SearchAndFormat tool (see the string defined in the code below).
- idleSessionTTLInSeconds: Sets a timeout for how long an agent session can remain idle.

After calling create_agent, the function logs the initial response details (ID, ARN, status). It then enters a polling loop to wait until the agent's status moves out of the CREATING state, typically to NOT_PREPARED. If the agent creation fails and enters a FAILED state, it raises an exception. It returns the agent_id and agent_arn upon successful initiation of creation.
def create_agent(agent_client, agent_name, agent_role_arn, foundation_model_id):
"""Creates a new Bedrock Agent."""
logger.info(f"--- Creating Agent: {agent_name} ---")
try:
# Updated Instruction for single tool
instruction = (
"You are a helpful research assistant. Your primary function is to use the SearchAndFormat tool "
"to find relevant documents based on user queries and format them. "
"Use the user's query for the search, and specify a formatting style if requested, otherwise use the default. "
"Present the formatted results returned by the tool directly to the user."
"Only use the tool provided. Do not add your own knowledge."
)
response = agent_client.create_agent(
agentName=agent_name,
agentResourceRoleArn=agent_role_arn,
foundationModel=foundation_model_id,
instruction=instruction,
idleSessionTTLInSeconds=1800, # 30 minutes
description=f"Experimental agent for Couchbase search and content formatting ({foundation_model_id})"
# promptOverrideConfiguration={} # Optional: Add later if needed
)
agent_info = response.get('agent')
agent_id = agent_info.get('agentId')
agent_arn = agent_info.get('agentArn')
agent_status = agent_info.get('agentStatus')
logger.info(f"Agent creation initiated. Name: {agent_name}, ID: {agent_id}, ARN: {agent_arn}, Status: {agent_status}")
# Wait for agent to become NOT_PREPARED (initial state after creation)
# Using custom waiter logic as there might not be a standard one for this transition
logger.info(f"Waiting for agent {agent_id} to reach initial state...")
for _ in range(12): # Check for up to 1 minute
current_status = agent_client.get_agent(agentId=agent_id)['agent']['agentStatus']
logger.info(f"Agent {agent_id} status: {current_status}")
if current_status != 'CREATING': # Expect NOT_PREPARED or FAILED
break
time.sleep(5)
final_status = agent_client.get_agent(agentId=agent_id)['agent']['agentStatus']
if final_status == 'FAILED':
logger.error(f"Agent {agent_id} creation failed.")
# Optionally retrieve failure reasons if API provides them
raise Exception(f"Agent creation failed for {agent_name}")
else:
logger.info(f"Agent {agent_id} successfully created (Status: {final_status}).")
return agent_id, agent_arn
except ClientError as e:
logger.error(f"Error creating agent '{agent_name}': {e}")
raise
This function creates or updates an action group for the specified agent. Action groups define the tools an agent can use. In this Lambda-based approach, the action group links the agent to the Lambda function that implements the tool. Key steps include:

- Defining the `function_schema_details` dictionary. This schema describes the tool (`searchAndFormatDocuments`) that the Lambda function provides, including its name, description, and expected input parameters (`query`, `k`, `style`) with their types and whether they are required. This schema is what the agent uses to understand how to invoke the tool.
- Checking whether an action group with the given `action_group_name` already exists for the `DRAFT` version of the agent.
  - If it exists, it is updated via `update_agent_action_group`, ensuring the `actionGroupExecutor` points to the correct Lambda ARN and that it uses the `functionSchema` (defining the tool via its signature) rather than an OpenAPI schema.
  - If it does not exist, a new action group is created via `create_agent_action_group`.
- `actionGroupExecutor`: This is set to `{'lambda': function_arn}`, where `function_arn` is the ARN of the deployed Lambda function. This tells Bedrock to invoke this Lambda when the agent decides to use a tool from this action group.
- `functionSchema` parameter: The `functionSchema` (containing the `function_schema_details`) is provided to the `create_agent_action_group` or `update_agent_action_group` call. This method of defining tools is simpler for single functions than providing a full OpenAPI schema, which is also an option for more complex APIs.
- The action group state is set to `ENABLED`.

A brief pause is added after creation/update to allow changes to propagate. The function returns the `actionGroupId`.
def create_action_group(agent_client, agent_id, action_group_name, function_arn, schema_path=None):
"""Creates an action group for the agent using Define with function details."""
logger.info(f"--- Creating/Updating Action Group (Function Details): {action_group_name} for Agent: {agent_id} ---")
logger.info(f"Lambda ARN: {function_arn}")
# Define function schema details (for functionSchema parameter)
function_schema_details = {
'functions': [
{
'name': 'searchAndFormatDocuments', # Function name agent will call
'description': 'Performs vector search based on query, retrieves documents, and formats results using specified style.',
'parameters': {
'query': {
'description': 'The search query text.',
'type': 'string',
'required': True
},
'k': {
'description': 'The maximum number of documents to retrieve.',
'type': 'integer',
'required': False # Making optional as Lambda has default
},
'style': {
'description': 'The desired formatting style for the results (e.g., \'bullet points\', \'paragraph\', \'summary\').',
'type': 'string',
'required': False # Making optional as Lambda has default
}
}
}
]
}
try:
# Check if Action Group already exists for the DRAFT version
try:
logger.info(f"Checking if action group '{action_group_name}' already exists for agent {agent_id} DRAFT version...")
paginator = agent_client.get_paginator('list_agent_action_groups')
existing_group = None
for page in paginator.paginate(agentId=agent_id, agentVersion='DRAFT'):
for ag_summary in page.get('actionGroupSummaries', []):
if ag_summary.get('actionGroupName') == action_group_name:
existing_group = ag_summary
break
if existing_group:
break
if existing_group:
ag_id = existing_group['actionGroupId']
logger.warning(f"Action Group '{action_group_name}' (ID: {ag_id}) already exists for agent {agent_id} DRAFT. Attempting update to Function Details.")
# Update existing action group - REMOVE apiSchema, ADD functionSchema
response = agent_client.update_agent_action_group(
agentId=agent_id,
agentVersion='DRAFT',
actionGroupId=ag_id,
actionGroupName=action_group_name,
actionGroupExecutor={'lambda': function_arn},
functionSchema={ # Use functionSchema
'functions': function_schema_details['functions'] # Pass the list with the correct key
},
actionGroupState='ENABLED'
)
ag_info = response.get('agentActionGroup')
logger.info(f"Successfully updated Action Group '{action_group_name}' (ID: {ag_info.get('actionGroupId')}) to use Function Details.")
return ag_info.get('actionGroupId')
else:
logger.info(f"Action group '{action_group_name}' does not exist. Creating new with Function Details.")
except ClientError as e:
logger.error(f"Error checking for existing action group '{action_group_name}': {e}. Proceeding with creation attempt.")
# Create new action group if not found or update failed implicitly
response = agent_client.create_agent_action_group(
agentId=agent_id,
agentVersion='DRAFT',
actionGroupName=action_group_name,
actionGroupExecutor={
'lambda': function_arn
},
functionSchema={ # Use functionSchema
'functions': function_schema_details['functions'] # Pass the list with the correct key
},
actionGroupState='ENABLED'
)
ag_info = response.get('agentActionGroup')
ag_id = ag_info.get('actionGroupId')
logger.info(f"Successfully created Action Group '{action_group_name}' with ID: {ag_id} using Function Details.")
time.sleep(5) # Pause after creation/update
return ag_id
except ClientError as e:
logger.error(f"Error creating/updating action group '{action_group_name}' using Function Details: {e}")
raise
This function initiates the preparation of the `DRAFT` version of the Bedrock Agent and waits for this process to complete. Preparation involves Bedrock compiling the agent's configuration (instructions, action groups, model settings) and making it ready for invocation.

- It calls `bedrock_agent_client.prepare_agent(agentId=agent_id)`.
- It then uses a custom `boto3` waiter (`AgentPrepared`) to poll the agent's status. The waiter configuration specifies:
  - `delay`: How often to check (e.g., every 30 seconds).
  - `operation`: The SDK call to make for checking (`GetAgent`).
  - `maxAttempts`: How many times to check before timing out (e.g., 20 attempts, for a total of up to 10 minutes).
  - `acceptors`: Conditions that determine success, failure, or retry. It succeeds if `agent.agentStatus` becomes `PREPARED`, fails if it becomes `FAILED`, and retries if it is `UPDATING` (though `PREPARING` is the more typical intermediate state here).

If the waiter times out or the agent preparation results in a `FAILED` status, an exception is raised. This step is crucial because an agent cannot be invoked (nor an alias reliably pointed to its version) until it is successfully prepared.
def prepare_agent(agent_client, agent_id):
"""Prepares the DRAFT version of the agent."""
logger.info(f"--- Preparing Agent: {agent_id} ---")
try:
response = agent_client.prepare_agent(agentId=agent_id)
agent_version = response.get('agentVersion') # Should be DRAFT
prepared_at = response.get('preparedAt')
status = response.get('agentStatus') # Should be PREPARING
logger.info(f"Agent preparation initiated for version '{agent_version}'. Status: {status}. Prepared At: {prepared_at}")
# Wait for preparation to complete (PREPARED or FAILED)
logger.info(f"Waiting for agent {agent_id} preparation to complete (up to 10 minutes)...")
# Define a simple waiter config
waiter_config = {
'version': 2,
'waiters': {
'AgentPrepared': {
'delay': 30, # Check every 30 seconds
'operation': 'GetAgent',
'maxAttempts': 20, # Max 10 minutes
'acceptors': [
{
'matcher': 'path',
'expected': 'PREPARED',
'argument': 'agent.agentStatus',
'state': 'success'
},
{
'matcher': 'path',
'expected': 'FAILED',
'argument': 'agent.agentStatus',
'state': 'failure'
},
{
'matcher': 'path',
'expected': 'UPDATING', # Can happen during prep? Treat as retryable
'argument': 'agent.agentStatus',
'state': 'retry'
}
]
}
}
}
waiter_model = WaiterModel(waiter_config)
custom_waiter = create_waiter_with_client('AgentPrepared', waiter_model, agent_client)
        try:
            custom_waiter.wait(agentId=agent_id)
            logger.info(f"Agent {agent_id} successfully prepared.")
        except Exception as e:
            logger.error(f"Agent {agent_id} preparation failed or timed out: {e}")
            # Check the final status if possible, for easier debugging
            try:
                final_status = agent_client.get_agent(agentId=agent_id)['agent']['agentStatus']
                logger.error(f"Final agent status: {final_status}")
            except Exception as get_e:
                logger.error(f"Could not retrieve final agent status after wait failure: {get_e}")
            raise Exception(f"Agent preparation failed for {agent_id}")
except Exception as e:
logger.error(f"Error preparing agent {agent_id}: {e}")
# Handle error, maybe exit
raise e # Re-raise the exception
This subsection provides the function used to interact with the prepared and aliased Bedrock Agent, sending it a prompt and processing its response.
This function is responsible for invoking the configured Bedrock Agent and handling its response. Key operations include:

- Calling `bedrock_agent_runtime_client.invoke_agent` with the `agentId`, `agentAliasId`, a unique `sessionId` (generated for each invocation in this script), the user's `prompt` (as `inputText`), and `enableTrace=True` to get detailed trace information for debugging.
- Processing the streaming response (`response.get('completion', [])`):
  - `chunk` events contain parts of the agent's textual response. The function decodes these byte chunks (UTF-8) and concatenates them to form the `completion_text`.
  - `trace` events (emitted because `enableTrace` was true) provide detailed insight into the agent's internal operations, such as which foundation model was called, the input to the model, tool invocations (in the Lambda approach the tool invocation itself is handled by Bedrock calling Lambda, but the trace may show the agent deciding to call it and the result from it), and the agent's rationale. The function collects these trace parts.
- Logging the final `completion_text` and a summary of the trace events, which can be very helpful for understanding the agent's decision-making process and debugging issues with tool invocation or response generation.

It returns the final textual response from the agent.
def test_agent_invocation(agent_runtime_client, agent_id, agent_alias_id, session_id, prompt):
"""Invokes the agent and prints the response."""
logger.info(f"--- Testing Agent Invocation (Agent ID: {agent_id}, Alias: {agent_alias_id}) ---")
logger.info(f"Session ID: {session_id}")
logger.info(f"Prompt: \"{prompt}\"")
try:
response = agent_runtime_client.invoke_agent(
agentId=agent_id,
agentAliasId=agent_alias_id,
sessionId=session_id,
inputText=prompt,
enableTrace=True # Enable trace for debugging
)
logger.info("Agent invocation successful. Processing response...")
completion_text = ""
trace_events = []
# The response is a stream. Iterate through the chunks.
for event in response.get('completion', []):
if 'chunk' in event:
data = event['chunk'].get('bytes', b'')
decoded_chunk = data.decode('utf-8')
completion_text += decoded_chunk
elif 'trace' in event:
trace_part = event['trace'].get('trace')
if trace_part:
trace_events.append(trace_part)
else:
logger.warning(f"Unhandled event type in stream: {event}")
# Log final combined response
logger.info(f"--- Agent Final Response ---{completion_text}")
# Keep trace summary log (optional, can be removed if too verbose)
if trace_events:
logger.info("--- Invocation Trace Summary ---")
for i, trace in enumerate(trace_events):
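                # Note: Bedrock nests trace details under keys such as 'orchestrationTrace',
                # so these top-level lookups may return None (as seen in the log summary below).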
trace_type = trace.get('type')
step_type = trace.get('orchestration', {}).get('stepType')
model_invocation_input = trace.get('modelInvocationInput')
if model_invocation_input:
fm_input = model_invocation_input.get('text',
json.dumps(model_invocation_input.get('invocationInput',{}).get('toolConfiguration',{})) # Handle tool input
)
log_line = f"Trace {i+1}: Type={trace_type}, Step={step_type}"
rationale = trace.get('rationale', {}).get('text')
if rationale: log_line += f", Rationale=\"{rationale[:100]}...\""
logger.info(log_line) # Log summary line
return completion_text
except ClientError as e:
logger.error(f"Error invoking agent: {e}")
logger.error(traceback.format_exc())
return None
except Exception as e:
logger.error(f"Unexpected error during agent invocation: {e}")
logger.error(traceback.format_exc())
return None
This is the primary section of the notebook, where all the previously defined helper functions are called in sequence to set up the complete Bedrock Agent environment with a Lambda-backed tool and then test its invocation. The flow is designed to be largely idempotent: it can usually be re-run, since it attempts to clean up or reuse existing resources (e.g., IAM roles, Lambda functions, agents) before creating new ones. The major steps are outlined below:
This first step in the main execution flow performs essential preliminary tasks:

- It calls `check_environment_variables()` to ensure all required environment variables (AWS credentials, Couchbase password, etc.) are set. If not, it raises an `EnvironmentError` to halt execution, as the subsequent steps depend on these variables.
- It calls `initialize_aws_clients()` to get the necessary `boto3` client objects for Bedrock, IAM, Lambda, etc.
- It calls `connect_couchbase()` to establish a connection to the Couchbase cluster.

If any of these critical initialization steps fail, an exception is raised to stop the notebook's execution, preventing errors in later stages.
logger.info("--- Starting Bedrock Agent Experiment Script ---")
if not check_environment_variables():
# In a notebook, raising an exception might be better than exit(1)
raise EnvironmentError("Missing required environment variables. Check logs.")
# Initialize all clients, including the agent client
try:
bedrock_runtime_client, iam_client, lambda_client, bedrock_agent_client, bedrock_agent_runtime_client = initialize_aws_clients()
cb_cluster = connect_couchbase()
logger.info("AWS clients and Couchbase connection initialized.")
except Exception as e:
logger.error(f"Initialization failed: {e}")
raise # Re-raise the exception to stop execution
2025-06-09 13:39:41,643 - INFO - --- Starting Bedrock Agent Experiment Script ---
2025-06-09 13:39:41,644 - INFO - All required environment variables are set.
2025-06-09 13:39:41,644 - INFO - Initializing AWS clients in region: us-east-1
2025-06-09 13:39:42,002 - INFO - AWS clients initialized successfully.
2025-06-09 13:39:42,002 - INFO - Connecting to Couchbase cluster at couchbases://cb.hlcup4o4jmjr55yf.cloud.couchbase.com...
2025-06-09 13:39:44,131 - INFO - Successfully connected to Couchbase.
2025-06-09 13:39:44,132 - INFO - AWS clients and Couchbase connection initialized.
This block prepares the Couchbase environment to serve as the vector store for the agent. It involves:

- `setup_collection()`: Ensures that the target Couchbase bucket, scope, and collection (defined by `CB_BUCKET_NAME`, `SCOPE_NAME`, `COLLECTION_NAME`) are created if they don't already exist, and that a primary index is present on the collection.
- `setup_search_index()`: Creates or updates the Couchbase Full-Text Search (FTS) index (named by `INDEX_NAME`) using the definition from `INDEX_JSON_PATH`. This search index is crucial for performing vector similarity searches.
- `clear_collection()`: Deletes all existing documents from the target collection, so that each run of the notebook starts with a clean slate and data from previous experiments cannot interfere with the current one.

If any part of this Couchbase setup fails, an exception is logged and re-raised to stop further execution.
try:
# Use the setup functions with the script's config variables
cb_collection = setup_collection(cb_cluster, CB_BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME)
logger.info(f"Couchbase collection '{CB_BUCKET_NAME}.{SCOPE_NAME}.{COLLECTION_NAME}' setup complete.")
# Pass required args to setup_search_index
setup_search_index(cb_cluster, INDEX_NAME, CB_BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME, INDEX_JSON_PATH)
logger.info(f"Couchbase search index '{INDEX_NAME}' setup complete.")
# Clear any existing documents from previous runs
clear_collection(cb_cluster, CB_BUCKET_NAME, SCOPE_NAME, COLLECTION_NAME)
logger.info("Cleared any existing documents from the collection.")
except Exception as e:
logger.error(f"Couchbase setup failed: {e}")
raise
2025-06-09 13:39:44,140 - INFO - Setting up collection: vector-search-testing/shared/bedrock
2025-06-09 13:39:45,245 - INFO - Bucket 'vector-search-testing' exists.
2025-06-09 13:39:46,219 - INFO - Scope 'shared' already exists.
2025-06-09 13:39:47,149 - INFO - Collection 'bedrock' already exists.
2025-06-09 13:39:47,152 - INFO - Ensuring primary index exists on `vector-search-testing`.`shared`.`bedrock`...
2025-06-09 13:39:48,185 - INFO - Primary index present or created successfully.
2025-06-09 13:39:48,186 - INFO - Collection setup complete.
2025-06-09 13:39:48,187 - INFO - Couchbase collection 'vector-search-testing.shared.bedrock' setup complete.
2025-06-09 13:39:48,187 - INFO - Looking for index definition at: /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/aws_index.json
2025-06-09 13:39:48,192 - INFO - Loaded index definition from /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/aws_index.json, ensuring name is 'vector_search_bedrock' and source is 'vector-search-testing'.
2025-06-09 13:39:48,193 - INFO - Upserting search index 'vector_search_bedrock'...
2025-06-09 13:39:48,880 - WARNING - Search index 'vector_search_bedrock' likely already existed (caught QueryIndexAlreadyExistsException, check if applicable). Upsert attempted.
2025-06-09 13:39:48,881 - INFO - Couchbase search index 'vector_search_bedrock' setup complete.
2025-06-09 13:39:48,881 - WARNING - Attempting to clear all documents from `vector-search-testing`.`shared`.`bedrock`...
2025-06-09 13:39:49,141 - WARNING - Could not retrieve mutation count after delete: 'list' object has no attribute 'meta_data'
2025-06-09 13:39:49,142 - INFO - Successfully cleared documents from the collection (approx. 0 mutations).
2025-06-09 13:39:49,143 - INFO - Cleared any existing documents from the collection.
With the Couchbase infrastructure in place, this section prepares the LangChain vector store and populates it with data:

- `BedrockEmbeddings`: Creates an instance of the `BedrockEmbeddings` client, specifying the `EMBEDDING_MODEL_ID` (e.g., Amazon Titan Text Embeddings V2). This client will be used by the vector store to convert text documents into numerical embeddings for similarity searching.
- `CouchbaseSearchVectorStore`: Creates an instance of `CouchbaseSearchVectorStore`. This LangChain component acts as an abstraction layer over the Couchbase collection and search index, providing methods for adding documents and performing similarity searches. It's configured with the Couchbase cluster connection, bucket/scope/collection names, the embeddings client, and the search index name.
- Loading documents: Reads the `DOCS_JSON_PATH` file, which is expected to contain a list of documents, each with `text` and `metadata` fields.
- `vector_store.add_texts()`: Processes these documents; each document's text is converted into an embedding (using the `BedrockEmbeddings` client), and both the text and its embedding (along with metadata) are stored in the Couchbase collection. The search index (`INDEX_NAME`) is then updated to include these new vectors, making them searchable.

Error handling is included to catch issues like a missing file or problems during embedding generation or data insertion.
Note: `documents.json` contains the documents that we want to load into our vector store. As an example, we have added a few documents to the file from https://cline.bot/. The structure the loader expects is sketched below.
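For reference, the loading code expects a top-level `documents` array in which each entry carries a `text` field and an optional `metadata` field (either a dict or a JSON-encoded string). An illustrative entry (the field values here are made up for the example):

{
  "documents": [
    {
      "text": "Project Chimera combines quantum entanglement communication with neural networks...",
      "metadata": {"source": "https://cline.bot/"}
    }
  ]
}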
Let's load the documents from the documents.json file and add them to our vector store:
try:
logger.info(f"Initializing Bedrock Embeddings client with model: {EMBEDDING_MODEL_ID}")
embeddings = BedrockEmbeddings(
client=bedrock_runtime_client,
model_id=EMBEDDING_MODEL_ID
)
logger.info("Successfully created Bedrock embeddings client.")
logger.info(f"Initializing CouchbaseSearchVectorStore with index: {INDEX_NAME}")
vector_store = CouchbaseSearchVectorStore(
cluster=cb_cluster,
bucket_name=CB_BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=COLLECTION_NAME,
embedding=embeddings,
index_name=INDEX_NAME
)
logger.info("Successfully created Couchbase vector store.")
# Load documents from JSON file
logger.info(f"Looking for documents at: {DOCS_JSON_PATH}")
if not os.path.exists(DOCS_JSON_PATH):
logger.error(f"Documents file not found: {DOCS_JSON_PATH}")
raise FileNotFoundError(f"Documents file not found: {DOCS_JSON_PATH}")
with open(DOCS_JSON_PATH, 'r') as f:
data = json.load(f)
documents_to_load = data.get('documents', [])
logger.info(f"Loaded {len(documents_to_load)} documents from {DOCS_JSON_PATH}")
# Add documents to vector store
if documents_to_load:
logger.info(f"Adding {len(documents_to_load)} documents to vector store...")
texts = [doc.get('text', '') for doc in documents_to_load]
metadatas = []
for i, doc in enumerate(documents_to_load):
metadata_raw = doc.get('metadata', {})
if isinstance(metadata_raw, str):
try:
metadata = json.loads(metadata_raw)
if not isinstance(metadata, dict):
logger.warning(f"Metadata for doc {i} parsed from string is not a dict: {metadata}. Using empty dict.")
metadata = {}
except json.JSONDecodeError:
logger.warning(f"Could not parse metadata string for doc {i}: {metadata_raw}. Using empty dict.")
metadata = {}
elif isinstance(metadata_raw, dict):
metadata = metadata_raw
else:
logger.warning(f"Metadata for doc {i} is not a string or dict: {metadata_raw}. Using empty dict.")
metadata = {}
metadatas.append(metadata)
inserted_ids = vector_store.add_texts(texts=texts, metadatas=metadatas)
logger.info(f"Successfully added {len(inserted_ids)} documents to the vector store.")
else:
logger.warning("No documents found in the JSON file to add.")
except FileNotFoundError as e:
logger.error(f"Setup failed: {e}")
raise
except Exception as e:
logger.error(f"Error during vector store setup or data loading: {e}")
logger.error(traceback.format_exc())
raise
logger.info("--- Couchbase Setup and Data Loading Complete ---")
2025-06-09 13:39:49,152 - INFO - Initializing Bedrock Embeddings client with model: amazon.titan-embed-text-v2:0
2025-06-09 13:39:49,153 - INFO - Successfully created Bedrock embeddings client.
2025-06-09 13:39:49,153 - INFO - Initializing CouchbaseSearchVectorStore with index: vector_search_bedrock
2025-06-09 13:39:52,549 - INFO - Successfully created Couchbase vector store.
2025-06-09 13:39:52,549 - INFO - Looking for documents at: /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/documents.json
2025-06-09 13:39:52,551 - INFO - Loaded 7 documents from /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/documents.json
2025-06-09 13:39:52,551 - INFO - Adding 7 documents to vector store...
2025-06-09 13:39:56,544 - INFO - Successfully added 7 documents to the vector store.
2025-06-09 13:39:56,545 - INFO - --- Couchbase Setup and Data Loading Complete ---
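Before wiring this store to the agent's Lambda tool, it can be worth sanity-checking retrieval directly. A quick check against the freshly loaded documents, using LangChain's standard `similarity_search` API (assuming the `vector_store` from the cell above is in scope):

# Quick retrieval sanity check against the freshly loaded documents.
docs = vector_store.similarity_search("Project Chimera", k=2)
for d in docs:
    print(d.page_content[:120], d.metadata)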
This step ensures that the necessary IAM (Identity and Access Management) role for the Bedrock Agent and its Lambda function is in place.

- It defines the `agent_role_name` (e.g., `bedrock_agent_lambda_exp_role`).
- It calls the `create_agent_role()` helper function. This function (described in section 3.7) either creates a new IAM role with this name or updates an existing one.
- The role's permissions include the managed `AWSLambdaBasicExecutionRole` for Lambda logging and custom inline policies for Bedrock access and any other required permissions.

The ARN of this role (`agent_role_arn`) is stored, as it's a required parameter for creating both the Bedrock Agent and the AWS Lambda function that the agent will invoke.
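Because a single role is assumed by both the Bedrock service (for the agent) and the Lambda service (for the function), its trust policy must list both service principals. A sketch of the kind of trust policy `create_agent_role` plausibly applies (the exact document lives in that helper; this is illustrative only):

import json

# Illustrative trust policy allowing both Bedrock and Lambda to assume the role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": ["bedrock.amazonaws.com", "lambda.amazonaws.com"]},
        "Action": "sts:AssumeRole"
    }]
}
# e.g. iam_client.update_assume_role_policy(RoleName=agent_role_name,
#                                           PolicyDocument=json.dumps(trust_policy))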
agent_role_name = "bedrock_agent_lambda_exp_role"
try:
# Ensure AWS_ACCOUNT_ID is loaded correctly
if not AWS_ACCOUNT_ID:
logger.info("Attempting to fetch AWS Account ID...")
sts_client = boto3.client('sts', region_name=AWS_REGION)
AWS_ACCOUNT_ID = sts_client.get_caller_identity().get('Account')
if not AWS_ACCOUNT_ID:
raise ValueError("AWS Account ID could not be determined. Please set the AWS_ACCOUNT_ID environment variable.")
logger.info(f"Fetched AWS Account ID: {AWS_ACCOUNT_ID}")
agent_role_arn = create_agent_role(iam_client, agent_role_name, AWS_ACCOUNT_ID)
logger.info(f"Agent IAM Role ARN: {agent_role_arn}")
except Exception as e:
logger.error(f"Failed to create/verify IAM role: {e}")
logger.error(traceback.format_exc())
raise
2025-06-09 13:39:56,553 - INFO - Checking/Creating IAM role: bedrock_agent_lambda_exp_role
2025-06-09 13:39:57,454 - INFO - IAM role 'bedrock_agent_lambda_exp_role' already exists with ARN: arn:aws:iam::598307997273:role/bedrock_agent_lambda_exp_role
2025-06-09 13:39:57,454 - INFO - Updating trust policy for existing role 'bedrock_agent_lambda_exp_role'...
2025-06-09 13:39:57,710 - INFO - Trust policy updated for role 'bedrock_agent_lambda_exp_role'.
2025-06-09 13:39:57,710 - INFO - Attaching basic Lambda execution policy to role 'bedrock_agent_lambda_exp_role'...
2025-06-09 13:39:57,973 - INFO - Attached basic Lambda execution policy.
2025-06-09 13:39:57,974 - INFO - Putting basic inline policy 'LambdaBasicLoggingPermissions' for role 'bedrock_agent_lambda_exp_role'...
2025-06-09 13:39:58,240 - INFO - Successfully put inline policy 'LambdaBasicLoggingPermissions'.
2025-06-09 13:39:58,240 - INFO - Putting Bedrock permissions policy 'BedrockAgentPermissions' for role 'bedrock_agent_lambda_exp_role'...
2025-06-09 13:39:58,607 - INFO - Successfully put inline policy 'BedrockAgentPermissions'.
2025-06-09 13:39:58,608 - INFO - Waiting 10s for policy changes to propagate...
2025-06-09 13:40:08,612 - INFO - Agent IAM Role ARN: arn:aws:iam::598307997273:role/bedrock_agent_lambda_exp_role
This section orchestrates the deployment of the AWS Lambda function that will execute the agent's `searchAndFormatDocuments` tool. The process involves several steps managed by the helper functions:

- Defining names and paths: the `search_format_lambda_name` (e.g., `bedrock_agent_search_format_exp`), the `lambda_source_dir` (where the Lambda's Python script and `Makefile` are located), and `lambda_build_dir` (where the final .zip package will be placed).
- Cleanup: It calls `delete_lambda_function` for potentially conflicting older Lambda functions (e.g., separate researcher/writer Lambdas from previous experiments, or an old version of the current combined Lambda). This ensures a cleaner environment, especially during iterative development.
- Packaging: It calls `package_function()`. This helper (described in 3.8.3) uses the `Makefile` in `lambda_source_dir` to install dependencies, prepare the handler script (`bedrock_agent_search_and_format.py`), and create a .zip deployment package (`search_format_zip_path`).
- Deployment: It calls `create_lambda_function()`. This helper (described in 3.8.4) takes the .zip package and either creates a new Lambda function in AWS or updates an existing one. It handles S3 upload for large packages, sets environment variables (such as Couchbase connection info and Bedrock model IDs), and configures the IAM role, runtime, handler, timeout, and memory. It also adds permissions for Bedrock to invoke the Lambda and waits for the Lambda to become active.

The ARN of the deployed Lambda (`search_format_lambda_arn`) is stored, as it's needed to link this Lambda to the Bedrock Agent's action group.
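The handler itself lives in `bedrock_agent_search_and_format.py` and is not reproduced in this notebook, but its contract is fixed by the function schema defined earlier. A minimal sketch of a handler for a function-details action group (the event and response shapes follow Bedrock's documented format for function schemas; the search itself is a placeholder for the real Couchbase vector search):

def lambda_handler(event, context):
    """Sketch of a Bedrock Agent tool handler (function-details format)."""
    # Parameters arrive as a list of {'name', 'type', 'value'} dicts.
    params = {p['name']: p['value'] for p in event.get('parameters', [])}
    query = params.get('query', '')
    k = int(params.get('k', 4))               # optional in the schema, so default here
    style = params.get('style', 'bullet points')

    # The real handler performs the Couchbase vector search here, e.g. via
    # CouchbaseSearchVectorStore.similarity_search(query, k=k); this is a stub.
    results_text = f"(top {k} results for '{query}', formatted as {style})"

    # Response envelope Bedrock expects for function-schema action groups.
    return {
        'messageVersion': '1.0',
        'response': {
            'actionGroup': event.get('actionGroup'),
            'function': event.get('function'),
            'functionResponse': {
                'responseBody': {'TEXT': {'body': results_text}}
            }
        }
    }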
search_format_lambda_name = "bedrock_agent_search_format_exp"
# Adjust source/build dirs for notebook context if necessary
lambda_source_dir = os.path.join(SCRIPT_DIR, 'lambda_functions')
lambda_build_dir = SCRIPT_DIR # Final zip ends up in the notebook's directory
logger.info("--- Starting Lambda Deployment (Single Function) --- ")
search_format_lambda_arn = None
search_format_zip_path = None
try:
# Delete old lambdas if they exist (optional, but good cleanup)
logger.info("Deleting potentially conflicting old Lambda functions...")
delete_lambda_function(lambda_client, "bedrock_agent_researcher_exp")
delete_lambda_function(lambda_client, "bedrock_agent_writer_exp")
# Delete the new lambda if it exists from a previous run
delete_lambda_function(lambda_client, search_format_lambda_name)
logger.info("Old Lambda deletion checks complete.")
logger.info(f"Packaging Lambda function '{search_format_lambda_name}'...")
search_format_zip_path = package_function("bedrock_agent_search_and_format", lambda_source_dir, lambda_build_dir)
logger.info(f"Lambda function packaged at: {search_format_zip_path}")
logger.info(f"Creating/Updating Lambda function '{search_format_lambda_name}'...")
search_format_lambda_arn = create_lambda_function(
lambda_client=lambda_client, function_name=search_format_lambda_name,
handler='lambda_function.lambda_handler', role_arn=agent_role_arn,
zip_file=search_format_zip_path, region=AWS_REGION
)
logger.info(f"Search/Format Lambda Deployed: {search_format_lambda_arn}")
except FileNotFoundError as e:
logger.error(f"Lambda packaging failed: Required file not found. {e}")
raise
except Exception as e:
logger.error(f"Lambda deployment failed: {e}")
logger.error(traceback.format_exc())
raise
finally:
logger.info("Cleaning up deployment zip file...")
if search_format_zip_path and os.path.exists(search_format_zip_path):
try:
os.remove(search_format_zip_path)
logger.info(f"Removed zip file: {search_format_zip_path}")
except OSError as e:
logger.warning(f"Could not remove zip file {search_format_zip_path}: {e}")
logger.info("--- Lambda Deployment Complete --- ")
2025-06-09 13:40:08,624 - INFO - --- Starting Lambda Deployment (Single Function) ---
2025-06-09 13:40:08,626 - INFO - Deleting potentially conflicting old Lambda functions...
2025-06-09 13:40:08,626 - INFO - Attempting to delete Lambda function: bedrock_agent_researcher_exp...
2025-06-09 13:40:08,626 - INFO - Attempting to remove permission AllowBedrockInvokeBasic-bedrock_agent_researcher_exp from bedrock_agent_researcher_exp...
2025-06-09 13:40:09,490 - INFO - Permission AllowBedrockInvokeBasic-bedrock_agent_researcher_exp not found on bedrock_agent_researcher_exp. Skipping removal.
2025-06-09 13:40:09,794 - INFO - Lambda function 'bedrock_agent_researcher_exp' does not exist. No need to delete.
2025-06-09 13:40:09,795 - INFO - Attempting to delete Lambda function: bedrock_agent_writer_exp...
2025-06-09 13:40:09,796 - INFO - Attempting to remove permission AllowBedrockInvokeBasic-bedrock_agent_writer_exp from bedrock_agent_writer_exp...
2025-06-09 13:40:10,082 - INFO - Permission AllowBedrockInvokeBasic-bedrock_agent_writer_exp not found on bedrock_agent_writer_exp. Skipping removal.
2025-06-09 13:40:10,387 - INFO - Lambda function 'bedrock_agent_writer_exp' does not exist. No need to delete.
2025-06-09 13:40:10,387 - INFO - Attempting to delete Lambda function: bedrock_agent_search_format_exp...
2025-06-09 13:40:10,387 - INFO - Attempting to remove permission AllowBedrockInvokeBasic-bedrock_agent_search_format_exp from bedrock_agent_search_format_exp...
2025-06-09 13:40:10,686 - INFO - Successfully removed permission AllowBedrockInvokeBasic-bedrock_agent_search_format_exp from bedrock_agent_search_format_exp.
2025-06-09 13:40:13,060 - INFO - Function bedrock_agent_search_format_exp exists. Deleting...
2025-06-09 13:40:13,594 - INFO - Waiting for bedrock_agent_search_format_exp to be deleted...
2025-06-09 13:40:23,596 - INFO - Function bedrock_agent_search_format_exp deletion initiated.
2025-06-09 13:40:23,597 - INFO - Old Lambda deletion checks complete.
2025-06-09 13:40:23,598 - INFO - Packaging Lambda function 'bedrock_agent_search_format_exp'...
2025-06-09 13:40:23,599 - INFO - --- Packaging function bedrock_agent_search_and_format ---
2025-06-09 13:40:23,602 - INFO - Source Dir (Makefile location & make cwd): /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/lambda_functions
2025-06-09 13:40:23,602 - INFO - Build Dir (Final zip location): /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach
2025-06-09 13:40:23,603 - INFO - Copying /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/lambda_functions/bedrock_agent_search_and_format.py to /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/lambda_functions/lambda_function.py
2025-06-09 13:40:23,605 - INFO - Running make command: make -f /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/lambda_functions/Makefile clean package (in /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/lambda_functions)
2025-06-09 13:40:50,341 - INFO - Make command completed successfully.
2025-06-09 13:40:50,343 - INFO - Moving and renaming /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/lambda_functions/lambda_package.zip to /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/bedrock_agent_search_and_format.zip
2025-06-09 13:40:50,351 - INFO - Zip file ready: /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/bedrock_agent_search_and_format.zip
2025-06-09 13:40:50,362 - INFO - Cleaning up temporary script: /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/lambda_functions/lambda_function.py
2025-06-09 13:40:50,362 - INFO - Lambda function packaged at: /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/bedrock_agent_search_and_format.zip
2025-06-09 13:40:50,363 - INFO - Creating/Updating Lambda function 'bedrock_agent_search_format_exp'...
2025-06-09 13:40:50,363 - INFO - Deploying Lambda function bedrock_agent_search_format_exp from /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/bedrock_agent_search_and_format.zip...
2025-06-09 13:40:50,371 - INFO - Found credentials in environment variables.
2025-06-09 13:40:50,516 - INFO - Zip file size: 50.91 MB
2025-06-09 13:40:50,516 - INFO - Package size (50.91 MB) requires S3 deployment.
2025-06-09 13:40:50,517 - INFO - Preparing to upload /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/bedrock_agent_search_and_format.zip to S3 in region us-east-1...
2025-06-09 13:40:51,552 - INFO - Generated unique S3 bucket name: lambda-deployment-598307997273-1749456651
2025-06-09 13:40:52,277 - INFO - Creating S3 bucket: lambda-deployment-598307997273-1749456651...
2025-06-09 13:40:52,666 - INFO - Created S3 bucket: lambda-deployment-598307997273-1749456651. Waiting for availability...
2025-06-09 13:40:52,929 - INFO - Bucket lambda-deployment-598307997273-1749456651 is available.
2025-06-09 13:40:52,929 - INFO - Uploading /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/bedrock_agent_search_and_format.zip to s3://lambda-deployment-598307997273-1749456651/lambda/bedrock_agent_search_and_format.zip-0b1fc73a...
2025-06-09 13:41:03,991 - INFO - Successfully uploaded to s3://lambda-deployment-598307997273-1749456651/lambda/bedrock_agent_search_and_format.zip-0b1fc73a
2025-06-09 13:41:03,993 - INFO - Creating function 'bedrock_agent_search_format_exp' (attempt 1/3)...
2025-06-09 13:41:06,690 - INFO - Successfully created function 'bedrock_agent_search_format_exp' with ARN: arn:aws:lambda:us-east-1:598307997273:function:bedrock_agent_search_format_exp
2025-06-09 13:41:11,693 - INFO - Adding basic invoke permission (AllowBedrockInvokeBasic-bedrock_agent_search_format_exp) to bedrock_agent_search_format_exp...
2025-06-09 13:41:12,007 - INFO - Successfully added basic invoke permission AllowBedrockInvokeBasic-bedrock_agent_search_format_exp.
2025-06-09 13:41:12,007 - INFO - Waiting for function 'bedrock_agent_search_format_exp' to become active...
2025-06-09 13:41:12,306 - INFO - Function 'bedrock_agent_search_format_exp' is active.
2025-06-09 13:41:12,308 - INFO - Search/Format Lambda Deployed: arn:aws:lambda:us-east-1:598307997273:function:bedrock_agent_search_format_exp
2025-06-09 13:41:12,308 - INFO - Cleaning up deployment zip file...
2025-06-09 13:41:12,310 - INFO - Removed zip file: /Users/kaustavghosh/Desktop/vector-search-cookbook/awsbedrock-agents/lambda-approach/bedrock_agent_search_and_format.zip
2025-06-09 13:41:12,311 - INFO - --- Lambda Deployment Complete ---
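Before attaching the Lambda to the agent, you can smoke-test it directly by invoking it with a hand-built event in the function-details shape Bedrock will send (field names follow the Bedrock agent Lambda event format; the query value is arbitrary):

import json

test_event = {
    'messageVersion': '1.0',
    'actionGroup': 'SearchAndFormatActionGroup',
    'function': 'searchAndFormatDocuments',
    'parameters': [
        {'name': 'query', 'type': 'string', 'value': 'Project Chimera'},
        {'name': 'style', 'type': 'string', 'value': 'bullet points'},
    ],
}
resp = lambda_client.invoke(
    FunctionName=search_format_lambda_name,
    Payload=json.dumps(test_event).encode('utf-8'),
)
print(json.loads(resp['Payload'].read()))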
This part of the script focuses on creating the Bedrock Agent itself.

- An `agent_name` is defined (e.g., `couchbase_search_format_agent_exp`).
- Cleanup: It calls `delete_agent_and_resources()` first. This helper function (described in 3.9.3) attempts to find an agent with the same name and, if found, deletes it along with its action groups and aliases. This ensures that each run starts with a clean slate for the agent, preventing conflicts or issues from previous configurations.
- Creation: It calls `create_agent()`. This helper function (described in 3.10.1) creates a new Bedrock Agent with the specified name, the IAM role ARN (`agent_role_arn`), the foundation model ID (`AGENT_MODEL_ID`), and a set of instructions guiding the agent on how to behave and use its tools.

The `agent_id` and `agent_arn` returned by `create_agent()` are stored for subsequent steps like creating action groups and preparing the agent.
agent_name = f"couchbase_search_format_agent_exp"
agent_id = None
agent_arn = None
alias_name = "prod" # Define alias name here
# agent_alias_id_to_use will be set later after preparation
# 1. Attempt to find and delete existing agent to ensure a clean state
logger.info(f"Checking for and deleting existing agent: {agent_name}")
try:
delete_agent_and_resources(bedrock_agent_client, agent_name) # Handles finding and deleting
logger.info(f"Deletion process completed for any existing agent named {agent_name}.")
except Exception as e:
# Log error during find/delete but proceed to creation attempt
logger.error(f"Error during agent finding/deletion phase: {e}. Proceeding to creation attempt.")
# 2. Always attempt to create the agent after the delete phase
logger.info(f"--- Creating Agent: {agent_name} ---")
try:
agent_id, agent_arn = create_agent(
agent_client=bedrock_agent_client,
agent_name=agent_name,
agent_role_arn=agent_role_arn,
foundation_model_id=AGENT_MODEL_ID
)
if not agent_id:
raise Exception("create_agent function did not return a valid agent ID.")
logger.info(f"Agent created successfully. ID: {agent_id}, ARN: {agent_arn}")
except Exception as e:
logger.error(f"Failed to create agent '{agent_name}': {e}")
logger.error(traceback.format_exc())
raise
2025-06-09 13:41:12,317 - INFO - Checking for and deleting existing agent: couchbase_search_format_agent_exp
2025-06-09 13:41:12,318 - INFO - Attempting to find agent by name: couchbase_search_format_agent_exp
2025-06-09 13:41:13,172 - INFO - Found agent 'couchbase_search_format_agent_exp' with ID: 8CZXA8LJJH
2025-06-09 13:41:13,172 - WARNING - --- Deleting Agent Resources for 'couchbase_search_format_agent_exp' (ID: 8CZXA8LJJH) ---
2025-06-09 13:41:13,172 - INFO - Listing action groups for agent 8CZXA8LJJH...
2025-06-09 13:41:13,472 - INFO - Found 1 action groups to delete.
2025-06-09 13:41:13,473 - INFO - Attempting to delete action group GKWWTGZVHJ for agent 8CZXA8LJJH...
2025-06-09 13:41:13,794 - INFO - Successfully deleted action group GKWWTGZVHJ for agent 8CZXA8LJJH.
2025-06-09 13:41:18,797 - INFO - Attempting to delete agent 8CZXA8LJJH ('couchbase_search_format_agent_exp')...
2025-06-09 13:41:19,108 - INFO - Waiting up to 2 minutes for agent 8CZXA8LJJH deletion...
2025-06-09 13:41:25,109 - INFO - Agent 8CZXA8LJJH successfully deleted.
2025-06-09 13:41:25,110 - INFO - --- Agent Resource Deletion Complete for 'couchbase_search_format_agent_exp' ---
2025-06-09 13:41:25,111 - INFO - Deletion process completed for any existing agent named couchbase_search_format_agent_exp.
2025-06-09 13:41:25,111 - INFO - --- Creating Agent: couchbase_search_format_agent_exp ---
2025-06-09 13:41:25,112 - INFO - --- Creating Agent: couchbase_search_format_agent_exp ---
2025-06-09 13:41:25,623 - INFO - Agent creation initiated. Name: couchbase_search_format_agent_exp, ID: 7BTR61MXVF, ARN: arn:aws:bedrock:us-east-1:598307997273:agent/7BTR61MXVF, Status: CREATING
2025-06-09 13:41:25,625 - INFO - Waiting for agent 7BTR61MXVF to reach initial state...
2025-06-09 13:41:26,201 - INFO - Agent 7BTR61MXVF status: CREATING
2025-06-09 13:41:31,658 - INFO - Agent 7BTR61MXVF status: NOT_PREPARED
2025-06-09 13:41:32,110 - INFO - Agent 7BTR61MXVF successfully created (Status: NOT_PREPARED).
2025-06-09 13:41:32,111 - INFO - Agent created successfully. ID: 7BTR61MXVF, ARN: arn:aws:bedrock:us-east-1:598307997273:agent/7BTR61MXVF
Once the agent is created and the Lambda function is deployed, this step links them together by creating an Action Group.

- An `action_group_name` is defined (e.g., `SearchAndFormatActionGroup`).
- The script calls the `create_action_group()` helper function (described in 3.10.2). This function is responsible for:
  - Taking the `agent_id` and the `search_format_lambda_arn` (the ARN of the deployed Lambda function) as input.
  - Defining the `functionSchema`, which tells the agent how to use the Lambda function (i.e., the tool name `searchAndFormatDocuments` and its parameters like `query`, `k`, `style`).
  - Setting the `actionGroupExecutor` to point to the Lambda ARN, so Bedrock knows which Lambda to invoke.
  - Creating or updating the action group on the `DRAFT` version of the agent.
- A 30-second pause (`time.sleep(30)`) is added after the action group setup. This is a crucial step to give AWS services enough time to propagate the changes and ensure that the agent is aware of the newly configured or updated action group before proceeding to the preparation phase. Without such a delay, the preparation step might fail or not correctly incorporate the action group.

# --- Action Group Creation/Update (Now assumes agent_id is valid) ---
action_group_name = "SearchAndFormatActionGroup"
action_group_id = None
try:
if not agent_id:
raise ValueError("Agent ID is not set. Cannot create action group.")
if not search_format_lambda_arn:
raise ValueError("Lambda ARN is not set. Cannot create action group.")
logger.info(f"Creating/Updating Action Group '{action_group_name}' for agent {agent_id}...")
action_group_id = create_action_group(
agent_client=bedrock_agent_client,
agent_id=agent_id,
action_group_name=action_group_name,
function_arn=search_format_lambda_arn,
# schema_path=None # No longer needed explicitly if default is None
)
if not action_group_id:
raise Exception("create_action_group did not return a valid ID.")
logger.info(f"Action Group '{action_group_name}' created/updated with ID: {action_group_id}")
# Add a slightly longer wait after action group modification/creation
logger.info("Waiting 30s after action group setup before preparing agent...")
time.sleep(30)
except Exception as e:
logger.error(f"Failed to set up action group: {e}")
logger.error(traceback.format_exc())
raise
2025-06-09 13:41:32,119 - INFO - Creating/Updating Action Group 'SearchAndFormatActionGroup' for agent 7BTR61MXVF...
2025-06-09 13:41:32,120 - INFO - --- Creating/Updating Action Group (Function Details): SearchAndFormatActionGroup for Agent: 7BTR61MXVF ---
2025-06-09 13:41:32,121 - INFO - Lambda ARN: arn:aws:lambda:us-east-1:598307997273:function:bedrock_agent_search_format_exp
2025-06-09 13:41:32,122 - INFO - Checking if action group 'SearchAndFormatActionGroup' already exists for agent 7BTR61MXVF DRAFT version...
2025-06-09 13:41:32,412 - INFO - Action group 'SearchAndFormatActionGroup' does not exist. Creating new with Function Details.
2025-06-09 13:41:32,806 - INFO - Successfully created Action Group 'SearchAndFormatActionGroup' with ID: 7XTTI9XFOX using Function Details.
2025-06-09 13:41:37,812 - INFO - Action Group 'SearchAndFormatActionGroup' created/updated with ID: 7XTTI9XFOX
2025-06-09 13:41:37,812 - INFO - Waiting 30s after action group setup before preparing agent...
After the agent and its action group (linking to the Lambda tool) are defined, this section makes the agent ready for use and assigns an alias to it:

- Preparation: It calls the `prepare_agent()` helper function (described in 3.10.3), which initiates the preparation process for the `DRAFT` version of the agent and uses a custom waiter to wait until the agent's status becomes `PREPARED`. This step is vital, as it compiles all agent configurations.
- Alias handling: An `alias_name` (e.g., `prod`) is defined.
  - The script checks whether an alias with this name already exists for the agent using `list_agent_aliases`.
  - If an existing alias is found, its ID (`agent_alias_id_to_use`) is retrieved. The notebook assumes the existing alias will correctly point to the latest prepared version (`DRAFT`), or could be updated if necessary (though direct update logic for pointing the alias to a specific version isn't explicitly shown here beyond creation).
  - If no alias is found, `create_agent_alias()` is called. This creates a new alias that, by default, points to the latest prepared version of the agent (the `DRAFT` version that was just prepared).
- A short pause (`time.sleep(10)`) is added to allow the alias changes to propagate.

The `agent_alias_id_to_use` is now ready for invoking the agent.
agent_alias_id_to_use = None # Initialize alias ID
alias_name = "prod" # Make sure alias_name is defined
if agent_id:
logger.info(f"--- Preparing Agent: {agent_id} ---")
preparation_successful = False
try:
# prepare_agent now ONLY prepares, doesn't handle alias or return its ID
prepare_agent(bedrock_agent_client, agent_id)
logger.info(f"Agent {agent_id} preparation seems complete (waiter succeeded).")
preparation_successful = True # Flag success
except Exception as e: # Catch errors from preparation
logger.error(f"Error during agent preparation for {agent_id}: {e}")
logger.error(traceback.format_exc())
raise
# --- Alias Handling (runs only if preparation succeeded) ---
if preparation_successful:
logger.info(f"--- Setting up Alias '{alias_name}' for Agent {agent_id} ---") # Add log
try:
# --- Alias Creation/Update Logic (Copied/adapted from main.py's __main__) ---
logger.info(f"Checking for alias '{alias_name}' for agent {agent_id}...")
existing_alias = None
paginator = bedrock_agent_client.get_paginator('list_agent_aliases')
for page in paginator.paginate(agentId=agent_id):
for alias_summary in page.get('agentAliasSummaries', []):
if alias_summary.get('agentAliasName') == alias_name:
existing_alias = alias_summary
break
if existing_alias:
break
if existing_alias:
agent_alias_id_to_use = existing_alias['agentAliasId']
logger.info(f"Using existing alias '{alias_name}' with ID: {agent_alias_id_to_use}.")
# Optional: Update alias to point to DRAFT if needed,
# but create_agent_alias defaults to latest prepared (DRAFT) so just checking existence is often enough.
else:
logger.info(f"Alias '{alias_name}' not found. Creating new alias...")
create_alias_response = bedrock_agent_client.create_agent_alias(
agentId=agent_id,
agentAliasName=alias_name
# routingConfiguration removed - defaults to latest prepared (DRAFT)
)
agent_alias_id_to_use = create_alias_response.get('agentAlias', {}).get('agentAliasId')
logger.info(f"Successfully created alias '{alias_name}' with ID: {agent_alias_id_to_use}. (Defaults to latest prepared version - DRAFT)")
if not agent_alias_id_to_use:
raise ValueError(f"Failed to get a valid alias ID for '{alias_name}'")
logger.info(f"Waiting 10s for alias '{alias_name}' changes to propagate...")
time.sleep(10)
logger.info(f"Agent {agent_id} preparation and alias '{alias_name}' ({agent_alias_id_to_use}) setup complete.")
except Exception as alias_e: # Catch errors from alias logic
logger.error(f"Failed to create/update alias '{alias_name}' for agent {agent_id}: {alias_e}")
logger.error(traceback.format_exc())
raise
else:
logger.error("Agent ID not available, skipping preparation and alias setup.")
2025-06-09 13:42:07,835 - INFO - --- Preparing Agent: 7BTR61MXVF ---
2025-06-09 13:42:07,836 - INFO - --- Preparing Agent: 7BTR61MXVF ---
2025-06-09 13:42:08,338 - INFO - Agent preparation initiated for version 'DRAFT'. Status: PREPARING. Prepared At: 2025-06-09 08:12:08.237735+00:00
2025-06-09 13:42:08,338 - INFO - Waiting for agent 7BTR61MXVF preparation to complete (up to 10 minutes)...
2025-06-09 13:42:39,237 - INFO - Agent 7BTR61MXVF successfully prepared.
2025-06-09 13:42:39,238 - INFO - Agent 7BTR61MXVF preparation seems complete (waiter succeeded).
2025-06-09 13:42:39,238 - INFO - --- Setting up Alias 'prod' for Agent 7BTR61MXVF ---
2025-06-09 13:42:39,239 - INFO - Checking for alias 'prod' for agent 7BTR61MXVF...
2025-06-09 13:42:39,526 - INFO - Alias 'prod' not found. Creating new alias...
2025-06-09 13:42:39,902 - INFO - Successfully created alias 'prod' with ID: Y8YNYUDFFZ. (Defaults to latest prepared version - DRAFT)
2025-06-09 13:42:39,903 - INFO - Waiting 10s for alias 'prod' changes to propagate...
2025-06-09 13:42:49,907 - INFO - Agent 7BTR61MXVF preparation and alias 'prod' (Y8YNYUDFFZ) setup complete.
This is the final operational step, where the fully configured Bedrock Agent is tested.

- It checks that `agent_id` and `agent_alias_id_to_use` are available (i.e., the previous setup steps were successful).
- A unique `session_id` is generated for this specific interaction.
- A `test_prompt` is defined (e.g., "Search for information about Project Chimera and format the results using bullet points."). This prompt is designed to trigger the agent's tool (`searchAndFormatDocuments`).
- It calls the `test_agent_invocation()` helper function (described in 3.11.1), which sends the prompt to the Bedrock Agent Runtime using the specified agent ID and alias ID.
- The `test_agent_invocation` function handles the streaming response from the agent, concatenates the text chunks, logs trace information for debugging, and prints the agent's final completion.

This step demonstrates an end-to-end test of the agent: receiving a prompt, deciding to use its Lambda-backed tool, Bedrock invoking the Lambda, the Lambda executing (performing search and formatting), returning results to the agent, and the agent formulating a final response to the user.
# --- Test Invocation ---
# Agent ID and custom alias ID should be valid here
if agent_id and agent_alias_id_to_use: # Check both are set
session_id = str(uuid.uuid4())
test_prompt = "Search for information about Project Chimera and format the results using bullet points."
logger.info(f"--- Invoking Agent {agent_id} using Alias '{alias_name}' ({agent_alias_id_to_use}) ---") # Updated log
try:
completion = test_agent_invocation(
agent_runtime_client=bedrock_agent_runtime_client,
agent_id=agent_id,
agent_alias_id=agent_alias_id_to_use,
session_id=session_id,
prompt=test_prompt
)
if completion is None:
logger.error("Agent invocation failed.")
except Exception as e:
logger.error(f"Error during test invocation: {e}")
logger.error(traceback.format_exc())
else:
logger.error("Agent ID or Alias ID not available, skipping invocation test.")
2025-06-09 13:42:49,923 - INFO - --- Invoking Agent 7BTR61MXVF using Alias 'prod' (Y8YNYUDFFZ) ---
2025-06-09 13:42:49,924 - INFO - --- Testing Agent Invocation (Agent ID: 7BTR61MXVF, Alias: Y8YNYUDFFZ) ---
2025-06-09 13:42:49,925 - INFO - Session ID: 6529a5a7-0b58-4c7d-8682-20353a8f09c3
2025-06-09 13:42:49,925 - INFO - Prompt: "Search for information about Project Chimera and format the results using bullet points."
2025-06-09 13:42:50,894 - INFO - Agent invocation successful. Processing response...
2025-06-09 13:43:05,971 - INFO - --- Agent Final Response ---• Project Chimera combines quantum entanglement communication with neural networks for secure, real-time data analysis across distributed nodes. Lead developer: Dr. Aris Thorne.
• Chimera operates in two modes:
- 'Quantum Sync' for high-fidelity data transfer
- 'Neural Inference' for localized edge processing based on the synced data.
• A key aspect of Chimera is its "Ephemeral Key Protocol" (EKP), which generates one-time quantum keys for each transmission, ensuring absolute forward secrecy.
2025-06-09 13:43:05,973 - INFO - --- Invocation Trace Summary ---
2025-06-09 13:43:05,975 - INFO - Trace 1: Type=None, Step=None
2025-06-09 13:43:05,975 - INFO - Trace 2: Type=None, Step=None
2025-06-09 13:43:05,976 - INFO - Trace 3: Type=None, Step=None
2025-06-09 13:43:05,976 - INFO - Trace 4: Type=None, Step=None
2025-06-09 13:43:05,977 - INFO - Trace 5: Type=None, Step=None
2025-06-09 13:43:05,977 - INFO - Trace 6: Type=None, Step=None
2025-06-09 13:43:05,978 - INFO - Trace 7: Type=None, Step=None
2025-06-09 13:43:05,978 - INFO - Trace 8: Type=None, Step=None
In this notebook, we've demonstrated the Lambda approach for implementing AWS Bedrock agents with Couchbase Vector Search. This approach allows the agent to invoke AWS Lambda functions to execute operations, providing better scalability and separation of concerns.
Key components of this implementation include:

- A Couchbase bucket, scope, collection, and vector search index serving as the knowledge base, populated through LangChain's `CouchbaseSearchVectorStore` with `BedrockEmbeddings`.
- A shared IAM role trusted by both the Bedrock and Lambda services.
- A packaged and deployed AWS Lambda function implementing the `searchAndFormatDocuments` tool.
- A Bedrock Agent whose action group's `actionGroupExecutor` points to that Lambda, prepared and exposed through a `prod` alias.
- An end-to-end invocation test exercising the full prompt, tool call, and response loop.
This approach is particularly useful for production environments where scalability and separation of concerns are important. The Lambda functions can be deployed independently and can access other AWS services, providing more flexibility and power.
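If you want to tear down the resources this notebook created, the helpers defined earlier can be reused; a sketch, assuming the clients and names from the cells above are still in scope:

# Remove the agent (with its action groups and alias) and the tool Lambda.
delete_agent_and_resources(bedrock_agent_client, agent_name)
delete_lambda_function(lambda_client, search_format_lambda_name)
# The IAM role and the Couchbase data are left in place for reuse across runs.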