robfallows / tunguska-reactive-aggregate

Reworks jcbernack:reactive-aggregate for ES6/ES7 and Promises
MIT License
52 stars, 27 forks

Multi observers lookup bug : not firing a re-run of the aggregation #36

Open DonJGo opened 4 years ago

DonJGo commented 4 years ago

Hi guys,

Package version: 1.3.0. I was enjoying your great work on this package when I ran into an issue. I would like to dig into the project and open a PR, but I don't have the time yet.

My data was never updated when I used a $lookup with specific observers, as in this example:

Meteor.publish("biographiesByWelshAuthors", function () {
  ReactiveAggregate(this, Authors, [{
    $lookup: {
      from: "books",
      localField: "_id",
      foreignField: "author_id",
      as: "author_books"
    }
  }], {
    noAutomaticObserver: true,
    observers: [
      Authors.find({ nationality: 'welsh'}),
      Books.find({ category: 'biography' })
    ]
  });
});

When I used the aggregate as follows instead, it worked, but an update on the lookup collection did not trigger a re-run of the aggregation.

Meteor.publish("biographiesByWelshAuthors", function () {
  ReactiveAggregate(this, Authors, [{
    $lookup: {
      from: "books",
      localField: "_id",
      foreignField: "author_id",
      as: "author_books"
    }
  }]);
});

So I dug into the aggregate.js code to understand why it works with the default parameters. There I found that, in the default case, the find() arguments are empty objects.

// Set up local options based on defaults and supplied options
  const localOptions = {
    ...{
      noAutomaticObserver: false,
      aggregationOptions: {},
      observeSelector: {},
      observeOptions: {},
      observers: [], // cursor1, ... cursorn
      debounceCount: 0,
      debounceDelay: 0, // mS
      clientCollection: collection._name,
      debug: false,
      objectIDKeysToRepair: []
    },
    ...options
  };
  // The default case is handled here
  if (!localOptions.noAutomaticObserver) {
    const cursor = collection.find(localOptions.observeSelector, localOptions.observeOptions);
    localOptions.observers.push(cursor);
  }

  const handles = [];
  // track any changes on the observed cursors
  localOptions.observers.forEach(cursor => {
    const name = cursor._cursorDescription.collectionName;
    if (localOptions.debug) console.log(`Reactive-Aggregate: collection: ${name}: initialise observer`)
    handles.push(cursor.observeChanges({
      added(id) {
        debounce({ name, mutation: 'added', id } );
      },
      changed(id) {
        debounce({ name, mutation: 'changed', id });
      },
      removed(id) {
        debounce({ name, mutation: 'removed', id });
      },
      error(err) {
        throw new TunguskaReactiveAggregateError (err.message);
      }
    }));
  });
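
To make the option handling quoted above easier to follow, here is a minimal sketch in plain JavaScript, not the package source. It assumes only what the excerpt shows: supplied options override the defaults, and the automatic cursor is pushed onto the observers list unless noAutomaticObserver is true. The names resolveObservers and fakeFind are hypothetical, and the "cursors" are plain objects rather than real Meteor cursors.

```javascript
// Simplified model of the option merging shown in the excerpt.
// resolveObservers and fakeFind are hypothetical names for illustration only.
function resolveObservers(options, find) {
  const localOptions = {
    ...{
      noAutomaticObserver: false,
      observeSelector: {},
      observeOptions: {},
      observers: []
    },
    ...options // supplied options replace the defaults key by key
  };
  // Default case: the main collection is observed with the (empty) selector.
  if (!localOptions.noAutomaticObserver) {
    localOptions.observers.push(
      find(localOptions.observeSelector, localOptions.observeOptions)
    );
  }
  return localOptions.observers;
}

// Stand-in for collection.find(): records what would have been observed.
const fakeFind = (selector, options) => ({ collection: 'authors', selector, options });
const booksCursor = { collection: 'books', selector: { category: 'biography' } };

// No options: only the automatic, unfiltered cursor on the main collection.
const defaults = resolveObservers({}, fakeFind);
// Custom observers with the automatic one disabled: only what was supplied.
const custom = resolveObservers(
  { noAutomaticObserver: true, observers: [booksCursor] },
  fakeFind
);
// Custom observers with the automatic one left on: both are observed.
const both = resolveObservers({ observers: [{ ...booksCursor }] }, fakeFind);

console.log(defaults.length, custom.length, both.length);
```

Under these assumptions, setting noAutomaticObserver: true means the main collection is observed only if one of the supplied cursors covers it.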

So my solution is the following, so that the aggregate also re-runs on changes to the observers:

Meteor.publish("biographiesByWelshAuthors", function () {
  ReactiveAggregate(this, Authors, [{
    $lookup: {
      from: "books",
      localField: "_id",
      foreignField: "author_id",
      as: "author_books"
    }
  }], {
    noAutomaticObserver: true,
    observers: [
      Authors.find({}, {}),
      Books.find({}, {})
    ]
  });
});

I cannot find a way to make it work with find({ id: "1" }) parameters. Maybe I am doing something wrong, but I was doing the same as your example in the README. Don't hesitate to tell me the right way if I'm wrong.

I hope this helps to resolve the issue and helps anyone else who encounters it. Best regards,

robfallows commented 4 years ago

@DonJGo: find({ id: "1" }) will only cause a re-run if the id field === "1", which it likely never will be. Your find needs to be one whose query causes changes to be reported on the cursor. If you don't know what that query looks like, the brute-force way is to use find(), which modifies the cursor on any change to any document. That's not a scalable solution, but it should get you started.
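
The point about filtered cursors can be illustrated with a small model in plain JavaScript. This is a sketch under simplifying assumptions, not Meteor's implementation: it supports only top-level equality selectors, and it assumes every call represents a real change to the document. The helper names matches and callbackFor are hypothetical.

```javascript
// Simplified model of which observeChanges callback a filtered cursor reports.
// Only top-level equality selectors are handled; this is not Meteor's matcher.
function matches(selector, doc) {
  return doc != null &&
    Object.entries(selector).every(([key, value]) => doc[key] === value);
}

// before/after are the document before and after the write (null = absent).
// Returns the callback name that would fire, or null if the cursor sees nothing.
function callbackFor(selector, before, after) {
  const wasIn = matches(selector, before);
  const isIn = matches(selector, after);
  if (!wasIn && isIn) return 'added';    // document enters the result set
  if (wasIn && !isIn) return 'removed';  // document leaves the result set
  if (wasIn && isIn) return 'changed';   // document stays in the result set
  return null;                           // never matched: no callback at all
}

const selector = { category: 'biography' };
const events = [
  // A matching document is inserted.
  callbackFor(selector, null, { category: 'biography', title: 'A' }),
  // A non-matching document is edited: the filtered cursor stays silent.
  callbackFor(selector, { category: 'novel', title: 'B' }, { category: 'novel', title: 'B2' }),
  // A matching document stops matching.
  callbackFor(selector, { category: 'biography', title: 'A' }, { category: 'novel', title: 'A' }),
  // The same edit as the second case, seen through an empty selector.
  callbackFor({}, { category: 'novel', title: 'B' }, { category: 'novel', title: 'B2' })
];

console.log(events); // [ 'added', null, 'removed', 'changed' ]
```

In this model the empty selector reports every write, which is the brute-force behaviour described above, while a filtered selector reports only writes that touch documents inside its result set.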

It would be more helpful if you could provide your code, not mine.

DonJGo commented 4 years ago

Hi Rob, I cannot provide my code, but the given example explains well what happens to me. As you mention, id: "1" will never fire, as an auto-generated _id cannot be 1. But it was only for the example.

If you prefer, I'm filtering my observers on an organizationID field. Ignore the _id values; they are only there for the example.

// Example item in collection Organization
const Organization = {
   _id: "12324567", // an ID auto-generated by the database
   name: "test",
}
// Example item in collection App. There can be several apps per organizationID
const App = {
   _id: "1e12ffgg1234", // an ID auto-generated by the database
   name: "test",
   organizationID: "12324567",
   status: "active"
}

// When I subscribe to an organization with all its related apps, I cannot do this:
Meteor.publish("orgsWithApp", function (organizationID) {
  ReactiveAggregate(this, Organization, [{
    $lookup: {
      from: "app",
      localField: "_id",
      foreignField: "organizationID",
      as: "apps"
    }
  }], {
    noAutomaticObserver: true,
    observers: [
      Organization.find({ _id: organizationID }),
      App.find({ organizationID: organizationID, status: "active" })
    ]
  });
});

I hope this new example is more representative of my problem. In my use case I just need to publish the Apps related to my organizationID; others should not be published. With App.find({ organizationID: organizationID, status: "active" }), the cursor should re-run the aggregate only for documents that match my find. But with these as observers of ReactiveAggregate, my aggregate never re-runs for either collection (App and Organization).

robfallows commented 4 years ago

I think I understand what you're trying to do. If I've followed your logic you're trying to use observers to control the evaluation of the pipeline. What I think you should be doing is changing your pipeline.

In my use case I just need to publish the Apps related to my organizationID; others should not be published.

Maybe like this:

Meteor.publish("orgsWithApp", function (organizationID) {
  ReactiveAggregate(this, Organization, [{
    $match: {
      _id: organizationID
    }
  }, {
    $lookup: {
      from: "app",
      localField: "_id",
      foreignField: "organizationID",
      as: "apps"
    }
  }], {
    noAutomaticObserver: true,
    observers: [
      App.find({ organizationID: organizationID, status: "active" })
    ]
  });
});

DonJGo commented 4 years ago

Yes, the $match is fine for filtering my data, and I get the correct result on the first call. But if there is any change on an app of this organization, the aggregate does not re-run and my data on the front end is never updated.

Moreover, in your example, as you have noAutomaticObserver: true, the main collection Organization is no longer observed. So any change to it won't trigger a re-run of the aggregate.

Yesterday I tried to make it work with only one observer and the result was the same.

robfallows commented 4 years ago

But if there is any change on an app of this organization, the aggregate does not re-run and my data on the front end is never updated.

This: App.find({ organizationID: organizationID, status: "active" }) will only track changes to documents in App which have the expected organizationID and are active. Documents not having the expected organizationID will not be tracked. So I would expect that, for example:

Moreover, in your example, as you have noAutomaticObserver: true, the main collection Organization is no longer observed. So any change to it won't trigger a re-run of the aggregate.

The only change your example code would have tracked was on a single document in the Organization collection. In other words, if the name of that exact organization got changed or the document was deleted, the aggregation would re-run. Any other changes to the Organization collection would not be observed. Looking at your requirement, as I understand it, that would be the correct behaviour. It does not need to re-run if another Organization document is created.

DonJGo commented 4 years ago

Hi Rob,

Sorry for the delay, I'm pretty busy these days. I want to observe both because my React component's display depends on those values (on organization and app; imagine a list of apps on your organization dashboard).

But when I use find query filters, the data is never updated. I could take the time to build an example. Do you have a test environment, or should I build it from scratch? (In that case I cannot take the time right now to build everything from scratch.)