ZJONSSON / node-unzipper

node.js cross-platform unzip using streams

Unzipping files in lambda using unzipper.Open.s3 does not work #189

Open · anvik opened this issue 4 years ago

anvik commented 4 years ago

Hi, I am trying to use the unzipper library inside a Lambda function to extract a file from S3 using the code below. The Lambda invocation succeeds with a successful response, but the file does not get extracted anywhere and execution halts at that point.

function extractZipToTempLocation(s3Client, extractionPath, bucketName, key) {
  return unzipper.Open.s3(s3Client, {Bucket: bucketName, Key: key})
    .then(d => d.extract({path: extractionPath}));
}

Can someone provide inputs on this? This works fine locally.

JoshM1994 commented 4 years ago

Can you provide more details on the parameters with which you are calling this function?

Lambda has certain restrictions on where you are allowed to write files. Given the ephemeral nature of Lambda, I'm curious as to where you are trying to extract files to?

/tmp should always be available: https://forums.aws.amazon.com/thread.jspa?threadID=174119
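
For reference, a minimal sketch of extracting into /tmp from a Lambda handler with unzipper.Open.s3 (aws-sdk v2; the bucket, key, and target directory are placeholders, not from the original report):

const AWS = require('aws-sdk');
const unzipper = require('unzipper');

// Minimal sketch: /tmp is the only writable path in the Lambda filesystem,
// so the extraction target must live under it. Bucket/Key are placeholders.
exports.handler = async () => {
  const s3Client = new AWS.S3();
  const directory = await unzipper.Open.s3(s3Client, { Bucket: 'my-bucket', Key: 'archive.zip' });
  await directory.extract({ path: '/tmp/extracted' });
  return { statusCode: 200 };
};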

multiaki commented 4 years ago

I wrote an example of streaming a zipped file from S3 back to S3 without uncompressing the entire file, along with a little tutorial on it that might help: https://medium.com/@multiaki/unzipping-s3-files-back-to-s3-without-uncompressing-entire-file-streaming-82f662b5065a

JoshM1994 commented 4 years ago

Here's my example code of unzipping a zipped file back to S3:

    const s3Client = new AWS.S3();
    const directory = await unzipper.Open.s3(s3Client,{Bucket: 'joshtest1', Key: 'Archive.zip'});
    for await (const file of directory.files) {
        await s3Client.putObject({
            Bucket: 'joshtest1',
            Key: `test/${file.path}`,
            Body: await file.stream(),
            ContentLength: file.uncompressedSize
        }).promise();
    }
fyarepo commented 4 years ago

Hi, I have 5000 SVG files in a zip file (e.g., test.zip).

const directory = await unzipper.Open.s3(s3, params);

for (const file of directory.files) {
  console.log("File==" + file.path);
  var result = await s3.putObject({
    Bucket: bucketName,
    Key: `uploadFolder/${file.path}`,
    Body: await file.stream(),
    ContentLength: file.uncompressedSize,
    ACL: 'public-read'
  }).promise();
  console.log("upload info==" + JSON.stringify(result));
}

files.length is 5000, but only the first ~30 files get unzipped and uploaded to the target bucket. What can we do?

fyarepo commented 4 years ago

Running on Lambda

jbravo-edrans commented 4 years ago

+1 I'm having the same issue. Lambda seems to exit early.

ZJONSSON commented 4 years ago

Is it possible you are hitting a timeout on the lambda?
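
A quick way to confirm the timeout hypothesis (a sketch, not from the thread; doTheUnzipWork is a placeholder for the actual extraction/upload logic) is to log context.getRemainingTimeInMillis() before and after the work:

// Sketch: log the remaining execution budget to confirm whether the function
// is simply hitting its configured timeout while uploads are still pending.
exports.handler = async (event, context) => {
  console.log('remaining ms at start:', context.getRemainingTimeInMillis());
  await doTheUnzipWork(event); // placeholder for the extraction/upload logic
  console.log('remaining ms at end:', context.getRemainingTimeInMillis());
};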

Also, you are processing one file at a time in your example (await inside the for loop), but you can easily process multiple files simultaneously. Here is an example (using Bluebird.map with the concurrency option):

const Bluebird = require('bluebird');

const directory = await unzipper.Open.s3(s3, params);

await Bluebird.map(directory.files, async file => {
  var result = await s3.putObject({
    Bucket: bucketName,
    Key: `uploadFolder/${file.path}`,
    Body: file.stream(),
    ACL: 'public-read'
  }).promise();
  console.log("upload info==" + JSON.stringify(result));
}, { concurrency: 25 });
ZJONSSON commented 4 years ago

@fyarepo @jbravo-edrans please let me know if the above example helped resolve the issue?

jbravo-edrans commented 4 years ago

Hi ZJONSSON, thanks for your support on this.

I already tried that, using Promise.map, Promise.all, and Promise.each, with no luck. The issue seems to be related to Open.s3 and AWS S3 streams; I don't know what's happening there. It happened to me with a zip file that has around 100 files inside (the zip is ~40MB). With smaller zip files it worked well.

BTW, I found a solution that avoids Open.s3 and just uses the unzipper.Parse method, like this:

const s3Client = new AWS.S3();

  const s3Stream = s3Client.getObject(
    {
      Bucket: ZIP_BUCKET_NAME,
      Key: key,
    },
  ).createReadStream();

  const files = s3Stream
    .pipe(unzipper.Parse({forceStream: true}));

  for await (const entry of files) {
    const fileName = entry.path;
    const type = entry.type;
    const size = entry.vars.uncompressedSize;
    debug(`Type: ${type}`);
    if (type === 'File') {
      debug(`Uploading ${fileName}...`);
      const sDir = key.substring(8);
      const params = {
        Bucket: ZIP_BUCKET_NAME,
        Key: `extracted/${sDir}/${fileName}`,
        Body: entry.on('finish', () => {
          debug(`Stream 1 finish`);
        }),
        ContentLength: size,
      };
      await s3Client.upload(params).promise();
    } else {
      entry.autodrain();
    }
  }

It was the only way I could make the unzipping work fine.

ranhalprin commented 4 years ago

I'm having a similar problem, and it's not just on Lambda. When I unzip a large file (~5000 files, 200MB zipped), it just exits with exit code 0 after a minute or so, with only about 20 or 30 files unzipped. This is the code I used:

return unzipper.Open.s3(s3, {Bucket: zipBucket, Key: zipKey}).then(directory =>
        Promise.map(
            directory.files,
            file => {
                const targetKey = [targetFolder, file.path].join('/');
                return s3.upload({Bucket: targetBucket, Key: targetKey, Body: file.stream()}).promise();
            },
            { concurrency: 40 }
        )
    );

It's the same when I use mapSeries or a different concurrency. When I slice directory.files to only run on about 100 files, it works OK, so it might be a resource issue.
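
A hedged sketch of taking that slicing observation further: walk directory.files in fixed-size batches so only a bounded number of range reads and uploads are in flight at once (the batch size is an assumption, not from the thread):

const unzipper = require('unzipper');

// Sketch: process the central-directory entries in bounded batches so at most
// `batchSize` S3 range reads/uploads are open at any one time.
async function uploadInBatches(s3, zipBucket, zipKey, targetBucket, targetFolder, batchSize = 50) {
  const directory = await unzipper.Open.s3(s3, { Bucket: zipBucket, Key: zipKey });
  const files = directory.files.filter(file => file.type === 'File');

  for (let i = 0; i < files.length; i += batchSize) {
    const batch = files.slice(i, i + batchSize);
    await Promise.all(batch.map(file =>
      s3.upload({
        Bucket: targetBucket,
        Key: [targetFolder, file.path].join('/'),
        Body: file.stream(),
      }).promise()
    ));
  }
}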

When I tried to apply @jbravo-edrans's approach (see the code below), it exits after a few seconds on the last line here:

    const s3Stream = s3.getObject({
        Bucket: zipBucket,
        Key: zipKey,
    }).createReadStream();

    const files = await s3Stream.pipe(unzipper.Parse({forceStream: true})).promise();

Something interesting is that it exits without process.exit() being invoked - I replaced process.exit() with my own function so I can debug or log information about it, and Node dies without getting there.

Update: I found a hack that gets around the process exiting in the middle. It keeps the event loop from draining, using the method below. This suggests the bug is that the event loop queue somehow gets cleared out during the unzipping process.

var done = (
    function wait () {
        if (!done) setTimeout(wait, 5000);
    }
)();

doMyStuff().then(() => {
    done = true;
});
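
An equivalent, slightly tidier form of the same keep-alive hack (a sketch with the same caveats): hold an interval open until the work settles, then clear it:

// Keep the event loop non-empty until the unzip/upload promise settles,
// then release the interval so the process can exit normally.
const keepAlive = setInterval(() => {}, 5000);

doMyStuff()
  .catch(err => console.error(err))
  .finally(() => clearInterval(keepAlive));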
vivmaha commented 3 years ago

I'm also running into this issue. I've got pretty much the same code as @ZJONSSON, except I'm using p-map instead of Bluebird. After successfully unzipping and uploading about 40 files, the remaining files never complete, and as a result the Lambda times out.

I wondered if the files it was hanging on were very large, but I unzipped the archive locally (it took a couple of seconds) and those files were of reasonable size (comparable to the others in the zip archive).
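
For reference, a hedged sketch of the p-map variant described above (assuming p-map@4, which can still be required from CommonJS; later majors are ESM-only, and the bucket/prefix names are placeholders):

const pMap = require('p-map');       // assumption: p-map@4 (CommonJS build)
const unzipper = require('unzipper');

// Same shape as the Bluebird example above, using p-map's concurrency option.
async function unzipToS3(s3, zipBucket, zipKey, targetBucket, targetPrefix) {
  const directory = await unzipper.Open.s3(s3, { Bucket: zipBucket, Key: zipKey });
  await pMap(
    directory.files.filter(file => file.type === 'File'),
    file => s3.upload({
      Bucket: targetBucket,
      Key: `${targetPrefix}/${file.path}`,
      Body: file.stream(),
    }).promise(),
    { concurrency: 25 }
  );
}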

antonioandrade commented 3 years ago

We're having similar issues, pretty much what @jbravo-edrans and @ranhalprin describe.

callmephilip commented 3 years ago

I had some luck with the following approach (plus increasing the Lambda function timeout value):

import { S3Event } from 'aws-lambda'; // type used in the handler signature
import AWS from 'aws-sdk';
import unzipper from 'unzipper';
import stream from 'stream';

const S3 = new AWS.S3();

const uploadStream = ({ Bucket, Key }) => {
  const pass = new stream.PassThrough();
  return {
    writeStream: pass,
    promise: S3.upload({ Bucket, Key, Body: pass }).promise(),
  };
}

export const handleUpload = async (event: S3Event) => {
  const batchSize = 30; // << tweak this based on your needs
  const record = event.Records[0];
  const zip = S3.getObject({
    Bucket: process.env.S3_BUCKET, 
    Key: record.s3.object.key
  }).createReadStream().pipe(unzipper.Parse({forceStream: true}));

  let promises = [];

  for await (const entry of zip) {
    const fileName = entry.path;

    if (entry.type.match(/file/ig) /* include more filters here if needed */) {      
      const { writeStream, promise } = uploadStream({Bucket: process.env.S3_BUCKET, Key: `parsed/${fileName}`});
      entry.pipe(writeStream);
      promises.push(promise);
    } else {
      entry.autodrain();
    }

    if (promises.length === batchSize) {
      await Promise.all(promises);
      promises = [];
    }
  }

  if (promises.length === 0) {
    return;
  }

  await Promise.all(promises);
};
thakkerdhawal commented 3 years ago

I think the issue is well explained here: Lambda's limitation on disk size. @jbravo-edrans's code worked for me, thanks.
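
If /tmp space really is the limiting factor, newer Lambda runtimes allow the ephemeral storage to be raised above the 512 MB default. A hedged sketch using aws-sdk v2 (assuming an SDK version that exposes the EphemeralStorage parameter; the function name is a placeholder):

const AWS = require('aws-sdk');

// Assumption: the installed aws-sdk exposes EphemeralStorage on
// updateFunctionConfiguration (supported by the Lambda API since 2022).
const lambda = new AWS.Lambda();

lambda.updateFunctionConfiguration({
  FunctionName: 'my-unzip-function', // placeholder
  EphemeralStorage: { Size: 2048 },  // MB; configurable up to 10240
}).promise()
  .then(cfg => console.log('ephemeral storage now:', cfg.EphemeralStorage))
  .catch(console.error);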

BenMiriello commented 2 years ago

I was having this issue before as well. For me it was not because of the memory size or time limits in Lambda, though those do need to be raised from the default. I couldn't identify where in the current unzipper code the failure was occurring, so I rewrote the decrypt/inflate upload stream and it worked without silently failing after around 40 entries. Here's a simplified version of my implementation, ready to be tested locally or in a Lambda.

initDecrypt and checkCRC are mostly refactored implementations of what's in lib/unzip.js.

entryByteRange attempts to replicate what's done in lib/unzip.js where PullStream is used to skip the entry's local headers and find the first byte of file data. This has worked with good reliability for me and lifts the ~40 entry limit. However I haven't been able yet to calculate the right byte range for zips using zip64. I'm not sure if I need to switch back to using PullStream or if it's possible to calculate the start and end of a zip64 entry's file data using just the headers from the central directory. I'd appreciate any feedback on my method in that function in particular.

const AWS = require('aws-sdk');
const unzipper = require('unzipper');
const Decrypt = require('unzipper/lib/Decrypt');
const zlib = require('zlib');
const Stream = require('stream');

// Fill in config vals for local tests. Remove config when running in Lambda.

const awsConfig = new AWS.Config({
  accessKeyId: '',
  secretAccessKey: '',
  region: '',
  apiVersion: '2006-03-01',
});

let s3;
let evt;

const handler = async (event) => {
  evt = event;
  s3 = new AWS.S3(awsConfig);

  const directory = await unzipper.Open.s3(s3, { Bucket: event.bucket, Key: event.zipKey });
  const files = directory.files.filter((entry) => entry.type === 'File');
  const uploads = files.map((entry) => uploadEntry(entry));

  await Promise.all(uploads);

  s3 = null;
  evt = null;
};

const uploadEntry = async (entry) => {
  const decrypt = await initDecrypt(entry);
  const inflater = entry.compressionMethod ? zlib.createInflateRaw() : Stream.PassThrough();
  const params = { Bucket: evt.bucket, Key: evt.zipKey, Range: entryByteRange(entry) };

  const Body = s3.getObject(params).createReadStream()
    .on('error', (e) => { throw e; })
    .pipe(decrypt)
    .on('error', (e) => { throw e; })
    .pipe(inflater)
    .on('error', (e) => { throw e; });

  return s3.upload({ Bucket: evt.bucket, Key: evt.unzipFolderKey + entry.path, Body }).promise();
};

const entryByteRange = (entry) => {
  const {
    path, flags, offsetToLocalFileHeader: offset, compressedSize, extraFieldLength,
  } = entry;
  const headerLength = 30 + path.length + (flags & 8 ? extraFieldLength : 0);
  const start = offset + headerLength + (encrypted(entry) ? 12 : 0);
  const end = offset + headerLength + compressedSize - 1;
  return `bytes=${start}-${end}`;
};

const initDecrypt = async (entry) => {
  if (!encrypted(entry)) return Stream.PassThrough();
  if (!evt.password) throw new Error('missing_password');

  const decrypt = Decrypt();
  evt.password.split('').forEach((d) => decrypt.update(d));
  await checkCRC(decrypt, entry);

  return decrypt.stream();
};

const checkCRC = async (decrypt, entry) => {
  const params = { Bucket: evt.bucket, Key: evt.zipKey, Range: crcByteRange(entry) };
  const { Body } = await s3.getObject(params).promise();

  const data = Body.map((byte) => decrypt.decryptByte(byte));

  const check = (entry.flags & 0x8)
    ? (entry.lastModifiedTime >> 8) & 0xff
    : (entry.crc32 >> 24) & 0xff;

  if (data[11] !== check) throw new Error('bad_password');
};

const crcByteRange = (entry) => {
  const start = entry.offsetToLocalFileHeader + 30 + entry.path.length;
  return `bytes=${start}-${start + 11}`;
};

const encrypted = (entry) => entry.flags & 0x01;

exports.handler = handler;

// Fill in demoEvt vals

const demoEvt = {
  bucket: '',
  zipKey: '',
  unzipFolderKey: '',
  password: '',
};

handler(demoEvt);

I'd like to help fix this issue in the unzipper module, but I don't yet understand what's breaking in unzipper.Open that isn't breaking in my code. Once we know what's happening I can work out a solution and open a PR for it.

muzamilw commented 2 years ago

(Quoting @jbravo-edrans's unzipper.Parse solution from above.)

I was able to extract/unzip a 5GB file (roughly 5k files inside) in 12 minutes on Lambda using this approach. Thanks.

chizr commented 1 year ago

I found I was running out of open sockets (50 is the default max) because the read streams used to get the ranges were not being closed, though I am using Open.custom with AWS SDK v3.

Also, if I lift the maximum, I seem to hit an S3-imposed limit of 200 open read streams (after which I cannot even make HeadObject requests against the zip file), though I cannot find where this is documented.

// stream isn't async so make a passthru stream and return that
Open.custom({
  stream: (offset, length) => {
    const passThroughStream = new PassThrough();
    const range = `bytes=${offset}-${length || ''}`;

    // This needs to be in scope when passThroughStream.finish event is called
    let s3RangeStream: Readable;
    void this.s3Client
      .send(new GetObjectCommand({ Bucket: bucket, Key: key, Range: range }))
      .then(({ Body }) => {
        if (Body === undefined) {
          throw Error('getZipContents: could not get read stream');
        }
        s3RangeStream = Body as Readable;
        s3RangeStream.pipe(passThroughStream);
      });

    passThroughStream.on('finish', () => s3RangeStream.destroy()); // <---

    return passThroughStream as Readable;
  },
  // size() and the rest of the Open.custom source object are not shown in this snippet
});

I made this (naive) modification to unzip.js

stream
  .pipe(inflater)
  .on('error',function(err) { entry.emit('error',err);})
  .pipe(entry)
  .on('finish', function() {
    if (req.end) // <-- added this here so that the passThroughStream finish event is triggered
      req.end();
    else if (req.abort)
      req.abort();
    else if (req.close)
      req.close();
    else if (req.push)
      req.push(); // <-- this is called otherwise, though I do not seem to be able to listen for it
    else
      console.log('warning - unable to close stream');
  });
NamesMT commented 9 months ago

(Quoting @BenMiriello's comment and code from above.)

TYSM! Saved me.