JetBrains / Exposed

Kotlin SQL Framework
http://jetbrains.github.io/Exposed/
Apache License 2.0
8.36k stars 695 forks source link

Can't init value outside the transaction #1655

Closed crowforkotlin closed 1 year ago

crowforkotlin commented 1 year ago

image Ktor+Exposed+Hikar connection pool, stress test, frequent get results in IllegalStateException Can't init value outside the transaction, at least five errors.

fun getNewTransaction(dbName: String) = BaseDBFactory.mDataBaseMap[dbName]!!.transactionManager.newTransaction()

fun Route.GetAppVersion(path: String) = get(path) {
    val transaction = getNewTransaction(BaseStringConfig.DataBase.DB_Crow)
    transaction.connection.autoCommit = false

    transaction.suspendedTransaction(Dispatchers.IO) {
        UserTable.insert {
            it[user] = "123"
            it[pwd] = "456"
        }
    }
    println(transaction.connection.isClosed)
    transaction.commit()

    transaction.close()
    call.respond("Hello")
}
object BaseDBFactory {

    val mDataBaseMap = hashMapOf<String, Database>()

    // 获取数据库连接字符串
    private fun getDBUrl(baseDBConfig: BaseDBConfig): String {
        return if (baseDBConfig.databaseName == null) {
            "jdbc:mysql://${baseDBConfig.url}:${baseDBConfig.port}"
        } else {
            "jdbc:mysql://${baseDBConfig.url}:${baseDBConfig.port}/${baseDBConfig.databaseName}"
        }
    }

    // 创建连接池
    private fun createHikariDataSource(baseDBConfig: BaseDBConfig) = HikariDataSource(HikariConfig().apply {
        username = baseDBConfig.username ?: ""
        password = baseDBConfig.password ?: ""
        driverClassName = DRIVER_DB
        jdbcUrl = getDBUrl(baseDBConfig)
        maximumPoolSize = 20
        isAutoCommit = false
        transactionIsolation = "TRANSACTION_REPEATABLE_READ"
        validate()
    })

    // 初始化本地数据库
    fun createDBConnection(baseDBConfig: BaseDBConfig, iBaseDBTransaction: IBaseDBTransaction) {

        val dbName = baseDBConfig.databaseName ?: DB_NULL
        val dbConfig = DatabaseConfig {
            keepLoadedReferencesOutOfTransaction = true
        }
        runCatching {

            // 如果数据库名称为空,则提前返回
            if (dbName == DB_NULL) {

                // 获取Database,无连接池
                val db = Database.connect(getDBUrl(baseDBConfig), DRIVER_DB, baseDBConfig.username ?: "", baseDBConfig.password ?: "", databaseConfig = dbConfig)

                // 以数据库名为键,将数据源添加到值中
                mDataBaseMap[dbName] = db

                // 连接数据库并执行事务
                transaction(db) {
                    iBaseDBTransaction.doOnTransaction(this)
                }

                return
            }

            // 以数据库名为键,将数据源 (使用提供的配置创建一个 HikariDataSource 连接池) 添加到值中
            mDataBaseMap[dbName] = Database.connect(createHikariDataSource(baseDBConfig), databaseConfig = dbConfig)

            // 使用数据源连接数据库并执行事务
            transaction(mDataBaseMap[dbName]) {
                iBaseDBTransaction.doOnTransaction(this)
            }
        }.onFailure {
            baseLogger.error(it.stackTraceToString())
        }
    }
}
crowforkotlin commented 1 year ago

https://github.com/JetBrains/Exposed/issues/1574 Referring to this issue, I set keepLoadedReferencesOutOfTransaction = true, but this error is still reported, and I have a very question, why not use the inline keyword to optimize high-level functions when operating tables in transactions, and the other is if I The whole logic is relatively heavy and written in the transaction, which will cause additional memory and performance overhead, so what I think is to only perform database operations in the transaction, the logic is outside the transaction, and the transaction can be submitted manually And off, I don't really understand this design by author.

crowforkotlin commented 1 year ago

屏幕截图 2022-12-22 182826

AlexeySoshin commented 1 year ago

Curious, is there a particular reason, you use autoCommit = false and commit your transaction manually?

mbunderline76 commented 1 year ago

