clj-commons / gloss

speaks in bytes, so you don't have to
Eclipse Public License 1.0

data loss with io/decode-stream #60

Closed: bo-tato closed this issue 1 year ago

bo-tato commented 1 year ago

I was coding this simple problem to learn Clojure: it's just a server that reads newline-separated JSON requests over TCP and responds to them. My code using gloss is here. I found gloss because that's what the aleph examples used, though later I realized that for this task there's a simpler to-line-seq in byte-streams. I ran into a bug with decode-stream that seems to be already known. I'm totally new to Clojure, manifold streams and everything, so this is my understanding of the bug and my fix, which may be totally wrong:

decode-stream reads from some stream and writes to out, and as soon as the input stream is drained, it calls close on out. As calls to put! on out are non-blocking, maybe the last call to put! on out hasn't been written yet at the time we close out.

Edit: I think my understanding was wrong. It seems that if you call put! or put-all! on a stream and then immediately call close! on it, those puts still get written before it is closed. I think the race condition is that it reads bytes from src, parses them, and then calls put! on dst, so src can be drained and trigger the close! call on dst while it is still parsing, before it has called put!. Here is the code that closes it:

(s/connect-via src f dst {:downstream? false})
(s/on-drained src #(do (f []) (s/close! dst)))

As far as I understand manifold streams, the only effect of the :downstream? option is whether manifold automatically closes the stream for us. When manifold does it automatically, it waits for the input stream to be drained or closed and for the pending writes to out to finish before closing out. So I just deleted those two lines and replaced them with:

(s/connect-via src f dst)

Like I said, I don't know Clojure or manifold, so I'm not sure whether there are undesired consequences of leaving :downstream? at its default of true, but for my program it seems to fix this bug: all the Protohackers tests pass after I made that change.
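A minimal sketch (not from the thread) of the default connect-via behavior described above, assuming manifold is on the classpath; doubled is a hypothetical name. As I understand manifold's semantics, leaving :downstream? at its default of true means connect-via itself closes dst once src is drained and the last callback's deferred has resolved, so the reduce over dst terminates with every value.

```clojure
(require '[manifold.stream :as s])

(defn doubled
  "Hypothetical example: doubles each number in xs by way of connect-via.
   With the default {:downstream? true}, manifold closes dst after src is
   drained, so the reduce below completes on its own."
  [xs]
  (let [src (s/->source xs)
        dst (s/stream)]
    ;; The callback returns put!'s deferred, so the connection waits for each
    ;; write to dst to be accepted before taking the next message from src.
    (s/connect-via src #(s/put! dst (* 2 %)) dst)
    ;; Blocks until dst has been closed by connect-via and fully consumed.
    @(s/reduce conj [] dst)))

(doubled [1 2 3])
```

By contrast, passing {:downstream? false} leaves dst open, which is why decode-stream has to close it by hand in its on-drained callback.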

Also, with this little program I ran into this bug, which is already fixed in git but not in the published version of the package. It seems weird that with a very simple use case (so I assume it also happens in plenty of real programs) I ran into two bugs that are already known and that, in a real program, could cause subtle, hard-to-debug issues. It seems important to at least add a note to the documentation if you don't have time to make a proper fix.

Other than the painful debugging experience, I really like the library! It looks very nice for simply dealing with binary protocols.

KingMob commented 1 year ago

@bo-tato Thanks for using gloss and looking into this. Sorry you ran into problems.

Unfortunately, your suggested change is something I already considered, and it breaks a bunch of stuff. I talk about it in more detail in https://github.com/clj-commons/gloss/pull/53#issuecomment-1177670781, but the gist is that a bunch of code relies on that final empty vector being sent when the source is drained, so it can't easily be deleted.

Unfortunately, there are two major reasons it's not fixed: gloss is not very popular, so it's lower priority, but worse, truly fixing the bug may require a massive overhaul of some of Manifold's internals.

> Edit: I think my understanding was wrong it seems if you call put! or put-all! on a stream and then immediately close! on a stream those puts still get written before it is closed. I think the race condition is that it reads bytes from src, parses it, and then calls put on dst, so src can be drained and trigger close call on dst, while it is still parsing and before it's called put!

Your intuition is correct. The solution for gloss might be as simple as locking the dst so it can't be closed while the callback is running, but that's tricky to do without hurting performance. Like I said, this is not an easy fix.

KingMob commented 1 year ago

You're right that I should probably cut a new release that includes the buffer-underflow fix. I'll get on that.

KingMob commented 1 year ago

OK, 0.3.3 is available with the buffer underflow fix: https://clojars.org/org.clj-commons/gloss/versions/0.3.3

bo-tato commented 1 year ago

Thanks! Yeah, it looks tricky; I realize now that my original "fix" breaks it in other cases and doesn't even pass the tests. I have another idea for a fix that might also be wrong, as I'm quite new to Clojure and to this and the manifold libraries, but I did spend some time trying to understand what's going on, and this one does pass all the tests, so it might work. I just replaced the line:

(s/on-drained src #(do (f []) (s/close! dst)))

with:

(s/on-drained src #(let [state @state-ref]
                     (binding [complete? (s/drained? src)]
                       (let [[leftover] (decode-byte-sequence
                                         (:codecs state)
                                         (:bytes state))]
                         (if leftover
                           (s/connect leftover dst)
                           (s/close! dst))))))

My (maybe wrong) understanding of the original code and the proposed fix: in the original code, when src is drained there is still some stuff pending in the state atom to decode and pass on to dst. So it calls f one last time with an empty list, and f decodes and sends on to dst whatever remained in state, but it also calls close on dst, possibly before f has written to it. So this fix has two cases. In the first, there isn't anything left in the state atom, so we just close dst. The second case I just copy-pasted from f, which is probably not the cleanest way to write it: it's basically f, except that instead of calling put! to write the decoded data to dst, it calls s/connect to stream the remaining decoded data to dst, and in this case we don't call close on dst. Manifold will close dst automatically after it's done writing.

KingMob commented 1 year ago

This looks plausible. Let me try it out.

KingMob commented 1 year ago

I extracted it to a shared fn like:

(defn- decode-cleanup
  "When a decoding stream is drained, this handles properly flushing out the remaining
   bytes from intermediate streams."
  [src state-ref dst]
  (let [state @state-ref]
    (binding [complete? (s/drained? src)]
      (let [[leftover _ _] (decode-byte-sequence
                             (:codecs state)
                             (:bytes state))]
        (if leftover
          (s/connect leftover dst)
          (s/close! dst))))))

and used it like (s/on-drained src #(decode-cleanup src state-ref dst))

However, the proposed test from #53 still fails. Your proposed change doesn't call (f []) when there are no leftovers, but even if I add that to the fn like so:

(defn- decode-cleanup
  "When a decoding stream is drained, this handles properly flushing out the remaining
   bytes from intermediate streams."
  [src dst state-ref f]
  (let [state @state-ref]
    (binding [complete? (s/drained? src)]
      (let [[leftover _ _] (decode-byte-sequence
                             (:codecs state)
                             (:bytes state))]
        (if leftover
          (s/connect leftover dst)
          (do
            (f [])
            (s/close! dst)))))))

It's still failing, probably because dst is being closed before the intermediate streams have processed the []. I tried derefing the deferred returned by (f []), but that didn't work either.

I think this is a good first attempt, though!

bo-tato commented 1 year ago

Hm, this is tricky; I think my idea was off track again. The reason it's failing that test is that sometimes, instead of producing

1 2 3 4 5 6 7 8 9 []

it produces

1 2 3 4 5 6 7 8 [] 9

That is, the on-drained event can fire before the last 9 has been written to the stream. My idea only addressed the problem of close! being called before the last call to put!; it still has the problem that the on-drained event fires and writes leftover/[] before the last 9 has been written.

I have a new idea that's kind of ugly, but it passes the test from https://github.com/clj-commons/gloss/pull/53 and all the existing tests. Change:

(let [src (s/->source src)...

to:

(let [src (s/concat [(s/->source src) (s/->source [[]])])...

Rather than calling (f []) after src is drained, we make a new src that is the original with a [] on the end. Then we can just do (s/connect-via src f dst) without {:downstream? false}, and manifold takes care of closing dst.
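A minimal, self-contained sketch of the sentinel idea above, outside gloss and assuming manifold is on the classpath. make-pair-decoder and decode-all are hypothetical names: the toy decoder groups numbers into pairs and treats a trailing [] as "end of input", flushing any unpaired leftover, roughly the role the final (f []) plays in decode-stream. Because the [] travels through the same stream as the data, it should not be able to overtake the last batch, and since :downstream? is left at its default, manifold closes dst after the final callback's write.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

(defn make-pair-decoder
  "Hypothetical toy decoder: groups incoming numbers into pairs and puts
   them on dst. An empty batch [] marks end of input and flushes any
   unpaired leftover."
  [dst]
  (let [buf (atom [])]
    (fn [batch]
      (if (empty? batch)
        ;; End-of-input sentinel: flush the leftover, if there is one.
        (let [leftover @buf]
          (reset! buf [])
          (if (seq leftover)
            (s/put! dst leftover)
            (d/success-deferred true)))
        ;; Regular batch: emit every complete pair, keep the remainder.
        (let [all (into @buf batch)
              n   (* 2 (quot (count all) 2))]
          (reset! buf (subvec all n))
          (s/put-all! dst (mapv vec (partition 2 (subvec all 0 n)))))))))

(defn decode-all
  "Appends a [] sentinel to the input with s/concat, then lets connect-via
   (default :downstream? true) close dst once everything has been written."
  [batches]
  (let [src (s/concat [(s/->source batches) (s/->source [[]])])
        dst (s/stream)]
    (s/connect-via src (make-pair-decoder dst) dst)
    @(s/reduce conj [] dst)))

(decode-all [[1 2 3] [4 5 6 7] [8 9]])
```

The trailing [9] only shows up because the sentinel batch is processed after [8 9]; the design choice is to encode "end of input" as data rather than as a separate on-drained event.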

KingMob commented 1 year ago

Hmm, I hadn't considered that... have you proven that the 9 is coming after the []?

I think your idea is intriguing, can you make a PR so I can try it out? (Please include the test from #53)

bo-tato commented 1 year ago

> have you proven that the 9 is coming after the []?

I added (println s) in f before (s/put-all! dst s), but I just realized that writing to standard out can also have race conditions on its ordering, so I changed it to (tap> s) and viewed the taps in the FlowStorm debugger. When the test fails, it shows 8, then [], then 9.
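A small sketch of the tap> technique described above (not from the thread), assuming Clojure 1.10+, where add-tap, tap> and remove-tap live in clojure.core; tapped and record-tap are hypothetical names. tap> only enqueues the value and does not block; a single background thread delivers queued values to the registered taps in the order they were enqueued, so the sketch waits briefly before reading the atom. In the thread, FlowStorm plays the role of record-tap.

```clojure
;; Sketch, assuming Clojure 1.10+ (add-tap, tap>, remove-tap are in clojure.core).
;; `tapped` and `record-tap` are hypothetical names used only for illustration.
(def tapped (atom []))

(defn record-tap
  "Appends each tapped value to the `tapped` atom."
  [x]
  (swap! tapped conj x))

(add-tap record-tap)

;; Inside a callback you would call (tap> s) instead of (println s).
(doseq [x [8 [] 9]]
  (tap> x))

;; tap> only enqueues; a background thread delivers the values,
;; so wait briefly before inspecting the result.
(Thread/sleep 100)
(remove-tap record-tap)

@tapped
```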

> can you make a PR so I can try it out?

OK, just made a PR.