bloom-lang / bud

Prototype Bud runtime (Bloom Under Development)
http://bloom-lang.net
Other
854 stars 59 forks source link

Incorrect agg output downstream of join #278

Closed neilconway closed 12 years ago

neilconway commented 12 years ago

Test case:

require "rubygems"
require "bud"

class TestAggIdempotence
  include Bud

  state do
    table :t1, [:cnt]
    table :t2
    table :t3
  end

  bloom do
    t1 <= (t2 * t3).lefts.group([], sum(:val))
  end
end

t = TestAggIdempotence.new
t.t2 <+ [[5, 10]]
t.t3 <+ [[10, 20]]
t.tick
puts t.t1.to_a.inspect

Expected output:

[[10]]

Observed output:

[[20]]

From a quick look, it appears that the problem arises because the join code can emit duplicates in the case of rescan/invalidate (see replay_join), but aggregate processing is not necessarily idempotent (e.g., aggs don't eliminate duplicates from their inputs).

For this particular example, you might be able to improve the rescan logic to avoid the duplicate output (since there is really no need to rescan in the first place), but that wouldn't fix the problem in general.

Allowing non-idempotent operators places a heavy burden on the rescan code to avoid dups. An easy and robust fix would be to just do dup elimination for dup-sensitive aggs (e.g., count and sum, min/max are fine as is) -- although that comes at a performance cost. So I'm inclined to just change the dup-sensitive aggs to do dup elimination. Objections welcome.

sriram-srinivasan commented 12 years ago

For this particular example, you might be able to improve the rescan logic to avoid the duplicate output (since there is really no need to rescan in the first place), but that wouldn't fix the problem in general.

Allowing non-idempotent operators places a heavy burden on the rescan code to avoid dups. An easy and robust fix would be to just do dup elimination for dup-sensitive aggs (e.g., count and sum, min/max are fine as is) -- although that comes at a performance cost. So I'm inclined to just change the dup-sensitive aggs to do dup elimination. Objections welcome.

It clearly is a performance hit if you de-dup an already unique stream. Perhaps we can do a quick check of the path from source to the group element, and flag the group for checking only if the path contains a projection (how else can one produce dups?). Within a projection, it is possible to do a simpler conservative analysis, I suppose.

SQL doesn't bother about this issue, does it?

--s

jhellerstein commented 12 years ago

SQL implementations don't do this kind of delta processing, so it doesn't come up.

Without digging into the code, I'm confused on why we're re-emitting previously-computed tuples. That doesn't sound right unless we're recomputing everything downstream.

Seems to me that the right approach here is (a) don't rescan/rejoin unless needed of course, (b) in general, upstream rescan should force downstream recomputation of aggs (and everything else) from scratch, but (c) avoiding recomputation of some of the agg state should be possible in certain cases -- e.g. when deltas are scoped to specific groups for recomputation via some kind of FK analysis.

neilconway commented 12 years ago

@sriram-srinivasan : You could also get dups from a join with a user-provided code block, I believe. You could imagine conservative static analyses that would prove certain operators to be uniqueness-preserving (e.g., a projection to the input collection's key), but that seems hard in general.

@jhellerstein : The join should be improved to avoid emitting duplicate output in this case, which is a win anyway -- fixed per bb272d48c064f82cdc2589ef611689f2c4db7fff

There are two separate issues here:

  1. Ensuring that duplicates introduced by user-defined code blocks do not produce incorrect query outputs. For operators that are not naturally idempotent, this requires doing duplicate elimination, either at the source of the dups (proj/join) or at the dup-sensitive operator (group). My recent checkin implements the latter; adding additional overhead to all projection/join ops is probably a bit more overhead than making non-exemplary aggs more expensive.
  2. Avoiding duplicates from rescan/invalidation. This is an efficiency issue in any case, as well as a correctness issue if we revert the change to make aggs idempotent.

Related to #2: I was digging into this code because I need to trigger additional rescanning for the lattice branch. If you have a projection over t1 that embeds a lattice x, you need to rescan t1 when x sees a delta. Now you could invalidate all the state of downstream operators to t1, but if all operators are idempotent then you can avoid the invalidation work.

Bottom line: I am inclined to leave things as they are (and do dup elimination in the grouping operator), but I'm open to alternative proposals.