Closed: wkillerud closed this 3 months ago
From https://github.com/isaacs/node-tar?tab=readme-ov-file#class-tarparse

> Each entry will not emit until the one before it is flushed through, so make sure to either consume the data (with `on('data', ...)` or `.pipe(...)`) or throw it away with `.resume()` to keep the stream flowing.
Does this mean we technically have batching after all?

I wrote this test to mimic an async `onentry`, but I might be misunderstanding how these streams work and when we end up getting emit events. With the test below, `ONENTRY` is logged at one-second intervals.
```js
import fs from 'node:fs';
import { pipeline } from 'node:stream';
import tar from 'tar';
import tap from 'tap';

const FIXTURE_TAR = new URL('../../fixtures/package.tar', import.meta.url);

tap.test(
  'tar.Parse() - async sink finishes writing before processing next entry',
  { timeout: 10000 },
  (t) => {
    const queue = [];
    const file = fs.createReadStream(FIXTURE_TAR);
    const extract = new tar.Parse({
      strict: true,
      onentry: (entry) => {
        console.log('ONENTRY ' + entry.path);
        queue.push(
          new Promise((resolve) => {
            setTimeout(() => {
              entry.resume();
              console.log('PERSISTED ' + entry.path);
              resolve(entry);
            }, 1000);
          }),
        );
      },
    });

    pipeline(file, extract, (error) => {
      if (error) {
        t.fail(error);
        t.end();
        return; // don't fall through to Promise.all after a failure
      }
      console.log('AWAITING ALL');
      Promise.all(queue)
        .then((result) => {
          t.ok(result);
          t.end();
        })
        .catch((err) => {
          t.fail(err);
          t.end();
        });
    });
  },
);
```
I suppose the problem is that in the real scenario what is resolved is a connection/stream to GCS, not a finished file upload.
Edit: yeah, that's it. New test:
```js
import fs from 'node:fs';
import { pipeline, Writable } from 'node:stream';
import tar from 'tar';
import tap from 'tap';

const FIXTURE_TAR = new URL('../../fixtures/package.tar', import.meta.url);

tap.test(
  'tar.Parse() - async sink finishes writing before processing next entry',
  { timeout: 10000 },
  (t) => {
    const queue = [];
    const file = fs.createReadStream(FIXTURE_TAR);
    const extract = new tar.Parse({
      strict: true,
      onentry: (entry) => {
        console.log(new Date().toISOString() + ' ONENTRY ' + entry.path);
        queue.push(
          new Promise((resolve) => {
            // mimic the write method of sink-gcs that returns a writable stream
            setTimeout(() => {
              console.log(
                new Date().toISOString() + ' CONNECTED TO UPLOAD ' + entry.path,
              );
              const writable = new Writable({
                autoDestroy: true,
                write: (chunk, encoding, done) => {
                  // simulate the upload happening in the writable stream
                  setTimeout(() => done(), 800);
                },
              });
              pipeline(entry, writable, () => {
                console.log(
                  new Date().toISOString() + ' PERSISTED ' + entry.path,
                );
              });
              resolve();
            }, 25);
          }),
        );
      },
    });

    pipeline(file, extract, (error) => {
      if (error) {
        t.fail(error);
        t.end();
        return; // don't fall through to Promise.all after a failure
      }
      console.log('AWAITING ALL');
      Promise.all(queue)
        .then((result) => {
          t.ok(result);
          t.end();
        })
        .catch((err) => {
          t.fail(err);
          t.end();
        });
    });
  },
);
```
Result:

```
2024-07-02T13:08:37.288Z ONENTRY ie11/
2024-07-02T13:08:37.314Z CONNECTED TO UPLOAD ie11/
2024-07-02T13:08:37.314Z ONENTRY ie11/index.js
2024-07-02T13:08:37.315Z PERSISTED ie11/
2024-07-02T13:08:37.339Z CONNECTED TO UPLOAD ie11/index.js
2024-07-02T13:08:37.340Z ONENTRY ie11/index.js.map
2024-07-02T13:08:37.365Z CONNECTED TO UPLOAD ie11/index.js.map
2024-07-02T13:08:37.365Z ONENTRY main/
2024-07-02T13:08:37.392Z CONNECTED TO UPLOAD main/
2024-07-02T13:08:37.392Z ONENTRY main/index.js
2024-07-02T13:08:37.392Z PERSISTED main/
2024-07-02T13:08:37.418Z CONNECTED TO UPLOAD main/index.js
2024-07-02T13:08:37.419Z ONENTRY main/index.js.map
2024-07-02T13:08:37.445Z CONNECTED TO UPLOAD main/index.js.map
2024-07-02T13:08:37.445Z ONENTRY main/index.css.map
2024-07-02T13:08:37.471Z CONNECTED TO UPLOAD main/index.css.map
2024-07-02T13:08:37.471Z ONENTRY main/index.css
2024-07-02T13:08:37.498Z CONNECTED TO UPLOAD main/index.css
AWAITING ALL
2024-07-02T13:08:38.142Z PERSISTED ie11/index.js
2024-07-02T13:08:38.167Z PERSISTED ie11/index.js.map
2024-07-02T13:08:38.220Z PERSISTED main/index.js
2024-07-02T13:08:38.247Z PERSISTED main/index.js.map
2024-07-02T13:08:38.272Z PERSISTED main/index.css.map
2024-07-02T13:08:38.299Z PERSISTED main/index.css
```
Problem:

- The `tar` stream moves forward as soon as we pipe the entry to a writable stream (e.g. the response from Google Cloud Storage's `createWriteStream`).
- Because of this we end up with a large number of concurrent connections if the tar has many files (creating a writable stream is faster than completing an upload).

Questions:

- Is there a way to hold back `tar` so it doesn't move on to the next entry until a stream later in the pipeline is finished?
- Can we limit the async `write` function for GCS so it doesn't resolve if there are N number of active streams?

> the tar stream moves forward as soon as we pipe the entry to a writable stream (ex. the response from Google Cloud Storage's createWriteStream). because of this we end up with a large amount of concurrent connections if the tar has many files (creating a writable stream is faster than completing an upload).
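On the second question, one common way to cap the number of in-flight uploads is a small promise-based semaphore around the upload step. This is a hypothetical sketch, not sink-gcs code: `Semaphore` and `fakeUpload` are made-up names, and a real version would wrap the GCS write stream instead of a timer.

```js
// Minimal promise-based semaphore: at most `max` tasks run concurrently.
class Semaphore {
  constructor(max) {
    this.max = max;
    this.active = 0;
    this.waiting = [];
  }
  async acquire() {
    if (this.active < this.max) {
      this.active++;
      return;
    }
    // queue up until a running task calls release()
    await new Promise((resolve) => this.waiting.push(resolve));
    this.active++;
  }
  release() {
    this.active--;
    const next = this.waiting.shift();
    if (next) next();
  }
}

// Demo: run 6 fake "uploads" but never more than 2 at once.
const sem = new Semaphore(2);
let inFlight = 0;
let maxInFlight = 0;

async function fakeUpload() {
  await sem.acquire();
  inFlight++;
  maxInFlight = Math.max(maxInFlight, inFlight);
  await new Promise((resolve) => setTimeout(resolve, 10)); // stand-in for the upload
  inFlight--;
  sem.release();
}

await Promise.all(['a', 'b', 'c', 'd', 'e', 'f'].map(() => fakeUpload()));
console.log('max concurrent uploads:', maxInFlight); // 2
```

In the sink, `acquire()` would go before opening the GCS write stream and `release()` in the pipeline callback, so `write` naturally stops resolving while N streams are active.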
Looking closer at it now, I'm not sure we do end up with a lot of concurrent connections. My presumption about concurrent streams might be wrong. From `tar.Parse()`:

> Each entry will not emit until the one before it is flushed through, so make sure to either consume the data (with on('data', ...) or .pipe(...)) or throw it away with .resume() to keep the stream flowing.

I think we get one connection after another due to tar not emitting a new file before the previous one is fully consumed (flushed through). It looks like that is what's happening in the result of your test too.
> It looks like that is what's happening in the result of your test too.
I thought the upload is done only when the callback to `pipeline(entry, writable, ...)` is run, logging `PERSISTED <filename>`. I get a bunch of those at the end, which makes me think that is an issue.
> I thought the upload is done only when the callback to `pipeline(entry, writable, ...)` is run, logging `PERSISTED`. I get a bunch of those at the end, which makes me think that is an issue.
The tar file upload and extraction itself is done when the callback on the pipeline executes. Though, for each file in the tar there is an `entry`, and for each `entry` a stream is created which writes the file to the GCS bucket. These happen one after the other, not in parallel, so we are not getting concurrent writes to the bucket. They are in sequence.

Each `entry` is pushed to a queue array which is resolved with a `Promise.all` in the callback to the pipeline of the tar extraction. My hunch is that all the promises in the `Promise.all` are already resolved by the time that `Promise.all` is executed, which makes me question whether it's needed at the moment.

I can't recall exactly why it's like this, but it might be that `tar.Parse()` did not wait until each entry was consumed before. In other words, in an earlier version of tar we got a bunch of concurrent entry streams, and then the `Promise.all` made sense.
I am not convinced this is the issue though.
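On that hunch: in the second test above, `resolve()` fires when the upload stream is created, not when it finishes, which is why `Promise.all` has little left to wait for. If the intent is for `Promise.all` to wait for `PERSISTED`, the `resolve` could move into the `pipeline` callback. A stand-alone sketch with plain Node streams (no tar or GCS involved; the entries and delays are made up):

```js
import { pipeline, Readable, Writable } from 'node:stream';

const log = [];
const queue = [];

for (const name of ['a.js', 'b.js']) {
  const entry = Readable.from([`contents of ${name}`]);
  queue.push(
    new Promise((resolve, reject) => {
      const writable = new Writable({
        write(chunk, encoding, done) {
          setTimeout(done, 20); // simulate a slow upload
        },
      });
      pipeline(entry, writable, (error) => {
        if (error) return reject(error);
        log.push(`PERSISTED ${name}`);
        resolve(name); // resolve only once the upload has finished
      });
    }),
  );
}

// Now Promise.all genuinely waits until every file is persisted.
const persisted = await Promise.all(queue);
console.log(persisted); // [ 'a.js', 'b.js' ]
```

This keeps the queue useful even if tar paces entries sequentially, since a rejected upload also fails the `Promise.all`.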
Alright, let's put a pin in this then 👍
From https://github.com/eik-lib/sink-gcs/pull/263