mongo-dart / mongo_dart

Mongo_dart: MongoDB driver for Dart programming language
https://pub.dev/packages/mongo_dart
MIT License

Watch insert #219

Open Jonaswinz opened 3 years ago

Jonaswinz commented 3 years ago

Hi, I want to detect document inserts in a collection. This is my current attempt:

Stream changeStream = collection.watch([
  { '\$match' : {"operationType" : "insert" } }
]);

changeStream.listen((event) {
  print("asdf");
});

I get this error:

type '_InternalLinkedHashMap<String, Object>' is not a subtype of type 'Map<String, Map<String, String>>' of 'element'
#0      List.insert (dart:core-patch/growable_array.dart)
#1      new ChangeStreamOperation (package:mongo_dart/src/database/operation/commands/aggreagation_commands/wrapper/change_stream/change_stream_operation.dart:29:19)
#2      DbCollection.watchCursor (package:mongo_dart/src/database/dbcollection.dart:771:20)
#3      DbCollection.watch (package:mongo_dart/src/database/dbcollection.dart:757:7)
#4      Database.watch (package:database/database.dart:99:71)
#5      main (file:///app/bin/server.dart:24:6)
<asynchronous suspension>

Am I doing it right? This is inspired by https://docs.mongodb.com/manual/reference/method/db.collection.watch/. I also tried this:

final pipeline = AggregationPipelineBuilder()
        .addStage(Match(where.eq('operationType', "insert").map['\$match'])).build();
giorgiofran commented 3 years ago

It is a problem related to strong type checking. The system infers that the elements of your pipeline list are of type Map<String, Map<String, String>>, and then complains when a method adds a Map<String, Object> to it. This is inconvenient, and I have fixed it; the fix should be published in the next few days (I guess...). In the meantime you can declare your pipeline in the following way:

var stream = collection.watch(<Map<String, Object>>[{'\$match': {'operationType': 'insert'}}]);
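The mismatch described above can be reproduced without a database. This is a minimal sketch, assuming (as the List.insert frame in the stack trace suggests) that the driver inserts an extra stage, such as the $changeStream stage that every change stream aggregation starts with, into the list you pass in. The helpers prependStage and tryPrepend are hypothetical stand-ins, not mongo_dart API. Because Dart generics are covariant, a List<Map<String, Map<String, String>>> is accepted where a List<Map<String, Object>> is expected, and the problem only surfaces at runtime when the wider map is inserted.

```dart
// Hypothetical stand-in for driver code that prepends a stage to the user's
// pipeline (an assumption based on the List.insert frame in the trace).
void prependStage(List<Map<String, Object>> pipeline) {
  // Compiles fine, but insert() is checked at runtime against the list's
  // real (reified) element type.
  pipeline.insert(0, <String, Object>{'\$changeStream': <String, Object>{}});
}

/// Returns true if prepending a stage to [pipeline] succeeds.
bool tryPrepend(List<Map<String, Object>> pipeline) {
  try {
    prependStage(pipeline);
    return true;
  } on TypeError {
    // Same failure as in the issue: the element is not a
    // Map<String, Map<String, String>>.
    return false;
  }
}

void main() {
  // Element type inferred from the literal: Map<String, Map<String, String>>.
  final inferred = [
    {'\$match': {'operationType': 'insert'}}
  ];
  // Element type widened explicitly, as in the workaround above.
  final annotated = <Map<String, Object>>[
    {'\$match': {'operationType': 'insert'}}
  ];

  print('inferred: ${tryPrepend(inferred)}'); // inferred: false
  print('annotated: ${tryPrepend(annotated)}'); // annotated: true
}
```

Note that a list literal written directly inside the watch(...) call would also work, because it takes its element type from the parameter; the failure only appears when the list is first stored in a variable whose type is inferred on its own.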

Your attempt to use the AggregationPipelineBuilder is really interesting. It is a shortcut: the builder has not been updated to handle values like these, but the eq method should do the job. That said, you should define the pipeline like this:

final pipeline = AggregationPipelineBuilder().addStage(Match(where.eq('operationType', 'insert').map['\$query']));
var stream = collection.watch(pipeline);

That is, extract the '\$query' element and do not call the build method (it is called internally). You were very close to the solution!

Jonaswinz commented 3 years ago

Thank you. It works now. But there is one interesting thing: if I use this code to listen to the stream events:

insertStream.listen((event) async {
  print("asdf");
}, onError: (error) {
  print(error.toString());
});

It slows down my code drastically! (I am building a small API.) Without this code a request takes 0.01 to 0.06 seconds; with it, a request takes more than 2 seconds.

Do you have any idea why?

giorgiofran commented 3 years ago

Well, no idea. What do you mean by "request"? Do you mean "I do an insert", so that if the insert is not watched it is fast, and otherwise it takes a long time? What kind of writeConcern are you using? An active watch forces a "majority" (i.e. if you have a three-member replica set, it will wait for at least two members to acknowledge the write before sending an event). Try setting a "majority" writeConcern on the insert method, independently of whether the watch is present or not, and see what happens.

Jonaswinz commented 3 years ago

By "request" I mean an HTTP request to my Dart API. I am currently developing in a Docker environment and have only one MongoDB instance, no "real" replica set. The watcher works fine, but it somehow slows down the rest of the API code. I really have no idea why. I am using other streams (the HTTP request stream) without any problem. I will continue with some trial and error :-)

giorgiofran commented 3 years ago

The final 0.5.0 is out. Did you discover what caused your performance issues?

natgross commented 3 years ago

Using 0.7.1 on MongoDB Server 5.0.2 with two secondary replicas, following this thread I did:

final pipeline = AggregationPipelineBuilder().addStage(Match(where.eq('operationType', 'insert').map['\$query']));
var stream = collection.watch(pipeline);

Everything works great, EXCEPT the onData(event) callback is being called TWICE per insert with exactly the same data.


UPDATE: The above is what I see in the debugger. However, when I try to get data out of the event, it complains: Unhandled Exception: type 'ChangeEvent' is not a subtype of type ... The problem is that I don't have a ChangeEvent class in any of my libs/packages. So although I can see the data (twice) in the debugger, I can't get at it at runtime. Here is a screenshot of the ChangeEvent data from the debugger. Thanks.

[Screenshot: ChangeEvent data in the debugger, 2021-08-18 19_53_31]

giorgiofran commented 3 years ago

I ran some tests on 4.4 and it seems to work fine. The change event contains the original server response, plus the main fields extracted from it to make the class easier to use. So you see the same data twice, but it is only one event. The path to the ChangeEvent class is: lib/src/database/commands/aggreagation_commands/aggregate/return_classes/change_event.dart. Please try the example example/manual/watch/watch_on_collection_insert.dart and let me know if it works.

giorgiofran commented 3 years ago

Please note that the statement should be

final pipeline = AggregationPipelineBuilder().addStage(Match(where.eq('operationType', 'insert').map['\$query']));
natgross commented 3 years ago

I am going to try it shortly. But to clarify: I wasn't complaining about the data structure sent inside the event; that's common. I am saying that the event fires twice, a few milliseconds apart, with the same data, as if two inserts had just happened. It seems like you are emitting the same event to the stream twice.

natgross commented 3 years ago

Ok. I tested the example. I am posting the output below.

I/flutter (13177): Waiting for insert to be detected...
I/flutter (13177): Detected change for "custId" 4: "Nathan"
I/flutter (13177): Insert detected, closing stream and db.

P.S. The funniest part of this is: how did you know my name, Nathan, when you wrote that test?!

natgross commented 3 years ago

Please note that the statement should be

final pipeline = AggregationPipelineBuilder().addStage(Match(where.eq('operationType', 'insert').map['\$query']));

Unless I'm missing something, that is exactly what I had (verified by copying the line into my editor).

natgross commented 3 years ago

Obviously the example code is not exhibiting the duplicate problem, so the problem must be in my code somehow. But I can't figure it out. How do I check why my onData callback gets called twice? Anyhow, I learned a lot from this thread, and .watch() is cool!

giorgiofran commented 3 years ago

It is difficult to say why your callback is called twice. If you could provide a small program where your problem can be tested, it could be a great help. The biggest issue in these cases is always to replicate the error.

natgross commented 3 years ago

I reduced my program to a stripped-down version and it works OK! I think it is somehow related to not awaiting when I should, but I'm not sure about that either. If and when I find the cause, I'll post it. Thanks.

natgross commented 3 years ago

Got it! Consider the following method, which sets up the .watch():

void initStream(DbCollection collection) {
  void onData(event) {
    print('onData.  eventID: ${event.id}');
  }

  final pipeline01 = AggregationPipelineBuilder()
      .addStage(Match(where.eq('operationType', 'insert').map['\$query']));
  var stream = collection.watch(pipeline01);
  stream.listen(onData);
}

If you call the method multiple times for the same collection, it will not complain. It will happily oblige and create multiple watch cursors for the same collection! (Of course triggering the callbacks for each.) Yes, it's an error on my part (deep in my framework I was calling some init code that "doesn't hurt to be called multiple times"), but is this expected behavior? At the least, imho, the collection.watch() should have an optional bool to allow/disallow this.

Anyhow, what relief! nat
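Until something like that exists in the driver, the duplicate subscriptions can be avoided on the application side by making the init code idempotent. The sketch below is a hypothetical helper (WatchRegistry and watchOnce are illustrative names, not mongo_dart API) that keeps at most one subscription per key. To stay self-contained it uses a broadcast StreamController in place of a real change stream; in an application, the open callback would be something like () => collection.watch(pipeline).

```dart
import 'dart:async';

/// Hypothetical helper (not part of mongo_dart): keeps at most one
/// subscription per key, so init code that runs twice does not stack listeners.
class WatchRegistry {
  final Map<String, StreamSubscription<Object?>> _subs = {};

  /// Subscribes only if [key] has no subscription yet.
  /// Returns true if a new subscription was created.
  bool watchOnce(
    String key,
    Stream<Object?> Function() open,
    void Function(Object? event) onData,
  ) {
    if (_subs.containsKey(key)) return false;
    _subs[key] = open().listen(onData);
    return true;
  }

  /// Cancels every subscription, e.g. before closing the database.
  Future<void> cancelAll() async {
    for (final sub in _subs.values) {
      await sub.cancel();
    }
    _subs.clear();
  }
}

/// Runs the init code twice, with and without the guard, then emits one
/// fake insert event. Returns [naiveCallbacks, guardedCallbacks].
Future<List<int>> simulateDoubleInit() async {
  // A broadcast controller stands in for collection.watch(pipeline);
  // no real database is involved in this sketch.
  final changes = StreamController<Object?>.broadcast();
  var naive = 0;
  var guarded = 0;
  final registry = WatchRegistry();

  for (var i = 0; i < 2; i++) {
    // Naive init: every call adds another listener.
    changes.stream.listen((_) => naive++);
    // Guarded init: only the first call subscribes.
    registry.watchOnce('customers', () => changes.stream, (_) => guarded++);
  }

  changes.add(<String, Object>{'operationType': 'insert', 'custId': 4});
  // Broadcast events are delivered asynchronously; let them run.
  await Future<void>.delayed(Duration.zero);

  await registry.cancelAll();
  await changes.close();
  return [naive, guarded];
}

Future<void> main() async {
  final counts = await simulateDoubleInit();
  print('naive: ${counts[0]}, guarded: ${counts[1]}'); // naive: 2, guarded: 1
}
```

Keeping the first subscription and ignoring later calls is the simplest policy; an alternative would be to cancel the existing subscription and re-subscribe, which is useful if the pipeline can change between calls.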

giorgiofran commented 3 years ago

Thanks Nathan for your feedback. I will evaluate your suggestion.