Back to the Couchbase homepageCouchbase logo
Couchbase Developer

  • Docs

    • Integrations
    • SDKs
    • Mobile SDKs

    • AI Developer
    • Backend
    • Full-stack
    • Mobile
    • Ops / DBA

    • Data Modeling
    • Scalability

  • Tutorials

    • Developer Community
    • Ambassador Program
  • Sign In
  • Try Free

Building Intelligent Agents with AWS Bedrock (Custom Control)

  • Learn how to build intelligent agents using Amazon Bedrock Agents with a custom control approach and Couchbase as the vector store.
  • This tutorial demonstrates how to create specialized agents that can process documents and interact with external APIs using custom control flows.
  • You'll understand how to implement secure multi-agent architectures using Amazon Bedrock's agent capabilities with fine-grained control over agent behavior.

View Source

AWS Bedrock Agents with Couchbase Vector Search - Custom Control Approach

This notebook demonstrates the Custom Control approach for implementing AWS Bedrock agents with Couchbase Vector Search. In this approach, the agent returns control to the application for function execution.

We'll implement a multi-agent architecture with specialized agents for different tasks:

  • Researcher Agent: Searches for relevant documents in the vector store
  • Writer Agent: Formats and presents the research findings

Alternative Approaches

This notebook demonstrates the Custom Control approach for AWS Bedrock Agents. For comparison, you might also want to check out the Lambda Approach, which uses AWS Lambda functions to execute agent tools instead of handling them directly in your application code.

The Lambda approach offers better separation of concerns and scalability, but requires more setup. You can find that implementation here: Lambda Approach Notebook Note: If the link above doesn't work in your Jupyter environment, you can navigate to the file manually in the awsbedrock-agents/lambda-approach/ directory.

Overview

The Custom Control approach gives the application invoking the agent the responsibility of executing the agent's defined functions (tools). When the agent decides to use a tool, it sends a returnControl event back to the calling application, which then executes the function locally and (optionally) returns the result to the agent to continue processing.

Key Steps & Concepts

  1. Define Agent:

    • Define instructions (prompt) for the agent.
    • Define the function schema (tools the agent can use, e.g., researcher_functions, writer_functions in the example).
  2. Create Agent in Bedrock:

    • Use bedrock_agent_client.create_agent to create the agent, providing the instructions and foundation model.
    • The example's create_agent function includes logic to check for existing agents and potentially delete/recreate them if they are in a non-functional state.
  3. Create Action Group (Custom Control):

    • Use bedrock_agent_client.create_agent_action_group.
    • Crucially, set the actionGroupExecutor to {"customControl": "RETURN_CONTROL"}. This tells Bedrock to pause execution and return control to the caller when a function in this group needs to be run.
    • Provide the functionSchema defined earlier.
  4. Prepare Agent:

    • Use bedrock_agent_client.prepare_agent to make the agent ready for invocation.
    • The wait_for_agent_status utility function polls until the agent reaches a PREPARED or Available state.
  5. Create Agent Alias:

    • An alias (e.g., "v1") is created using bedrock_agent_client.create_agent_alias for invoking the agent.
  6. Invoke Agent & Handle Return Control (Custom Control Flow)

    When the application invokes a Bedrock agent and the agent decides to use a tool, the "Custom Control" mechanism takes effect. Instead of Bedrock running the tool, it sends a returnControl event back to the application. The code then parses this event to identify the requested function and its parameters, executes that function locally using the application's resources (like a vector store), and the result of this local execution becomes the final output for that specific agent interaction. If further steps are needed with another agent, a new, separate agent invocation is made using this output.

    • Application calls invoke_agent to interact with Bedrock.
    • Agent signals tool use via a returnControl event in the response stream.
    • Application parses the event, extracting function name and parameters.
    • Application executes the specified function locally, accessing its own resources.
    • The output from this local function execution is the final result for that agent's turn.

Pros

  • Full Control: The application has complete control over the execution environment and logic of the tools.
  • Direct State Access: Tools can directly access application memory, state, and resources (like the vector_store object in the example) without needing separate deployment or complex configuration passing.
  • Simpler Local Development: Can be easier to test and debug locally as the tool execution happens within the same process.
  • Flexibility: Allows integration with any library or service available to the application.

Cons

  • Application Burden: The application code is responsible for implementing and executing the tool logic.
  • Scalability: The scalability of tool execution is tied to the scalability of the application itself.
  • Tighter Coupling: The agent's functionality is more tightly coupled with the application code.
  • Interaction Model: The specific implementation shown requires chaining separate agent invocations rather than letting the agent continue processing within a single turn after a tool is used. Implementing the latter (returning results via ReturnControl) adds complexity to the application's handling of the invoke_agent response/request cycle.

Setup and Configuration

