This notebook demonstrates the Custom Control approach for implementing AWS Bedrock agents with Couchbase Vector Search. In this approach, the agent returns control to the application for function execution.
We'll implement a multi-agent architecture with specialized agents for different tasks:
This notebook demonstrates the Custom Control approach for AWS Bedrock Agents. For comparison, you might also want to check out the Lambda Approach, which uses AWS Lambda functions to execute agent tools instead of handling them directly in your application code.
The Lambda approach offers better separation of concerns and scalability, but requires more setup. You can find that implementation here: Lambda Approach Notebook
Note: If the link above doesn't work in your Jupyter environment, you can navigate to the file manually in the awsbedrock-agents/lambda-approach/
directory.
The Custom Control approach gives the application invoking the agent the responsibility of executing the agent's defined functions (tools). When the agent decides to use a tool, it sends a returnControl
event back to the calling application, which then executes the function locally and (optionally) returns the result to the agent to continue processing.
Define Agent:
researcher_functions
, writer_functions
in the example).Create Agent in Bedrock:
bedrock_agent_client.create_agent
to create the agent, providing the instructions and foundation model.create_agent
function includes logic to check for existing agents and potentially delete/recreate them if they are in a non-functional state.Create Action Group (Custom Control):
bedrock_agent_client.create_agent_action_group
.actionGroupExecutor
to {"customControl": "RETURN_CONTROL"}
. This tells Bedrock to pause execution and return control to the caller when a function in this group needs to be run.functionSchema
defined earlier.Prepare Agent:
bedrock_agent_client.prepare_agent
to make the agent ready for invocation.wait_for_agent_status
utility function polls until the agent reaches a PREPARED
or Available
state.Create Agent Alias:
bedrock_agent_client.create_agent_alias
for invoking the agent.Invoke Agent & Handle Return Control (Custom Control Flow)
When the application invokes a Bedrock agent and the agent decides to use a tool, the "Custom Control" mechanism takes effect. Instead of Bedrock running the tool, it sends a returnControl
event back to the application. The code then parses this event to identify the requested function and its parameters, executes that function locally using the application's resources (like a vector store), and the result of this local execution becomes the final output for that specific agent interaction. If further steps are needed with another agent, a new, separate agent invocation is made using this output.
invoke_agent
to interact with Bedrock.returnControl
event in the response stream.vector_store
object in the example) without needing separate deployment or complex configuration passing.ReturnControl
) adds complexity to the application's handling of the invoke_agent
response/request cycle.First, let's import the necessary libraries and set up our environment:
import json
import logging
import os
import time
import uuid
from datetime import timedelta
import boto3
from botocore.exceptions import ClientError
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.exceptions import (InternalServerFailureException,
QueryIndexAlreadyExistsException,
ServiceUnavailableException)
from couchbase.management.buckets import CreateBucketSettings
from couchbase.management.search import SearchIndex
from couchbase.options import ClusterOptions
from dotenv import load_dotenv
from langchain_aws import BedrockEmbeddings
from langchain_couchbase.vectorstores import CouchbaseSearchVectorStore
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
Load environment variables from the .env file. Make sure to create a .env file with the necessary credentials before running this notebook.
# Load environment variables
load_dotenv()
# Couchbase Configuration
CB_HOST = os.getenv("CB_HOST", "couchbase://localhost")
CB_USERNAME = os.getenv("CB_USERNAME", "Administrator")
CB_PASSWORD = os.getenv("CB_PASSWORD", "password")
CB_BUCKET_NAME = os.getenv("CB_BUCKET_NAME", "vector-search-testing")
SCOPE_NAME = os.getenv("SCOPE_NAME", "shared")
COLLECTION_NAME = os.getenv("COLLECTION_NAME", "bedrock")
INDEX_NAME = os.getenv("INDEX_NAME", "vector_search_bedrock")
# AWS Configuration
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
AWS_ACCOUNT_ID = os.getenv("AWS_ACCOUNT_ID")
# Check if required environment variables are set
required_vars = ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"]
missing_vars = [var for var in required_vars if not os.getenv(var)]
if missing_vars:
logging.warning(f"Missing required environment variables: {', '.join(missing_vars)}")
logging.warning("Please set these variables in your .env file")
else:
logging.info("All required environment variables are set")
2025-05-08 13:34:15,605 - INFO - All required environment variables are set
Set up the AWS clients for Bedrock and other services:
# Initialize AWS session
session = boto3.Session(
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
region_name=AWS_REGION
)
# Initialize AWS clients from session
bedrock_client = session.client('bedrock')
bedrock_agent_client = session.client('bedrock-agent')
bedrock_runtime = session.client('bedrock-runtime')
bedrock_runtime_client = session.client('bedrock-agent-runtime')
iam_client = session.client('iam')
logging.info("AWS clients initialized successfully")
2025-05-08 13:34:15,836 - INFO - AWS clients initialized successfully
Now let's set up the Couchbase connection, collections, and vector store:
# Connect to Couchbase
auth = PasswordAuthenticator(CB_USERNAME, CB_PASSWORD)
options = ClusterOptions(auth)
cluster = Cluster(CB_HOST, options)
cluster.wait_until_ready(timedelta(seconds=5))
logging.info("Successfully connected to Couchbase")
2025-05-08 13:34:17,966 - INFO - Successfully connected to Couchbase
The following code block ensures that the necessary Couchbase bucket, scope, and collection are available. It will create them if they don't exist, and also clear any existing documents from the collection to start fresh.
Note: Bucket Creation will fail on Capella
# Set up collection
try:
# Check if bucket exists, create if it doesn't
try:
bucket = cluster.bucket(CB_BUCKET_NAME)
logging.info(f"Bucket '{CB_BUCKET_NAME}' exists.")
except Exception as e:
logging.info(f"Bucket '{CB_BUCKET_NAME}' does not exist. Creating it...")
bucket_settings = CreateBucketSettings(
name=CB_BUCKET_NAME,
bucket_type='couchbase',
ram_quota_mb=1024,
flush_enabled=True,
num_replicas=0
)
cluster.buckets().create_bucket(bucket_settings)
bucket = cluster.bucket(CB_BUCKET_NAME)
logging.info(f"Bucket '{CB_BUCKET_NAME}' created successfully.")
bucket_manager = bucket.collections()
# Check if scope exists, create if it doesn't
scopes = bucket_manager.get_all_scopes()
scope_exists = any(scope.name == SCOPE_NAME for scope in scopes)
if not scope_exists and SCOPE_NAME != "_default":
logging.info(f"Scope '{SCOPE_NAME}' does not exist. Creating it...")
bucket_manager.create_scope(SCOPE_NAME)
logging.info(f"Scope '{SCOPE_NAME}' created successfully.")
# Check if collection exists, create if it doesn't
collections = bucket_manager.get_all_scopes()
collection_exists = any(
scope.name == SCOPE_NAME and COLLECTION_NAME in [col.name for col in scope.collections]
for scope in collections
)
if not collection_exists:
logging.info(f"Collection '{COLLECTION_NAME}' does not exist. Creating it...")
bucket_manager.create_collection(SCOPE_NAME, COLLECTION_NAME)
logging.info(f"Collection '{COLLECTION_NAME}' created successfully.")
else:
logging.info(f"Collection '{COLLECTION_NAME}' already exists. Skipping creation.")
# Wait for collection to be ready
collection = bucket.scope(SCOPE_NAME).collection(COLLECTION_NAME)
time.sleep(2) # Give the collection time to be ready for queries
# Ensure primary index exists
try:
cluster.query(f"CREATE PRIMARY INDEX IF NOT EXISTS ON `{CB_BUCKET_NAME}`.`{SCOPE_NAME}`.`{COLLECTION_NAME}`").execute()
logging.info("Primary index present or created successfully.")
except Exception as e:
logging.error(f"Error creating primary index: {str(e)}")
# Clear all documents in the collection
try:
query = f"DELETE FROM `{CB_BUCKET_NAME}`.`{SCOPE_NAME}`.`{COLLECTION_NAME}`"
cluster.query(query).execute()
logging.info("All documents cleared from the collection.")
except Exception as e:
logging.warning(f"Error while clearing documents: {str(e)}. The collection might be empty.")
except Exception as e:
logging.error(f"Error setting up collection: {str(e)}")
raise
2025-05-08 13:34:19,133 - INFO - Bucket 'vector-search-testing' exists.
2025-05-08 13:34:21,149 - INFO - Collection 'bedrock' already exists. Skipping creation.
2025-05-08 13:34:24,304 - INFO - Primary index present or created successfully.
2025-05-08 13:34:24,529 - INFO - All documents cleared from the collection.
This section focuses on setting up the Couchbase Search Index, which is essential for enabling vector search capabilities.
aws_index.json
.aws_index.json
file has hardcoded references for the bucket, scope, and collection names. If you have used different names for your bucket, scope, or collection than the defaults specified in this notebook or your .env
file, you must modify the aws_index.json
file to reflect your custom names before running the next cell.# Set up search indexes
try:
# Construct path relative to the script file
# In a notebook, __file__ is not defined, so use os.getcwd() instead
script_dir = os.getcwd()
index_file_path = os.path.join(script_dir, 'aws_index.json')
# Load index definition from file
with open(index_file_path, 'r') as file:
index_definition = json.load(file)
logging.info(f"Loaded index definition from aws_index.json")
except Exception as e:
logging.error(f"Error loading index definition: {str(e)}")
raise
try:
scope_index_manager = cluster.bucket(CB_BUCKET_NAME).scope(SCOPE_NAME).search_indexes()
# Check if index already exists
existing_indexes = scope_index_manager.get_all_indexes()
index_name = index_definition["name"]
if index_name in [index.name for index in existing_indexes]:
logging.info(f"Index '{index_name}' found")
else:
logging.info(f"Creating new index '{index_name}'...")
# Create SearchIndex object from JSON definition
search_index = SearchIndex.from_json(index_definition)
# Upsert the index (create if not exists, update if exists)
scope_index_manager.upsert_index(search_index)
logging.info(f"Index '{index_name}' successfully created/updated.")
except QueryIndexAlreadyExistsException:
logging.info(f"Index '{index_name}' already exists. Skipping creation/update.")
except ServiceUnavailableException:
logging.error("Search service is not available. Please ensure the Search service is enabled in your Couchbase cluster.")
except InternalServerFailureException as e:
logging.error(f"Internal server error: {str(e)}")
raise
2025-05-08 13:34:24,537 - INFO - Loaded index definition from aws_index.json
2025-05-08 13:34:25,659 - INFO - Index 'vector_search_bedrock' found
2025-05-08 13:34:26,348 - INFO - Index 'vector_search_bedrock' already exists. Skipping creation/update.
# Initialize Bedrock runtime client for embeddings
embeddings = BedrockEmbeddings(
client=bedrock_runtime,
model_id="amazon.titan-embed-text-v2:0"
)
logging.info("Successfully created Bedrock embeddings client")
# Initialize vector store
vector_store = CouchbaseSearchVectorStore(
cluster=cluster,
bucket_name=CB_BUCKET_NAME,
scope_name=SCOPE_NAME,
collection_name=COLLECTION_NAME,
embedding=embeddings,
index_name=INDEX_NAME
)
logging.info("Successfully created vector store")
2025-05-08 13:34:26,353 - INFO - Successfully created Bedrock embeddings client
2025-05-08 13:34:29,660 - INFO - Successfully created vector store
Let's load the documents from the documents.json file and add them to our vector store:
Note:
documents.json
contains the documents that we want to load into our vector store. As an example, we have added a few documents to the file from https://cline.bot/
# Load documents from JSON file
try:
# In a notebook, __file__ is not defined, so use os.getcwd() instead
script_dir = os.getcwd()
documents_file_path = os.path.join(script_dir, 'documents.json')
with open(documents_file_path, 'r') as f:
data = json.load(f)
documents = data.get('documents', [])
logging.info(f"Loaded {len(documents)} documents from documents.json")
except Exception as e:
logging.error(f"Error loading documents: {str(e)}")
raise
# Add documents to vector store
logging.info(f"Adding {len(documents)} documents to vector store...")
for i, doc in enumerate(documents, 1):
text = doc.get('text', '')
metadata = doc.get('metadata', {})
# Ensure metadata is a dictionary before adding
if isinstance(metadata, str):
try:
metadata = json.loads(metadata)
except json.JSONDecodeError:
logging.warning(f"Warning: Could not parse metadata for document {i}. Using empty metadata.")
metadata = {}
elif not isinstance(metadata, dict):
logging.warning(f"Warning: Metadata for document {i} is not a dict or valid JSON string. Using empty metadata.")
metadata = {}
doc_id = vector_store.add_texts([text], [metadata])[0]
logging.info(f"Added document {i}/{len(documents)} with ID: {doc_id}")
# Add small delay between requests
time.sleep(1)
logging.info(f"\nProcessing complete: {len(documents)}/{len(documents)} documents added successfully")
2025-05-08 13:34:29,670 - INFO - Loaded 7 documents from documents.json
2025-05-08 13:34:29,670 - INFO - Adding 7 documents to vector store...
2025-05-08 13:34:31,637 - INFO - Added document 1/7 with ID: 884e8caae84545aa9e4735538b38f373
2025-05-08 13:34:33,211 - INFO - Added document 2/7 with ID: 61b9d4c9c5ee42a8a51e44ef0b55942a
2025-05-08 13:34:34,784 - INFO - Added document 3/7 with ID: c7cb7541a9004ead83b9b393bc44a9b5
2025-05-08 13:34:36,886 - INFO - Added document 4/7 with ID: c8b07eae2e3a42c1a8114397bc8bfa67
2025-05-08 13:34:38,534 - INFO - Added document 5/7 with ID: a4356e0801564ad1b2f3ccdf05284375
2025-05-08 13:34:40,129 - INFO - Added document 6/7 with ID: 647d0fddba8f4bb38fd66d291a669bb2
2025-05-08 13:34:42,140 - INFO - Added document 7/7 with ID: 3b57038a7a234992927756cb3307738f
2025-05-08 13:34:43,142 - INFO -
Processing complete: 7/7 documents added successfully
Now let's implement the Custom Control approach for Bedrock agents. In this approach, the agent returns control to the application for function execution.
# Function to wait for agent status
def wait_for_agent_status(bedrock_agent_client, agent_id, target_statuses=['Available', 'PREPARED', 'NOT_PREPARED'], max_attempts=30, delay=2):
"""Wait for agent to reach any of the target statuses"""
for attempt in range(max_attempts):
try:
response = bedrock_agent_client.get_agent(agentId=agent_id)
current_status = response['agent']['agentStatus']
if current_status in target_statuses:
logging.info(f"Agent {agent_id} reached status: {current_status}")
return current_status
elif current_status == 'FAILED':
logging.error(f"Agent {agent_id} failed")
return 'FAILED'
logging.info(f"Agent status: {current_status}, waiting... (attempt {attempt + 1}/{max_attempts})")
time.sleep(delay)
except Exception as e:
logging.error(f"Error checking agent status: {str(e)}")
time.sleep(delay)
return current_status
# Function to create a Bedrock agent with Custom Control action groups
def create_agent(bedrock_agent_client, name, instructions, functions, model_id="amazon.nova-pro-v1:0", agent_role_arn=None):
"""Create a Bedrock agent with Custom Control action groups"""
try:
# List existing agents
existing_agents = bedrock_agent_client.list_agents()
existing_agent = next(
(agent for agent in existing_agents['agentSummaries']
if agent['agentName'] == name),
None
)
# Handle existing agent
if existing_agent:
agent_id = existing_agent['agentId']
logging.info(f"Found existing agent '{name}' with ID: {agent_id}")
# Check agent status
response = bedrock_agent_client.get_agent(agentId=agent_id)
status = response['agent']['agentStatus']
if status in ['NOT_PREPARED', 'FAILED']:
logging.info(f"Deleting agent '{name}' with status {status}")
bedrock_agent_client.delete_agent(agentId=agent_id)
time.sleep(10) # Wait after deletion
existing_agent = None
# Create new agent if needed
if not existing_agent:
logging.info(f"Creating new agent '{name}'")
agent_params = {
"agentName": name,
"description": f"{name.title()} agent for document operations",
"instruction": instructions,
"idleSessionTTLInSeconds": 1800,
"foundationModel": model_id
}
if agent_role_arn:
agent_params["agentResourceRoleArn"] = agent_role_arn
agent = bedrock_agent_client.create_agent(**agent_params)
agent_id = agent['agent']['agentId']
logging.info(f"Created new agent '{name}' with ID: {agent_id}")
else:
agent_id = existing_agent['agentId']
# Wait for initial creation if needed
status = wait_for_agent_status(bedrock_agent_client, agent_id, target_statuses=['NOT_PREPARED', 'PREPARED', 'Available'])
if status not in ['NOT_PREPARED', 'PREPARED', 'Available']:
raise Exception(f"Agent failed to reach valid state: {status}")
# Create action group if needed
try:
bedrock_agent_client.create_agent_action_group(
agentId=agent_id,
agentVersion="DRAFT",
actionGroupExecutor={"customControl": "RETURN_CONTROL"}, # This is the key for Custom Control
actionGroupName=f"{name}_actions",
functionSchema={"functions": functions},
description=f"Action group for {name} operations"
)
logging.info(f"Created action group for agent '{name}'")
time.sleep(5)
except bedrock_agent_client.exceptions.ConflictException:
logging.info(f"Action group already exists for agent '{name}'")
# Prepare agent if needed
if status == 'NOT_PREPARED':
try:
logging.info(f"Starting preparation for agent '{name}'")
bedrock_agent_client.prepare_agent(agentId=agent_id)
status = wait_for_agent_status(
bedrock_agent_client,
agent_id,
target_statuses=['PREPARED', 'Available']
)
logging.info(f"Agent '{name}' preparation completed with status: {status}")
except Exception as e:
logging.error(f"Error during preparation: {str(e)}")
# Handle alias creation/retrieval
try:
aliases = bedrock_agent_client.list_agent_aliases(agentId=agent_id)
alias = next((a for a in aliases['agentAliasSummaries'] if a['agentAliasName'] == 'v1'), None)
if not alias:
logging.info(f"Creating new alias for agent '{name}'")
alias = bedrock_agent_client.create_agent_alias(
agentId=agent_id,
agentAliasName="v1"
)
alias_id = alias['agentAlias']['agentAliasId']
else:
alias_id = alias['agentAliasId']
logging.info(f"Using existing alias for agent '{name}'")
logging.info(f"Successfully configured agent '{name}' with ID: {agent_id} and alias: {alias_id}")
return agent_id, alias_id
except Exception as e:
logging.error(f"Error managing alias: {str(e)}")
raise
except Exception as e:
logging.error(f"Error creating/updating agent: {str(e)}")
raise
# Function to invoke a Bedrock agent
def invoke_agent(bedrock_runtime_client, agent_id, alias_id, input_text, session_id=None, vector_store=None):
"""Invoke a Bedrock agent"""
if session_id is None:
session_id = str(uuid.uuid4())
try:
logging.info(f"Invoking agent with input: {input_text}")
response = bedrock_runtime_client.invoke_agent(
agentId=agent_id,
agentAliasId=alias_id,
sessionId=session_id,
inputText=input_text,
enableTrace=True
)
result = ""
for event in response['completion']:
# Process text chunks
if 'chunk' in event:
chunk = event['chunk']['bytes'].decode('utf-8')
result += chunk
# Handle custom control return
if 'returnControl' in event:
return_control = event['returnControl']
invocation_inputs = return_control.get('invocationInputs', [])
if invocation_inputs:
function_input = invocation_inputs[0].get('functionInvocationInput', {})
action_group = function_input.get('actionGroup')
function_name = function_input.get('function')
parameters = function_input.get('parameters', [])
# Convert parameters to a dictionary
param_dict = {}
for param in parameters:
param_dict[param.get('name')] = param.get('value')
logging.info(f"Function call: {action_group}::{function_name}")
# Handle search_documents function
if function_name == 'search_documents':
query = param_dict.get('query')
k = int(param_dict.get('k', 3))
logging.info(f"Searching for: {query}, k={k}")
if vector_store:
# Perform the search
docs = vector_store.similarity_search(query, k=k)
# Format results
search_results = [doc.page_content for doc in docs]
logging.info(f"Found {len(search_results)} results")
# Format the response
result = f"Search results for '{query}':\n\n"
for i, content in enumerate(search_results):
result += f"Result {i+1}: {content}\n\n"
else:
logging.error("Vector store not available")
result = "Error: Vector store not available"
# Handle format_content function
elif function_name == 'format_content':
content = param_dict.get('content')
style = param_dict.get('style', 'user-friendly')
logging.info(f"Formatting content in {style} style")
# Check if content is valid
if content and content != '?':
result = f"Formatted in {style} style: {content}"
else:
result = "No content provided to format."
else:
logging.error(f"Unknown function: {function_name}")
result = f"Error: Unknown function {function_name}"
if not result.strip():
logging.warning("Received empty response from agent")
return result
except Exception as e:
logging.error(f"Error invoking agent: {str(e)}")
raise RuntimeError(f"Failed to invoke agent: {str(e)}")
Now let's define the instructions and functions for our agents:
# Researcher agent instructions and functions
researcher_instructions = """
You are a Research Assistant that helps users find relevant information in documents.
Your capabilities include:
1. Searching through documents using semantic similarity
2. Providing relevant document excerpts
3. Answering questions based on document content
"""
researcher_functions = [
{
"name": "search_documents",
"description": "Search for relevant documents using semantic similarity",
"parameters": {
"query": {
"type": "string",
"description": "The search query",
"required": True
},
"k": {
"type": "integer",
"description": "Number of results to return",
"required": False
}
},
"requireConfirmation": "DISABLED"
}
]
# Writer agent instructions and functions
writer_instructions = """
You are a Content Writer Assistant that helps format and present research findings.
Your capabilities include:
1. Formatting research findings in a user-friendly way
2. Creating clear and engaging summaries
3. Organizing information logically
4. Highlighting key insights
"""
writer_functions = [
{
"name": "format_content",
"description": "Format and present research findings",
"parameters": {
"content": {
"type": "string",
"description": "The research findings to format",
"required": True
},
"style": {
"type": "string",
"description": "The desired presentation style (e.g., summary, detailed, bullet points)",
"required": False
}
},
"requireConfirmation": "DISABLED"
}
]
Now let's run the Custom Control approach with our agents:
# Get or Create IAM Role
agent_role_name = "BedrockExecutionRoleForAgents_CustomControl"
trust_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "bedrock.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
policy_arn_to_attach = "arn:aws:iam::aws:policy/AmazonBedrockFullAccess"
try:
role_response = iam_client.get_role(RoleName=agent_role_name)
agent_role_arn = role_response['Role']['Arn']
logging.info(f"Found existing IAM role '{agent_role_name}' with ARN: {agent_role_arn}")
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchEntity':
logging.info(f"IAM role '{agent_role_name}' not found. Creating...")
try:
role_response = iam_client.create_role(
RoleName=agent_role_name,
AssumeRolePolicyDocument=json.dumps(trust_policy),
Description="IAM role for Bedrock Agents execution"
)
agent_role_arn = role_response['Role']['Arn']
logging.info(f"Created IAM role '{agent_role_name}' with ARN: {agent_role_arn}")
# Wait a bit for the role to be fully available before attaching policy
time.sleep(10)
except ClientError as create_error:
logging.error(f"Error creating IAM role '{agent_role_name}': {create_error}")
agent_role_arn = None
else:
logging.error(f"Error getting IAM role '{agent_role_name}': {e}")
agent_role_arn = None
# Attach the policy if not already attached
if agent_role_arn:
try:
attached_policies = iam_client.list_attached_role_policies(RoleName=agent_role_name)
if not any(p['PolicyArn'] == policy_arn_to_attach for p in attached_policies.get('AttachedPolicies', [])):
logging.info(f"Attaching policy '{policy_arn_to_attach}' to role '{agent_role_name}'...")
iam_client.attach_role_policy(
RoleName=agent_role_name,
PolicyArn=policy_arn_to_attach
)
logging.info(f"Policy '{policy_arn_to_attach}' attached successfully.")
# Wait a bit for the policy attachment to propagate
time.sleep(5)
else:
logging.info(f"Policy '{policy_arn_to_attach}' already attached to role '{agent_role_name}'.")
except ClientError as attach_error:
logging.warning(f"Error attaching policy to role '{agent_role_name}': {attach_error}")
2025-05-08 13:34:44,254 - INFO - Found existing IAM role 'BedrockExecutionRoleForAgents_CustomControl' with ARN: arn:aws:iam::598307997273:role/BedrockExecutionRoleForAgents_CustomControl
2025-05-08 13:34:44,547 - INFO - Policy 'arn:aws:iam::aws:policy/AmazonBedrockFullAccess' already attached to role 'BedrockExecutionRoleForAgents_CustomControl'.
# Create researcher agent
researcher_id = None
researcher_alias = None
if agent_role_arn:
try:
researcher_id, researcher_alias = create_agent(
bedrock_agent_client,
"researcher",
researcher_instructions,
researcher_functions,
agent_role_arn=agent_role_arn
)
logging.info(f"Researcher agent created with ID: {researcher_id} and alias: {researcher_alias}")
except Exception as e:
logging.error(f"Failed to create researcher agent: {str(e)}")
else:
logging.error("No agent role ARN available for researcher agent creation")
2025-05-08 13:34:45,303 - INFO - Found existing agent 'researcher' with ID: FF1OSFJIJF
2025-05-08 13:34:46,399 - INFO - Agent FF1OSFJIJF reached status: PREPARED
2025-05-08 13:34:46,712 - INFO - Action group already exists for agent 'researcher'
2025-05-08 13:34:46,996 - INFO - Using existing alias for agent 'researcher'
2025-05-08 13:34:46,997 - INFO - Successfully configured agent 'researcher' with ID: FF1OSFJIJF and alias: RQVFGLBCZP
2025-05-08 13:34:46,997 - INFO - Researcher agent created with ID: FF1OSFJIJF and alias: RQVFGLBCZP
# Create writer agent
writer_id = None
writer_alias = None
if agent_role_arn:
try:
writer_id, writer_alias = create_agent(
bedrock_agent_client,
"writer",
writer_instructions,
writer_functions,
agent_role_arn=agent_role_arn
)
logging.info(f"Writer agent created with ID: {writer_id} and alias: {writer_alias}")
except Exception as e:
logging.error(f"Failed to create writer agent: {str(e)}")
else:
logging.error("No agent role ARN available for writer agent creation")
if not any([researcher_id, writer_id]):
# Adjust error message based on whether role setup failed
if not agent_role_arn:
raise RuntimeError("Failed to create agents because IAM role setup failed.")
else:
raise RuntimeError("Failed to create any agents despite successful IAM role setup.")
2025-05-08 13:34:47,279 - INFO - Found existing agent 'writer' with ID: JDA8S8SRS1
2025-05-08 13:34:48,178 - INFO - Agent JDA8S8SRS1 reached status: PREPARED
2025-05-08 13:34:48,498 - INFO - Action group already exists for agent 'writer'
2025-05-08 13:34:48,797 - INFO - Using existing alias for agent 'writer'
2025-05-08 13:34:48,797 - INFO - Successfully configured agent 'writer' with ID: JDA8S8SRS1 and alias: 3SFKJGSGNQ
2025-05-08 13:34:48,798 - INFO - Writer agent created with ID: JDA8S8SRS1 and alias: 3SFKJGSGNQ
Let's test our agents by asking the researcher agent to search for information and the writer agent to format the results:
# Test researcher agent
if researcher_id and researcher_alias:
researcher_response = invoke_agent(
bedrock_runtime_client,
researcher_id,
researcher_alias,
"What is unique about the Cline AI assistant? Use the search_documents function to find relevant information.",
vector_store=vector_store
)
print("\nResearcher Response:\n", researcher_response)
else:
logging.error("Researcher agent not available for testing")
2025-05-08 13:34:48,808 - INFO - Invoking agent with input: What is unique about the Cline AI assistant? Use the search_documents function to find relevant information.
2025-05-08 13:34:51,478 - INFO - Function call: researcher_actions::search_documents
2025-05-08 13:34:51,478 - INFO - Searching for: What is unique about the Cline AI assistant?, k=3
2025-05-08 13:34:52,791 - INFO - Found 3 results
Researcher Response:
Search results for 'What is unique about the Cline AI assistant?':
Result 1: The Cline AI assistant, developed by Saoud Rizwan, is a unique system that combines vector search capabilities with Amazon Bedrock agents. Unlike traditional chatbots, it uses a sophisticated multi-agent architecture where specialized agents handle different aspects of document processing and interaction.
Result 2: One of Cline's key features is its ability to create MCP (Model Context Protocol) servers on the fly. This allows users to extend the system's capabilities by adding new tools and resources that connect to external APIs, all while maintaining a secure and non-interactive environment.
Result 3: The browser automation capabilities in Cline are implemented through Puppeteer, allowing the system to interact with web interfaces in a controlled 900x600 pixel window. This enables testing of web applications, verification of changes, and even general web browsing tasks.
# Test writer agent
if writer_id and writer_alias and "researcher_response" in locals():
writer_response = invoke_agent(
bedrock_runtime_client,
writer_id,
writer_alias,
f"Format this research finding using the format_content function: {researcher_response}",
vector_store=vector_store
)
print("\nWriter Response:\n", writer_response)
else:
logging.error("Writer agent not available for testing or no researcher response to format")
2025-05-08 13:34:52,798 - INFO - Invoking agent with input: Format this research finding using the format_content function: Search results for 'What is unique about the Cline AI assistant?':
Result 1: The Cline AI assistant, developed by Saoud Rizwan, is a unique system that combines vector search capabilities with Amazon Bedrock agents. Unlike traditional chatbots, it uses a sophisticated multi-agent architecture where specialized agents handle different aspects of document processing and interaction.
Result 2: One of Cline's key features is its ability to create MCP (Model Context Protocol) servers on the fly. This allows users to extend the system's capabilities by adding new tools and resources that connect to external APIs, all while maintaining a secure and non-interactive environment.
Result 3: The browser automation capabilities in Cline are implemented through Puppeteer, allowing the system to interact with web interfaces in a controlled 900x600 pixel window. This enables testing of web applications, verification of changes, and even general web browsing tasks.
2025-05-08 13:34:55,730 - INFO - Function call: writer_actions::format_content
2025-05-08 13:34:55,730 - INFO - Formatting content in summary style
Writer Response:
Formatted in summary style: The Cline AI assistant, developed by Saoud Rizwan, is a unique system that combines vector search capabilities with Amazon Bedrock agents. Unlike traditional chatbots, it uses a sophisticated multi-agent architecture where specialized agents handle different aspects of document processing and interaction. One of Cline's key features is its ability to create MCP (Model Context Protocol) servers on the fly. This allows users to extend the system's capabilities by adding new tools and resources that connect to external APIs, all while maintaining a secure and non-interactive environment. The browser automation capabilities in Cline are implemented through Puppeteer, allowing the system to interact with web interfaces in a controlled 900x600 pixel window. This enables testing of web applications, verification of changes, and even general web browsing tasks.
In this notebook, we've demonstrated the Custom Control approach for implementing AWS Bedrock agents with Couchbase Vector Search. This approach allows the agent to return control to the application for function execution, providing more flexibility and control over the agent's behavior.
Key components of this implementation include:
This approach is particularly useful when you need more control over the agent's behavior or when you want to integrate the agent with existing systems and data sources.