exxeleron / qPython

interprocess communication between Python and kdb+
http://www.devnet.de
Apache License 2.0

Queries For Large Tables Fail #40

Open derekwisong opened 7 years ago

derekwisong commented 7 years ago

I've noticed that queries for large data sets fail. My example is a 69 million row table with 5 columns.

The IPC transfer of this table from q to q works fine, but qPython fails. The actual symptom is an infinite hang: the call to q.sync() never returns.
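For reference, the query is issued through the normal qPython API; a minimal repro looks roughly like this (host, port, and table name are placeholders, not details from my setup):

from qpython import qconnection

# Hypothetical reproduction; `trades` stands in for my 69 million row table.
q = qconnection.QConnection(host='localhost', port=5000)
q.open()
try:
    data = q.sync('select from trades')  # hangs indefinitely when the result is very large
finally:
    q.close()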

I was able to create a fix, which I'll explain as I detail the 2 problems I found.

Problem 1: Message Size Overflow

I put some debugging statements into qreader.py and found that the read_header() function contains the following code:

message_size = self._buffer.get_int()

This call returns a negative number for my query, which I'm guessing means the size field in the IPC message was read as signed and overflowed. I added a get_uint() function to BytesBuffer to get an unsigned integer for the message size, which gives me the positive size I expect.

def get_uint(self):
    # 'I' is the struct format code for a 4-byte unsigned integer
    # (vs. the signed 'i' used by get_int())
    return self.get('I')

This solves problem 1.
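For anyone curious why get_int() goes negative, here is a standalone illustration (plain struct, not qPython code) of the same 4-byte size field interpreted as signed versus unsigned:

import struct

# A message size a little over 2 GB, packed as a 4-byte little-endian field,
# the way the size appears in the IPC header.
size_bytes = struct.pack('<I', 3000000000)

print(struct.unpack('<i', size_bytes)[0])  # -1294967296 (signed read overflows)
print(struct.unpack('<I', size_bytes)[0])  # 3000000000  (unsigned read is correct)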

Problem 2: Socket Read Length Too Big

After fixing problem 1, a failure occurs in QReader.read_data().

The following line of code raises OverflowError: signed integer is greater than maximum:

raw_data = self._read_bytes(message_size - 8)

I was able to resolve this by adding from StringIO import StringIO and changing _read_bytes() to the following:

def _read_bytes(self, length):
    if not self._stream:
        raise QReaderException('There is no input data. QReader requires either stream or data chunk')

    if length == 0:
        return b''
    else:
        # Read in chunks so no single read() call asks for more bytes than a
        # signed integer can represent, then stitch the chunks back together.
        CHUNKSIZE = 2048
        remaining = length
        buff = StringIO()

        while remaining > 0:
            chunk = self._stream.read(min(remaining, CHUNKSIZE))

            if chunk:
                remaining = remaining - len(chunk)
                buff.write(chunk)
            else:
                # stream closed before the full message arrived
                break

        data = buff.getvalue()

    if len(data) == 0:
        raise QReaderException('Error while reading data')
    return data

This seems to stem from the fact that you can't ask a file-like object to read() more bytes than fit in a signed integer. To get around that, I read in chunks and combine them using a StringIO.

I did no work to optimize CHUNKSIZE, and I do not know whether StringIO and chunked reads are the best way to go about this.

A better option, and one that behaves most like the original implementation, might be something like this (pseudo-code):

if length <= MAX_READABLE_LENGTH:
    data = read_in_one_shot_as_before()
else:
    data = read_in_chunks_as_proposed()
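Concretely, that hybrid could look something like the sketch below, assuming the original implementation read the whole length with a single self._stream.read(length) call; MAX_READABLE_LENGTH and CHUNKSIZE are illustrative values I picked, not constants from qPython:

# Sketch only: read in one shot when the length is small enough, otherwise
# fall back to the chunked loop from above.
MAX_READABLE_LENGTH = 2 ** 30   # 1 GiB, safely below the signed 32-bit limit
CHUNKSIZE = 2048

def _read_bytes(self, length):
    if not self._stream:
        raise QReaderException('There is no input data. QReader requires either stream or data chunk')

    if length == 0:
        return b''

    if length <= MAX_READABLE_LENGTH:
        data = self._stream.read(length)          # original one-shot behaviour
    else:
        buff = StringIO()
        remaining = length
        while remaining > 0:
            chunk = self._stream.read(min(remaining, CHUNKSIZE))
            if not chunk:
                break
            remaining -= len(chunk)
            buff.write(chunk)
        data = buff.getvalue()

    if len(data) == 0:
        raise QReaderException('Error while reading data')
    return data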

I'm curious to know what you think, and whether you would be willing to address this. With the somewhat recent increase of the IPC message limit to 1TB, I'm guessing this will come up more and more.

Thanks, Derek

derekwisong commented 7 years ago

Pull request submitted.