square / sqlbrite

A lightweight wrapper around SQLiteOpenHelper which introduces reactive stream semantics to SQL operations.
https://square.github.io/sqlbrite/3.x/sqlbrite/
Apache License 2.0

Inconsistent notifications #231

Closed. bnvinay92 closed this issue 6 years ago

bnvinay92 commented 6 years ago

I'm initialising BriteDatabase with Schedulers.single(). I run a query on a table, then write 5 items to the same table on the same scheduler, Schedulers.single().

I'm receiving all 5 notifications only after all 5 writes, so my query emits the same (last) item 5 times.

@MainThread
private void testInterleaved() {
  // SELECT * FROM key_value_store;
  kvStore.getAll(String.class).subscribe(list -> Timber.i(list.toString()));

  // INSERT OR REPLACE INTO key_value_store (id, value) VALUES(?,?);
  kvStore.put("reviewer1", "Vinay1", String.class).subscribeOn(Schedulers.single()).subscribe();
  kvStore.put("reviewer1", "Vinay2", String.class).subscribeOn(Schedulers.single()).subscribe();
  kvStore.put("reviewer1", "Vinay3", String.class).subscribeOn(Schedulers.single()).subscribe();
  kvStore.put("reviewer1", "Vinay4", String.class).subscribeOn(Schedulers.single()).subscribe();
  kvStore.put("reviewer1", "Vinay5", String.class).subscribeOn(Schedulers.single()).subscribe();
}

A PUT looks something like this:

Single<Long> put(BriteDatabase db) {
  return Single.create(emitter -> {
    PutKeyValue putKeyValue = new PutKeyValue(db.getWritableDatabase());
    putKeyValue.bind(id(), value());
    try {
      long rowId = db.insert(TABLE_NAME, FACTORY.marshal(this).asContentValues(), SQLiteDatabase.CONFLICT_REPLACE);
      emitter.onSuccess(rowId);
    } catch (SQLException e) {
      emitter.onError(e);
    }
  });
}

Even when I read and write on separate threads, I get inconsistent notifications.

JakeWharton commented 6 years ago

All 5 subscribeOn calls enqueue their work on the scheduler before the first insert completes. The notifications those inserts trigger are enqueued behind them, so they are processed only after all 5 inserts complete.

Even if you received the notifications immediately, the output would be probabilistic at best. We only send a notification that a change occurred, not the change itself. By the time you eventually call or trigger run() on the Query object, the data in the underlying table could have changed again.
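
The queueing order described in this reply can be reproduced without SQLBrite or RxJava. The following is a minimal sketch in plain Java, under stated assumptions: a single-thread ExecutorService stands in for Schedulers.single(), an AtomicReference stands in for the table row, and each "notification" is a task that re-reads the row, much as a re-run query would. The class name SingleSchedulerModel and the gate latch are hypothetical; the gate only ensures that all five writes are queued before the first one runs, which is the situation the reply describes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the issue; this is not SQLBrite code.
public class SingleSchedulerModel {

    static List<String> run() {
        // Stand-in for Schedulers.single(): one worker thread, FIFO queue.
        ExecutorService single = Executors.newSingleThreadExecutor();
        // Stand-in for the single row keyed by "reviewer1".
        AtomicReference<String> row = new AtomicReference<>("initial");
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);
        CountDownLatch queriesDone = new CountDownLatch(5);

        // Hold the worker so that all five writes are queued before any runs.
        single.execute(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        for (int i = 1; i <= 5; i++) {
            String value = "Vinay" + i;
            single.execute(() -> {
                row.set(value); // the "insert"
                // The "notification" carries no data. It only schedules a
                // re-read, which lands at the back of the same queue.
                single.execute(() -> {
                    seen.add(row.get()); // the "query" sees the current state
                    queriesDone.countDown();
                });
            });
        }

        gate.countDown(); // release the worker
        try {
            if (!queriesDone.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for queries");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            single.shutdown();
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [Vinay5, Vinay5, Vinay5, Vinay5, Vinay5]
    }
}
```

In this model every re-read runs after the fifth write, so each one observes "Vinay5". Because a notification says only that something changed, the value a query reports depends on when the re-read actually executes, not on which write triggered it.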