nathanmarz / cascalog

Data processing on Hadoop without the hassle.

Leak in tuple sequence in custom buffer operations #251

Closed. harshadss closed this issue 9 years ago.

harshadss commented 10 years ago

For a buffer operation, the first tuple in the input tuple sequence gets lost. The example code and its output are given below.

Example Code

(use 'cascalog.api) ; provides defbufferfn, ?<- and stdout
(def test-data [["u1" 1 "p1"] ["u1" 3 "p2"] ["u1" 5 "p1"] ["u1" 8 "p2"]])

(defbufferfn some-op [tuples] (partition 2 1 tuples)) ; emit consecutive pairs

(?<- (stdout) [?user ?first ?second]
     (test-data :> ?user ?timeindex ?p)
     (:sort ?timeindex)
     (some-op :< ?p ?timeindex :> ?first ?second))

The first tuple ("p1" 1) is lost: it should appear in the first pair, but the second tuple ("p2" 3) shows up twice instead. The output is:

u1 ("p2" 3) ("p2" 3)
u1 ("p2" 3) ("p1" 5)
u1 ("p1" 5) ("p2" 8)

The expected first row is u1 ("p1" 1) ("p2" 3).

bfabry commented 9 years ago

Super weird: if you put a (println tuples) before the partition call in the bufferfn, then it behaves correctly. Something odd with serialization.

sritchie commented 9 years ago

What happens if you call vec on the tuples before you pass them out?

bfabry commented 9 years ago

Same problem with

(defbufferfn some-op [tuples]
  (map vec (partition 2 1 tuples)))

however this fixes it

(defbufferfn some-op [tuples]
  ;; partition now holds immutable vector copies instead of the raw tuples
  (partition 2 1 (map vec tuples)))

sritchie commented 9 years ago

Yeah, this is not good. Might be able to find some time to look into this one. Thanks for the test case.

sritchie commented 9 years ago

I think the issue we're running into is similar to this:

https://groups.google.com/forum/#!topic/clojure/Dey9CYZGNTw

The buffer gets passed a clojure.lang.IteratorSeq, which is a straight-up wrapper over the Java iterator that Cascading hands us. partition isn't designed to work with a stateful iterator, and its internal buffering and moving along the sequence screws with the iterator's stored stateful pointer.

That's why (map vec tuples) works: map goes through the iterator a single time and creates a sequence.

This is something we should document for sure, but I'm not sure if there's really a fix here, since fully realizing every iterator isn't the right thing to do.

Let me know what you think about keeping or closing this one.

bfabry commented 9 years ago

Hmm. A colleague of mine actually ran into this today and, not having seen this bug, found it very difficult to track down and fix. I kinda feel like, seeing as bufferiter exists, it's fine for buffer to fully realise the iterator, but the documentation of bufferiter would need to improve.

I'm actually really struggling to see how the implementation of IteratorSeq could lose data this way, though; it tries pretty hard to store the result every time it calls next(). If I could figure out how it happens, I could at least modify it so it goes bang.

sritchie commented 9 years ago

Ugh, I'm trying to recreate this in a local test case and I can't. Working in the REPL, it looks like the WORST thing to do is call (vec tuples): that seems to force the entire sequence to be realized, so you get n copies of the last element.

@cwensel, do you have any ideas here? It looks like the argumentsIterator returned by BufferCall here:

https://github.com/nathanmarz/cascalog/blob/develop/cascalog-core/src/java/cascalog/ClojureBuffer.java#L42

isn't implementing .hasNext and .next properly. Or something like that. Any clue as to where the concrete implementation of that iterator is?

cwensel commented 9 years ago

I'm not sure what you mean by N copies of last elements. What you do get is the same TupleEntry, backed by a Tuple, on every call to #next(). To reduce GC, we re-use the TupleEntry instance, and depending on a few things, the Tuple will be re-used where it can be. This re-use is inherited from the core Hadoop APIs, which re-populate and return the same key/value objects rather than creating new ones.

Cascading has always been clear that you should never cache a TupleEntry or Tuple you did not create, as they may be (and will be) re-used. But looking at the Javadoc for 2.6, we should restate this in the method Javadoc to avoid this confusion with Buffers: https://github.com/cascading/cascading/blob/2.6/cascading-core/src/main/java/cascading/operation/Buffer.java#L53-53

Use TupleEntry#getTupleCopy() if you want to store a Tuple in a List.
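The re-use described above can be reproduced without Hadoop. The sketch below is plain Clojure and does not touch Cascading: `reusing-iterator` is a hypothetical stand-in for the argumentsIterator that hands back the same mutable object from every .next call, and `copying-iterator` is a hypothetical copy-on-read wrapper playing the role of TupleEntry#getTupleCopy(). Neither is the code from the eventual patch. Caching the raw objects with partition leaves every pair pointing at the last row, while copying each element as it is read restores the expected pairs.

```clojure
;; Hypothetical stand-in for Cascading's argumentsIterator: every call to
;; .next overwrites and returns the SAME mutable java.util.ArrayList,
;; mimicking the TupleEntry/Tuple re-use described above.
(defn reusing-iterator [rows]
  (let [buf       (java.util.ArrayList.)
        remaining (atom (seq rows))]
    (reify java.util.Iterator
      (hasNext [_] (boolean (seq @remaining)))
      (next [_]
        (let [row (first @remaining)]
          (swap! remaining rest)
          (.clear buf)
          (.addAll buf row)
          buf)))))

;; Hypothetical copy-on-read wrapper: snapshots each element into an
;; immutable vector the moment .next returns it, the same idea as
;; calling TupleEntry#getTupleCopy() before storing a Tuple.
(defn copying-iterator [^java.util.Iterator it]
  (reify java.util.Iterator
    (hasNext [_] (.hasNext it))
    (next [_] (vec (.next it)))))

(def rows [["p1" 1] ["p2" 3] ["p1" 5] ["p2" 8]])

;; Broken: partition caches references to the one shared list, so once the
;; sequence is fully realized every pair shows the last row.
(def broken
  (->> (iterator-seq (reusing-iterator rows))
       (partition 2 1)
       doall
       (mapv #(mapv vec %))))
;; => [[["p2" 8] ["p2" 8]] [["p2" 8] ["p2" 8]] [["p2" 8] ["p2" 8]]]

;; Fixed: every element is copied before partition sees it.
(def fixed
  (->> (iterator-seq (copying-iterator (reusing-iterator rows)))
       (partition 2 1)
       (mapv vec)))
;; => [[["p1" 1] ["p2" 3]] [["p2" 3] ["p1" 5]] [["p1" 5] ["p2" 8]]]
```

Copying inside .next itself means the snapshot is always taken before the next call can overwrite the shared object, so the result no longer depends on how lazily, or in what chunk sizes, downstream code such as partition realizes the sequence.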

sritchie commented 9 years ago

Ah, thanks, @cwensel. Looks like getTupleCopy fixes the issue. Thanks!

sritchie commented 9 years ago

@bfabry can you try this patch out? https://github.com/nathanmarz/cascalog/pull/280 I added a test as well, and all looks good to go. I can get this out after tests pass.

bfabry commented 9 years ago

Aye, that patch fixes it for me, @sritchie.

sritchie commented 9 years ago

Okay, excellent. I should probably publish 3.0.0. It's been a while.