First, let's import the necessary libraries and set up our environment:

import json
import logging
import os
import time
import uuid
from datetime import timedelta

import boto3
from botocore.exceptions import ClientError
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.exceptions import (InternalServerFailureException,
                                  QueryIndexAlreadyExistsException,
                                  ServiceUnavailableException)
from couchbase.management.buckets import CreateBucketSettings
from couchbase.management.search import SearchIndex
from couchbase.options import ClusterOptions
from dotenv import load_dotenv
from langchain_aws import BedrockEmbeddings
from langchain_couchbase.vectorstores import CouchbaseSearchVectorStore

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

Load Environment Variables

Load environment variables from the .env file. Make sure to create a .env file with the necessary credentials before running this notebook.

# Load environment variables
load_dotenv()

# Couchbase Configuration
CB_HOST = os.getenv("CB_HOST", "couchbase://localhost")
CB_USERNAME = os.getenv("CB_USERNAME", "Administrator")
CB_PASSWORD = os.getenv("CB_PASSWORD", "password")
CB_BUCKET_NAME = os.getenv("CB_BUCKET_NAME", "vector-search-testing")
SCOPE_NAME = os.getenv("SCOPE_NAME", "shared")
COLLECTION_NAME = os.getenv("COLLECTION_NAME", "bedrock")
INDEX_NAME = os.getenv("INDEX_NAME", "vector_search_bedrock")

# AWS Configuration
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
AWS_ACCOUNT_ID = os.getenv("AWS_ACCOUNT_ID")

# Check if required environment variables are set
required_vars = ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"]
missing_vars = [var for var in required_vars if not os.getenv(var)]
if missing_vars:
    logging.warning(f"Missing required environment variables: {', '.join(missing_vars)}")
    logging.warning("Please set these variables in your .env file")
else:
    logging.info("All required environment variables are set")
2025-05-08 13:34:15,605 - INFO - All required environment variables are set

Initialize AWS Clients

Set up the AWS clients for Bedrock and other services:

# Initialize AWS session
session = boto3.Session(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region_name=AWS_REGION
)

# Initialize AWS clients from session
bedrock_client = session.client('bedrock')
bedrock_agent_client = session.client('bedrock-agent')
bedrock_runtime = session.client('bedrock-runtime')
bedrock_runtime_client = session.client('bedrock-agent-runtime')
iam_client = session.client('iam')

logging.info("AWS clients initialized successfully")
2025-05-08 13:34:15,836 - INFO - AWS clients initialized successfully

Set Up Couchbase and Vector Store

Now let's set up the Couchbase connection, collections, and vector store:

# Connect to Couchbase
auth = PasswordAuthenticator(CB_USERNAME, CB_PASSWORD)
options = ClusterOptions(auth)
cluster = Cluster(CB_HOST, options)
cluster.wait_until_ready(timedelta(seconds=5))
logging.info("Successfully connected to Couchbase")
2025-05-08 13:34:17,966 - INFO - Successfully connected to Couchbase

Create Couchbase Bucket, Scope, and Collection

The following code block ensures that the necessary Couchbase bucket, scope, and collection are available. It will create them if they don't exist, and also clear any existing documents from the collection to start fresh.

Note: Bucket Creation will fail on Capella

# Set up collection
try:
    # Check if bucket exists, create if it doesn't
    try:
        bucket = cluster.bucket(CB_BUCKET_NAME)
        logging.info(f"Bucket '{CB_BUCKET_NAME}' exists.")
    except Exception as e:
        logging.info(f"Bucket '{CB_BUCKET_NAME}' does not exist. Creating it...")
        bucket_settings = CreateBucketSettings(
            name=CB_BUCKET_NAME,
            bucket_type='couchbase',
            ram_quota_mb=1024,
            flush_enabled=True,
            num_replicas=0
        )
        cluster.buckets().create_bucket(bucket_settings)
        bucket = cluster.bucket(CB_BUCKET_NAME)
        logging.info(f"Bucket '{CB_BUCKET_NAME}' created successfully.")

    bucket_manager = bucket.collections()

    # Check if scope exists, create if it doesn't
    scopes = bucket_manager.get_all_scopes()
    scope_exists = any(scope.name == SCOPE_NAME for scope in scopes)

    if not scope_exists and SCOPE_NAME != "_default":
        logging.info(f"Scope '{SCOPE_NAME}' does not exist. Creating it...")
        bucket_manager.create_scope(SCOPE_NAME)
        logging.info(f"Scope '{SCOPE_NAME}' created successfully.")

    # Check if collection exists, create if it doesn't
    collections = bucket_manager.get_all_scopes()
    collection_exists = any(
        scope.name == SCOPE_NAME and COLLECTION_NAME in [col.name for col in scope.collections]
        for scope in collections
    )

    if not collection_exists:
        logging.info(f"Collection '{COLLECTION_NAME}' does not exist. Creating it...")
        bucket_manager.create_collection(SCOPE_NAME, COLLECTION_NAME)
        logging.info(f"Collection '{COLLECTION_NAME}' created successfully.")
    else:
        logging.info(f"Collection '{COLLECTION_NAME}' already exists. Skipping creation.")

    # Wait for collection to be ready
    collection = bucket.scope(SCOPE_NAME).collection(COLLECTION_NAME)
    time.sleep(2)  # Give the collection time to be ready for queries

    # Ensure primary index exists
    try:
        cluster.query(f"CREATE PRIMARY INDEX IF NOT EXISTS ON `{CB_BUCKET_NAME}`.`{SCOPE_NAME}`.`{COLLECTION_NAME}`").execute()
        logging.info("Primary index present or created successfully.")
    except Exception as e:
        logging.error(f"Error creating primary index: {str(e)}")

    # Clear all documents in the collection
    try:
        query = f"DELETE FROM `{CB_BUCKET_NAME}`.`{SCOPE_NAME}`.`{COLLECTION_NAME}`"
        cluster.query(query).execute()
        logging.info("All documents cleared from the collection.")
    except Exception as e:
        logging.warning(f"Error while clearing documents: {str(e)}. The collection might be empty.")

