Closed fresheneesz closed 1 year ago
I noticed that I get an "already piping an entry" error if I try to add an entry before the previous entry's stream has finished. This is not how streams should work; if you have to wait for one stream to finish before moving on to the next, the usefulness of streams is destroyed.
You mean so you can do stuff like this?
fs.createReadStream('file-1').pipe(pack.entry({ name: 'file-1', size: ... }))
fs.createReadStream('file-2').pipe(pack.entry({ name: 'file-2', size: ... }))
pack.finalize()
I'm ok with a PR for that change :)
@mafintosh Yeah exactly.
I added a failing unit test and attempted to make it work. I didn't succeed, but this is what I have:
https://github.com/fresheneesz/tar-stream/commit/47116da91bde29c9e5317ab779f433a6995395fc
I thought it wasn't working because the Sink._write method was writing too early, but that doesn't seem to be the case anymore. It does look like file 2 is writing too early, but I have no idea why at this point.
@fresheneesz try updating your fork to use this branch of my fork of tar-stream and try again: https://github.com/malandrew/tar-stream/tree/update-deps
I suggest this because you're trying to do something similar to what I was doing.
Hey everyone! Any news on this issue? Adding multiple files to a tar archive would be really useful.
@croqaz @mafintosh @fresheneesz
Hi, I was able to stream multiple entries using the following code, without any modifications to mafintosh's original source code. This will create a .tar.gz file using only streams:
const fs = require('fs');
const zlib = require('zlib');
const tar = require('tar-stream');

function exampleCreateTarGzip (files) {
  const pack = tar.pack();
  const gzipStream = zlib.createGzip({ level: zlib.constants.Z_BEST_COMPRESSION });

  // Example list of files
  files = [ { name: "./works.csv", size: 3654 }, { name: "./manuscripts.csv", size: 303156 } ];

  return new Promise((resolve, reject) => {
    pack.on('error', reject);
    packEntry();

    // Callback is recursive
    function packEntry (err) {
      if (err) {
        reject(err);
      } else if (files.length) {
        const file_entry = files.pop();
        // please note the recursive callback here
        const entry = pack.entry({ name: file_entry.name, size: file_entry.size }, packEntry);
        const readStream = fs.createReadStream(file_entry.name);
        readStream.on('end', entry.end.bind(entry)); // This is key to getting this to work
        readStream.on('error', reject);
        readStream.pipe(entry);
      } else {
        pack.finalize();
      }
    }

    const writeStream = fs.createWriteStream('./archive.tar.gz');
    writeStream.on('error', reject);
    writeStream.on('end', () => { resolve('Done archiving files'); });
    pack.pipe(gzipStream).pipe(writeStream);
  });
}
The key here is to call pack.entry() only after the previous entry has finished streaming, and to know when to explicitly call entry.end(). This is recursive, but it could perhaps be done differently. Some tweaks may be needed to get this working for you (this was roughly copied and modified from my own code), but this solution works just fine for me.
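For comparison, here is a minimal sketch of the same one-entry-at-a-time idea using async/await instead of a recursive callback (a hypothetical createTarGzip helper, assuming Node 15+ for stream/promises and that each file's on-disk size matches the declared size):

import * as fs from 'fs';
import * as zlib from 'zlib';
import * as tar from 'tar-stream';
import { pipeline } from 'stream/promises';

// Hypothetical helper: packs files one at a time into a gzipped tar.
async function createTarGzip(files: { name: string, size: number }[], outPath: string) {
  const pack = tar.pack();
  // Start draining the pack into gzip and the output file right away.
  const done = pipeline(pack, zlib.createGzip(), fs.createWriteStream(outPath));
  for (const file of files) {
    const entry = pack.entry({ name: file.name, size: file.size });
    // Wait for this entry to finish before starting the next one.
    await pipeline(fs.createReadStream(file.name), entry);
  }
  pack.finalize();
  await done;
}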
Hi,
I created a wrapper to add multiple streams sequentially. It is independent of the Node fs module, since I am using read and write streams from Google Cloud Storage. I'm running in the Firebase Functions environment with 2 GB of memory and was running into out-of-memory errors when using other packages.
With this I was able to create gzipped archives larger than 2 GB, including ones containing single files larger than 2 GB.
import {Readable, Writable} from "stream";
import * as tar from "tar-stream";
import {Pack} from "tar-stream";

type FileInfo = { name: string, size: number, stream: Readable };

export class TarArchive {
  private pack = tar.pack();
  private streamQueue: FileInfo[] = [];
  private size = 0;

  constructor() {
  }

  addBuffer(name: string, buffer: Buffer) {
    this.size += buffer.length;
    this.pack.entry({
      name: name
    }, buffer);
    console.log(`Added ${name}`, buffer.length, this.size);
    return this;
  }

  addStream(name: string, size: number, stream: Readable) {
    this.streamQueue.push({
      name, size, stream
    });
  }

  write(streamCallback: (pack: Pack) => Writable) {
    return new Promise<void>((resolve, reject) => {
      this.nextEntry((err) => {
        if (err) {
          reject(err);
        } else {
          resolve();
        }
      }, this.streamQueue.length);
      streamCallback(this.pack)
        .on('error', (err) => {
          this.pack.destroy(err);
          reject(err);
        });
    });
  }

  private nextEntry(callback: (err?: Error) => void, total: number) {
    const file = this.streamQueue.shift();
    if (file) {
      const writeEntryStream = this.pack.entry({
        name: file.name,
        size: file.size
      }, (err) => {
        if (err) {
          callback(err);
        } else {
          this.size += file.size;
          console.log(`Added ${file.name}`, file.size, this.size, `${total - this.streamQueue.length}/${total}`);
          this.nextEntry(callback, total);
        }
      });
      file.stream.pipe(writeEntryStream);
    } else {
      this.pack.finalize();
      callback();
    }
  }
}
Here's an example of how TarArchive is used:
const tar = new TarArchive();
tar.addBuffer('program.json', programData);

for (const file of files) {
  const meta = await this.fs.lstat(file);
  tar.addStream(file, meta.size, this.fs.createReadStream(file));
}

return tar.write((stream) => {
  return stream
    .pipe(zlib.createGzip({ level: zlib.constants.Z_BEST_COMPRESSION }))
    .pipe(output);
});
The fs package I'm using for streams is node-fs-firebase, but it should work with any read and write stream.
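For instance, a hypothetical sketch of driving the same wrapper with the @google-cloud/storage client (the bucket and object names below are made up) could look like this:

import { Storage } from '@google-cloud/storage';
import * as zlib from 'zlib';

// Hypothetical example: pack objects from a GCS bucket into a gzipped tar
// written back to the same bucket. Names are illustrative only.
async function archiveBucketFiles(paths: string[]) {
  const bucket = new Storage().bucket('my-source-bucket');
  const archive = new TarArchive();
  for (const path of paths) {
    const [metadata] = await bucket.file(path).getMetadata();
    archive.addStream(path, Number(metadata.size), bucket.file(path).createReadStream());
  }
  await archive.write((pack) =>
    pack
      .pipe(zlib.createGzip({ level: zlib.constants.Z_BEST_COMPRESSION }))
      .pipe(bucket.file('archive.tar.gz').createWriteStream())
  );
}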
@abrinckm Thanks so much for the example. That was a lifesaver. One issue to correct, on this line:
writeStream.on('end', () => { resolve('Done archiving files'); });
Write streams don't have an end event. You would need to listen for finish or close instead. Your example is probably working, but only because you don't have any code after exampleCreateTarGzip. If you do something like this:
await exampleCreateTarGzip(...)
console.log('done')
You would never see the "done" message. Node exits (without a message) for some reason, since it knows the promise will never resolve.
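In other words, that line could be changed to something like:
writeStream.on('finish', () => { resolve('Done archiving files'); });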
Yes @dominicbartl, you are a lifesaver. The buffer parameter for pack.entry({...}, buffer) is what I was not aware of. Thanks!!
Code to pull files from a remote server, pack and gzip them, and then use the resulting stream, for example to stream the tarball to S3, to some other remote storage, or to a local file:
import { eachSeries } from 'async';
import * as tar from 'tar-stream';
import axios from 'axios';
import { IncomingMessage } from 'http';
import { createGzip, constants } from 'zlib';

const tarball = tar.pack();

const uris = [
  'https://img.freepik.com/free-photo/cool-geometric-triangular-figure-neon-laser-light-great-backgrounds-wallpapers_181624-9331.jpg?size=626&ext=jpg',
  'https://img.freepik.com/free-photo/cool-geometric-triangular-figure-neon-laser-light-great-backgrounds-wallpapers_181624-9331.jpg?size=626&ext=jpg',
  'https://img.freepik.com/free-photo/cool-geometric-triangular-figure-neon-laser-light-great-backgrounds-wallpapers_181624-9331.jpg?size=626&ext=jpg',
  'https://img.freepik.com/free-photo/cool-geometric-triangular-figure-neon-laser-light-great-backgrounds-wallpapers_181624-9331.jpg?size=626&ext=jpg',
  'https://img.freepik.com/free-photo/cool-geometric-triangular-figure-neon-laser-light-great-backgrounds-wallpapers_181624-9331.jpg?size=626&ext=jpg',
];

(async () => {
  await eachSeries(
    uris,
    async function (file) {
      console.log(file);
      const { readable, name, size } = await getStream(file);
      const writable = tarball.entry({ name, size });
      readable.pipe(writable);
      await new Promise((res, rej) => {
        writable.on('finish', () => res('ok'));
        writable.on('error', (err) => rej(err));
        readable.on('error', (err) => rej(err));
      });
    }
  );
  tarball.finalize();
})();

// Gets the readable stream for each file + other props
async function getStream(file: string) {
  const readable: IncomingMessage = (await axios.get(file, { responseType: 'stream' })).data;
  const size: number = +readable.headers['content-length'];
  const name = new Date().toString() + '.png'; // assigning a random name to the file
  return { readable, size, name };
}

const gZipCompress = createGzip({ level: constants.Z_BEST_COMPRESSION });

// use this stream anywhere now
export = tarball.pipe(gZipCompress);
const writeStream = fs.createWriteStream('./archive.tar.gz');
const pack = tar.pack();
pack.pipe(createGzip()).pipe(writeStream);

for (const file of files) {
  await new Promise((resolve, reject) => {
    const readStream = fs.createReadStream(file.path);
    const entry = pack.entry({ name: file.name, size: file.size });
    readStream.on('error', reject);
    entry.on('error', reject);
    entry.on('finish', resolve);
    readStream.pipe(entry);
  });
}
pack.finalize();
Ideally, I would think I could add as many entries with streams as I want, then call finalize right afterward, and everything would work (i.e., it would automatically wait for all the input streams to complete before actually creating the package). The documentation seems to imply that this isn't the case, though. Can I do that or can't I? If not, why not? Could finalize be made callable without explicitly waiting for the streams?