Getting Started

This section shows how to add the connector to your application and includes a basic sample application.

Installing the Kafka Connector

At a minimum, Java 7 is required; any newer version, including Java 8, is also supported.

To use the Kafka Connector, add the library, which is available on Maven Central, as a dependency in your application's project object model (POM). Here is a typical pom.xml dependency snippet that you can copy and paste into your Java project:

<dependencies>
   <dependency>
      <groupId>com.couchbase.client</groupId>
      <artifactId>kafka-connector</artifactId>
      <version>2.0.0</version>
   </dependency>
</dependencies>

You can also download an archive that includes all JARs and dependencies; however, using a package manager is strongly recommended. For information about downloading the archive, see Download and API Reference.

If you import the dependency, the following transitive dependencies are also added:

  • core-io: our internal core library, which abstracts lots of Couchbase-specific behavior in a message-oriented way.
  • RxJava: a foundational library to build powerful reactive and asynchronous applications.

The SDK itself depends on additional packages (such as Netty, the Disruptor RingBuffer and Jackson), but they are repackaged into the distributed JAR to reduce the risk of dependency clashes in your environment.

Sample Application

Now that you have added the connector dependency to your project, you can try a simple example that demonstrates some important features of the library.

Using the library is quite straightforward. By default, the connector serializes the document body and metadata to JSON. For example, suppose you want to receive every modification from Couchbase Server and send only the document body to Kafka. To achieve that, you need to define a filter class that allows only instances of MutationMessage to pass through, and an encoder class that takes the document value and converts it to a byte array.

Here's an example of a filter class that allows only instances of MutationMessage to pass through:

package example;

import com.couchbase.client.core.message.dcp.MutationMessage;
import com.couchbase.kafka.DCPEvent;
import com.couchbase.kafka.filter.Filter;

public class SampleFilter implements Filter {
    @Override
    public boolean pass(final DCPEvent dcpEvent) {
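        // Only DCP mutation messages pass through; deletions and other stream events are dropped.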
        return dcpEvent.message() instanceof MutationMessage;
    }
}

Here's an example of an encoder class that takes the document value and converts it to a byte array:

package example;

import com.couchbase.client.core.message.dcp.MutationMessage;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import com.couchbase.kafka.DCPEvent;
import com.couchbase.kafka.coder.AbstractEncoder;
import kafka.utils.VerifiableProperties;

public class SampleEncoder extends AbstractEncoder {
    public SampleEncoder(final VerifiableProperties properties) {
        super(properties);
    }

    @Override
    public byte[] toBytes(final DCPEvent dcpEvent) {
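        // Extract the document body from the DCP event and return it as UTF-8 bytes.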
        MutationMessage message = (MutationMessage)dcpEvent.message();
        return message.content().toString(CharsetUtil.UTF_8).getBytes();
    }
}

With those classes in place, you can set up a Couchbase-Kafka bridge:

package example;

import com.couchbase.kafka.CouchbaseKafkaConnector;
import com.couchbase.kafka.CouchbaseKafkaEnvironment;
import com.couchbase.kafka.DefaultCouchbaseKafkaEnvironment;

public class Example {
    public static void main(String[] args) {
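        // Register the filter and encoder defined above, choose the target Kafka topic
        // and Zookeeper address, and point the connector at the Couchbase bucket
        // (DCP must be enabled so the connector receives change events).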
        DefaultCouchbaseKafkaEnvironment.Builder builder =
                (DefaultCouchbaseKafkaEnvironment.Builder) DefaultCouchbaseKafkaEnvironment
                        .builder()
                        .kafkaFilterClass("example.SampleFilter")
                        .kafkaValueSerializerClass("example.SampleEncoder")
                        .kafkaTopic("default")
                        .kafkaZookeeperAddress("kafka1.vagrant")
                        .couchbaseNodes("couchbase1.vagrant")
                        .couchbaseBucket("default")
                        .dcpEnabled(true);
        CouchbaseKafkaConnector connector = CouchbaseKafkaConnector.create(builder.build());
        connector.run();
    }
}

The couchbase1.vagrant and kafka1.vagrant addresses are the locations of Couchbase Server and Kafka, respectively. Both can be set up with the provisioning scripts in the env/ directory: just navigate to that directory and run vagrant up.
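
Once both machines are running and the bridge has been started, you can check that document changes actually reach Kafka by consuming the default topic. The following is a minimal sketch, not part of the connector itself; it uses the high-level consumer API from the Kafka 0.8 client library (the same library that provides VerifiableProperties above). The class name, the consumer group name, and the Zookeeper port (2181) are assumptions, so adjust them to match your environment.

package example;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class CheckTopic {
    public static void main(String[] args) {
        // "zookeeper.connect" assumes Zookeeper listens on the default port 2181;
        // the group id is arbitrary.
        Properties props = new Properties();
        props.put("zookeeper.connect", "kafka1.vagrant:2181");
        props.put("group.id", "example-check");
        ConsumerConnector consumer =
                kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Open a single stream for the "default" topic used in the bridge example above.
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                consumer.createMessageStreams(Collections.singletonMap("default", 1));

        // Print every document body that the connector publishes.
        for (MessageAndMetadata<byte[], byte[]> record : streams.get("default").get(0)) {
            System.out.println(new String(record.message(), StandardCharsets.UTF_8));
        }
    }
}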