Closed by JasperCiti 2 years ago
awaitOne is a suspending extension function from spring-r2dbc. Are you sure you are using Kotlin coroutines?
I must admit that I am very new to Kotlin coroutines. I was under the impression that I was using them because my build.gradle.kts has:
implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-csv:2.12.5")
implementation("io.micrometer:micrometer-registry-prometheus")
implementation("io.r2dbc:r2dbc-h2:$r2dbcH2Version")
implementation("io.r2dbc:r2dbc-postgresql:$r2dbcPostgresqlVersion")
implementation("org.apache.commons:commons-lang3")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:$coroutinesVersion")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$coroutinesVersion")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:$coroutinesVersion")
implementation("org.springframework.boot:spring-boot-configuration-processor")
implementation("org.springframework.boot:spring-boot-devtools") // Doesn't work on compileOnly
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
implementation("org.springframework.boot:spring-boot-starter-security")
implementation("org.springframework.boot:spring-boot-starter-validation")
implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("pl.treksoft:r2dbc-e4k:$e4kVersion")
implementation(kotlin("reflect"))
implementation(kotlin("stdlib-jdk7"))
implementation(kotlin("stdlib-jdk8"))
and it works in your AddressService example that is also within the same project.
I noticed that
suspend fun insert(history: History) {
    dbClient.insert().into("history")
        .value("period", history.period)
        .value("low", history.low)
        .value("high", history.high)
        .await()
    log.info("$history")
}
doesn't give a compile error or runtime error, but it also doesn't seem to insert anything into the DB although I do see the log output.
And what if you add this import:
import org.springframework.r2dbc.core.awaitOne
Does it compile?
Here is the whole file:
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.future.future
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import pl.treksoft.e4k.core.DbClient
import pl.treksoft.e4k.core.insert
import the.profiter.features.history.HistoryPeriod
import the.profiter.features.history.entities.History
import java.util.concurrent.CompletableFuture
import org.springframework.r2dbc.core.awaitOne
@Component
class HistoryStore(private val dbClient: DbClient) {
    fun insertAndWait(history: History, period: HistoryPeriod): CompletableFuture<Unit> = GlobalScope.future {
        insert(history, period)
    }

    suspend fun insert(history: History, period: HistoryPeriod) {
        dbClient.insert().into("history_${period.name.lowercase()}")
            .value("period", history.period)
            .value("low", history.low)
            .value("high", history.high)
            .awaitOne()
        log.info("$history for $period")
    }

    companion object {
        private val log: Logger = LoggerFactory.getLogger(this::class.java)
    }
}
Got a new error with the import:
Unresolved reference. None of the following candidates is applicable because of receiver type mismatch:
public suspend fun <T> RowsFetchSpec<TypeVariable(T)>.awaitOne(): TypeVariable(T) defined in org.springframework.r2dbc.core
I was wrong. Sorry. The awaitOne() function you are looking for is NOT from Spring. It is in fact my own function. You have this problem because there are two forms of insert(). One is insert().into("table_name", "id_column") and the other is insert().into("table_name"). Only the first form has the .awaitOne() method at the end (because it returns the value of the inserted id column). The other form should end with .fetch().rowsUpdated().awaitSingle() (and will just return the number of updated rows, i.e. 1).
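To illustrate why the compiler reports a receiver type mismatch, here is a minimal, self-contained sketch. The types `InsertBuilder`, `IdSpec` and `RowsSpec` are hypothetical stand-ins invented for this example; they are not the real e4k or Spring classes, and the functions are plain (non-suspending) to keep the sketch dependency-free. It only shows the general mechanism: two overloads return different types, and an extension function is available only on the type it was declared for.

```kotlin
// Hypothetical stand-ins; these are NOT the real e4k or Spring types.
class RowsSpec(val rows: Int)        // result of the one-argument into(...)
class IdSpec(val generatedId: Long)  // result of the two-argument into(...)

class InsertBuilder {
    // Plain insert: only a row count is available afterwards.
    fun into(table: String): RowsSpec = RowsSpec(rows = 1)

    // Insert that names the id column: the generated id is available afterwards.
    fun into(table: String, idColumn: String): IdSpec = IdSpec(generatedId = 42L)
}

// Extension declared only for IdSpec, mirroring awaitOne() in the thread.
fun IdSpec.awaitOne(): Long = generatedId

// Extension declared only for RowsSpec.
fun RowsSpec.rowsUpdated(): Int = rows

fun main() {
    val builder = InsertBuilder()
    val id = builder.into("history", "id").awaitOne()
    val count = builder.into("history").rowsUpdated()
    println("id=$id rows=$count") // prints "id=42 rows=1"

    // builder.into("history").awaitOne()
    // The line above would not compile: awaitOne() has IdSpec as its receiver,
    // but the one-argument into(...) returns RowsSpec.
}
```

Adding an import cannot fix this kind of error, because the function exists but is declared for a different receiver type than the one the call chain produces.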
Thank you very much. I got it to work.
This code allows an external Java method to call insertAndWait(...) if I am interested in the result (here, the number of updated rows).
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.future.future
import kotlinx.coroutines.reactor.awaitSingle
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import pl.treksoft.e4k.core.DbClient
import pl.treksoft.e4k.core.insert
import the.profiter.features.history.HistoryPeriod
import the.profiter.features.history.entities.History
import java.util.concurrent.CompletableFuture
@Component
class HistoryStore(private val dbClient: DbClient) {
    fun insertAndWait(history: History, period: HistoryPeriod): CompletableFuture<Int> = GlobalScope.future {
        insert(history, period)
    }

    suspend fun insert(history: History, period: HistoryPeriod): Int {
        val result = dbClient.insert().into("history_${period.name.lowercase()}")
            .value("period", history.period)
            .value("low", history.low)
            .value("high", history.high)
            .fetch().rowsUpdated().awaitSingle()
        log.info("$history for $period")
        return result
    }

    companion object {
        private val log: Logger = LoggerFactory.getLogger(this::class.java)
    }
}
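As a side note on the bridge to Java: GlobalScope is marked as a delicate API in recent kotlinx.coroutines releases, so a class-owned scope may be preferable. The following is only a sketch under that assumption. `HistoryBridge` and its fake `insert` are hypothetical and do not touch a database; the `future` builder is the same one from kotlinx-coroutines used above.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import kotlinx.coroutines.future.future
import java.util.concurrent.CompletableFuture

// Hypothetical class for illustration; not part of the project in this thread.
class HistoryBridge {
    // Scope owned by this class instead of GlobalScope.
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    // Stand-in for the suspending insert; pretends one row was updated.
    suspend fun insert(value: Int): Int {
        delay(10) // simulate non-blocking I/O
        return 1
    }

    // Java-friendly entry point: returns a CompletableFuture the caller can join on.
    fun insertAndWait(value: Int): CompletableFuture<Int> = scope.future {
        insert(value)
    }
}

fun main() {
    val rows = HistoryBridge().insertAndWait(42).get()
    println(rows) // prints 1
}
```

A Java caller would use it the same way as before, for example by calling `get()` or `thenAccept(...)` on the returned future.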
And this code allows me to insert the row and forget about it if my Java method doesn't care about the ID.
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import pl.treksoft.e4k.core.DbClient
import pl.treksoft.e4k.core.insert
import the.profiter.features.history.HistoryPeriod
import the.profiter.features.history.entities.History
@Component
class HistoryStore(private val dbClient: DbClient) {
    fun insert(history: History, period: HistoryPeriod) {
        dbClient.insert().into("history_${period.name.lowercase()}")
            .value("period", history.period)
            .value("low", history.low)
            .value("high", history.high)
            .fetch().rowsUpdated().block()
        log.info("$history for $period")
    }

    companion object {
        private val log: Logger = LoggerFactory.getLogger(this::class.java)
    }
}
I also noticed that without the .fetch().rowsUpdated()... it doesn't update the database at all.
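A likely explanation, assuming the insert builder follows the usual Reactor convention, is that a reactive pipeline does nothing until something subscribes to it (for example via block() or awaitSingle()). The sketch below demonstrates that behaviour with plain Reactor; `fakeInsert` is a hypothetical stand-in that only counts how often it runs and does not talk to a database.

```kotlin
import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicInteger

// Counts how many times the fake "insert" actually executed.
val executed = AtomicInteger(0)

// Hypothetical stand-in for a database write wrapped in a Mono.
fun fakeInsert(): Mono<Int> = Mono.fromCallable {
    executed.incrementAndGet()
    1 // pretend one row was updated
}

fun main() {
    fakeInsert()                    // pipeline is built but never subscribed
    println(executed.get())         // prints 0: nothing has run yet

    val rows = fakeInsert().block() // block() subscribes and waits for the result
    println(rows)                   // prints 1
    println(executed.get())         // prints 1: the work ran exactly once
}
```

In coroutine code, awaitSingle() plays the same subscribing role without blocking the calling thread.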
I'm looking at the example: https://github.com/rjaros/r2dbc-e4k
I got
but I get an error