0.7.0 RC1
A new Release Candidate for Scoobi: scoobi-0.7.0-RC1-cdh4.jar (or cdh3 if that's your thing) has just been released.
This version contains important changes from 0.6.1:
- a refactoring of the implementation
- a new API for persisting DLists so that it is possible to do incremental computations on the same list
- a "checkpoint" API
- an API to load and store DObjects
- a Counters API
- a first-class Reduction data type for composing parallel operations
- some new DList methods (isEqual, shuffle, zipWithIndex)
- the mapFlatten operation
- a REPL
- Scala 2.10
Let's see all these points in more detail.
A huge refactoring has been undertaken to make Scoobi's internals simpler and easier to reason about by harnessing the power of the Kiama library. If you are interested in the gory details, you can reserve a seat for a presentation during the next ScalaDays in NYC.
One consequence of this refactoring is the evolution of the "persist" API, described below.
The requirements for persisting DLists and DObjects are a bit tricky. Typically, when you build 2 DLists sharing the same computations:
val list0 = DList(1, 2, 3).map(_ + 1)
val list1 = list0.map(_.toString)
val list2 = list0.map(_ * 2)
you don't want to execute those shared computations twice (the map operation for list0). This is why the old persist API had a persist method accepting several arguments, so that we could process both lists at the same time and determine which computations are shared:
persist(toTextFile(list1, "path"), toTextFile(list2, "path")) // returns Unit
In terms of API, this complicates the persistence of DObjects. Indeed, DObjects can be constructed from DLists, but when we persist them, we generally want to bring the resulting value back into memory:
val list0 = DList(1, 2, 3).map(_ + 1)
val list1 = list0.map(_.toString)
val list2 = list0.map(_ * 2)
val o1 = list1.reduce(Reduction.string)
val o2 = list2.reduce(Sum.int)
// can we call persist(toTextFile(list1, "path1"), toTextFile(list2, "path2"), o1, o2)?
// what do we get back?
What should such a persist call return? The problem is that we want:
- to persist DLists as side-effects (write them to files) and return Unit
- to persist DObjects and read their values back into memory (return an A)
- all of this while executing computations shared between DLists and DObjects only once
The first step in solving this problem is to change the way Sinks are specified for DLists.
In the previous API, you would write toTextFile(list, "path") to specify that the list should be persisted to a Text file with a given path, and this would return a Persister object which you could persist. Now you can simply write list.toTextFile("path"), which merely returns another DList.
This is not only a syntactic change but also an improvement, because you can now chain those calls to specify that you want to persist a DList to several sinks:
val list = DList(1, 2, 3).toTextFile("path").toAvroFile("other path")
This also means that we've separated the place where sinks are specified from the place where a list is persisted. One consequence is that you can persist some intermediary results while making just one call to persist for the list representing all the computations to be done:
val list1 = DList(1, 2, 3).toAvroFile("path1")
val list2 = list1.map(_ + 1)
persist(list2.toTextFile("path2"))
(see below "checkpoints" for a variation on this feature)
Now that we know about Sinks, let's see how to persist lists and objects. With the new API, you have 2 methods, persist and run:
- persist takes zero or more DLists or DObjects and returns Unit. This method makes sure that no shared computation is done twice and stores everything to disk (even DObject values)
- run does the same as persist but brings the values back into memory, including DList values as Iterables
Here are a few examples, in growing order of complexity.
- persist a list to a text file
val list = DList(1, 2, 3).map(_ * 10)
list.toTextFile("path").persist
- persist a DList to a text file and get its value (aka "run")
val list = DList(1, 2, 3).map(_ * 10)
list.toTextFile("path").run // Vector(10, 20, 30)
- create 2 DLists sharing some computations but only run one of them
val l0 = DList(1, 2, 3).map(_ * 10)
val l1 = l0.map(_ + 1)
val l2 = l0.map(i => { sys.error("l2 must not be computed!"); i + 2 }) // defined but never run
l1.run // Vector(11, 21, 31)
- persist 2 DLists sharing some computation and bring the results into memory
val l0 = DList(1, 2, 3).map(_ * 10)
val l1 = l0.map(_ + 1)
val l2 = l0.map(_ - 1)
persist(l1, l2) // l0 is only computed once
// nothing is recomputed, the results are just read from files
(l1.run, l2.run) // (Vector(11, 21, 31), Vector(9, 19, 29))
- iterate computations until a result is correct
val list: DList[Int] = DList(1, 2, 3)
def incrementUntilMaxIs(target: Int, list: DList[Int]): DList[Int] = {
  persist(list)
  val listMax = list.max.run
  if (listMax >= target) list
  else incrementUntilMaxIs(target, list.map(_ + 1))
}
incrementUntilMaxIs(5, list).run // Vector(3, 4, 5)
- persist 2 objects and a list
val list = DList(1, 2, 3)
val plusOne = list.map(_ + 1)
val sum = list.sum
val max = list.max
persist(sum, max, plusOne)
(sum.run, max.run, plusOne.run) // === (6, 3, Vector(2, 3, 4))
- run 2 objects and a list
val list = DList(1, 2, 3)
val plusOne = list.map(_ + 1)
// run returns tuples for arities <= 10
val (sum, max, plus1) = run(list.sum, list.max, plusOne) // === (6, 3, Vector(2, 3, 4))
One great addition to this new persist API is the possibility to declare some sinks as "checkpoints":
val list = DList(1, 2, 3)
// This is an expensive computation. I want to keep the intermediate results.
val l1 = list.map(_ * 100).toAvroFile("path", checkpoint = true)
val l2 = l1.map(_ + 1)
When a sink has been marked as a checkpoint, on a subsequent execution of the program we will check if there is already a file containing data for the sink path. If this is the case, we will reuse this data instead of doing the computations all over again.
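To make the behaviour concrete, here is a sketch of what the example above does across two executions (the comments describe the expected behaviour):
// first execution: list.map(_ * 100) is computed and its data written to "path"
persist(l2.toTextFile("path2"))
// second execution of the same program: "path" already contains data, so l1
// is read back from the Avro file and only the map(_ + 1) step is computed
persist(l2.toTextFile("path2"))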
Important note: this functionality is only available on DataSinks which can also be used as DataSources. This is the case for Sequence or Avro files, but not for Text files: once data of type A has been written as a string to a file, there is no way to read it back as an A.
Another, less important but pretty natural, extension of the persist API is the ability to save and load DObjects. This is particularly interesting when the object is a list or map of some sort which sums up the characteristics of a big data set.
- persisting a DObject[Iterable[A]]
val object1 = DList(1, 2).materialise.toTextFile("path")
persist(object1) // the file contains 2 lines: 1, 2
- persisting a DObject[A]
val object1 = DList(1, 2).sum.toTextFile("path")
persist(object1) // the file contains one line: 3
- loading a DObject[Iterable[A]]
val object1 = DList(1, 2).materialise.toAvroFile("path")
persist(object1)
// the file can be read as a DList and re-materialized as an object
val object2: DObject[Iterable[Int]] = fromAvroFile[Int]("path").materialise
- loading a DObject[A]
val object1 = DList(1, 2).sum.toAvroFile("path")
persist(object1)
// the objectFromFile method must be used in that case
val object2: DObject[Int] = objectFromAvroFile[Int]("path")
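As an illustration of why this is useful, a summary saved by one run can be loaded in a later run and combined with a new DList. The sketch below assumes the join method on DObject, which pairs the object's value with every element of a list:
// load the sum persisted above and pair it with each element of a new list
val total: DObject[Int] = objectFromAvroFile[Int]("path")
val shifted: DList[Int] = (total join DList(4, 5, 6)).map { case (t, i) => i - t }
shifted.run // if the stored sum is 3: Vector(1, 2, 3)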
Counters were not part of the previous Scoobi interface but have been added at a minimal cost in terms of API. Users are encouraged to reuse the low-level parallelDo method, which gives access to the Emitter interface:
DList(1, 2, 3).parallelDo[String]((i: Int, emitter: Emitter[String]) => emitter.write(i.toString))
A similar operation can be used with the core.Counters interface:
DList(1, 2, 3).parallelDo((i: Int, counters: Counters) => { counters.incrementCounter("group1", "counter1", 1); i })
At the end of the run, the counter values are accessible from the ScoobiConfiguration object:
sc.counters.getGroup("group1").findCounter("counter1").getValue
A very similar functionality also exists if you want to notify your current Hadoop context that your process is still alive:
DList(1, 2, 3).parallelDo((i: Int, beat: Heartbeat) => { beat.tick; i })
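Putting those pieces together, here is a minimal sketch, assuming an implicit ScoobiConfiguration named sc is in scope:
val counted = DList(1, 2, 3).parallelDo((i: Int, counters: Counters) => {
  counters.incrementCounter("stats", "seen", 1) // one increment per element
  i
})
counted.run // Vector(1, 2, 3)
sc.counters.getGroup("stats").findCounter("seen").getValue // 3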
A Reduction value represents a binary operation on a closed set. In previous versions of Scoobi, the type (A, A) => A was used for this representation. Using a first-class data type (Reduction) permits an API for combining reduction operations to produce higher-level reductions.
The DList.combine method has been modified to accept a Reduction object instead of a simple function. The purpose of this change is to provide a rich set of combinators to create "combine" functions from other "combine" functions. For example, suppose you have a DList[(String, Iterable[(String, Int, Int, Double)])] and you want to combine the values pairwise with some specific logic:
type Data = (String, Int, Int, Double)
val f = (t1: Data, t2: Data) => {
  val concat = t1._1 + t2._1
  val min = math.min(t1._2, t2._2)
  val max = math.max(t1._3, t2._3)
  val sum = t1._4 + t2._4
  (concat, min, max, sum)
}
DList(("key", Seq(("a", 1, 2, 3.0)))).combine(Reduction(f))
In the previous example, the manipulations of the tuple indices are error-prone and hinder readability. Using Reductions, these problems are mitigated:
type Data = (String, Int, Int, Double)
// Reduction.string is the normal concat operation on strings
// it is zipped with 3 other reductions to create a new Reduction on Tuple4
DList(("key", Seq(("a", 1, 2, 3.0)))).combine(Reduction.string.zip4(Reduction.minimum[Int], Reduction.maximum[Int], Sum.double))
The following 2 methods might be useful, especially when experimenting with data and algorithms.
// will run a map-reduce job to shuffle the elements randomly
DList(1, 2, 3).shuffle
// this returns a DObject[Boolean] which is true if the 2 lists contain the same elements
DList(1, 2, 3).isEqual(DList(2, 1, 3))
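Since isEqual returns a DObject, the result can be brought back into memory with run:
DList(1, 2, 3).isEqual(DList(2, 1, 3)).run // true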
The DList.flatMap method has been deprecated because it is not really a flatMap method. A proper flatMap method would have a signature like:
class DList[A] {
  def flatMap[B](f: A => DList[B]): DList[B] = ???
}
Note also that DList is not Iterable, so Iterable cannot supply a candidate flatMap method either. We don't know yet how to implement the method above: it would mean spinning off another MapReduce job for each element of your DList, which is not guaranteed to be very efficient :-). The operation that was previously called flatMap, taking a function A => Iterable[B], now goes by the more accurate name mapFlatten.
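What the deprecated method actually provided, mapping each element to an Iterable and flattening the results, is what the mapFlatten operation mentioned above does:
// each element is mapped to an Iterable[B] and the results are concatenated
DList(1, 2, 3).mapFlatten(i => Seq(i, i * 10)).run // Vector(1, 10, 2, 20, 3, 30)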
There is now a Scoobi REPL to let you create DLists from the command line. You can reuse our scoobi script, which invokes the com.nicta.scoobi.application.ScoobiRepl class, and start executing some code:
=== Please wait while Scoobi is initialising... ===
|\
/ /\/o\_
__ _ (.-.__.( __o
______________ ____ / /_ (_) /\_( .----'
/ ___/ ___/ __ \/ __ \/ __ \/ / .' \____/
(__ ) /__/ /_/ / /_/ / /_/ / / / / / \
/____/\___/\____/\____/_.___/_/ ____:____\__\__\____________
=== Ready, press Enter to start ===
scoobi> DList(1, 2, 3).run
By default, you will be executing your code on the cluster. If you simply want to execute locally or in memory, enter the REPL command local or inmemory. There is also a very succinct help command describing those options.
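A hypothetical session switching to in-memory execution might look like this (the exact output format is an assumption):
scoobi> inmemory
scoobi> DList(1, 2, 3).map(_ * 2).run
Vector(2, 4, 6)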
Scala is moving and so is its ecosystem. We are not yet leveraging any "big" feature of Scala 2.10, like macros, but we are using small ones like string interpolation and implicit classes. We also depend on other libraries, like specs2 and Kiama, which now make fuller use of 2.10.