clj-commons / durable-queue

a disk-backed queue for clojure
Eclipse Public License 1.0

Thread safety of take!, put! #16

Open mpenet opened 7 years ago

mpenet commented 7 years ago

Hi,

Should we consider put! and take! threadsafe?

I have one thread doing puts in bursts at regular intervals, and N workers on their own respective threads consuming the queue. Sometimes I get into a situation where I have 1 in-progress task and all other workers waiting on tasks while the queue keeps growing (essentially deadlocked at this point). In the code I take!, deref, and mark tasks completed immediately (I did that to rule out a bug around complete!). Should I serialize the takes, or is this a potential bug? I'll try to come up with a repro at work tomorrow.
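
For context, here's a minimal sketch of that setup (the directory, payload, counts and intervals are made up for illustration; only the put!/take!/deref/complete! flow matches what we actually do):

(require '[durable-queue :as dq])

(def q (dq/queues "/tmp/dq" {}))

;; one producer thread, putting in bursts at a regular interval
(future
  (loop []
    (dotimes [_ 100]
      (dq/put! q :foo {:some "payload"}))
    (Thread/sleep 1000)
    (recur)))

;; N workers, each on its own thread: take!, deref and complete! immediately
(dotimes [_ 4]
  (future
    (loop []
      (let [task (dq/take! q :foo 1000 ::timeout)]
        (when-not (= ::timeout task)
          @task                 ;; deref the task to read the payload
          (dq/complete! task))) ;; mark it completed right away
      (recur))))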

I also get the occasional negative in-progress while testing:

09:22:14.938 [clojure-agent-send-off-pool-0] INFO sink.testdq - {foo {:num-slabs 1, :num-active-slabs 1, :enqueued 10947, :retried 0, :completed 10947, :in-progress -1}}
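
That map is the per-queue stats map durable-queue exposes via stats; the log lines above come from a loop roughly like this (assuming clojure.tools.logging and the q from the sketch above, with an illustrative interval):

(require '[clojure.tools.logging :as log]
         '[durable-queue :as dq])

;; periodically log the per-queue stats map, e.g.
;; {"foo" {:num-slabs 1, :enqueued 10947, :completed 10947, :in-progress -1, ...}}
(future
  (loop []
    (log/info (dq/stats q))
    (Thread/sleep 5000)
    (recur)))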

Thanks for the lib by the way, it's super useful.

mpenet commented 7 years ago

I can reproduce the issue with the following code: https://gist.github.com/mpenet/a8266ac2d6e081701e74112bd20f3a5a

You start both consumers and producers, then after a while kill the producers, and you will sometimes end up with a constant in-progress of 1 even though all consumers are still up.

logs look like this:

09:58:02.139 [clojure-agent-send-off-pool-6] INFO sink.testdq - {foo {:num-slabs 2, :num-active-slabs 2, :enqueued 858923, :retried 0, :completed 858922, :in-progress 1}}

I am not able to deadlock it just yet, but I suspect that once I hit the same issue enough times in a run, all consumer threads end up blocked. It might just be a matter of playing with the parameters.

edit:

if you start with the following:

(def c (start-consumers! queues 30))
(def p (start-producers! queues 50))

in a couple of seconds you'd get this kind of log:

10:48:47.892 [clojure-agent-send-off-pool-1] INFO sink.testdq - {foo {:num-slabs 22, :num-active-slabs 22, :enqueued 576941, :retried 0, :completed 576951, :in-progress -10}}
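
To "kill the producers" as described above, assuming start-producers! in the gist returns a collection of futures (my assumption), it's just:

;; p is what start-producers! returned above; assuming a collection of futures
(run! future-cancel p)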

mpenet commented 7 years ago

It seems that if I wrap dq/complete! and (deref task) in a try/catch (catching both Exception and AssertionError), I get consistent results without locking, avoiding the issue from the previous comment and the weird counters. But there's definitely something fishy going on with the handling of the internal slab buffers.
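
Roughly what the workaround looks like in the consumer step (a sketch; the helper name and the println reporting are mine, not our actual code):

(require '[durable-queue :as dq])

;; wrap deref and complete! so the errors below don't kill the worker thread
(defn safe-consume! [task]
  (try
    (let [payload @task]
      (dq/complete! task)
      payload)
    (catch AssertionError e
      (println "assertion from slab buffer handling:" (.getMessage e)))
    (catch Exception e
      (println "exception while handling task:" (.getMessage e)))))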

What's caught:

#error {
 :cause "Assert failed: (.exists (io/file filename))"
 :via
 [{:type java.util.concurrent.ExecutionException
   :message "java.lang.AssertionError: Assert failed: (.exists (io/file filename))"
   :at [java.util.concurrent.FutureTask report "FutureTask.java" 122]}
  {:type java.lang.AssertionError
   :message "Assert failed: (.exists (io/file filename))"
   :at [durable_queue$load_buffer invokeStatic "durable_queue.clj" 110]}]
 :trace
 [[durable_queue$load_buffer invokeStatic "durable_queue.clj" 110]
  [durable_queue$load_buffer invoke "durable_queue.clj" 106]
  [durable_queue$load_buffer invokeStatic "durable_queue.clj" 108]
  [durable_queue$load_buffer invoke "durable_queue.clj" 106]
  [durable_queue.TaskSlab$fn__6029 invoke "durable_queue.clj" 274]
  [clojure.lang.Atom swap "Atom.java" 37]
  [clojure.core$swap_BANG_ invokeStatic "core.clj" 2342]
  [clojure.core$swap_BANG_ invoke "core.clj" 2335]
  [durable_queue.TaskSlab buffer "durable_queue.clj" 272]
  [durable_queue.TaskSlab sync_BANG_ "durable_queue.clj" 295]
  [durable_queue$queues$reify__6342 fsync "durable_queue.clj" 629]
  [durable_queue$queues$fn__6267 invoke "durable_queue.clj" 554]
  [durable_queue$queues$reify__6342 mark_complete_BANG_ "durable_queue.clj" 638]
  [durable_queue$complete_BANG_ invokeStatic "durable_queue.clj" 781]
  [durable_queue$complete_BANG_ invoke "durable_queue.clj" 772]
  [sink.testdq$start_consumers_BANG_$fn__18600$fn__18601 invoke "testdq.clj" 41]
  [clojure.core$binding_conveyor_fn$fn__6772 invoke "core.clj" 2020]
  [clojure.lang.AFn call "AFn.java" 18]
  [java.util.concurrent.FutureTask run "FutureTask.java" 266]
  [java.util.concurrent.ThreadPoolExecutor runWorker "ThreadPoolExecutor.java" 1142]
  [java.util.concurrent.ThreadPoolExecutor$Worker run "ThreadPoolExecutor.java" 617]
  [java.lang.Thread run "Thread.java" 745]]}

and/or

 #error {
 :cause nil
 :via
 [{:type java.lang.IllegalArgumentException
   :message nil
   :at [java.nio.Buffer position Buffer.java 244]}]
 :trace
 [[java.nio.Buffer position Buffer.java 244]
  [durable_queue$task$fn__6008 invoke durable_queue.clj 193]
  [durable_queue.Task deref durable_queue.clj 162]
  [clojure.core$deref invokeStatic core.clj 2310]
  [clojure.core$deref invoke core.clj 2296]
  [sink.testdq$start_consumers_BANG_$fn__18600$fn__18601$fn__18602 invoke testdq.clj 43]
  [sink.testdq$start_consumers_BANG_$fn__18600$fn__18601 invoke testdq.clj 42]
  [clojure.core$binding_conveyor_fn$fn__6772 invoke core.clj 2020]
  [clojure.lang.AFn call AFn.java 18]
  [java.util.concurrent.FutureTask run FutureTask.java 266]
  [java.util.concurrent.ThreadPoolExecutor runWorker ThreadPoolExecutor.java 1142]
  [java.util.concurrent.ThreadPoolExecutor$Worker run ThreadPoolExecutor.java 617]
  [java.lang.Thread run Thread.java 745]]}

mpenet commented 7 years ago

Edited for clarity

mpenet commented 7 years ago

I can confirm this fixed our issue. It's been 2 days and hundreds of millions of tasks later, and we haven't hit the issue again.

Trying to summon the @ztellman in case he has some insight on this :satellite: