[go: up one dir, main page]

Skip to content
Eric Springer edited this page May 24, 2013 · 2 revisions

A new Release Candidate for Scoobi: scoobi-0.7.0-RC1-cdh4.jar (or cdh3 if that's your thing) has just been released.

This version contains important changes from 0.6.1:

  • a refactoring of the implementation
  • a new API for persisting DLists so that it is possible to do incremental computations on the same list
  • a "checkpoint" API
  • an API to load and store DObjects
  • a Counters API
  • First-class Reduction data type for composing parallel operations
  • some new DList methods (isEqual, shuffle, zipWithIndex)
  • the mapFlatten operation
  • a REPL
  • Scala 2.10

Let's see all these points in more details.

Refactoring

A huge refactoring has been undertaken to make Scoobi's internals simpler and easier to reason about by harnessing the power of the Kiama library. If you are interested in the gory details, you can reserve a seat for a presentation during the next ScalaDays in NYC.

One consequence of this refactoring is the evolution of the "persist" API, which follows.

Persist API

Requirements

The requirements for persisting DLists and DObjects are a bit tricky. Typically, when you build 2 DLists sharing the same computations:

val list0 = DList(1, 2, 3).map(_ + 1)
val list1 = list0.map(_.toString)
val list2 = list0.map(_ * 2)

you don't want to execute those shared computations twice (the map operation for list0). This is why the old persist API has a persist method accepting several arguments so that we can process both lists at the same time and determine which computations are shared:

persist(toTextFile(list1, "path"), toTextFile(list2, "path")) // returns Unit

In terms of API, this complicates persistence of DObjects. Indeed, DObjects can be constructed from DLists but when we persist them, we generally want to bring the resulting value back in memory:

val list0 = DList(1, 2, 3).map(_ + 1)
val list1 = list0.map(_.toString)
val list2 = list0.map(_ * 2)
val o1 = list1.reduce(string)
val o2 = list2.reduce(Sum.int)

// can we call persist(toTextFile(list1, "path1"), toTextFile(list2, "path2"), o1, o2)?
// what do we get back?

What should we call? persist? The problem is that we want:

  • to persist DLists as side-effects (write them to files) and return Unit
  • to persist DObjects and read the value in memory (return an A)
  • all of this while executing shared computations between DLists and DObjects only once

Sinks

The first step in solving this problem is to change the way Sinks are specified for DLists.

In the previous API, you would write toTextFile(list, "path") to specify that the list should be persisted to a Text file with a given path and this would return a Persister object which you could persist. Now you can simply write list.toTextFile("path"), which merely returns another DList.

This is not only a syntactic change but also an improvement because you can now chain those calls to specify that you want to persist a DList to several sinks:

val list = DList(1, 2, 3).toTextFile("path").toAvroFile("other path")

This also means that we've separated the place where sinks are specified from the place where a list is persisted. One consequence is that you can persist some intermediary results while making just one call to persist for the list representing all the computations to be done:

val list1 = DList(1, 2, 3).toAvroFile("path1")
val list2 = list1.map(_ + 1)
persist(list2.toTextFile("path2"))

(see below "checkpoints" for a variation on this feature)

persist and run

Now that we know about Sinks, let's see how to persist lists and objects. With the new API, you have 2 methods; persist and run:

  • persist zero or more DLists or DObjects and returns Unit. This method makes sure that no shared computation is done twice and stores everything to disk (even DObject values)
  • run does the same as persist but brings back values in memory, including DLists values as Iterables

Here are a few examples, in growing order of complexity.

  • persist a list to a text file
val list = DList(1, 2, 3).map(_ * 10)
list.toTextFile("path").persist
  • persist a DList to a text file and get its value (aka "run")
val list = DList(1, 2, 3).map(_ * 10)
list.toTextFile("path").run // Vector(10, 20, 30)
  • create 2 DLists sharing some computations but only run one of them
val l0 = DList(1, 2, 3).map(_ * 10)
val l1 = l0.map(_ + 1)
val l2 = l0.map(i => { sys.error("l2 must not be computed!"); i + 2 }) // ok
l2.run // Vector(11, 21, 31)
  • persist 2 DLists sharing some computation and bring the results into memory
val l0 = DList(1, 2, 3).map(_ * 10)
val l1 = l0.map(_ + 1)
val l2 = l0.map(_ - 1)

persist(l1, l2)  // l0 is only computed once
// nothing is recomputed, the results are just read from files
(l1.run, l2.run) // (Vector(11, 21, 31), Vector(9, 19, 29))
  • iterate computations until a result is correct
val list: DList[Int] = DList(1, 2, 3)

def incrementUntilMaxIs(target: Int, list: DList[Int]): DList[Int] = {
  persist(list)
  val listMax = list.max.run
  if (listMax >= target) list
  else                   incrementUntilMaxIs(target, list.map(_ + 1))
}
incrementUntilMaxIs(5, list).run // Vector(3, 4, 5)
  • persist 2 objects and a list
val list = DList(1, 2, 3)
val plusOne = list.map(_ + 1)

val sum = list.sum
val max = list.max

persist(sum, max, plusOne)
(sum.run, max.run, plusOne.run) // === (6, 3, "Vector(2, 3, 4)")
  • run 2 objects and a list
val list = DList(1, 2, 3)
val plusOne = list.map(_ + 1)

// run return tuples for arities <= 10
val (sum, max, plus1) = run(list.sum, list.max, plusOne) // === (6, 3, "Vector(2, 3, 4)")

Checkpoints

One great addition on this new Persist API is the possibility to declare some sinks as "checkpoints"

val list = DList(1, 2, 3)

// This is an expensive computation. I want to keep the intermediate results.
val l1 = list.map(_ * 100).toAvroFile("path", checkpoint = true)

val l2 = l1.map(_ + 1)

When a sink has been marked as a checkpoint, on a subsequent execution of the program we will check if there is already a file containing data for the sink path. If this is the case, we will reuse this data instead of doing the computations all over again.

Important note: this functionality is only available on DataSinks which can also be used as DataSources. This is the case for Sequence or Avro files but not for Text files because once a data of type A is written as a string in a file, there is no way to read it as an A again.

DObject API

Another, less important but pretty natural, extension of the Persist API is the ability to save and load DObjects. This is particularly interesting when the the object is a list or map of some sort which sums up the characteristics of a big data set.

  • persisting a DObject[Iterable[A]]
val object1 = DList(1, 2).materialise.toTextFile("path")
  persist(object1) // the file contains 2 lines: 1, 2
  • persisting a DObject[A]
val object1 = DList(1, 2).sum.toTextFile("path")
persist(object1) // the file contains one line: 3
  • loading a DObject[Iterable[A]]
val object1 = DList(1, 2).materialise.toAvroFile("path")
persist(object1)

// the file can be read as a DList and re-materialized as an object
val object2: DObject[Iterable[Int]] = fromAvroFile[Int]("path").materialise
  • loading a DObject[A]
val object1 = DList(1, 2).sum.toAvroFile("path")
persist(object1)

// the objectFromFile method must be used in that case
val object2: DObject[Int] = objectFromAvroFile[Int]("path")

Counters API

Incrementing counters were not part of the previous Scoobi interface but has been added to Scoobi at a minimal cost to abstraction. Users are encouraged to reuse the low-level parallelDo method which gives an access to the Emitter interface: DList(1, 2, 3).parallelDo[String]((i: Int, emitter: Emitter[String]) => emitter.write(i.toString))

A similar operation can be used with the core.Counters interface DList(1, 2, 3).parallelDo((i: Int, counters: Counters) => { counters.incrementCounter("group1", "counter1", 1); i })

At the end of the run, the counter values will be accessible from the ScoobiConfiguration object sc.counters.getGroup("group1").findCounter("counter1").getValue

A very similar functionality also exists if you want to notify your current Hadoop context that your process is still alive DList(1, 2, 3).parallelDo((i: Int, beat: Heartbeat) => { beat.tick; i })

Reduction API

A Reduction value represents a binary operation on a closed set. In previous versions of Scoobi, the type (A, A) => A was used for this representation. Using a first-class data type (Reduction) permits an API for combining reduction operations to produce higher-level reductions.

The DList.combine method has been modified to accept a Reduction object instead of a simple function. The purpose of this change is to provide a rich set of combinators to create "combine" functions from other "combine" functions. For example, if you have a DList[(String, (String, Int, Int, Double))] and you want to combine the values pairwise with some specific logic

type Data = (String, Int, Int, Double)
val f = (t1: Data, t2: Data) => {
  val concat = t1._1 + t2._1
  val min = math.min(t1._2, t2._2)
  val max = math.max(t1._3, t2._3)
  val sum = t1._4 + t2._4
  (concat, min, max, sum)
}
DList(("key", Seq(("a", 1, 2, 3.0)))).combine(Reduction(f))

In the previous example, the manipulations of the indices are error-prone and hinder readability. However, using Reductions, these problems are mitigated:

type Data = (String, Int, Int, Double)
// Reduction.string is the normal concat operation on strings
// it is zipped with 3 other reductions to create a new Reduction on Tuple4
DList(("key", Seq(("a", 1, 2, 3.0)))).combine(Reduction.string.zip4(Reduction.minimum[Int], Reduction.maximum[Int], Sum.double))

New DList methods

The following 2 methods might be useful especially when experimenting with data and algorithms.

// will run a map-reduce job to shuffle the elements randomly
DList(1, 2, 3).shuffle

// this returns a DObject[Boolean] which is true if the 2 lists contain the same elements
DList(1, 2, 3).isEqual(DList(2, 1, 3))

The mapFlatten method

The DList.flatMap method has been deprecated because it is not really a flatMap method. A well-functioning flatMap method would have a signature like:

class DList[A] {
  def flatMap[B](f: A => DList[B]): DList[B] = ???
}

Note also that DList is not Iterable, so using Iterable does not provide a candidate flatMap method. However, we don't know yet how to implement this method because this would mean that for each element of your DList you spin-off other MapReduce job. This is not guaranteed to be very efficient :-).

A REPL

There is now a Scoobi REPL to let you create DLists from the command line. You can reuse our scoobi script which invokes the com.nicta.scoobi.application.ScoobiRepl class and start executing some code:

 === Please wait while Scoobi is initialising... ===

                                                     |\
                                             /    /\/o\_
                              __    _       (.-.__.(   __o
       ______________  ____  / /_  (_)   /\_(      .----'
      / ___/ ___/ __ \/ __ \/ __ \/ /     .' \____/
     (__  ) /__/ /_/ / /_/ / /_/ / /     /   /  / \
    /____/\___/\____/\____/_.___/_/ ____:____\__\__\____________


 === Ready, press Enter to start ===


scoobi> DList(1, 2, 3).run

By default, you will be executing your code on the cluster. If you simply want to execute locally or in memory, enter the REPL command local or inmemory. There is also a very succinct help command providing those options.

Scala 2.10

Scala is moving and so is its ecosystem. We are not yet leveraging any "big" feature of Scala 2.10 like macros but we are using small ones like string interpolation or implicit classes. On the other hand we are using other libraries, like specs2 or Kiama which are now more heavily using 2.10 fully.