cap-js-community / event-queue

An event queue that enables secure, multi-tenant transactional processing of asynchronous events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.
https://cap-js-community.github.io/event-queue/
Apache License 2.0
11 stars, 1 fork

Add Type Definitions for usage in Typescript #155

Closed niklasvieth closed 2 months ago

niklasvieth commented 3 months ago

Hi @soccermax,

I really like the library and am currently looking into using it in a project. I prefer using CAP with TypeScript, though, and unfortunately that does not work out of the box because of the missing type definitions.

It may be possible to generate the types from the existing JSDoc (see https://www.typescriptlang.org/docs/handbook/declaration-files/dts-from-js.html).

I tried it, and for most of the files it already more or less works.

As a temporary workaround, I declared the types myself for the methods I may use in the project by adding an event-queue.d.ts file like this:

declare module "@cap-js-community/event-queue" {
  import cds from "@sap/cds";
  type GenericObject = {
    [key: string]:
      | string
      | number
      | boolean
      | GenericObject
      | Array<GenericObject>;
  };
  type CdsLogger = ReturnType<typeof cds.log>;
  type CdsTransaction = cds.Transaction;
  type Config = {
    type: string;
    subType: string;
    priority: string;
    impl: string;
    load: number;
    interval: number;
    internalEvent: boolean;
    isPeriodic: boolean;
  };

  class EventQueueProcessorBase {
    constructor(
      context: cds.EventContext,
      eventType: string,
      eventSubType: string,
      config: Config,
    );

    /**
     * Process one or multiple events depending on the clustering algorithm. By default, it is one event.
     * @param processContext the context valid for the event processing. This context is associated with a valid transaction
     *                       Access to the context is also possible with this.getContextForEventProcessing(key).
     *                       The associated tx can be accessed with this.getTxForEventProcessing(key).
     * @param {string} key cluster key generated during the clustering step. By default, this is the ID of the event queue entry
     * @param {Array<Object>} queueEntries these are the queueEntries which are collected during the clustering step for the given
     *        clustering key
     * @param {Object} payload resulting from the functions checkEventAndGeneratePayload and the clustering function
     * @returns {Promise<Array <Array <String, Number>>>} Must return an array of the length of passed queueEntries
     *          This array needs to be nested based on the following structure: [ ["eventId1", EventProcessingStatus.Done],
     *          ["eventId2", EventProcessingStatus.Error] ]
     */
    processEvent(
      processContext: cds.EventContext,
      key: string,
      queueEntries: Array<object>,
      payload: GenericObject,
    ): Promise<Array<Array<string | number>>>;

    /**
     * Process one periodic event
     * @param processContext the context valid for the event processing. This context is associated with a valid transaction
     *                       Access to the context is also possible with this.getContextForEventProcessing(key).
     *                       The associated tx can be accessed with this.getTxForEventProcessing(key).
     * @param {string} key cluster key generated during the clustering step. By default, this is the ID of the event queue entry
     * @param {Object} queueEntry this is the queueEntry which should be processed
     * @returns {Promise<undefined>}
     */
    processPeriodicEvent(
      processContext: cds.EventContext,
      key: string,
      queueEntry: GenericObject,
    ): Promise<void>;

    startPerformanceTracerEvents(): void;
    startPerformanceTracerPeriodicEvents(): void;
    startPerformanceTracerPreprocessing(): void;
    endPerformanceTracerEvents(): void;
    endPerformanceTracerPeriodicEvents(): void;
    endPerformanceTracerPreprocessing(): void;
    logTimeExceeded(iterationCounter: number): void;
    logStartMessage(): void;

    /**
     * This function will be called for every event which should be processed. Within this function basic validations
     * should be done, e.g. is the event still valid and should be processed. Also, this step should be used to gather the
     * required data for the clustering step. Keep in mind that this function will be called for every event and not once
     * for all events. Mass data select should be done later (beforeProcessingEvents).
     * If no payload is returned the status will be set to done. Transaction is available with this.tx;
     * this transaction will always be rolled back, so do not use this transaction for persisting data.
     * @param {Object} queueEntry which has been selected from event queue table and been modified by modifyQueueEntry
     * @returns {Promise<Object>} payload which is needed for clustering the events.
     */
    checkEventAndGeneratePayload(queueEntry: GenericObject): Promise<object>;

    /**
     * This function will be called for every event which should be processed. This function sets for every event
     * the payload which will be passed to the clustering functions.
     * @param {Object} queueEntry which has been selected from event queue table and been modified by modifyQueueEntry
     * @param {Object} payload which is the result of checkEventAndGeneratePayload
     */
    addEventWithPayloadForProcessing(
      queueEntry: GenericObject,
      payload: GenericObject,
    ): void;

    /**
     * This function sets the status of a queueEntry to done
     * @param {Object} queueEntry which has been selected from event queue table and been modified by modifyQueueEntry
     */
    setStatusToDone(queueEntry: GenericObject): void;

    /**
     * This function allows to cluster multiple events so that they will be processed together. By default, there is no
     * clustering happening. Therefore, the cluster key is the ID of the event. If an alternative clustering is needed
     * this function should be overwritten. For every cluster-key the function processEvent will be called once.
     * This can be useful for e.g. multiple tasks have been scheduled and always the same user should be informed.
     * In this case the events should be clustered together and only one mail should be sent.
     */
    clusterQueueEntries(queueEntriesWithPayloadMap: object): void;

    /**
     * This function allows to add entries to the process map. This function is needed if the function clusterQueueEntries
     * is redefined. For each entry in the processing map the processEvent function will be called once.
     * @param {String} key key for event
     * @param {Object} queueEntry queueEntry which should be clustered with this key
     * @param {Object} payload payload which should be clustered with this key
     */
    addEntryToProcessingMap(
      key: string,
      queueEntry: GenericObject,
      payload: GenericObject,
    ): void;

    /**
     * This function sets the status of multiple events to a given status. If the structure of queueEntryProcessingStatusTuple
     * is not as expected all events will be set to error. The function respects the config transactionMode. If
     * transactionMode is isolated the status will be written to a dedicated map and returned afterwards to handle concurrent
     * event processing.
     * @param {Array} queueEntries which has been selected from event queue table and been modified by modifyQueueEntry
     * @param {Array<Object>} queueEntryProcessingStatusTuple Array of tuple <queueEntryId, processingStatus>
     * @param {boolean} returnMap Allows the function to return the result as a map
     * @return {Object} statusMap Map which contains all events for which a status has been set so far
     */
    setEventStatus(
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      queueEntries: any[],
      queueEntryProcessingStatusTuple: Array<object>,
      returnMap?: boolean,
    ): object;

    /**
     * This function allows to modify a selected queueEntry (event) before processing. By default, the payload of the event
     * is parsed. The return value of the function is ignored, it's required to modify the reference which is passed into
     * the function.
     * @param {Object} queueEntry which has been selected from event queue table
     */
    modifyQueueEntry(queueEntry: GenericObject): void;

    handleErrorDuringProcessing(
      error: unknown,
      queueEntries: Array<object>,
    ): {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      [k: string]: any;
    };
    handleErrorDuringPeriodicEventProcessing(
      error: unknown,
      queueEntry: GenericObject,
    ): void;
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    setPeriodicEventStatus(queueEntryIds: any, status: any): Promise<void>;

    /**
     * This function validates for all selected events one status has been submitted. It's also validated that only for
     * selected events a status has been submitted. Persisting the status of events is done in a dedicated database tx.
     * The function accepts no arguments as there are dedicated functions to set the status of events (e.g. setEventStatus)
     */
    persistEventStatus(
      tx: CdsTransaction,
      {
        skipChecks,
        statusMap,
      }?: {
        skipChecks: boolean;
        statusMap?: object;
      },
    ): Promise<void>;
    handleErrorDuringClustering(error: unknown): void;
    handleInvalidPayloadReturned(queueEntry: GenericObject): void;

    /**
     * This function selects all relevant events based on the eventType and eventSubType supplied through the constructor
     * during initialization of the class.
     * Relevant events for selection are: open events, error events if the number of retry attempts has not been exceeded, or
     * events which are in progress for longer than 30 minutes.
     * @return {Promise<Array<Object>>} all relevant events for processing for the given eventType and eventSubType
     */
    getQueueEntriesAndSetToInProgress(): Promise<Array<object>>;
    checkTxConsistency(tx: CdsTransaction): Promise<void>;
    handleExceededEvents(): Promise<void>;
    set processEventContext(context: cds.EventContext);

    /**
     * This function enables the possibility to execute custom actions for events for which the retry attempts have been
     * exceeded. As always a valid transaction is available with this.tx. This transaction will be committed after the
     * execution of this function.
     * @param {Object} exceededEvent exceeded event queue entry
     */
    hookForExceededEvents(exceededEvent: object): Promise<void>;

    /**
     * This function serves the purpose of mass enabled preloading data for processing the events which are added with
     * the function 'addEventWithPayloadForProcessing'. This function is called after the clustering and before the
     * process-events-steps. The event data is available with this.eventProcessingMap.
     */
    beforeProcessingEvents(): Promise<void>;

    /**
     * This function checks if the db records of events have been modified since the selection (beginning of processing)
     * If the db records are unmodified the field lastAttemptTimestamp of the records is updated to
     * "send a keep alive signal". This extends the allowed processing time of the events as events which are in progress
     * for more than 30 minutes (global tx timeout) are selected with the next tick.
     * If events are outdated/modified these events are not being processed and no status will be persisted.
     * @return {Promise<boolean>} true if the db record of the event has been modified since selection
     */
    isOutdatedAndKeepalive(queueEntries: Array<object>): Promise<boolean>;
    acquireDistributedLock(): Promise<boolean>;
    handleReleaseLock(): Promise<void>;
    scheduleNextPeriodEvent(queueEntry: GenericObject): Promise<boolean>;
    handleDuplicatedPeriodicEventEntry(
      queueEntries: Array<object>,
    ): // eslint-disable-next-line @typescript-eslint/no-explicit-any
    Promise<any>;

    /**
     * Asynchronously gets the timestamp of the last successful run.
     *
     * @returns {Promise<string|null>} A Promise that resolves to a string representation of the timestamp
     * of the last successful run (in ISO 8601 format: YYYY-MM-DDTHH:mm:ss.sss),
     * or null if there has been no successful run yet.
     *
     * @example
     * const timestamp = await instance.getLastSuccessfulRunTimestamp();
     * console.log(timestamp);  // Outputs: 2023-12-07T09:15:44.237
     *
     * @throws {Error} If an error occurs while fetching the timestamp.
     */
    getLastSuccessfulRunTimestamp(): Promise<string | null>;
    statusMapContainsError(statusMap: object): boolean;
    clearEventProcessingContext(): void;
    broadCastEvent(): void;

    set logger(value: CdsLogger);
    get logger(): CdsLogger;
    get queueEntriesWithPayloadMap(): object;
    get eventProcessingMap(): object;
    get parallelEventProcessing(): boolean;
    get concurrentEventProcessing(): boolean;
    get tx(): CdsTransaction;
    get context(): cds.EventContext;

    get isPeriodicEvent(): boolean;
  }

  /**
   * Asynchronously publishes a series of events to the event queue.
   *
   * @param {CdsTransaction} tx - The transaction object to be used for database operations.
   * @param {Array|Object} events - An array of event objects or a single event object. Each event object should match the Event table structure:
   *   {
   *     type: String, // Event type. This is a required field.
   *     subType: String, // Event subtype. This is a required field.
   *     referenceEntity: String, // Reference entity associated with the event.
   *     referenceEntityKey: UUID, // UUID key of the reference entity.
   *     status: Status, // Status of the event, defaults to 0.
   *     payload: LargeString, // Payload of the event.
   *     attempts: Integer, // The number of attempts made, defaults to 0.
   *     lastAttemptTimestamp: Timestamp, // Timestamp of the last attempt.
   *     createdAt: Timestamp, // Timestamp of event creation. This field is automatically set on insert.
   *     startAfter: Timestamp, // Timestamp indicating when the event should start after.
   *   }
   * @param {Boolean} skipBroadcast - (Optional) If set to true, event broadcasting will be skipped. Defaults to false.
   * @throws {EventQueueError} Throws an error if the configuration is not initialized.
   * @throws {EventQueueError} Throws an error if the event type is unknown.
   * @throws {EventQueueError} Throws an error if the startAfter field is not a valid date.
   * @returns {Promise} Returns a promise which resolves to the result of the database insert operation.
   */
  export function publishEvent(
    tx: CdsTransaction,
    events: object[] | object,
    skipBroadcast?: boolean,
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
  ): Promise<any>;
}
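
The clusterQueueEntries comment in the declaration above describes grouping several events under one cluster key so that processEvent runs once per key, for example one mail per user. The following is a minimal, self-contained sketch of that idea only. It is not the library's implementation: the entry shape, the `Cluster` type and the `clusterByUser` helper are hypothetical names introduced for illustration.

```typescript
// Hypothetical entry shape for illustration; real queue entries carry more fields.
type QueueEntry = { ID: string; payload: { userId: string; task: string } };

// One cluster: all entries sharing a key, plus a merged payload for that key.
type Cluster = {
  queueEntries: QueueEntry[];
  payload: { userId: string; tasks: string[] };
};

// Group entries by user so that a single notification per user could be sent.
function clusterByUser(entries: QueueEntry[]): Map<string, Cluster> {
  const clusters = new Map<string, Cluster>();
  for (const entry of entries) {
    const key = entry.payload.userId;
    let cluster = clusters.get(key);
    if (!cluster) {
      cluster = { queueEntries: [], payload: { userId: key, tasks: [] } };
      clusters.set(key, cluster);
    }
    cluster.queueEntries.push(entry);
    cluster.payload.tasks.push(entry.payload.task);
  }
  return clusters;
}

const entries: QueueEntry[] = [
  { ID: "e1", payload: { userId: "alice", task: "review" } },
  { ID: "e2", payload: { userId: "bob", task: "approve" } },
  { ID: "e3", payload: { userId: "alice", task: "sign" } },
];

const clusters = clusterByUser(entries);
clusters.forEach((cluster, key) => {
  // One iteration per cluster key, mirroring "processEvent is called once per key".
  console.log(key, cluster.queueEntries.map((e) => e.ID), cluster.payload.tasks);
});
```

In the real base class, an overridden clusterQueueEntries would presumably call addEntryToProcessingMap(key, queueEntry, payload) for each entry instead of building its own map.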

It would be awesome if the library exported the types of the most commonly used methods like publishEvent, processPeriodicEvent and processEvent.

Thanks in advance! BR Niklas

soccermax commented 3 months ago

Sure, I can look into that. Thanks for already providing a types file. Just FYI: functions like processPeriodicEvent and processEvent should not be called explicitly; the framework does this. The function to trigger the processing of the event queue manually is processEventQueue, which is also exposed via the package's index.js.
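
To illustrate the point about not calling processEvent yourself, here is a rough sketch of the division of labour: application code overrides processEvent in a subclass, and a framework loop invokes it. Everything in the block is a stand-in written for this example. `ProcessorBaseStandIn`, `runOnce`, `toStatusTuples` and the numeric status values are assumptions, not the library's API; in a real project the class would extend EventQueueProcessorBase and use the library's EventProcessingStatus.

```typescript
// Assumed status codes for illustration only; real code should use the
// library's EventProcessingStatus rather than these numbers.
const Status = { Done: 2, Error: 3 } as const;

type StatusTuple = [string, number];
type Entry = { ID: string; payload: string };

// Hypothetical stand-in for EventQueueProcessorBase, reduced to one method.
abstract class ProcessorBaseStandIn {
  abstract processEvent(
    processContext: unknown,
    key: string,
    queueEntries: Entry[],
    payload: string,
  ): Promise<StatusTuple[]>;
}

// Build one [id, status] tuple per queue entry, as the processEvent JSDoc requires.
function toStatusTuples(queueEntries: Entry[], status: number): StatusTuple[] {
  return queueEntries.map((entry): StatusTuple => [entry.ID, status]);
}

// Application code: only overrides processEvent, never calls it.
class GreetingProcessor extends ProcessorBaseStandIn {
  async processEvent(
    _processContext: unknown,
    _key: string,
    queueEntries: Entry[],
    payload: string,
  ): Promise<StatusTuple[]> {
    try {
      if (payload === "") throw new Error("empty payload");
      console.log(`processing ${payload}`);
      return toStatusTuples(queueEntries, Status.Done);
    } catch {
      return toStatusTuples(queueEntries, Status.Error);
    }
  }
}

// Stand-in for the framework loop: it, not the application, calls processEvent.
async function runOnce(
  processor: ProcessorBaseStandIn,
  entries: Entry[],
): Promise<StatusTuple[]> {
  const results: StatusTuple[] = [];
  for (const entry of entries) {
    // Default clustering: the cluster key is the entry ID, one entry per key.
    const tuples = await processor.processEvent({}, entry.ID, [entry], entry.payload);
    results.push(...tuples);
  }
  return results;
}

runOnce(new GreetingProcessor(), [
  { ID: "e1", payload: "hello" },
  { ID: "e2", payload: "" },
]).then((results) => console.log(results));
```

In this sketch `runOnce` plays the role that processEventQueue plays in the library, which is why the subclass contains no call to processEvent.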

Maybe I need to add this to the documentation.