pozil / pub-sub-api-node-client

A node client for the Salesforce Pub/Sub API
Creative Commons Zero v1.0 Universal

"Failed to load Schema for topic *topic*" when using a custom channel #27

Closed DidZ0 closed 7 months ago

DidZ0 commented 8 months ago

Hi! Thanks for the good work!

I'm having an issue when I try to subscribe to a custom channel:

sync-engine-server-1  | Received 1 events, latest replay ID: 487259
sync-engine-server-1  | [Nest] 111  - 03/01/2024, 2:25:52 PM   ERROR [SalesforceConnectionStrategy] gRPC stream error: 
sync-engine-server-1  | {"cause":{},"replayId":487259,"event":{"event":{"id":"594594b7-fd12-4cd8-a77b-030e033bec19","schemaId":"i6_JU3VTbNhRZYbMVlV-xQ","payload":{"type":"Buffer","data":[28,65,99,116,105,111,110,86,101,110,116,101,95,95,99,2,36,97,50,49,65,85,48,48,48,48,48,48,70,83,67,53,89,65,79,0,2,104,99,111,109,47,115,97,108,101,115,102,111,114,99,101,47,97,112,105,47,115,111,97,112,47,54,48,46,48,59,99,108,105,101,110,116,61,83,102,100,99,73,110,116,101,114,110,97,108,65,80,73,47,72,48,48,48,48,49,57,56,51,45,52,102,100,102,45,98,50,99,102,45,51,102,48,57,45,50,50,98,102,100,50,101,51,49,49,55,51,2,176,230,193,166,191,99,136,128,209,247,160,138,213,184,47,36,48,48,53,53,74,48,48,48,48,48,49,68,56,57,77,81,65,83,0,0,2,12,48,120,50,48,56,48,0,0,0,0,0,0,0,2,176,230,193,166,191,99,0,0,0,0,0,2,16,97,114,115,116,114,97,115,116,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}},"replayId":{"type":"Buffer","data":[0,0,0,0,0,7,111,91,0,0]}},"latestReplayId":487259}
sync-engine-server-1  | 
sync-engine-server-1  | EventParseError: Failed to parse event with replay ID 487259
sync-engine-server-1  |     at /usr/app/node_modules/salesforce-pubsub-api-client/dist/client.cjs:780:34
sync-engine-server-1  |     at processTicksAndRejections (node:internal/process/task_queues:95:5) {
sync-engine-server-1  |   cause: Error: Failed to load schema for topic /data/SyncEngine_AccountChangeEvent__chn
sync-engine-server-1  |       at PubSubApiClient.#getEventSchema (/usr/app/node_modules/salesforce-pubsub-api-client/dist/client.cjs:923:15)
sync-engine-server-1  |       at processTicksAndRejections (node:internal/process/task_queues:95:5)
sync-engine-server-1  |       at /usr/app/node_modules/salesforce-pubsub-api-client/dist/client.cjs:762:28 {
sync-engine-server-1  |     [cause]: Error: 3 INVALID_ARGUMENT: Schema validation failed. The schema ID cannot be blank. rpcId: e563498d-fe38-4d21-b96c-dc30a71942ea
sync-engine-server-1  |         at callErrorFromStatus (/usr/app/node_modules/@grpc/grpc-js/src/call.ts:82:17)
sync-engine-server-1  |         at Object.onReceiveStatus (/usr/app/node_modules/@grpc/grpc-js/src/client.ts:360:55)
sync-engine-server-1  |         at Object.onReceiveStatus (/usr/app/node_modules/@grpc/grpc-js/src/client-interceptors.ts:458:34)
sync-engine-server-1  |         at Object.onReceiveStatus (/usr/app/node_modules/@grpc/grpc-js/src/client-interceptors.ts:419:48)
sync-engine-server-1  |         at /usr/app/node_modules/@grpc/grpc-js/src/resolving-call.ts:132:24
sync-engine-server-1  |         at processTicksAndRejections (node:internal/process/task_queues:77:11)
sync-engine-server-1  |     for call at
sync-engine-server-1  |         at ServiceClientImpl.makeUnaryRequest (/usr/app/node_modules/@grpc/grpc-js/src/client.ts:325:42)
sync-engine-server-1  |         at ServiceClientImpl.<anonymous> (/usr/app/node_modules/@grpc/grpc-js/src/make-client.ts:189:15)
sync-engine-server-1  |         at Object.callback (/usr/app/node_modules/salesforce-pubsub-api-client/dist/client.cjs:943:24)
sync-engine-server-1  |         at Object.onReceiveStatus (/usr/app/node_modules/@grpc/grpc-js/src/client.ts:356:28)
sync-engine-server-1  |         at Object.onReceiveStatus (/usr/app/node_modules/@grpc/grpc-js/src/client-interceptors.ts:458:34)
sync-engine-server-1  |         at Object.onReceiveStatus (/usr/app/node_modules/@grpc/grpc-js/src/client-interceptors.ts:419:48)
sync-engine-server-1  |         at /usr/app/node_modules/@grpc/grpc-js/src/resolving-call.ts:132:24
sync-engine-server-1  |         at processTicksAndRejections (node:internal/process/task_queues:77:11) {
sync-engine-server-1  |       code: 3,
sync-engine-server-1  |       details: 'Schema validation failed. The schema ID cannot be blank. rpcId: e563498d-fe38-4d21-b96c-dc30a71942ea',
sync-engine-server-1  |       metadata: [Metadata]
sync-engine-server-1  |     }
sync-engine-server-1  |   },
sync-engine-server-1  |   replayId: 487259,
sync-engine-server-1  |   event: {
sync-engine-server-1  |     event: {
sync-engine-server-1  |       id: '594594b7-fd12-4cd8-a77b-030e033bec19',
sync-engine-server-1  |       schemaId: 'i6_JU3VTbNhRZYbMVlV-xQ',
sync-engine-server-1  |       payload: <Buffer 1c 41 63 74 69 6f 6e 56 65 6e 74 65 5f 5f 63 02 24 61 32 31 41 55 30 30 30 30 30 30 46 53 43 35 59 41 4f 00 02 68 63 6f 6d 2f 73 61 6c 65 73 66 6f 72 ... 186 more bytes>
sync-engine-server-1  |     },
sync-engine-server-1  |     replayId: <Buffer 00 00 00 00 00 07 6f 5b 00 00>
sync-engine-server-1  |   },
sync-engine-server-1  |   latestReplayId: 487259
sync-engine-server-1  | }

The same channel is working fine through the streaming API (using JSForce).

Subscribing to a standard channel such as /data/AccountChangeEvent worked at first, but it now fails for a different reason:

sync-engine-server-1  | [Nest] 176  - 03/01/2024, 3:13:25 PM   ERROR [SalesforceConnectionStrategy] gRPC stream error: 
sync-engine-server-1  | {"cause":{},"replayId":487273,"event":{"event":{"id":"39709eed-bb76-4d8a-af0f-6e2e3995309a","schemaId":"8y1L6AvVNJ-Yx_MkMe8u2Q","payload":{"type":"Buffer","data":[14,65,99,99,111,117,110,116,2,36,48,48,49,53,74,48,48,48,48,48,52,84,78,116,122,81,65,71,0,2,104,99,111,109,47,115,97,108,101,115,102,111,114,99,101,47,97,112,105,47,115,111,97,112,47,54,48,46,48,59,99,108,105,101,110,116,61,83,102,100,99,73,110,116,101,114,110,97,108,65,80,73,47,72,48,48,48,48,49,99,50,102,45,99,99,101,55,45,53,54,98,99,45,54,48,51,48,45,53,101,97,56,101,51,98,100,48,53,102,54,2,192,136,158,169,191,99,134,128,252,171,166,176,214,184,47,36,48,48,53,53,74,48,48,48,48,48,49,68,56,57,77,81,65,83,0,0,2,48,48,120,48,56,48,48,48,48,48,48,48,48,48,48,48,48,48,49,48,48,48,48,48,48,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,2,192,136,158,169,191,99,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,2,16,84,48,48,48,48,52,56,52,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]}},"replayId":{"type":"Buffer","data":[0,0,0,0,0,7,111,105,0,0]}},"latestReplayId":487273}
sync-engine-server-1  | 
sync-engine-server-1  | EventParseError: Failed to parse event with replay ID 487273
sync-engine-server-1  |     at /usr/app/node_modules/salesforce-pubsub-api-client/dist/client.cjs:780:34
sync-engine-server-1  |     at processTicksAndRejections (node:internal/process/task_queues:95:5) {
sync-engine-server-1  |   cause: TypeError: Do not know how to serialize a BigInt
sync-engine-server-1  |       at JSON.stringify (<anonymous>)
sync-engine-server-1  |       at PubSubEventEmitter.<anonymous> (/usr/app/dist/apps/server/webpack:/src/app/modules/connection/salesforce/salesforce-connection.strategy.ts:189:24)
sync-engine-server-1  |       at PubSubEventEmitter.emit (node:events:531:35)
sync-engine-server-1  |       at PubSubEventEmitter.emit (/usr/app/node_modules/salesforce-pubsub-api-client/dist/client.cjs:115:18)
sync-engine-server-1  |       at /usr/app/node_modules/salesforce-pubsub-api-client/dist/client.cjs:772:28
sync-engine-server-1  |       at processTicksAndRejections (node:internal/process/task_queues:95:5),
sync-engine-server-1  |   replayId: 487273,

Am I doing something wrong?

Here is the code:

  /**
   * Subscribes to Salesforce events using the provided connection.
   * @param connection - The Salesforce connection document.
   * @param conn - The jsforce connection instance.
   */
  private async subscribeToSalesforceEvents(connection: SalesforceConnectionDocument, conn: jsforce.Connection): Promise<void> {
    const client = new PubSubApiClient();
    const channels = ['AccountChangeEvent'];
    try {
      await client.connectWithAuth(conn.accessToken, conn.instanceUrl, connection.organizationId);
      channels.forEach(channel => {
        client.subscribe("/data/"+channel, 10).then(eventEmitter => {
          this.setupEventListeners(eventEmitter,client);
        });
      });
    } catch (err) {
      this.logger.error("Error subscribing to Salesforce events: ", err);
    }
  }

  /**
   * Sets up event listeners for a Salesforce event subscription.
   * @param eventEmitter - The event emitter for the subscription.
   */
  private setupEventListeners(eventEmitter: PubSubEventEmitter, client: PubSubApiClient): void {
    eventEmitter.on('connect', () => this.logger.log('Connected to gRPC stream.'));
    eventEmitter.on('data', event => this.logger.log('Event received: ', event));
    eventEmitter.on('data', (event) => {
      console.log(
          `Handling ${event.payload.ChangeEventHeader.entityName} change event ` +
              `with ID ${event.replayId} ` +
              `on channel ${eventEmitter.getTopicName()} ` +
              `(${eventEmitter.getReceivedEventCount()}/${eventEmitter.getRequestedEventCount()} ` +
              `events received so far)`
      );
      console.log(JSON.stringify(event, null, 2));
    });
    eventEmitter.on('end', () => this.logger.log('gRPC stream ended.'));
    eventEmitter.on('error', error => this.logger.error('gRPC stream error: ', JSON.stringify(error)));
    eventEmitter.on('lastevent', () => client.requestAdditionalEvents(eventEmitter, 10));
  }
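
A note on the second error: `TypeError: Do not know how to serialize a BigInt` is thrown by `JSON.stringify` inside a listener in `salesforce-connection.strategy.ts`, most likely the `JSON.stringify(event, null, 2)` call in the `data` handler above, so it appears unrelated to the schema loading failure. A minimal sketch of one way around it, assuming the decoded event contains `bigint` values; `bigIntReplacer` and `safeStringify` are hypothetical helper names, and the sample event is made up for illustration:

```typescript
// Replacer for JSON.stringify: converts bigint values to decimal strings,
// because JSON.stringify throws a TypeError when it meets a bigint.
function bigIntReplacer(_key: string, value: unknown): unknown {
  return typeof value === 'bigint' ? value.toString() : value;
}

// Hypothetical helper: stringify any value without failing on bigint fields.
function safeStringify(value: unknown, indent = 2): string {
  return JSON.stringify(value, bigIntReplacer, indent);
}

// Made-up event shape, only to show the call; real events have more fields.
const sampleEvent = {
  replayId: 487273,
  payload: { commitNumber: BigInt('1709302405000') },
};
console.log(safeStringify(sampleEvent));
```

In the handlers above, this would mean calling `safeStringify(event)` instead of `JSON.stringify(event, null, 2)`. Converting to strings is a design choice: it keeps full precision, whereas converting to `number` could lose it for large values.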

rjruigrok commented 7 months ago

I have the same issue: with custom channels, the client cannot fetch the schema. It would be great to have support for custom channels too.

So far this module is working really great! Keep up the good work :) 👍

pozil commented 7 months ago

Hi all, thanks for your patience with this, I was swamped with work for TrailblazerDX and World Tour Paris.

I can reproduce the issue and found that this is due to a Pub/Sub API bug that is not specific to this client. The server is not returning an event schema ID for custom channel topics as you can see in this Postman capture:

[Image: Screenshot 2024-03-18 at 16 17 52]

I've escalated this to the Pub/Sub API product team.

pozil commented 7 months ago

Hi again, I got an answer from the product team. According to them, this is by design: schema IDs are not provided at the topic level for custom channels because this would be a composite schema (an aggregation of the custom channel member schemas).

This means that I need to redesign the way this client works to support this specific scenario. I'll add it to my todo list.
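
One possible direction, sketched here as an assumption and not as the client's actual design: the logs above show that each received event carries its own `schemaId`, so schemas could be resolved per event instead of once per topic. In the sketch below, `SchemaCache` and `SchemaFetcher` are hypothetical names, and the fetcher stands in for whatever call retrieves a schema by ID (for example, the Pub/Sub API `GetSchema` RPC):

```typescript
// Hypothetical fetcher type: resolves a schema from its ID.
type SchemaFetcher<S> = (schemaId: string) => Promise<S>;

// Caches schemas by ID so that each schema is fetched at most once,
// even when several events with the same schema ID arrive concurrently.
class SchemaCache<S> {
  private readonly pending = new Map<string, Promise<S>>();
  private readonly fetchSchema: SchemaFetcher<S>;

  constructor(fetchSchema: SchemaFetcher<S>) {
    this.fetchSchema = fetchSchema;
  }

  get(schemaId: string): Promise<S> {
    if (!schemaId) {
      return Promise.reject(new Error('Event has no schema ID'));
    }
    let entry = this.pending.get(schemaId);
    if (!entry) {
      // Store the promise itself so concurrent callers share one request.
      // On failure, drop the entry so a later event can retry the fetch.
      entry = this.fetchSchema(schemaId).catch((err) => {
        this.pending.delete(schemaId);
        throw err;
      });
      this.pending.set(schemaId, entry);
    }
    return entry;
  }
}
```

With this approach, an event handler would call `cache.get(event.schemaId)` before decoding the payload, which works the same way for standard topics and for custom channels whose members use different schemas.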