I also have this issue but more weird. i faced with this exception and I set keepLoadedReferencesOutOfTransaction = true and the it got fixed. and I developed another transaction excatlly like the old on with a few differences and the new one gets this error the first one is for creating an entity and the next one is for updating it.

crowforkotlin commented 1 year ago

All the problems have been solved so far, you can refer to this code

crowforkotlin commented 1 year ago
@file:Suppress("unused")

package com.crow.ktor.database

import com.crow.ktor.config.BaseException
import com.crow.ktor.config.BaseString
import com.crow.ktor.data.BaseServiceResult
import com.crow.ktor.data.BaseStatusCode
import com.crow.ktor.database.BaseDBFactory.mDataBaseMap
import com.crow.ktor.database.ibase.IBaseTransaction
import com.crow.ktor.database.ibase.IBaseTransactionMap
import com.crow.ktor.logger.BaseLogger
import io.ktor.http.*
import io.ktor.server.application.*
import io.ktor.server.response.*
import io.ktor.util.pipeline.*
import kotlinx.coroutines.Dispatchers
import org.jetbrains.exposed.sql.Transaction
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.jetbrains.exposed.sql.transactions.experimental.suspendedTransaction
import org.jetbrains.exposed.sql.transactions.transactionManager

/*************************
 * @Machine: RedmiBook Pro 15 Win11
 * @Path: lib_base/src/main/kotlin/com/crow/ktor/config/database
 * @Time: 2022/12/21 23:07
 * @Author: BarryAllen
 * @Description: Base Database Extension Base数据库扩展
 * @formatter:on
 **************************/

// 根据数据库名称获取数据库的事务管理器
fun getTransactionManager(dbName: String): TransactionManager {

    // 通过下标获取数据库对象, 获取数据库的事务管理器
    return mDataBaseMap[dbName]!!.transactionManager
}

// 在 IO 线程中执行数据库操作,必须手动提交事务
suspend inline fun Transaction.doOnQuery(crossinline executeBlock: () -> Unit) {

    // 在 IO 线程中执行挂起的事务
    this.suspendedTransaction(Dispatchers.IO) {

        // 绑定事务到线程
        this@doOnQuery.db.transactionManager.bindTransactionToThread(this@doOnQuery)

        // 执行回调函数中的代码块
        executeBlock()
    }

    // 将事务绑定回来
    this.db.transactionManager.bindTransactionToThread(this)
}

// 在 Kotlin 协程中执行数据库操作,并管理事务的开启、提交、回滚和关闭
suspend fun PipelineContext<Unit, ApplicationCall>.withTransaction(
    dbNames: List<String>,
    autoCommit: Boolean = true,
    iBaseTransactionMap: IBaseTransactionMap
): BaseDBEventState {

    // 如果数据库名称列表为空,则抛出异常
    if (dbNames.isEmpty()) throw BaseException(BaseString.Exception.ListMustNotEmpty)

    // 为每个数据库创建新事务,并将事务与数据库名称关联起来
    val transactions = dbNames.associateWith { getTransactionManager(it).newTransaction() }

    try {

        // 执行回调接口中的方法
        iBaseTransactionMap.onTransaction(transactions)
        if (autoCommit) transactions.values.forEach { it.commit() }
        return BaseDBEventState.Success()
    } catch (e: Exception) {
        BaseLogger.baseLogger.error("(Error)[doOnTransaction] : ${e.stackTraceToString()}")

        // 如果出现异常,则对所有事务进行回滚
        transactions.values.forEach { it.rollback() }

        // 将响应状态设为 400 Bad Request,并返回错误信息
        call.respond(
            HttpStatusCode.BadRequest,
            BaseServiceResult(BaseStatusCode(BaseStatusCode.Type.ERROR, "Exception : ${e::class.java.simpleName}"))
        )

        return BaseDBEventState.Failure(e)

    } finally {

        // 关闭所有事务
        transactions.values.forEach { it.close() }
    }
}

