Working With RDDs

Working With RDDs

Spark operates on resilient distributed datasets (RDDs). When you need to extract data out of Couchbase, the Couchbase Spark connector creates RDDs for you. You can create and persist RDDs by using key-value pairs, views, or N1QL.

Creating RDDs

To get access to the RDDs through the implicit methods that the Couchbase Spark connector provides, you need to add the following import to your code:

import com.couchbase.spark._

This import adds Couchbase-specific methods to the SparkContext. Each method starts with couchbase.

If you want to create an RDD for specific documents stored in a bucket, you can specify the IDs directly. The connector fetches them from the server and turns them into an RDD:

sc
	.couchbaseGet[JsonDocument](Seq("airline_10123", "airline_10748"))
	.collect()
	.foreach(println)

Because Spark outputs lots of information, you'll find the output somewhere in the logs::

JsonDocument{id='airline_10748', cas=314538279043072, expiry=0, content={"country":"United States","iata":"ZQ","name":"Locair","callsign":"LOCAIR","icao":"LOC","id":10748,"type":"airline"}}
JsonDocument{id='airline_10123', cas=314538278125568, expiry=0, content={"country":"United States","iata":"TQ","name":"Texas Wings","callsign":"TXW","icao":"TXW","id":10123,"type":"airline"}}
It is critical that you specify the target document type. Otherwise, the client does not know how to convert it. The main reason is that Couchbase has first-class JSON support, but is also able to store any data. You can even store serialized objects or protobuf-encoded documents, but then you'll loose the secondary indexing capabilities.

If you are unsure what to pick, stick with the JsonDocument. If you need raw access to the JSON data, you can also use the RawJsonDocument.

You can perform a (spatial) view query to extract rows and turn them into an RDD. Given the following view against the travel-sample bucket:

function(doc, meta) {
	if (doc.type == "airline") {
		emit(doc.name, null);
	}
}

Creating an RDD for the first 10 results works like this:

sc
  .couchbaseView(ViewQuery.from("airlines", "by_name").limit(10))
  .collect()
  .foreach(println)

And prints the following:

CouchbaseViewRow(airline_10,40-Mile Air,null)
CouchbaseViewRow(airline_792,Access Air,null)
CouchbaseViewRow(airline_665,AD Aviation,null)
CouchbaseViewRow(airline_21,Aigle Azur,null)
CouchbaseViewRow(airline_1191,Air Austral,null)
CouchbaseViewRow(airline_139,Air Caledonie International,null)
CouchbaseViewRow(airline_567,Air Carabes,null)
CouchbaseViewRow(airline_149,Air Cargo Carriers,null)
CouchbaseViewRow(airline_16881,Air Cudlua,null)
CouchbaseViewRow(airline_882,Air Florida,null)

If you want to load the full content for each document, you can mix both approaches:

sc
  .couchbaseView(ViewQuery.from("airlines", "by_name").limit(10))
  .map(_.id)
  .couchbaseGet[JsonDocument]()
  .collect()
  .foreach(println)

Here you can see that couchbaseGet is not just available on the context, but also on every RDD[String]. Neat, right? The exact same approach is also available for spatial views (just use the couchbaseSpatialView() method instead).

Finally, if you are using Couchbase Server 4.0 or greater, you can utilize N1QL to perform efficient queries against your JSON data in a Couchbase Bucket. The following query is very similar to the one performed before through Views, just to show how the different approaches work.

Note: You need to at least have a primary index created on the travel-sample bucket to make the following examples work. If you haven't done that already, perform a CREATE PRIMARY INDEX ON `travel-sample` query.
val query = "SELECT name FROM `travel-sample` WHERE type = 'airline' ORDER BY name ASC LIMIT 10"
sc
  .couchbaseQuery(Query.simple(query))
  .collect()
  .foreach(println)

And if you want to fetch the full document contents based on some criteria, here is a little more complex example:

val query = "SELECT META(`travel-sample`).id as id FROM `travel-sample` WHERE type = 'airline' ORDER BY name ASC LIMIT 10"
sc
  .couchbaseQuery(Query.simple(query))
  .map(_.value.getString("id"))
  .couchbaseGet[JsonDocument]()
  .collect()
  .foreach(println)

While this gives you the most flexibility with querying, we recommend using the higher level Spark SQL components through the DataFrame API. See the Spark SQL section for more information.

Persisting RDDs

Creating an RDD is only half the story. After you've done your aggregation, filtering, and machine learning, you normally want to persist the results somewhere. Couchbase provides the saveToCouchbase() method on every RDD[Document].

The following example extracts all airline names through N1QL, aggregates them by country, and stores each list as a separate document. Since RDDs are source agnostic, the same approach can be used for example to load data out of HDFS and then store the results back in Couchbase.

val query = "SELECT name, country FROM `travel-sample` WHERE type = 'airline' ORDER BY name"
sc
  .couchbaseQuery(Query.simple(query))
  .groupBy(_.value.getString("country"))
  .map(pair => {
    val airports = JsonArray.create()
    val content = JsonObject.create().put("airports", airports)
    pair._2.map(_.value.getString("name")).foreach(airports.add)
    JsonDocument.create("airports::" + pair._1, content)
  })
  .saveToCouchbase()

This will create a document in the bucket for each country containing a list of airports. For example the document with the ID airports::United States contains this:

{
  "airports": [
    "40-Mile Air",
    "ATA Airlines",
    "Access Air",
    "Air Cargo Carriers",
    "Air Florida",
    ...
    "United States Air Force",
    "Usa Sky Cargo",
    "Virgin America",
    "Vision Airlines (V2)",
    "Western Airlines",
    "World Airways",
    "XAIR USA",
    "XOJET",
    "Yellowtail"
  ]
}

By default, the saveToCouchbase() method will use the StoreMode.UPSERT. There are more options available, including inserting and replacing (ignoring or failing on error).

We are also working on better support for automatic conversions from different types so that you don't have to map to a specific document type manually.

Subdocument API

If you are running Couchbase Server 4.5 or later, you can use the "subdoc" API to fetch only a subset of the fields from a Document. This allows you to reduce network overhead, and you need to move less data between operations.

The couchbaseSubdocLookup method is available on both the SparContext and the RDD and accepts two/three arguments, depending on the context:

  • ids: The list of IDs to fetch the fragments from.
  • get: The paths of the JSON document that should be returned.
  • exists: The paths of the JSON document that should be checked for existence.

The following example fetches only two fields from an airline and checks if the foobar field exists in the document:

sc.parallelize(Seq("airline_10123"))
  .couchbaseSubdocLookup(get = Seq("name", "iata"), exists = Seq("foobar"))
  .collect()
  .foreach(println)

This prints:


SubdocLookupResult(airline_10123,0,Map(name -> Texas Wings, iata -> TQ),Map(foobar -> false))

You can then extract the information from the SubdocLookupResult and use them in your RDD flow. Note that the Couchbase SDK also supports subdocument mutations which will be added to the Spark Connector in a future release.