JetBrains / lincheck

Framework for testing concurrent data structures
Mozilla Public License 2.0
545 stars 31 forks source link

Execution hangs in test with channels #283

Open avpotapov00 opened 4 months ago

avpotapov00 commented 4 months ago

Please consider the following test. The error reproduces with Lincheck version 2.26 and produces the following output:

image

The test:

@file:OptIn(InternalCoroutinesApi::class)
/*
 * Lincheck
 *
 * Copyright (C) 2019 - 2024 JetBrains s.r.o.
 *
 * This Source Code Form is subject to the terms of the
 * Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed
 * with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */
package org.jetbrains.kotlinx.lincheck_test

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.SelectClause1
import kotlinx.coroutines.selects.select
import org.jetbrains.kotlinx.lincheck.Options
import org.jetbrains.kotlinx.lincheck.annotations.Operation
import org.jetbrains.kotlinx.lincheck.annotations.Param
import org.jetbrains.kotlinx.lincheck.check
import org.jetbrains.kotlinx.lincheck.paramgen.IntGen
import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.ModelCheckingOptions
import org.junit.Test

/**
 * Runs the [ChannelLincheckTestBase] operations against a [BroadcastChannel] of capacity 1
 * (wrapped in [ChannelViaBroadcast]), using [SequentialBuffered1Channel] as the sequential
 * specification. Obstruction-freedom checking is switched off for this configuration.
 */
class Buffered1BroadcastChannelLincheckTest : ChannelLincheckTestBase(
    c = ChannelViaBroadcast(broadcast = BroadcastChannel(capacity = 1)),
    sequentialSpecification = SequentialBuffered1Channel::class.java,
    obstructionFree = false,
)

/**
 * Runs the [ChannelLincheckTestBase] operations against a [BroadcastChannel] of capacity 2
 * (wrapped in [ChannelViaBroadcast]), using [SequentialBuffered2Channel] as the sequential
 * specification. Obstruction-freedom checking is switched off for this configuration.
 */
class Buffered2BroadcastChannelLincheckTest : ChannelLincheckTestBase(
    c = ChannelViaBroadcast(broadcast = BroadcastChannel(capacity = 2)),
    sequentialSpecification = SequentialBuffered2Channel::class.java,
    obstructionFree = false,
)

/**
 * Base class for Lincheck tests of an `Int` [Channel].
 *
 * Each `@Operation` below forwards to the channel under test ([c]). Closing and cancelling
 * use a [NumberedCancellationException] carrying a token, and operations that observe the
 * closed channel return that exception's `testResult` string so the outcome can be compared
 * with the sequential specification.
 *
 * @param c the channel implementation under test.
 * @param sequentialSpecification class passed to Lincheck as the sequential specification
 *   (applied in [customize]).
 * @param obstructionFree value passed to `checkObstructionFreedom` in [modelCheckingTest].
 */
