Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

A question about `addLast` in LockFreeTaskQueue #4205

Closed (autorain closed 2 months ago)

autorain commented 3 months ago
```kotlin
fun addLast(element: E): Int {
    _state.loop { state ->
        if (state and (FROZEN_MASK or CLOSED_MASK) != 0L) return state.addFailReason() // cannot add
        state.withState { head, tail ->
            val mask = this.mask // manually move instance field to local for performance
            // If queue is Single-Consumer then there could be one element beyond head that we cannot overwrite,
            // so we check for full queue with an extra margin of one element
            if ((tail + 2) and mask == head and mask) return ADD_FROZEN // overfull, so do freeze & copy
            // If queue is Multi-Consumer then the consumer could still have not cleared element
            // despite the above check for one free slot.
            if (!singleConsumer && array[tail and mask].value != null) {
                // There are two options in this situation
                // 1. Spin-wait until consumer clears the slot
                // 2. Freeze & resize to avoid spinning
                // We use heuristic here to avoid memory-overallocation
                // Freeze & reallocate when queue is small or more than half of the queue is used
                if (capacity < MIN_ADD_SPIN_CAPACITY || (tail - head) and MAX_CAPACITY_MASK > capacity shr 1) {
                    return ADD_FROZEN
                }
                // otherwise spin
                return@loop
            }
            val newTail = (tail + 1) and MAX_CAPACITY_MASK
            if (_state.compareAndSet(state, state.updateTail(newTail))) {
                // successfully added
                array[tail and mask].value = element
                // could have been frozen & copied before this item was set -- correct it by filling placeholder
                var cur = this
                while(true) {
                    if (cur._state.value and FROZEN_MASK == 0L) break // all fine -- not frozen yet
                    cur = cur.next().fillPlaceholder(tail, element) ?: break
                }
                return ADD_SUCCESS // added successfully
            }
        }
    }
}
```

Why does the full-queue check use `(tail + 2) and mask == head and mask` rather than `(tail + 1) and mask == head and mask`? Could someone explain it to me? Thanks a lot.

qwwdfsad commented 2 months ago

Hey, there is no real reason for it anymore. The `+2` was relevant in an earlier version of LFQ, where a two-phased copy was used instead of arraylets and there was an additional invariant: the dequeue operation was two-phased, with an intermediate REMOVE state.

You can read more here: https://github.com/Kotlin/kotlinx.coroutines/blob/5d94a264260c83f80b245020e27a742cd5b10983/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/internal/LockFreeMPSCQueue.kt
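
To make the practical effect of the margin concrete, here is a minimal standalone sketch (not code from kotlinx.coroutines). `elementsBeforeFull` is a hypothetical helper that applies the same `(tail + margin) and mask == head and mask` check to a plain power-of-two ring buffer with `head` fixed at 0, and counts how many elements fit before the check reports "full". It ignores freezing, resizing and concurrency entirely.

```kotlin
// Hypothetical helper, not part of kotlinx.coroutines.
// Counts how many elements can be added to an empty ring buffer of the given
// capacity before the check `(tail + margin) and mask == head and mask` fires.
fun elementsBeforeFull(capacity: Int, margin: Int): Int {
    require(capacity > 0 && (capacity and (capacity - 1)) == 0) {
        "capacity must be a power of two"
    }
    val mask = capacity - 1
    val head = 0 // assume nothing has been removed yet
    var tail = 0
    // Same shape as the check in addLast, with the margin made a parameter.
    while (((tail + margin) and mask) != (head and mask)) {
        tail++ // simulate one successful add
    }
    return tail
}

fun main() {
    println(elementsBeforeFull(capacity = 8, margin = 1)) // 7
    println(elementsBeforeFull(capacity = 8, margin = 2)) // 6
}
```

Under these assumptions, the `+2` variant simply reports "full" one element earlier than `+1`, so the only cost of keeping it is a slightly earlier freeze and copy.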