Quickstart

This section shows how to add the connector to your application.

Installing the Kafka Connector

The connector requires Java 6 at a minimum; any newer version, including Java 8, is also supported.

The connector ships as a zip archive, available for download here: kafka-connect-couchbase-3.1.3.zip

Where to unpack the archive depends on how the Confluent Platform was installed. In general, place the contents so that the archive's share/java/kafka-connect-couchbase directory sits at the same level as kafka-connect-jdbc.
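As a concrete illustration of that layout, the sketch below creates the side-by-side directories under /tmp; the /tmp/confluent-demo root is a stand-in assumption, not a real installation path.

```shell
# Illustrative only: /tmp/confluent-demo stands in for your actual
# Confluent Platform installation root.
ROOT=/tmp/confluent-demo
mkdir -p "$ROOT/share/java/kafka-connect-jdbc" \
         "$ROOT/share/java/kafka-connect-couchbase"

# Both connector directories sit side by side under share/java:
ls "$ROOT/share/java"
```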

If you are using plain Apache Kafka, the connector jars should be on the classpath (e.g. in kafka_2.11-0.10.1.0/libs). With Apache Kafka this walkthrough is almost the same, except that you cannot use Confluent-specific features such as the Control Center or Schema Registry products.

Publishing Couchbase Documents to Kafka Using the Source Connector

To see the basic functionality of the connector, we'll copy a single bucket from a local Couchbase instance. Start by loading one of the sample buckets, or pick your own. In this simple example we will use travel-sample from the server distribution.

We create a configuration file that tells the connector to load data from this bucket. The file is included with the connector as config/quickstart-couchbase.properties and contains the following settings:

name=test-couchbase
connector.class=com.couchbase.connect.kafka.CouchbaseSourceConnector
tasks.max=2
connection.cluster_address=127.0.0.1
connection.bucket=default
connection.username=default
connection.password=secret
connection.timeout.ms=2000
# connection.ssl.enabled=true
# connection.ssl.keystore.location=/tmp/keystore
# connection.ssl.keystore.password=secret
topic.name=test-default
use_snapshots=false

For this exercise, change the value of connection.bucket to travel-sample (or whichever bucket you want to stream from). For connection.username and connection.password, supply the credentials of a Couchbase user with read access to the bucket. If you have not yet created such a user, see Creating and Managing Users with the UI.
Note: For Couchbase Server versions prior to 5.0, leave the username blank. Set the password property to the bucket password, or leave it blank if the bucket does not have a password. The sample buckets do not have passwords.

The first three settings are common to all connectors. connection.cluster_address is a comma-separated list of cluster nodes used to bootstrap the connector. use_snapshots controls how the stream of changes from the server is handled: with the value false used here, the connector sends each event to the Kafka topic immediately, while setting it to true tells the connector task to accumulate events in local memory until a whole snapshot has been received. topic.name names the Kafka topic where all the events will be stored. For simplicity, secure connections are not configured in this sample; they are covered later.

Run the connector in a standalone Kafka Connect worker in another terminal (this uses config/connect-standalone.properties from the connector distribution and assumes that Kafka and the Schema Registry are running locally on the default ports):

$ ./bin/connect-standalone config/connect-standalone.properties \
        config/quickstart-couchbase.properties

You should see the process start up and log some messages, then begin receiving events from Couchbase Server and sending the results to Kafka. To check that it has copied the data that was present when Kafka Connect started, start a console consumer reading from the beginning of the topic:

$ ./bin/kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 \
        --topic test-default --from-beginning
{"event":"mutation","partition":104,"key":"route_60137","cas":1479128706711552,"bySeqno":1,"revSeqno":1,"expiration":{...
{"event":"mutation","partition":104,"key":"airport_5736","cas":1479128706973696,"bySeqno":2,"revSeqno":1,"expiration":{...
{"event":"mutation","partition":104,"key":"landmark_5574","cas":1479128707104768,"bySeqno":3,"revSeqno":1,"expiration":{...
...

The output will display about 31,000 lines, each containing one mutation (assuming the sample bucket has not been modified since creation). The shape of the payload is controlled by the dcp.message.converter.class property of the connector config. By default it is set to com.couchbase.connect.kafka.converter.SchemaConverter, which formats each event as a structure with the metadata at the top level and the optional document value as a byte string in the content field. The Avro schema for this converter is shown below:

{
  "type": "record",
  "name": "DcpMessage",
  "namespace": "com.couchbase",
  "fields": [
    {
      "name": "event",
      "type": "string"
    },
    {
      "name": "partition",
      "type": {
        "type": "int",
        "connect.type": "int16"
      }
    },
    {
      "name": "key",
      "type": "string"
    },
    {
      "name": "cas",
      "type": "long"
    },
    {
      "name": "bySeqno",
      "type": "long"
    },
    {
      "name": "revSeqno",
      "type": "long"
    },
    {
      "name": "expiration",
      "type": [
        "null",
        "int"
      ]
    },
    {
      "name": "flags",
      "type": [
        "null",
        "int"
      ]
    },
    {
      "name": "lockTime",
      "type": [
        "null",
        "int"
      ]
    },
    {
      "name": "content",
      "type": [
        "null",
        "bytes"
      ]
    }
  ],
  "connect.name": "com.couchbase.DcpMessage"
}
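Once such records reach a consumer, the content field carries the raw document bytes. The sketch below decodes one hand-written sample record; the base64 rendering of content and the document body are illustrative assumptions, not actual connector output.

```python
import base64
import json

# A hand-written sample event shaped like the SchemaConverter output above.
# The "content" value here is illustrative: we assume the document bytes
# arrive base64-encoded in the consumer's JSON rendering.
record = json.loads("""
{
  "event": "mutation",
  "partition": 104,
  "key": "route_60137",
  "cas": 1479128706711552,
  "bySeqno": 1,
  "revSeqno": 1,
  "expiration": null,
  "flags": null,
  "lockTime": null,
  "content": "eyJpZCI6IDYwMTM3fQ=="
}
""")

# Only mutations carry a document body; deletions have content = null.
if record["event"] == "mutation" and record["content"] is not None:
    document = json.loads(base64.b64decode(record["content"]))
    print(record["key"], document)
```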

Here the event property describes the kind of event and takes one of the following values:

  • mutation: any change to the document, including creation and subdocument commands
  • deletion: removal or expiration of the document
  • expiration: reserved for document expiration; Couchbase Server up to version 4.5 does not emit it
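A downstream consumer typically branches on this event field. Here is a minimal dispatch sketch; the upsert/remove actions are hypothetical placeholders, not part of the connector.

```python
# Minimal dispatch over the three event kinds. The returned actions
# ("upsert"/"remove") are placeholder names for this sketch.
def handle(record):
    if record["event"] == "mutation":
        return ("upsert", record["key"])
    elif record["event"] in ("deletion", "expiration"):
        return ("remove", record["key"])
    else:
        raise ValueError("unknown event: " + record["event"])

print(handle({"event": "deletion", "key": "test"}))
```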

When all bucket contents have been transferred, open the Couchbase Admin Console and use the Documents UI to create a new document with ID test and an arbitrary body. If you switch back to the console consumer, you should see the new record appear (and, importantly, the old entries are not repeated):

{"event":"mutation","partition":127,"key":"test","cas":1480340133183488,"bySeqno":29,...
{"event":"mutation","partition":127,"key":"test","cas":1480340157956096,"bySeqno":30,"revSeqno":2,...

Removing the test document generates a similar event:

{"event":"deletion","partition":127,"key":"test","cas":1480340391526400,"bySeqno":32,"revSeqno":3,...

Of course, all the features of Kafka Connect, including offset management and fault tolerance, work with the source connector. You can kill and restart the processes, and they will pick up where they left off, copying only new data (subject to the use_snapshots setting).

Importing Kafka Messages into Couchbase Using the Sink Connector

Since release 3.1, the library also includes a Sink Connector, which imports data from Kafka topics into Couchbase Server.

To show how it works, let's write data from the test-default topic we just created into a new, empty bucket. We assume a bucket named receiver exists on the server. The configuration looks like this:

name=test-sink-couchbase
connector.class=com.couchbase.connect.kafka.CouchbaseSinkConnector
tasks.max=1
topics=test-default
connection.cluster_address=127.0.0.1
connection.bucket=receiver
connection.username=receiver
connection.password=secret

For the username and password properties, supply the credentials of a Couchbase user with write access to the bucket. If you have not yet created such a user, see Creating and Managing Users with the UI.
Note: For Couchbase Server versions prior to 5.0, leave the username blank. Set the password property to the bucket password, or leave it blank if the bucket does not have a password.
Now if you run this job, it will report diagnostic information about the processed data to standard output, and soon the number of documents in the receiver bucket will match the number of messages in the topic:

$ ./bin/connect-standalone config/connect-standalone.properties \
        config/quickstart-couchbase-sink.properties
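One way to check that the counts match is the bucket statistics exposed by Couchbase Server's REST API: the response to GET /pools/default/buckets/receiver includes a basicStats.itemCount field. The sketch below merely parses a trimmed, hand-written sample of such a response; the item count shown is illustrative, not a real measurement.

```python
import json

# Trimmed, hand-written sample of the JSON returned for a bucket by
# GET /pools/default/buckets/receiver (most fields omitted; the count
# is illustrative).
response = json.loads('{"name": "receiver", "basicStats": {"itemCount": 31591}}')

item_count = response["basicStats"]["itemCount"]
print(response["name"], item_count)
```

Compare this figure with the topic's message count (for example, the line count of the console consumer's output) to confirm the sink has caught up.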