Getting Started

Getting Started

To get started with the Couchbase Spark connector quickly, learn how to add the connector to your Spark project and run simple queries.

Quickstart

Because most people using Spark are in the Scala ecosystem, the following examples use Scala and its sbt dependency manager.

Create a new sbt project and add the following content to the build.sbt file. This code includes all the Spark dependencies, as well as the Couchbase Spark connector. Just enough to get you started!

Here is a reference to the Scala docs.

name := "my-first-couchbase-spark-project"

organization := "my.organization"

version := "1.0.0-SNAPSHOT"

scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1",
  "org.apache.spark" %% "spark-streaming" % "1.4.1",
  "org.apache.spark" %% "spark-sql" % "1.4.1",
  "com.couchbase.client" %% "spark-connector" % "1.0.1"
)

Now, under src/main/scala/, create a Quickstart.scala class with the following skeleton:

object Quickstart {

	def main(args: Array[String]): Unit = {

	}

}

If you are not familiar with Scala, this is more or less the equivalent to Java's public static void main(String[] args) method. Because empty methods make the compiler sad, fill it with proper code now.

When it comes to Spark, you always need to set up a configuration and initialize the SparkContext object. The following snippet does that and also instructs the Couchbase Spark connector to open a bucket in the background.

// Configure Spark
val cfg = new SparkConf()
  .setAppName("couchbaseQuickstart") // give your app a name
  .setMaster("local[*]") // set the master to local for easy experimenting
  .set("com.couchbase.bucket.travel-sample", "") // open the travel-sample bucket

// Generate The Context
val sc = new SparkContext(cfg)

By default, the Spark Connector connects to the default bucket. However, this example uses actual data from the travel-sample bucket that ships with Couchbase Server, so the code connects to that bucket. The Couchbase Spark connector also supports opening more buckets in parallel.

Creating and saving RDDs

Now that the SparkContext is created, you can perform operations against Couchbase. A common scenario is to create resilient distributed datasets (RDDs) out of documents stored in Couchbase. The easiest one is probably by passing in Document IDs as strings.

Before getting further into the actual code, make sure to have the following items imported, because otherwise the implicit methods won't be available.

import com.couchbase.spark._

Use the couchbaseGet method on the SparkContext to fetch documents from Couchbase and create an RDD.

sc
	.couchbaseGet[JsonDocument](Seq("airline_10123", "airline_10748"))
	.collect()
	.foreach(println)

Make sure to specify which document type you want (in this case, a JsonDocument). If you forget to specify the document type, an exception is thrown because the connector does not know what format to use for the results. Then call collect() to aggregate the results and print them out to the command line.

Spark outputs lots of information, so you'll find the output somewhere in the logs:

JsonDocument{id='airline_10748', cas=314538279043072, expiry=0, content={"country":"United States","iata":"ZQ","name":"Locair","callsign":"LOCAIR","icao":"LOC","id":10748,"type":"airline"}}
JsonDocument{id='airline_10123', cas=314538278125568, expiry=0, content={"country":"United States","iata":"TQ","name":"Texas Wings","callsign":"TXW","icao":"TXW","id":10123,"type":"airline"}}

But since just loading data is half the fun, the connector also provides a convenient way to save documents. The following code loads documents like before, but then modifies its contents and ID before saving it back. You can imagine taking any kind of data source, mapping them to documents and just storing them back in Couchbase.

sc
  .couchbaseGet[JsonDocument](Seq("airline_10123", "airline_10748"))
  .map(oldDoc => {
    val id = "my_" + oldDoc.id()
    val content = JsonObject.create().put("name", oldDoc.content().getString("name"))
    JsonDocument.create(id, content)
  })
  .saveToCouchbase()

We utilize the saveToCouchbase() method available on our RDD to store a modified version of our original JsonDocument. Go find your modified document in the Couchbase Server UI! Look for "my_airline_10123" which will just have the name of the airline as its content.

Congratulations! You've successfully performed your first ETL job (extract-transform-load) using Couchbase and Spark. Next up is a whirlwind tour of N1QL and Spark DataFrames.

Working with DataFrames

DataFrames were introduced in Spark 1.3 and have matured even further in Spark 1.4. The nature of the queries fits very well with what Couchbase N1QL provides.
Note: To try this, you need Couchbase Server version 4.0 or later.
Note: You need to at least have a primary index created on the travel-sample bucket to make the following examples work. If you haven't done already, perform a CREATE PRIMARY INDEX ON `travel-sample` query.

In addition to creating a SparkContext, you'll need an SQLContext:

val sql = new SQLContext(sc)

Also, don't forget the Couchbase imports again for all the automatic method goodness:

import com.couchbase.spark.sql._

Because a DataFrame is like an RDD but with a schema and Couchbase is a schemaless database at its heart, you need a way to either define or infer a schema. The connector has built-in schema inference, but if you have a large or diverse data set, you need to give it some clues on filtering.

Suppose you want a DataFrame for all airlines, and you know that the JSON content has a type field with the value airline. You can pass this information to the connector for automatic schema inference:

// Create a DataFrame with Schema Inference
val airlines = sql.read.couchbase(schemaFilter = EqualTo("type", "airline"))

// Print The Schema
airlines.printSchema()

The code automatically infers the schema and prints it in this format:

root
 |-- META_ID: string (nullable = true)
 |-- callsign: string (nullable = true)
 |-- country: string (nullable = true)
 |-- iata: string (nullable = true)
 |-- icao: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- type: string (nullable = true)

Next you can perform an actual query where you are interested only in the name and callsign. This example sorts it by the callsign and loads only the first 10 rows.

airlines
  .select("name", "callsign")
  .sort(df("callsign").desc)
  .show(10)

The code prints the results on the console like this:

+-------+--------------------+
|   name|            callsign|
+-------+--------------------+
|   EASY|             easyJet|
|   BABY|             bmibaby|
|MIDLAND|                 bmi|
|   null|          Yellowtail|
|   null|               XOJET|
|STARWAY|   XL Airways France|
|   XAIR|            XAIR USA|
|  WORLD|       World Airways|
|WESTERN|    Western Airlines|
|   RUBY|Vision Airlines (V2)|
+-------+--------------------+