ACINQ / phoenix

Phoenix is a self-custodial Bitcoin wallet using Lightning to send/receive payments.
https://phoenix.acinq.co
Apache License 2.0

Start a transaction when consuming a query as a `Flow` #415

Closed robbiehanson closed 10 months ago

robbiehanson commented 10 months ago

During the upgrade/channel-migration process, we've been seeing a bug that's fairly reproducible (not 100% but very common). What happens is, the most recent transaction (in this case, the open-merged-channel transaction) isn't displayed on the Home screen.

While debugging this, I assumed that a Flow wasn't firing when the transaction was added to the database. That assumption was wrong: the Flow was definitely firing. But after it fired and performed the database query, it was receiving "old" data.

In particular, what I was seeing looked like this:

So here's the previous code we had:

suspend fun listPaymentsOrderFlow(
   count: Int,
   skip: Int
): Flow<List<WalletPaymentOrderRow>> = withContext(Dispatchers.Default) {
   aggrQueries.listAllPaymentsOrder(
      limit = count.toLong(),
      offset = skip.toLong(),
      mapper = ::allPaymentsOrderMapper
   ).asFlow().mapToList()
}

There are a couple of things wrong here. First, there's no reason for this function to suspend (or to call withContext()), because it just creates and returns a flow. The flow only suspends later, when you collect it. So the function can be changed to this:

fun listPaymentsOrderFlow(
   count: Int,
   skip: Int
): Flow<List<WalletPaymentOrderRow>> {
   return aggrQueries.listAllPaymentsOrder(
      limit = count.toLong(),
      offset = skip.toLong(),
      mapper = ::allPaymentsOrderMapper
   ).asFlow().mapToList()
}
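To illustrate why the function doesn't need to suspend, here is a minimal sketch using only kotlinx.coroutines (no SQLDelight). The names `numbersFlow` and `runColdFlowDemo` are hypothetical stand-ins, not part of the Phoenix code base. It shows that building a flow is an ordinary, non-suspending call, and that the flow's body only runs once a collector subscribes.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a query-backed flow.
// It records in `log` when its body actually runs.
fun numbersFlow(log: MutableList<String>): Flow<Int> = flow {
    log += "producer started" // runs only once a collector subscribes
    emit(1)
    emit(2)
}

fun runColdFlowDemo(): List<String> {
    val log = mutableListOf<String>()
    val numbers = numbersFlow(log) // plain call: no suspend, no withContext
    log += "flow created"
    runBlocking {
        numbers.collect { log += "got $it" }
    }
    return log
}

fun main() {
    // prints [flow created, producer started, got 1, got 2]
    println(runColdFlowDemo())
}
```

"flow created" is logged before "producer started", so wrapping the builder call in withContext() has no effect on where the flow's work eventually runs.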

Now if we look at SQLDelight's implementation of mapToList() we find this simple implementation:

fun <T : Any> Flow<Query<T>>.mapToList(
  context: CoroutineContext = Dispatchers.Default
): Flow<List<T>> = map {
  withContext(context) {
    it.executeAsList()
  }
}

So mapToList is actually controlling which Dispatcher will be used. And if we change our implementation to this, then the problem appears to be fixed:

return aggrQueries.listAllPaymentsOrder(
   limit = count.toLong(),
   offset = skip.toLong(),
   mapper = ::allPaymentsOrderMapper
)
.asFlow()
.map {
   withContext(Dispatchers.Default) {
      database.transactionWithResult { // <- important line
         it.executeAsList()
      }
   }
}

So what was the bug, and why does this fix it?

If we're using withContext(Dispatchers.Default), then the query might run on a different thread than the caller, because Dispatchers.Default is backed by a thread pool.
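The thread switch can be observed with a small sketch. This one is JVM-only (it uses `Thread.currentThread()`, which is an assumption for illustration; the bug described here was seen during the upgrade/channel-migration process, not in this snippet). The helper name `threadNames` is hypothetical. Exact thread names vary by run and environment, so no output is shown.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Returns (caller thread name, thread name inside withContext).
fun threadNames(): Pair<String, String> = runBlocking {
    // runBlocking runs this coroutine on the calling thread.
    val caller = Thread.currentThread().name
    // withContext(Dispatchers.Default) hands the block to the shared worker pool.
    val inside = withContext(Dispatchers.Default) {
        Thread.currentThread().name
    }
    caller to inside
}

fun main() {
    val (caller, inside) = threadNames()
    println("caller thread:      $caller")
    println("withContext thread: $inside")
}
```

Because the block runs on a pool worker rather than on the caller's thread, anything keyed to the current thread (such as a thread-local connection) may differ between the two.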

And if we look at SQLDelight's code base we'll find:

/**
 * Native driver implementation.
 *
 * The driver creates two connection pools, which default to 1 connection maximum.
 * There is a reader pool, which handles all query requests outside of a transaction.
 * The other pool is the transaction pool, which handles all transactions and write
 * requests outside of a transaction.
 *
 * When a transaction is started, that thread is aligned with a transaction pool connection.
 * Attempting a write or starting another transaction, if no connections are available,
 * will cause the caller to wait.
 *
 * You can have multiple connections in the transaction pool, but this would only be useful
 * for read transactions. Writing from multiple connections in an overlapping manner can
 * be problematic.
 *
 * ...
 */
class NativeSqliteDriver(
  private val databaseManager: DatabaseManager,
  maxReaderConnections: Int = 1,
) : ConnectionWrapper(), SqlDriver {

  // A pool of reader connections used by all operations not in a transaction
  internal val transactionPool: Pool<ThreadConnection>
  internal val readerPool: Pool<ThreadConnection>

  // Once a transaction is started and connection borrowed, it will be here, but only for that
  // thread
  private val borrowedConnectionThread = ThreadLocalRef<Borrowed<ThreadConnection>>()

  // ... lots of code ...
}

So my theory is that:

This means it's possible for transaction2 to not see the in-flight readWriteTransaction, unless we change the code to explicitly start a transaction. Doing so pushes the query through the transactionPool, which then guarantees that it sees the new data.

If this theory is correct, it also means that other query.asFlow().mapToWhatever() calls are subtle bugs that only sometimes reproduce. Which would also explain why: