Closed tsdeng closed 9 years ago
merge when green
Thanks for merging, will send another rb for UTs
Guys, I'm not convinced this is the right approach.
My concern here is that the method contract really does not say anything about having to read a full next batch. Why is that needed here? What I would like to see instead is a method that can optionally by called after this method that ensures that we can read at least one batch. However, for joining, you don't need this at all, you just read the previous batch and join as much as you can. I think the joining case is not being thought through here.
Also, given we think we are having issues, it would be really great to find some test to check some kind of relevant property. We have a few integration tests, but I don't think they are covering what is going on.
At the very least we to update the comments to this method to explain what the contract is and how it is to be used. Right now, I'm afraid it is a bit muddy. I think I know what it was supposed to do, but now, it is a bit complex. The old contract was this: knowing that a PipeFactory is a function that given a requested Interval of time responds with an Interval it can supply and a function to add that read to a FlowDef. readAfterLastBatch takes a PipeFactory and returns a new one that reads the same data over a changed Interval. The new Interval ends at the same time as the requested interval but starts immediately after the last written batch of the store on disk.
This code was factored out to share it between the joining code and the merge (summing) code, so the contract needs to make sense on both sides.
I don't know exactly what issue we are trying to fix here, but we are forcing all joins to be over at least one batch. Is that not going to increase latency of some complex jobs (with internal joins and stores on different batchers)? How can we check that?
Again, my proposed solution is just add a new method: atLeastOneBatch or something, that takes a PipeFactory, and if the requested range does not completely cover a single batch, then we fail the plan. We might want to do that for the merging side, but I don't think we need to force it on the joining side.
how about in the merge method, we call atLeastOneBatch after calling readAfterLastBatch, so it becomes like:
readBatchedResult <- readAfterLastBatch(delta)
(actualLast, snapshot, deltaFlow2Pipe) = readBatchedResult
_ <- atLeastOneBatch(actualLast)
@tsdeng sounds good to me.
the readDeltaTimestamps should include the whole firstDeltaBatch for the merge operation to start