@Param.Params(
    Param(name = "value", gen = IntGen::class, conf = "1:9"),
    Param(name = "closeToken", gen = IntGen::class, conf = "1:9")
)
abstract class ChannelLincheckTestBase(
    protected val c: Channel<Int>,
    private val sequentialSpecification: Class<*>,
    private val obstructionFree: Boolean = true
) {

    /** Sends [value]; returns the close token string if the channel was closed with a numbered cause. */
    @Operation(allowExtraSuspension = true, blocking = true)
    suspend fun send(@Param(name = "value") value: Int): Any = try {
        c.send(value)
    } catch (e: NumberedCancellationException) {
        // Deliberately not rethrown: the closed state is reported as an operation result.
        e.testResult
    }

    /** Receives an element; returns the close token string if the channel was closed with a numbered cause. */
    @Operation(allowExtraSuspension = true, blocking = true)
    suspend fun receive(): Any = try {
        c.receive()
    } catch (e: NumberedCancellationException) {
        e.testResult
    }

    /**
     * Receives via `receiveCatching`: returns the element on success or the close token string
     * when the channel is closed. The close cause is cast to [NumberedCancellationException],
     * which is the only cause this test class passes to `close`/`cancel`.
     */
    @Operation(allowExtraSuspension = true, blocking = true)
    suspend fun receiveCatching(): Any = c.receiveCatching()
        .onSuccess { return it }
        .onClosed { e -> return (e as NumberedCancellationException).testResult }

    /** Non-suspending receive: the element, the close token string, or `null` when nothing is available. */
    @Operation(blocking = true)
    fun tryReceive(): Any? =
        c.tryReceive()
            .onSuccess { return it }
            .onFailure { return if (it is NumberedCancellationException) it.testResult else null }

    /** Receives through a `select` with a single `onReceive` clause. */
    @Operation(allowExtraSuspension = true, blocking = true)
    suspend fun receiveViaSelect(): Any = try {
        select<Int> { c.onReceive { it } }
    } catch (e: NumberedCancellationException) {
        e.testResult
    }

    /** Closes the channel with a [NumberedCancellationException] built from [token]. */
    @Operation(causesBlocking = true, blocking = true)
    fun close(@Param(name = "closeToken") token: Int): Boolean = c.close(NumberedCancellationException(token))

    /** Cancels the channel with a [NumberedCancellationException] built from [token]. */
    @Operation(causesBlocking = true, blocking = true)
    fun cancel(@Param(name = "closeToken") token: Int) = c.cancel(NumberedCancellationException(token))

    /** Exposes `isClosedForSend` of the channel under test. */
    @Operation(blocking = true)
    fun isClosedForSend() = c.isClosedForSend

    /**
     * Test-specific Lincheck configuration: no initial actors, the configured
     * [sequentialSpecification], and one hand-written three-thread scenario.
     *
     * The [isStressTest] parameter is currently not read by this function.
     */
    fun <O : Options<O, *>> O.customize(isStressTest: Boolean) =
        actorsBefore(0).sequentialSpecification(sequentialSpecification)
            .addCustomScenario {
                parallel {
                    thread {
                        actor(::receiveCatching)
                        actor(::cancel, 5)
                    }
                    thread {
                        actor(::send, 5)
                        actor(::receive)
                    }
                    thread {
                        actor(::receive)
                        actor(::close, 5)
                    }
                }
            }

    /**
     * Runs Lincheck model checking over this class.
     *
     * [customize] is applied after the generic settings, so its `actorsBefore(0)` replaces
     * the `actorsBefore` value set earlier in the chain.
     */
    @Test
    fun modelCheckingTest() = ModelCheckingOptions()
        .iterations(if (isStressTest) 200 else 20)
        .invocationsPerIteration(if (isStressTest) 10_000 else 1_000)
        .actorsBefore(if (isStressTest) 3 else 1)
        .threads(3)
        .actorsPerThread(if (isStressTest) 3 else 2)
        .actorsAfter(if (isStressTest) 3 else 0)
        .checkObstructionFreedom(obstructionFree)
        .minimizeFailedScenario(false)
        // Without this call the sequential specification and the custom scenario were never used.
        .customize(isStressTest)
        .check(this::class)

}

/**
 * Cancellation cause tagged with a number, so tests can tell which close/cancel call
 * an operation observed.
 *
 * @param number token embedded into [testResult].
 */
private class NumberedCancellationException(number: Int) : CancellationException() {
    /** Textual form of the token, reported as the operation result. */
    val testResult: String = "Closed($number)"
}

/** Sequential channel model with a buffer capacity of 2. */
class SequentialBuffered2Channel : SequentialIntChannelBase(capacity = 2)
/** Sequential channel model with a buffer capacity of 1. */
class SequentialBuffered1Channel : SequentialIntChannelBase(capacity = 1)

/**
 * Sequential model of an `Int` channel with the given buffer [capacity].
 *
 * The model keeps explicit queues of suspended senders and receivers plus an element buffer,
 * and records the close token as a `"Closed(n)"` string once closed or cancelled.
 * It performs no synchronization of its own.
 */
@InternalCoroutinesApi
abstract class SequentialIntChannelBase(private val capacity: Int) {
    // Suspended senders, oldest first, each paired with the element it wants to deliver.
    private val senders = ArrayList<Pair<CancellableContinuation<Any>, Int>>()

    // Suspended receivers, oldest first.
    private val receivers = ArrayList<CancellableContinuation<Any>>()

    // Elements accepted by the channel but not yet received.
    private val buffer = ArrayList<Int>()

    // Non-null once the channel has been closed; holds the "Closed(token)" result.
    private var closedMessage: String? = null

    /**
     * Sends [x]: returns `Unit` if accepted immediately, the closed message if the channel
     * is closed, and otherwise suspends until a receiver resumes this sender.
     */
    suspend fun send(x: Int): Any {
        val outcome = trySend(x)
        return when (outcome) {
            true -> Unit
            false -> suspendCancellableCoroutine<Any> { cont ->
                senders.add(cont to x)
            }

            else -> outcome
        }
    }

    /**
     * Attempts to send without suspending.
     *
     * @return the closed message if closed, `true` if [element] was handed to a receiver or
     *   buffered, `false` if the buffer is full.
     */
    fun trySend(element: Int): Any {
        closedMessage?.let { return it }
        // A waiting receiver always takes precedence over buffering.
        if (resumeFirstReceiver(element)) return true
        if (capacity == Channel.CONFLATED) {
            // Conflated mode keeps only the most recent element.
            buffer.clear()
            buffer.add(element)
            return true
        }
        if (buffer.size >= capacity) return false
        buffer.add(element)
        return true
    }

    // Hands [element] to the oldest receiver that can still be resumed; false if there is none.
    private fun resumeFirstReceiver(element: Int): Boolean {
        while (receivers.isNotEmpty()) {
            if (receivers.removeAt(0).resume(element)) return true
        }
        return false
    }

