JupiterOne / sdk

Home of the JupiterOne SDK
Mozilla Public License 2.0

Speed up `iterateEntities` by sending callbacks to a promise queue #546

Open: ndowmon opened this issue 3 years ago

ndowmon commented 3 years ago

Recently, we encountered an integration where a single step that calls `jobState.iterateEntities()` took multiple hours to execute. The step reads from disk, waits on network calls, and writes to disk strictly in sequence, awaiting each entity's callback before starting the next. It appears that we could significantly speed up steps like this by running the callbacks through a promise queue, something like the following:

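+ import PQueue from 'p-queue';
+
  export async function iterateEntityTypeIndex<T extends Entity = Entity>({
    type,
    iteratee,
  }: IterateIndexInput<T>) {
    const path = buildIndexDirectoryPath({
      collectionType: 'entities',
      type,
    });

+   // Run up to 5 iteratee callbacks at once instead of awaiting each in turn.
+   const queue = new PQueue({ concurrency: 5 });
+   // Collect callback failures; a bare `void queue.add(...)` would surface them
+   // as unhandled rejections instead of failing the step.
+   const errors: unknown[] = [];
    await walkDirectory({
      path,
      iteratee: async (input) => {
        const object = await readGraphObjectFile<FlushedEntityData>(input);
        if (isObjectFlushedEntityData(object)) {
          for (const entity of object.entities as T[]) {
-           await iteratee(entity);
+           void queue
+             .add(() => iteratee(entity))
+             .catch((err) => errors.push(err));
          }
        }
      },
    });
+   // Wait for every queued callback, then rethrow the first failure, if any.
+   await queue.onIdle();
+   if (errors.length > 0) throw errors[0];
  }
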
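The same idea can be sketched without `p-queue`. This is a minimal, self-contained illustration under stated assumptions, not SDK code: `forEachConcurrent`, `fakeEntities`, `runDemo`, and the trimmed-down `Entity` type are hypothetical names, and the generator only stands in for `walkDirectory` plus `readGraphObjectFile`. Unlike the diff above, it waits for a free slot before pulling the next entity, so the number of buffered entities stays bounded by the concurrency limit.

```typescript
// Hypothetical, trimmed-down entity type; the SDK's Entity has more fields.
interface Entity {
  _key: string;
}

/**
 * Calls `iteratee` on each item of `source` with at most `concurrency`
 * callbacks in flight. It waits for a free slot before pulling the next item,
 * so buffered work stays bounded, and it rethrows the first callback error
 * after the in-flight callbacks settle.
 */
async function forEachConcurrent<T>(
  source: AsyncIterable<T>,
  iteratee: (item: T) => Promise<void> | void,
  concurrency: number,
): Promise<void> {
  const inFlight = new Set<Promise<void>>();
  let failure: { error: unknown } | undefined;

  for await (const item of source) {
    if (failure) break; // stop reading new items after the first error
    const task: Promise<void> = Promise.resolve()
      .then(() => iteratee(item))
      .catch((error: unknown) => {
        if (!failure) failure = { error };
      })
      .finally(() => {
        inFlight.delete(task);
      });
    inFlight.add(task);
    if (inFlight.size >= concurrency) {
      // Tasks never reject (errors are caught above), so the race only
      // signals that a slot has been freed.
      await Promise.race(inFlight);
    }
  }

  await Promise.all(inFlight);
  if (failure) throw failure.error;
}

// Hypothetical entity source standing in for walkDirectory + readGraphObjectFile.
async function* fakeEntities(count: number): AsyncGenerator<Entity> {
  for (let i = 0; i < count; i++) {
    yield { _key: `entity-${i}` };
  }
}

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Processes 20 fake entities with a simulated 10 ms network call each and
// reports how many callbacks ran and the peak number running at once.
async function runDemo(): Promise<{ processed: number; maxActive: number }> {
  let active = 0;
  let maxActive = 0;
  let processed = 0;
  await forEachConcurrent(
    fakeEntities(20),
    async () => {
      active++;
      maxActive = Math.max(maxActive, active);
      await sleep(10);
      processed++;
      active--;
    },
    5,
  );
  return { processed, maxActive };
}
```

Because callbacks now overlap, they can finish out of order, so an iteratee that updates shared state has to tolerate concurrent calls.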
VDubber commented 2 years ago

This should be benchmarked before the proposed change is made.
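A benchmark could start from a harness like the following minimal sketch. Everything in it is hypothetical: the iteratee only simulates network latency with a timer, and `runPooled` is a simple worker-pool stand-in for the proposed `p-queue` concurrency, not SDK code. Its timings say nothing about a real integration; they only show how the comparison might be wired up.

```typescript
// Hypothetical benchmark: awaiting each callback in turn versus a small
// worker pool. The iteratee simulates latency only; it does no real I/O.
const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

type Iteratee<T> = (item: T) => Promise<void>;

async function runSequential<T>(items: T[], iteratee: Iteratee<T>): Promise<void> {
  for (const item of items) {
    await iteratee(item);
  }
}

// `concurrency` workers share one iterator, so each item is handled exactly
// once and at most `concurrency` callbacks are pending at any moment.
async function runPooled<T>(
  items: T[],
  iteratee: Iteratee<T>,
  concurrency: number,
): Promise<void> {
  const iterator = items[Symbol.iterator]();
  const worker = async () => {
    for (let next = iterator.next(); !next.done; next = iterator.next()) {
      await iteratee(next.value);
    }
  };
  await Promise.all(Array.from({ length: concurrency }, worker));
}

// Measures wall-clock time of one run in milliseconds.
async function timeIt(label: string, run: () => Promise<void>): Promise<number> {
  const start = Date.now();
  await run();
  const elapsedMs = Date.now() - start;
  console.log(`${label}: ${elapsedMs} ms`);
  return elapsedMs;
}

async function benchmark(itemCount = 50, latencyMs = 10, concurrency = 5) {
  const items = Array.from({ length: itemCount }, (_, i) => i);
  const iteratee: Iteratee<number> = () => sleep(latencyMs);
  const sequentialMs = await timeIt("sequential", () =>
    runSequential(items, iteratee),
  );
  const pooledMs = await timeIt(`pooled (concurrency ${concurrency})`, () =>
    runPooled(items, iteratee, concurrency),
  );
  return { sequentialMs, pooledMs };
}
```

With a purely latency-bound iteratee, the pooled run should take roughly `1 / concurrency` of the sequential time. A step dominated by disk reads or CPU work may gain much less, which is what running a real step with and without the change would show.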