except Exception as e:
    logging.error(f"Error setting up collection: {str(e)}")
    raise
2025-05-08 13:34:19,133 - INFO - Bucket 'vector-search-testing' exists.
2025-05-08 13:34:21,149 - INFO - Collection 'bedrock' already exists. Skipping creation.
2025-05-08 13:34:24,304 - INFO - Primary index present or created successfully.
2025-05-08 13:34:24,529 - INFO - All documents cleared from the collection.

Configure Couchbase Search Index

This section focuses on setting up the Couchbase Search Index, which is essential for enabling vector search capabilities.

  • The code will load an index definition from a local JSON file named aws_index.json.
  • Important Note: The provided aws_index.json file has hardcoded references for the bucket, scope, and collection names. If you have used different names for your bucket, scope, or collection than the defaults specified in this notebook or your .env file, you must modify the aws_index.json file to reflect your custom names before running the next cell.
# Set up search indexes
try:
    # Construct path relative to the script file
    # In a notebook, __file__ is not defined, so use os.getcwd() instead
    script_dir = os.getcwd()
    index_file_path = os.path.join(script_dir, 'aws_index.json')
    # Load index definition from file
    with open(index_file_path, 'r') as file:
        index_definition = json.load(file)
        logging.info(f"Loaded index definition from aws_index.json")
except Exception as e:
    logging.error(f"Error loading index definition: {str(e)}")
    raise

try:
    scope_index_manager = cluster.bucket(CB_BUCKET_NAME).scope(SCOPE_NAME).search_indexes()

    # Check if index already exists
    existing_indexes = scope_index_manager.get_all_indexes()
    index_name = index_definition["name"]

    if index_name in [index.name for index in existing_indexes]:
        logging.info(f"Index '{index_name}' found")
    else:
        logging.info(f"Creating new index '{index_name}'...")

    # Create SearchIndex object from JSON definition
    search_index = SearchIndex.from_json(index_definition)

    # Upsert the index (create if not exists, update if exists)
    scope_index_manager.upsert_index(search_index)
    logging.info(f"Index '{index_name}' successfully created/updated.")

except QueryIndexAlreadyExistsException:
    logging.info(f"Index '{index_name}' already exists. Skipping creation/update.")
except ServiceUnavailableException:
    logging.error("Search service is not available. Please ensure the Search service is enabled in your Couchbase cluster.")
except InternalServerFailureException as e:
    logging.error(f"Internal server error: {str(e)}")
    raise
2025-05-08 13:34:24,537 - INFO - Loaded index definition from aws_index.json
2025-05-08 13:34:25,659 - INFO - Index 'vector_search_bedrock' found
2025-05-08 13:34:26,348 - INFO - Index 'vector_search_bedrock' already exists. Skipping creation/update.
# Initialize Bedrock runtime client for embeddings
embeddings = BedrockEmbeddings(
    client=bedrock_runtime,
    model_id="amazon.titan-embed-text-v2:0"
)
logging.info("Successfully created Bedrock embeddings client")

# Initialize vector store
vector_store = CouchbaseSearchVectorStore(
    cluster=cluster,
    bucket_name=CB_BUCKET_NAME,
    scope_name=SCOPE_NAME,
    collection_name=COLLECTION_NAME,
    embedding=embeddings,
    index_name=INDEX_NAME
)
logging.info("Successfully created vector store")
2025-05-08 13:34:26,353 - INFO - Successfully created Bedrock embeddings client
2025-05-08 13:34:29,660 - INFO - Successfully created vector store