    /** Receives an element or the closed message, suspending when neither is available. */
    suspend fun receive(): Any {
        val immediate = tryReceive()
        if (immediate != null) return immediate
        return suspendCancellableCoroutine<Any> { cont ->
            receivers.add(cont)
        }
    }

    /** Same behavior as [receive] in this model. */
    suspend fun receiveCatching() = receive()

    /**
     * Attempts to receive without suspending.
     *
     * @return a buffered element, an element taken directly from a waiting sender,
     *   the closed message, or `null` when nothing is available.
     */
    fun tryReceive(): Any? {
        if (buffer.isEmpty()) {
            // Nothing buffered: take from a waiting sender, else report closed state (or null).
            return resumeFirstSender() ?: closedMessage
        }
        val head = buffer.removeAt(0)
        // The freed slot is refilled from the oldest waiting sender, if any.
        resumeFirstSender()?.let { refill -> buffer.add(refill) }
        return head
    }

    // Resumes the oldest sender that can still be resumed and returns its element; null if none.
    private fun resumeFirstSender(): Int? {
        while (senders.isNotEmpty()) {
            val (waiter, pending) = senders.removeAt(0)
            if (waiter.resume(Unit)) return pending
        }
        return null
    }

    /** Same behavior as [send] in this model. */
    suspend fun sendViaSelect(element: Int) = send(element)

    /** Same behavior as [receive] in this model. */
    suspend fun receiveViaSelect() = receive()

    /**
     * Closes the channel with [token] and resumes all waiting receivers with the closed message.
     *
     * @return `false` if the channel was already closed, `true` otherwise.
     */
    fun close(token: Int): Boolean {
        if (closedMessage != null) return false
        val message = "Closed($token)"
        closedMessage = message
        receivers.forEach { it.resume(message) }
        receivers.clear()
        return true
    }

    /**
     * Closes the channel (if not closed yet), resumes all waiting senders with the closed
     * message and drops buffered elements.
     */
    fun cancel(token: Int) {
        close(token)
        // close() guarantees closedMessage is set, either by this call or by an earlier one.
        val message = closedMessage!!
        senders.forEach { (waiter, _) -> waiter.resume(message) }
        senders.clear()
        buffer.clear()
    }

    /** `true` once the channel has been closed or cancelled. */
    fun isClosedForSend(): Boolean = closedMessage != null

    /** `true` when closed and neither buffered elements nor waiting senders remain. */
    fun isClosedForReceive(): Boolean {
        if (!isClosedForSend()) return false
        return buffer.isEmpty() && senders.isEmpty()
    }

    /** `true` when the channel is open and has neither buffered elements nor waiting senders. */
    fun isEmpty(): Boolean = closedMessage == null && buffer.isEmpty() && senders.isEmpty()
}

/**
 * Tries to resume this continuation with [res].
 *
 * @return `true` if `tryResume` produced a token (which is then completed via
 *   `completeResume`), `false` if `tryResume` returned `null`.
 */
@InternalCoroutinesApi
private fun <T> CancellableContinuation<T>.resume(res: T): Boolean =
    tryResume(res)?.let { resumeToken ->
        completeResume(resumeToken)
        true
    } ?: false

// Switches the Lincheck option values in this file between the larger "stress" settings
// and the smaller default ones; fixed to false here.
private val isStressTest = false

/**
 * Presents a [BroadcastChannel] as a regular [Channel].
 *
 * The sending side is delegated to [broadcast]; the receiving side is served by a single
 * subscription ([sub]) opened when this wrapper is constructed.
 *
 * @param broadcast the broadcast channel being wrapped.
 */
internal class ChannelViaBroadcast<E>(
    private val broadcast: BroadcastChannel<E>
) : Channel<E>, SendChannel<E> by broadcast {
    // The one subscription through which all receive operations go.
    val sub = broadcast.openSubscription()

    override val isClosedForReceive: Boolean get() = sub.isClosedForReceive
    override val isEmpty: Boolean get() = sub.isEmpty

    override suspend fun receive(): E = sub.receive()
    override suspend fun receiveCatching(): ChannelResult<E> = sub.receiveCatching()
    override fun iterator(): ChannelIterator<E> = sub.iterator()
    override fun tryReceive(): ChannelResult<E> = sub.tryReceive()

    // Cancellation is applied to the broadcast channel itself, not to the subscription.
    override fun cancel(cause: CancellationException?) = broadcast.cancel(cause)

    // implementing hidden method anyway, so can cast to an internal class
    // This legacy overload is not supported by the wrapper and always throws.
    @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
    override fun cancel(cause: Throwable?): Boolean = error("unsupported")

    override val onReceive: SelectClause1<E>
        get() = sub.onReceive
    override val onReceiveCatching: SelectClause1<ChannelResult<E>>
        get() = sub.onReceiveCatching
}