The How and Why of Spark and Couchbase

I could spend a lot of time gushing about Couchbase and the details of its architecture and implementation. I've grown to really love Couchbase as a NoSQL store, but my love for it isn't really a good reason to write a blog post. I think many people using Couchbase for analytical purposes can benefit from combining it with Spark. This post is a quick rundown of some of the features I often use when working with the two; it's more my notes than any wider statement.

Housekeeping

I won't go into too much detail about Couchbase, but it's a JSON document store that is easy to distribute and has some other great features. I would recommend reading the docs for more details.

Type Safe Serialization

A big annoyance with JSON is serialization. If you have data that looks like this:

[
  { "color": "red", "value": "#f00" },
  { "color": "green", "value": "#0f0" },
  { "color": "blue", "value": "#00f" },
  { "color": "cyan", "value": "#0ff" },
  { "color": "magenta", "value": "#f0f" },
  { "color": "yellow", "value": "#ff0" },
  { "color": "black", "value": "#000" }
]

Using it for analytical purposes requires some kind of query language or hand-written code to loop through the objects, and there aren't many guarantees about types when doing so. It's safer to at least know what you're dealing with, and that requires guaranteeing an implicit conversion. In Scala we can use some help from case classes and Spray JSON to accomplish this:

import spray.json._

val myJson: String = """{ "color": "red", "value": "#f00" }""" // One of the JSON objects defined above

case class Settings(color: String, value: String) // Case class for the JSON

object SettingsProtocol extends DefaultJsonProtocol {
  implicit val settingsFormat = jsonFormat2(Settings)
}

import SettingsProtocol._

myJson.parseJson.convertTo[Settings] // Settings("red", "#f00")

From here, converting JSON to a Spark dataset is fairly trivial:

import org.apache.spark.sql._
import spray.json._
import com.couchbase.client.java.query.N1qlQuery
import com.couchbase.spark.connection.CouchbaseConnection
import com.couchbase.spark._

val sparkSession: SparkSession = ??? // your configured SparkSession
import sparkSession.sqlContext.implicits._

case class Settings(color: String, value: String) // Case class for the JSON

object SettingsProtocol extends DefaultJsonProtocol {
  implicit val settingsFormat = jsonFormat2(Settings)
}

import SettingsProtocol._

// Set up a N1QL query
val query1 = "SELECT * FROM `settings`"

// Query Couchbase
val data = sparkSession.sparkContext.couchbaseQuery(N1qlQuery.simple(query1))
  .collect()
  .map(x => x.value.toString)
  .map(x => x.parseJson.convertTo[Settings])

sparkSession.sparkContext.parallelize(data).toDS().show() // Dataset[Settings]

CouchbaseConnection().stop() // Shut down the Couchbase connection

We can go safely from a JSON string to a dataset of rows and columns that have properly defined types with minimal effort, allowing for a natural pipeline from Couchbase to Spark.

You might say, “Well, you'll have to hand-write code to do these implicit conversions,” which is true. You can instead do a loose conversion to a list of JSON documents and convert the schema afterward, or use some of Spark's built-in facilities:

// borrowed from: https://developer.couchbase.com/documentation/server/4.5/connectors/spark-2.0/working-with-rdds.html
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.spark._

sc
  .couchbaseGet[JsonDocument](Seq("airline_10123", "airline_10748"))
  .collect()
  .foreach(println)
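The connector's Spark SQL integration can also infer a schema for you by sampling documents. A minimal sketch, assuming a bucket whose documents carry a `type` field (the field name and value here are illustrative):

```scala
import org.apache.spark.sql.sources.EqualTo
import com.couchbase.spark.sql._

// Ask the connector to sample documents matching the filter and infer a schema
// ("type" = "airline" is an assumption about how your documents are tagged)
val airlines = sparkSession.read.couchbase(schemaFilter = EqualTo("type", "airline"))

airlines.printSchema()
airlines.show()
```

This trades the compile-time guarantees of the case-class approach for convenience, which is often a fine deal for exploratory work.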

N1QL

N1QL is the query language behind Couchbase, allowing you to write SQL-like queries over JSON data. There have been other attempts at this, but none implemented as well as N1QL, in my opinion. If I have data that looks like this:

[
  {"id": "1", "display": "melbourne", "tag": "victoria", "payments": [5,4,3,2,1]},
  {"id": "2", "display": "geelong", "tag": "victoria", "payments": [1,2,3,4,5]},
  {"id": "3", "display": "swan hill", "tag": "victoria", "payments": [10,12,13,14,15]},
  {"id": "4", "display": "brisbane", "tag": "queensland", "payments": [8,7,6,5,4,3]},
  {"id": "5", "display": "gold coast", "tag": "queensland", "payments": [111,222,333,444,555]}
]

I could query it with N1QL in the following way:

"SELECT * from records where tag = 'victoria'"

That may not be that impressive, since there aren't any nested structures to get through. But you can do array searches in N1QL as well, making for some interesting query opportunities like the following:

"SELECT * from `records` where ANY p in payments SATISFIES p in [1,2,3,4,5] END"

I find N1QL intuitive, especially in the Spark SQL context where you're already writing SQL-like syntax. I used N1QL in the example in the previous section without explaining it; you can see how it's a bit more intuitive than the traditional `couchbaseGet` if you come from a SQL background.
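N1QL statements can also be parameterized through the Java SDK, which keeps values out of the statement string. A sketch, reusing the `records` data above (the bucket name and tag value are assumptions):

```scala
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.query.N1qlQuery
import com.couchbase.spark._

// Named placeholders ($tag) are bound from a JsonObject at query time
val query = N1qlQuery.parameterized(
  "SELECT * FROM `records` WHERE tag = $tag",
  JsonObject.create().put("tag", "victoria")
)

sparkSession.sparkContext.couchbaseQuery(query)
  .collect()
  .foreach(row => println(row.value))
```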

Streaming

You can stream data from Couchbase as well, instead of querying it. This only makes sense if you have analytical needs driven by updates to the database.

Setting up this code is straightforward:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.couchbase.spark.streaming._

// Set up a streaming context with a two-second batch interval
val ssc = new StreamingContext(conf, Seconds(2))

ssc
  .couchbaseStream(from = FromBeginning, to = ToInfinity)
  .someMethod() // your processing here

ssc.start()
ssc.awaitTermination()

You can use this as an analytics layer to watch for abnormalities in the data or to trigger other events or pipelines. You can also use Spark streaming to write data to Couchbase from a Stream as outlined in the docs.
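Writing in the other direction uses the same RDD helpers. A minimal sketch, assuming a bucket is already configured in the Spark config and borrowing the color data from earlier (the document IDs are made up for illustration):

```scala
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.spark._

// Build JsonDocuments (id + JSON content) and persist them to the configured bucket
sparkSession.sparkContext
  .parallelize(Seq(("settings::red", "red", "#f00"), ("settings::green", "green", "#0f0")))
  .map { case (id, color, value) =>
    JsonDocument.create(id, JsonObject.create().put("color", color).put("value", value))
  }
  .saveToCouchbase()
```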

You can use full-text search in Couchbase too, similar to Elasticsearch. Full-text search is much less precise than a SQL query, but it's appropriate for many use cases. You can use it without much effort:

import com.couchbase.client.java.search._
import com.couchbase.spark.connection.CouchbaseConnection
import com.couchbase.spark._

// `match` is a reserved word in Scala, so it needs backticks
val fullTextSearch = SearchQuery.`match`("term") // match documents containing a specific term

val result = sparkSession.sparkContext
  .couchbaseQuery(new SearchQuery("my_search_index", fullTextSearch)) // first argument is the FTS index name
  .collect()
  .someMethod() // your processing here

From here you can use pattern matching to ensure correct serialization, and so on. Beyond a simple search like this, Couchbase supports many more complex searches as well.
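A small sketch of what that pattern matching might look like, wrapping the Spray JSON conversion in a `Try` so malformed documents are skipped rather than throwing (`Settings` and its implicit format are the ones defined earlier; `parseAll` is a hypothetical helper name):

```scala
import scala.util.{Failure, Success, Try}
import spray.json._
import SettingsProtocol._ // the implicit settingsFormat defined earlier

// Keep only the raw JSON strings that deserialize cleanly into Settings
def parseAll(raw: Seq[String]): Seq[Settings] =
  raw.flatMap { json =>
    Try(json.parseJson.convertTo[Settings]) match {
      case Success(settings) => Some(settings)
      case Failure(_)        => None // skip malformed documents
    }
  }
```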

That's All

Couchbase isn't a good fit for every application, but it's being adopted pretty rapidly, and it's nice to know it works well with Spark. I've been using it for a few months now and have grown to like it quite a bit.

Notes

Examples are done with Spark 2.0.1 and the Couchbase Spark Connector 2.0.