Load Documents into Vector Store

Let's load the documents from the documents.json file and add them to our vector store:

Note: documents.json contains the documents that we want to load into our vector store. As an example, we have added a few documents to the file from https://cline.bot/

# Load documents from JSON file
try:
    # In a notebook, __file__ is not defined, so use os.getcwd() instead
    script_dir = os.getcwd()
    documents_file_path = os.path.join(script_dir, 'documents.json')
    with open(documents_file_path, 'r') as f:
        data = json.load(f)
        documents = data.get('documents', [])
    logging.info(f"Loaded {len(documents)} documents from documents.json")
except Exception as e:
    logging.error(f"Error loading documents: {str(e)}")
    raise

# Add documents to vector store
logging.info(f"Adding {len(documents)} documents to vector store...")
for i, doc in enumerate(documents, 1):
    text = doc.get('text', '')
    metadata = doc.get('metadata', {})

    # Ensure metadata is a dictionary before adding
    if isinstance(metadata, str):
        try:
            metadata = json.loads(metadata)
        except json.JSONDecodeError:
            logging.warning(f"Warning: Could not parse metadata for document {i}. Using empty metadata.")
            metadata = {}
    elif not isinstance(metadata, dict):
        logging.warning(f"Warning: Metadata for document {i} is not a dict or valid JSON string. Using empty metadata.")
        metadata = {}

    doc_id = vector_store.add_texts([text], [metadata])[0]
    logging.info(f"Added document {i}/{len(documents)} with ID: {doc_id}")

    # Add small delay between requests
    time.sleep(1)

logging.info(f"\nProcessing complete: {len(documents)}/{len(documents)} documents added successfully")
2025-05-08 13:34:29,670 - INFO - Loaded 7 documents from documents.json
2025-05-08 13:34:29,670 - INFO - Adding 7 documents to vector store...
2025-05-08 13:34:31,637 - INFO - Added document 1/7 with ID: 884e8caae84545aa9e4735538b38f373
2025-05-08 13:34:33,211 - INFO - Added document 2/7 with ID: 61b9d4c9c5ee42a8a51e44ef0b55942a
2025-05-08 13:34:34,784 - INFO - Added document 3/7 with ID: c7cb7541a9004ead83b9b393bc44a9b5
2025-05-08 13:34:36,886 - INFO - Added document 4/7 with ID: c8b07eae2e3a42c1a8114397bc8bfa67
2025-05-08 13:34:38,534 - INFO - Added document 5/7 with ID: a4356e0801564ad1b2f3ccdf05284375
2025-05-08 13:34:40,129 - INFO - Added document 6/7 with ID: 647d0fddba8f4bb38fd66d291a669bb2
2025-05-08 13:34:42,140 - INFO - Added document 7/7 with ID: 3b57038a7a234992927756cb3307738f
2025-05-08 13:34:43,142 - INFO - 
Processing complete: 7/7 documents added successfully

Custom Control Approach Implementation

Now let's implement the Custom Control approach for Bedrock agents. In this approach, the agent returns control to the application for function execution.

# Function to wait for agent status
def wait_for_agent_status(bedrock_agent_client, agent_id, target_statuses=['Available', 'PREPARED', 'NOT_PREPARED'], max_attempts=30, delay=2):
    """Wait for agent to reach any of the target statuses"""
    for attempt in range(max_attempts):
        try:
            response = bedrock_agent_client.get_agent(agentId=agent_id)
            current_status = response['agent']['agentStatus']

            if current_status in target_statuses:
                logging.info(f"Agent {agent_id} reached status: {current_status}")
                return current_status
            elif current_status == 'FAILED':
                logging.error(f"Agent {agent_id} failed")
                return 'FAILED'

            logging.info(f"Agent status: {current_status}, waiting... (attempt {attempt + 1}/{max_attempts})")
            time.sleep(delay)

        except Exception as e:
            logging.error(f"Error checking agent status: {str(e)}")
            time.sleep(delay)

    return current_status
