subquery / subql


GraphQL subscriptions `mutation_type` is always UPDATE #2590

Open IcanDivideBy0 opened 4 days ago

IcanDivideBy0 commented 4 days ago

Description

When subscribing to an entity, the mutation_type always seems to be UPDATE, at least for newly created entities.

Details

These details can help reproduce the environment in which the issue occurs.

Local Environment: node v20.15.1
Query Version: @subql/query@^2.15.2
Indexer Version: @subql/node-ethereum@^5.1.7
Network Details:

Steps to Reproduce

  1. Subscribe to any model.
  2. Create an entity.
  3. The subscription message payload contains a mutation_type set to UPDATE.

Expected behavior: mutation_type should be INSERT
Actual behavior: mutation_type is UPDATE
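
To check this against captured subscription messages, a helper along these lines could flag entities whose first message is not an INSERT. This is only a sketch: `SubscriptionMessage` and `idsNotStartingWithInsert` are hypothetical names, and the only fields used are `id`, `mutation_type` and `_entity` from the payload discussed in this issue. It assumes the subscription was opened before the entities were created; otherwise a leading UPDATE would be legitimate.

```typescript
type MutationType = "INSERT" | "UPDATE" | "DELETE";

// Hypothetical shape of one subscription message. `_entity` is left opaque
// because its contents depend on the model being subscribed to.
interface SubscriptionMessage {
  id: string;
  mutation_type: MutationType;
  _entity: unknown;
}

// Returns the ids whose first observed message was not an INSERT, in the
// order those ids were first seen.
function idsNotStartingWithInsert(messages: SubscriptionMessage[]): string[] {
  const first: Record<string, MutationType> = {};
  const seenOrder: string[] = [];
  for (const m of messages) {
    if (!(m.id in first)) {
      first[m.id] = m.mutation_type;
      seenOrder.push(m.id);
    }
  }
  return seenOrder.filter((id) => first[id] !== "INSERT");
}
```

An empty result would mean every newly seen entity started with an INSERT, which is the expected behavior described above.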

IcanDivideBy0 commented 4 days ago

Also, I'm now playing around with deletions, and I can't manage to receive any DELETE events in the subscriptions.

stwiname commented 12 hours ago

I've had a look into this, and without some larger changes it isn't trivial.

The main change would be to this function: https://github.com/subquery/subql/blob/main/packages/node-core/src/db/sync-helper.ts#L173

It would need to be updated to something like:

export function createSendNotificationTriggerFunction(schema: string): string {
  return `
CREATE OR REPLACE FUNCTION "${schema}".send_notification()
    RETURNS trigger AS $$
DECLARE
    channel TEXT;
    table_name TEXT;
    row RECORD;
    payload JSONB;
    prev_entity BOOLEAN;
    next_entity BOOLEAN;
BEGIN
    channel:= TG_ARGV[0];
    table_name:= TG_TABLE_NAME;
    IF (TG_OP = 'DELETE') THEN
        row = OLD;
    ELSE
        row = NEW;
    END IF;
    payload = jsonb_build_object(
            'id', row.id,
            'mutation_type', TG_OP,
            '_entity', row);
    IF payload -> '_entity' ? '_block_range' THEN
        -- Historical table: strip the internal columns from the notified entity
        payload = payload #- '{"_entity","_id"}';
        payload = payload #- '{"_entity","_block_range"}';
        IF NOT upper_inf(row._block_range) THEN
            -- Closed range: DELETE only if no newer version starts where this one ends
            EXECUTE FORMAT(
                'SELECT EXISTS (SELECT 1 FROM "${schema}".%I WHERE id = $1 AND lower(_block_range) = upper($2))',
                TG_TABLE_NAME
            )
            INTO next_entity
            USING row.id, row._block_range;

            IF NOT next_entity THEN
                payload = payload || '{"mutation_type": "DELETE"}';
            ELSE
                RETURN NULL;
            END IF;
        ELSE
            -- Open range: INSERT only if no older version ends where this one starts
            EXECUTE FORMAT(
                'SELECT EXISTS (SELECT 1 FROM "${schema}".%I WHERE id = $1 AND upper(_block_range) = lower($2))',
                TG_TABLE_NAME
            )
            INTO prev_entity
            USING row.id, row._block_range;

            IF NOT prev_entity THEN
                payload = payload || '{"mutation_type": "INSERT"}';
            ELSE
                payload = payload || '{"mutation_type": "UPDATE"}';
            END IF;
        END IF;
    END IF;
    IF (octet_length(payload::text) >= 8000) THEN
        payload = payload || '{"_entity": null}';
    END IF;
    PERFORM pg_notify(
            channel::text,
            payload::text);
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;`;
}
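
For readers less familiar with PL/pgSQL, the branching for historical tables in the function above can be sketched in plain TypeScript. This is an illustration only, not code from the repository: `RowVersion`, `BlockRange` and `classifyMutation` are hypothetical names, `upper: null` stands in for an unbounded `_block_range`, and the `table` array stands in for the two `SELECT EXISTS` lookups.

```typescript
type MutationType = "INSERT" | "UPDATE" | "DELETE";

// Hypothetical model of one row version in a historical table.
// `upper: null` represents a range with no upper bound (the current version).
type BlockRange = { lower: number; upper: number | null };
type RowVersion = { id: string; blockRange: BlockRange };

// Mirrors the decision made for the row that fired the trigger.
// Returns null where the SQL would RETURN NULL without sending a notification.
function classifyMutation(row: RowVersion, table: RowVersion[]): MutationType | null {
  const sameId = table.filter((r) => r.id === row.id);
  if (row.blockRange.upper !== null) {
    // Closed range: if a newer version starts at this upper bound, the entity
    // was updated and the notification is left to that newer row.
    const hasNext = sameId.some((r) => r.blockRange.lower === row.blockRange.upper);
    return hasNext ? null : "DELETE";
  }
  // Open range: an older version ending at this lower bound means an update.
  const hasPrev = sameId.some((r) => r.blockRange.upper === row.blockRange.lower);
  return hasPrev ? "UPDATE" : "INSERT";
}

// Usage: two versions of entity "a", the second replacing the first at block 5.
const v1: RowVersion = { id: "a", blockRange: { lower: 1, upper: 5 } };
const v2: RowVersion = { id: "a", blockRange: { lower: 5, upper: null } };
const currentVersion = classifyMutation(v2, [v1, v2]); // "UPDATE"
const closedVersion = classifyMutation(v1, [v1, v2]); // null (no notification)
```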

But this falls down with historical indexing: when an entity is updated, a new record is created and the previous record is updated. Because the trigger that calls this function fires for each of those operations, it's not easy to determine whether the change should be reported as an insert or an update (delete works fine).
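
One way to see the problem is to simulate a single logical update with the trigger firing after each row operation. The sketch below is an assumption-laden model, not SubQuery code: `Version`, `decide` and `simulateUpdate` are hypothetical names, the check runs against the table as it looks at the moment each operation completes, and the order of the two operations is itself an assumption, so both orders are shown.

```typescript
type Mutation = "INSERT" | "UPDATE" | "DELETE";
type Version = { id: string; lower: number; upper: number | null };

// Same decision as the proposed trigger, evaluated against the table state at
// the time the trigger fires. null means no notification is sent.
function decide(row: Version, table: Version[]): Mutation | null {
  const peers = table.filter((r) => r.id === row.id);
  if (row.upper !== null) {
    return peers.some((r) => r.lower === row.upper) ? null : "DELETE";
  }
  return peers.some((r) => r.upper === row.lower) ? "UPDATE" : "INSERT";
}

// Simulates updating entity "a" at block 5: one operation inserts the new
// version, the other closes the previous version's block range.
function simulateUpdate(order: "insertFirst" | "closeFirst"): (Mutation | null)[] {
  const previous: Version = { id: "a", lower: 1, upper: null };
  const next: Version = { id: "a", lower: 5, upper: null };
  const table: Version[] = [previous];
  const fired: (Mutation | null)[] = [];

  const insertNext = (): void => {
    table.push(next);
    fired.push(decide(next, table));
  };
  const closePrevious = (): void => {
    previous.upper = 5;
    fired.push(decide(previous, table));
  };

  if (order === "insertFirst") {
    insertNext();
    closePrevious();
  } else {
    closePrevious();
    insertNext();
  }
  return fired;
}

const insertFirst = simulateUpdate("insertFirst"); // ["INSERT", null]
const closeFirst = simulateUpdate("closeFirst"); // ["DELETE", "UPDATE"]
```

In this model neither order yields a lone UPDATE: inserting first reports the update as an INSERT, and closing first emits a spurious DELETE before the UPDATE.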

There will also be a performance impact, as the function needs to run extra SELECT queries.

I've parked my work here: https://github.com/subquery/subql/tree/sub-historical-mutation