apache / arrow

Apache Arrow is the universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics
https://arrow.apache.org/

[JS] Dictionary encoded values repeating between record batches #41683

Closed vivek1729 closed 4 months ago

vivek1729 commented 5 months ago

Describe the bug, including details regarding any error messages, version, and platform.

We are trying to use the AsyncRecordBatchStreamReader to read several record batches from an HTTP response stream. The record batches are dictionary-encoded, and we started noticing that their values repeat after the first record batch is read.

For a minimal repro, we can see that the RecordBatchStreamReader repeats values for record batches with regular dictionary encoding as well. I've attached a sample txt file that contains the Arrow-serialized results for 2 record batches. Both record batches are dictionary-encoded (not delta); the first contains 200 records and the second contains just one. ArrowDebugging-WrongRecordBatch.txt

I'm simply trying to retrieve the value of the first column in these 2 batches, which are expected to be different. Here's a snippet to repro this behavior:

import { readFileSync } from 'node:fs';
import * as arrow from 'apache-arrow';

function readFileAsStream(fileName: string) {
    // read the contents of the text file as a string
    const base64String = readFileSync(fileName, 'utf-8');
    // Decode the base64 string
    const binaryString = atob(base64String);

    // Convert the binary string to a Uint8Array
    const bytes = new Uint8Array(binaryString.length);
    for (let i = 0; i < binaryString.length; i++) {
        bytes[i] = binaryString.charCodeAt(i);
    }

    const reader = arrow.RecordBatchStreamReader.from(bytes);

    // Read the record batches
    let batch;
    while (batch = reader.next()) {
        if (!batch || batch?.done) {
            break;
        }
        if (batch.value) {
            // Get the value of the first column
            console.log(batch.value.data.children[0].dictionary?.get(0));
        }
    }
}

Observed result:

'2013042345'
'2013042345'

Expected result (notice the second value is different from the first one):

'2013042345'
'2012020145'

Since the record batches are not delta dictionary encoded, I'd expect that the dictionary associated with the first record batch should get replaced with a separate dictionary when reading the second batch. I was looking at related issues and I wonder if this might be related to #23572 .

Additionally, I'd like to understand the recommended way to read multiple dictionary-encoded record batches from an HTTP stream. I imagine we can use the reader.next() iterator pattern to keep reading record batches from the stream, but I'd like to confirm my understanding.
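
Roughly, this is the pattern I have in mind (a sketch only; readBatchesFromHttp and the URL are placeholders, not our actual code):

import { AsyncRecordBatchStreamReader } from 'apache-arrow';

async function readBatchesFromHttp(url: string) {
    // fetch the Arrow IPC stream; the response body is a ReadableStream
    const response = await fetch(url);
    const reader = await AsyncRecordBatchStreamReader.from(response.body!);
    // the reader is async-iterable and yields one RecordBatch at a time
    for await (const batch of reader) {
        console.log(batch.numRows);
    }
}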

Component(s)

JavaScript

mirdaki commented 5 months ago

To add to this, we think the issue stems from the reader's internal dictionary state not being reset after finishing a record batch.

pohuan commented 4 months ago

@trxcllnt @domoritz I looked through the JS commits and you two seem to be the main contributors. Could you help take a look?

trxcllnt commented 4 months ago

IIRC Dictionaries were initially spec'd as immutable. I do recall implementing delta dictionaries, but it seems like around that time the spec was updated such that later DictionaryMessages should replace existing DictionaryMessages, and I just didn't catch it.

Specifically this section:

Alternatively, if isDelta is set to false, then the dictionary replaces the existing dictionary for the same ID.

I think this should be a straightforward fix, I'll try to file a PR soon.
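
To sketch the intended semantics (this is only an illustration of the spec behavior quoted above, not the actual reader internals; all names here are made up):

// dictionaries keyed by dictionary ID; each incoming dictionary batch either
// appends to the existing entry (isDelta === true) or replaces it (isDelta === false)
const dictionaries = new Map<number, unknown[]>();

function onDictionaryBatch(id: number, isDelta: boolean, values: unknown[]) {
    const existing = dictionaries.get(id);
    if (isDelta && existing) {
        // delta dictionary: append the new values to the existing ones
        dictionaries.set(id, existing.concat(values));
    } else {
        // non-delta dictionary: replace any existing dictionary for this ID
        dictionaries.set(id, values);
    }
}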

pohuan commented 4 months ago

https://github.com/pohuan/arrow/commit/1187cd8cdc467091d7e491008d52ae81810a5538

Thank you Paul. This is the change that Keshuang suggested earlier within our company. If it's OK, I could file a PR as well.

trxcllnt commented 4 months ago

@vivek1729 As for asynchronously reading batches from a stream, you have the right idea. I recommend either using async iterables (the for await (const batch of reader) { ... } syntax) or the Node/WHATWG streaming APIs.

I have an old codepen example of different strategies here. Lines 29-39 are most relevant to your situation. I put this together to show a friend how to wrap the Arrow IPC format in a custom binary protocol, which is why it includes some intermediate transform steps in between the Arrow IPC writer and reader.

Here's a shorter example:

import * as Arrow from 'apache-arrow';
for await (const batch of await Arrow.RecordBatchReader.from(await fetch('/get-a-table'))) {
  console.log(batch.toArray());
}

One neat thing the JS implementation supports is reading/writing multiple tables on the same stream. This can be handy if you need to send data under latency-sensitive conditions, where opening/closing the underlying transport incurs unnecessary overhead (think HTTP response streams):

import * as Arrow from 'apache-arrow';
let tableCount = 0;
for await (const reader of Arrow.RecordBatchReader.readAll(await fetch('/get-many-tables'))) {
  switch(tableCount++) {
    case 0: doThingWithTheFirstTable(reader); break;
    case 1: doThingWithTheSecondTable(reader); break;
    // ...
  }
}

function doThingWithTheFirstTable(
  reader: Arrow.AsyncRecordBatchStreamReader<{ strs: Arrow.Utf8 }>
) { /*...*/ }

function doThingWithTheSecondTable(
  reader: Arrow.AsyncRecordBatchStreamReader<{ ints: Arrow.Int32 }>
) { /*...*/ }

And this is what that might look like on the server:

import * as https from 'node:https';
import * as Arrow from 'apache-arrow';
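
// note: options is assumed to hold the TLS key/cert config for https.createServer (not shown in this sketch)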

https.createServer(options, async (req, res) => {
  res.writeHead(200);
  // set autoDestroy: false so the underlying response isn't closed after sending the first table
  // initially call `.reset(res)` so the writer has a sink and immediately flushes each batch
  const writer = new Arrow.RecordBatchStreamWriter({ autoDestroy: false }).reset(res);
  // alternatively, you can write each table as individual batches if producing asynchronously

  // sleep then write the first table
  await new Promise((r) => setTimeout(r, 1000));
  writer.write(new Arrow.Table({
    strs: Arrow.vectorFromArray(['a', 'b', 'c'], new Arrow.Utf8)
  }));

  // sleep then write the second table
  await new Promise((r) => setTimeout(r, 1000));
  writer.write(new Arrow.Table({
    ints: Arrow.vectorFromArray([0, 1, 2, 3, 4, 5], new Arrow.Int32)
  }));

  // must explicitly close when done (this will call `res.end()` on the response)
  writer.close();
}).listen(8000); 
vivek1729 commented 4 months ago

Thanks a lot for taking a look at this issue @trxcllnt. I'm not sure we can use the readAll abstraction, since we don't expect Arrow tables from our HTTP response. Instead, we expect many sequences of record batches, each sequence sharing a common schema. Specifically, our data could look something like this:

<recordBatch1_1Schema1><recordBatch1_2Schema1>|<recordBatch2_1Schema2>|...

Yes, we are using async iterables in our code as you suggested. Here's what the high-level code looks like for our case. Notice how we create a single instance of the reader and continue reading even after we get an empty record batch, because that could signify the end of a result set for us and there could be more record batches in the HTTP response stream.

import { AsyncRecordBatchStreamReader, RecordBatch, Table } from 'apache-arrow';

const resultTables = [];
const arrowReader = await AsyncRecordBatchStreamReader.from(responseStream);
await arrowReader.open({ autoDestroy: false });
while (true) {
    let batches: RecordBatch<any>[] = [];
    let batch: IteratorResult<RecordBatch<any>, any>;
    while (batch = await arrowReader.next()) {
        // End of result set
        if (!batch || batch?.done)
        {
            break;
        }
        if (batch.value) {
            batches.push(batch.value);
        }
    }
    // End of stream
    if (batches.length === 0) {
        break;
    }
    resultTables.push(new Table(batches));
}

Specifically, I noticed that we had to call await arrowReader.open({ autoDestroy: false }); otherwise the reader would auto-close after reading the first record batch.

Does our approach sound sensible?

trxcllnt commented 4 months ago

@vivek1729 It looks like you've re-implemented readAll :slightly_smiling_face:.

The autoDestroy: false option exists to prevent the RecordBatchReader from closing the underlying transport after reading the EOS bytes from the IPC stream.

Notice how we create a single instance of the reader

The readAll() implementation does this as well, though that's more for convenience than anything else. Yielding a new RecordBatchReader instance for each IPC stream would not impact performance in any way (as long as the underlying ReadableStream was not closed after reading each Arrow IPC stream).

Here's a more concise version of your example, using readAll():

import { RecordBatchReader, Table } from 'apache-arrow';

const resultTables = [];

for await (const reader of RecordBatchReader.readAll(responseStream)) {
  resultTables.push(new Table(await reader.readAll()));
}

You must exhaust the reader in this loop, e.g. by using readAll() to collect all the RecordBatches into an Array, or by using an inner for await (const recordBatch of reader) { ... } loop to read each batch as it arrives.
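
For example, a minimal sketch of the inner-loop variant (assuming the same responseStream as above):

import { RecordBatchReader, Table } from 'apache-arrow';

const resultTables = [];

for await (const reader of RecordBatchReader.readAll(responseStream)) {
  const batches = [];
  // read each batch as it arrives, exhausting this reader before readAll()
  // moves on to the next IPC stream in the transport
  for await (const batch of reader) {
    batches.push(batch);
  }
  resultTables.push(new Table(batches));
}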

vivek1729 commented 4 months ago

@trxcllnt, thanks a lot for your response. In our case, since we receive multiple sequences of record batches in a single HTTP response stream, it looks like each result set (i.e. each sequence of record batches) is terminated by an EOS marker. So our record batch structure looks something like this:

ResultSet 1
    RB1
    RB2
    EOS
ResultSet 2
    RB3
    RB4
    EOS
Physical EOS

Since our response stream can have multiple EOS markers, can we use the RecordBatchReader.readAll() pattern you've highlighted above to continue reading record batches after an EOS has been encountered, so that it moves on to the next result set?
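
In other words, a rough sketch of what we hope works (assuming readAll() does yield a fresh reader for each EOS-delimited IPC stream, per your earlier comment):

import { RecordBatchReader, Table } from 'apache-arrow';

const resultSets: Table[] = [];

// each EOS-delimited result set would surface as its own reader...
for await (const reader of RecordBatchReader.readAll(responseStream)) {
  // ...and exhausting it yields one Table per result set (RB1+RB2, then RB3+RB4, ...)
  resultSets.push(new Table(await reader.readAll()));
}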