The length function is not part of the public interface of Queue, but looking at it made me realize an interesting aspect of it. Consider the implementation with a note I added:
let iter ~f t =
  let rec go p =
    match Atomic.get p.next with
    | None -> ()
    | Some next -> f next.value; go next
  in
  go (Atomic.get t.head)

let length t =
  let v = Atomic.make 0 in (* NOTE: This need not be an Atomic - a ref would be fine *)
  iter ~f:(fun _ -> Atomic.incr v) t;
  Atomic.get v
The interesting issue isn't related to the comment, though. It is that, if there are concurrent consumers and producers, then length may actually return a value greater than the length the queue has had at any point in time. That is because nodes added by concurrent producers would be counted, while nodes removed by concurrent consumers would not be subtracted.
Implementing an efficient and correct length operation for a lock-free queue is not impossible, however. One way to do it is to add a count field to the nodes:
-and 'a node = { mutable value: 'a; next: 'a node option Atomic.t }
+and 'a node = { mutable value: 'a; next: 'a node option Atomic.t; mutable count: int }
You then make it so that, when you add a node to the queue, the count of the new node is one greater than the count of the predecessor (previous tail) node: read the count from the predecessor node and update the count of the new node before the CAS. This way you can then read the length as follows:
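To make this concrete, here is a minimal sketch of an enqueue that maintains the count this way. The node/queue types and the create/enqueue names are illustrative assumptions, not miou's actual code; only the count handling follows the description above:

```ocaml
type 'a node = {
  mutable value : 'a;
  next : 'a node option Atomic.t;
  mutable count : int;
}

type 'a t = { head : 'a node Atomic.t; tail : 'a node Atomic.t }

let create () =
  (* The queue starts with a dummy node, as in the Michael-Scott queue. *)
  let dummy = { value = Obj.magic (); next = Atomic.make None; count = 0 } in
  { head = Atomic.make dummy; tail = Atomic.make dummy }

let rec enqueue t x =
  let node = { value = x; next = Atomic.make None; count = 0 } in
  let p = Atomic.get t.tail in
  (* Read the predecessor's count and set the new node's count
     BEFORE the CAS that publishes the node to other threads. *)
  node.count <- p.count + 1;
  if Atomic.compare_and_set p.next None (Some node) then
    (* Swing the tail; if this fails, another thread helped. *)
    Atomic.compare_and_set t.tail p node |> ignore
  else begin
    (* Tail had fallen behind; help advance it and retry. *)
    (match Atomic.get p.next with
     | Some n -> Atomic.compare_and_set t.tail p n |> ignore
     | None -> ());
    enqueue t x
  end
```

With this invariant, the difference between the tail's and the head's counts is the number of elements, which is what the length below relies on.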
let rec length t =
  let head = Atomic.get t.head in
  let tail = Atomic.get t.tail in
  match Atomic.get tail.next with
  | Some node ->
    (* Tail had fallen behind, so update it *)
    Atomic.compare_and_set t.tail tail node |> ignore;
    length t
  | None ->
    (* Check that we saw a consistent snapshot of the queue *)
    if Atomic.get t.head != head then length t
    else tail.count - head.count
This way the returned length is a length that the queue actually had at some point in time.
Another subtle thing is that the transfer operation, which is part of the public interface, uses dequeue internally. This of course means that concurrent updates of the queue interfere with the transfer operation, so the operation is not atomic. It is possible to implement an atomic version of the transfer operation. One way to do it is to introduce a snapshot concept that merely points to two nodes in the queue at a point in time:
type 'a snapshot = 'a node * 'a node
To take a snapshot you can use the same kind of loop as with the length in the previous comment:
let rec snapshot t : 'a snapshot =
  let head = Atomic.get t.head in
  let tail = Atomic.get t.tail in
  match Atomic.get tail.next with
  | Some node ->
    Atomic.compare_and_set t.tail tail node |> ignore;
    snapshot t
  | None ->
    if Atomic.get t.head != head then snapshot t
    else (head, tail)
In fact, you could reimplement length using snapshot:
let length t =
  let head, tail = snapshot t in
  tail.count - head.count
But towards an atomic version of transfer, one could then introduce a dequeue_all operation that takes a snapshot of the queue and leaves the queue empty (which could of course change after taking the snapshot):
let rec dequeue_all t =
  let ((head, tail) as snapshot) = snapshot t in
  if Atomic.compare_and_set t.head head tail then snapshot
  else dequeue_all t
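As a sanity check of the intended semantics, here is how dequeue_all behaves on a hand-built queue. The node/queue types and the hand-built chain are illustrative assumptions; the snapshot and dequeue_all bodies repeat the code above for completeness:

```ocaml
type 'a node = {
  mutable value : 'a;
  next : 'a node option Atomic.t;
  mutable count : int;
}

type 'a t = { head : 'a node Atomic.t; tail : 'a node Atomic.t }
type 'a snapshot = 'a node * 'a node

let rec snapshot t : 'a snapshot =
  let head = Atomic.get t.head in
  let tail = Atomic.get t.tail in
  match Atomic.get tail.next with
  | Some node ->
    (* Tail had fallen behind, so help update it. *)
    Atomic.compare_and_set t.tail tail node |> ignore;
    snapshot t
  | None ->
    if Atomic.get t.head != head then snapshot t else (head, tail)

let rec dequeue_all t =
  let ((head, tail) as snap) = snapshot t in
  (* Swinging head to tail makes tail the new dummy node,
     so the queue becomes logically empty. *)
  if Atomic.compare_and_set t.head head tail then snap
  else dequeue_all t
```

After a successful dequeue_all, t.head and t.tail point to the same node, which is exactly the empty-queue representation of the Michael-Scott queue.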
You could then replace the last node of the snapshot (in O(n) time, as you need to traverse through the snapshot) and create a new queue, or you could have some operations, like iter, for snapshots:
let iter ~f (head, tail) =
  let rec go prev =
    if prev != tail then
      match Atomic.get prev.next with
      | None -> ()
      | Some next -> f next.value; go next
  in
  go head
One subtle issue here is that a snapshot also retains all the nodes added to the queue after the snapshot was taken. So, if you need to keep the snapshot for a long time, you should either replace the last node (as mentioned before) or convert the snapshot to some other form after taking it to avoid the potential space leak.
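For example, converting a snapshot to a plain list right after taking it is one such "other form". The to_list name is just an illustrative helper; the node type and snapshot iter are repeated from above for completeness:

```ocaml
type 'a node = {
  mutable value : 'a;
  next : 'a node option Atomic.t;
  mutable count : int;
}

type 'a snapshot = 'a node * 'a node

let iter ~f ((head, tail) : 'a snapshot) =
  let rec go prev =
    if prev != tail then
      match Atomic.get prev.next with
      | None -> ()
      | Some next -> f next.value; go next
  in
  go head

(* Materialize the snapshot as a list; after this the snapshot
   (and hence the nodes it retains) can be dropped. *)
let to_list snap =
  let acc = ref [] in
  iter ~f:(fun x -> acc := x :: !acc) snap;
  List.rev !acc (* iter visits values in queue (FIFO) order *)
```

Once the list is built, no node of the original queue is reachable from it, so the space leak cannot occur no matter how long the result is kept.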
Thank you very much for this great explanation. I think that, for later, it is a good resource concerning the implementation of a lock-free queue (in order to understand all the subtleties). I've just done a pull-request on our official repository, available here: https://git.robur.coop/robur/miou/pulls/2. If you have the time to do a quick review to validate the changes, it's in two parts:
- dequeue
- snapshot in drop, iter and length
I don't see how I could comment directly on git.robur.io so I'll comment here.
See here:

let p = Atomic.get t.tail in
if Atomic.compare_and_set p.next None (Some q) then (
  q.count <- p.count + 1;
The q.count update must be done before the compare_and_set:
let p = Atomic.get t.tail in
q.count <- p.count + 1;
if Atomic.compare_and_set p.next None (Some q) then (
That is because the compare_and_set publishes the node to other threads. So, if you update q.count after the compare_and_set, then it is possible for those other threads to read the unupdated q.count.
Thanks, yes, GitHub is good enough to talk about the PR. I will do the update and merge. Thanks again for this great review.
Thinking about the snapshot approach I introduced a bit further yesterday, I realized it has a familiar (not unfixable) problem... 😅
Can you see it?
Well, the problem is the same one I pointed out as the first issue. Namely that it can cause a space leak just like dequeue, because the .value field of the tail node is still reachable. So, while the snapshotting works, the .value field of the tail node must be cleared after you are done with the snapshot.
Looking at the changes, the snapshot is not exposed, so it should be easy to fix the leak. Just clear the .value field of the tail node of the snapshot, i.e. tail.value <- Obj.magic (), after you are done with the snapshot in the drop function.
Also, one thing I noticed is that the to_list function gives the list in LIFO order, which can be a bit surprising.
I finally made this PR which incorporates your review: https://git.robur.coop/robur/miou/pulls/3 :+1:.
LGTM
First I must say that I love to see different takes on how to implement schedulers in OCaml!
Out of curiosity I took a look at the queue implementation and noticed a few issues.
But, take a look at the dequeue implementation with a note I added:

This is a very typical issue/bug in an implementation of the Michael-Scott queue. I recently noticed the same issue in the lockfree implementation.
So, the problem is that t.head is updated to point to a node whose node.value is returned, and the node is logically removed from the queue. However, that node is not physically removed from the queue and it still points to the value. So, that last value dequeued from the queue will still be pointed to by the queue and it might be retained for a long time (indefinitely if nothing goes through the queue after), causing a space leak.

A simple fix is to overwrite the value: