twitter / scalding

A Scala API for Cascading
http://twitter.com/scalding
Apache License 2.0

Batched groups for TypedPipe #1318

Open rubanm opened 9 years ago

rubanm commented 9 years ago

With a TypedPipe, suppose we want to generate groups such that there are at most k values in each group. This can currently be done with something like:

pipe.groupAll.mapValueStream(_.zipWithIndex).values
  .map { case (v, i) => (i / k, v) }.group

However, this can be too slow for large datasets. It should be possible to compute the total size of the dataset using groupAll + size instead, and the extra MR step should still be faster than the earlier groupAll. Will something like the following work?

import scala.util.Random

def batchedGroup(batchSize: Long): Grouped[Long, T] = {
  val random = new Random(123)
  val intermediate = forceToDisk
  // One extra MR step to compute the total number of elements.
  val size = intermediate.groupAll.size.values
  // cross pairs every element with that single total-size value.
  intermediate.cross(size)
    .map { case (data, size) =>
      val k = (random.nextLong % size) / batchSize
      (k, data)
    }
    .group
}
johnynek commented 9 years ago

I think that's not quite right. You want % batchSize to make sure that (0 until batchSize).toSet.contains(k) right? Another approach: (random.nextDouble() * batchSize).toLong

In fact, with that approach, wouldn't this work:

this.map { v =>
  val k = math.floor(random.nextDouble * batchSize).toLong
  (k, v)
}
.group
ianoc commented 9 years ago

These won't be able to give you an at-most bound, right? They're approximate bucketing?
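For intuition, here is a small, self-contained illustration (plain Scala, not Scalding API, with made-up numbers) of the point: random key assignment only bounds bucket sizes in expectation, so any single bucket can still end up with more than batchSize entries.

import scala.util.Random

val rng = new Random(123)
val batchSize = 5
val items = 1 to 10
// Ideally 2 buckets of 5 each, but random assignment can put 6+ items in one bucket.
val byBucket = items.groupBy(_ => rng.nextInt(items.size / batchSize))
byBucket.foreach { case (bucket, vs) => println(s"bucket $bucket has ${vs.size} items") }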

johnynek commented 9 years ago

Yeah, I totally lost track of what was supposed to be happening here. disregard. :)

Still what is this doing:

(random.nextLong % size) / batchSize

so, nextLong % size should be a random bucketing into size buckets, but if there is a collision, then you could have more than batchSize in each bucket. If you knew the taskId and the number of tasks there were (something we have thought about adding in the past; I even added a patch to do this once, but we tossed it), we could do this without the randomness and make it exact. Rather than counting all of the items, you count how many items each task has (create a hashTable of taskId => count). Then we can use that to produce an exact contiguous ordering of the items into the space 1 to sum(hashTable.values), and then it should be exact, right?
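A rough sketch of the arithmetic this would use. It is illustrative only: Scalding does not expose taskId today, so the per-task count table and the helper names below are hypothetical.

import scala.collection.immutable.SortedMap

// Hypothetical taskId -> item-count table gathered in a first pass.
val countsByTask: SortedMap[Int, Long] = SortedMap(0 -> 3L, 1 -> 5L, 2 -> 2L)

// Prefix sums give each task a contiguous, non-overlapping range of global indices.
val starts: Seq[Long] = countsByTask.values.scanLeft(0L)(_ + _).toSeq
val offsets: Map[Int, Long] = countsByTask.keys.zip(starts).toMap

// Each task numbers its own items 0, 1, 2, ... and buckets them exactly:
// every bucket except possibly the last gets exactly batchSize items.
val batchSize = 4L
def bucketOf(taskId: Int, localIndex: Long): Long =
  (offsets(taskId) + localIndex) / batchSize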

ianoc commented 9 years ago

Yep, that could be exact since we can revisit the same persisted data on disk from the first pass. (So I guess you'd need to ensure you've done a forceToDisk before the operation to ensure it's deterministic.) But that'd work, I think.

aalmahmud commented 9 years ago

I am currently trying to solve this problem. I need to partition a pipe into multiple groups, where each group can contain at most 50000 (say batchSize) entries. So in this case my number of buckets will be size / batchSize (say buckets), and groupRandomly(buckets) can solve this. But the problem is that some group may contain more than batchSize entries. To avoid this I am planning to use groupRandomly(buckets * 2) or groupRandomly(buckets * 3). This will reduce the probability of a bucket having more than batchSize entries.
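A minimal sketch of that workaround, assuming a TypedPipe[T] named pipe and a previously computed total size (e.g. from a groupAll.size step); the 2x factor is just the over-provisioning described above.

val batchSize = 50000L
val buckets = math.max(1, (size / batchSize).toInt)
// Over-provisioning the bucket count lowers the chance that any single bucket
// exceeds batchSize entries, at the cost of more (smaller) groups to reduce.
val grouped = pipe.groupRandomly(buckets * 2)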

rubanm commented 9 years ago

@aalmahmud that can be a workaround, as long as the total number of buckets doesn't mean spawning too many reducers in one step. I will work on a PR based on this thread so we have a proper solution.