Spark Streaming Integration

The Couchbase Spark connector integrates with Spark Streaming through the Couchbase Server replication protocol, the Database Change Protocol (DCP). The connector receives mutations from the server as they happen and exposes them as a DStream, the primary abstraction in Spark Streaming. You can both create DStreams from Couchbase and persist DStreams to Couchbase.

DStream Creation

Note: Creating DStreams from DCP through Couchbase is highly experimental and not yet ready for production use. We appreciate any feedback while we stabilize this module.

The following example prints the mutations that have already happened as well as those that happen while it runs (print() shows the first ten elements of each batch). You can then process them with any of the common DStream methods.

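import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.couchbase.spark.streaming._

// Enable DCP, which the connector needs for streaming
System.setProperty("com.couchbase.dcpEnabled", "true")

val conf = new SparkConf().setMaster("local[*]").setAppName("StreamingSample")
// Process the stream in batches of 5 seconds
val ssc = new StreamingContext(conf, Seconds(5))

// Print the first ten elements of each batch
ssc
  .couchbaseStream()
  .print()

ssc.start()
ssc.awaitTermination()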

Don't forget the implicit import, which adds couchbaseStream() to the StreamingContext:

import com.couchbase.spark.streaming._

Note: Currently, the connector issues a backfill, which means that all data already stored in the bucket is streamed first, so you might want to try it on an empty bucket.
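
As a rough sketch of what using the common DStream methods can look like, the following counts the streamed messages per batch and over a sliding window. The object name MutationCounts and the helpers perBatch and perWindow are hypothetical names chosen for this illustration and are not part of the connector; count() and window() are standard DStream operations, and the window sizes are arbitrary.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import com.couchbase.spark.streaming._

object MutationCounts {

  // Hypothetical helper: number of messages in each batch.
  def perBatch[T](stream: DStream[T]): DStream[Long] =
    stream.count()

  // Hypothetical helper: number of messages seen in the last 30 seconds,
  // re-evaluated every 10 seconds. Both durations must be multiples of
  // the batch interval of the StreamingContext.
  def perWindow[T](stream: DStream[T]): DStream[Long] =
    stream.window(Seconds(30), Seconds(10)).count()

  def main(args: Array[String]): Unit = {
    // Enable DCP, as in the example above
    System.setProperty("com.couchbase.dcpEnabled", "true")

    val conf = new SparkConf().setMaster("local[*]").setAppName("MutationCounts")
    val ssc = new StreamingContext(conf, Seconds(5))

    // The same input stream can feed several computations
    val messages = ssc.couchbaseStream()
    perBatch(messages).print()
    perWindow(messages).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Because of the backfill, the first batches on a non-empty bucket can be expected to report much higher counts than later ones.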

DStream Persistence

Because a DStream is a sequence of RDDs, the same persistence mechanisms apply to each RDD in the stream (see Working with RDDs for more information). The following example streams tweets from Twitter and persists them in a Couchbase bucket:

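import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.spark._

// Configure Spark and open the "twitter" bucket (empty password)
val cfg = new SparkConf()
  .setAppName("TwitterFeed")
  .setMaster("local[*]")
  .set("com.couchbase.bucket.twitter", "")

// The Twitter OAuth credentials are the first four program arguments
val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

val ssc = new StreamingContext(cfg, Seconds(2))
// No filters are set on the Twitter stream
val stream = TwitterUtils.createStream(ssc, None, Seq())

// Turn each tweet into a JSON document keyed by the tweet ID and store it
stream.foreachRDD(rdd => {
  rdd
    .map(status => JsonDocument.create(status.getId.toString, JsonObject.create().put("text", status.getText)))
    .saveToCouchbase()
})

ssc.start()
ssc.awaitTermination()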
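
The same pattern is not specific to Twitter. As a minimal sketch, the conversion to a document can be separated from the write so that any DStream of (id, text) pairs can be persisted. The object name DocumentSink and the methods toDocument and persist are hypothetical names for this illustration; the sketch assumes that the bucket to write to is configured on the SparkConf as in the example above.

```scala
import org.apache.spark.streaming.dstream.DStream
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.spark._

object DocumentSink {

  // Hypothetical helper: builds a JSON document with a single "text" field.
  def toDocument(id: String, text: String): JsonDocument =
    JsonDocument.create(id, JsonObject.create().put("text", text))

  // Hypothetical helper: converts every (id, text) pair of every batch into
  // a document and saves it with the RDD persistence method used above.
  def persist(stream: DStream[(String, String)]): Unit =
    stream.foreachRDD { rdd =>
      rdd
        .map { case (id, text) => toDocument(id, text) }
        .saveToCouchbase()
    }
}
```

With the Twitter stream from the example above, this could be called as DocumentSink.persist(stream.map(status => (status.getId.toString, status.getText))) before ssc.start().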