kuzzleio / sdk-dart

Dart SDK developed for Flutter to interact with Kuzzle API for database storage and search, user authentication and realtime notifications
MIT License

The subscribe method of Dart SDK throws error after few seconds. #43

Closed rafay-tariq closed 3 years ago

rafay-tariq commented 3 years ago

I am using the Dart SDK. When I subscribe to a collection in Kuzzle, it works fine at first, but after a few seconds it seems to try to re-subscribe to the collection (this is our assumption). At that point the following error is thrown, and only in the subscribe method.


E/flutter (30105): [ERROR:flutter/lib/ui/ui_dart_state.cc(166)] Unhandled Exception: Concurrent modification during iteration: Instance(length:43) of '_GrowableList'.
E/flutter (30105): #0      ListIterator.moveNext (dart:_internal/iterable.dart:337:7)
E/flutter (30105): #1      RealTimeController._renewSubscribe (package:kuzzle/src/controllers/realtime.dart:159:25)
E/flutter (30105): <asynchronous suspension>
E/flutter (30105): #2      Function._apply (dart:core-patch/function_patch.dart:11:30)
E/flutter (30105): #3      Function.apply (dart:core-patch/function_patch.dart:34:12)
E/flutter (30105): #4      KuzzleEventEmitter.emit (package:kuzzle/src/kuzzle/event_emitter.dart:143:18)
E/flutter (30105): #5      Kuzzle.connect.<anonymous closure>.<anonymous closure> (package:kuzzle/src/kuzzle.dart:232:9)
E/flutter (30105): #6      _rootRunUnary (dart:async/zone.dart:1198:47)
E/flutter (30105): #7      _CustomZone.runUnary (dart:async/zone.dart:1100:19)
E/flutter (30105): #8      _FutureListener.handleValue (dart:async/future_impl.dart:143:18)
E/flutter (30105): #9      Future._propagateToListeners.handleValueCallback (dart:async/future_impl.dart:696:45)
E/flutter (30105): #10     Future._propagateToListeners (dart:async/future_impl.dart:725:32)
E/flutter (30105): #11     Future._completeWithValue (dart:async/future_impl.dart:529:5)
Aschen commented 3 years ago

Hi @rafay-tariq !

Apparently you are adding or removing items from a list while iterating over it (see this Stack Overflow answer)

I suspect that you are modifying your list inside the realtime subscription callback and iterating over it somewhere else in the code.

I'm not sure this is related to the SDK behavior, but maybe @jenow, @xbill82 or @Njuelle will have more insight than me on this one
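
To illustrate the error class mentioned above, here is a minimal sketch that is independent of Kuzzle. The function names are hypothetical and exist only for this example; it shows a growable list being modified inside a `for-in` loop, and the same loop walking a copy instead.

```dart
// Returns true if mutating a list inside a for-in loop over that same list
// raises ConcurrentModificationError (the error seen in the stack trace).
bool throwsWhenMutatedDuringIteration() {
  final items = <int>[1, 2, 3];
  try {
    for (final item in items) {
      if (item == 1) items.add(4); // mutates the list being iterated
    }
  } on ConcurrentModificationError {
    return true;
  }
  return false;
}

// Same loop, but it walks a snapshot, so adding to the original is safe.
List<int> iterateOverCopy() {
  final items = <int>[1, 2, 3];
  for (final item in List<int>.of(items)) {
    if (item == 1) items.add(4);
  }
  return items;
}

void main() {
  print(throwsWhenMutatedDuringIteration());
  print(iterateOverCopy());
}
```

The mutation does not have to be in the same function as the loop: any code that runs between two iteration steps and changes the list length has the same effect.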

rafay-tariq commented 3 years ago

Thanks for answering. But I am not adding or removing anything in it. I just call the subscribe method and print the notifications. The error appears after 30-40 seconds.

Aschen commented 3 years ago

Can you provide a minimal code sample that reproduces your error, please?

It will help us to quickly identify and fix the bug

rafay-tariq commented 3 years ago

I am using the Dart language. In the main file I connect to Kuzzle, and on the second screen I call the subscribe function. This produces the exception above. The code otherwise works fine, and I do receive the notification when I update the document in Kuzzle, but the error appears after a few seconds. Please look into it.

void main() {
  driverKuzzle().then((_) {
    runApp(DriverApp());
  });
}
static driverKuzzle() async {
    kuzzleObject = Kuzzle(
        WebSocketProtocol(Uri(
          scheme: 'wss',
          host: 'YOUR HOST NAME',
          port: 443,
          //  userInfo:
        )),
        offlineMode: OfflineMode.auto);

    kuzzleObject.connect().then((_) async {
      print("* Kuzzle is connected");
      var token = await kuzzleObject.auth
          .login('local', {'username': 'YOUR USER NAME', 'password': 'YOUR PASSWORD'});
      KuzzleClass.kuzzleObject = kuzzleObject;
      KuzzleClass.kuzzleToken = token;
    });
}

And on the second screen I call this function in initState:

@override
  void initState() {
    getPermission().then((_) {
      subscribeMessage();
    });
    super.initState();
  }
  void subscribeMessage() {
    Map<String, dynamic> _filters = <String, dynamic>{};
    var roomId = KuzzleClass.kuzzleObject.realtime
        .subscribe('dukaan', 'order', _filters, (notification) {
      if (notification.action != 'create') return;
      if (notification.controller != 'document') return;

      print(notification.result);
    });
    print(roomId);
  }
rafay-tariq commented 3 years ago

Hi @Aschen, is this code enough to reproduce the error?

xbill82 commented 3 years ago

Hello @rafay-tariq I'll take a look at this ASAP, stay tuned.

rafay-tariq commented 3 years ago

Thanks @xbill82 👍

xbill82 commented 3 years ago

Hello @rafay-tariq I'm trying to reproduce your issue, but there are some details I don't understand.

Here's how my code looks so far

var kuzzle;

void main() {
  driverKuzzle().then((_) {
    runApp(MyApp());
  });
}

driverKuzzle() async {
  kuzzle = Kuzzle(
      WebSocketProtocol(Uri(
        scheme: 'ws',
        host: 'localhost',
        port: 7512,
        //  userInfo:
      )),
      offlineMode: OfflineMode.auto);

  kuzzle.connect().then((_) async {
    print("* Kuzzle is connected, now authenticating...");
    final token = await kuzzle.auth
        .login('local', {'username': 'luca', 'password': 'luca'});
    print("Authenticated!");
    print(token);
  }).catchError((error) {
    print("# Kuzzle wasn't able to connect or to authenticate");
    print(error.toString());
  });
}

class MyApp extends StatelessWidget {
  // This widget is the root of your application.
  // ... code code code
}

// ... 

class _MyHomePageState extends State<MyHomePage> {
  int _counter = 0;

  @override
  void initState() {
    this.subscribeMessage();
    super.initState();
  }

  void subscribeMessage() {
    print("Subscribing to messages...");
    Map<String, dynamic> _filters = <String, dynamic>{};
    var roomId =
        kuzzle.realtime.subscribe('test', 'test', _filters, (notification) {
      if (notification.action != 'create') return;
      if (notification.controller != 'document') return;
      print("received notification");
      print(notification.result);
    });
    print(roomId);
  }

  // ...

}

Here's my Debug Console when starting the app...

Restarted application in 418ms.
flutter: Subscribing to messages...  // This is because driverKuzzle doesn't await connect(), so its Future completes and the app gets initialized before the connection is finished
flutter: Instance of 'Future<String>' // This is the return of subscribe, since there is no await on the call
flutter: * Kuzzle is connected, now authenticating...
flutter: Authenticated!
flutter: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJfaWQiOiJsdWNhIiwiaWF0IjoxNjAyNTgyMzAxLCJleHAiOjE2MDI1ODU5MDF9.mCluAUttIcHWIpdgCPOdjrqjKBo_q4-Qx-2Sh9ndX1U
Reloaded 1 of 586 libraries in 391ms.
flutter: received notification  // Then I create a document...
flutter: {_id: toto, _source: {toti: tata, _kuzzle_info: {author: -1, createdAt: 1602582363686, updatedAt: null, updater: null}}, _version: 1}
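
The ordering in the console output above can be reproduced without Kuzzle. The following is a sketch only: `fakeConnect` and `fakeSubscribe` are hypothetical stand-ins for `kuzzle.connect()` and `kuzzle.realtime.subscribe()`, not SDK calls. It contrasts starting a Future without awaiting it against awaiting it before the app starts.

```dart
// Hypothetical stand-in for kuzzle.connect(): completes after a short delay.
Future<void> fakeConnect(List<String> log) async {
  await Future<void>.delayed(const Duration(milliseconds: 10));
  log.add('connected');
}

// Hypothetical stand-in for realtime.subscribe(): resolves to a room id.
Future<String> fakeSubscribe() async => 'room-42';

// Mirrors the reported pattern: the connection is started but not awaited,
// so this function's Future completes before the connection does.
Future<void> setupWithoutAwait(List<String> log) async {
  fakeConnect(log).then((_) => log.add('authenticated'));
}

// Awaits the connection, so callers resume only once it has finished.
Future<void> setupWithAwait(List<String> log) async {
  await fakeConnect(log);
  log.add('authenticated');
}

// Records the order of events for either setup variant.
Future<List<String>> startupOrder(bool awaitConnect) async {
  final log = <String>[];
  await (awaitConnect ? setupWithAwait(log) : setupWithoutAwait(log));
  log.add('app started');
  // Give the un-awaited connection time to finish before reading the log.
  await Future<void>.delayed(const Duration(milliseconds: 50));
  return log;
}

Future<void> main() async {
  print(await startupOrder(false)); // 'app started' comes first
  print(await startupOrder(true)); // 'app started' comes last
  print(fakeSubscribe()); // a Future object, not the room id
  print(await fakeSubscribe()); // the room id itself
}
```

In the app, the equivalent change would be to await the connection inside `driverKuzzle` and to await the result of `subscribe` before printing it.
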
rafay-tariq commented 3 years ago

Thanks @xbill82 for the reply. We tried the same code locally (with HTTP) and did not get the error either. But when we run the same code over HTTPS, we get the error I mentioned above. Please fix this issue.

E/flutter ( 7214): [ERROR:flutter/lib/ui/ui_dart_state.cc(177)] Unhandled Exception: Concurrent modification during iteration: Instance(length:15) of '_GrowableList'.
E/flutter ( 7214): #0      ListIterator.moveNext (dart:_internal/iterable.dart:337:7)
E/flutter ( 7214): #1      RealTimeController._renewSubscribe (package:kuzzle/src/controllers/realtime.dart:160:25)
E/flutter ( 7214): <asynchronous suspension>
E/flutter ( 7214): #2      Function._apply (dart:core-patch/function_patch.dart:11:30)
E/flutter ( 7214): #3      Function.apply (dart:core-patch/function_patch.dart:34:12)
E/flutter ( 7214): #4      KuzzleEventEmitter.emit (package:kuzzle/src/kuzzle/event_emitter.dart:143:18)
E/flutter ( 7214): #5      Kuzzle.connect.<anonymous closure>.<anonymous closure> (package:kuzzle/src/kuzzle.dart:232:9)
E/flutter ( 7214): #6      _rootRunUnary (dart:async/zone.dart:1198:47)
E/flutter ( 7214): #7      _CustomZone.runUnary (dart:async/zone.dart:1100:19)
E/flutter ( 7214): #8      _FutureListener.handleValue (dart:async/future_impl.dart:143:18)
E/flutter ( 7214): #9      Future._propagateToListeners.handleValueCallback (dart:async/future_impl.dart:696:45)
E/flutter ( 7214): #10     Future._propagateToListeners (dart:async/future_impl.dart:725:32)
E/flutter ( 7214): #11     Future._completeWithValue (dart:async/future_impl.dart:529:5)
E/flutter ( 7214): #12     _AsyncAwaitCompleter.complete (dart:async-patch/async_patch.dart:40:15)
E/flutter ( 7214): #13     _completeOnAsyncReturn (dart:async-patch/async_patch.dart:311:13)
E/flutter ( 7214): #14     AuthController.checkToken (package:kuzzle/src/controllers/auth.dart)
E/flutter ( 7214): <asynchronous suspension>
E/flutter ( 7214): #15     Kuzzle.connect.<anonymous closure> (package:kuzzle/src/kuzzle.dart:226:12)
E/flutter ( 7214): #16     Function._apply (dart:core-patch/function_patch.dart:11:30)
E/flutter ( 7214): #17     Function.apply (dart:core-patch/function_patch.dart:34:12)
E/flutter ( 7214): #18     KuzzleEventEmitter.emit (package:kuzzle/src/kuzzle/event_emitter.dart:143:18)
E/flutter ( 7214): #19     KuzzleProtocol.clientConnected (package:kuzzle/src/protocols/abstract.dart:63:5)
E/flutter ( 7214): #20     WebSocketProtocol.connect (package:kuzzle/src/protocols/websocket.dart:66:5)
E/flutter ( 7214): <asynchronous suspension>
E/flutter ( 7214): #21     KuzzleProtocol.clientNetworkError.<anonymous closure> (package:kuzzle/src/protocols/abstract.dart:87:15)
E/flutter ( 7214): #22     _rootRun (dart:async/zone.dart:1182:47)
E/flutter ( 7214): #23     _CustomZone.run (dart:async/zone.dart:1093:19)
E/flutter ( 7214): #24     _CustomZone.runGuarded (dart:async/zone.dart:997:7)
E/flutter ( 7214): #25     _CustomZone.bindCallbackGuarded.<anonymous closure> (dart:async/zone.dart:1037:23)
E/flutter ( 7214): #26     _rootRun (dart:async/zone.dart:1190:13)
E/flutter ( 7214): #27     _CustomZone.run (dart:async/zone.dart:1093:19)
E/flutter ( 7214): #28     _CustomZone.bindCallback.<anonymous closure> (dart:async/zone.dart:1021:23)
E/flutter ( 7214): #29     Timer._createTimer.<anonymous closure> (dart:async-patch/timer_patch.dart:18:15)
E/flutter ( 7214): #30     _Timer._runTimers (dart:isolate-patch/timer_impl.dart:397:19)
E/flutter ( 7214): #31     _Timer._handleMessage (dart:isolate-patch/timer_impl.dart:428:5)
E/flutter ( 7214): #32     _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:168:12)
Aschen commented 3 years ago

I think I see the issue now:

This issue happens only when the SDK is reconnecting.

I think the fix can be quite easy, we just need to duplicate the list before iterating on it here: https://github.com/kuzzleio/sdk-dart/blob/master/lib/src/controllers/realtime.dart#L158
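
As a rough sketch of that idea (not the SDK's actual code), the class below models a subscription cache with plain strings. `FakeRealtime`, `renewUnsafe` and `renewSafe` are hypothetical names for this example. The unsafe variant awaits a subscribe call that appends to the very list being iterated; the safe variant snapshots the cache first.

```dart
class FakeRealtime {
  // channel -> cached subscriptions (plain strings in this sketch)
  final Map<String, List<String>> cache = {
    'channel-a': ['sub-1', 'sub-2'],
  };

  // Stand-in for a network subscribe that registers the subscription again.
  Future<void> subscribe(String channel, String sub) async {
    await Future<void>.delayed(Duration.zero);
    cache.putIfAbsent(channel, () => <String>[]).add(sub);
  }

  // Iterates the live lists: subscribe() grows the list mid-iteration.
  Future<void> renewUnsafe() async {
    for (final entry in cache.entries) {
      for (final sub in entry.value) {
        await subscribe(entry.key, sub);
      }
    }
  }

  // Copies every list first, clears the cache, then re-subscribes.
  Future<void> renewSafe() async {
    final snapshot = <String, List<String>>{
      for (final entry in cache.entries) entry.key: List<String>.of(entry.value),
    };
    cache.clear();
    for (final entry in snapshot.entries) {
      for (final sub in entry.value) {
        await subscribe(entry.key, sub);
      }
    }
  }
}

Future<void> main() async {
  try {
    await FakeRealtime().renewUnsafe();
  } on ConcurrentModificationError catch (e) {
    print('unsafe renewal failed: $e');
  }
  final realtime = FakeRealtime();
  await realtime.renewSafe();
  print(realtime.cache);
}
```

Snapshotting the whole map, rather than one list at a time, also covers the case where a re-subscription lands on a channel key that was not in the cache before.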

rafay-tariq commented 3 years ago

Updated file --> C:\flutter.pub-cache\hosted\pub.dartlang.org\kuzzle-2.0.1\lib\src\controllers So @Aschen @xbill82, when we update these two functions (subscribe and _renewSubscribe) with the given code, the error is resolved. Right now it is not giving any error, but could you kindly review this code and confirm whether the previous error is completely fixed?

 Future<String> subscribe(String index, String collection,
          Map<String, dynamic> filters, SubscribeListener callback,
          {String scope = 'all',
          String state,
          String users,
          Map<String, dynamic> volatile,
          bool subscribeToSelf = true,
          bool autoResubscribe}) async =>
      kuzzle
          .query(
              KuzzleRequest(
                action: 'subscribe',
                controller: 'realtime',
                index: index,
                collection: collection,
                body: filters,
                state: state,
                scope: scope,
                users: users,
                volatile: volatile,
              ),
              volatile: volatile)
          .then((response) {
        final subscription = Subscription(
            index: index,
            collection: collection,
            filters: filters,
            callback: callback,
            volatile: volatile,
            users: users,
            scope: scope,
            state: state,
            subscribeToSelf: subscribeToSelf,
            autoResubscribe: autoResubscribe);
        final roomId = response.result['roomId'] as String;
        final channel = response.result['channel'] as String;

        if (_currentSubscriptions[channel] == null) {
          final List<Subscription> item = [];
          item.add(subscription);
          _currentSubscriptions[channel] = item;
          _subscriptionsCache[channel] = item;
        } else {
   //--------------------------------- updated code -----------------------
          if (!_currentSubscriptions[channel].contains(subscription)) {
            _currentSubscriptions[channel].add(subscription);
          }
          if (!_subscriptionsCache[channel].contains(subscription)) {
            _subscriptionsCache[channel].add(subscription);
          }
  //-----------------------------------end of updated code ----------------
        }
        _rooms[roomId] = channel;

        kuzzle.on(ProtocolEvents.RECONNECTED, _renewSubscribe);

        return roomId;
      });
  void _renewSubscribe() async {
    for (final subs in _subscriptionsCache.values) {
      //------------------------------- updated code ----------------------
      final List<Subscription> _localSubs = List<Subscription>.from(subs);
      subs.clear();
      for (final sub in _localSubs) {
        //--------------------------- end of updated code -----------------
        await subscribe(
          sub.index,
          sub.collection,
          sub.filters,
          sub.callback,
          scope: sub.scope,
          state: sub.state,
          users: sub.users,
          volatile: sub.volatile,
          subscribeToSelf: sub.subscribeToSelf,
          autoResubscribe: sub.autoResubscribe,
        );
      }
    }
  }