googleapis / nodejs-storage

Node.js client for Google Cloud Storage: unified object storage for developers and enterprises, from live data serving to data analytics/ML to data archiving.
https://cloud.google.com/storage/
Apache License 2.0

Application hangs when destroying a `File.createWriteStream()` stream to abort the upload #2367

Closed: vbshnsk closed this issue 3 months ago

vbshnsk commented 8 months ago

Steps to reproduce

I am trying to implement an endpoint on my server that passes incoming data to a File write stream and saves it to GCS, and I want to halt the upload if, during streaming, the file turns out to be bigger than expected. Here's a snippet of how it is implemented:

import { PassThrough } from 'stream';
import { pipeline } from 'stream/promises';
import { Storage } from '@google-cloud/storage';
// PayloadTooLargeException is assumed here to be NestJS's standard HTTP exception
import { PayloadTooLargeException } from '@nestjs/common';

const MAX_TRANSACTION_IMAGE_SIZE_BYTES = 10 * 1024 * 1024;
const storage = new Storage();

async function test(body: NodeJS.ReadableStream) {
  let receivedBytes = 0;
  const cancelStream = new AbortController();

  const bucketName = 'bucket';
  const file = storage
    .bucket(bucketName)
    .file('image.png')
    .createWriteStream();

  // Count bytes as they flow through and abort the pipeline once the
  // payload exceeds the limit.
  const validateStream = new PassThrough();
  validateStream.on('data', (chunk) => {
    receivedBytes += chunk.length;
    if (receivedBytes > MAX_TRANSACTION_IMAGE_SIZE_BYTES) {
      // this correctly aborts everything in the pipeline, and destroys the streams, but
      // the underlying streams still seem to hang
      cancelStream.abort(new PayloadTooLargeException('Image too large'));
    }
  });

  await pipeline(body, validateStream, file, {
    signal: cancelStream.signal,
  });
}

I have checked the code, and calling `.destroy()` on the returned write stream does not propagate anything to the underlying `emitStream` or `writeFileStream`, so they stay hanging. Is this a bug on the SDK side? Is there a way to abort the upload request?
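For anyone who wants to reproduce just the destroy behaviour in isolation, here is a minimal sketch (the bucket and object names are placeholders; credentials are assumed to be configured):

import { Storage } from '@google-cloud/storage';

// Placeholder bucket/object names, for illustration only.
const ws = new Storage().bucket('bucket').file('image.png').createWriteStream();

ws.on('error', (err) => console.log('write stream errored:', err.message));
ws.on('close', () => console.log('write stream closed'));

ws.write(Buffer.from('some data'));
ws.destroy(new Error('aborted'));
// Expected: 'close' fires and the process exits once cleanup finishes.
// Observed in this issue: the internal streams keep a handle open and the
// process hangs.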

vbshnsk commented 7 months ago

I have since tried to DELETE the resumable upload by crc32 when I destroy my write/validate streams, but that also doesn't affect the underlying streams, so the handle stays open.
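For context, cancelling a resumable upload session is documented by GCS as a DELETE request against the session URI (the server responds with status 499). A sketch of doing that explicitly, assuming you create the session yourself via the SDK:

import { Storage } from '@google-cloud/storage';

const file = new Storage().bucket('bucket').file('image.png');

// File#createResumableUpload starts a session and resolves with its URI.
const [uri] = await file.createResumableUpload();

// ... stream data to the session ...

// Per the GCS documentation, DELETE on the session URI cancels the upload.
await fetch(uri, { method: 'DELETE' });

Even with the session cancelled server-side, the local streams described above still need to be torn down, which is the part that hangs here.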

xXACCEXx commented 6 months ago

Not sure if this is the same behaviour I'm seeing or not.

I'm seeing that streams aren't closing when using the WebStream APIs in Node. It feels like the same problem: the handler on the web-stream side completes correctly, but the corresponding event is never emitted on the GCS library side.

The effect is that no means of closing the stream works: neither `destroy()` nor `end()` actually closes the handle.

xXACCEXx commented 6 months ago

Here's a little example of what I mean.

I tested this on node v20.10.0 using ts-node. You can get the CSV from here https://github.com/datablist/sample-csv-files/raw/main/files/people/people-2000000.zip

import { join } from 'path';
import { createReadStream } from 'fs';
import { Writable, Readable } from 'stream'
import { TransformStream } from 'stream/web'

import { Storage } from '@google-cloud/storage'

const _sourcePathname = join(process.cwd(), 'test/people-2000000.csv');
const _storage = new Storage({ keyFilename: './secrets/gcp.json' });
const _bucket = _storage.bucket('journeywise_test_bucket');

const sourceFile = createReadStream(_sourcePathname, { encoding: 'utf-8' })
const _readable = Readable.toWeb(sourceFile);

const destinationFile = _bucket.file('output.csv');
const destinationStream = destinationFile.createWriteStream();
const _writable = Writable.toWeb(destinationStream);

// Counts bytes passing through and reports whether the total matches
// the expected size once the stream flushes.
const assertSize = (expectedSize: number) => {
    let size = 0;
    return new TransformStream({
        transform(chunk, ctrl) {
            size += chunk.length;
            ctrl.enqueue(chunk);
        },
        flush() {
            console.log('Closed stream. Size match?', size === expectedSize);
        }
    });
};

_readable
    .pipeThrough(assertSize(235121126))
    .pipeTo(_writable)
    .then(() => console.log('stream completed successfully'))
    .catch((err) => console.error('stream failed:', err));
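To tie this back to the original report, the hang can also be triggered by aborting the pipe explicitly instead of letting it run to completion. `pipeTo`'s `signal` option is part of the standard Web Streams API; the timeout below is arbitrary and just forces a mid-stream abort (replace the pipe above with this variant):

// Sketch: abort the same pipe mid-stream via an AbortSignal.
const ac = new AbortController();
setTimeout(() => ac.abort(new Error('cancelled')), 1000);

_readable
    .pipeThrough(assertSize(235121126))
    .pipeTo(_writable, { signal: ac.signal })
    .catch((err) => console.error('pipe aborted:', err));
// Per this issue, the abort reaches the returned write stream, but the
// underlying GCS streams keep the process alive.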
ddelgrosso1 commented 3 months ago

Apologies for the delay in getting to look at this. I have successfully recreated the problem and can see what the issue is. I will work on a fix and update this issue accordingly. The short of it: within `createWriteStream` there is another pipeline that is never notified when the returned stream is aborted with an error, and it therefore never cleans up `emitStream`, `writeFileStream`, or any validation streams.
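For readers following along, here is a minimal sketch of that failure mode in plain Node streams (illustrative names, not the library's actual code): an inner pipeline never observes the destruction of the stream handed back to the caller unless the error is forwarded explicitly.

import { PassThrough, pipeline } from 'stream';

// `outer` stands in for the stream returned to the caller; `inner1`/`inner2`
// stand in for emitStream/writeFileStream wired up by an internal pipeline.
const inner1 = new PassThrough();
const inner2 = new PassThrough();
pipeline(inner1, inner2, (err) => console.log('inner pipeline finished:', err?.message));

const outer = new PassThrough();
outer.pipe(inner1);

// This forwarding is the missing piece: without it, destroying `outer`
// never reaches inner1/inner2 and they stay open indefinitely.
outer.on('error', (err) => inner1.destroy(err));

outer.destroy(new Error('aborted'));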

ddelgrosso1 commented 3 months ago

I will get this merged and released after Cloud Next has finished.