OpenFn / lightning

OpenFn/Lightning ⚡️ is the newest version of the OpenFn DPG and provides a web UI to visually manage complex workflow automation projects.
https://openfn.github.io/lightning/
GNU Lesser General Public License v3.0

Collections: something up with cursors? #2683

Open josephjclark opened 4 hours ago

josephjclark commented 4 hours ago

Running some demos with Collections and I think something is up at the backend:

Here's some really simple code. Generate 100k items, upsert them all, get them all, remove them all.

fn((state = {}) => {
  state.values = new Array(1e5).fill(0)
    .map(() => Math.round(Math.random() * 1e5))
    .map((v, count) => ({ id: `item-${count}`, cost: v }))

  console.log(`Generated ${state.values.length} items of the form: `, state.values[0])

  return state;
})

collections.set('nh-demo', (value) => value.id, $.values)

collections.get('nh-demo', '*').then((state) => {
  return state
});

collections.remove('nh-demo', { key: '*' })

Check these logs:

Generated 100000 items of the form:  {"cost":83801,"id":"item-0"}
[set 10000 items]
Collections: Fetched total of 9883 values from "nh-demo"
ADA Collections: Removed 100000 values in "nh-demo"

(I've excluded the set logs because set uploads in chunks and I don't have a convenient counter)

Something is amiss here! Why did get('*') only download 9883 / 100000 values??

The server tells us that 100k items were removed, so I'm confident that the items were all actually uploaded and were all actually removed.

I think this issue is either:

a) Something is wrong with the paginated cursor (adaptor-side or server-side)
b) Something is wrong with the JSON streamer, adaptor-side

josephjclark commented 4 hours ago

This unit test works fine in the adaptor code:

it('should iterate over many many items', async () => {
    const items = new Array(1e4)
      .fill(0)
      .map((v, idx) => [`${idx}`, { id: `item-${idx}` }]);

    const { state } = init(items);

    let count = 0;

    await collections.each(COLLECTION, '*', (state, value, key) => {
      count++;
      expect(state).to.eql(state);

      const item = JSON.parse(api.byKey(COLLECTION, key));
      expect(item).not.to.be.undefined;
      expect(item).to.eql(value);
    })(state);

    expect(count).to.eql(items.length);
  });

Some notes:

josephjclark commented 4 hours ago

Another significant note: in the adaptor implementation, get uses each. So it's no surprise that get and each both exhibit the problem - they both use the JSON streaming API internally.