@Param(name = "element", gen = IntGen::class, conf = "0:3")
class FaaQueueSpinLockTest {
private val queue = FlatCombiningQueue<Int>()
@Operation
fun enqueue(@Param(name = "element") element: Int) = queue.enqueue(element)
@Operation
fun dequeue() = queue.dequeue()
@Test
fun modelCheckingTest() {
ModelCheckingOptions()
.iterations(100)
.invocationsPerIteration(5_000)
.actorsBefore(2)
.threads(3)
.actorsPerThread(2)
.actorsAfter(2)
.checkObstructionFreedom(false)
.minimizeFailedScenario(false)
.sequentialSpecification(FaaIntQueueSequential::class.java)
.check(this::class.java)
}
}
class FlatCombiningQueue<E> {
private val queue = ArrayDeque<E>() // sequential queue
private val combinerLock = atomic(false) // unlocked initially
private val tasksForCombiner = atomicArrayOfNulls<Any?>(TASKS_FOR_COMBINER_SIZE)
private fun tryLock() = combinerLock.compareAndSet(false, true)
private fun unlock() = check(combinerLock.compareAndSet(true, false))
fun enqueue(element: E) {
var index = randomCellIndex() // code locations: 59, 60, 61
while (true) {
if (tasksForCombiner[index].compareAndSet(null, element)) break // code locations: 17, 18
index = randomCellIndex() // code locations: 59, 60, 61
}
while (!tryLock()) {
val value = tasksForCombiner[index].value
if (value is Result<*>) {
tasksForCombiner[index].value = null
return
}
}
help()
unlock()
tasksForCombiner[index].value as Result<*>
tasksForCombiner[index].value = null
return
}
fun dequeue(): E? {
var index = randomCellIndex()
while (true) {
if (tasksForCombiner[index].compareAndSet(null, Dequeue)) break
index = randomCellIndex()
}
while (!tryLock()) {
val value = tasksForCombiner[index].value
if (value is Result<*>) {
tasksForCombiner[index].value = null
return value.value as? E
}
}
help()
unlock()
val value = tasksForCombiner[index].value as Result<*>
// Uncomment the line below to get correct implementation
// tasksForCombiner[index].value = null
return value.value as? E
}
private fun help() {
repeat(TASKS_FOR_COMBINER_SIZE) {
val task = tasksForCombiner[it].value ?: return@repeat
if (task is Result<*>) return@repeat
if (task === Dequeue) {
tasksForCombiner[it].value = Result(queue.removeFirstOrNull())
} else {
queue.add(task as E)
tasksForCombiner[it].value = Result(Unit)
}
}
}
private fun randomCellIndex(): Int =
ThreadLocalRandom.current().nextInt(tasksForCombiner.size)
}
private const val TASKS_FOR_COMBINER_SIZE = 3 // Do not change this constant!
private object Dequeue
private class Result<V>(
val value: V
)
class FaaIntQueueSequential {
private val q = ArrayList<Int>()
fun enqueue(element: Int) {
q.add(element)
}
fun dequeue() = q.removeFirstOrNull()
fun remove(element: Int) = q.remove(element)
}
This code hung in the enqueue operation, and in the output we see the following:
In trace is shown that cycle start from line 71, but it's not true.
Consider the following example:
This code hung in the
enqueue
operation, and in the output we see the following:In trace is shown that cycle start from line 71, but it's not true.
(Also #218 Issue is present in the output above)