googleapis / nodejs-storage

Node.js client for Google Cloud Storage: unified object storage for developers and enterprises, from live data serving to data analytics/ML to data archiving.
https://cloud.google.com/storage/
Apache License 2.0

Problem with uploading multiple files simultaneously #2281

Closed majkelEXE closed 1 year ago

majkelEXE commented 1 year ago

My problem appears only sometimes (which is really strange) while saving multiple files simultaneously to Cloud Storage. I am using the @google-cloud/storage library, version 6.9.5. I was able to find other people having the same issue, however none of their solutions worked for me (neither changing the stream options nor applying a timeout). At first I thought the bandwidth could be exceeded, however I am almost sure we are not saving over 50 GB of data (I tried to check it using the Cloud charts; however, you can only check bandwidth usage if other services are using Cloud Storage - in my case I get "No data available").

For further explanation: in my code there is a function called "uploadFileToGCS" which is invoked around 57 times per process. Each process saves 35 JPG images (20 KB each), 1 TIFF image (500 KB) and 7 JSON files (150 KB each). 30 processes run simultaneously.

Here is exactly what the error says (different files throw this error, sometimes a JPG, sometimes a JSON - there is no rule):

error - unhandledRejection: FetchError: request to https://storage.googleapis.com/upload/storage/v1/b/satellite-photos/o?uploadType=multipart&name=64d162a80103b55f87a6cdb4_64db59204f7cd1f34a65e5c7_2023_6_31_raw.tiff failed, reason: read ECONNRESET
    at ClientRequest.<anonymous> (D:\WORK\nirby-project\node_modules\next\dist\compiled\node-fetch\index.js:1:65756)
    at ClientRequest.emit (node:events:525:35)
    at TLSSocket.socketErrorListener (node:_http_client:496:9)
    at TLSSocket.emit (node:events:513:28)
    at emitErrorNT (node:internal/streams/destroy:151:8)
    at emitErrorCloseNT (node:internal/streams/destroy:116:3)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21) {
  type: 'system',
  errno: 'ECONNRESET',
  code: 'ECONNRESET'
}

The function saving 5 images (which is invoked 7 times at the same time):

saveLayers.ts:

import { saveSinglePhoto } from "~/helpers/sentinel/saveSinglePhoto";

interface INdviData {
    min: number;
    max: number;
    averageNDVI: number;
    buffer: Buffer;
}

const saveLayers = async (
    fileNameBase: string,
    trueColorData: Buffer,
    ndviData: INdviData,
    contrastNdviData: Buffer,
    sclData: Buffer,
    clmData: Buffer
) => {
    console.log(fileNameBase);

    const data = await Promise.all([
        saveSinglePhoto(trueColorData, fileNameBase, "TRUECOLOR"),
        saveSinglePhoto(ndviData.buffer, fileNameBase, "NDVI"),
        saveSinglePhoto(contrastNdviData, fileNameBase, "CONTRAST_NDVI"),
        saveSinglePhoto(sclData, fileNameBase, "SCL"),
        saveSinglePhoto(clmData, fileNameBase, "CLM"),
    ]);

    return data;
};

export default saveLayers;

saveSinglePhoto.ts

import { SatelliteData } from "~/interfaces/sentinel/SatelliteData";
import uploadFileToGCS from "../gcs/uploadFileToGCS";

const baseUrl = `https://storage.googleapis.com/${process.env.GCLOUD_STORAGE_BUCKET}/`;

export const saveSinglePhoto = async (
    buffer: Buffer,
    fileNameBase: string,
    layerId: string
): Promise<SatelliteData | null> => {
    const fileName = fileNameBase + layerId + ".jpg";

    await uploadFileToGCS(fileName, buffer, "image/jpeg");

    const satelliteData: SatelliteData = {
        layerId: layerId,
        fileUrl: baseUrl + fileName,
    };

    return satelliteData;
};

uploadFileToGCS.ts

import { Storage } from "@google-cloud/storage";

const gcsKey = JSON.parse(Buffer.from(process.env.GCLOUD_CRED_FILE, "base64").toString());

const storage = new Storage({
    credentials: {
        client_email: gcsKey.client_email,
        private_key: gcsKey.private_key,
    },
    projectId: process.env.GCLOUD_PROJECT_ID,
});

const uploadFileToGCS = (filename: string, data: any, contentType: string) => {
    return new Promise((resolve, reject) => {
        const file = storage.bucket(process.env.GCLOUD_STORAGE_BUCKET).file(filename);

        const stream = file.createWriteStream({
            metadata: {
                contentType,
            },
            resumable: false,
            validation: false,
            timeout: 86400,
        });

        stream.on("error", (err) => {
            reject(err);
        });

        stream.on("finish", () => {
            resolve("ok");
        });

        stream.end(data);
    });
};

export default uploadFileToGCS;

ddelgrosso1 commented 1 year ago

@majkelEXE just to clarify something, there are 57 calls to upload a file per process, and there are 30 processes, so a potential total of 30 * 57 = 1710 concurrently running uploads, is this accurate?

majkelEXE commented 1 year ago

@ddelgrosso1 Thank you for the response. Yes, that is the exact amount, and the worst part is that this is happening for only one client, so with multiple requests to the server this amount can increase dramatically. I have realized that it may be a problem with connection concurrency, but I still have no clue what to do, because we cannot skip any of these uploads. Should I prepare a kind of queue or something?

ddelgrosso1 commented 1 year ago

@majkelEXE my suspicion is that the amount of concurrent requests is causing some connections to go idle long enough that the server closes them resulting in the ECONNRESET.

I have done some informal testing on concurrent operations and have generally seen problems start to pop up when climbing above 500 concurrent ops. This number can vary widely based on system specs / resources, but to give you an idea, I generally test on a GCE C2-Standard-60 instance.

A few things I might suggest to see if it helps alleviate the issue:

  1. Configure retries for the uploads by passing a precondition option. Uploads are conditionally idempotent by default, which means they will not be retried unless a precondition is set and passes. You can also set the retry strategy to RetryAlways, but that casts a wider net and can potentially have unwanted side effects depending on your application logic / requirements (a sketch combining this with throttling follows the list).

  2. Utilize something like pLimit to throttle the number of concurrently running operations. I can't say what the magic number here would be for you but I generally don't do much testing above 50.

  3. As you noted, implement some kind of queueing mechanism.
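For illustration, a minimal sketch combining suggestions 1 and 2 could look like the following. It reuses the bucket and project environment variables from the snippets above; the concurrency limit of 50 and the ifGenerationMatch: 0 precondition are illustrative choices, not requirements:

import pLimit from "p-limit";
import { Storage } from "@google-cloud/storage";

const storage = new Storage({
    projectId: process.env.GCLOUD_PROJECT_ID,
    // Alternatively, retry everything regardless of preconditions:
    // retryOptions: { idempotencyStrategy: IdempotencyStrategy.RetryAlways },
});

// Throttle concurrent uploads; 50 is only an example value.
const limit = pLimit(50);

const uploadWithRetry = (filename: string, data: Buffer, contentType: string) =>
    new Promise<void>((resolve, reject) => {
        const file = storage
            .bucket(process.env.GCLOUD_STORAGE_BUCKET as string)
            .file(filename);

        const stream = file.createWriteStream({
            metadata: { contentType },
            resumable: false,
            // Precondition: only succeed if the object does not exist yet.
            // With a precondition set, the default retry strategy will retry the upload.
            preconditionOpts: { ifGenerationMatch: 0 },
        });

        stream.on("error", reject);
        stream.on("finish", () => resolve());
        stream.end(data);
    });

const uploadAll = (files: { name: string; buffer: Buffer; contentType: string }[]) =>
    Promise.all(files.map((f) => limit(() => uploadWithRetry(f.name, f.buffer, f.contentType))));

One caveat: with ifGenerationMatch: 0, retrying an upload whose first attempt actually succeeded will fail with a 412 instead of overwriting the object, which is usually acceptable for brand-new files.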

majkelEXE commented 1 year ago

@ddelgrosso1 I was advised to use Cloud Tasks instead of direct inserts to Cloud Storage; however, I faced another problem: my code freezes while waiting for the response from createTask.

If I omit this await, the problem disappears; however, I need to know the name of the task, so I cannot skip waiting for the response.

Here is the code:

import { v2beta3 } from "@google-cloud/tasks";
import { google } from "@google-cloud/tasks/build/protos/protos";

const client = new v2beta3.CloudTasksClient();

const project = process.env.GCLOUD_PROJECT_ID;
const location = process.env.GCLOUD_QUEUE_LOCATION;
const queue = process.env.GCLOUD_QUEUE_NAME;

const parent = client.queuePath(project, location, queue);

const gcsKey = JSON.parse(Buffer.from(process.env.GCLOUD_CRED_FILE, "base64").toString());
const email = gcsKey.client_email;

const createUploadTask = async (filename: string, bucket: string, contentType: string, data: any) => {
    const payload = {
        filename,
        bucket,
        contentType,
        data,
    };

    const url = `${process.env.GCLOUD_FUNCTIONS_URL}/uploadToCS`;

    const task = {
        httpRequest: {
            httpMethod: google.cloud.tasks.v2beta3.HttpMethod.POST,
            url,
            oidcToken: {
                serviceAccountEmail: email,
                audience: url,
            },
            headers: {
                "Content-Type": "application/json",
            },
            body: Buffer.from(JSON.stringify(payload)).toString("base64"),
        },
    };

    const [response] = await client.createTask({ parent, task });
    const name = response.name ?? "name not exists";

    return name;
};

export default createUploadTask;

ddelgrosso1 commented 1 year ago

I can't speak much to Cloud Tasks as my exposure to it is pretty limited. However, if you want to try opening an issue with that repo it is here: https://github.com/googleapis/google-cloud-node/issues. Were you able to try any of my suggestions above? Did they not work for your use case?

majkelEXE commented 1 year ago

@ddelgrosso1 Honestly, I did try your suggestion - exactly the last one, implementing a queue. Taking into consideration the future of my application, I've decided to apply the most scalable solution. In addition I will add retries as you advised, however they will be implemented in the task creation mechanism - so your previous comment was really helpful.
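For reference, a simple way to add retries around the task creation could be a wrapper like the one below. It assumes the createUploadTask function from the earlier snippet is reused; the wrapper name, attempt count and backoff values are only illustrative:

const createUploadTaskWithRetry = async (
    filename: string,
    bucket: string,
    contentType: string,
    data: string,
    maxAttempts = 3
): Promise<string> => {
    let lastError: unknown;

    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
            // Delegate to the createUploadTask function shown earlier in this thread.
            return await createUploadTask(filename, bucket, contentType, data);
        } catch (err) {
            lastError = err;
            if (attempt < maxAttempts) {
                // Simple exponential backoff before the next attempt: 1s, 2s, 4s, ...
                await new Promise((r) => setTimeout(r, 1000 * 2 ** (attempt - 1)));
            }
        }
    }

    throw lastError;
};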

I've also created the issue in the linked repository.

Thank you!

ddelgrosso1 commented 1 year ago

Going to close this out. If there are any other questions or problems, please feel free to reopen.

majkelEXE commented 1 year ago

Let me share my solution. I've decided to change my code to build an array of SatelliteFiles (file name + buffer + content type) and use the p-limit library, as you previously advised. This new approach allowed me to achieve results similar to this:

Number of files to upload:  186
All files uploaded to GCS:  207.94119999930263  ms

Here is my code for other people dealing with this problem:

import pLimit from "p-limit";

const downloadSatelliteFiles = async (files: SatelliteFile[]) => {
    const limit = pLimit(5);
    const promises: Promise<unknown>[] = [];

    files.forEach((file) => {
        promises.push(
            // Return the upload promise from the limited callback so that
            // Promise.all actually waits for every upload to finish.
            limit(() => uploadFileToGCS(file.fileName, file.buffer, file.contentType))
        );
    });

    await Promise.all(promises);

    return;
};

const uploadFileToGCS = (filename: string, data: any, contentType: string) => {
    return new Promise((resolve, reject) => {
        const file = storage.bucket(process.env.GCLOUD_STORAGE_BUCKET).file(filename);

        const stream = file.createWriteStream({
            metadata: {
                contentType,
                cacheControl: "no-cache",
            },
            resumable: false,
        });
        stream.on("error", (err) => {
            console.log("UPLOAD_ERROR");
            console.log(err);
            // Reject so a failed upload does not leave the promise pending forever.
            reject(err);
        });
        stream.on("finish", () => {
            resolve("ok");
        });
        stream.end(data);
    });
};

Thank you so much @ddelgrosso1 !