Hello Couchbase Example

Now that you have added the SDK dependency to your project, you can implement the classic "Hello World" example. Along the way, this section also covers some important aspects that will help you later.

The first thing you need to do is connect to the cluster:

Cluster cluster = CouchbaseCluster.create();

With no arguments provided, this binds the client to a cluster that is reachable on localhost. This is a reasonable default to get started during development, but in production you should pass in more seed nodes, like this:

Cluster cluster = CouchbaseCluster.create("192.168.56.101", "192.168.56.102");

You do not need to pass in all nodes of the cluster, just a few seed nodes so that the client is able to establish initial contact. The actual process of connecting to a bucket (that is, opening sockets and everything related) happens when you call the openBucket method:

Bucket bucket = cluster.openBucket();

This will connect to the default bucket and return a Bucket reference.

If you want to connect to a different bucket, optionally protected by a password, pass its name and password like this:

Bucket bucket = cluster.openBucket("bucket", "password");

When your application shuts down, you need to make sure to properly disconnect from the cluster to free all resources (sockets, threads, and so on). The following code shuts down the client:

cluster.disconnect();

This disconnects all buckets and frees associated resources.

You now have a reference to the Bucket, so you can begin working with it. The SDK comes with built-in handling for JSON documents, which you can use right away. First, create a JsonObject that contains information about a user:

JsonObject user = JsonObject.empty()
    .put("firstname", "Walter")
    .put("lastname", "White")
    .put("job", "chemistry teacher")
    .put("age", 50);

A JsonObject works very much like a Map, but it is designed to only let you insert values that can be stored as valid JSON (including nested objects and arrays). The resulting document looks like this:

{
    "firstname":"Walter",
    "job":"chemistry teacher",
    "age":50,
    "lastname":"White"
}

To store the document, you can use the upsert method on the bucket. Because a document on the server has more properties than just the content, you need to give it at least a unique document ID (for example, walter). The container for all this information is called a Document, and because you are dealing with JSON, you need to create a JsonDocument:

JsonDocument doc = JsonDocument.create("walter", user);
JsonDocument response = bucket.upsert(doc);

The Document is automatically converted into JSON and stored on the cluster. If the document (identified by its unique ID) already exists, it is replaced.

If you replace upsert with insert and try to insert the same document twice (with the same ID), you see the following:

Exception in thread "main" com.couchbase.client.java.error.DocumentAlreadyExistsException
    at com.couchbase.client.java.CouchbaseAsyncBucket$12.call(CouchbaseAsyncBucket.java:425)
    at com.couchbase.client.java.CouchbaseAsyncBucket$12.call(CouchbaseAsyncBucket.java:409)
    ...

This provides an important clue:

  • insert, unlike upsert, fails if the document already exists on the server (very much like the SQL INSERT statement).
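
The difference between the two calls can be modeled with a small in-memory sketch. This is only an illustration of the semantics described here: InMemoryBucketSketch is a hypothetical class built on a JDK map, not part of the SDK, and it throws IllegalStateException where the SDK throws DocumentAlreadyExistsException.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for a bucket, used only to illustrate the semantics.
// It is NOT the Couchbase SDK and stores plain strings instead of documents.
class InMemoryBucketSketch {
    private final ConcurrentMap<String, String> docs = new ConcurrentHashMap<>();

    // upsert: always stores the content, overwriting an existing document.
    void upsert(String id, String content) {
        docs.put(id, content);
    }

    // insert: fails if a document with this ID already exists.
    void insert(String id, String content) {
        if (docs.putIfAbsent(id, content) != null) {
            throw new IllegalStateException("already exists: " + id);
        }
    }

    // Returns the stored content, or null if the ID is unknown.
    String get(String id) {
        return docs.get(id);
    }

    public static void main(String[] args) {
        InMemoryBucketSketch bucket = new InMemoryBucketSketch();
        bucket.upsert("walter", "{\"age\":50}"); // creates the document
        bucket.upsert("walter", "{\"age\":51}"); // silently overwrites it
        try {
            bucket.insert("walter", "{\"age\":52}"); // fails, the ID is taken
        } catch (IllegalStateException e) {
            System.out.println("insert failed: " + e.getMessage());
        }
        System.out.println(bucket.get("walter"));
    }
}
```

In this model the second upsert wins and the later insert leaves the stored content untouched.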

Not surprisingly, you can also retrieve a Document from the database. You do this by providing its ID:

JsonDocument walter = bucket.get("walter");
System.out.println("Found: " + walter);

This prints:

Found: JsonDocument{id='walter', cas=16976883046781, expiry=0, content={"firstname":"Walter","job":"chemistry teacher","age":50,"lastname":"White"}}

If you want to print only the age, you can reach into the content (much like you would access a `Map`):

System.out.println("Age: " + walter.content().getInt("age"));

You can combine both commands to implement something that is needed very often: loading a document, modifying its content and then storing the modified document.

First, here is one possible way to do it synchronously:

JsonDocument loaded = bucket.get("walter");
if (loaded == null) {
    System.err.println("Document not found!");
} else {
    loaded.content().put("age", 52);
    JsonDocument updated = bucket.replace(loaded);
    System.out.println("Updated: " + updated.id());
}

The code uses the get method to load the Document and waits until it arrives from the server (get returns null if the document does not exist). If the result is not null, the content is modified and the document is stored again through the synchronous replace method. You can think of replace as the opposite of insert: if the document does not already exist, the call fails with a DocumentDoesNotExistException.

The code shown above is completely synchronous, which means that your main thread blocks until a response comes back from the server. It waits for network IO when it could be performing valuable work instead. This is where asynchronous and parallel computation gets interesting: instead of waiting and "pulling" the data out of the SDK, you can keep moving forward in your application flow and let the SDK notify you once it is done.

Observables provide a large range of methods and functionality to create, combine and transform asynchronous workflows and make them look synchronous (even though they are not). This is important because other approaches quickly lead to "callback hell", and futures are complicated to work with, especially if you want to chain more than one asynchronous operation together, let alone handle errors properly.

Here is the very same example, but completely non-blocking (note that for now you need to keep your main thread alive with a Thread.sleep() or a CountDownLatch):

bucket
    .async()
    .get("walter")
    .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
        @Override
        public Observable<JsonDocument> call(final JsonDocument loaded) {
            loaded.content().put("age", 52);
            return bucket.async().replace(loaded);
        }
    })
    .subscribe(new Action1<JsonDocument>() {
        @Override
        public void call(final JsonDocument updated) {
            System.out.println("Updated: " + updated.id());
        }
    });

Note how you can get access to the underlying asynchronous interfaces by using the async() method on the bucket.

Without going into too much depth about how Observables work just yet, it is not too hard to guess what is happening. The document is loaded through the get method, and once that is done, the flatMap method is executed. This method changes the content and calls the replace method. After the replace is done, the Subscriber is called and the line is printed. You do not need to check for null here because the whole chain is simply not executed if the document is not found in the first place.

The style shown above is Java 6/7 with anonymous classes. If you are already using Java 8, you can replace the same code with lambdas to make it nicer and more succinct:

bucket
    .async()
    .get("walter")
    .flatMap(loaded -> {
        loaded.content().put("age", 52);
        return bucket.async().replace(loaded);
    })
    .subscribe(updated -> System.out.println("Updated: " + updated.id()));

Because this flow is asynchronous, every callback runs on a thread different from the one that made the call (at this point most likely main). This one-off example, unlike a long-running server application, shuts down after one operation, so it is a good idea to wait until the last operation has finished.

A naive way is to add a Thread.sleep(1000) after the last call to keep the current thread alive for a second and hope that the operation is done:

bucket
    .async()
    .get("walter")
    .flatMap(loaded -> {
        loaded.content().put("age", 52);
        return bucket.async().replace(loaded);
    })
    .subscribe(updated -> System.out.println("Updated: " + updated.id()));

Thread.sleep(1000);

Usually the operation takes less than a second, so you are just wasting time. In the unlikely case that it takes longer, the sleep ends before the operation has completed. A better way is to use a CountDownLatch, which ships with the JDK. One thread counts it down and the other waits until it is counted down:

final CountDownLatch latch = new CountDownLatch(1);
bucket
    .async()
    .get("walter")
    .flatMap(loaded -> {
        loaded.content().put("age", 52);
        return bucket.async().replace(loaded);
    })
    .subscribe(
        System.out::println,
        err -> {
            err.printStackTrace();
            latch.countDown();
        },
        latch::countDown
    );

latch.await();
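
The same waiting pattern can be tried without a cluster. The following sketch uses only JDK classes and rests on a few assumptions: a single-thread executor stands in for the thread on which the SDK invokes the callbacks, LatchSketch is a hypothetical class name, and a five-second timeout is added so that the caller cannot hang forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// JDK-only sketch: the executor is a stand-in for the SDK's callback thread.
class LatchSketch {

    // Returns true if the background work signalled completion in time.
    static boolean runAndWait() {
        final CountDownLatch latch = new CountDownLatch(1);
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            worker.execute(() -> {
                try {
                    // Stand-in for the asynchronous get/replace chain.
                    System.out.println("Callback on " + Thread.currentThread().getName());
                } finally {
                    // Count down on success and on failure alike.
                    latch.countDown();
                }
            });
            // Blocks only as long as needed, unlike a fixed Thread.sleep(1000).
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Caller on " + Thread.currentThread().getName());
        System.out.println("Completed: " + runAndWait());
    }
}
```

Running it prints two different thread names, which shows that the callback does not execute on the calling thread.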

Much better, but not perfect, and here is why: because your subscriber callbacks are called from a different thread, the calling thread has no access to either the computed values or any error that is thrown unless you make further changes. You can work around that by using atomic variables from java.util.concurrent.atomic, but thankfully RxJava has you covered as well. You can use toBlocking, which returns the results in a blocking fashion and propagates any errors of the flow as exceptions to your calling thread:

JsonDocument result = bucket
    .async()
    .get("walter")
    .flatMap(loaded -> {
        loaded.content().put("age", 52);
        return bucket.async().replace(loaded);
    })
    .toBlocking()
    .single();
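
For comparison, the workaround with atomic variables mentioned earlier might look like the following JDK-only sketch. The class AtomicResultSketch, the executor that stands in for the SDK callback thread, and the computed value are all assumptions made for illustration. The point is how much bookkeeping toBlocking saves.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// JDK-only sketch of handing a value or an error from a callback thread
// back to the calling thread. None of this is Couchbase SDK API.
class AtomicResultSketch {

    // Runs the task on another thread, then returns its value or rethrows its error.
    static <T> T runAndFetch(Supplier<T> task) {
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicReference<T> value = new AtomicReference<>();
        final AtomicReference<RuntimeException> error = new AtomicReference<>();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            worker.execute(() -> {
                try {
                    value.set(task.get());   // plays the role of onNext
                } catch (RuntimeException e) {
                    error.set(e);            // plays the role of onError
                } finally {
                    latch.countDown();       // wake up the waiting caller
                }
            });
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            worker.shutdown();
        }
        if (error.get() != null) {
            throw error.get();
        }
        return value.get();
    }

    public static void main(String[] args) {
        int age = runAndFetch(() -> 50 + 2);
        System.out.println("Age: " + age);
    }
}
```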

So remember that instead of using latches, always use toBlocking and its different ways to extract the data. You can learn more about that in Mastering observables.

Congratulations, you've completed your first steps towards Couchbase mastery! At this point, you can either proceed with the complete tutorial application or browse through the rest of the documentation as you see fit.