Bulk operations

Bulk operations

Bulk operations allow you to operate on more than one document at the same time.

Introduction

To get better resource utilization, you need to perform all types of operations in batches. Because of the asynchronous nature of the underlying core package, you can utilize RxJava's operations to provide implicit batching facilities combined with the asynchronous operations of the SDK.

If you understand the general approach to batching, you can apply it to any operation against the SDK, not just with get() calls like in the 1.x series SDK.

Batching with RxJava

Implicit batching is performed by utilizing a few operators:
  • Observable.just() or Observable.from() to generate an Observable that contains the data you want to batch on.
  • flatMap() to send those events against the Couchbase Java SDK and merge the results asynchronously.
  • last() if you want to wait until the last element of the batch is received.
  • toList() if you care about the responses and want to aggregate them easily.
  • If you have more than one subscriber, using cache() to prevent accessing the network over and over again with every subscribe.

The following example creates an observable stream of 5 keys to load in a batch, asynchronously fires off get() requests against the SDK, waits until the last result has arrived, and then converts the result into a list and blocks at the very end:

Cluster cluster = CouchbaseCluster.create();
Bucket bucket = cluster.openBucket();

List<JsonDocument> foundDocs = Observable
    .just("key1", "key2", "key3", "key4", "key5")
    .flatMap(new Func1<String, Observable<JsonDocument>>() {
        @Override
        public Observable<JsonDocument> call(String id) {
            return bucket.async().get(id);
        }
    })
    .toList()
    .toBlocking()
    .single();

Note that this always returns a list, but it may contain 0 to 5 documents, depending on how many are actually found. Also, at the very end the observable is converted into a blocking one, but everything before that, including the network calls and the aggregation, is happening completely asynchronously.

Inside the SDK, this provides much more efficient resource utilization because the requests are very quickly stored in the internal Request RingBuffer and the I/O threads are able to pick batches as large as they can. Afterward, whatever server returns a result first it is stored in the list, so there is no serialization of responses going on.

If you wrap the code in a helper method, you can provide very nice encapsulated batching semantics:

public List<JsonDocument> bulkGet(final Collection<String> ids) {
    return Observable
        .from(ids)
        .flatMap(new Func1<String, Observable<JsonDocument>>() {
            @Override
            public Observable<JsonDocument> call(String id) {
                return bucket.async().get(id);
            }
        })
        .toList()
        .toBlocking()
        .single();
}

Batching mutations

The previous Java SDK only provided bulk operations for get() . With the techniques shown above, you can perform any kind of operation as a batch operation.

The following code generates a number of fake documents and inserts them in one batch. Note that you can decide to either collect the results with toList() as shown above or just use last() as shown here to wait until the last document is properly inserted.

// Generate a number of dummy JSON documents
int docsToCreate = 100;
List<JsonDocument> documents = new ArrayList<JsonDocument>();
for (int i = 0; i < docsToCreate; i++) {
    JsonObject content = JsonObject.create()
        .put("counter", i)
        .put("name", "Foo Bar");
    documents.add(JsonDocument.create("doc-"+i, content));
}

// Insert them in one batch, waiting until the last one is done.
Observable
    .from(documents)
    .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
        @Override
        public Observable<JsonDocument> call(final JsonDocument docToInsert) {
            return bucket.async().insert(docToInsert);
        }
    })
    .last()
    .toBlocking()
    .single();

Performance

Here are two code samples, both synchronous, that showcase serialized and batched loading of documents. Note that more important than the absolute operations per second is the relative improvement with the same workload.

// Serialized workload of loading documents
while(true) {
    List<JsonDocument> loaded = new ArrayList<JsonDocument>();
    int docsToLoad = 10;
    for (int i = 0; i < docsToLoad; i++) {
        JsonDocument doc = bucket.get("doc-" + i);
        if (doc != null) {
            loaded.add(doc);
        }
    }
}

// Same workload, utilizing batching effects
while(true) {
    int docsToLoad = 10;
    Observable
        .range(0, docsToLoad)
        .flatMap(new Func1<Integer, Observable<JsonDocument>>() {
            @Override
            public Observable<JsonDocument> call(Integer i) {
                return bucket.async().get("doc-"+i);
            }
        })
        .toList()
        .toBlocking()
        .single();

}

