Hadoop MapReduce is awesome, but it seems a little crazy that you have to write so much code just to count words. Wouldn't it be nicer if you could simply write what you want to do:
val lines = fromTextFile("hdfs://in/...")
val counts = lines.flatMap(_.split(" "))
.map(word => (word, 1))
.groupByKey
.combine(_+_)
persist(toTextFile(counts, "hdfs://out/..."))

This is what Scoobi is all about. Scoobi is a Scala library that focuses on making you more productive at building Hadoop applications. It stands on the functional programming shoulders of Scala and allows you to just write what you want rather than how to do it.
Scoobi is a library that leverages the Scala programming language to provide a programmer friendly abstraction around Hadoop's MapReduce to facilitate rapid development of analytics and machine-learning algorithms.
Scoobi has the following requirements:
- Cloudera's Hadoop 0.20.2
- Scala 2.9.1: Note that this is typically set in build.sbt
- Sbt 0.11.0
Scala and Hadoop are obvious prerequisites. In addition, the Scoobi library and Scoobi applications use sbt (version 0.11 or later) for dependency management and building.
NOTE: You will probably have to edit the sbt launcher script (located in ~/bin/sbt
or wherever sbt has been installed) to increase the maximum heap size, or you will
get out-of-memory errors. Try changing the existing -Xmx option to -Xmx2048M
(or adding this option if it's not already present). If this still leads to errors,
-Xmx4096M should be enough.
To build Scoobi:
$ cd scoobi
$ sbt publish-local
Then build and package one of the examples:
$ cd examples/wordCount
$ sbt package-hadoop
Finally, run on Hadoop:
$ hadoop jar ./target/Scoobi_Word_Count-hadoop-0.1.jar <input> <output>
Scoobi is centered around the idea of a distributed collection, which is implemented by the
DList (distributed list)
class. In a lot of ways, DList objects are similar to normal
Scala List objects:
they are parameterized by a type and they provide methods that can be used to produce
new DList objects, often parameterized by higher-order functions. For example:
// Converting a List[Int] to a List[String] keeping only evens
val stringList = intList filter { _ % 2 == 0 } map { _.toString }
// Converting a DList[Int] to a DList[String] keeping only evens
val stringDList = intDList filter { _ % 2 == 0 } map { _.toString }

However, unlike a Scala List object, the contents of DList objects are not stored on
the JVM heap but stored in HDFS. Secondly, calling DList methods will not immediately result in
data being generated in HDFS. This is because, behind the scenes, Scoobi implements a
staging compiler. The purpose of DList methods is to construct a graph of data
transformations. Then, the act of persisting a DList triggers the compilation of the graph
into one or more MapReduce jobs and their execution.
So, DList objects essentially provide two abstractions:
- The contents of a DList object abstract the storage of data in files on HDFS;
- Calling methods on DList objects to transform and manipulate them abstracts the mapper, combiner, reducer and sort-and-shuffle phases of MapReduce.
So, what are some of the advantages of using Scoobi?
- The collections abstraction implemented by DList is a familiar one: The methods of the DList class have been designed to be the same as, or as similar as possible to, those implemented in the standard Scala collections. There aren't as many methods, but if you grok the semantics of Scala collections, you shouldn't have too much trouble getting up to speed with Scoobi;
- The DList class is strongly typed: Like the Scala collections, the DList interface is strongly typed so that more errors are caught at compile time. This is a major improvement over standard Hadoop MapReduce, where type-based run-time errors often occur;
- The DList class can be easily parameterized on rich data types: Unlike Hadoop MapReduce, which requires that you go off implementing a myriad of classes that implement the Writable interface, Scoobi allows DList objects to be parameterized by normal Scala types. This includes the primitive types (e.g. Int, String, Double), tuple types (with arbitrary nesting, e.g. (String, (Int, Char), Double)) as well as case classes. This is all implemented without sacrificing performance in serialization and deserialization;
- Scoobi applications are optimized across library boundaries: Over time it makes sense to partition Scoobi code into separate logical entities - into separate classes and libraries. The advantage of Scoobi is that its staging compiler works across library boundaries. Therefore you'll get the same Hadoop performance as if you had everything in the one file, but with the productivity gains of having modular software;
- It's Scala: Of course, with Scala you don't lose access to those precious Java libraries, but you also get functional programming and concise syntax, which makes writing Hadoop applications with Scoobi very productive ... and fun!
Let's take a step-by-step look at the simple word count example from above. The complete application for word count looks like this:
import com.nicta.scoobi.Scoobi._
object WordCount extends ScoobiApp {
val lines: DList[String] = fromTextFile(args(0))
val counts: DList[(String, Int)] = lines.flatMap(_.split(" "))
.map(word => (word, 1))
.groupByKey
.combine(_+_)
persist(toTextFile(counts, args(1)))
}

Our word count example is implemented by the object WordCount, which extends ScoobiApp. This
is a convenience in Scoobi to avoid having to write a main function, as well as automatically
handling arguments intended for Hadoop. The remaining arguments are available as args.
Within the implementation guts, the first task is to construct a DList representing the data
located at the input directory. In this situation, because the input data are simple text files,
we can use the fromTextFile method that takes our input directory as an argument and returns
a DList[String] object. Here our DList object is a distributed collection where each
collection element is a line from the input data and is assigned to lines.
The second task is to compute a DList of word counts given all the lines of text from our
input data. This is implemented in four steps:
- A flatMap is performed on lines. Like List's flatMap, a parameterizing function is supplied which will take as its input a given line (a String) and will return 0 or more Strings as its result. In this case, that function is the method split, which will split the input string (a line) into a collection of words based on the occurrence of whitespace. The result of the flatMap then is another DList[String] representing a distributed collection of words.
- A map is performed on the distributed collection of words. Like List's map, a parameterizing function is supplied which takes as its input a given word (a String) and will return another value. In this case the supplied function takes the input word and returns a pair: the word and the value 1. The resulting object is a new distributed collection of type DList[(String, Int)].
- A groupByKey is performed on the (String, Int) distributed collection. groupByKey has no direct counterpart in List (although there is a groupBy defined on DLists). groupByKey must be called on a key-value DList object, else the program will not type check. The effect of groupByKey is to collect all distributed collection values with the same key. In this case the DList object is of type (String, Int), so a new DList object will be returned of type (String, Iterable[Int]). That is, the counts for the same words will be grouped together.
- To get the total count for each word, a combine is performed. combine also has no counterpart in List, but its semantics are to take a DList[(K, Iterable[V])] and return a DList[(K, V)] by reducing all the values. It is parameterized by a function of type (V, V) => V that must be associative. In our case we are simply performing addition to sum all the counts.
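The same four transformations can be sketched locally on an ordinary Scala List, which is a handy way to check the logic before running it on a cluster. In this sketch (illustrative data, plain collections only), groupBy plus a per-key reduction stands in for groupByKey and combine:

```scala
// Local analogue of the word-count pipeline using plain Scala collections
val lines = List("a rose is a rose", "is a rose")

val counts: Map[String, Int] = lines
  .flatMap(_.split(" "))                                       // split each line into words
  .map(word => (word, 1))                                      // pair each word with a count of 1
  .groupBy(_._1)                                               // stands in for groupByKey
  .map { case (w, ones) => (w, ones.map(_._2).reduce(_ + _)) } // stands in for combine(_+_)

// counts == Map("a" -> 3, "rose" -> 3, "is" -> 2)
```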
The final task is to take the counts object, which represents counts for each word, and persist it.
In this case we will simply persist it as a text file, whose path is specified by the second command line
argument, using toTextFile. Note that toTextFile is used within persist. Although not demonstrated
in this example, persist takes a variable number of arguments, each of which specifies what DList is
being persisted and how.
Until persist is called, our application will only be running on the local client. The act of calling
persist, along with the DList(s) to be persisted, will trigger Scoobi's staging compiler to take the
sequence of DList transformations and turn them into one or more Hadoop MapReduce jobs. In this
example Scoobi will generate a single MapReduce job that would be executed:
- The functionality associated with the flatMap and map will become part of a mapper task;
- The transformation associated with groupByKey will occur as a consequence of the sort-and-shuffle phase;
- The functionality of the combine will become part of both a combiner and a reducer task.
The word count example is one of a number of examples included with Scoobi. The top level directory examples contains a number of self-contained tutorial-like examples, as well as a guide to building and deploying them. This is an additional starting point for learning and using Scoobi.
DList objects are merely nodes in a graph describing a series of data computations we want to
perform. However, at some point we need to specify what the inputs and outputs to that computation
are. We have already seen this in the previous example with fromTextFile(...) and
persist(toTextFile(...)). The former is an example of loading data and the latter is an example
of persisting data.
Most of the time when we create DList objects, it is the result of calling a method on another
DList object (e.g. map). Loading, on the other hand, is the only way to create a DList
object that is not based on any others. It is the means by which we associate a DList object with
some data files on HDFS. Scoobi provides functions to create DList objects associated with
text files on HDFS, which are implemented in the object
com.nicta.scoobi.io.text.TextInput.
The simplest, which we have seen already, is fromTextFile. It takes a path (globs are supported) to
text files on HDFS (or whichever file system Hadoop has been configured for) and returns a
DList[String] object, where each element of the distributed collection refers to one of the lines of
text from the files.
Often we are interested in loading delimited text files, for example, comma separated value (CSV) files.
In this case, we can use fromTextFile followed by a map to pull out fields of interest:
// load CSV with schema "id,first_name,second_name,age"
val lines: DList[String] = fromTextFile("hdfs://path/to/CSV/files/*")
// pull out id and second_name
val names: DList[(Int, String)] = lines map { line =>
val fields = line.split(",")
(fields(0).toInt, fields(2))
}

This works fine, but because it's such a common task, TextInput also provides the function
fromDelimitedTextFile specifically for these types of field extractions:
// load CSV and pull out id and second_name
val names: DList[(Int, String)] = fromDelimitedTextFile("hdfs://path/to/CSV/files/*", ",") {
case id :: first_name :: second_name :: age :: _ => (id.toInt, second_name)
}

When using fromDelimitedTextFile, the first argument specifies the path and the second the delimiter. There is also
a second parameter list, which is used to specify what to do with fields once they are separated out.
This is specified by supplying a partial function that takes a list of separated String fields as its input
and returns a value whose type will set the type of the resulting DList - i.e. a PartialFunction[List[String], A]
will create a DList[A] (where A is (Int, String) above). In this example, we use Scala's
pattern matching feature to pull out the four fields
and return the first and third.
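The same extraction pattern can be tried locally on ordinary Scala lists. In this sketch (made-up sample data), collect plays the role that fromDelimitedTextFile plays on a DList: it applies the partial function and keeps only the lines that match:

```scala
// Illustrative local sketch of partial-function field extraction
val rawLines = List("1,Harry,Potter,17", "2,Lucy,Pevensie,15", "malformed")

val extract: PartialFunction[List[String], (Int, String)] = {
  case id :: first_name :: second_name :: age :: _ => (id.toInt, second_name)
}

// collect applies the partial function, silently dropping lines that don't match
val names = rawLines.map(_.split(",").toList).collect(extract)
// names == List((1, "Potter"), (2, "Pevensie"))
```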
One of the advantages of this approach is that we have at our disposal all of the Scala
pattern matching features, and because we are providing a partial function, any fields
that don't match against the supplied pattern will not be present in the returned DList.
This allows us to implement simple filtering inline with the extraction:
// load CSV and pull out id and second_name if first_name is "Harry"
val names: DList[(Int, String)] = fromDelimitedTextFile("hdfs://path/to/CSV/files/*", ",") {
case id :: "Harry" :: second_name :: age :: _ => (id.toInt, second_name)
}

We can of course supply multiple patterns:
// load CSV and pull out id and second_name if first_name is "Harry" or "Lucy"
val names: DList[(Int, String)] = fromDelimitedTextFile("hdfs://path/to/CSV/files/*", ",") {
case id :: "Harry" :: second_name :: age :: _ => (id.toInt, second_name)
case id :: "Lucy" :: second_name :: age :: _ => (id.toInt, second_name)
}

And a more interesting example is when the value of one field influences the semantics of another. For example:
val thisYear: Int = ...
// load CSV with schema "event,year,year_designation" and pull out event and how many years ago it occurred
val yearsAgo: DList[(String, Int)] = fromDelimitedTextFile("hdfs://path/to/CSV/files/*", ",") {
case event :: year :: "BC" :: _ => (event, thisYear + year.toInt - 1) // No 0 AD
case event :: year :: "AD" :: _ => (event, thisYear - year.toInt)
}

These are nice features. However, one of the problems with these examples is their conversion of
a String field into an Int. If the field is not supplied (e.g. empty string) or the files
are simply erroneous, a run-time exception will occur when toInt is called. This exception will be
caught by Hadoop and likely cause the MapReduce job to fail. As a solution to this problem, TextInput
provides Scala extractors for Ints, Longs and Doubles.
Using the Int extractor we can rewrite one of the above examples:
// load CSV and pull out id and second_name
val names: DList[(Int, String)] = fromDelimitedTextFile("hdfs://path/to/CSV/files/*", ",") {
case Int(id) :: first_name :: second_name :: Int(age) :: _ => (id, second_name)
}

Here, the pattern will only match if the id (and age) field(s) can be converted successfully from a
String to an Int. If not, the pattern will not match and that line will not be extracted into the
resulting DList.
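The extractor used here can be sketched in plain Scala. The following is an illustrative reimplementation, not Scoobi's actual TextInput code: an object with an unapply method that succeeds only when the string parses as an integer:

```scala
// Illustrative extractor: the pattern Int(x) matches only when the
// string parses as an integer (scala.Int is written explicitly to
// avoid confusion with the extractor object's name)
object Int {
  def unapply(s: String): Option[scala.Int] =
    try Some(s.toInt) catch { case _: NumberFormatException => None }
}

// Hypothetical helper showing the extractor in a list pattern
def idOf(fields: List[String]): Option[scala.Int] = fields match {
  case Int(id) :: _ => Some(id)
  case _            => None
}

val good = idOf(List("42", "Harry", "Potter"))   // Some(42)
val bad  = idOf(List("oops", "Harry", "Potter")) // None
```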
Persisting is the mechanism Scoobi uses for specifying that the result of executing the computational graph
associated with a DList object is to be associated with a particular data file on HDFS. There are
two parts to persisting:
- Calling persist, which bundles all DList objects being persisted;
- Specifying how each DList object is to be persisted.
Scoobi currently only provides one mechanism for specifying how a DList is to be persisted. It is
toTextFile and is implemented in the object
com.nicta.scoobi.io.text.TextOutput.
As we have seen previously, toTextFile takes two arguments: the DList object being persisted and
the directory path to write the resulting data:
val rankings: DList[(String, Int)] = ...
persist(toTextFile(rankings, "hdfs://path/to/output"))

persist can of course bundle together more than one DList. For example:
val rankings: DList[(String, Int)] = ...
val rankings_reverse: DList[(Int, String)] = rankings map { swap }
val rankings_example: DList[(Int, String)] = rankings_reverse.groupByKey.map{ case (ranking, items) => (ranking, items.head) }
persist(toTextFile(rankings, "hdfs://path/to/output"),
toTextFile(rankings_reverse, "hdfs://path/to/output-reverse"),
toTextFile(rankings_example, "hdfs://path/to/output-example"))

As mentioned previously, persist is the trigger for executing the computational graph associated
with its DList objects. By bundling DList objects together, persist is able to determine computations
that are shared by those outputs and ensure that they are only performed once.
We've seen in many of the examples that it's possible for DList objects to be parameterized
by normal Scala primitive (value) types. Not surprisingly, Scoobi supports DList objects
that are parameterized by any of the Scala primitive types:
val x: DList[Byte] = ...
val x: DList[Char] = ...
val x: DList[Int] = ...
val x: DList[Long] = ...
val x: DList[Double] = ...

And as we've also seen, although not a primitive, Scoobi supports DLists of Strings:

val x: DList[String] = ...

Some of the examples also use DList objects that are parameterized by a pair (Scala
Tuple2 type).
In fact, Scoobi supports DList objects that are parameterized by Scala tuples up to arity 8, and in addition,
supports arbitrary nesting:
val x: DList[(Int, String)] = ...
val x: DList[(String, Long)] = ...
val x: DList[(Int, String, Long)] = ...
val x: DList[(Int, (String, String), Int, (Long, Long, Long))] = ...
val x: DList[(Int, (String, (Long, Long)), Char)] = ...

Finally, Scoobi also supports DList objects that are parameterized by the Scala
Option
and Either
types, which can also be combined with any of the Tuple and primitive types:
val x: DList[Option[Int]] = ...
val x: DList[Option[String]] = ...
val x: DList[Option[(Long, String)]] = ...
val x: DList[Either[Int, String]] = ...
val x: DList[Either[String, (Long, Long)]] = ...
val x: DList[Either[Long, Either[String, Int]]] = ...
val x: DList[Either[Int, Option[Long]]] = ...

Notice that in all these cases, the DList object is parameterized by a standard Scala type
and not some wrapper type. This is really convenient. It means, for example, that the use of
a higher-order function like map can directly call any of the methods associated with those types.
In contrast, programming MapReduce jobs directly using Hadoop's API requires that all types implement the
Writable
interface, resulting in the use of wrapper types such as IntWritable rather than just int.
Of course the reason for this is that Writable specifies methods for serialization and
deserialization of data within the Hadoop framework. However, given that DList objects eventually
result in code that is executed by the Hadoop framework, how is serialization and deserialization
specified?
Scoobi requires that the type parameterizing a DList object has an implementation of the
WireFormat type class
(Scala context bound).
Thus, the DList class is actually specified as:
class DList[A : WireFormat] { ... }

If the compiler cannot find a WireFormat implementation for the type parameterizing a specific
DList object, that code will not compile. Implementations of WireFormat specify serialization
and deserialization in their toWire and fromWire methods, which end up finding their way into
Writable's write and readFields methods.
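The shape of such a type class can be sketched with plain java.io streams. This is a simplified stand-in for Scoobi's actual WireFormat (which also has a show method, as the Tick example below illustrates), round-tripped through a byte buffer the way Hadoop would through a Writable:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream,
                DataInput, DataInputStream, DataOutput, DataOutputStream}

// Simplified stand-in for Scoobi's WireFormat type class
trait WireFormat[A] {
  def toWire(x: A, out: DataOutput): Unit
  def fromWire(in: DataInput): A
}

// An instance for (Int, String) pairs
implicit val intStringFmt: WireFormat[(Int, String)] = new WireFormat[(Int, String)] {
  def toWire(x: (Int, String), out: DataOutput) = { out.writeInt(x._1); out.writeUTF(x._2) }
  def fromWire(in: DataInput) = (in.readInt(), in.readUTF())
}

// Serialize then deserialize, as the Hadoop framework would between tasks
def roundTrip[A](x: A)(implicit fmt: WireFormat[A]): A = {
  val bytes = new ByteArrayOutputStream()
  fmt.toWire(x, new DataOutputStream(bytes))
  fmt.fromWire(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray)))
}

val copied = roundTrip((7, "seven"))
// copied == (7, "seven")
```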
To make life easy, the
WireFormat object
includes WireFormat implementations for the types listed above (that is why they work out of the
box). However, the real advantage of using
type classes is they allow you to extend the set of types that can be used with DList objects and
that set can include types that already exist, maybe even in some other compilation unit. So long as
a type has a WireFormat implementation, it can parameterize a DList. This is extremely
useful because while, say, you can represent a lot with nested tuples, much can be gained in terms
of type safety, readability and maintenance by using custom types. For example,
say we were building an application to analyze stock ticker-data. In that situation it would be nice
to work with DList[Tick] objects. We can do that if we write a WireFormat implementation for Tick:
case class Tick(val date: Int, val symbol: String, val price: Double)
implicit def TickFmt = new WireFormat[Tick] {
def toWire(tick: Tick, out: DataOutput) = {
out.writeInt(tick.date)
out.writeUTF(tick.symbol)
out.writeDouble(tick.price)
}
def fromWire(in: DataInput): Tick = {
val date = in.readInt
val symbol = in.readUTF
val price = in.readDouble
Tick(date, symbol, price)
}
def show(tick: Tick): String = tick.toString
}
val ticks: DList[Tick] = ... /* OK */

Then we can actually make use of the Tick type:
/* Function to compute Hi and Low for a stock for a given day */
def hilo(ts: Iterable[Tick]): (Double, Double) = {
val start = ts.head.price
ts.tail.foldLeft((start, start)) { case ((high, low), tick) => (max(high, tick.price), min(low, tick.price)) }
}
/* Group tick data by date and symbol */
val ticks: DList[Tick] = ...
val ticksGrouped = ticks.groupBy(t => (t.symbol, t.date))
/* Compute highs and lows for each stock for each day */
val highLow = ticksGrouped map { case ((symbol, date), ticks) => (symbol, date, hilo(ticks)) }

Notice that by using the custom type Tick it's obvious what fields we are using. If instead
the type of ticks was DList[(Int, String, Double)], the code would be far less readable,
and maintenance would be more difficult if, for example, we added new fields to Tick or modified
the order of existing fields.
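A nice side effect of this style is that hilo is ordinary Scala, so it can be exercised locally with plain Tick values (the dates, symbol and prices below are made up) before being wired into a DList pipeline:

```scala
import scala.math.{max, min}

case class Tick(date: Int, symbol: String, price: Double)

/* Function to compute Hi and Low for a stock for a given day, as above */
def hilo(ts: Iterable[Tick]): (Double, Double) = {
  val start = ts.head.price
  ts.tail.foldLeft((start, start)) { case ((high, low), tick) =>
    (max(high, tick.price), min(low, tick.price))
  }
}

val day = List(
  Tick(20111021, "BHP", 37.8),
  Tick(20111021, "BHP", 39.2),
  Tick(20111021, "BHP", 36.5))

val hl = hilo(day)
// hl == (39.2, 36.5)
```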
Being able to have DList objects of custom types is a huge productivity boost. However, there
is still the boiler-plate, mechanical work associated with the WireFormat implementation. To overcome this,
the WireFormat object also provides a utility function called mkCaseWireFormat that automatically
constructs a WireFormat for case classes:
case class Tick(val date: Int, val symbol: String, val price: Double)
implicit val tickFmt = mkCaseWireFormat(Tick, Tick.unapply _)
val ticks: DList[Tick] = ... /* Still OK */

mkCaseWireFormat takes as arguments the case class's automatically generated apply and unapply
methods. The only requirement on case classes when using mkCaseWireFormat is that all its fields have
WireFormat implementations. If not, your DList objects won't type check. The upside to
this is that all of the types above that do have WireFormat implementations can be
fields in a case class when used in conjunction with mkCaseWireFormat:
case class Tick(val date: Int, val symbol: String, val price: Double, val high_low: (Double, Double))
implicit val tickFmt = mkCaseWireFormat(Tick, Tick.unapply _)
val ticks: DList[Tick] = ... /* Amazingly, still OK */

Of course, this will also extend to other case classes, as long as they have WireFormat implementations.
Thus, it's possible to have nested case classes that can parameterize DList objects:
case class PriceAttr(val price: Double, val high_low: (Double, Double))
implicit val priceAttrFmt = mkCaseWireFormat(PriceAttr, PriceAttr.unapply _)
case class Tick(val date: Int, val symbol: String, val attr: PriceAttr)
implicit val tickFmt = mkCaseWireFormat(Tick, Tick.unapply _)
val ticks: DList[Tick] = ... /* That's right, amazingly, still OK */

In summary, the way data types work in Scoobi is definitely one of its killer features, basically because they don't get in the way!
Scoobi projects are generally developed with sbt, and to simplify the task of building and packaging a project for running on Hadoop, it's really handy to use the sbt plugin sbt-scoobi. Here are a few steps for creating a new project:
Create a new Scoobi application and add some code:
$ mkdir my-app
$ cd my-app
$ mkdir -p src/main/scala
$ vi src/main/scala/MyApp.scala
To use the sbt-scoobi plugin we need to include a project/project/scoobi.scala file with the following contents:
import sbt._
object Plugins extends Build {
lazy val root = Project("root", file(".")) dependsOn(
uri("git://github.com/NICTA/sbt-scoobi.git#master")
)
}

And we can add a pretty standard build.sbt that has a dependency on Scoobi:
name := "MyApp"
version := "0.1"
scalaVersion := "2.9.1"
libraryDependencies += "com.nicta" %% "scoobi" % "0.4.0-SNAPSHOT" % "provided"
scalacOptions += "-deprecation"

The provided is added to the scoobi dependency to let sbt know that Scoobi
is provided by the sbt plugin when it packages everything into a jar. If you
don't include this provided, nothing bad will happen, but the jar will contain
some Scoobi dependencies that are not strictly required.
We can now use sbt to easily build and package our application into a self-contained executable jar to feed directly into Hadoop:
$ sbt package-hadoop
$ hadoop jar ./target/MyApp-app-hadoop-0.1.jar <args>
Note that there appears to be an OS X-specific issue
associated with calling hadoop in this manner requiring the jar to be added to HADOOP_CLASSPATH
and then hadoop being given the correct object to run. e.g.:
$ export HADOOP_CLASSPATH=$PWD/target/Scoobi_Word_Count-hadoop-0.1.jar
$ hadoop WordCount inputFile/to/wordcount nonexistent/outputdir
Scoobi is released under the Apache license v2. We welcome contributions of bug fixes and/or new features via GitHub pull requests. In addition, it is important to us to build a friendly user and developer community around Scoobi, so:
- If you happen to encounter something that looks suspiciously like a bug, be sure to log it on the GitHub issue tracker so that it can be fixed for everyone - the more information the better;
- If, on the other hand, you simply have questions about how to use Scoobi, take a look at the posts on the scoobi-users mailing list or post a question of your own;
- And, if you're keen to get your hands dirty and contribute new features to Scoobi, or are hoping to get some insight into Scoobi's internal architecture, or simply want to know what's going on in developer-land, head over to the scoobi-dev mailing list.
We will try our best to respond to all issues and questions quickly.