jonataslaw / getx

Open screens/snackbars/dialogs/bottomSheets without context, manage states and inject dependencies easily with Get.
MIT License

Using Stream with GetX #785

Open NoldyNayoan opened 3 years ago

NoldyNayoan commented 3 years ago

Hi, I am trying to use GetX with streamed data from Firebase. The data fetched from Firebase depends on a parameter that can be changed from another widget. The code does not produce errors, but the stream does not reset when I change the userId parameter to another value; in fact, the stream gets doubled. For instance, with userId = '01' I click the refresh button and the stream with data for userId '01' is returned. Then I change the userId from another widget to '02', and now I have two active streams. How can I make sure that the stream is always reset and only the latest stream is active? Here is my code:


From another Widget :
// REFRESH button CLICK, UserListWidget
  Navigator.pushNamed( context, StreamManagerRoute, arguments: { "userId": userId});

//
class StreamManagerScreen extends StatelessWidget {
StreamManagerScreen({this.userId});
final String userId;

 @override
  Widget build(BuildContext context) {
       Get.put(MyStreamService());
    Get.find<MyStreamService>().init(userId); // INIT STREAM WITH USERID    
       return WillPopScope(
              onWillPop: () async {
             return exitToUsersList(
             context,
           );
      },
      child: GetX<MyStreamService>(
    builder: (svc) {
        if (svc == null || svc.userMessage == null) {
          return Scaffold(
              body: Container(
                  child: Center(child: Text("Loading Message".tr + "..."))));
        }
        return SafeArea(
          child: MessageScreen(
            userMessage: svc.userMessage,

          ),
        );
      }),
    );
  }
}

class MyStreamService extends GetxController  {

  final CollectionReference messageCollection =
      Firestore.instance.collection('messages');
  Stream<UserMessage> getUserMessageStream(String userId) {
    return messageCollection
        .document(userId)
        .snapshots()
        .map(_dataFromSnapshot);
   }

  UserMessage _dataFromSnapshot(
    DocumentSnapshot snapshot,
  ) {
    return UserMessage.fromJson(snapshot.data);

  }
  Rx<UserMessage> _userMessage = Rx<UserMessage>();

  UserMessage get userMessage => _userMessage.value;

   init(String userId) {

    try {
      _userMessage.bindStream(getUserMessageStream(userId)); //stream coming from firebase

    } catch (e) {
      print("$e");
    }

  }

}

Can you help with the correct code to tackle this? Thanks.

alvincast commented 3 years ago

Maybe this is an opportunity to see how to implement the latest approach to the proper use of streams with GetX. Anyway, I don't know why you "got two streams active"; it seems more like a problem with your code than with the stream itself...

prakash-indorkar commented 3 years ago

Hi, I can provide a solution for your message stream. Hope it helps others too :)

final CollectionReference messageCollection =
      Firestore.instance.collection('messages');

Stream<List<UserMessage>> getUserMessageStream(String userId) {
    List<UserMessage> messages = [];
    Stream<QuerySnapshot> snapshots = messageCollection
        .document(userId)
        .snapshots();
    snapshots.listen((QuerySnapshot query) {
      if (query.docChanges.isNotEmpty) {
        messages.clear();
      }
    });
    return snapshots.map((snapshot) {
      snapshot.docs.forEach((messageData) {
        messages.add(UserMessage.fromMap(messageData));
      });
      //print('Total message fetched: ${messages.length}');
      return messages.toList();
    });
  }

////Now that messages is a stream ... bind it ... 
 _userMessage.bindStream(getUserMessageStream(userId));

NoldyNayoan commented 3 years ago

OK, thank you for the code. Now, how do I refresh the stream when userId changes? My code is really similar to yours, but when userId changes from user01 to user02, the stream is not refreshed with data from user02; strangely, I still get the user01 data stream. I need a stream that is reset to fetch data for the corresponding userId. Any idea?

NoldyNayoan commented 3 years ago

Yeah, there is no example of how to use streams properly. I found one on YouTube from Amateur Coder, but there the stream init is only called once and there is no parameter change (in my example code, userId can change). About the two active streams I mentioned: I noticed that when the stream is active and running, and I change the userId (for instance from user01 to user02) and call "Get.find<MyStreamService>().init(userId)", the data stream is activated twice, but the data is still from user01, not from user02. I resorted to using a StreamBuilder instead of using a stream with GetX, but it would be nice to have a working GetX example for my problem.

prakash-indorkar commented 3 years ago

Rx<UserMessage> _userMessage = Rx<UserMessage>();   >>>>>>> You are creating a stream for a single user only. That's why you receive a single stream.

Basically, you need a separate stream for each user.
Like:
Rx<UserMessage> _user1Message = Rx<UserMessage>();
_user1Message.bindStream(getUserMessageStream(userId));

Rx<UserMessage> _user2Message = Rx<UserMessage>();
_user2Message.bindStream(getUserMessageStream(userId));

I do not know what your collection structure is, but I'm guessing you are storing all the "Messages" in one top-level collection.
Just an idea: you could have a Conversation collection as the top-level collection, storing only the user IDs or maybe the last message received, and then a Messages subcollection holding all the messages between two users with just the sender ID.

Good luck.

NoldyNayoan commented 3 years ago

Thanks for the reply. My user collection data looks like this:

{userId: '01', messages: [{messageId: 1, message: "message 1"}, {messageId: 2, message: "message 2"}]}
{userId: '02', messages: [{messageId: 21, message: "message 21"}, {messageId: 22, message: "message 22"}]}

and so on. I don't intend to create two separate streams; rather, I want to refresh / recreate one stream when userId changes.

NoldyNayoan commented 3 years ago

I finally got it working by recreating the userMessage object before binding the stream 👍

var _userMessage = Rx<UserMessage>();

init(String userId) {
  _userMessage = Rx<UserMessage>(); // RECREATE/REFRESH a blank new userMessage object
  _userMessage.bindStream(getUserMessageStream(userId)); // stream coming from firebase
}

HOWEVER, I found that using streams with GetX is not really stable; sometimes I get an error like 'cannot add new event when the object is closed' (or maybe it is me not implementing GetX streams in the best possible way). Hence I think the author of this package should release a working example of how to use streams properly with GetX. Overall, I really like this package because of its simplicity and performance.

prakash-indorkar commented 3 years ago

Great! Your idea of initialising the stream with a blank object helped me as well. Thank you!

sooxt98 commented 3 years ago

I think GetX should have a closeStream() or disposeStream() for a stream opened with bindStream().

For example, after a.bindStream(b), I could close it with a.closeStream(), or clear the stream content without closing it with a.resetStream().

rawquesh commented 3 years ago

Hello, I want to share what I find to be the best and most stable method for using Firebase streams with GetX; it also works for those who want to paginate.

Suppose you have booking documents:


  RxList<BookingModel> bookings = <BookingModel>[].obs; // stores the bookings.
  List<StreamSubscription<QuerySnapshot>> streams = []; // stores the subscriptions so every stream can be cancelled in onClose.
  RxList<String> bookingsAvailableFor = <String>[].obs; // list of dates that are already subscribed.

In my case, I am fetching bookings based on the selected date.

  void fetchBookings(DateTime _time) async {
    if (bookingsAvailableFor.contains(BookingModel.getDateAsString(_time))) return; // if stream is already subscribed or not. 
    try {
      final _subscription = FirebaseFirestore.instance
          .collection('bookings')
          .where("selected_dates", arrayContains: BookingModel.getDateAsString(_time)) // its just for my case.
          .snapshots(includeMetadataChanges: true)
          .listen((res) {
            List<BookingModel> newb = res.docs.map((e) => BookingModel.fromSnapshot(e)).toList(); // converting to model.
            for (final b in newb) bookings.removeWhere((e) => b.id == e.id); // deleting old and duplicate data.
            bookings.addAll(newb); // adding to bookings
          });
      streams.add(_subscription); // adding subs to streams list so we can remove every subs at onClose.
      bookingsAvailableFor.add(BookingModel.getDateAsString(_time)); // adding date for prevent duplicating streams.
    } on FirebaseException catch (e) {
      print(e.message);
      return;
    } on Exception catch (e) {
      print(e);
    }
  }

  @override
  void onClose() {
    for (final stream in streams) stream.cancel(); // closing every single stream.
    super.onClose();
  }

NOTE: bookingsAvailableFor was just for my case; whether you need it depends on your use case.

This function is called in onInit with today's date, and again whenever the user changes the current date.

Just wrap your widget with Obx and you are good to go.

Let me know if I did something wrong or if you have suggestions.
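
The bookkeeping described above (subscribe at most once per key, keep every subscription, cancel them all together) can be sketched without Firestore or GetX. This is only an illustration under assumptions: `SubscriptionRegistry`, `listenOnce`, and `cancelAll` are hypothetical names, and a plain `Stream` stands in for the Firestore query snapshots.

```dart
import 'dart:async';

/// Hypothetical helper, not part of GetX or Firebase: keeps one
/// subscription per key so the same source is never listened to twice.
class SubscriptionRegistry<K> {
  final Map<K, StreamSubscription<dynamic>> _byKey = {};

  /// Subscribes only if [key] has no subscription yet.
  /// Returns true when a new subscription was created.
  bool listenOnce<T>(K key, Stream<T> stream, void Function(T) onData) {
    if (_byKey.containsKey(key)) return false;
    _byKey[key] = stream.listen(onData);
    return true;
  }

  /// Number of subscriptions currently tracked.
  int get activeCount => _byKey.length;

  /// Cancels every tracked subscription, e.g. from a controller's onClose.
  Future<void> cancelAll() async {
    final pending = _byKey.values.map((s) => s.cancel()).toList();
    _byKey.clear();
    await Future.wait(pending);
  }
}
```

In a controller, the key would play the role of the date string stored in bookingsAvailableFor, and cancelAll() would be called from onClose.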

CSharper63 commented 3 years ago

@rakesh4577 Hello, I tried your example, but I don't understand how to listen. I use it in a collectionGroup query, but my list doesn't update.

I want to use this ID stream to fetch my models. But when the condition on MOVIE_ACCESS_STATUS changes, the ID list isn't updated.

My models are fetched correctly, but when I change the status in Firestore, for example from PENDING to ACCEPTED, my listener does not seem to re-fetch the IDs.

final List<String> MOVIE_ACCESS_STATUS = ['PENDING', 'ACCEPTED', 'REJECTED'];
RxList<String> docId = RxList();

void movieId(String ui) {
    final _subscription = FirebaseFirestore.instance
        .collectionGroup(MOVIE_VIEWERS_COLLECTION)
        .where('uid', isEqualTo: ui)
        .where('movieAccessStatus', isEqualTo: MOVIE_ACCESS_STATUS[1])
        .snapshots(includeMetadataChanges: true)
        .listen((event) {
      List<String> newJoinedId = event.docs
          .map((e) => e.get('movieId').toString())
          .toList(); //get array from firestore
      for (final b in newJoinedId)
        docId.removeWhere((e) => e == b); 
      docId.addAll(newJoinedId);
    });
    _allStreams.add(_subscription);
  }

Hope you understand my problem :)

claptv commented 3 years ago

Most of my controllers use streams, and converting them to async code and listeners would require too much work. As a quick and easy solution, I simply modified the get_rx/src/rx_types/rx_core/rx_impl.dart file. I added a StreamSubscription<T> return type to the bindStream method, as below.

StreamSubscription<T> bindStream(Stream stream) {
  final listSubscriptions = _subscriptions[subject] ??= [];
  final sub = stream.listen((va) => value = va);
  listSubscriptions.add(sub);
  return sub;
}

And I added a list to each controller:

List _subs = [];

When I bind a stream, I simply add the returned subscription to the list:

_subs.add(_posts.bindStream(_getPosts()));

Finally, cancel all the subscriptions in onClose:

@override
void onClose() {
  closeAllStreams();
  super.onClose();
}

void closeAllStreams() {
  try {
    for (final sub in _subs) sub.cancel();
    _subs.clear();
  } catch (e) {}
}

Now you can manually cancel all the subscriptions of a particular controller by simply invoking the closeAllStreams() method.

In my case, I needed to make a manual call only when the user logs out, because somehow GetX doesn't close the open streams and delete the instances of the controllers.
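
To see why returning the subscription matters, here is a minimal pure-Dart sketch. `ValueHolder` is a hypothetical stand-in that only mimics the modified bindStream shown above; it is not the GetX class. It assumes, as the `listSubscriptions.add(sub)` line suggests, that binding a second stream adds a subscription without cancelling the first.

```dart
import 'dart:async';

/// Hypothetical stand-in for an Rx value; NOT the GetX implementation.
class ValueHolder<T> {
  ValueHolder(this.value);

  T value;
  final List<StreamSubscription<T>> _subscriptions = [];

  /// Copies every event of [stream] into [value] and returns the
  /// subscription so the caller can cancel it later.
  StreamSubscription<T> bindStream(Stream<T> stream) {
    final sub = stream.listen((event) => value = event);
    _subscriptions.add(sub);
    return sub;
  }
}

/// Binds two streams, then cancels the first through the returned handle.
Future<List<String>> demo() async {
  final holder = ValueHolder<String>('');
  final user01 = StreamController<String>.broadcast();
  final user02 = StreamController<String>.broadcast();
  final seen = <String>[];

  final first = holder.bindStream(user01.stream);
  holder.bindStream(user02.stream); // the first binding stays active

  user01.add('from 01');
  await Future<void>.delayed(Duration.zero);
  seen.add(holder.value); // the old stream still writes into the holder

  await first.cancel(); // only possible because bindStream returned it
  user01.add('ignored');
  user02.add('from 02');
  await Future<void>.delayed(Duration.zero);
  seen.add(holder.value);

  await user01.close();
  await user02.close();
  return seen;
}
```

Until the first subscription is cancelled, both sources keep writing into the same value, which matches the doubled-stream symptom reported at the top of the thread.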

alvincast commented 3 years ago

I tried to use your code but got some errors. Can you explain where you get _subscription from?

claptv commented 3 years ago

As mentioned, you need to modify the get_rx/src/rx_types/rx_core/rx_impl.dart file. The original bindStream(Stream stream) method is void and doesn't return a reference to the bound stream's subscription. So you need to add a return type, StreamSubscription<T>, and keep the returned subscription whenever you bind a new stream, as explained above.

If you can provide your code, I could give you a better explanation.

OliverRhyme commented 2 years ago

I have this problem too. What I did was manually close the Rx objects in the onClose callback.

NoldyNayoan commented 2 years ago

Hi OliverRhyme, could you please give a more complete code example of how to use the onClose callback to close Rx objects?

OliverRhyme commented 2 years ago

Just override the onClose method of the GetxController, and there you can call the Rx object's close() method. I can confirm that this actually closes the stream bound via the Rx object's bindStream; I verified it by attaching a doOnCancel (rxdart) to the source stream.

AntiAura commented 1 year ago

Rx<String?> rxWithStream = Rx(null);
StreamSubscription<String?>? _streamSubscription;

/// Binds a new stream to the Rx while also cancelling the previous stream subscription.
/// Pass null as [stream] to cancel the current stream subscription.
void updateStream(Stream<String?>? stream) {
  // Cancel previous stream subscription
  var subscription = _streamSubscription;
  if (subscription != null) {
    subscription.cancel();
  }

  // Optional: Clear Rx before binding new stream
  rxWithStream.value = null;

  // Bind the new stream to Rx
  if (stream != null) {
    _streamSubscription = stream.listen((value) {
      rxWithStream.value = value;
    });
  }
}

Manually subscribe to the stream and update the Rx value when needed. This allows you to 'update' the stream by cancelling the old subscription before creating a new one. Simply overwriting the Rx is not recommended, because all Obx widgets listening to it won't update anymore.

Important: Obx widgets will no longer automatically cancel the stream subscription when disposed. Add an updateStream(null) call in your onClose.
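
As a rough check of the cancel-then-rebind idea, the sketch below replays the original scenario (switching from user '01' to user '02') in plain Dart. Everything here is an assumption made for illustration: `SingleStreamValue` is a hypothetical holder rather than a GetX type, the broadcast controllers stand in for the per-user Firestore streams, and unlike the snippet above it awaits the cancellation before rebinding.

```dart
import 'dart:async';

/// Hypothetical holder that keeps at most one live subscription.
class SingleStreamValue<T> {
  T? value;
  StreamSubscription<T>? _subscription;

  /// Cancels the current subscription, clears [value], then listens to
  /// [stream]. Passing null only cancels.
  Future<void> updateStream(Stream<T>? stream) async {
    await _subscription?.cancel();
    _subscription = null;
    value = null;
    if (stream != null) {
      _subscription = stream.listen((event) => value = event);
    }
  }
}

/// Simulates switching from user '01' to user '02'.
Future<List<String?>> demo() async {
  final user01 = StreamController<String>.broadcast();
  final user02 = StreamController<String>.broadcast();
  final holder = SingleStreamValue<String>();
  final seen = <String?>[];

  await holder.updateStream(user01.stream);
  user01.add('message for 01');
  await Future<void>.delayed(Duration.zero);
  seen.add(holder.value);

  await holder.updateStream(user02.stream);
  user01.add('late message for 01'); // nobody listens to this any more
  user02.add('message for 02');
  await Future<void>.delayed(Duration.zero);
  seen.add(holder.value);

  await holder.updateStream(null); // what onClose would do
  seen.add(holder.value);

  await user01.close();
  await user02.close();
  return seen;
}
```

After the switch, only events from the second source reach the holder, so a single observable keeps serving the widgets while the underlying stream changes.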