// 在 Kotlin 协程中执行数据库操作,并管理事务的开启、提交、回滚和关闭
suspend fun PipelineContext<Unit, ApplicationCall>.withTransaction(
    dbName: String,
    autoCommit: Boolean = true,
    iBaseTransaction: IBaseTransaction
): BaseDBEventState {

    // 为数据库创建新事务
    val transaction = getTransactionManager(dbName).newTransaction()

    try {

        // 执行回调接口中的方法
        iBaseTransaction.onTransaction(transaction)
        if (autoCommit) transaction.commit()
        return BaseDBEventState.Success()
    } catch (e: Exception) {
        BaseLogger.baseLogger.error("(Error)[doOnTransaction] : ${e.stackTraceToString()}")

        // 如果出现异常,则对事务进行回滚
        transaction.rollback()

        // 将响应状态设为 400 Bad Request,并返回错误信息
        call.respond(
            HttpStatusCode.BadRequest,
            BaseServiceResult(BaseStatusCode(BaseStatusCode.Type.ERROR, "Exception : ${e::class.java.simpleName}"))
        )
        return BaseDBEventState.Failure(e)
    } finally {

        // 关闭事务
        transaction.close()
    }
}

suspend inline fun BaseDBEventState.onSuccess(block: () -> Unit): BaseDBEventState {
    if (this is BaseDBEventState.Success) {
        block()
    }
    return this
}

suspend inline fun BaseDBEventState.onFailure(block: (catch: Throwable) -> Unit): BaseDBEventState {
    if (this is BaseDBEventState.Failure) {
        block(catch)
    }
    return this
}
crowforkotlin commented 1 year ago
@file:Suppress("unused")

package com.crow.ktor.database

import com.crow.ktor.config.BaseString.DataBase.DRIVER_DB
import com.crow.ktor.logger.BaseLogger.baseLogger
import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.DatabaseConfig
import org.jetbrains.exposed.sql.SchemaUtils
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.jetbrains.exposed.sql.transactions.transaction

object BaseDBFactory {

    val mDataBaseMap = hashMapOf<String, Database>()

    // 获取数据库连接字符串
    private fun getDBUrl(baseDBConfig: BaseDBConfig): String {
        return "jdbc:mysql://${baseDBConfig.url}:${baseDBConfig.port}/${baseDBConfig.databaseName}"
    }

    // 创建连接池
    fun createHikariDataSource(baseDBConfig: BaseDBConfig) = HikariDataSource(HikariConfig().apply {
        username = baseDBConfig.username ?: ""
        password = baseDBConfig.password ?: ""
        driverClassName = DRIVER_DB
        jdbcUrl = getDBUrl(baseDBConfig)
        maximumPoolSize = 20
        isAutoCommit = false
        transactionIsolation = "TRANSACTION_REPEATABLE_READ"
        validate()
    })

    // 初始化本地数据库并添加到事务管理
    inline fun addDBConnectionToTransaction(baseDBConfig: BaseDBConfig, crossinline dbSuccess: (db: Database) -> Unit) {

        runCatching {

            // 以数据库名为键,将数据源 (使用提供的配置创建一个 HikariDataSource 连接池) 添加到值中
            mDataBaseMap[baseDBConfig.databaseName] =
                Database.connect(createHikariDataSource(baseDBConfig), databaseConfig = DatabaseConfig {
                    keepLoadedReferencesOutOfTransaction = true
                })

            // 使用数据源连接数据库并执行事务
            dbSuccess(mDataBaseMap[baseDBConfig.databaseName]!!)

        }.onFailure {
            baseLogger.error(it.stackTraceToString())
        }
    }

    fun createDataBase(baseDBConfig: BaseDBConfig, vararg dbNames: String) {
        baseDBConfig.databaseName = ""
        val db = Database.connect(
            getDBUrl(baseDBConfig),
            DRIVER_DB,
            baseDBConfig.username ?: "",
            baseDBConfig.password ?: ""
        )
        transaction(db) {
            dbNames.forEach {
                SchemaUtils.createDatabase(it)
            }
        }
        TransactionManager.closeAndUnregister(db)
    }
}
crowforkotlin commented 1 year ago

How to use

    BaseDBFactory.createDataBase(BaseDBConfig(), CrowString.DataBase.DB_UpdateApp, BaseString.DataBase.DB_Crow)

    BaseDBFactory.addDBConnectionToTransaction(BaseDBConfig(CrowString.DataBase.DB_UpdateApp)) {
        transaction(it) {
            SchemaUtils.create(AppUpdateTable)
        }
    }
AlexeySoshin commented 1 year ago

@CrowForKotlin Glad it has been resolved. Can we close this issue then, please?