Using the Spark Shell

The interactive Spark shell can be used together with the Couchbase Spark connector for quick and easy data exploration.

Getting Started

The Spark shell provides an easy and convenient way to prototype certain operations quickly, without having to develop a full program, package it and then deploy it.

You need to download Apache Spark from the website, then navigate into the bin directory and run the spark-shell command:

michael@daschlbook ~/spark/spark-1.4.1-bin-hadoop2.6/bin $ ./spark-shell -h
Usage: ./bin/spark-shell [options]

Options:
  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
                              on one of the worker machines inside the cluster ("cluster")
                              (Default: client).
  --class CLASS_NAME          Your application's main class (for Java / Scala apps).
  --name NAME                 A name of your application.
  --jars JARS                 Comma-separated list of local jars
  ....

If you run the Spark shell as it is, only the built-in Spark commands are available. If you want to use it with the Couchbase connector, the easiest way is to pass the --packages argument, which locates the dependency and pulls it in:

./spark-shell --packages com.couchbase.client:spark-connector_2.10:1.0.0

Now you're all set!

Usage

Once you've started the shell, both the SparkContext (sc) and the SQLContext (sqlContext) are already initialized and ready to go:

scala> sc
res1: org.apache.spark.SparkContext = org.apache.spark.SparkContext@7672960e

scala> sqlContext
res2: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@30cb489a

To load the Couchbase-specific implicit imports, run the following commands:

scala> import com.couchbase.spark._
import com.couchbase.spark._

scala> import com.couchbase.spark.sql._
import com.couchbase.spark.sql._

Now you can run the same commands you would use in a regular program, just in an interactive fashion. The following example stores a document and then retrieves it through the key-value API:

scala> import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.JsonDocument

scala> import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.document.json.JsonObject

scala> sc.parallelize(Seq(JsonDocument.create("mydoc", JsonObject.create().put("hello", "spark")))).saveToCouchbase()
...

scala> sc.parallelize(Seq("mydoc")).couchbaseGet[JsonDocument]().foreach(println)
...
JsonDocument{id='mydoc', cas=39773408329728, expiry=0, content={"hello":"spark"}, mutationToken=null}
...
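
The same calls also work for more than one document at a time. Here is a minimal sketch that builds only on the operations shown above; the document IDs and contents are made up for illustration:

// Store two documents in one go (IDs and contents are illustrative).
val docs = Seq(
  JsonDocument.create("doc-1", JsonObject.create().put("hello", "spark")),
  JsonDocument.create("doc-2", JsonObject.create().put("hello", "shell"))
)
sc.parallelize(docs).saveToCouchbase()

// Fetch them back as an RDD[JsonDocument]; collect() brings the results
// to the driver so they can be printed in the shell.
sc.parallelize(Seq("doc-1", "doc-2")).couchbaseGet[JsonDocument]().collect().foreach(println)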

If you need to pass custom properties, right now you have to stop the current SparkContext and create a new one with those properties. We are working on easier integration through properties that you can pass in on startup (and which are also respected by Spark out of the box):

scala> sc.stop()
...

scala> import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.{SparkContext, SparkConf}

scala> val sc = new SparkContext(new SparkConf().setAppName("shell").set("com.couchbase.bucket.travel-sample", ""))
...

scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala> val sqlContext = new SQLContext(sc)
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@567a2954
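
If Couchbase is not running on the same machine as the shell, the connector's com.couchbase.nodes property can be set on the same SparkConf. A sketch, with a placeholder address (stop the running context first, as shown above):

// Sketch only: "10.0.0.1" is a placeholder for your cluster address,
// and travel-sample is the bucket used in the examples below.
import org.apache.spark.{SparkContext, SparkConf}

val conf = new SparkConf()
  .setAppName("shell")
  .set("com.couchbase.nodes", "10.0.0.1")
  .set("com.couchbase.bucket.travel-sample", "")

val sc = new SparkContext(conf)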

You can also make use of the first-class N1QL integration. The following example creates a DataFrame for the airline documents in the travel-sample bucket.

scala> val airlines = sqlContext.read.couchbase(schemaFilter = org.apache.spark.sql.sources.EqualTo("type", "airline"))
15/10/20 15:02:51 INFO N1QLRelation: Inferring schema from bucket travel-sample with query 'SELECT META(`travel-sample`).id as `META_ID`, `travel-sample`.* FROM `travel-sample` WHERE  `type` = 'airline' LIMIT 1000'
...
15/10/20 15:02:52 INFO N1QLRelation: Inferred schema is StructType(StructField(META_ID,StringType,true), StructField(callsign,StringType,true), StructField(country,StringType,true), StructField(iata,StringType,true), StructField(icao,StringType,true), StructField(id,LongType,true), StructField(name,StringType,true), StructField(type,StringType,true))
airlines: org.apache.spark.sql.DataFrame = [META_ID: string, callsign: string, country: string, iata: string, icao: string, id: bigint, name: string, type: string]

Now you can print the schema and run ad-hoc data exploration:

scala> airlines.printSchema
root
 |-- META_ID: string (nullable = true)
 |-- callsign: string (nullable = true)
 |-- country: string (nullable = true)
 |-- iata: string (nullable = true)
 |-- icao: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- type: string (nullable = true)

scala> airlines.select("name", "callsign").sort(airlines("callsign").desc).show(5)
...
+----------------+--------+
|            name|callsign|
+----------------+--------+
|     Aws express|     aws|
|          Atifly|  atifly|
|        XAIR USA|    XAIR|
|   World Airways|   WORLD|
|Western Airlines| WESTERN|
+----------------+--------+
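
Since airlines is a regular Spark DataFrame, every Spark SQL operation is available for further exploration. As a sketch (the column names come from the inferred schema above, and the country value is only illustrative), you can aggregate with the DataFrame API or register a temporary table and query it with plain SQL:

// Number of airlines per country, sorted descending.
val byCountry = airlines.groupBy("country").count()
byCountry.sort(byCountry("count").desc).show(5)

// Or register the DataFrame as a temporary table and use SQL directly.
airlines.registerTempTable("airlines")
sqlContext.sql("SELECT name, iata FROM airlines WHERE country = 'United States' LIMIT 5").show()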