Azure / azure-functions-durable-js

JavaScript library for using the Durable Functions bindings
https://www.npmjs.com/package/durable-functions
MIT License

`context.df.Task.any` launches tasks even if the task was already launched and sometimes returns the result from a different `Task` #574

Open Ayzrian opened 8 months ago

Ayzrian commented 8 months ago

Describe the bug

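First, I would like to explain what I am trying to achieve with Durable Functions. I have two activities: activity-get-next-products-batch, which gets the next batch of products, and activity-process-products, which processes a batch.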

I want each batch to start processing as soon as possible while, at the same time, we keep producing the next batch.

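So when a batch resolves, we do two things: we start an activity-process-products activity for that batch and, if there are more products, we schedule the next activity-get-next-products-batch.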

Then we call Task.any to wait for any of the jobs to finish (keep in mind that, ideally, multiple activity-process-products activities can be running at once).

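In this setup, Task.any can return one of two kinds of task: the activity-get-next-products-batch task or one of the activity-process-products tasks.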

My expectation is that if a job has already been launched, it won't be launched again and we will keep waiting for it to finish. What happens in reality is that the activity-get-next-products-batch invocation runs longer than activity-process-products, and when the code just wants to wait for the batch to arrive, the runtime schedules another job rather than waiting for the one already scheduled. See the Gantt chart obtained from Durable Functions Monitor.

image
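
For comparison, the expectation described above matches how plain JavaScript promises behave with `Promise.race`: a promise that is still pending is not started again when it is raced a second time. The sketch below is only an analogy and does not use the Durable Functions API; `startJob`, `launches`, and `demo` are hypothetical names introduced for illustration.

```typescript
// Plain-Promise analogy only; this is not Durable Functions code.
let launches = 0; // how many times work was actually started

interface Job<T> {
  promise: Promise<T>;
  finish: (value: T) => void;
}

// Starts a hypothetical job and returns a handle that lets the demo complete it.
function startJob<T>(): Job<T> {
  launches++;
  let finish: (value: T) => void = () => {};
  const promise = new Promise<T>((resolve) => {
    finish = resolve; // the executor runs synchronously, so this is set before we return
  });
  return { promise, finish };
}

async function demo(): Promise<{ first: string; second: string; launches: number }> {
  const getBatch = startJob<string>(); // stands in for activity-get-next-products-batch
  const processBatch = startJob<string>(); // stands in for activity-process-products

  processBatch.finish("processed"); // the processing job completes first
  const first = await Promise.race([getBatch.promise, processBatch.promise]);

  // Racing the still-pending getBatch promise again does not start a new job.
  getBatch.finish("batch");
  const second = await Promise.race([getBatch.promise]);

  return { first, second, launches };
}
```

In this analogy the second wait reuses the in-flight promise, so `launches` stays at the number of jobs that were explicitly started.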

In the logs, we start the job as TaskEventId 2; it runs longer than the other job, which was started to process products.

image

Then, when processing finishes, we just wait for the next batch to arrive, but the runtime schedules the task again rather than waiting for the task that was already launched. Note that the input is the same, but the TaskEventId is 3 this time. The code is attached below.

image


Another issue I see is that when both tasks passed to Task.any finish around the same time, the result is returned from the wrong Task, even though the code checks that the returned Task === the task that was scheduled to get the batch.

In the screenshot below you can see that both tasks finished but have different outputs. Then we see the log line Get Products Task Finished, which is emitted by this code:

    const finishedTask: Task = yield context.df.Task.any(tasksToWait);

    if (getProductsTask === finishedTask) {

But what actually happens is that Task.any returned the result from a completely different task.

image

To Reproduce

The orchestrator code:

// `df` is used below (df.orchestrator), so it must be imported as well.
import * as df from 'durable-functions';
import { RetryOptions } from 'durable-functions';
import { Task } from 'durable-functions/lib/src/task';
import { Logger } from '../../common/logger';
import { BloomreachFullFeedOrchestratorParams, GetNextProductsBatchResult, ProcessProductsResult } from './types';

export default df.orchestrator(function* (context) {
  const logger = new Logger(context.log, {
    correlationId: context.df.newGuid(context.df.instanceId),
  });
  logger.setContext('bloomreach-full-feed-orchestrator');
  if (!context.df.isReplaying) {
    logger.info(`isReplaying=` + String(context.df.isReplaying));
  }

  if (!context.df.isReplaying) {
    logger.info(`Input payload:`, {
      input: JSON.stringify(context.df.getInput()),
    });
  }

  const params: BloomreachFullFeedOrchestratorParams = context.df.getInput();

  let lastCommerceId = null;
  let batchNumber = 1;

  if (!context.df.isReplaying) {
    logger.info('Scheduled first task to get products.');
  }

  let getProductsTask: Task | null = context.df.callActivityWithRetry(
    'activity-get-next-products-batch',
    new RetryOptions(1000, 1),
    {
      lastCommerceId,
      batchNumber,
      orchestratorId: context.bindingData.instanceId,
    }
  );
  let processingTasks: Task[] = [];
  const processingResults: ProcessProductsResult[] = [];

  while (processingTasks.length > 0 || getProductsTask !== null) {
    const tasksToWait = [...processingTasks];

    if (getProductsTask !== null) {
      tasksToWait.push(getProductsTask);
    }

    if (!context.df.isReplaying) {
      logger.info(`Waiting for any of the tasks to finish.`);
    }

    const finishedTask: Task = yield context.df.Task.any(tasksToWait);

    if (getProductsTask === finishedTask) {
      if (!context.df.isReplaying) {
        logger.info('Get Products Task Finished.');
      }

      if (!finishedTask.isCompleted) {
        if (!context.df.isReplaying) {
          logger.warn(`Get Products Task Failed...`);
        }
      }

      const result = finishedTask.result as GetNextProductsBatchResult;

      if (!context.df.isReplaying) {
        logger.info(`Result=${JSON.stringify(result)}`);
      }

      lastCommerceId = result.lastCommerceId;

      if (!context.df.isReplaying) {
        logger.info('Starting new processing job activity.');
      }

      processingTasks.push(
        context.df.callActivityWithRetry('activity-process-products', new RetryOptions(1000, 1), {
          locale: params.locale,
          inputFilePath: result.filePath,
        })
      );

      if (result.hasMoreProducts) {
        if (!context.df.isReplaying) {
          logger.info('Starting getting new batch activity.');
        }

        batchNumber++;
        getProductsTask = context.df.callActivity('activity-get-next-products-batch', {
          lastCommerceId,
          batchNumber,
          orchestratorId: context.bindingData.instanceId,
        });
      } else {
        getProductsTask = null;
        if (!context.df.isReplaying) {
          logger.info('Finished products downloading from the Commercetools.');
        }
      }
    } else {
      if (!context.df.isReplaying) {
        logger.info(
          `Batch processing activity finished. Removing from the active list. Length=${processingTasks.length}`
        );
      }

      processingTasks = processingTasks.filter((task) => finishedTask !== task);

      if (!context.df.isReplaying) {
        logger.info(`LengthAfterFilter=${processingTasks.length}`);
      }

      processingResults.push(finishedTask.result as ProcessProductsResult);
    }
  }

  if (!context.df.isReplaying) {
    logger.info(`Finished processing :)`);
  }

  // then use processing results to merge the files together and do
});

Expected behavior

a) Task.any doesn't schedule a new task for a task that was already launched. b) Task.any returns the correct result for the task rather than a result from a different task.

Actual behavior

a) Task.any schedules a new task for a task that was already launched. b) Task.any returns an invalid result for the task.

Screenshots

See above in the bug description

Known workarounds

Maybe this could be worked around by treating each activity as a sub-orchestrator in "singleton" mode, where each one has an ID, and then having the parent somehow apply similar logic across the different sub-orchestrators...

Ayzrian commented 8 months ago

Hey @castrodd, could you please take a look?

Sorry for pinging you, but I saw some recent activity from you on this repository.

Thanks.

Ayzrian commented 8 months ago

Or maybe @davidmrdavid, you could take a look at this one?

P.S. Sorry for pinging you :)

Thanks

castrodd commented 8 months ago

@Ayzrian I will take a look soon and get back to you. Thank you for your patience!

Ayzrian commented 7 months ago

Hey @castrodd, do you have any updates on this?

Ayzrian commented 6 months ago

Hey team,

It has been more than a month already. Are there any updates on this issue?

Best Regards

davidmrdavid commented 6 months ago

Hi @Ayzrian - apologies for the delay here, it's been busy. I'll coordinate internally with @castrodd to help debug this one.

There's a lot to grok here, but I think I recognize, from the Python SDK, the first behavior you mentioned: a task inside a Task.any may be launched twice under certain conditions, for example when it is provided again in a newly constructed Task.any, which seems to be what your code is doing.

This bug is in part a technical limitation of the legacy protocol the JS SDK is using to communicate with the C# Durable Functions code - the inter-process/out-of-process protocol does not have a notion of "Task ID", meaning the C# Durable Functions code is not able to filter out repeated sub-tasks in a TaskAny and instead believes all sub-tasks are brand new Task scheduling requests.
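
To make the limitation described above concrete, here is a toy model of a protocol without task IDs. It is only a sketch based on the description in this comment, not the real SDK or host code; `ToyAction`, `ToyTask`, `ToyHost`, `yieldTaskAny`, and `runToyScenario` are hypothetical names.

```typescript
// Toy model only: these types are hypothetical and do not mirror the real protocol.
interface ToyAction {
  name: string; // activity name
  input: string; // serialized input
}

interface ToyTask {
  action: ToyAction;
}

// The "host" only sees a list of actions. Without a task ID in the payload it
// cannot tell a repeated sub-task from a new one, so it schedules every action.
class ToyHost {
  scheduled: ToyAction[] = [];

  receive(actions: ToyAction[]): void {
    for (const action of actions) {
      this.scheduled.push(action);
    }
  }
}

// Each time the orchestrator yields a Task.any, the toy SDK sends the actions
// of all of its sub-tasks, including ones that were sent before.
function yieldTaskAny(host: ToyHost, tasks: ToyTask[]): void {
  host.receive(tasks.map((task) => task.action));
}

// Returns how many times the get-batch activity ends up scheduled.
function runToyScenario(): number {
  const host = new ToyHost();
  const getBatch: ToyTask = {
    action: { name: "activity-get-next-products-batch", input: "batch 1" },
  };
  const processBatch: ToyTask = {
    action: { name: "activity-process-products", input: "file 1" },
  };

  yieldTaskAny(host, [processBatch, getBatch]); // first wait: both tasks are new
  yieldTaskAny(host, [getBatch]); // second wait: getBatch is still pending but is sent again

  return host.scheduled.filter(
    (action) => action.name === "activity-get-next-products-batch"
  ).length;
}
```

In this toy model the get-batch activity is scheduled twice with the same input, which is the same shape as the duplicate TaskEventId reported in the issue.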

The latest out-of-process protocol handles this edge case better, but incorporating it will take substantial effort, so that is not a short-term fix. One way to fix this is to filter out, on the SDK side, any Task.any sub-tasks that have already been scheduled so they are not sent over the protocol again (otherwise they'll be re-scheduled), but that creates other edge cases. Still, from our experience in the Python SDK, I've come to believe that performing this SDK-side filtering is a better trade-off, and that the new edge cases it introduces are less likely to be encountered by users. I'll work with @castrodd to develop that fix.
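
As a rough illustration of the SDK-side filtering idea, the sketch below remembers which task objects have already been sent and drops them from later Task.any payloads. It is an assumption about the general shape of such a filter, not the actual fix; `SketchTask`, `FilteringSdkSketch`, `actionsToSend`, and `runFilterScenario` are hypothetical names.

```typescript
// Hypothetical sketch of SDK-side filtering; not the actual SDK implementation.
interface SketchTask {
  name: string; // activity name
  input: string; // serialized input
}

class FilteringSdkSketch {
  // Tasks are tracked by object identity, so two distinct task objects with
  // the same name and input are still treated as separate tasks.
  private alreadySent = new Set<SketchTask>();

  // Returns only the sub-tasks that have not been sent to the host before,
  // and records them as sent.
  actionsToSend(tasks: SketchTask[]): SketchTask[] {
    const fresh = tasks.filter((task) => !this.alreadySent.has(task));
    for (const task of fresh) {
      this.alreadySent.add(task);
    }
    return fresh;
  }
}

// Returns the number of tasks sent for the first and the second Task.any.
function runFilterScenario(): number[] {
  const sdk = new FilteringSdkSketch();
  const getBatch: SketchTask = { name: "activity-get-next-products-batch", input: "batch 1" };
  const processBatch: SketchTask = { name: "activity-process-products", input: "file 1" };

  const firstSend = sdk.actionsToSend([processBatch, getBatch]); // both are new
  const secondSend = sdk.actionsToSend([getBatch]); // getBatch was already sent

  return [firstSend.length, secondSend.length];
}
```

With this filter the second Task.any sends nothing for the pending get-batch task, so the host has no duplicate to schedule.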

All that said, I don't quite understand your second bug report:

b) The Task.any return the invalid result for the task

Can you break this down further? A small example with concrete inputs and outputs would help me understand. Unfortunately, I can't quite download the attached pictures and zooming in with the browser makes them distorted, so I can't review the logs you shared.

Thanks!