Quickstart

This section shows how to add the connector to your application.

Installing the Kafka Connector

Java 6 is the minimum required version; any newer version, including Java 8, is also supported.
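To verify which Java version is on your path, you can run:

$ java -version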

The connector is distributed as a zip archive and is available for download here: kafka-connect-couchbase-3.0.0.zip

Where the archive contents should be placed depends on how the Confluent Platform was installed. In general, extract the archive so that its share/java/kafka-connect-couchbase directory sits at the same level as kafka-connect-jdbc.
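As a minimal sketch, assuming the Confluent Platform is installed under /opt/confluent (a hypothetical path) and the archive keeps share/java at its top level:

$ cd /opt/confluent
$ unzip /path/to/kafka-connect-couchbase-3.0.0.zip

After extraction, /opt/confluent/share/java/kafka-connect-couchbase should sit next to kafka-connect-jdbc.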

If you are using plain Apache Kafka, the connector JARs should be on the classpath (e.g. in kafka_2.11-0.10.1.0/libs), as shown in the sketch below. This sample works almost identically with plain Apache Kafka, except that Confluent-specific features provided by the Control Center and Schema Registry products are unavailable.
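A sketch for the plain Apache Kafka layout (again assuming the archive keeps share/java at its top level):

$ unzip kafka-connect-couchbase-3.0.0.zip -d kafka-connect-couchbase
$ cp kafka-connect-couchbase/share/java/kafka-connect-couchbase/*.jar \
        kafka_2.11-0.10.1.0/libs/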

Sample

To see the basic functionality of the connector, we'll copy a single bucket from a local Couchbase instance. Start by loading one of the sample buckets, or pick your own. In this simple example we will use travel-sample, which ships with the server distribution.
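If you prefer the command line to the Admin Console, the sample bucket can also be loaded through the cluster's REST interface (assuming default Administrator credentials on a local node; adjust to your setup):

$ curl -u Administrator:password -X POST \
        http://127.0.0.1:8091/sampleBuckets/install \
        -d '["travel-sample"]'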

We create a configuration file that will load data from this bucket. This file is included with the connector as config/quickstart-couchbase.properties and contains the following settings:

name=test-couchbase
connector.class=com.couchbase.connect.kafka.CouchbaseSourceConnector
tasks.max=2
connection.cluster_address=127.0.0.1
connection.bucket=default
connection.timeout.ms=2000
# connection.ssl.enabled=true
# connection.ssl.keystore.location=/tmp/keystore
# connection.ssl.keystore.password=secret
topic.name=test-default
use_snapshots=false

For this example, we only need to change connection.bucket to travel-sample (or whichever bucket you loaded in the first step).
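After the edit, the relevant line of config/quickstart-couchbase.properties reads:

connection.bucket=travel-sample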

The first three settings are common to all connectors. connection.cluster_address specifies a comma-separated list of cluster nodes used to bootstrap the connector. use_snapshots controls how the stream of changes from the server is handled: with false, as here, the connector sends each event to the Kafka topic immediately; with true, the connector task accumulates events in local memory until a whole snapshot has been received. topic.name names the Kafka topic where all the events will be stored. For simplicity, this sample does not configure a secure connection; that is covered later.

Run the connector in a standalone Kafka Connect worker in another terminal (this uses config/connect-standalone.properties from the Kafka distribution and assumes that Kafka and the Schema Registry are running locally on the default ports):

$ ./bin/connect-standalone config/connect-standalone.properties \
        config/quickstart-couchbase.properties

You should see the process start up and log some messages, then begin receiving events from Couchbase Server and sending the results to Kafka. To check that it has copied the data that was present when Kafka Connect started, run a console consumer that reads from the beginning of the topic:

$ ./bin/kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 \
        --topic test-default --from-beginning
{"event":"mutation","partition":104,"key":"route_60137","cas":1479128706711552,"bySeqno":1,"revSeqno":1,"expiration":{"int":0},"flags":{"int":33554432},"lockTime":{"int":0},"content":{"bytes":"{\"airline\":\"US\",\"airlineid\":\"airline_5265\",\"destinationairport\":\"ATL\",\"distance\":2549.0460387816483,\"equipment\":\"321 320\",\"id\":60137,\"schedule\":[{\"day\":0,\"flight\":\"US019\",\"utc\":\"13:56:00\"},{\"day\":0,\"flight\":\"US716\",\"utc\":\"19:19:00\"},{\"day\":0,\"flight\":\"US776\",\"utc\":\"16:14:00\"},{\"day\":1,\"flight\":\"US402\",\"utc\":\"17:19:00\"},{\"day\":2,\"flight\":\"US948\",\"utc\":\"16:53:00\"},{\"day\":2,\"flight\":\"US127\",\"utc\":\"02:02:00\"},{\"day\":2,\"flight\":\"US884\",\"utc\":\"19:38:00\"},{\"day\":3,\"flight\":\"US488\",\"utc\":\"16:42:00\"},{\"day\":3,\"flight\":\"US471\",\"utc\":\"11:43:00\"},{\"day\":3,\"flight\":\"US810\",\"utc\":\"11:02:00\"},{\"day\":4,\"flight\":\"US593\",\"utc\":\"00:41:00\"},{\"day\":4,\"flight\":\"US799\",\"utc\":\"12:59:00\"},{\"day\":4,\"flight\":\"US179\",\"utc\":\"10:37:00\"},{\"day\":4,\"flight\":\"US577\",\"utc\":\"15:05:00\"},{\"day\":4,\"flight\":\"US029\",\"utc\":\"08:03:00\"},{\"day\":5,\"flight\":\"US174\",\"utc\":\"17:02:00\"},{\"day\":5,\"flight\":\"US860\",\"utc\":\"17:55:00\"},{\"day\":5,\"flight\":\"US201\",\"utc\":\"00:52:00\"},{\"day\":6,\"flight\":\"US349\",\"utc\":\"00:52:00\"},{\"day\":6,\"flight\":\"US477\",\"utc\":\"22:00:00\"},{\"day\":6,\"flight\":\"US841\",\"utc\":\"13:08:00\"},{\"day\":6,\"flight\":\"US408\",\"utc\":\"15:48:00\"},{\"day\":6,\"flight\":\"US709\",\"utc\":\"10:13:00\"}],\"sourceairport\":\"PHX\",\"stops\":0,\"type\":\"route\"}"}}
{"event":"mutation","partition":104,"key":"airport_5736","cas":1479128706973696,"bySeqno":2,"revSeqno":1,"expiration":{"int":0},"flags":{"int":33554432},"lockTime":{"int":0},"content":{"bytes":"{\"airportname\":\"Elko Regional Airport\",\"city\":\"Elko\",\"country\":\"United States\",\"faa\":\"EKO\",\"geo\":{\"alt\":5140.0,\"lat\":40.8249,\"lon\":-115.792},\"icao\":\"KEKO\",\"id\":5736,\"type\":\"airport\",\"tz\":\"America/Los_Angeles\"}"}}
{"event":"mutation","partition":104,"key":"landmark_5574","cas":1479128707104768,"bySeqno":3,"revSeqno":1,"expiration":{"int":0},"flags":{"int":33554432},"lockTime":{"int":0},"content":{"bytes":"{\"activity\":\"buy\",\"address\":null,\"alt\":null,\"city\":\"Chesterfield\",\"content\":\"Shopping centre with typical brand names\",\"country\":\"United Kingdom\",\"directions\":null,\"email\":null,\"geo\":{\"accuracy\":\"ROOFTOP\",\"lat\":53.234931,\"lon\":-1.429559},\"hours\":null,\"id\":5574,\"image\":null,\"name\":\"The Pavements\",\"phone\":null,\"price\":null,\"state\":null,\"title\":\"Chesterfield\",\"tollfree\":null,\"type\":\"landmark\",\"url\":\"http://www.chesterfieldpavements.co.uk\"}"}}
...

The output will contain about 31,000 lines, each describing a mutation (assuming the sample bucket hasn't been modified since creation). The shape of the payload is controlled by the dcp.message.converter.class property of the connector configuration. By default it is set to com.couchbase.connect.kafka.converter.SchemaConverter, which formats each event as a structure with the metadata at the top level and the optional document value as a byte string in the content property. The Avro schema for this converter is shown below:

{
  "type": "record",
  "name": "DcpMessage",
  "namespace": "com.couchbase",
  "fields": [
    {
      "name": "event",
      "type": "string"
    },
    {
      "name": "partition",
      "type": {
        "type": "int",
        "connect.type": "int16"
      }
    },
    {
      "name": "key",
      "type": "string"
    },
    {
      "name": "cas",
      "type": "long"
    },
    {
      "name": "bySeqno",
      "type": "long"
    },
    {
      "name": "revSeqno",
      "type": "long"
    },
    {
      "name": "expiration",
      "type": [
        "null",
        "int"
      ]
    },
    {
      "name": "flags",
      "type": [
        "null",
        "int"
      ]
    },
    {
      "name": "lockTime",
      "type": [
        "null",
        "int"
      ]
    },
    {
      "name": "content",
      "type": [
        "null",
        "bytes"
      ]
    }
  ],
  "connect.name": "com.couchbase.DcpMessage"
}

The event property describes the kind of event and can take one of the following values (a consumer sketch that branches on this field follows the list):

  • mutation: any change to the document, including creation and subdocument commands.
  • deletion: removal or expiration of the document.
  • expiration: reserved for document expiration, but Couchbase Server up to version 4.5 does not use it.
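To illustrate how a consumer might use this field, here is a minimal Java sketch (not part of the connector distribution) that subscribes to the quickstart topic and branches on event. It assumes the Confluent Avro deserializer and a local Schema Registry on the default port; the class and group names are made up for this example.

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Properties;

public class EventTypeConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "couchbase-quickstart"); // hypothetical group name
        props.put("key.deserializer", KafkaAvroDeserializer.class.getName());
        props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8081");

        try (KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-default"));
            while (true) {
                ConsumerRecords<Object, Object> records = consumer.poll(1000);
                for (ConsumerRecord<Object, Object> record : records) {
                    GenericRecord message = (GenericRecord) record.value();
                    String event = message.get("event").toString();
                    if ("mutation".equals(event)) {
                        // content is an optional bytes field; it holds the document
                        // body for mutations and is null for deletions/expirations
                        ByteBuffer content = (ByteBuffer) message.get("content");
                        System.out.println("mutation " + message.get("key") + ": "
                                + (content == null ? 0 : content.remaining()) + " bytes");
                    } else {
                        // deletion or expiration: only the metadata is available
                        System.out.println(event + " " + message.get("key"));
                    }
                }
            }
        }
    }
}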

When all bucket contents have been transferred, open the Couchbase Admin Console and use the Documents UI to create a new document with ID test, then edit its body to {"foo": "bar"} and save. If you switch back to the console consumer you should see the new records added (and, importantly, the old entries are not repeated):

{"event":"mutation","partition":127,"key":"test","cas":1480340133183488,"bySeqno":29,"revSeqno":1,"expiration":{"int":0},"flags":{"int":0},"lockTime":{"int":0},"content":{"bytes":"{\"click\":\"to edit\",\"with JSON\":\"there are no reserved field names\"}"}}
{"event":"mutation","partition":127,"key":"test","cas":1480340157956096,"bySeqno":30,"revSeqno":2,"expiration":{"int":0},"flags":{"int":0},"lockTime":{"int":0},"content":{"bytes":"{\"foo\": \"bar\"}"}}

Removing the document test will generate a similar event:

{"event":"deletion","partition":127,"key":"test","cas":1480340391526400,"bySeqno":32,"revSeqno":3,"expiration":null,"flags":null,"lockTime":null,"content":null}

Of course, all the features of Kafka Connect, including offset management and fault tolerance, work with the source connector. You can kill and restart the processes, and they will pick up where they left off, copying only new data (subject to the use_snapshots setting).
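In standalone mode, those source offsets live in the file named by the worker's offset.storage.file.filename setting, for example in config/connect-standalone.properties:

offset.storage.file.filename=/tmp/connect.offsets

Deleting that file makes the connector start streaming from the beginning again on its next run.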