Closed AlexeySoshin closed 1 year ago
MessageConsumer is a ReadStream
It could work, but I would say it doesn't have the same level of ergonomics. Instead of
bus.consumer<String>("some-address") {
    delay(10)
}
it will be something like:
bus.consumer<String>("some-address").toChannel(vertx).consumeEach {
    delay(10)
}
And this will have to be done inside a suspending function, since consumeEach is suspending.
Not the end of the world, for sure.
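For comparison, here is a minimal sketch of how the shorter form could be provided as an extension function. The name coConsumer and the explicit scope parameter are assumptions made for illustration; they are not part of the Vert.x API.

```kotlin
import io.vertx.core.Vertx
import io.vertx.core.eventbus.EventBus
import io.vertx.core.eventbus.Message
import io.vertx.core.eventbus.MessageConsumer
import io.vertx.kotlin.coroutines.dispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

// Hypothetical helper (not part of Vert.x): registers a consumer whose
// handler is allowed to suspend.
fun <T> EventBus.coConsumer(
    scope: CoroutineScope,
    address: String,
    handler: suspend (Message<T>) -> Unit
): MessageConsumer<T> =
    consumer<T>(address) { msg ->
        // One coroutine per message, so registration itself never suspends.
        scope.launch { handler(msg) }
    }

fun main() {
    val vertx = Vertx.vertx()
    val scope = CoroutineScope(vertx.dispatcher())

    vertx.eventBus().coConsumer<String>(scope, "some-address") { msg ->
        delay(10)
        println("got ${msg.body()}")
        vertx.close()
    }
    vertx.eventBus().send("some-address", "hello")
}
```

One trade-off of this sketch: because every message is handled in its own coroutine, a handler that suspends can interleave with the handling of later messages, whereas consumeEach processes messages one at a time.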
Also, I think that with the channel approach the code gets 'stuck' inside the consume loop.
With the first approach, I just want to attach the suspending handler and then move on to the code below it.
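One possible way around the 'stuck' behaviour is to run the channel loop in its own coroutine, so that only that coroutine suspends. The sketch below assumes the toChannel(vertx) extension from vertx-lang-kotlin-coroutines used earlier in this thread; launchChannelConsumer is a hypothetical helper name.

```kotlin
import io.vertx.core.Vertx
import io.vertx.kotlin.coroutines.dispatcher
import io.vertx.kotlin.coroutines.toChannel
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

// Hypothetical helper: runs the consume loop in a separate coroutine and
// returns its Job, so the caller is not suspended by consumeEach.
fun launchChannelConsumer(
    vertx: Vertx,
    scope: CoroutineScope,
    address: String,
    handle: suspend (String) -> Unit
): Job = scope.launch {
    vertx.eventBus().consumer<String>(address).toChannel(vertx).consumeEach { msg ->
        // Messages are handled one at a time, in arrival order.
        handle(msg.body())
    }
}

fun main() {
    val vertx = Vertx.vertx()
    val scope = CoroutineScope(vertx.dispatcher())

    launchChannelConsumer(vertx, scope, "some-address") { body ->
        delay(10)
        println("handled $body")
    }
    // Reached right away: the loop above lives in its own coroutine.
    println("moving on")
    // Vert.x keeps the JVM alive until vertx.close() is called.
}
```

Note that the consumer is only registered once the launched coroutine starts running on the Vert.x context, so a message sent immediately after the call may arrive before any handler is attached.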
Hi, has anything happened in the meantime? Code like this doesn't leverage Kotlin coroutines:
private suspend fun listenForSaveBankAccount() {
    vertx.eventBus().localConsumer<JsonObject>(Address.BANK_ACCOUNT_SAVE) { msg ->
        val json = msg.body()
        val username = json.getString("username")
        val bankAccount = json.getJsonObject("bankAccount")
        sqlClient.preparedQuery("select * from users where username = $1")
            .execute(Tuple.of(username))
            .compose { Future.succeededFuture(it.first().getInteger(1)) }
            .onFailure { msg.fail(FAILURE_CODE_GENERAL, "Could not resolve user by username $username") }
            .flatMap { userId ->
                sqlClient.preparedQuery("select count(*) from bank_accounts where user_id = $1")
                    .execute(Tuple.of(userId))
                    // count(*) is the only column, so it is read at position 0
                    .map { it.first().getInteger(0) }
            }
            .flatMap { totalUserBankAccounts ->
                val maybeCode: String? = bankAccount.getString("code")
                if (!maybeCode.isNullOrBlank()) {
                } else {
                }
            }
    }
}
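As a rough illustration of what a coroutine-based version of the first part of this handler might look like, here is a sketch assuming Vert.x 4.x, where the await() extension on Future is available from vertx-lang-kotlin-coroutines. The constant values, the function parameters, and the explicit scope are placeholders introduced for the example; the branches left empty in the post are not filled in.

```kotlin
import io.vertx.core.Vertx
import io.vertx.core.json.JsonObject
import io.vertx.kotlin.coroutines.await
import io.vertx.kotlin.coroutines.dispatcher
import io.vertx.sqlclient.SqlClient
import io.vertx.sqlclient.Tuple
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch

// Placeholder values; the real ones live in the poster's code base.
const val BANK_ACCOUNT_SAVE = "bank-account.save"
const val FAILURE_CODE_GENERAL = 1

fun listenForSaveBankAccount(vertx: Vertx, sqlClient: SqlClient) {
    val scope = CoroutineScope(vertx.dispatcher())
    vertx.eventBus().localConsumer<JsonObject>(BANK_ACCOUNT_SAVE) { msg ->
        // The handler itself cannot suspend, so each message starts a coroutine.
        scope.launch {
            val username = msg.body().getString("username")

            // Any failure here (query error, no matching row) fails the message.
            val userId = try {
                sqlClient.preparedQuery("select * from users where username = $1")
                    .execute(Tuple.of(username))
                    .await()
                    .first()
                    .getInteger(1)
            } catch (e: Exception) {
                msg.fail(FAILURE_CODE_GENERAL, "Could not resolve user by username $username")
                return@launch
            }

            val totalUserBankAccounts =
                sqlClient.preparedQuery("select count(*) from bank_accounts where user_id = $1")
                    .execute(Tuple.of(userId))
                    .await()
                    .first()
                    .getInteger(0) // count(*) is the only column

            // The remaining logic is elided in the original post and is left out here.
            println("user $userId has $totalUserBankAccounts bank accounts")
        }
    }
}
```

The queries now read top to bottom instead of through compose/flatMap chains. An exception from the second query is not caught in this sketch and would surface through the scope's uncaught exception handling.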
Motivation: Following #171, this is my suggestion for a coroutine-aware EventBus. CoroutineEventBus delegates to the underlying EventBus for non-suspending cases and implements the suspending cases. CoroutineMessageConsumer does the same for message consumption.
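The delegation idea can be illustrated with Kotlin interface delegation. This is only a sketch of the general shape, not the code from the pull request: the class names are taken from the description, while the coConsumer and coHandler methods, the constructor parameters, and the scope handling are assumptions.

```kotlin
import io.vertx.core.Vertx
import io.vertx.core.eventbus.EventBus
import io.vertx.core.eventbus.Message
import io.vertx.core.eventbus.MessageConsumer
import io.vertx.kotlin.coroutines.dispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

// Non-suspending members are forwarded to the wrapped consumer via `by`.
class CoroutineMessageConsumer<T>(
    private val underlying: MessageConsumer<T>,
    private val scope: CoroutineScope
) : MessageConsumer<T> by underlying {

    // Assumed method name: attaches a handler that may suspend.
    fun coHandler(handler: suspend (Message<T>) -> Unit): CoroutineMessageConsumer<T> {
        underlying.handler { msg -> scope.launch { handler(msg) } }
        return this
    }
}

// Non-suspending members are forwarded to the wrapped EventBus via `by`.
class CoroutineEventBus(
    private val underlying: EventBus,
    private val scope: CoroutineScope
) : EventBus by underlying {

    // Assumed method name: returns a consumer that accepts suspending handlers.
    fun <T> coConsumer(address: String): CoroutineMessageConsumer<T> =
        CoroutineMessageConsumer(underlying.consumer<T>(address), scope)
}

fun main() {
    val vertx = Vertx.vertx()
    val bus = CoroutineEventBus(vertx.eventBus(), CoroutineScope(vertx.dispatcher()))

    bus.coConsumer<String>("some-address").coHandler { msg ->
        delay(10)
        println("got ${msg.body()}")
        vertx.close()
    }
    // send() is an ordinary EventBus method, served by delegation.
    bus.send("some-address", "hello")
}
```

With this shape, existing code that expects an EventBus can keep using the wrapper unchanged, and only the suspending entry points are new.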