Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0
12.95k stars 1.84k forks source link

1.9.0-RC test task:compileDemoDebugUnitTestKotlin error Suspension functions can only be called within coroutine body. #4179

Open akingyin1987 opened 2 months ago

akingyin1987 commented 2 months ago

I won't encounter this issue when using version 1.8.1

` @Test fun concurrentExecution1() = runTest {

    println("当前线程=${Thread.currentThread().name}")
    val queue = ArrayBlockingQueue<doOnBackground<String>>(150)
    val scope = CoroutineScope(testScheduler)
    for (index in 1..150) {
        queue.add {
            getDataDelayedTime(RandomUtil.getRandomNum(1, 2000).toLong(), index)
        }
    }
    scope.launch {
        CoroutineRunBlockUtil.asynchronousBatchProcessing(
            coroutineScope = scope,
            jobList = queue,
            onErrorIsIgnore = {false},
            onComplete = {
                println("完成")
            },
            onProgress = {total, progress, error ->
                println("total=$total, progress=$progress, error=$error")
            },
            togetherSize = 5
        ).let {
            println("获取到最后结果")
        }
    }

}

` private suspend fun getDataDelayedTime(delayedTime: Long, index: Int): Result { println("第${index} 睡眠时长=${delayedTime}") delay(delayedTime) println( "第${index}项完成->${DateUtil.testNowTimeString},thread.name=${Thread.currentThread().name},thread.id=${Thread.currentThread().id},返回结果=${ index.mod( 7 ) != 0 }" )

    return  Result.Success("")
   // return if (index.mod(7) != 0) Result.Success("") else Result.Failure(Exception())
}

`

akingyin1987 commented 2 months ago

` suspend fun asynchronousBatchProcessing( coroutineScope: CoroutineScope, jobList: ArrayBlockingQueue<doOnBackground>, onErrorIsIgnore: (Result) -> Boolean = { false }, togetherSize: Int = 5, onProgress: (total: Int, progress: Int, error: Int) -> Unit, onComplete: (Boolean) -> Unit, ): kotlin.Result {

    /** 当前等待执行的任务 */
    val currentWaitDoingTaskList = ArrayBlockingQueue<Deferred<Result<T>>>(togetherSize)

    /** 正在运行的任务 */
    val currentDoingTaskList = ArrayBlockingQueue<Deferred<Result<T>>>(togetherSize)
    /** false 正常执行 */
    val currentTaskState = AtomicBoolean(false)

    /** 当前任务总数 */
    val taskTotal: Int = jobList.size

    /** 当前完成数  */
    val currentCompleteTotal = AtomicInteger(0)

    /** 当前失败数 */
    val currentErrorTotal = AtomicInteger(0)

    var currentJob: Job? = null

    return suspendCancellableCoroutine { continuation ->

        if (coroutineScope.isActive) {
            currentJob = coroutineScope.launch(Dispatchers.IO) {
                //当前任务正在进行中

                while (coroutineScope.isActive && jobList.isNotEmpty() && !currentTaskState.get()) {
                    println("开始进入循环准备执行---->>>>>>,${currentWaitDoingTaskList.size}")
                    if (taskTotal == currentCompleteTotal.get()) {
                        //任务完成
                        currentTaskState.set(true)

                    }
                    if (currentWaitDoingTaskList.size < togetherSize && currentDoingTaskList.size < togetherSize) {
                        jobList.poll()?.let { job ->
                            async {
                                println("开启异步执行=${DateUtil.testNowTimeString}")
                                job.invoke()
                            }.let { task ->
                                if (coroutineScope.isActive) {
                                    //待执行的任务列表
                                    currentWaitDoingTaskList.add(task)
                                    if(currentDoingTaskList.size < togetherSize){
                                        currentDoingTaskList.add(task)
                                    }

                                   // task.start()
                                }

                            }
                        }
                    }
                    if (currentWaitDoingTaskList.size == togetherSize || (jobList.isEmpty() && currentWaitDoingTaskList.size< togetherSize)) {

                        //当前执行中 的数=最大执行数 或者没有待处理的任务且正在进行的任务非空
                        while (currentWaitDoingTaskList.isNotEmpty() && !currentTaskState.get() && currentDoingTaskList.size<= togetherSize) {
                            currentWaitDoingTaskList.poll()?.let {
                                //未执行前加入到正在等待的任务
                                val result = it.await()
                                //从正在等待的任务内移出
                                currentDoingTaskList.remove(it)
                                println("移出已完成的任务=${currentDoingTaskList.size},${DateUtil.testNowTimeString}")

                                currentCompleteTotal.incrementAndGet()
                                if (taskTotal == currentCompleteTotal.get()) {
                                    //任务完成
                                    currentTaskState.set(true)

                                }
                                if (result is Result.Failure) {
                                    println("当前为失败-----》")
                                    //当前为失败
                                    currentErrorTotal.incrementAndGet()
                                    withContext(Dispatchers.Main) {
                                        onProgress.invoke(
                                            taskTotal,
                                            currentCompleteTotal.get(),
                                            currentErrorTotal.get()
                                        )
                                    }

                                    if (!onErrorIsIgnore(result)) {
                                        //如果当前不忽略终止操作
                                        currentTaskState.set(true)
                                        withContext(Dispatchers.Main) {
                                            onComplete.invoke(false)
                                        }
                                        println("正在执行的任务总数:${currentDoingTaskList.size}")
                                        currentDoingTaskList.forEach { doJob ->
                                            println("终止执行=${doJob.isActive},${doJob.isCancelled},${doJob.isCompleted}")
                                            doJob.cancel()
                                        }
                                        //   continuation.resumeWithException(result.exception)
                                        continuation.resume(kotlin.Result.failure(result.exception))

                                        currentJob?.cancel()
                                        return@launch
                                    }
                                } else {
                                    withContext(Dispatchers.Main) {
                                        onProgress.invoke(
                                            taskTotal,
                                            currentCompleteTotal.get(),
                                            currentErrorTotal.get()
                                        )
                                    }
                                }
                            }
                        }
                    }
                    println("循环底部----->>>>>>>>${jobList.size},${currentTaskState.get()}")
                }
                withContext(Dispatchers.Main) {
                    onComplete.invoke(true)
                }
                println("返回结果-----------》》》》》》》")
                continuation.resume(kotlin.Result.success("完成"))

            }

        }
    }

}`