metabench / nextleveldb-server

MIT License
1 stars 0 forks source link

ws-binary handler LL_GET_RECORDS_IN_RANGE - old-style code, will be updated #10

Open metabench opened 6 years ago

metabench commented 6 years ago

let the_old_way = () => {
            // Legacy ws-binary handler for LL_GET_RECORDS_IN_RANGE.
            //
            // Reads a paging option (and, when paging is requested, a page size) from
            // `buf_the_rest`, takes the exclusive lower / upper key bounds from
            // `cm.inner`, streams every record strictly between those bounds out of
            // `db`, and sends the encoded records to `connection` either as a single
            // message (NO_PAGING) or as a sequence of numbered pages
            // (PAGING_RECORD_COUNT).
            //
            // Takes no parameters and returns nothing; all results are sent with
            // `connection.sendBytes`. Stream errors are passed to `callback(err)`.
            // Throws an Error when the paging option is neither NO_PAGING nor
            // PAGING_RECORD_COUNT.
            //
            // NOTE(review): `pos`, `buf_res` and every other free name used here
            // (x, cm, db, connection, xas2, Binary_Encoding, buf_msg_id, ...) are
            // expected to come from the enclosing handler scope, which is not visible
            // in this snippet - confirm before moving this code.
            // TODO: should probably be changed to use the server's
            // get_records_in_range function.
            // TODO: check the response when the range contains 0 records.

            let paging_option, page_size;

            // x.read returns [value, next_position]; the page size is only present
            // in the message when a paging option greater than 0 was requested.
            pos = 0;
            [paging_option, pos] = x.read(buf_the_rest, pos);
            if (paging_option > 0) {
                [page_size, pos] = x.read(buf_the_rest, pos);
            }

            const [b_l, b_u] = cm.inner;

            // Fail fast with a real Error (stack trace, instanceof Error) rather
            // than a bare string.
            if (paging_option !== NO_PAGING && paging_option !== PAGING_RECORD_COUNT) {
                throw new Error('Unexpected paging option');
            }

            // Opens a read stream over the records strictly between the two bounds.
            const open_range_stream = () => db.createReadStream({
                'gt': b_l,
                'lt': b_u
            });

            // Joins a record's key and value into the single buffer that goes on the wire.
            const encode_record = (data) => Binary_Encoding.join_buffer_pair([data.key, data.value]);

            if (paging_option === NO_PAGING) {
                // Though paging is an option, this is not a paged or pageable
                // response: everything is collected and sent once the stream ends.
                const arr_res = return_message_type ? [buf_msg_id, buf_record_paging_none] : [buf_msg_id];

                open_range_stream()
                    .on('data', (data) => {
                        arr_res.push(encode_record(data));
                    })
                    .on('error', (err) => {
                        callback(err);
                    })
                    .on('end', () => {
                        buf_res = Buffer.concat(arr_res);
                        connection.sendBytes(buf_res);
                    });
                return;
            }

            // PAGING_RECORD_COUNT: send a "flow" page every `page_size` records and
            // a final "last" page with whatever remains when the stream ends.

            // Maps how many pages the client is behind to a pause (in ms) before
            // reading resumes; a lag of 2 pages or fewer gives no extra delay.
            const delay_for_lag = (pages_diff) => {
                if (pages_diff > 8) return 2000;
                if (pages_diff > 6) return 1000;
                if (pages_diff > 4) return 500;
                if (pages_diff > 2) return 250;
                return 0;
            };

            // The page grows on demand instead of being pre-allocated with
            // `new Array(page_size)`: page_size comes from the client message, and
            // pre-allocating from it throws a RangeError for out-of-range values.
            let arr_page = [];
            let page_number = 0;

            const read_stream = open_range_stream();
            read_stream
                .on('data', (data) => {
                    arr_page.push(encode_record(data));
                    if (arr_page.length !== page_size) {
                        return;
                    }

                    // Check the latest page the client reported as received to see
                    // how far behind it is. No acknowledgement yet means no delay.
                    // TODO: if the lag gets too high, stop the stream altogether?
                    const latest_received_page = map_received_page[message_id];
                    let delay = 0;
                    if (typeof latest_received_page !== 'undefined') {
                        delay = delay_for_lag(page_number - latest_received_page);
                    }

                    // Reading is always paused after a full page and resumed from a
                    // timer, even when the delay is 0.
                    read_stream.pause();
                    setTimeout(() => {
                        read_stream.resume();
                    }, delay);

                    // TODO: consider per-page compression (not streaming compression,
                    // since only some messages within the socket stream would be
                    // compressed); the client could request it in the message.
                    connection.sendBytes(Buffer.concat([buf_msg_id, buf_record_paging_flow, xas2(page_number++).buffer].concat(arr_page)));
                    arr_page = [];
                })
                .on('error', (err) => {
                    callback(err);
                })
                .on('end', () => {
                    // The last page carries the leftover records (possibly none).
                    buf_res = Buffer.concat([buf_msg_id, buf_record_paging_last, xas2(page_number).buffer].concat(arr_page));
                    connection.sendBytes(buf_res);
                });
        }
        the_old_way();
metabench commented 6 years ago

This handler delays sending further pages when the client has not yet acknowledged receiving the earlier ones; that acknowledgement-based throttling is a feature which is not (yet) in the standard send.