Open lpatiny opened 3 weeks ago
class MoleculeStream extends TransformStream {
#buffer = '';
constructor() {
super({
transform: (chunk, controller) => {
this.#buffer += chunk;
let begin = 0;
let index = 0;
while ((index = this.#buffer.indexOf('$$$$\r\n', index)) !== -1) {
controller.enqueue(this.#buffer.slice(begin, index));
index += 6;
begin = index;
}
this.#buffer = this.#buffer.slice(begin);
},
flush: (controller) => {
if (this.#buffer) {
controller.enqueue(this.#buffer);
}
},
});
}
}
const response = await fetch('http://localhost:8080');
const byteStream = response.body;
const decompressionStream = byteStream.pipeThrough(
new DecompressionStream('gzip'),
);
const textStream = decompressionStream.pipeThrough(new TextDecoderStream());
const moleculeStream = textStream.pipeThrough(new MoleculeStream());
console.time('Process time');
let count = 0;
for await (const molecule of moleculeStream) {
count++;
}
console.log('Processed %i molecules', count);
console.timeEnd('Process time');
We still need to deal with \n and \r\n