eclipse / paho.golang

initial lockfree attempt #191

Open ashtonian opened 10 months ago

ashtonian commented 10 months ago

I attempted to add a lock-free queue for faster operations, with some success. I believe the Wait() function inherently introduces a lock.

The intended use case is a high-volume concurrent environment with high churn on and off the queue. The major assumption is that the queue won't be starved or empty, and that if it is empty, it won't be for long. In this scenario it would be more desirable to spin-poll on peek/dequeue, as the goal is to move as fast as possible even at the cost of some CPU cycles when the queue is empty.

However, to be more compliant with the interface as it stands, my initial PR has a bit of a lock mechanism, which drastically reduces the benefits of this approach.

I think it can be potentially improved by isolating the wait channel a bit differently.

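A few ideas:

- Reduce the complexity while keeping the no-lock performance gains by only returning an open channel once (to signal initialization); after the first Enqueue() is called, Wait() would return a closed channel even if the queue is empty. I think if documented properly this would potentially be the best compromise.
- Keep a mutex with a slice of channels to notify. In this scenario, when Wait() isn't called the notify overhead would be substantially less, as there would be no channels to loop through. Still may need locking.
- Use atomic counters to keep track of queue depth. I think this would be a bit racy on empty or near-empty queues. Could also avoid some of this if returning EmptyErr sometimes when the queue is near starving is acceptable.

Here is the original, simpler LockFreeQueue I used to replace an in-memory, channel-based queue. In our use case (as described) the results were pretty noticeable, and we were not able to maintain low queue depth in our ingest service without it.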

import (
    "sync/atomic"
    "unsafe"
)

// LockFreeQueue is a lock-free queue implementation that allows for concurrent
// access by multiple goroutines without the use of locks.
type LockFreeQueue[T any] struct {
    head unsafe.Pointer // head points to the first node of the queue.
    tail unsafe.Pointer // tail points to the last node of the queue.
}

// node represents a single node in the queue with a generic type.
type node[T any] struct {
    value T             // value holds the data contained in the node.
    next  unsafe.Pointer // next points to the next node in the queue.
}

// NewLockFree initializes a new lock-free queue with a dummy node.
// The dummy node simplifies enqueue and dequeue operations by ensuring
// that the head and tail are never nil, even in an empty queue.
func NewLockFree[T any]() *LockFreeQueue[T] {
    node := unsafe.Pointer(new(node[T])) // Create a new dummy node.
    return &LockFreeQueue[T]{
        head: node, // Set both head and tail to point to the dummy node.
        tail: node,
    }
}

// Enqueue adds an element with the given value v to the end of the queue.
// It uses a loop with compare-and-swap (CAS) operations to ensure that the
// node is appended in a thread-safe manner without locks.
func (q *LockFreeQueue[T]) Enqueue(v T) {
    node := &node[T]{value: v} // Create a new node with the given value.
    for {
        tail := load[T](&q.tail) // Load the tail of the queue.
        next := load[T](&tail.next) // Load the next pointer of the tail node.
        // Check if tail and next are consistent; this is to ensure that the tail
        // has not changed since loading it.
        if tail == load[T](&q.tail) {
            if next == nil {
                // Try to link the new node at the end of the queue.
                if cas(&tail.next, next, node) {
                    // Successfully linked new node; try to swing the tail to the new node.
                    cas(&q.tail, tail, node)
                    return
                }
            } else {
                // Tail was not pointing to the last node, try to swing the tail to the next node.
                cas(&q.tail, tail, next)
            }
        }
    }
}

// Dequeue removes and returns the element at the front of the queue.
// It returns the value along with a boolean indicating whether the operation was successful.
// If the queue is empty, it returns the zero value for the type T and false.
func (q *LockFreeQueue[T]) Dequeue() (v T, ok bool) {
    for {
        head := load[T](&q.head) // Load the head of the queue.
        tail := load[T](&q.tail) // Load the tail of the queue.
        next := load[T](&head.next) // Load the next pointer of the head node.
        // Check if head and next are consistent; this is to ensure that the head
        // has not changed since loading it.
        if head == load[T](&q.head) {
            if head == tail {
                // Queue might be empty or tail is falling behind.
                if next == nil {
                    // Queue is empty, return zero value and false.
                    var zero T
                    return zero, false
                }
                // Tail is falling behind. Try to advance it.
                cas(&q.tail, tail, next)
            } else {
                // Read value before CAS, otherwise another dequeue might free the next node.
                v := next.value
                // Try to swing the head to the next node.
                if cas(&q.head, head, next) {
                    // Successfully removed the head node.
                    return v, true
                }
            }
        }
    }
}

// load is a helper function that atomically loads the pointer to a node.
func load[T any](p *unsafe.Pointer) *node[T] {
    return (*node[T])(atomic.LoadPointer(p)) // Perform an atomic load operation.
}

// cas is a helper function that performs an atomic compare-and-swap operation on a node pointer.
func cas[T any](p *unsafe.Pointer, old, new *node[T]) bool {
    // Atomically compare the current value to 'old', and if they are equal, replace with 'new'.
    return atomic.CompareAndSwapPointer(p,
        unsafe.Pointer(old), unsafe.Pointer(new))
}
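
For comparison, the same algorithm can be written without unsafe.Pointer. The following is a minimal sketch, assuming Go 1.19 or later for atomic.Pointer; TypedQueue, typedNode and NewTypedQueue are hypothetical names used for illustration and are not taken from the PR.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// typedNode and TypedQueue are hypothetical names for this sketch.
type typedNode[T any] struct {
	value T
	next  atomic.Pointer[typedNode[T]]
}

type TypedQueue[T any] struct {
	head atomic.Pointer[typedNode[T]]
	tail atomic.Pointer[typedNode[T]]
}

// NewTypedQueue starts with a dummy node so head and tail are never nil.
func NewTypedQueue[T any]() *TypedQueue[T] {
	q := &TypedQueue[T]{}
	dummy := &typedNode[T]{}
	q.head.Store(dummy)
	q.tail.Store(dummy)
	return q
}

func (q *TypedQueue[T]) Enqueue(v T) {
	n := &typedNode[T]{value: v}
	for {
		tail := q.tail.Load()
		next := tail.next.Load()
		if tail != q.tail.Load() {
			continue // tail moved while we were reading; retry
		}
		if next != nil {
			q.tail.CompareAndSwap(tail, next) // help a lagging tail forward
			continue
		}
		if tail.next.CompareAndSwap(nil, n) {
			q.tail.CompareAndSwap(tail, n) // best effort; others can help
			return
		}
	}
}

func (q *TypedQueue[T]) Dequeue() (T, bool) {
	var zero T
	for {
		head := q.head.Load()
		tail := q.tail.Load()
		next := head.next.Load()
		if head != q.head.Load() {
			continue // head moved while we were reading; retry
		}
		if head == tail {
			if next == nil {
				return zero, false // only the dummy node remains: empty
			}
			q.tail.CompareAndSwap(tail, next) // tail is lagging; advance it
			continue
		}
		v := next.value // read before the CAS, as in the code above
		if q.head.CompareAndSwap(head, next) {
			return v, true
		}
	}
}

func main() {
	q := NewTypedQueue[int]()
	var wg sync.WaitGroup
	for p := 0; p < 4; p++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				q.Enqueue(i)
			}
		}()
	}
	wg.Wait()
	count := 0
	for {
		if _, ok := q.Dequeue(); !ok {
			break
		}
		count++
	}
	fmt.Println("dequeued", count) // prints "dequeued 4000"
}
```

The typed pointers let the compiler check node types, so the load and cas helpers are no longer needed. Whether this performs the same as the unsafe.Pointer version would need benchmarking.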

MattBrittan commented 10 months ago

Thanks for the PR. I have scanned through it and it looks impressive. However reviewing this is quite a bit of work; it's so easy to introduce errors with atomic, and any bugs are very difficult to find (they tend to pop up randomly). Note that I'm not saying that there are bugs in your code just that it takes time to review because it's, necessarily, a bit convoluted.

My current thought is that we leave this until after the next release (let's get a version of the library which supports persistence out there and then look at improving performance). I guess the other option would be to add this as an experimental feature (keeping it separate from the other memory implementation - not that I'm saying my code is bug free!). While performance improvements are always nice, it's difficult to know how much to invest in them (I suspect that this will make no discernible difference to 95% of users).

Reduce the complexity while keeping the no-lock performance gains by only returning an open channel once (to signal initialization); after the first Enqueue() is called, Wait() would return a closed channel even if the queue is empty. I think if documented properly this would potentially be the best compromise.

If I'm reading this right it would result in the user effectively calling Dequeue() constantly (because <- Wait() would not actually wait). For very high throughput systems this may be an effective strategy, but that would need testing (for most of my use-cases it would be a negative due to hammering the CPU and chewing through batteries!). One immediate improvement would be to create a closed channel at startup and return that from Wait (as opposed to creating/closing a new channel each time).
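
A minimal sketch of that last suggestion, assuming a Wait() that returns a receive-only channel. The alwaysReady type, the drain helper and the slice-backed dequeue function are hypothetical stand-ins for illustration; they are not part of paho.golang or the PR.

```go
package main

import (
	"fmt"
	"runtime"
)

// alwaysReady is a hypothetical helper; it is not part of paho.golang.
type alwaysReady struct {
	closed chan struct{} // created and closed once at startup, then reused
}

func newAlwaysReady() *alwaysReady {
	c := make(chan struct{})
	close(c)
	return &alwaysReady{closed: c}
}

// Wait returns the same closed channel on every call, so a receive
// from it never blocks and no channel is allocated per call.
func (a *alwaysReady) Wait() <-chan struct{} { return a.closed }

// drain spin-polls dequeue until it has collected want items.
func drain(a *alwaysReady, dequeue func() (int, bool), want int) []int {
	got := make([]int, 0, want)
	for len(got) < want {
		<-a.Wait() // returns immediately
		v, ok := dequeue()
		if !ok {
			runtime.Gosched() // queue empty: yield, then poll again
			continue
		}
		got = append(got, v)
	}
	return got
}

func main() {
	items := []int{1, 2, 3} // single-goroutine stand-in for the real queue
	dequeue := func() (int, bool) {
		if len(items) == 0 {
			return 0, false
		}
		v := items[0]
		items = items[1:]
		return v, true
	}
	fmt.Println(drain(newAlwaysReady(), dequeue, 3)) // prints [1 2 3]
}
```

The consumer never sleeps in this arrangement, which is the CPU and battery cost described above; runtime.Gosched() only yields to other goroutines and does not park the caller.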

Keep a mutex with a slice of channels to notify. In this scenario, when Wait() isn't called the notify overhead would be substantially less, as there would be no channels to loop through. Still may need locking.

We could state that Wait() must not be called concurrently (concurrent calls should not be needed as there should never be multiple goroutines reading from the queue because the message order is important).
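
The mutex-plus-slice idea could look roughly like the sketch below. The notifier type and its Notify method are hypothetical names for illustration; how this would be wired into Enqueue() is left open, and none of it is taken from the PR.

```go
package main

import (
	"fmt"
	"sync"
)

// notifier is a hypothetical helper; it is not part of paho.golang.
type notifier struct {
	mu      sync.Mutex
	waiters []chan struct{}
}

// Wait registers a new channel that is closed by the next Notify.
func (n *notifier) Wait() <-chan struct{} {
	c := make(chan struct{})
	n.mu.Lock()
	n.waiters = append(n.waiters, c)
	n.mu.Unlock()
	return c
}

// Notify closes and forgets every registered channel. When Wait has
// not been called there is nothing to loop through, but the mutex is
// still taken.
func (n *notifier) Notify() {
	n.mu.Lock()
	ws := n.waiters
	n.waiters = nil
	n.mu.Unlock()
	for _, c := range ws {
		close(c)
	}
}

// isClosed reports whether c has been closed, without blocking.
func isClosed(c <-chan struct{}) bool {
	select {
	case <-c:
		return true
	default:
		return false
	}
}

func main() {
	n := &notifier{}
	n.Notify() // no waiters registered: nothing to close
	c := n.Wait()
	fmt.Println(isClosed(c)) // prints false
	n.Notify()
	fmt.Println(isClosed(c)) // prints true
}
```

A consumer using something like this would need to register with Wait() first and then re-check the queue before blocking; otherwise an Enqueue() that lands between the empty check and the registration is missed.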

Use atomic counters to keep track of queue depth. I think this would be a bit racy on empty or near-empty queues. Could also avoid some of this if returning EmptyErr sometimes when the queue is near starving is acceptable.

Wouldn't head be nil when the queue is empty?

I'm happy for the Queue interface to change (it's all internal and not really fixed until v1); the design was really just something that works with autopaho and was relatively easy to follow (I'm sure it can be improved; this was a large change so there is probably a lot of suboptimal code!).

MattBrittan commented 10 months ago

Sorry - one further question. Is the initial implementation your work or did this come from somewhere else (a quick google led to this)? If it's come from somewhere else then we need to acknowledge that (eclipse is, rightfully, cautious when it comes to checking licenses etc).

ashtonian commented 10 months ago

Thanks for the feedback!

My current thought is that we leave this until after the next release (let's get a version of the library which supports persistence out there and then look at improving performance). I guess the other option would be to add this as an experimental feature (keeping it separate from the other memory implementation - not that I'm saying my code is bug free!). While performance improvements are always nice, it's difficult to know how much to invest in them (I suspect that this will make no discernible difference to 95% of users).

I agree and would be fine with both.

If I'm reading this right it would result in the user effectively calling Dequeue() constantly (because <- Wait() would not actually wait). For very high throughput systems this may be an effective strategy, but that would need testing (for most of my use-cases it would be a negative due to hammering the CPU and chewing through batteries!). One immediate improvement would be to create a closed channel at startup and return that from Wait (as opposed to creating/closing a new channel each time).

Correct. After thinking on this more, I think it would be ideal if Wait() just always returns a closed channel, and this is simply a 'feature' of the lock-free queue that's documented for the user. If batteries are involved, this is definitely not the intended use case. This is for something like a server-side ingest service that consumes all messages from an IoT fleet.

ashtonian commented 10 months ago

Sorry - one further question. Is the initial implementation your work or did this come from somewhere else (a quick google led to this)? If it's come from somewhere else then we need to acknowledge that (eclipse is, rightfully, cautious when it comes to checking licenses etc).

I did not invent the lock-free queue. I was introduced to the concept by this paper, which I found after some googling while benchmarking. Using that as a reference point, I looked for Go implementations, of which there are several that are nearly identical, especially without generics. I then used the code you found as a reference, as it is the cleanest implementation of the algorithm from the paper. The PR code is different, as you can see.

MattBrittan commented 6 months ago

@ashtonian just out of interest, did you close this because you found an issue, got sick of waiting for me, or something else? Taking a look had been on my list for some time, and I was considering making this a third option for those with performance issues (but wanted to do some more testing and look into the licensing; at minimum we would need to credit the implementation you based this on).

ashtonian commented 6 months ago

I was poking around the repo debugging another issue and figured this wasn't going to get merged. I should have communicated before closing. I would love it if this gets merged. I'm not sure how the citation would work, but I think it would go to that paper. I'm also not necessarily convinced the modifications made to match the interface are adequate; they probably need some more testing.

I don't mind waiting. I also appreciate the work you've been putting into the driver, it seems to be coming together. Let me know if I can help with this. Sorry again, should have communicated before closing.

MattBrittan commented 6 months ago

No worries; it's been idle a while (I'm busy on other projects currently).

I'm not sure how the citation would work, but I think it would go to that paper.

We need to comply with the eclipse contributor agreement which, in this case, requires you to agree that:

The contribution is based upon previous work that, to the best of my knowledge, is covered under an appropriate open source license and I have the right under that license to submit that work with modifications, whether created in whole or in part by me, under the same open source license (unless I am permitted to submit under a different license), as indicated in the file; or

As much of this was based on MIT-licensed code, I think that including that license (or a link to it) in the code should be sufficient. The key thing is that we follow the license requirements and acknowledge authorship.

The eclipse foundation is pretty tough on this; with good reason. If our code becomes contaminated with anything that's not appropriately licensed then that opens up our users to legal action.

I'd suggest adding the license (and a link to the original repo) in the header of queue.go. With that done, I think this could be accepted as a second option for the queue (for use where performance is an issue). It may be that in the future, after significant testing, we make it the default implementation.