neume-network / extraction-worker

Async worker_thread component for maximizing concurrent data retrieval and processing.
GNU General Public License v3.0

extraction-worker should parallelize different root domains #19

Open TimDaub opened 2 years ago

TimDaub commented 2 years ago

better-queue has a priority function that can inform order: https://github.com/diamondio/better-queue#filtering-validation-and-priority

il3ven commented 2 years ago

Thinking out loud: We can give each task a random priority out of 5.

TimDaub commented 2 years ago

> Thinking out loud: We can give each task a random priority out of 5.

Yeah, randomizing the priority of all incoming requests could be a good parallelization strategy for now, and it doesn't require much extra effort. Good idea, let's test this.

il3ven commented 2 years ago

I tried this out and it didn't work. For some reason the order didn't change. I'm basing this on the fact that the files were written to data in the same order as before. If the messages were executed in random order, I'd expect the files to be written in random order too.

Also, when the priority range was large, e.g. 50, I started getting a strange error: FetchError: request to https://node.rugpullindex.com/ failed, reason: Client network socket disconnected before secure TLS connection was established

I will try it out again later.

TimDaub commented 2 years ago

Weird. But I'd have to see the code to comment meaningfully.

il3ven commented 2 years ago

Here's the code. The only change I made was in extraction-worker/src/worker.mjs.

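// Excerpt: log, messages, loggingProxy, messageHandler, reply and panic
// come from the rest of worker.mjs and are not shown here.
import { workerData, parentPort } from "worker_threads";
import Queue from "better-queue";

export function run() {
  log(
    `Starting as worker thread with queue options: "${JSON.stringify(
      workerData.queue.options
    )}"`
  );
  const queue = new Queue(messages.route, {
    ...workerData.queue.options,
    // Assign every incoming message a random integer priority in [0, 1000).
    priority: function (message, cb) {
      const pr = Math.floor(Math.random() * 1000);
      cb(null, pr);
    },
  });
  // Report finished and failed tasks back to the parent thread.
  queue.on("task_finish", loggingProxy(queue, reply));
  queue.on("task_failed", loggingProxy(queue, panic));
  parentPort.on("message", messageHandler(queue));
  return queue;
}
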
TimDaub commented 2 years ago

I've cross-checked this with the better-queue docs and to me it looks correct.
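
For comparison, here is a minimal sketch of a deterministic alternative to random priorities: group the pending URLs by host and take one per host in turn, so consecutive tasks hit different hosts where possible. This is only an illustration, not code from extraction-worker. `interleaveByHost` and the example URLs are made up, and it groups by full hostname rather than true root domain (which would need something like a public suffix list).

```javascript
// Hypothetical helper, not part of extraction-worker or better-queue:
// reorders URLs so that consecutive entries target different hosts
// whenever more than one host still has pending URLs.
function interleaveByHost(urls) {
  // Group URLs by hostname, preserving arrival order within each group.
  const groups = new Map();
  for (const url of urls) {
    const { hostname } = new URL(url);
    if (!groups.has(hostname)) groups.set(hostname, []);
    groups.get(hostname).push(url);
  }

  // Round-robin: in pass i, take the i-th URL of every host that has one.
  const result = [];
  const pending = [...groups.values()];
  for (let i = 0; result.length < urls.length; i++) {
    for (const group of pending) {
      if (i < group.length) result.push(group[i]);
    }
  }
  return result;
}

// Example input with made-up hosts.
const ordered = interleaveByHost([
  "https://a.example/1",
  "https://a.example/2",
  "https://a.example/3",
  "https://b.example/1",
  "https://c.example/1",
  "https://c.example/2",
]);
console.log(ordered);
```

The position of each URL in the returned array could then serve as its ordering key, but whether that maps cleanly onto the queue's priority option would still need to be tested.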