Azure / azure-sdk-for-js

This repository is for active development of the Azure SDK for JavaScript (NodeJS & Browser). For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/javascript/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-js.
MIT License

[cosmos] Bulk API does not retry when some Operations throttle #29100

Open Lougnar opened 6 months ago

Lougnar commented 6 months ago

Describe the bug
Context: I use an Azure Function to push computed data from an Event Hub to Cosmos DB with the bulk API. The throttled operations are not retried according to the retry policy set on the CosmosClient:

connectionPolicy: {
    retryOptions: {
      maxRetryAttemptCount: 50,
      fixedRetryIntervalInMilliseconds: 1_000,
      maxWaitTimeInSeconds: 120,
    },
},

To Reproduce
Steps to reproduce the behavior:

  1. Create a CosmosClient with a retry policy
  2. Upsert 100 documents on a container provisioned with 400 RU/s
  3. The response returns with some failed operations (statusCode: 429)

Expected behavior
The failed operations should be retried according to the retry policy.

Additional context

I dug a bit into the SDK codebase and found that all the operations are sent in a single request to: https://:443/dbs//colls//docs

The server responds with status 207 and a body containing the status of each operation. The SDK only checks the status of the overall request to decide whether to throw an error, which is then caught and retried if the status code is 429. Since the status code of the request is 207, no error is thrown, and no check is performed on the response body to ensure that all the operations succeeded.
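To make the gap concrete: with the current behavior, a caller has to pick the throttled entries out of the 207 response itself. Below is a minimal sketch of that in plain Node; the helper name `splitThrottled` is made up, and the result shape (`statusCode`, `retryAfterMilliseconds`) matches what the bulk response entries in this thread show. The bulk response preserves operation order, so results can be paired with operations by index.

```javascript
// Hypothetical helper: split bulk results into completed operations and
// operations that should be retried (HTTP 429). Results are paired with
// their original operations by index, since the bulk response keeps order.
function splitThrottled(operations, results) {
  const done = [];
  const retry = [];
  results.forEach((result, i) => {
    if (result.statusCode === 429) {
      retry.push({
        operation: operations[i],
        retryAfterMilliseconds: result.retryAfterMilliseconds ?? 0,
      });
    } else {
      done.push(result);
    }
  });
  return { done, retry };
}

// Example with the per-operation result shape seen in this issue:
const ops = [{ id: "a" }, { id: "b" }, { id: "c" }];
const res = [
  { statusCode: 201, requestCharge: 7.5 },
  { statusCode: 429, requestCharge: 0, retryAfterMilliseconds: 13 },
  { statusCode: 201, requestCharge: 7.5 },
];
const { done, retry } = splitThrottled(ops, res);
console.log(retry.length); // 1
```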

Here is the small Node script I use to reproduce the issue:

const cosmos = require("@azure/cosmos");
const crypto = require("crypto");
const dotEnv = require("dotenv");

const id = crypto.randomUUID();
dotEnv.config();

const client = new cosmos.CosmosClient({
  endpoint: process.env["COSMOS_DATABASE_ENDPOINT"],
  key: process.env["COSMOS_DATABASE_KEY"],
  connectionPolicy: {
    retryOptions: {
      maxRetryAttemptCount: 50,
      fixedRetryIntervalInMilliseconds: 1_000,
      maxWaitTimeInSeconds: 120,
    },
  },
});
const container = client
  .database(process.env["COSMOS_DATABASE_ID"])
  .container(process.env["COSMOS_CONTAINER_ID"]);

const operationsChunk = Array.from({ length: 100 }, (_, index) => ({
  operationType: cosmos.BulkOperationType.Upsert,
  id: `${id}-${index}`,
  partitionKey: `serial-${id}-${index}`,
  resourceBody: {
    id: `${id}-${index}`,
    serial: `serial-${id}-${index}`,
    history: [{ value: `${id}-${index}`, ts: 0 }],
  },
}));

(async () => {
  const results = await container.items.bulk(operationsChunk, {
    continueOnError: true,
  });
  console.log(results.filter(({ statusCode }) => statusCode === 429));
})();
github-actions[bot] commented 6 months ago

Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @sajeetharan @simorenoh @v1k1.

akcyp commented 1 month ago

I confirm that the problem still exists. My setup:

1) Creating db & collection

import { CosmosClient, PartitionKeyKind } from "@azure/cosmos";
import { ProxyAgent } from "proxy-agent";

process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
process.env.HTTPS_PROXY = "http://example.local:8000";
process.env.HTTP_PROXY = "http://example.local:8000";
process.env.NO_PROXY = 'example.local';

const client = new CosmosClient({
  endpoint: "https://localhost:8081/",
  key: process.env.DB_KEY,
  connectionPolicy: {
    retryOptions: {
      maxRetryAttemptCount: 10,
      fixedRetryIntervalInMilliseconds: 1_000,
      maxWaitTimeInSeconds: 30,
    },
    requestTimeout: 70000,
  },
  agent: new ProxyAgent({
    rejectUnauthorized: false,
  }),
});

(async () => {
  const { resources: databaseList } = await client.databases.readAll().fetchAll();
  for (const database of databaseList) {
    await client.database(database.id).delete();
  }
  console.log("Cleared all databases");

  const { database } = await client.databases.createIfNotExists({ id: "exampledb" });
  console.log("Created db");

  const { container } = await database.containers.createIfNotExists({
    id: "examplecontainer",
    throughput: 400,
    partitionKey: {
      paths: ["/PK"],
      kind: PartitionKeyKind.Hash,
    },
    conflictResolutionPolicy: {
      mode: "LastWriterWins",
      conflictResolutionPath: "/_ts",
    },
    indexingPolicy: {
      indexingMode: "consistent",
      automatic: true,
      includedPaths: [
        {
          path: "/*",
        },
      ],
      excludedPaths: [
        {
          path: '/"_etag"/?',
        },
      ],
    },
  });
  console.log("Created container");
})();

2) Importing the data

import { BulkOperationType, CosmosClient } from "@azure/cosmos";
import { randomUUID } from "crypto";
import { ProxyAgent } from "proxy-agent";

process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
process.env.HTTPS_PROXY = "http://example.local:8000";
process.env.HTTP_PROXY = "http://example.local:8000";
process.env.NO_PROXY = 'example.local';

const client = new CosmosClient({
  endpoint: "https://localhost:8081/",
  key: process.env.DB_KEY,
  connectionPolicy: {
    retryOptions: {
      maxRetryAttemptCount: 10,
      fixedRetryIntervalInMilliseconds: 1_000,
      maxWaitTimeInSeconds: 30,
    },
    requestTimeout: 70000,
  },
  agent: new ProxyAgent({
    rejectUnauthorized: false,
  }),
});

const id = randomUUID();
const operationsChunk = Array.from({ length: 100 }, (_, index) => ({
  operationType: BulkOperationType.Create,
  id: `${id}-${index}`,
  partitionKey: `serial-${index % 5}`,
  resourceBody: {
    id: `${id}-${index}`,
    index,
    PK: `serial-${index % 5}`,
  },
}));

function* chunkify<T>(arr: T[], size: number) {
  for (let i = 0; i < arr.length; i += size) {
    yield arr.slice(i, i + size);
  }
}

(async () => {
  const container = client.database('exampledb').container('examplecontainer');

  for (const chunk of chunkify(operationsChunk, 50)) {
    const results = await container.items.bulk(chunk, {
      continueOnError: true,
    });
    console.log(results.filter(({ statusCode }) => statusCode === 429));
  }
})();

The 2nd script logs:

[]
[
  { statusCode: 429, requestCharge: 0, retryAfterMilliseconds: 13 },
  { statusCode: 429, requestCharge: 0, retryAfterMilliseconds: 13 },
  { statusCode: 429, requestCharge: 0, retryAfterMilliseconds: 13 },
  { statusCode: 429, requestCharge: 0, retryAfterMilliseconds: 13 },
  { statusCode: 429, requestCharge: 0, retryAfterMilliseconds: 13 },
  { statusCode: 429, requestCharge: 0, retryAfterMilliseconds: 13 },
  { statusCode: 429, requestCharge: 0, retryAfterMilliseconds: 13 }
]

Proxy logs (screenshot): we can see that no additional request was made (expected 2 / done 2), even though the last one partially failed.
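Until the SDK retries these internally, one workaround is to wrap `container.items.bulk` in an application-level retry loop that honors `retryAfterMilliseconds` from the per-operation results. A sketch under the same assumptions as the scripts above (the `bulkWithRetry` helper and its defaults are made up, not part of the SDK):

```javascript
// Workaround sketch: retry throttled (429) bulk operations at the
// application level, waiting the server-suggested delay between attempts.
async function bulkWithRetry(container, operations, maxAttempts = 10) {
  let pending = operations;
  const responses = [];
  for (let attempt = 0; attempt < maxAttempts && pending.length > 0; attempt++) {
    const results = await container.items.bulk(pending, { continueOnError: true });
    const next = [];
    let waitMs = 0;
    results.forEach((result, i) => {
      if (result.statusCode === 429) {
        // Keep the throttled operation for the next attempt and remember
        // the longest suggested back-off (fallback delay is arbitrary).
        next.push(pending[i]);
        waitMs = Math.max(waitMs, result.retryAfterMilliseconds ?? 100);
      } else {
        responses.push(result);
      }
    });
    pending = next;
    if (pending.length > 0) {
      await new Promise((resolve) => setTimeout(resolve, waitMs));
    }
  }
  if (pending.length > 0) {
    throw new Error(
      `${pending.length} operations still throttled after ${maxAttempts} attempts`
    );
  }
  return responses;
}
```

This only papers over the issue from the caller's side; the SDK's own retry policy should still apply to the individual 429 entries in the 207 response.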