Spark SQL Integration

Spark SQL integration depends on N1QL, which is available in Couchbase Server 4.0 and later. To use Spark SQL queries, you need to create and persist DataFrames via the Spark SQL DataFrame API.

All examples on this page require at least a primary index on the travel-sample bucket. If you haven't created one yet, run this N1QL statement: CREATE PRIMARY INDEX ON `travel-sample`.

DataFrame creation

Before you can create a DataFrame with Couchbase, you need to create an SQLContext.

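// Spark core and Spark SQL entry points
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Configure Spark and open the travel-sample bucket (the value is the bucket password, empty here)
val cfg = new SparkConf()
  .setAppName("keyValueExample")
  .setMaster("local[*]")
  .set("com.couchbase.bucket.travel-sample", "")

// Create the Spark context
val sc = new SparkContext(cfg)

// Create the SQL context used to build DataFrames
val sql = new SQLContext(sc)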

Also, you need to make sure that the following import is present:

import com.couchbase.spark.sql._

The easiest way to create a DataFrame looks like this:

val dataFrame = sql.read.couchbase()

While this is the easiest approach, it has a shortcoming: it performs automatic schema inference over the full data set, which is unlikely to produce the right schema, especially for a large or diverse data set.

There are two ways to address this: provide a schema manually, or narrow down automatic schema inference with an explicit predicate. The latter has an added benefit: the predicate is also applied to every query, which improves performance.
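To make the predicate idea concrete, the following hypothetical sketch renders a simple equality predicate as a N1QL condition, the same form as the string "`type` = 'airline'" that the schemaFilter option accepts. The Pred, Eq, and And types and the toN1ql function are illustrative stand-ins, not the connector's API. The sketch does no quote escaping, so it is only suitable for trusted constant values.

```scala
// Hypothetical sketch: render simple predicates as N1QL conditions.
// Pred, Eq and And are illustrative stand-ins, not Spark's Filter classes.
object FilterSketch {
  sealed trait Pred
  final case class Eq(attribute: String, value: Any) extends Pred
  final case class And(left: Pred, right: Pred) extends Pred

  // Wrap identifiers in backticks, matching the "`type` = 'airline'" form.
  private def ident(name: String): String = "`" + name + "`"

  // Single-quote strings; render numbers and booleans as-is.
  // No escaping is done, so only use trusted constant values.
  private def literal(value: Any): String = value match {
    case s: String => "'" + s + "'"
    case other     => other.toString
  }

  // Translate a predicate tree into a N1QL WHERE-clause expression.
  def toN1ql(p: Pred): String = p match {
    case Eq(attribute, value) => s"${ident(attribute)} = ${literal(value)}"
    case And(left, right)     => s"(${toN1ql(left)}) AND (${toN1ql(right)})"
  }
}
```

For example, FilterSketch.toN1ql(FilterSketch.Eq("type", "airline")) yields `type` = 'airline'.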

To restrict automatic schema inference to airline documents, pass a schema filter:

import org.apache.spark.sql.sources.EqualTo

val airline = sql.read.couchbase(schemaFilter = EqualTo("type", "airline"))

If you call airline.printSchema(), it will print:

root
 |-- META_ID: string (nullable = true)
 |-- callsign: string (nullable = true)
 |-- country: string (nullable = true)
 |-- iata: string (nullable = true)
 |-- icao: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- type: string (nullable = true)

Besides inferring the schema, the connector added a META_ID field, which holds the document ID where applicable.
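As a rough mental model, each row combines a document's top-level JSON fields with its document ID. The sketch below is hypothetical (toRow and the Map-based row are not connector APIs) and only illustrates where the META_ID value comes from.

```scala
// Hypothetical sketch: a document's row is its top-level fields plus its ID.
object MetaIdSketch {
  val DefaultIdField = "META_ID"

  // Adds the document ID under idField. In this simplified model, an existing
  // field with the same name would be overwritten.
  def toRow(docId: String,
            content: Map[String, Any],
            idField: String = DefaultIdField): Map[String, Any] =
    content + (idField -> docId)
}
```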

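You can also provide a schema manually if automatic inference does not produce the right one:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Only the listed fields become columns of the DataFrame
val airlineManual = sql.read.couchbase(StructType(
  StructField("name", StringType) ::
  StructField("type", StringType) :: Nil
))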

Now that you have a DataFrame, you can apply any operation Spark SQL provides. A simple example is selecting specific fields, sorting them, and printing the first records:

airline
  .select("name", "callsign")
  .sort(airline("callsign").desc)
  .show(10)
+-------+--------------------+
|   name|            callsign|
+-------+--------------------+
|   EASY|             easyJet|
|   BABY|             bmibaby|
|MIDLAND|                 bmi|
|   null|          Yellowtail|
|   null|               XOJET|
|STARWAY|   XL Airways France|
|   XAIR|            XAIR USA|
|  WORLD|       World Airways|
|WESTERN|    Western Airlines|
|   RUBY|Vision Airlines (V2)|
+-------+--------------------+
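Notice that the descending sort places easyJet, bmibaby, and bmi ahead of Yellowtail. Spark compares strings by their byte values, and lowercase letters have higher codes than uppercase ones. The plain-Scala sketch below reproduces that ordering on a few values from the second column of the output, and shows a case-insensitive alternative. In Spark itself, sorting on lower(airline("callsign")).desc, with lower imported from org.apache.spark.sql.functions, should give the case-insensitive order.

```scala
// Compare descending sorts on a few values from the output above.
object SortOrderSketch {
  val values = Seq("Yellowtail", "bmi", "XOJET", "easyJet", "bmibaby")

  // Natural String ordering compares character codes, so lowercase sorts above uppercase.
  val caseSensitive: Seq[String] = values.sorted(Ordering[String].reverse)

  // Lower-casing the sort key gives a case-insensitive order instead.
  val caseInsensitive: Seq[String] = values.sortBy(_.toLowerCase)(Ordering[String].reverse)
}
```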

For a more complete example, imagine you have landmarks stored in the Hadoop Distributed File System (HDFS) and airports in Couchbase. Given an airport's FAA code, you want to find all landmarks in the same city:

// Load landmarks from HDFS (adjust the NameNode host and port to your cluster)
val landmarks = sql.read.json("hdfs://127.0.0.1:8091/landmarks/*")
landmarks.registerTempTable("landmarks")

// Load Airports from Couchbase
val airports = sql.read.couchbase(schemaFilter = EqualTo("type", "airport"))

// find all landmarks in the same city as the given FAA code
val toFind = "SFO" // try SFO or LAX

airports
  .join(landmarks, airports("city") === landmarks("city"))
  .select(airports("faa"), landmarks("name"), landmarks("url"))
  .where(airports("faa") === toFind and landmarks("url").isNotNull)
  .orderBy(landmarks("name").asc)
  .show(20)
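The join may be easier to follow as plain collection code. The following sketch reproduces the query's logic over in-memory Scala collections; the Airport and Landmark case classes and the sample records in the usage below are hypothetical and carry only the fields the query uses.

```scala
// Hypothetical in-memory equivalent of the DataFrame join above.
object JoinSketch {
  final case class Airport(faa: String, city: String)
  final case class Landmark(name: String, city: String, url: Option[String])

  // Inner join on city, keep the requested FAA code and landmarks that have
  // a URL, then order by landmark name.
  def landmarksNear(faa: String,
                    airports: Seq[Airport],
                    landmarks: Seq[Landmark]): Seq[(String, String, String)] =
    (for {
      a   <- airports if a.faa == faa
      l   <- landmarks if l.city == a.city
      url <- l.url
    } yield (a.faa, l.name, url)).sortBy(_._2)
}
```

With two San Francisco landmarks that have URLs, one without, and one in Los Angeles, landmarksNear("SFO", ...) returns only the two San Francisco landmarks with URLs, sorted by name.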

Direct DataFrame access

If you are using DataFrames from Scala, the implicit imports are a convenient way to simplify your code. If you want to use DataFrames from other languages, direct access is the preferred approach.

In fact, the implicit imports are mainly just syntactic sugar over the direct access methods.
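The "syntactic sugar" pattern can be sketched with a Scala implicit class that adds a couchbase(...) shortcut which only delegates to explicit format and option calls. This is a hypothetical, self-contained illustration: Reader is a stand-in for Spark's DataFrameReader, and CouchbaseReaderOps is not the connector's actual implicit, whose signature may differ.

```scala
// Hypothetical sketch of an implicit "sugar" method over a builder-style reader.
object SugarSketch {
  // Stand-in for a DataFrame reader: records the source and options it is given.
  final case class Reader(source: Option[String] = None,
                          options: Map[String, String] = Map.empty) {
    def format(f: String): Reader = copy(source = Some(f))
    def option(key: String, value: String): Reader =
      copy(options = options + (key -> value))
  }

  // The sugar: couchbase(...) expands to the explicit format/option calls.
  implicit class CouchbaseReaderOps(r: Reader) {
    def couchbase(schemaFilter: Option[String] = None): Reader = {
      val withFormat = r.format("com.couchbase.spark.sql.DefaultSource")
      schemaFilter.fold(withFormat)(f => withFormat.option("schemaFilter", f))
    }
  }
}
```

Both forms build the same reader configuration, which is why non-Scala callers lose nothing by using direct access.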

The following code creates a DataFrame directly, setting all options manually:

val df = sql.read
  // Define the source (required)
  .format("com.couchbase.spark.sql.DefaultSource")
  // Set the schema filter for inference, either from a Spark filter...
  .option("schemaFilter", N1QLRelation.filterToExpression(EqualTo("type", "airline")))
  // ...or directly as a N1QL expression string
  .option("schemaFilter", "`type` = 'airline'")
  // Or skip inference and provide the schema directly
  .schema(StructType(
    StructField("name", StringType) :: Nil
  ))
  .load()

You can also pass options directly, either to Spark or to enable advanced functionality in the N1QL integration. The following options are currently supported:

  • idField: The name of the document ID field, defaults to "META_ID".
  • bucket: The name of the bucket to use, which is required if more than one bucket is opened.
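The two rules above (the idField default and when bucket is required) can be summarized in a small hypothetical sketch. The resolve function and Resolved type are illustrative only and are not part of the connector.

```scala
// Hypothetical sketch of how the documented options and their rules combine.
object OptionsSketch {
  final case class Resolved(idField: String, bucket: String)

  def resolve(options: Map[String, String],
              openBuckets: Seq[String]): Either[String, Resolved] = {
    // idField defaults to META_ID when not given.
    val idField = options.getOrElse("idField", "META_ID")
    // bucket may be omitted only when exactly one bucket is opened.
    val bucket: Either[String, String] = options.get("bucket") match {
      case Some(b)                       => Right(b)
      case None if openBuckets.size == 1 => Right(openBuckets.head)
      case None                          => Left("bucket option is required unless exactly one bucket is opened")
    }
    bucket.map(b => Resolved(idField, b))
  }
}
```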

DataFrame persistence

It is also possible to persist DataFrames into Couchbase. The key requirement is that a META_ID field (or the field configured as idField) exists, since it is mapped to the unique document ID. All other fields in the DataFrame are converted into JSON and stored as the document content.

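// Assumed record type for this example; define it at the top level
// (not inside a method) so Spark can derive its schema.
case class Person(uid: String, name: String, age: Int)

import sql.implicits._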

val people = sc.parallelize(Seq(
  Person("user::michael", "Michael", 27),
  Person("user::tom", "Tom", 33)
)).toDF()
people.registerTempTable("people")

people.write.couchbase(Map("idField" -> "uid"))

In this example, the DataFrame is persisted into Couchbase, and the uid field is used as the document ID.
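The mapping described above can be sketched as splitting each row into a document ID and its remaining content. This is a hypothetical illustration (toDocument and the Map-based row are not connector APIs), and it accepts only string IDs.

```scala
// Hypothetical sketch: split a row into (document ID, document content).
object PersistSketch {
  // The idField value becomes the key; all other fields become the content.
  def toDocument(row: Map[String, Any],
                 idField: String = "META_ID"): Either[String, (String, Map[String, Any])] =
    row.get(idField) match {
      case Some(id: String) => Right((id, row - idField))
      case Some(other)      => Left(s"$idField must be a string, got: $other")
      case None             => Left(s"row has no $idField field")
    }
}
```

For the user::michael row with idField set to "uid", this yields the key user::michael and the content name = Michael, age = 27.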