scramjetorg / scramjet

Public tracker for Scramjet Cloud Platform, a platform that brings data from many environments together.
https://www.scramjet.org
MIT License
253 stars, 20 forks

Track stream offset for retries #94

Closed KrishnaPG closed 3 years ago

KrishnaPG commented 3 years ago

I could not find much information on how to "track the input stream offset" so that, in case of errors, the stream can restart from that offset rather than from the initial position 0.

Consider this scenario: a large CSV file from S3 is being streamed in, and an application or network error occurs near the end, before all the records have been processed. We then have to retry after the connection is re-established (or the app restarts). We do not want to restart from 0, but rather resume from the last recorded offset. In the code snippet below, readOffset is already supported by S3 clients such as minio; what is not clear is how to track this readOffset inside the DataStream so that we can record it (for later restarts, if required).

minioClient.getPartialObject(Bucket, Key, readOffset)
    .then(s3Stream => DataStream.from(FastCSV.parseStream(s3Stream, csvOptions)))

MichalCz commented 3 years ago

Hi,

Thanks for the issue, this is an interesting case. Let me think about this, I'll propose a solution.

M.

MichalCz commented 3 years ago

Not as a solution yet, but just considering what we're looking at here.

So the program itself would look like this pseudocode:

    StringStream.from(async ({Bucket, Key}) => {
        const offset = await getOffsetFromSomewhere(); // perhaps column names
        // actually this could be a generator that would add some error handling on the stream itself.
        return StringStream.from(minioClient.getPartialObject(Bucket, Key, offset));
    }, {Bucket, Key}) // this is just a better way to pass options to the function above - so you can extract it to a separate file.
        .do(incrementByteCounterFunction) // so we keep track how many bytes we consumed
        .CSVParse(csvOptions)
        .filter(compareWithLastResultId) // so we don't redo stuff we already did
        .do(executeMainCommandFunction)
        .do(keepLastResultId) // so we record what we did

Because of UTF-8 multi-byte characters, I'm not sure whether this should work on a BufferStream first and only then move to a StringStream, but we can check that down the road. Is this what you were thinking about?
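
The UTF concern above can be made concrete: the byte offset has to be counted on the raw buffers, before decoding, because a multi-byte character may be split across two chunks. Below is a minimal sketch that uses only Node's built-in `string_decoder` module. The `decodeWithOffsets` helper is a hypothetical name for illustration and is not part of Scramjet.

```javascript
const { StringDecoder } = require('string_decoder');

// Hypothetical helper: takes raw chunks and returns [byteOffset, text] pairs.
// byteOffset is the position of the chunk's first byte in the source object.
function decodeWithOffsets(chunks, startOffset = 0) {
    const decoder = new StringDecoder('utf8');
    let offset = startOffset;
    const out = [];
    for (const buf of chunks) {
        out.push([offset, decoder.write(buf)]); // may hold back an incomplete character
        offset += buf.length;                   // count bytes, not characters
    }
    out.push([offset, decoder.end()]);          // flush whatever is still buffered
    return out;
}

// "\u00e9" takes two bytes in UTF-8 (0xC3 0xA9); split it across two chunks.
const whole = Buffer.from('caf\u00e9,1\n', 'utf8');
const pairs = decodeWithOffsets([whole.subarray(0, 4), whole.subarray(4)]);
console.log(pairs);
```

Note that the decoded text of the second chunk begins with a character whose first byte arrived in the previous chunk, so a chunk offset is not always a safe resume point. Recording offsets at line boundaries avoids that problem.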

KrishnaPG commented 3 years ago

Thank you @MichalCz Yes, that is spot on.

I was planning to use something along the lines of the following:

I was not sure how to achieve these with CSVParse.

MichalCz commented 3 years ago

As to the last part - I think we'd need to look at the implementation of CSVParse and rebuild it as a module (with fast-csv or papaparse - doesn't matter).

Then there's the last question: how often would we save the offset, and under what circumstances? The thing is, the last three operations are composed in Scramjet, so there's no guaranteed order between them - we'd need to either realign those or use .setOptions({maxParallel: 1}), which would slow things down. Alternatively, we could use the .batch(100) method and run the operations in batches if ordered inserts are necessary.
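
To illustrate the batching trade-off, here is a minimal sketch in plain JavaScript (it does not use any Scramjet API). Rows inside a batch run in parallel, and the offset is saved only once the whole batch has finished, so a saved offset never points past unfinished work. The `processInBatches` helper and the `endOffset` field (the byte offset just past a record) are assumptions made for this example.

```javascript
// Hypothetical helper: records is an array of { endOffset, row } objects.
async function processInBatches(records, batchSize, handle, saveOffset) {
    for (let i = 0; i < records.length; i += batchSize) {
        const batch = records.slice(i, i + batchSize);
        // Rows within one batch may complete in any order.
        await Promise.all(batch.map(({ row }) => handle(row)));
        // Checkpoint only after every row of the batch has been handled.
        await saveOffset(batch[batch.length - 1].endOffset);
    }
}

const demoRecords = [
    { endOffset: 10, row: 'a' },
    { endOffset: 20, row: 'b' },
    { endOffset: 30, row: 'c' },
];
const savedOffsets = [];
const demoDone = processInBatches(
    demoRecords,
    2,
    async () => {},                                  // stand-in for the main command
    async (offset) => { savedOffsets.push(offset); } // stand-in for a persistent store
);
```

After a crash, at most one batch is repeated, so the main command should be idempotent, or already processed rows should be filtered out by id as in the earlier pseudocode.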

What do you think? If you want to talk a bit about it, head over to Scramjet's Slack.

MichalCz commented 3 years ago

Ok, circling back, I'd propose to transform the stream to a CSV that tracks the offset (adds it to the beginning of each line):

const { StringDecoder } = require('string_decoder'); // Node's built-in module
const { DataStream, BufferStream } = require('scramjet');

DataStream.from(async function* () {
    const { offset } = await getOffsetFromSomewhere();
    let cnt = offset; // number of bytes consumed so far
    while (checkCondition()) {
        const decoder = new StringDecoder('utf-8');
        // getPartialObject resolves to a readable stream starting at byte `cnt`
        const minioStream = await minioClient.getPartialObject(Bucket, Key, cnt);
        // TODO: some failure checks here
        yield* BufferStream.from(minioStream)
            .parse(buf => {
                const current = cnt;
                cnt += buf.length;
                return [current, decoder.write(buf)];
            });
        yield [cnt, decoder.end()]; // flush the remaining data
    }
})
    // now we have a stream of [offset, decoded string] pairs
    .use(async function* (stream) {
        let lastLine = '';
        let lastOffset = 0;
        for await (const [offset, data] of stream) {
            lastOffset = offset;
            const [first, ...rest] = data.split('\n'); // or other delimiter
            if (!rest.length) {
                lastLine += first; // no end of line yet: keep buffering
                continue;
            }
            yield [offset, lastLine + first];
            lastLine = rest.pop();
            yield* rest.map(x => [offset, x]);
        }
        yield [lastOffset, lastLine];
    })
    .stringify(
        ([offset, line]) => `${offset},${line}`
    )

This is still some pseudocode and you have to think about how to handle the first line, but the general idea is as above...
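
The snippet above tags every line with the offset of the chunk it arrived in. If the exact start of each line is needed as a resume point, the split can be done on raw bytes instead, which is safe because the byte 0x0A never occurs inside a UTF-8 multi-byte sequence. Here is a minimal sketch in plain Node; `linesWithOffsets` is a hypothetical helper, not a Scramjet method.

```javascript
// Hypothetical helper: splits raw chunks into lines and reports, for each line,
// the byte offset at which that line starts in the source object.
function* linesWithOffsets(chunks, startOffset = 0) {
    let pending = Buffer.alloc(0); // bytes of the current, not yet terminated line
    let lineStart = startOffset;   // byte offset of pending[0] in the source
    for (const chunk of chunks) {
        pending = Buffer.concat([pending, chunk]);
        let nl;
        while ((nl = pending.indexOf(0x0a)) !== -1) {
            yield [lineStart, pending.toString('utf8', 0, nl)];
            lineStart += nl + 1;               // skip the line and its newline byte
            pending = pending.subarray(nl + 1);
        }
    }
    // A final line without a trailing newline is still reported.
    if (pending.length > 0) yield [lineStart, pending.toString('utf8')];
}

const csv = Buffer.from('id,name\n1,caf\u00e9\n2,tea\n', 'utf8');
// Split in the middle of the two-byte "\u00e9" to show that decoding is unaffected.
const lines = [...linesWithOffsets([csv.subarray(0, 14), csv.subarray(14)])];
// Resuming: start reading at a recorded line offset and pass it as startOffset.
const resumed = [...linesWithOffsets([csv.subarray(16)], 16)];
```

The header line still needs separate handling on resume, since a partial read that starts at a later offset no longer contains the column names.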

However - if you'd be ready to help out - I did want to add some counting feature in version v5 where some context could be passed along with each data point... What you could do is to think of a sensible API for the above use case and we could start from there...

KrishnaPG commented 3 years ago

Thank you @MichalCz. I would think the skip-until-offset feature can be generalized into a way of skipping rows/objects on the input stream, i.e. filter-like functionality that filters out rows/objects on the input stream that satisfy some condition.

For example, a few filtering conditions could be:

It looks like the DataStream already supports the filter, shift, slice, etc. methods. The problem is how the input stream can use this information intelligently, so that the filtering/shifting/slicing happens at the source stream level itself rather than down the line (i.e. after reading the data from the server and discarding it at the client).
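
One way to read this request is that the source is opened through a factory that takes the resume position, so skipped bytes are never downloaded. Below is a minimal sketch in plain Node under that assumption. `openAt(offset)` stands in for a ranged read such as `minioClient.getPartialObject(Bucket, Key, offset)`, and here it is a synchronous generator over an in-memory buffer. `readAllWithResume` and `flakySource` are hypothetical names.

```javascript
// Hypothetical helper: re-opens the source at the last complete line after a failure.
function readAllWithResume(openAt, maxRetries = 3) {
    const rows = [];
    let resumeAt = 0; // byte offset of the first line not yet processed
    let attempts = 0;
    for (;;) {
        try {
            let pending = Buffer.alloc(0);
            for (const chunk of openAt(resumeAt)) {
                pending = Buffer.concat([pending, chunk]);
                let nl;
                while ((nl = pending.indexOf(0x0a)) !== -1) {
                    rows.push(pending.toString('utf8', 0, nl));
                    resumeAt += nl + 1; // checkpoint only complete lines
                    pending = pending.subarray(nl + 1);
                }
            }
            if (pending.length > 0) rows.push(pending.toString('utf8'));
            return rows;
        } catch (err) {
            if (++attempts > maxRetries) throw err;
            // Loop again: the source is re-opened at resumeAt, not at 0.
        }
    }
}

// Simulated source that fails once after delivering its first chunk.
const data = Buffer.from('a,1\nb,2\nc,3\n', 'utf8');
const opened = []; // offsets requested from the source
let failOnce = true;
function* flakySource(offset) {
    opened.push(offset);
    yield data.subarray(offset, offset + 6);
    if (failOnce) { failOnce = false; throw new Error('connection reset'); }
    yield data.subarray(offset + 6);
}

const resumedRows = readAllWithResume(flakySource);
```

In this run the second request starts at byte 4, the start of the first incomplete line, so only the partially read line is fetched again.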

MichalCz commented 3 years ago

I see what you mean - so it would be something like the seek feature... I can't really think of including this in the 4.x series, as it would need a special interface and may actually break backwards compatibility, but I put it on the board for 5.x.

KrishnaPG commented 3 years ago

Understood, @MichalCz. Looking forward to 5.x.