# Function to create a Bedrock agent with Custom Control action groups
def create_agent(bedrock_agent_client, name, instructions, functions, model_id="amazon.nova-pro-v1:0", agent_role_arn=None):
    """Create a Bedrock agent with Custom Control action groups"""
    try:
        # List existing agents
        existing_agents = bedrock_agent_client.list_agents()
        existing_agent = next(
            (agent for agent in existing_agents['agentSummaries']
             if agent['agentName'] == name),
            None
        )

        # Handle existing agent
        if existing_agent:
            agent_id = existing_agent['agentId']
            logging.info(f"Found existing agent '{name}' with ID: {agent_id}")

            # Check agent status
            response = bedrock_agent_client.get_agent(agentId=agent_id)
            status = response['agent']['agentStatus']

            if status in ['NOT_PREPARED', 'FAILED']:
                logging.info(f"Deleting agent '{name}' with status {status}")
                bedrock_agent_client.delete_agent(agentId=agent_id)
                time.sleep(10)  # Wait after deletion
                existing_agent = None

        # Create new agent if needed
        if not existing_agent:
            logging.info(f"Creating new agent '{name}'")
            agent_params = {
                "agentName": name,
                "description": f"{name.title()} agent for document operations",
                "instruction": instructions,
                "idleSessionTTLInSeconds": 1800,
                "foundationModel": model_id
            }
            
            if agent_role_arn:
                agent_params["agentResourceRoleArn"] = agent_role_arn
                
            agent = bedrock_agent_client.create_agent(**agent_params)
            agent_id = agent['agent']['agentId']
            logging.info(f"Created new agent '{name}' with ID: {agent_id}")
        else:
            agent_id = existing_agent['agentId']

        # Wait for initial creation if needed
        status = wait_for_agent_status(bedrock_agent_client, agent_id, target_statuses=['NOT_PREPARED', 'PREPARED', 'Available'])
        if status not in ['NOT_PREPARED', 'PREPARED', 'Available']:
            raise Exception(f"Agent failed to reach valid state: {status}")

        # Create action group if needed
        try:
            bedrock_agent_client.create_agent_action_group(
                agentId=agent_id,
                agentVersion="DRAFT",
                actionGroupExecutor={"customControl": "RETURN_CONTROL"},  # This is the key for Custom Control
                actionGroupName=f"{name}_actions",
                functionSchema={"functions": functions},
                description=f"Action group for {name} operations"
            )
            logging.info(f"Created action group for agent '{name}'")
            time.sleep(5)
        except bedrock_agent_client.exceptions.ConflictException:
            logging.info(f"Action group already exists for agent '{name}'")

        # Prepare agent if needed
        if status == 'NOT_PREPARED':
            try:
                logging.info(f"Starting preparation for agent '{name}'")
                bedrock_agent_client.prepare_agent(agentId=agent_id)
                status = wait_for_agent_status(
                    bedrock_agent_client,
                    agent_id,
                    target_statuses=['PREPARED', 'Available']
                )
                logging.info(f"Agent '{name}' preparation completed with status: {status}")
            except Exception as e:
                logging.error(f"Error during preparation: {str(e)}")

        # Handle alias creation/retrieval
        try:
            aliases = bedrock_agent_client.list_agent_aliases(agentId=agent_id)
            alias = next((a for a in aliases['agentAliasSummaries'] if a['agentAliasName'] == 'v1'), None)

            if not alias:
                logging.info(f"Creating new alias for agent '{name}'")
                alias = bedrock_agent_client.create_agent_alias(
                    agentId=agent_id,
                    agentAliasName="v1"
                )
                alias_id = alias['agentAlias']['agentAliasId']
            else:
                alias_id = alias['agentAliasId']
                logging.info(f"Using existing alias for agent '{name}'")

            logging.info(f"Successfully configured agent '{name}' with ID: {agent_id} and alias: {alias_id}")
            return agent_id, alias_id

        except Exception as e:
            logging.error(f"Error managing alias: {str(e)}")
            raise

    except Exception as e:
        logging.error(f"Error creating/updating agent: {str(e)}")
        raise