Error Handling & Recovery

Technically speaking, error handling in bulk operations is similar to generic Observable error handling, but because the topic is strongly related the most important concepts are covered here as well.

In general, the following questions come up:

  • How can I implement best effort loading and just return the values that were successful?
  • What are BackpressureExceptions and how can I handle them?
  • How can I retry individual operations in the batch when they fail?

When handling these situations, an important fact to remember is that as soon as an error happens inside an Observable , the whole thing is terminated. If you want the whole stream to complete, error handling needs to be as close as possible to the original source. Let's take the bulk loading of documents as an example which we are going to modify to be more resilient:

Observable
    .from(docIds)
    .flatMap(id -> {
        return bucket
            .async()
            .get(id);
    })
    .subscribe();

To implement the best effort use case, you can ignore all errors on each get() result before it gets merged and flattened into the original stream. It is strongly recommended to log the error, because otherwise you'll never know what went wrong in the first place for each failing operation.

Observable
    .from(docIds)
    .flatMap(id -> {
        return bucket
            .async()
            .get(id)
            .doOnError(System.err::println) // print the error, log,...
            .onErrorResumeNext(Observable.empty()); // on error resume with an empty sequence
    })
    .subscribe();

There is a slight variation to that which you can use instead. RxJava provides a mergeDelayError operator that merges individual observables, emits all items and then at the very end fails the observable with a CompositeException . This composite exception contains all errors that have happened so you can do something with them at a later point.

Very often you want a complete result and therefore you need to retry individual operations if an error happened. It is recommended to retry based on a defined strategy for specific exception types and propagate the error for unknown exceptions or those types which are known to be permanent. For a full list of errors that can happen and their implications, see the Javadoc API reference for the Bucket methods you are using.

Since the BackpressureException has been frequently referenced in the past, we are going to use that one as an example. The same logic of course applies to all other types as well.

The BackpressureException is used to shed load on the request side and fails your observable quickly if the underlying system is in an overload condition. The reason for this is that somehow requests are produced more quickly than responses can be generated (because that includes the actual network round trip). This is common in bulk scenarios since it could be that you are requesting a very large set of documents at the same time which puts temporary pressure on the client.

To solve that, we can apply a delayed retry algorithm onto the observable so it is retried at a later point. We are making use of the Delay construct shipped with the 2.1 SDK which provides a very convenient way to generated increasing delays. You also want to stop retrying at some point so the operation is not retried forever.

Since 2.1.2, the RetryBuilder API has been introduced to help you build retry scenarios. The following code retries with an exponential backoff (with a 100 millisecond ceiling), but stops after 10 attempts and propagates the error.


Observable
    .from(docIds)
    .flatMap(id -> {
        return bucket
            .async()
            .get(id)
            .retryWhen(RetryBuilder
                .anyOf(BackpressureException.class)
                .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
                .max(10)
                .build()
            );
    })
    .subscribe();

For reference, this is how you would have written the retry feature prior to 2.1.2:

final Delay delay = Delay.exponential(TimeUnit.MILLISECONDS, 100);

Observable
    .from(docIds)
    .flatMap(id -> {
        return bucket
            .async()
            .get(id)
            .retryWhen(notification ->
                notification
                    .zipWith(Observable.range(1, 11), Tuple::create)
                    .flatMap(att ->
                        att.value2() == 9 || !(att.value1() instanceof BackpressureException)
                            ? Observable.error(att.value1())
                            : Observable.timer(delay.calculate(att.value2()), delay.unit())
                    )
            );
    })
    .subscribe();

This code zips the error with a range that indicates the number of attempts we want to retry. If this is over 10 attempts or the error is not a backpressure exception, the error will be propagated.

Finally, you always want to chain in timeout() calls so you have a last resort error and you can be sure that the code you're relying on isn't stuck forever. You can also use methods like onErrorReturn() to return a stubbed object or a fixed entity that you know will never fail (so in the worst case you can provide a reduced user experience instead of failing completely).