nervous-systems / fink-nottle

Asynchronous Clojure/Clojurescript client for Amazon's SNS & SQS services
The Unlicense

HTTPS connections seem to stay open even when channel is closed. #9

Closed by mattdeboard 8 years ago

mattdeboard commented 9 years ago

I was curious about the connections being made to AWS when using your library. So, I entered this at the command line to watch what connections Java was making:

watch -n 2 'lsof -nPi | grep java'

I then invoked this function:

(defn read-from
  "Read from the source queue, and write each message both to the destination
  queue and to disk."
  [creds src dest]
  (let [ch (receive! creds src)
        {:keys [in-chan]} (batching-sends creds dest)]
    (async/go
      (with-open [o (writer output-file :append true)]
        (loop []
          ;; Using `alts!' in this way allows us to flush buffer output to disk
          ;; and close the channel after 500 ms of no new messages.
          (let [[entry source-chan] (async/alts! [ch (async/timeout 500)]
                                                 :priority true)
                s (json/write-str entry)]
            ;; `entry' will be nil when a take operation has been performed with
            ;; a channel returned by `timeout'. When that happens, flush buffer
            ;; to disk and close the channel.
            (if (nil? entry)
              (do
                (.flush o)
                (async/close! ch))
              ;; If `entry' is not nil, instead write the message to `in-chan'
              ;; (which is our secondary queue for already-processed messages),
              ;; remove the message from the source queue (via `processed!'),
              ;; and write the data (with a newline appended) to the buffer
              ;; before recurring.
              (do
                (async/>! in-chan entry)
                (processed! creds src entry)
                (.write o (str s "\n"))
                (recur)))))))))

While everything does appear to work as expected -- the bytes in the BufferedWriter get written to disk, and, after 500ms, the channel is closed and the buffer is flushed to disk -- the HTTPS connections to AWS seem to either stay open or are refreshed continuously. According to the docs, "once the channel is closed, processing will cease."

Am I doing something wrong in the code? Is my expectation that the connections would be closed incorrect?

moea commented 9 years ago

Well spotted! It's not an incorrect expectation - it was incorrect of me to state/implement that as the behaviour.

As long as the queue is empty, the fetch loop within receive! won't write anything to the channel - and rejected writes are the only means it currently has of figuring out whether the consumer is done.
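The behaviour described above can be reproduced with core.async alone. The following is a minimal sketch, not fink-nottle's actual fetch loop: `poll-batches` is a hypothetical helper, and each element of `batches` stands in for the result of one poll of the queue. It relies only on the fact that `>!!` returns false when the channel is already closed.

```clojure
(require '[clojure.core.async :as async])

(defn poll-batches
  "Hypothetical stand-in for a fetch loop. Each element of `batches` is the
  seq of messages returned by one simulated poll. Every message is written
  to `out`; the loop stops on the first rejected write."
  [out batches]
  (loop [polls 0, remaining (seq batches)]
    (if-let [[batch & more] remaining]
      ;; >!! returns false once `out` is closed. An empty batch performs no
      ;; write at all, so the closed channel goes unnoticed for that poll.
      (if (every? #(async/>!! out %) batch)
        (recur (inc polls) (seq more))
        {:polls (inc polls) :stopped :rejected-write})
      {:polls polls :stopped :out-of-batches})))

;; The consumer closes the channel before any message arrives.
(def out (async/chan 1))
(async/close! out)

;; Two empty polls pass without the loop noticing; only the third poll,
;; which has a message to write, detects that the consumer is gone.
(poll-batches out [[] [] [:msg] [:never-reached]])
;; => {:polls 3, :stopped :rejected-write}
```

With a queue that stays empty, the loop would keep polling indefinitely, which matches the connections that remain open in the report above.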

So, something like "on the first rejected write, processing will cease" would be more accurate. It's easy to imagine this being a real-life problem. I've addressed it in a branch (via dubious means - using a separate control channel to signal termination is probably the right thing to do).
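A separate control channel might look roughly like the sketch below. This is an illustration under stated assumptions, not the fix in the branch (which the thread does not show): `polling-producer`, its `fetch` argument, and the returned keys are all hypothetical names. The loop checks the control channel before every poll, so it can stop even when there is never anything to write.

```clojure
(require '[clojure.core.async :as async])

(defn polling-producer
  "Hypothetical sketch. Calls `fetch` (a function returning one message or
  nil) every `interval-ms` and writes non-nil results to :out. Stops when
  :control is closed or receives a value, or when a write is rejected."
  [fetch interval-ms]
  (let [out     (async/chan 10)
        control (async/chan)
        done    (async/go-loop []
                  ;; :priority true makes alts! check `control` first.
                  (let [[_ port] (async/alts! [control (async/timeout interval-ms)]
                                              :priority true)]
                    (if (= port control)
                      (do (async/close! out)
                          :stopped-by-control)
                      ;; A real fetch would be network I/O and should not
                      ;; block inside a go block; it is inline here for brevity.
                      (let [msg (fetch)]
                        (if (or (nil? msg) (async/>! out msg))
                          (recur)
                          :stopped-by-rejected-write)))))]
    {:out out :control control :done done}))

;; The simulated queue is always empty, yet the loop still terminates
;; as soon as the consumer closes the control channel.
(let [{:keys [control done]} (polling-producer (constantly nil) 10)]
  (async/close! control)
  (async/<!! done))
;; => :stopped-by-control
```

The rejected-write check is kept as a fallback, so a consumer that only closes `:out` still stops the loop on the next message.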

Clojars is down, it looks like - otherwise I'd have pushed an updated 0.4.4-SNAPSHOT. I'll let you know when I do. I've tested the fix, though would appreciate if you could verify it against your example above.

moea commented 9 years ago

Deployed 0.4.4-SNAPSHOT

moea commented 8 years ago

Any input on this?