# Function to invoke a Bedrock agent
def invoke_agent(bedrock_runtime_client, agent_id, alias_id, input_text, session_id=None, vector_store=None):
    """Invoke a Bedrock agent"""
    if session_id is None:
        session_id = str(uuid.uuid4())

    try:
        logging.info(f"Invoking agent with input: {input_text}")

        response = bedrock_runtime_client.invoke_agent(
            agentId=agent_id,
            agentAliasId=alias_id,
            sessionId=session_id,
            inputText=input_text,
            enableTrace=True
        )

        result = ""

        for event in response['completion']:
            # Process text chunks
            if 'chunk' in event:
                chunk = event['chunk']['bytes'].decode('utf-8')
                result += chunk

            # Handle custom control return
            if 'returnControl' in event:
                return_control = event['returnControl']
                invocation_inputs = return_control.get('invocationInputs', [])

                if invocation_inputs:
                    function_input = invocation_inputs[0].get('functionInvocationInput', {})
                    action_group = function_input.get('actionGroup')
                    function_name = function_input.get('function')
                    parameters = function_input.get('parameters', [])

                    # Convert parameters to a dictionary
                    param_dict = {}
                    for param in parameters:
                        param_dict[param.get('name')] = param.get('value')

                    logging.info(f"Function call: {action_group}::{function_name}")

                    # Handle search_documents function
                    if function_name == 'search_documents':
                        query = param_dict.get('query')
                        k = int(param_dict.get('k', 3))

                        logging.info(f"Searching for: {query}, k={k}")

                        if vector_store:
                            # Perform the search
                            docs = vector_store.similarity_search(query, k=k)

                            # Format results
                            search_results = [doc.page_content for doc in docs]
                            logging.info(f"Found {len(search_results)} results")

                            # Format the response
                            result = f"Search results for '{query}':\n\n"
                            for i, content in enumerate(search_results):
                                result += f"Result {i+1}: {content}\n\n"
                        else:
                            logging.error("Vector store not available")
                            result = "Error: Vector store not available"

                    # Handle format_content function
                    elif function_name == 'format_content':
                        content = param_dict.get('content')
                        style = param_dict.get('style', 'user-friendly')

                        logging.info(f"Formatting content in {style} style")

                        # Check if content is valid
                        if content and content != '?':
                            result = f"Formatted in {style} style: {content}"
                        else:
                            result = "No content provided to format."
                    else:
                        logging.error(f"Unknown function: {function_name}")
                        result = f"Error: Unknown function {function_name}"

        if not result.strip():
            logging.warning("Received empty response from agent")

        return result

    except Exception as e:
        logging.error(f"Error invoking agent: {str(e)}")
        raise RuntimeError(f"Failed to invoke agent: {str(e)}")

Define Agent Instructions and Functions

Now let's define the instructions and functions for our agents:

# Researcher agent instructions and functions
researcher_instructions = """
You are a Research Assistant that helps users find relevant information in documents.
Your capabilities include:
1. Searching through documents using semantic similarity
2. Providing relevant document excerpts
3. Answering questions based on document content
"""

researcher_functions = [
    {
        "name": "search_documents",
        "description": "Search for relevant documents using semantic similarity",
        "parameters": {
            "query": {
                "type": "string",
                "description": "The search query",
                "required": True
            },
            "k": {
                "type": "integer",
                "description": "Number of results to return",
                "required": False
            }
        },
        "requireConfirmation": "DISABLED"
    }
]

# Writer agent instructions and functions
writer_instructions = """
You are a Content Writer Assistant that helps format and present research findings.
Your capabilities include:
1. Formatting research findings in a user-friendly way
2. Creating clear and engaging summaries
3. Organizing information logically
4. Highlighting key insights
"""

writer_functions = [
    {
        "name": "format_content",
        "description": "Format and present research findings",
        "parameters": {
            "content": {
                "type": "string",
                "description": "The research findings to format",
                "required": True
            },
            "style": {
                "type": "string",
                "description": "The desired presentation style (e.g., summary, detailed, bullet points)",
                "required": False
            }
        },
        "requireConfirmation": "DISABLED"
    }
]

Run Custom Control Approach

Now let's run the Custom Control approach with our agents:

# Get or Create IAM Role
agent_role_name = "BedrockExecutionRoleForAgents_CustomControl"
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "bedrock.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
policy_arn_to_attach = "arn:aws:iam::aws:policy/AmazonBedrockFullAccess"

try:
    role_response = iam_client.get_role(RoleName=agent_role_name)
    agent_role_arn = role_response['Role']['Arn']
    logging.info(f"Found existing IAM role '{agent_role_name}' with ARN: {agent_role_arn}")
except ClientError as e:
    if e.response['Error']['Code'] == 'NoSuchEntity':
        logging.info(f"IAM role '{agent_role_name}' not found. Creating...")
        try:
            role_response = iam_client.create_role(
                RoleName=agent_role_name,
                AssumeRolePolicyDocument=json.dumps(trust_policy),
                Description="IAM role for Bedrock Agents execution"
            )
            agent_role_arn = role_response['Role']['Arn']
            logging.info(f"Created IAM role '{agent_role_name}' with ARN: {agent_role_arn}")
            # Wait a bit for the role to be fully available before attaching policy
            time.sleep(10)
        except ClientError as create_error:
            logging.error(f"Error creating IAM role '{agent_role_name}': {create_error}")
            agent_role_arn = None
    else:
        logging.error(f"Error getting IAM role '{agent_role_name}': {e}")
        agent_role_arn = None

# Attach the policy if not already attached
if agent_role_arn:
    try:
        attached_policies = iam_client.list_attached_role_policies(RoleName=agent_role_name)
        if not any(p['PolicyArn'] == policy_arn_to_attach for p in attached_policies.get('AttachedPolicies', [])):
            logging.info(f"Attaching policy '{policy_arn_to_attach}' to role '{agent_role_name}'...")
            iam_client.attach_role_policy(
                RoleName=agent_role_name,
                PolicyArn=policy_arn_to_attach
            )
            logging.info(f"Policy '{policy_arn_to_attach}' attached successfully.")
            # Wait a bit for the policy attachment to propagate
            time.sleep(5)
        else:
            logging.info(f"Policy '{policy_arn_to_attach}' already attached to role '{agent_role_name}'.")
    except ClientError as attach_error:
        logging.warning(f"Error attaching policy to role '{agent_role_name}': {attach_error}")
