dariuszseweryn / RxAndroidBle

An Android Bluetooth Low Energy (BLE) Library with RxJava3 interface
http://polidea.github.io/RxAndroidBle/
Apache License 2.0

Wrong notification and indication order #628

Closed ArtyomBasharkevich closed 4 years ago

ArtyomBasharkevich commented 5 years ago

Summary

I have two notification observables and one indication observable. Whenever I receive a byte array, I decode its header to get the message number. In my example I send a command to the device and receive an indication with the command status (success, for example). Then, 1-2 milliseconds later according to the logs, I receive a notification about a change in the device status. The trouble is that everything is fine if RxBleLog logging is enabled with a log level of at least INFO. If I just comment out the logging snippet, the messages arrive in the wrong order.

Library version

1.10.1

Preconditions

Pixel 3, Android 9.0 or 10.0. It shouldn't matter.

Steps to reproduce actual result


1. Comment out the RxBleLog.updateLogOptions() call
2. Establish a BLE connection
3. Set up the notifications and the indication
4. Subscribe to observables
5. Send any command

Actual result

NEW_NOTIFICATION in the log, then NEW_INDICATION (in 0-1 milliseconds on my phone).

Expected result

NEW_INDICATION in the log, then NEW_NOTIFICATION (in 1-2 milliseconds on my phone).

Minimum code snippet reproducing the issue

bleDevice.establishConnection(false)
      .delay(Constants.DELAY_FOR_SETUP_INDICATION, TimeUnit.MILLISECONDS)
      .flatMap(
        {
          it.setupNotification(UUID.fromString(BluetoothConstants.PTLS_LOCK_READ_NOTIFICATION_CHARACTERISTIC))
        },
        { bleConnection, ptlsNotifications ->
          ConnectionData(bleConnection, ptlsNotifications.map { convertHeader(it) })
        })
      .flatMap(
        {
          it.rxBleConnection.setupNotification(UUID.fromString(BluetoothConstants.LOCK_READ_NOTIFICATION_CHARACTERISTIC))
        },
        { connectionData, lockNotifications ->
          connectionData.apply {
            this.lockNotifications = lockNotifications.map {
              Timber.e("NEW_NOTIFICATION = " + System.currentTimeMillis())
              convertHeader(it)
            }
          }
        })
      .flatMap(
        {
          it.rxBleConnection.setupIndication(UUID.fromString(BluetoothConstants.LOCK_NOTIFICATION_CHARACTERISTIC))
        },
        { connectionData, lockIndications ->
          connectionData.apply {
            this.lockIndications = lockIndications.map {
              Timber.e("NEW_INDICATION = " + System.currentTimeMillis())
              convertHeader(it)
            }
          }
        })
      .doOnError {
        Timber.e(it)
        if (it !is BleAlreadyConnectedException) {
          wrapperListener.onConnectionChanged(false)
          ptlsHelper?.closeConnection()
          ptlsHelper = null
          session = null
        }
      }
      .retryWhen {
        it.takeWhile { remaining.getAndIncrement() != MAX_RETRY_ATTEMPTS_ON_CONNECTION_ERROR }
          .flatMap { error ->
            Timber.e("retry ${remaining.get()} time")
            Timber.e("${bleDevice.macAddress} : ${error.localizedMessage}")
            Observable.timer(5, TimeUnit.SECONDS)
          }
      }
      .doOnNext { remaining.lazySet(0) }
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(
        { onConnected(it) },
        { wrapperListener.onConnectionChanged(false) }
      ).addTo(compositeDisposable)
    // Library logging: when this call is commented out, the messages arrive in the wrong order
    RxBleLog.updateLogOptions(
      LogOptions.Builder()
        .setMacAddressLogSetting(LogConstants.MAC_ADDRESS_FULL)
        .setShouldLogAttributeValues(true)
        .setUuidsLogSetting(LogConstants.UUIDS_FULL)
        .setLogLevel(LogConstants.INFO)
        .build()
    )
dariuszseweryn commented 5 years ago

Hello,

I see no actionable data here. All notifications/indications are serialised as soon as possible to mitigate such situations. Android, however, may dispatch notifications on arbitrary threads, which can cause subtle race conditions. That serialisation is exactly what the library uses to avoid them.

You have not attached any logs, since you mention that everything seems to work with logging enabled. Could you show logs from runs with and without library logging? There is a chance that the notification callback logs from the OS will still show something.

It would be best if you could provide a unit test case that currently fails because of the race condition, so that it could be fixed. Even better, you could fix the issue yourself and open a PR. I just doubt that there is anything more we can do at the moment. Some Android OS implementations are extremely sensitive to the timing/threading of BluetoothGattCallback callbacks, and getting this wrong makes BLE break on older versions.

Anyway, you should be able to change your code to accommodate uncertainty in the order of notifications.
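The following is one possible way to follow this advice. It is a hypothetical helper, not part of RxAndroidBle. It buffers messages by the message number decoded from the header and releases them only in sequence. ReorderBuffer, offer() and the starting number 13 are illustrative assumptions. The sketch also assumes that the number is shared by the notification and indication characteristics, is unique, and grows by one per message without wrapping around. The 13/14 pair in the logs later in this thread suggests this, but it is not confirmed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: releases messages strictly in sequence-number order.
// Assumes unique, consecutive sequence numbers that never wrap around.
class ReorderBuffer<T> {
    private final Map<Integer, T> pending = new HashMap<>();
    private int nextExpected;

    ReorderBuffer(int firstExpected) {
        this.nextExpected = firstExpected;
    }

    // Buffers one message and returns every message that can now be released, in order.
    List<T> offer(int sequenceNumber, T message) {
        List<T> released = new ArrayList<>();
        if (sequenceNumber < nextExpected) {
            return released; // stale or duplicate: that number was already released
        }
        pending.put(sequenceNumber, message);
        while (pending.containsKey(nextExpected)) {
            released.add(pending.remove(nextExpected));
            nextExpected++;
        }
        return released;
    }

    public static void main(String[] args) {
        ReorderBuffer<String> buffer = new ReorderBuffer<>(13);
        System.out.println(buffer.offer(14, "notification")); // prints []
        System.out.println(buffer.offer(13, "indication"));   // prints [indication, notification]
    }
}
```

A missing message stalls everything queued behind it. A real version would therefore need a timeout or some other policy for gaps.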

ArtyomBasharkevich commented 5 years ago

I will try to test it more thoroughly and then report back with what I find.

ArtyomBasharkevich commented 5 years ago

I found the trouble. Inside RxBleGattCallback there is this method:

@Override
        public void onCharacteristicChanged(BluetoothGatt gatt, BluetoothGattCharacteristic characteristic) {
            LoggerUtil.logCallback("onCharacteristicChanged", gatt, characteristic, true);
            nativeCallbackDispatcher.notifyNativeChangedCallback(gatt, characteristic);
            super.onCharacteristicChanged(gatt, characteristic);

            /*
             * It is important to call changedCharacteristicSerializedPublishRelay as soon as possible because a quick changing
             * characteristic could lead to out-of-order execution since onCharacteristicChanged may be called on arbitrary
             * threads.
             */
            if (changedCharacteristicSerializedPublishRelay.hasObservers()) {
                changedCharacteristicSerializedPublishRelay.accept(
                        new CharacteristicChangedEvent(
                                characteristic.getUuid(),
                                characteristic.getInstanceId(),
                                characteristic.getValue()
                        )
                );
            }
        }

Android calls it on a separate Binder thread. Here accept() is called on the PublishRelay, and NotificationAndIndicationManager obtains the relay's events from here:

    public Observable<CharacteristicChangedEvent> getOnCharacteristicChanged() {
        return Observable.merge(
                disconnectionRouter.<CharacteristicChangedEvent>asErrorOnlyObservable(),
                changedCharacteristicSerializedPublishRelay
        )
                .observeOn(callbackScheduler);
    }

Then, in the NotificationAndIndicationManager class, it is used like this:

    @NonNull
    private static Observable<byte[]> observeOnCharacteristicChangeCallbacks(RxBleGattCallback gattCallback,
                                                                             final CharacteristicNotificationId characteristicId) {
        return gattCallback.getOnCharacteristicChanged()
                .filter(new Predicate<CharacteristicChangedEvent>() {
                    @Override
                    public boolean test(CharacteristicChangedEvent notificationIdWithData) {
                        return notificationIdWithData.equals(characteristicId);
                    }
                })
                .map(new Function<CharacteristicChangedEvent, byte[]>() {
                    @Override
                    public byte[] apply(CharacteristicChangedEvent notificationIdWithData) {
                        return notificationIdWithData.data;
                    }
                });
    }

In other words, for every new CharacteristicChangedEvent, filter() and then map() are called.

For example, say we have two subscribers. When the PublishRelay receives a CharacteristicChangedEvent, the first subscriber receives the event in filter(), and then the second one does. But sometimes the PublishRelay receives two events one after another with only a slight delay. Because of a race condition, the first subscriber can then receive both events in filter() before the second subscriber receives either of them. Suppose the first event must be processed by the second subscriber and the second event by the first subscriber. Then I get the wrong order, because the first subscriber gets the second event before the second subscriber gets the first one. This happens because of the call to observeOn(callbackScheduler) in the getOnCharacteristicChanged() method. As far as I understand, each subscription to an observeOn() chain keeps its own queue. So it does not preserve one FIFO order across subscribers the way the task queue inside the Executor does.
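The race described above can be reproduced without Android or RxJava. The sketch below is a simplified, deterministic model rather than library code. The hypothetical ManualScheduler stands in for the single-threaded callbackScheduler. The hypothetical ObserveOnModel approximates observeOn(): each subscription gets its own queue, and a drain task is scheduled only when none is pending. The subscription order (0101 first) and the arrival order (0501 first) are taken from the logs posted later in this thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Deterministic stand-in for a single-threaded scheduler: tasks run in FIFO order.
class ManualScheduler {
    private final Queue<Runnable> tasks = new ArrayDeque<>();

    void schedule(Runnable task) {
        tasks.add(task);
    }

    void runAll() {
        while (!tasks.isEmpty()) {
            tasks.poll().run();
        }
    }
}

// Rough model of observeOn(): a private queue per subscription, at most one pending drain.
class ObserveOnModel<T> implements Consumer<T> {
    private final ManualScheduler scheduler;
    private final Consumer<T> downstream;
    private final Queue<T> queue = new ArrayDeque<>();
    private boolean drainPending;

    ObserveOnModel(ManualScheduler scheduler, Consumer<T> downstream) {
        this.scheduler = scheduler;
        this.downstream = downstream;
    }

    @Override
    public void accept(T item) {
        queue.add(item);
        if (!drainPending) { // schedule a drain only if none is pending
            drainPending = true;
            scheduler.schedule(this::drain);
        }
    }

    private void drain() {
        drainPending = false;
        T item;
        while ((item = queue.poll()) != null) { // delivers everything queued so far in one go
            downstream.accept(item);
        }
    }
}

class ObserveOnBeforeFilterDemo {
    // Returns the order in which events pass their subscriber's filter() and reach map().
    static List<String> run(String... arrivals) {
        ManualScheduler callbackScheduler = new ManualScheduler();
        List<String> delivered = new ArrayList<>();
        List<Consumer<String>> relayObservers = new ArrayList<>();
        for (String uuid : new String[] {"0101", "0501"}) {
            relayObservers.add(new ObserveOnModel<String>(callbackScheduler, event -> {
                if (event.equals(uuid)) {   // filter(), running after the thread hop
                    delivered.add(event);   // map() and onNext()
                }
            }));
        }
        // The Binder thread pushes every event into the relay before the scheduler runs.
        for (String event : arrivals) {
            for (Consumer<String> observer : relayObservers) {
                observer.accept(event);
            }
        }
        callbackScheduler.runAll();
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(run("0501", "0101")); // prints [0101, 0501]
    }
}
```

Both events are queued before the scheduler runs. The 0101 subscriber therefore drains both of them before the 0501 subscriber sees anything, and the event that arrived second is delivered first.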

There are several possible solutions:

First:


1. Remove observeOn(callbackScheduler) from the getOnCharacteristicChanged() method.
2. Call it inside observeOnCharacteristicChangeCallbacks(), after filter().

Second:

We need to call accept() on a separate thread and delete observeOn(callbackScheduler). We can use the existing Scheduler or Executor for this purpose.
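The first solution can be sketched in a simplified deterministic model. ManualScheduler and ObserveOnModel are hypothetical stand-ins, not library classes. ManualScheduler is a FIFO task queue playing the single-threaded callbackScheduler. ObserveOnModel roughly approximates observeOn()'s per-subscription queue. Here filter() runs synchronously on the emitting thread, and only matching events reach each subscriber's observeOn() queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Deterministic stand-in for a single-threaded scheduler: tasks run in FIFO order.
class ManualScheduler {
    private final Queue<Runnable> tasks = new ArrayDeque<>();

    void schedule(Runnable task) {
        tasks.add(task);
    }

    void runAll() {
        while (!tasks.isEmpty()) {
            tasks.poll().run();
        }
    }
}

// Rough model of observeOn(): a private queue per subscription, at most one pending drain.
class ObserveOnModel<T> implements Consumer<T> {
    private final ManualScheduler scheduler;
    private final Consumer<T> downstream;
    private final Queue<T> queue = new ArrayDeque<>();
    private boolean drainPending;

    ObserveOnModel(ManualScheduler scheduler, Consumer<T> downstream) {
        this.scheduler = scheduler;
        this.downstream = downstream;
    }

    @Override
    public void accept(T item) {
        queue.add(item);
        if (!drainPending) { // schedule a drain only if none is pending
            drainPending = true;
            scheduler.schedule(this::drain);
        }
    }

    private void drain() {
        drainPending = false;
        T item;
        while ((item = queue.poll()) != null) {
            downstream.accept(item);
        }
    }
}

class ObserveOnAfterFilterDemo {
    static List<String> run(String... arrivals) {
        ManualScheduler callbackScheduler = new ManualScheduler();
        List<String> delivered = new ArrayList<>();
        List<Consumer<String>> relayObservers = new ArrayList<>();
        for (String uuid : new String[] {"0101", "0501"}) {
            // map() and onNext() run after the thread hop.
            Consumer<String> afterHop = new ObserveOnModel<String>(callbackScheduler, delivered::add);
            relayObservers.add(event -> {
                if (event.equals(uuid)) { // filter() runs on the emitting (Binder) thread
                    afterHop.accept(event);
                }
            });
        }
        for (String event : arrivals) {
            for (Consumer<String> observer : relayObservers) {
                observer.accept(event);
            }
        }
        callbackScheduler.runAll();
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(run("0501", "0101"));         // prints [0501, 0101]
        System.out.println(run("0501", "0101", "0501")); // prints [0501, 0501, 0101]
    }
}
```

In this model the two-event case from the logs comes out in arrival order. The fix still depends on what is already queued, though. If a third event for the 0501 subscriber arrives before the scheduler runs, it joins the pending 0501 drain and overtakes the 0101 event, as the second line shows.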

Below is a snippet that may help to understand and test these solutions.

import android.annotation.SuppressLint
import android.os.Bundle
import android.util.Log
import android.widget.Button
import androidx.appcompat.app.AppCompatActivity
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.schedulers.Schedulers
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.Executors
import kotlin.concurrent.thread

class MainActivity : AppCompatActivity() {

    private lateinit var button: Button

    private val executor = Executors.newSingleThreadExecutor()
    private val scheduler = Schedulers.from(executor)
    private val subject = PublishSubject.create<Int>()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        button = findViewById(R.id.button)
        button.setOnClickListener {
            thread(name = "My thread") {
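                // Second solution: uncomment one of these (and one matching closing brace below)
                // and remove the .observeOn(scheduler) before filter
                //executor.submit {
                //scheduler.scheduleDirect {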
                Log.e("start", "Start" + '\n' + "Thread = ${Thread.currentThread().name}")
                subject.onNext(2)
                subject.onNext(1)
//             }
//             }
            }
        }
        setupListeners(1)
        setupListeners(2)
    }

    @SuppressLint("CheckResult")
    private fun setupListeners(int: Int) {
        subject
            .observeOn(scheduler)
            .filter {
                Log.e(
                    "filter",
                    "subscriber = $int filter = $it Thread = ${Thread.currentThread()}"
                )
                Thread.sleep(100)
                it == int
            }
            // First solution: enable this observeOn() after filter and remove the one before filter
//            .observeOn(scheduler)
            .map {
                Log.e("map", "subscriber = $int map = $it Thread = ${Thread.currentThread()}")
                it
            }
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe {
                Log.e(
                    "subscribe",
                    "subscriber = $int subscribe = $it Thread = ${Thread.currentThread()}"
                )
            }
    }
}

This is not a unit test, but that may be enough.

ArtyomBasharkevich commented 5 years ago

When I tried to call accept on a new thread using the existing callbackScheduler, I got the wrong behavior. I decided to use observeOn(callbackScheduler) inside observeOnCharacteristicChangeCallbacks after calling filter.

ArtyomBasharkevich commented 5 years ago

Actually, since observeOnCharacteristicChangeCallbacks() is static, I added observeOn() in setupServerInitiatedCharacteristicRead() instead:

...
final Observable<Observable<byte[]>> newObservable = setCharacteristicNotification(bluetoothGatt, characteristic, true)
                            .andThen(ObservableUtil.justOnNext(observeOnCharacteristicChangeCallbacks(gattCallback, id)))
                            .observeOn(callbackScheduler)
                            .compose(setupModeTransformer(descriptorWriter, characteristic, enableNotificationTypeValue, setupMode))
...
dariuszseweryn commented 5 years ago

Hello again, I am back from holiday (that is why I was silent).

Thank you for your investigation. You have provided an example but have not included the logs it produces. You also did not include the actual/expected logs from your app, as I requested. A log of what actually happens with the notifications you receive, and of how they are processed, could help me understand the issue.

It seems that you want to use several .subscribe() statements, which give you no guarantees about the order between them. If you want a resilient flow, you should use a single .subscribe() for the computation.

ArtyomBasharkevich commented 5 years ago

Each time I call rxBleConnection.setupNotification() or rxBleConnection.setupIndication(), your library creates a new observer of the shared relay. Each one has a different UUID, which is why the library uses filter().

I subscribe to one notification and one indication.

I added some logs to show you what happened.

@Override
    public void onCharacteristicChanged(BluetoothGatt gatt,
        BluetoothGattCharacteristic characteristic) {
      Log.e("onCharacteristicChanged", characteristic.getUuid().toString()); // added log
      LoggerUtil.logCallback("onCharacteristicChanged", gatt, characteristic, true);
      nativeCallbackDispatcher.notifyNativeChangedCallback(gatt, characteristic);
      super.onCharacteristicChanged(gatt, characteristic);

      /*
       * It is important to call changedCharacteristicSerializedPublishRelay as soon as possible because a quick changing
       * characteristic could lead to out-of-order execution since onCharacteristicChanged may be called on arbitrary
       * threads.
       */
      if (changedCharacteristicSerializedPublishRelay.hasObservers()) {
        changedCharacteristicSerializedPublishRelay.accept(
            new CharacteristicChangedEvent(
                characteristic.getUuid(),
                characteristic.getInstanceId(),
                characteristic.getValue()
            )
        );
      }
    }
@NonNull
  private static Observable<byte[]> observeOnCharacteristicChangeCallbacks(
      RxBleGattCallback gattCallback,
      final CharacteristicNotificationId characteristicId) {
    return gattCallback.getOnCharacteristicChanged()
        .filter(new Predicate<CharacteristicChangedEvent>() {
          @Override
          public boolean test(CharacteristicChangedEvent notificationIdWithData) {
            // added log
            Log.e("filter", "notificationIdWithData = " + notificationIdWithData.first.toString()
                + " characteristicId = " + characteristicId.first.toString());
            return notificationIdWithData.equals(characteristicId);
          }
        })
        .map(new Function<CharacteristicChangedEvent, byte[]>() {
          @Override
          public byte[] apply(CharacteristicChangedEvent notificationIdWithData) {
            // added log
            Log.e("map", "notificationIdWithData = " + notificationIdWithData.first.toString()
                + " characteristicId = " + characteristicId.first.toString());
            return notificationIdWithData.data;
          }
        });
  }

Actual result

2019-10-28 14:43:08.493 E/onCharacteristicChanged: 00000501...
2019-10-28 14:43:08.493 E/onCharacteristicChanged: 00000101...
2019-10-28 14:43:08.494 D/LockBtApiInteractor$setupSubscriber: closeLock()::onSuccess()
2019-10-28 14:43:08.494 E/filter: notificationIdWithData = 00000501... characteristicId = 00000101...
2019-10-28 14:43:08.494 E/filter: notificationIdWithData = 00000101... characteristicId = 00000101...
2019-10-28 14:43:08.494 E/map: notificationIdWithData = 00000101... characteristicId = 00000101...
2019-10-28 14:43:08.495 D/DoorLockConnectionWrapper current message number = 14
2019-10-28 14:43:08.495 E/filter: notificationIdWithData = 00000501... characteristicId = 00000501...
2019-10-28 14:43:08.495 E/map: notificationIdWithData = 00000501... characteristicId = 00000501...
2019-10-28 14:43:08.495 D/DoorLockConnectionWrapper current message number = 13
2019-10-28 14:43:08.495 E/filter: notificationIdWithData = 00000101... characteristicId = 00000501...

Expected result

E/onCharacteristicChanged: 00000501...
E/onCharacteristicChanged: 00000101...
D/LockBtApiInteractor$setupSubscriber: closeLock()::onSuccess()
E/filter: notificationIdWithData = 00000501... characteristicId = 00000101...
E/filter: notificationIdWithData = 00000501... characteristicId = 00000501...
E/map: notificationIdWithData = 00000501... characteristicId = 00000501...
D/DoorLockConnectionWrapper current message number = 13
E/filter: notificationIdWithData = 00000101... characteristicId = 00000101...
E/map: notificationIdWithData = 00000101... characteristicId = 00000101...
D/DoorLockConnectionWrapper current message number = 14
E/filter: notificationIdWithData = 00000101... characteristicId = 00000501...
dariuszseweryn commented 5 years ago

The code you provided does not show how you consume the API and has no info about threads. Could you add those?

ArtyomBasharkevich commented 5 years ago
private fun connect() {
    val remaining = AtomicInteger()
    bleDevice.establishConnection(false)
      .delay(Constants.DELAY_FOR_SETUP_INDICATION, TimeUnit.MILLISECONDS)
      .flatMap(
        {
          it.setupNotification(UUID.fromString(BluetoothConstants.PTLS_LOCK_READ_NOTIFICATION_CHARACTERISTIC))
        },
        { bleConnection, ptlsNotifications ->
          ConnectionData(bleConnection, ptlsNotifications.map { convertHeader(it) })
        })
      .flatMap(
        {
          it.rxBleConnection.setupNotification(UUID.fromString(BluetoothConstants.LOCK_READ_NOTIFICATION_CHARACTERISTIC))
        },
        { connectionData, lockNotifications ->
          connectionData.apply {
            this.lockNotifications = lockNotifications.map { convertHeader(it) }
          }
        })
      .flatMap(
        {
          it.rxBleConnection.setupIndication(UUID.fromString(BluetoothConstants.LOCK_NOTIFICATION_CHARACTERISTIC))
        },
        { connectionData, lockIndications ->
          connectionData.apply { this.lockIndications = lockIndications.map { convertHeader(it) } }
        })
      .doOnError {
        Logger.err(it, "Error in connection to ${bleDevice.macAddress}")
        if (it !is BleAlreadyConnectedException) {
          wrapperListener.onConnectionChanged(false)
          ptlsHelper?.closeConnection()
          ptlsHelper = null
          session = null
        }
      }
      .retryWhen {
        it.flatMap { error ->
          if (remaining.getAndIncrement() != MAX_RETRY_ATTEMPTS_ON_CONNECTION_ERROR
            && bleClient.state == RxBleClient.State.READY
          ) {
            Timber.e("retry connect to lock: ${remaining.get()} time")
            Timber.e("${bleDevice.macAddress} : ${error.localizedMessage}")
            Observable.timer(5, TimeUnit.SECONDS)
          } else {
            Observable.error(ConnectionWasDeadError())
          }
        }
      }
      .doOnNext { remaining.lazySet(0) }
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(
        { onConnected(it) },
        {
          when (it) {
            is ConnectionWasDeadError -> wrapperListener.onConnectionDead()
            else ->  wrapperListener.onConnectionChanged(false)
          }
        }
      ).addTo(compositeDisposable)
  }
  private fun onConnected(connectionData: ConnectionData) {
    rxBleConnection = connectionData.rxBleConnection
    ptlsEstablishNotifications = connectionData.ptlsEstablishNotifications
    lockNotifications = connectionData.lockNotifications
    lockInteractor = LockBtApiInteractor(connectionData.rxBleConnection)
    lockNotifications
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(
        { onNotificationCharacteristic(it) },
        { Timber.e(it) }
      ).addTo(compositeDisposable)
    connectionData.lockIndications
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(
        { onIndicationCharacteristic(it) },
        { Timber.e(it) }
      ).addTo(compositeDisposable)

    establishPtlsConnection()
  }
dariuszseweryn commented 5 years ago

And what about onNotificationCharacteristic / onIndicationCharacteristic?

ArtyomBasharkevich commented 5 years ago
  private fun convertHeader(byteArray: ByteArray): ByteArray {
    Timber.d("Lock ${bleDevice.macAddress} current message number = ${(byteArray.first().toInt() shr (4)) + 8}")
    return byteArray.apply { this[0] = first() and 0xF }
  }
  private fun onNotificationCharacteristic(incomingMessage: ByteArray) {
    if (incomingMessage.first() == BluetoothConstants.PTLS_DATA_ENCRYPTED) {
      session?.read(
        SecurityData(incomingMessage.copyOfRange(1, incomingMessage.size),
          {
            it?.also { message ->
              Logger.ble(LoggerUtils.mapLockNotification(bleDevice.macAddress, message))
              wrapperListener.onNotificationChanged(message)
            }
          })
      )
    } else if (incomingMessage.first() == BluetoothConstants.PTLS_DATA_NOT_ENCRYPTED) {
      val message = incomingMessage.copyOfRange(1, incomingMessage.size)
      Logger.ble(LoggerUtils.mapLockNotification(bleDevice.macAddress, message))
      wrapperListener.onNotificationChanged(message)
    }
  }
  private fun onIndicationCharacteristic(incomingMessage: ByteArray) {
    if (incomingMessage.first() == BluetoothConstants.PTLS_DATA_ENCRYPTED) {
      session?.read(
        SecurityData(incomingMessage.copyOfRange(1, incomingMessage.size),
          { byteArray ->
            byteArray?.also { message ->
              Logger.ble(LoggerUtils.mapLockCommand(bleDevice.macAddress, message))
              wrapperListener.onIndicationChanged(message)
            }
          })
      )
    } else if (incomingMessage.first() == BluetoothConstants.PTLS_DATA_NOT_ENCRYPTED) {
      val message = incomingMessage.copyOfRange(1, incomingMessage.size)
      Logger.ble(LoggerUtils.mapLockCommand(bleDevice.macAddress, message))
      wrapperListener.onIndicationChanged(message)
    }
  }
dariuszseweryn commented 5 years ago

And again, down the rabbit hole: what about wrapperListener.on*?

ArtyomBasharkevich commented 5 years ago
          override fun onNotificationChanged(message: ByteArray) {
            if (message.isEmpty()) return
            notificationSubject.onNext(message)
            when (message.first()) {
              BluetoothConstants.NOTIFICATION_LOCK_STATUS_CHANGE -> {
                updateLockStatus(deviceToConnect.macAddress, message[1], message[2])
              }
            }
          }
          override fun onIndicationChanged(message: ByteArray) {
            indicationSubject.onNext(message)
          }

It doesn't matter what comes next. As I said, the main problem is that in our case the events are not observed in FIFO order. Just try running the simple example I posted above.

dariuszseweryn commented 5 years ago

Could you add info about threading (it looks like 16516-16516 in logcat)?


I am trying to understand the problem you are trying to solve. Moving .observeOn() around, as proposed, could break a lot of working applications. It could do too much work on the Android BLE callback threads, or it could change the behaviour of code consuming notification emissions. The interesting part is how RxJava handles two different .subscribe() observers. From the logs you pasted, it looks like both of them are called in the same sequence in which the notifications arrived. There is a good chance that rewriting the flow to use only a single .subscribe() would solve the problem. As a far-from-bulletproof workaround: have you tried changing the order of the .subscribe() statements for your notifications and indications?

ArtyomBasharkevich commented 5 years ago
2019-10-28 22:21:49.005 16845-16878 E/onCharacteristicChanged: 00000501...
2019-10-28 22:21:49.005 16845-16878 E/onCharacteristicChanged: 00000101...
2019-10-28 22:21:49.005 16845-16991 D/LockBtApiInteractor$setupSubscriber: openLock()::onSuccess()
2019-10-28 22:21:49.005 16845-16991 E/filter: notificationIdWithData = 00000501... characteristicId = 00000101...
2019-10-28 22:21:49.006 16845-16991 E/filter: notificationIdWithData = 00000101... characteristicId = 00000101...
2019-10-28 22:21:49.006 16845-16991 E/map: notificationIdWithData = 00000101... characteristicId = 00000101...
2019-10-28 22:21:49.006 16845-16991 D/DoorLockConnectionWrapper current message number = 11
2019-10-28 22:21:49.006 16845-16991 E/filter: notificationIdWithData = 00000501... characteristicId = 00000501...
2019-10-28 22:21:49.006 16845-16991 E/map: notificationIdWithData = 00000501... characteristicId = 00000501...
2019-10-28 22:21:49.006 16845-16991 D/DoorLockConnectionWrapper current message number = 10
2019-10-28 22:21:49.007 16845-16991 E/filter: notificationIdWithData = 00000101... characteristicId = 00000501...

I use only one subscriber for the notifications and one for the indications. I could use Observable.merge() to have a single subscriber, but that doesn't solve the problem. Changing the subscription order would help me, but if the notification and the indication arrive in a different order, I will have the same problem. The key point is that calling observeOn() with your Executor before filter() can change the message order. Schedulers.trampoline() would keep the message order, but it doesn't help here because it does not change the thread.

I call rxBleConnection.setupNotification() and rxBleConnection.setupIndication() for different uuids. This means that two observables are created inside the library in NotificationAndIndicationManager in

setupServerInitiatedCharacteristicRead(@NonNull final BluetoothGattCharacteristic characteristic,
final NotificationSetupMode setupMode, final boolean isIndication)

In my logs you can see that I first receive the characteristic with UUID 00000501... and then the one with UUID 00000101.... After filter(), however, they are swapped: map() is called for 00000101... first and only then for 00000501.... The library creates a separate observable for each characteristic, but all of them share one relay (changedCharacteristicSerializedPublishRelay), and filter() separates the messages for the different characteristics.

  public Observable<CharacteristicChangedEvent> getOnCharacteristicChanged() {
    return Observable.merge(
        disconnectionRouter.<CharacteristicChangedEvent>asErrorOnlyObservable(),
        changedCharacteristicSerializedPublishRelay
    )
        .observeOn(callbackScheduler);
  }
    @NonNull
    private static Observable<byte[]> observeOnCharacteristicChangeCallbacks(RxBleGattCallback gattCallback,
                                                                             final CharacteristicNotificationId characteristicId) {
        return gattCallback.getOnCharacteristicChanged()
                .filter(new Predicate<CharacteristicChangedEvent>() {
                    @Override
                    public boolean test(CharacteristicChangedEvent notificationIdWithData) {
                        Log.e("filter", "notificationIdWithData = " + notificationIdWithData.first.toString()
                            + " characteristicId = " + characteristicId.first.toString());
                        return notificationIdWithData.equals(characteristicId);
                    }
                })
                .map(new Function<CharacteristicChangedEvent, byte[]>() {
                    @Override
                    public byte[] apply(CharacteristicChangedEvent notificationIdWithData) {
                        Log.e("map", "notificationIdWithData = " + notificationIdWithData.first.toString()
                            + " characteristicId = " + characteristicId.first.toString());
                        return notificationIdWithData.data;
                    }
                });
    }

For example, say I have ten observables with different UUIDs and I call subscribe() on all of them. All code runs on a single thread. I receive ten CharacteristicChangedEvents with different UUIDs at the same time. changedCharacteristicSerializedPublishRelay.accept() is called. Then filter() is called in observeOnCharacteristicChangeCallbacks() for each subscriber/Observable that the library created in setupServerInitiatedCharacteristicRead(). The first CharacteristicChangedEvent reaches my subscribers in the order in which I called subscribe(). filter() is called for the first subscriber with the first event, then for the second subscriber, then for the third, and so on. If the fifth subscriber has the same UUID as the event, map() is called. Only when the chain for the fifth subscriber has finished does the sixth subscriber receive the first event in filter(). When all subscribers have processed the first event, processing of the second event begins, and so on.
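The single-threaded behaviour described above can be modelled with plain callbacks. This is an illustrative sketch, not library code. The class name is hypothetical, and it uses three made-up UUID labels instead of ten. It records every filter() and map() call, which makes the event-by-event interleaving visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SynchronousFanOutDemo {
    // Records each filter()/map() call as it happens; subscribers run in subscription order.
    static List<String> run(String[] subscriberUuids, String... arrivals) {
        List<String> trace = new ArrayList<>();
        List<Consumer<String>> relayObservers = new ArrayList<>();
        for (String uuid : subscriberUuids) {
            relayObservers.add(event -> {
                trace.add("filter " + uuid + " <- " + event);
                if (event.equals(uuid)) {
                    trace.add("map " + uuid + " <- " + event);
                }
            });
        }
        // No observeOn(): each event passes through every subscriber's chain
        // before the next event is handled.
        for (String event : arrivals) {
            for (Consumer<String> observer : relayObservers) {
                observer.accept(event);
            }
        }
        return trace;
    }

    public static void main(String[] args) {
        String[] subscribers = {"uuid-1", "uuid-2", "uuid-3"};
        run(subscribers, "uuid-2", "uuid-1").forEach(System.out::println);
    }
}
```

With events uuid-2 and then uuid-1, all three filter() calls for uuid-2 (including its map()) happen before any call for uuid-1. The map() calls therefore follow arrival order.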

But what happens if observeOn() is called before filter(), as in the library? The first subscriber receives the first event in filter(), then the second event, then the third, and so on. In the end, the messages come out in the wrong order. They are processed in subscription order rather than in the order in which my application received them.

There are two possible solutions.

First:

You can delete observeOn() in this method

  public Observable<CharacteristicChangedEvent> getOnCharacteristicChanged() {
    return Observable.merge(
        disconnectionRouter.<CharacteristicChangedEvent>asErrorOnlyObservable(),
        changedCharacteristicSerializedPublishRelay
    )
        .observeOn(callbackScheduler);
  }

and call changedCharacteristicSerializedPublishRelay.accept() on a separate thread instead.
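This solution can be sketched in a simplified deterministic model. The hypothetical ManualScheduler stands in for a single-threaded callbackScheduler, and nothing here is library code. onCharacteristicChanged() schedules one task per event. That task calls accept() on the relay, which then fans out synchronously, with no observeOn() left in the chain.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Deterministic stand-in for a single-threaded scheduler: tasks run in FIFO order.
class ManualScheduler {
    private final Queue<Runnable> tasks = new ArrayDeque<>();

    void schedule(Runnable task) {
        tasks.add(task);
    }

    void runAll() {
        while (!tasks.isEmpty()) {
            tasks.poll().run();
        }
    }
}

class HopBeforeRelayDemo {
    static List<String> run(String... arrivals) {
        ManualScheduler callbackScheduler = new ManualScheduler();
        List<String> delivered = new ArrayList<>();
        List<Consumer<String>> relayObservers = new ArrayList<>();
        for (String uuid : new String[] {"0101", "0501"}) {
            relayObservers.add(event -> {
                if (event.equals(uuid)) { // filter() and map(), no observeOn() in the chain
                    delivered.add(event);
                }
            });
        }
        // Binder thread: one scheduled task per event; the task calls accept() on the relay.
        for (String event : arrivals) {
            callbackScheduler.schedule(() -> {
                for (Consumer<String> observer : relayObservers) {
                    observer.accept(event);
                }
            });
        }
        callbackScheduler.runAll();
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(run("0501", "0101", "0501")); // prints [0501, 0101, 0501]
    }
}
```

The thread hop happens once per event, before the fan-out. In this model the scheduler's FIFO order therefore becomes the delivery order, even when events for the same subscriber arrive back to back.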

Second:

You can call observeOn() after filter(). For example, you can make

@NonNull
    private static Observable<byte[]> observeOnCharacteristicChangeCallbacks(RxBleGattCallback gattCallback,
                                                                             final CharacteristicNotificationId characteristicId)

non-static and call observeOn() right after the filter().

ArtyomBasharkevich commented 5 years ago

Alternatively, you can add observeOn() here:

Observable<Observable<byte[]>> setupServerInitiatedCharacteristicRead(
            @NonNull final BluetoothGattCharacteristic characteristic, final NotificationSetupMode setupMode, final boolean isIndication
    ) {
        return Observable.defer(new Callable<ObservableSource<Observable<byte[]>>>() {
            @Override
            public ObservableSource<Observable<byte[]>> call() {
                synchronized (activeNotificationObservableMap) {

                    ...

                    final Observable<Observable<byte[]>> newObservable = setCharacteristicNotification(bluetoothGatt, characteristic, true)
                            .andThen(ObservableUtil.justOnNext(observeOnCharacteristicChangeCallbacks(gattCallback, id)))
>>>                         .observeOn(callbackScheduler)
                    ...
dariuszseweryn commented 5 years ago

FYI ACK, investigating...

dariuszseweryn commented 5 years ago

Thank you @ArtyomBasharkevich for your persistence; I have learned something. You can check out the new PR and confirm whether it helps your case.

ArtyomBasharkevich commented 5 years ago

@dariuszseweryn, I have two pieces of news: 1) I will not be able to answer next week; 2) I reviewed your fix and read your messages on the Rx tracker, but my example and the actual project with a delay still do not work as I expected (the message order still changes). It's strange to me that your example worked. I have not tried your example, but I am sure that the problem is not solved. As far as I remember, when I researched the source code, I found the problem in the Scheduler. That means it doesn't matter which operator you call (although I was very surprised that delay() could solve the problem!). If you want a good example and want to know for sure that the solution works, add Thread.sleep(100) inside filter(). This ensures that all your messages are queued and that there is no race condition.

ArtyomBasharkevich commented 5 years ago

I will explore it more deeply in a week.

dariuszseweryn commented 5 years ago

In tests I used a TestScheduler, which is fully controllable, and in RxBleGattCallbackTest all actions are triggered by testScheduler.triggerActions(), which exposed the buggy behaviour from this issue. When I switched to delay(0, TimeUnit.SECONDS, scheduler), the issue indeed seemed to be fixed. I have also tested it on your original code from here, and it looks fixed. Feel free to retest it and report back.

ArtyomBasharkevich commented 4 years ago

Using your new RxBleGattCallback, I checked this bug again in the real project. It is still not fixed. Try this; you can change delay() to observeOn() and there will be no difference:

import android.annotation.SuppressLint
import android.os.Bundle
import android.util.Log
import android.widget.Button
import androidx.appcompat.app.AppCompatActivity
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.schedulers.Schedulers
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.concurrent.thread

class MainActivity : AppCompatActivity() {

    private lateinit var button: Button

    private val executor = Executors.newSingleThreadExecutor()
    private val scheduler = Schedulers.from(executor)
    private val subject = PublishSubject.create<Int>()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        button = findViewById(R.id.button)
        button.setOnClickListener {
            thread(name = "My thread") {
                for (x in 1..10) {
                    Log.e("start $x", "Start" + '\n' + "Thread = ${Thread.currentThread().name}")
                    subject.onNext(x)
                }
            }
        }
        for (x in 10 downTo 1) {
            setupListeners(x)
        }
    }

    @SuppressLint("CheckResult")
    private fun setupListeners(int: Int) {
        subject
            .delay(0, TimeUnit.SECONDS, scheduler)
//            .observeOn(scheduler)
            .filter {
                Log.e(
                    "filter",
                    "subscriber = $int filter = $it Thread = ${Thread.currentThread()}"
                )
                Thread.sleep(100)
                it == int
            }
            .map {
                Log.e("map", "subscriber = $int map = $it Thread = ${Thread.currentThread()}")
                it
            }
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe {
                Log.e(
                    "subscribe",
                    "subscriber = $int subscribe = $it Thread = ${Thread.currentThread()}"
                )
            }
    }
}
akarnokd commented 4 years ago

Hi. Try a single threaded scheduled executor for the delay(0).

ArtyomBasharkevich commented 4 years ago

@akarnokd you are right. Using Schedulers.single() solves the problem, but why can't I use Executors.newSingleThreadExecutor()? What if I want to create my own single-threaded scheduled executor?

akarnokd commented 4 years ago

newSingleThreadExecutor doesn't support timed scheduling, so behind the scenes Schedulers.single() is helping. If you mix timed and non-timed execution on Schedulers.from, you can end up with time races based on the OS timer resolution. Use [newSingleThreadScheduledExecutor](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newSingleThreadScheduledExecutor()).
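For reference, a plain java.util.concurrent check (no RxJava) that a single-threaded ScheduledExecutorService runs zero-delay scheduled tasks in submission order; ScheduledThreadPoolExecutor documents that tasks scheduled for the same execution time are enabled in FIFO order of submission. This only exercises the executor itself, not Schedulers.from wrapping it, and ZeroDelayOrder is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Schedules tasks 1..count with zero delay directly on the executor and records run order. */
final class ZeroDelayOrder {

    static List<Integer> runZeroDelayTasks(int count) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        List<Integer> order = new CopyOnWriteArrayList<>();
        for (int i = 1; i <= count; i++) {
            final int task = i;
            // A timed schedule with zero delay, submitted directly (no RxJava wrapper).
            executor.schedule(() -> { order.add(task); }, 0, TimeUnit.MILLISECONDS);
        }
        // By default, delayed tasks that are already scheduled still run after shutdown().
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }
}
```

`runZeroDelayTasks(50)` returns 1 through 50 in order. Whether that order survives once the executor is wrapped by Schedulers.from is a separate question, which the rest of the thread explores.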

ArtyomBasharkevich commented 4 years ago

@akarnokd I already tried that; it behaved the same as newSingleThreadExecutor:

    private val executor = Executors.newSingleThreadScheduledExecutor()
    private val scheduler = Schedulers.from(executor)
akarnokd commented 4 years ago

It could mean you have a race somewhere else, before the single-threaded funneling of events. You mentioned in a comment that the setup is driven by events that can be triggered from any thread. Neither PublishRelay nor PublishSubject serializes its input, so if two threads signal the same subject, you get a race. If you serialize it, you can still get an arbitrary reordering.

Could you post some project that demonstrates the issue so I can see what I can dig up from the internals of RxJava?
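To illustrate the serialization point with the standard library only: the sketch below funnels two producer threads into one non-thread-safe consumer through a lock. SerializedSink is a hypothetical stand-in, similar in spirit to (but not the implementation of) a serialized subject. Each producer's own values stay in order, but how the two producers interleave is decided by the thread scheduler, which is the arbitrary reordering mentioned above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: lets only one thread at a time call the downstream consumer. */
final class SerializedSink<T> implements Consumer<T> {
    private final Consumer<T> downstream;

    SerializedSink(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    @Override
    public synchronized void accept(T value) {
        downstream.accept(value); // never runs concurrently with another accept()
    }
}

final class SerializationDemo {

    /** Producers 0 and 1 each emit {producerId, 0..perProducer-1} concurrently. */
    static List<int[]> run(int perProducer) {
        List<int[]> received = new ArrayList<>(); // only touched under the sink's lock
        SerializedSink<int[]> sink = new SerializedSink<>(received::add);
        Thread[] producers = new Thread[2];
        for (int p = 0; p < producers.length; p++) {
            final int producerId = p;
            producers[p] = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    sink.accept(new int[] {producerId, i});
                }
            });
            producers[p].start();
        }
        for (Thread producer : producers) {
            try {
                producer.join(); // join() makes the producers' writes visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return received;
    }

    /** True if each producer's values arrive in increasing order. */
    static boolean perProducerOrderKept(List<int[]> received) {
        int[] last = {-1, -1};
        for (int[] item : received) {
            if (item[1] <= last[item[0]]) {
                return false;
            }
            last[item[0]] = item[1];
        }
        return true;
    }
}
```

Nothing is lost and nothing overlaps, but the global order of the two streams can differ from run to run.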

ArtyomBasharkevich commented 4 years ago

I created an example where there are no race conditions, try this https://github.com/Polidea/RxAndroidBle/issues/628#issuecomment-554525711

ArtyomBasharkevich commented 4 years ago

This example only works correctly with private val scheduler = Schedulers.single() and using delay() instead of observeOn() (observeOn() is commented out in this example).

akarnokd commented 4 years ago

Okay, I see what you mean. Schedulers.from uses a trampoline, so if you happen to submit a lot of work together, it will essentially batch it up for the underlying executor to work on together. With single(), there is no such coalescing. Schedulers.from is meant to work with an arbitrary executor, which could house any number of worker threads; therefore, to ensure the contract holds, it uses a trampoline that can exhibit this batching behavior.
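The batching can be reproduced with a deterministic, single-threaded model. This is a simplified sketch, not the actual ExecutorScheduler source: each subscriber gets a hypothetical TrampolineWorker that queues tasks and submits a single drain to the shared executor only while it is idle, whereas the non-trampolined path submits every task to the executor individually (roughly what this thread reports for delay() on Schedulers.single()). Tasks run only when runAll() is called, which stands in for a busy executor thread catching up.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class TrampolineModel {

    /** FIFO "executor" whose tasks run only when runAll() is called. */
    static final class ManualExecutor {
        private final Queue<Runnable> tasks = new ArrayDeque<>();

        void execute(Runnable task) {
            tasks.add(task);
        }

        void runAll() {
            Runnable task;
            while ((task = tasks.poll()) != null) {
                task.run();
            }
        }
    }

    /** Queues its own tasks and submits one drain to the executor while idle. */
    static final class TrampolineWorker {
        private final ManualExecutor executor;
        private final Queue<Runnable> queue = new ArrayDeque<>();
        private boolean drainPending;

        TrampolineWorker(ManualExecutor executor) {
            this.executor = executor;
        }

        void schedule(Runnable task) {
            queue.add(task);
            if (!drainPending) { // only a task arriving at an idle worker triggers a submission
                drainPending = true;
                executor.execute(this::drain);
            }
        }

        private void drain() {
            Runnable task;
            while ((task = queue.poll()) != null) {
                task.run(); // everything queued so far runs back to back
            }
            drainPending = false;
        }
    }

    /** Subscriber i accepts only event i; returns accepted events in delivery order. */
    static List<Integer> run(List<Integer> events, List<Integer> subscriberIds, boolean trampoline) {
        ManualExecutor executor = new ManualExecutor();
        List<Integer> delivered = new ArrayList<>();
        List<TrampolineWorker> workers = new ArrayList<>();
        for (int s = 0; s < subscriberIds.size(); s++) {
            workers.add(new TrampolineWorker(executor));
        }
        for (int event : events) {
            for (int s = 0; s < subscriberIds.size(); s++) {
                int id = subscriberIds.get(s);
                Runnable task = () -> {
                    if (event == id) {
                        delivered.add(event);
                    }
                };
                if (trampoline) {
                    workers.get(s).schedule(task);
                } else {
                    executor.execute(task);
                }
            }
        }
        executor.runAll(); // the busy executor thread finally catches up
        return delivered;
    }

    public static void main(String[] args) {
        List<Integer> events = List.of(1, 2, 3);
        List<Integer> subscribers = List.of(3, 2, 1); // subscription order
        System.out.println(run(events, subscribers, true));  // [3, 2, 1]: batched per subscriber
        System.out.println(run(events, subscribers, false)); // [1, 2, 3]: emission order
    }
}
```

The reordering only appears when several tasks pile up in a worker's queue before the executor gets to run them, which is why the Thread.sleep(100) in the reproducer makes it reliable.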

ArtyomBasharkevich commented 4 years ago

Does this mean that the only solution to my problem is to use delay with Schedulers.single()?

akarnokd commented 4 years ago

Yes. In case you need many such single schedulers you can create more with RxJavaPlugins.createSingleScheduler officially or new SingleScheduler() unofficially.

ArtyomBasharkevich commented 4 years ago

@akarnokd, thank you very much! You helped me a lot!

ArtyomBasharkevich commented 4 years ago

@dariuszseweryn in this method

        @Provides
        @Named(NamedSchedulers.BLUETOOTH_CALLBACKS)
        @ClientScope
        static Scheduler provideBluetoothCallbacksScheduler(@Named(NamedExecutors.BLUETOOTH_CALLBACKS) ExecutorService service) {
            return Schedulers.from(service);
        }

you could use something like this:

return RxJavaPlugins.createSingleScheduler(new RxThreadFactory("YOUR-THREAD-NAME-PREFIX", Thread.NORM_PRIORITY, true));

together with delay(), as you used earlier in the PR.

dariuszseweryn commented 4 years ago

Yup, I am watching this thread. I am also thinking whether there should be (at some time in the future) an option to switch between .delay(0) and .observeOn() implementations depending on user preference.

dariuszseweryn commented 4 years ago

I think an Executors.newSingleThreadScheduledExecutor() would be equivalent.

dariuszseweryn commented 4 years ago

Should the scheduler created by this new RxJavaPlugins.createSingleScheduler() be shut down eventually? Correct me if I am wrong, but Schedulers.from(Executor) does not handle .shutdown() correctly, and the Executor should be shut down directly? (This question is for David, I suppose.)

akarnokd commented 4 years ago

Yes, they need manual shutdown if you don't use daemon threads.
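When a scheduler is backed by an executor with non-daemon threads, that executor has to be shut down explicitly. A minimal sketch using the shutdown pattern from the ExecutorService documentation; ExecutorShutdown and shutdownAndAwait are hypothetical names, and where it would be called (for example when the BLE client is torn down) is an assumption.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class ExecutorShutdown {

    /**
     * Stops accepting new work and waits for queued tasks; if they take longer than the
     * timeout, attempts to stop them. Returns true if the executor fully terminated.
     */
    static boolean shutdownAndAwait(ExecutorService executor, long timeoutMillis) {
        executor.shutdown(); // previously submitted tasks are still executed
        try {
            if (executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
            executor.shutdownNow(); // drop waiting tasks, try to stop running ones (typically via interrupt)
            return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }
}
```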

ArtyomBasharkevich commented 4 years ago

RxThreadFactory in the above example will use a daemon thread, as I understand it

akarnokd commented 4 years ago

Yes, but RxThreadFactory is an internal API and thus shouldn't be relied upon this way. createSingleScheduler is public, and you should provide your own custom factory.
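A sketch of such a custom factory, assuming named daemon threads are wanted so the scheduler's thread does not keep the JVM alive. NamedDaemonThreadFactory and the thread name prefix are hypothetical; in an RxJava 2 project the factory would be passed to RxJavaPlugins.createSingleScheduler(ThreadFactory), as suggested above.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical factory creating named daemon threads: prefix-1, prefix-2, ... */
final class NamedDaemonThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    NamedDaemonThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread thread = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
        thread.setDaemon(true); // a daemon thread does not by itself keep the JVM alive
        thread.setPriority(Thread.NORM_PRIORITY);
        return thread;
    }

    // Usage in an RxJava 2 project (not compiled here):
    // Scheduler callbacks = RxJavaPlugins.createSingleScheduler(new NamedDaemonThreadFactory("RxBleCallbacks"));
}
```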

dariuszseweryn commented 4 years ago

@ArtyomBasharkevich You can check the branch/PR now; it now uses a SingleScheduler. @akarnokd, again, thank you for the help.

ArtyomBasharkevich commented 4 years ago

@dariuszseweryn Excellent, now everything works