JetBrains / Exposed

Kotlin SQL Framework
http://jetbrains.github.io/Exposed/
Apache License 2.0

Use Kotlinx Coroutines SharedFlow to subscribe to updates #1560

Open aSemy opened 2 years ago

aSemy commented 2 years ago

I would like to use a Kotlinx Coroutines SharedFlow to subscribe to entity updates.

Example usage

Here's a very quick demo showing how I'd like to use it:

object StarWarsFilms : IntIdTable() {
  val sequelId: Column<Int> = integer("sequel_id").uniqueIndex()
  val name: Column<String> = varchar("name", 50)
  val director: Column<String> = varchar("director", 50)
}

val starWarsFilmNames: SharedFlow<String> =
  StarWarsFilms
    .select { 
      StarWarsFilms.sequelId eq 8
    }
    .sharedFlow { 
      // map the entity
      it[StarWarsFilms.name]
    }

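suspend fun main() {
  // launchIn needs a CoroutineScope; a bare `suspend fun main` does not provide one
  coroutineScope {
    starWarsFilmNames.onEach { name ->
      println("Star Wars film name $name") // will print the name every time a new entity is added
    }.launchIn(this)
  }
}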

Updating

I would also like to be able to push updates into a table using a MutableSharedFlow. Although I think this can be achieved already, having a built-in library function would help with ergonomics.
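As a rough sketch of how this can be done today, a collector can forward each value from a MutableSharedFlow to a write function. `writeUpdates` and `demoWriter` are hypothetical names, not Exposed API, and the `write` lambda stands in for whatever insert or update you would run inside a transaction; an in-memory list plays the table here.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.yield

// Hypothetical helper (not part of Exposed): forwards every emitted value to `write`.
// In real code `write` would wrap a database insert or update.
fun <T> CoroutineScope.writeUpdates(updates: SharedFlow<T>, write: suspend (T) -> Unit): Job =
    updates.onEach { write(it) }.launchIn(this)

// Demo with an in-memory list standing in for the table.
fun demoWriter(): List<String> = runBlocking {
    val updates = MutableSharedFlow<String>(extraBufferCapacity = 16)
    val table = mutableListOf<String>()
    val writer = writeUpdates(updates) { table += it }

    // A SharedFlow without replay drops values emitted before anyone subscribes,
    // so wait for the writer to attach first.
    updates.subscriptionCount.first { it > 0 }

    updates.emit("The Last Jedi")
    updates.emit("The Rise of Skywalker")

    // Give the writer a chance to drain the buffer, then stop it.
    withTimeout(1_000) { while (table.size < 2) yield() }
    writer.cancel()
    table.toList()
}

fun main() {
    println(demoWriter())
}
```

With a blocking JDBC driver the `write` lambda would probably need to run on a dispatcher meant for blocking work, such as `Dispatchers.IO`.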

Restrictions

I understand that there are issues with the database drivers being inherently synchronous ('Working with Coroutines', https://github.com/JetBrains/Exposed/issues/1551#issuecomment-1198542569). However, I would like this functionality to be implemented on a best-effort basis, even if the underlying driver cannot support it optimally.
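One possible best-effort approach, sketched here purely as an assumption about what could work, is to poll the query on an interval and share the results. `pollAsSharedFlow` and `demoPolling` are hypothetical names; the `query` lambda stands in for a blocking select, and the demo replaces it with a canned list of results.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of Exposed): re-runs `query` every `intervalMillis`
// and shares each result that differs from the previous one.
fun <T> CoroutineScope.pollAsSharedFlow(intervalMillis: Long, query: suspend () -> T): SharedFlow<T> =
    flow {
        while (true) {
            emit(query())
            delay(intervalMillis)
        }
    }
        .distinctUntilChanged() // skip polls that returned the same result
        .shareIn(this, SharingStarted.WhileSubscribed(), replay = 1)

// Demo: a canned sequence of query results stands in for the database.
fun demoPolling(): List<Int> {
    val scope = CoroutineScope(Dispatchers.Default)
    val snapshots = listOf(1, 1, 2, 2, 3)
    var polls = 0
    val counts = scope.pollAsSharedFlow(intervalMillis = 10) {
        snapshots[minOf(polls++, snapshots.lastIndex)]
    }
    return try {
        runBlocking { counts.take(3).toList() }
    } finally {
        scope.cancel() // the sharing coroutine otherwise lives as long as the scope
    }
}

fun main() {
    println(demoPolling())
}
```

Polling trades latency and extra queries for simplicity. `SharingStarted.WhileSubscribed()` at least stops the polling loop while nobody is collecting.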

Current options?

I couldn't see any easy way to do this presently.

I couldn't find any 'subscribe' or 'listening' options described in the DSL or DAO docs, and I couldn't see any existing usage of a Flow in the project.

SqlDelight has similar functionality: https://cashapp.github.io/sqldelight/jvm_sqlite/coroutines/


Dogacel commented 1 year ago

👍 Kotlin Flow support would be super nice. Being able to process data as streams rather than batches is a huge plus. Also, it decreases the load on the DB because of the backpressure mechanism of flows. Makes processing huge amounts of data very easy.

I think the backpressure mechanism can be implemented with synchronous drivers by using limit/offset pagination.
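A minimal sketch of that idea, assuming a cold `flow` builder that fetches the next page only once the collector has consumed the previous one. `pagedFlow` and `demoPaging` are hypothetical names, not Exposed API, and `fetchPage` stands in for a blocking limit/offset query; the demo uses an in-memory list as the table.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper (not part of Exposed): emits rows page by page.
// `fetchPage` stands in for a blocking query with LIMIT and OFFSET.
fun <T> pagedFlow(
    pageSize: Int,
    fetchPage: suspend (limit: Int, offset: Long) -> List<T>,
): Flow<T> = flow {
    var offset = 0L
    while (true) {
        val page = fetchPage(pageSize, offset)
        page.forEach { emit(it) }          // suspends until the collector takes the row
        if (page.size < pageSize) break    // a short page means the table is exhausted
        offset += page.size
    }
}

// Demo: a 10-row in-memory "table"; collecting 4 rows should need only 2 queries.
fun demoPaging(): Pair<List<Int>, Int> {
    val table = (1..10).toList()
    var queries = 0
    val rows = pagedFlow<Int>(pageSize = 3) { limit, offset ->
        queries++
        table.drop(offset.toInt()).take(limit)
    }
    val firstFour = runBlocking { rows.take(4).toList() }
    return firstFour to queries
}

fun main() {
    println(demoPaging())
}
```

Because the flow is cold and sequential, a slow collector delays the next page query, which is where the reduced database load would come from. Offset pagination only gives consistent results with a stable ordering, so a real query would need an ORDER BY on a unique column.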

wwalkingg commented 1 year ago

Any updates or plans?