estuary / flow

🌊 Continuously synchronize the systems where your data lives, to the systems where you _want_ it to live, with Estuary Flow. 🌊
https://estuary.dev

doc/reduce: implement merge.associative, following lww.associative #1435

Closed: mdibaiee closed this 2 months ago

mdibaiee commented 2 months ago

Description:

Here is how I tested this. I have questions about part of the behaviour, because I'm not sure whether my expectations are misaligned or the behaviour is legitimately questionable. My expectations are based on what I understand "History Mode" to be, not on an understanding of how associative actually works:

Here is my schema:

---
type: object
required: ['id', '_meta']
$anchor: PublicTest
properties:
  id:
    type: integer
  x:
    type: ['string', 'null']
  _meta:
    type: object
    required:
      - op
      - source
    properties:
      op:
        enum: ['c', 'd', 'u']

reduce:
  strategy: merge
  associative: false

I start with a Postgres database containing an empty table named test. I then run:

flowctl preview --source source-postgres.flow.yaml --network flow-test --delay 10s

During the first 10 seconds, I run the following in the database:

postgres=# insert into test(id, x) values (1, 'created'); update test set x='updated' where id=1; delete from test; insert into test(id, x) values (1, 'created'); update test set x='updated' where id=1;

After 10 seconds, I get this reduced document from the capture preview:

["acmeCo/test",{"_meta":{"op":"c","source":{"loc":[-1,0,0],"schema":"public","snapshot":true,"table":"test"}},"id":1,"x":"updated"}]

This means the events from my first run (insert, update, delete, then insert and update again) have all been reduced to one document. This does not match my expectation that each event would be emitted separately.
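To check the "reduced to one document" reading, here is a minimal Python sketch of how I understand strategy: merge to behave: objects are merged key by key, and any location without its own reduce annotation falls back to last-write-wins (my assumption about Flow's default reduction). The merge function and the trimmed events are illustrative only, not Flow's implementation; arrays are simply replaced rather than merged, which is a simplification.

```python
import json
from typing import Any


def merge(lhs: Any, rhs: Any) -> Any:
    """Hypothetical model of `strategy: merge`.

    Objects are merged key by key, recursing into keys present on both sides.
    Any other value (scalar, array, or mismatched types) is last-write-wins,
    so the right-hand side replaces the left. Arrays are replaced wholesale,
    which is a simplification.
    """
    if isinstance(lhs, dict) and isinstance(rhs, dict):
        out = dict(lhs)
        for key, value in rhs.items():
            out[key] = merge(lhs[key], value) if key in lhs else value
        return out
    return rhs


# The five change events from the first run, in commit order (source metadata trimmed).
events = [
    {"_meta": {"op": "c"}, "id": 1, "x": "created"},
    {"_meta": {"op": "u"}, "id": 1, "x": "updated"},
    {"_meta": {"op": "d"}, "id": 1},
    {"_meta": {"op": "c"}, "id": 1, "x": "created"},
    {"_meta": {"op": "u"}, "id": 1, "x": "updated"},
]

# Fold left to right, as a fully associative reduction could.
reduced = events[0]
for doc in events[1:]:
    reduced = merge(reduced, doc)

print(json.dumps(reduced, sort_keys=True))
# {"_meta": {"op": "u"}, "id": 1, "x": "updated"}
```

If the five events had been folded this way, _meta.op would end up as "u". The preview instead shows "op":"c" together with "snapshot":true and loc [-1,0,0], which suggests this document may be a backfill snapshot of the row rather than the result of reducing the five change events.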

Then, with the same flowctl preview still running, I run these statements in the database:

postgres=# delete from test; insert into test(id, x) values (1, 'created'); update test set x='updated' where id=1; delete from test; insert into test(id, x) values (1, 'created'); update test set x='updated' where id=1;

This time, after 10 seconds, I get:

["acmeCo/test",{"_meta":{"op":"d","source":{"loc":[23954352,23954352,23954416],"schema":"public","table":"test","ts_ms":1712316069738}},"id":1}]
["acmeCo/test",{"_meta":{"op":"c","source":{"loc":[23954464,23954464,23954600],"schema":"public","table":"test","ts_ms":1712316069739}},"id":1,"x":"created"}]
["acmeCo/test",{"_meta":{"op":"u","source":{"loc":[23954648,23954648,23954728],"schema":"public","table":"test","ts_ms":1712316069740}},"id":1,"x":"updated"}]
["acmeCo/test",{"_meta":{"op":"d","source":{"loc":[23954776,23954776,23954840],"schema":"public","table":"test","ts_ms":1712316069741}},"id":1}]
["acmeCo/test",{"_meta":{"op":"c","source":{"loc":[23954888,23954888,23955024],"schema":"public","table":"test","ts_ms":1712316069741}},"id":1,"x":"created"}]
["acmeCo/test",{"_meta":{"op":"u","source":{"loc":[23955072,23955072,23955152],"schema":"public","table":"test","ts_ms":1712316069742}},"id":1,"x":"updated"}]

This matches my expectation: each event now has its own document, and the order is correct. This did not happen for the first run, but for subsequent updates and deletes to the documents, events seem to be emitted separately.


Now I repeat the same process, this time with associative: true:

---
type: object
required: ['id', '_meta']
$anchor: PublicTest
properties:
  id:
    type: integer
  x:
    type: ['string', 'null']
  _meta:
    type: object
    required:
      - op
      - source
    properties:
      op:
        enum: ['c', 'd', 'u']

reduce:
  strategy: merge
  associative: true

Then:

flowctl preview --source source-postgres.flow.yaml --network flow-test --delay 10s

And:

postgres=# insert into test(id, x) values (1, 'created'); update test set x='updated' where id=1; delete from test; insert into test(id, x) values (1, 'created'); update test set x='updated' where id=1;

I get:

["acmeCo/test",{"_meta":{"op":"c","source":{"loc":[-1,0,0],"schema":"public","snapshot":true,"table":"test"}},"id":1,"x":"updated"}]

So far, this is the same result as with associative: false for the first set of events. Then I run:

postgres=# delete from test; insert into test(id, x) values (1, 'created'); update test set x='updated' where id=1; delete from test; insert into test(id, x) values (1, 'created'); update test set x='updated' where id=1;

I get:

["acmeCo/test",{"_meta":{"op":"d","source":{"loc":[23963240,23963296,23963360],"schema":"public","table":"test","ts_ms":1712316313616}},"id":1}]
["acmeCo/test",{"_meta":{"op":"u","source":{"loc":[23964016,23964016,23964096],"schema":"public","table":"test","ts_ms":1712316313631}},"id":1,"x":"updated"}]

This is also a bit strange: the deletion event seems to be emitted separately, but the events after it are not emitted separately; they are reduced into a single update document. This seems to be the difference between associative: true and associative: false, but it is not what I expect either. In non-history mode, I would expect only one document to be emitted: the final, updated document. The deletion does not need to be there. Note that I don't have a delete reduction annotation here.
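The two behaviours can be compared with a simplified, hypothetical model of a combiner that processes one key's events in order: with associative true every event is folded into one document, and with associative false partial documents are never folded together. The combine function, the rule it encodes, and the trimmed events are assumptions for illustration, not Flow's actual combiner.

```python
import json
from typing import Any, List


def merge(lhs: Any, rhs: Any) -> Any:
    # Simplified `merge`: recurse into keys shared by two objects; any other
    # value is last-write-wins (the right-hand side replaces the left).
    if isinstance(lhs, dict) and isinstance(rhs, dict):
        out = dict(lhs)
        for key, value in rhs.items():
            out[key] = merge(lhs[key], value) if key in lhs else value
        return out
    return rhs


def combine(events: List[dict], *, associative: bool) -> List[dict]:
    """Hypothetical combiner for one key's events, processed in order.

    associative=True: every event is folded into a single document.
    associative=False: partial documents are never folded together,
    so each event is kept as its own document.
    """
    out: List[dict] = []
    for doc in events:
        if associative and out:
            out[-1] = merge(out[-1], doc)
        else:
            out.append(doc)
    return out


# The six change events from the second run, in commit order (source metadata trimmed).
second_run = [
    {"_meta": {"op": "d"}, "id": 1},
    {"_meta": {"op": "c"}, "id": 1, "x": "created"},
    {"_meta": {"op": "u"}, "id": 1, "x": "updated"},
    {"_meta": {"op": "d"}, "id": 1},
    {"_meta": {"op": "c"}, "id": 1, "x": "created"},
    {"_meta": {"op": "u"}, "id": 1, "x": "updated"},
]

print(len(combine(second_run, associative=False)))  # 6
print(json.dumps(combine(second_run, associative=True), sort_keys=True))
# [{"_meta": {"op": "u"}, "id": 1, "x": "updated"}]
```

Under this model, the associative: false run yields six documents, as observed, and the associative: true run yields the single final document I expected, because without a delete annotation a deletion is merged like any other document. The model does not explain why the deletion was emitted on its own in the associative: true run.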

