NICTA / scoobi

A Scala productivity framework for Hadoop.
http://nicta.github.com/scoobi/

Extend join APIs with "map-side" joins #90

Open blever opened 12 years ago

blever commented 12 years ago

Now that Scoobi has DObjects, it should be possible to implement "map-side" joins. That is, joins where one data set is small enough to fit into memory.
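A minimal sketch of the kind of thing this would enable, assuming the small table has been materialised into a DObject and joined against a DList with the existing join/flatMap combinators (mapSideJoin is a hypothetical helper name, and the concrete String/Int types are only for illustration):

import com.nicta.scoobi.Scoobi._

// hypothetical helper: join a small in-memory lookup table against a large DList
// without a shuffle, keeping only keys present on both sides
def mapSideJoin(small: DObject[Map[String, String]],
                large: DList[(String, Int)]): DList[(String, (String, Int))] =
  (small join large) flatMap { case (lookup, (key, value)) =>
    lookup.get(key).map(name => (key, (name, value))).toList
  }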

blever commented 11 years ago

@raronson - please provide some requirements for the kinds of map-side joins we'd like to see. Appropriate names and pointers to implementation approaches would be great.

blever commented 11 years ago

This comment is related to #175.

Often the smaller table in a map-side join is a data set that is already sitting on disk. To perform the map-side join, it first needs to be ingested and placed inside a DObject. For example:

import com.nicta.scoobi.Scoobi._
import org.apache.hadoop.fs.Path
import scala.io.Source

// read the small table on the client and build an in-memory Map of id -> name
val records =
  Source.fromInputStream(configuration.fs.open(new Path("my-small-table.txt"))).getLines()
    .map(_.split("\t").toList)
    .collect { case id :: name :: _ => (id, name) }
val small_table: DObject[Map[String, String]] = DObject(records.toMap)

val large_table: DList[(String, ...)] = ...
small_table join large_table map { ...

The way this works is that the file is read and turned into a Map entirely on the client side. A serialised version of the Map is then pushed to the distributed cache, from which each mapper task reads and deserialises it as part of the join with the DList.
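For illustration only (this is not Scoobi's actual internals), the mechanism described above looks roughly like the following when written directly against the Hadoop distributed-cache APIs; pushToCache and readFromCache are invented names:

import java.io._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.filecache.DistributedCache
import org.apache.hadoop.fs.Path

// client side: serialise the Map to a file and register it with the distributed cache
def pushToCache(small: Map[String, String], conf: Configuration, cachePath: Path): Unit = {
  val out = new ObjectOutputStream(cachePath.getFileSystem(conf).create(cachePath))
  try out.writeObject(small) finally out.close()
  DistributedCache.addCacheFile(cachePath.toUri, conf)
}

// task side: each mapper re-reads and deserialises the Map (e.g. in its setup phase)
def readFromCache(conf: Configuration): Map[String, String] = {
  val local = DistributedCache.getLocalCacheFiles(conf).head
  val in = new ObjectInputStream(new FileInputStream(local.toString))
  try in.readObject().asInstanceOf[Map[String, String]] finally in.close()
}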

The problem with this approach is that it would likely be more efficient to move all of this work (reading the file and building the Map) into the setup phase of the mapper (or reducer) task, and I don't believe there is a way to make that happen with the APIs Scoobi currently exposes ...
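One hypothetical shape for such an API (not something Scoobi exposes today): build the DObject from a loader function that each task evaluates once during setup, so the file is read on the cluster rather than being materialised client side and shipped through the distributed cache. delayedObject is an invented name for illustration.

import com.nicta.scoobi.Scoobi._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

// hypothetical factory: the loader would be shipped to the tasks and run in setup()
def delayedObject[A](load: Configuration => A): DObject[A] =
  sys.error("hypothetical API - not implemented in Scoobi")

val small_table: DObject[Map[String, String]] =
  delayedObject { conf =>
    val in = FileSystem.get(conf).open(new Path("my-small-table.txt"))
    Source.fromInputStream(in).getLines()
      .map(_.split("\t").toList)
      .collect { case id :: name :: _ => (id, name) }
      .toMap
  }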