2025-05-08 13:34:44,254 - INFO - Found existing IAM role 'BedrockExecutionRoleForAgents_CustomControl' with ARN: arn:aws:iam::598307997273:role/BedrockExecutionRoleForAgents_CustomControl
2025-05-08 13:34:44,547 - INFO - Policy 'arn:aws:iam::aws:policy/AmazonBedrockFullAccess' already attached to role 'BedrockExecutionRoleForAgents_CustomControl'.
# Create researcher agent
researcher_id = None
researcher_alias = None

if agent_role_arn:
    try:
        researcher_id, researcher_alias = create_agent(
            bedrock_agent_client,
            "researcher",
            researcher_instructions,
            researcher_functions,
            agent_role_arn=agent_role_arn
        )
        logging.info(f"Researcher agent created with ID: {researcher_id} and alias: {researcher_alias}")
    except Exception as e:
        logging.error(f"Failed to create researcher agent: {str(e)}")
else:
    logging.error("No agent role ARN available for researcher agent creation")
2025-05-08 13:34:45,303 - INFO - Found existing agent 'researcher' with ID: FF1OSFJIJF
2025-05-08 13:34:46,399 - INFO - Agent FF1OSFJIJF reached status: PREPARED
2025-05-08 13:34:46,712 - INFO - Action group already exists for agent 'researcher'
2025-05-08 13:34:46,996 - INFO - Using existing alias for agent 'researcher'
2025-05-08 13:34:46,997 - INFO - Successfully configured agent 'researcher' with ID: FF1OSFJIJF and alias: RQVFGLBCZP
2025-05-08 13:34:46,997 - INFO - Researcher agent created with ID: FF1OSFJIJF and alias: RQVFGLBCZP
# Create writer agent
writer_id = None
writer_alias = None

if agent_role_arn:
    try:
        writer_id, writer_alias = create_agent(
            bedrock_agent_client,
            "writer",
            writer_instructions,
            writer_functions,
            agent_role_arn=agent_role_arn
        )
        logging.info(f"Writer agent created with ID: {writer_id} and alias: {writer_alias}")
    except Exception as e:
        logging.error(f"Failed to create writer agent: {str(e)}")
else:
    logging.error("No agent role ARN available for writer agent creation")

if not any([researcher_id, writer_id]):
    # Adjust error message based on whether role setup failed
    if not agent_role_arn:
        raise RuntimeError("Failed to create agents because IAM role setup failed.")
    else:
        raise RuntimeError("Failed to create any agents despite successful IAM role setup.")
2025-05-08 13:34:47,279 - INFO - Found existing agent 'writer' with ID: JDA8S8SRS1
2025-05-08 13:34:48,178 - INFO - Agent JDA8S8SRS1 reached status: PREPARED
2025-05-08 13:34:48,498 - INFO - Action group already exists for agent 'writer'
2025-05-08 13:34:48,797 - INFO - Using existing alias for agent 'writer'
2025-05-08 13:34:48,797 - INFO - Successfully configured agent 'writer' with ID: JDA8S8SRS1 and alias: 3SFKJGSGNQ
2025-05-08 13:34:48,798 - INFO - Writer agent created with ID: JDA8S8SRS1 and alias: 3SFKJGSGNQ

Test the Agents

Let's test our agents by asking the researcher agent to search for information and the writer agent to format the results:

# Test researcher agent
if researcher_id and researcher_alias:
    researcher_response = invoke_agent(
        bedrock_runtime_client,
        researcher_id,
        researcher_alias,
        "What is unique about the Cline AI assistant? Use the search_documents function to find relevant information.",
        vector_store=vector_store
    )
    print("\nResearcher Response:\n", researcher_response)
else:
    logging.error("Researcher agent not available for testing")
2025-05-08 13:34:48,808 - INFO - Invoking agent with input: What is unique about the Cline AI assistant? Use the search_documents function to find relevant information.
2025-05-08 13:34:51,478 - INFO - Function call: researcher_actions::search_documents
2025-05-08 13:34:51,478 - INFO - Searching for: What is unique about the Cline AI assistant?, k=3
2025-05-08 13:34:52,791 - INFO - Found 3 results



Researcher Response:
 Search results for 'What is unique about the Cline AI assistant?':

Result 1: The Cline AI assistant, developed by Saoud Rizwan, is a unique system that combines vector search capabilities with Amazon Bedrock agents. Unlike traditional chatbots, it uses a sophisticated multi-agent architecture where specialized agents handle different aspects of document processing and interaction.

Result 2: One of Cline's key features is its ability to create MCP (Model Context Protocol) servers on the fly. This allows users to extend the system's capabilities by adding new tools and resources that connect to external APIs, all while maintaining a secure and non-interactive environment.

Result 3: The browser automation capabilities in Cline are implemented through Puppeteer, allowing the system to interact with web interfaces in a controlled 900x600 pixel window. This enables testing of web applications, verification of changes, and even general web browsing tasks.
# Test writer agent
if writer_id and writer_alias and "researcher_response" in locals():
    writer_response = invoke_agent(
        bedrock_runtime_client,
        writer_id,
        writer_alias,
        f"Format this research finding using the format_content function: {researcher_response}",
        vector_store=vector_store
    )
    print("\nWriter Response:\n", writer_response)
else:
    logging.error("Writer agent not available for testing or no researcher response to format")
2025-05-08 13:34:52,798 - INFO - Invoking agent with input: Format this research finding using the format_content function: Search results for 'What is unique about the Cline AI assistant?':

Result 1: The Cline AI assistant, developed by Saoud Rizwan, is a unique system that combines vector search capabilities with Amazon Bedrock agents. Unlike traditional chatbots, it uses a sophisticated multi-agent architecture where specialized agents handle different aspects of document processing and interaction.

Result 2: One of Cline's key features is its ability to create MCP (Model Context Protocol) servers on the fly. This allows users to extend the system's capabilities by adding new tools and resources that connect to external APIs, all while maintaining a secure and non-interactive environment.

Result 3: The browser automation capabilities in Cline are implemented through Puppeteer, allowing the system to interact with web interfaces in a controlled 900x600 pixel window. This enables testing of web applications, verification of changes, and even general web browsing tasks.


2025-05-08 13:34:55,730 - INFO - Function call: writer_actions::format_content
2025-05-08 13:34:55,730 - INFO - Formatting content in summary style



Writer Response:
 Formatted in summary style: The Cline AI assistant, developed by Saoud Rizwan, is a unique system that combines vector search capabilities with Amazon Bedrock agents. Unlike traditional chatbots, it uses a sophisticated multi-agent architecture where specialized agents handle different aspects of document processing and interaction. One of Cline's key features is its ability to create MCP (Model Context Protocol) servers on the fly. This allows users to extend the system's capabilities by adding new tools and resources that connect to external APIs, all while maintaining a secure and non-interactive environment. The browser automation capabilities in Cline are implemented through Puppeteer, allowing the system to interact with web interfaces in a controlled 900x600 pixel window. This enables testing of web applications, verification of changes, and even general web browsing tasks.

Conclusion

In this notebook, we've demonstrated the Custom Control approach for implementing AWS Bedrock agents with Couchbase Vector Search. This approach allows the agent to return control to the application for function execution, providing more flexibility and control over the agent's behavior.

Key components of this implementation include:

  1. Vector Store Setup: We set up a Couchbase vector store to store and search documents using semantic similarity.
  2. Agent Creation: We created two specialized agents - a researcher agent for searching documents and a writer agent for formatting results.
  3. Custom Control: We implemented the Custom Control approach, where the agent returns control to the application for function execution.
  4. Function Handling: We handled the agent's function calls in the application code, allowing for more control and flexibility.

This approach is particularly useful when you need more control over the agent's behavior or when you want to integrate the agent with existing systems and data sources.


This tutorial is part of a Couchbase Learning Path:
Contents
Couchbase home page link

3250 Olcott Street
Santa Clara, CA 95054
United States

  • company
  • about
  • leadership
  • news & press
  • investor relations
  • careers
  • events
  • legal
  • contact us
  • support
  • Developer portal
  • Documentation
  • Forums
  • PROFESSIONAL SERVICES
  • support login
  • support policy
  • training
  • quicklinks
  • blog
  • downloads
  • get started
  • resources
  • why nosql
  • pricing
  • follow us
  • Social Media Link for FacebookFacebook
  • Social Media Link for TwitterTwitter
  • Social Media Link for LinkedInLinkedIn
  • Social Media Link for Youtubeyoutube
  • Social Media Link for GitHubGithub
  • Social Media Link for Stack OverflowStack Overflow
  • Social Media Link for Discorddiscord

© 2025 Couchbase, Inc. Couchbase and the Couchbase logo are registered trademarks of Couchbase, Inc. All third party trademarks (including logos and icons) referenced by Couchbase, Inc. remain the property of their respective owners.

Terms of UsePrivacy PolicyCookie PolicySupport PolicyDo Not Sell My Personal InformationMarketing Preference Center