Many thanks for investigating this problem and finally figuring it out! The root problem here is that there is an undocumented assumption that appendRow is not called concurrently, i.e. that, at any given time, there is only one outstanding call to the method.
Alas, I think the proposed change does not completely eliminate the problems you get from calling the method concurrently, but just pushes it further down the stack. Instead of races around the rowBuf you would now get races around file writing when called concurrently.
One possible solution would be to go back to the drawing board and see if parquetjs can be re-designed so that concurrent modifications to the same file are supported. However, this would most likely end up introducing some kind of "lock" for the ParquetWriter class, which, considering that JavaScript is fundamentally a single-threaded language, would to me feel like the wrong approach. Even if you had a mechanism inside the ParquetWriter to sequence all operations, users would still be required to eventually await the results of every single operation, since otherwise memory usage would grow without bound.
The proper solution, I think, is to raise a "concurrent modification" exception whenever a user attempts to call the appendRow (or any other) method concurrently. That will fix the undefined behaviour by turning it into a hard error.
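For illustration, a minimal sketch of what such a guard might look like (hypothetical code, not the actual parquetjs source; the pendingAppend flag is an invented name):

// Hypothetical guard, not actual parquetjs code: turn concurrent use into a hard error.
class ParquetWriter {
  async appendRow(row) {
    if (this.pendingAppend) {
      throw new Error('concurrent modification: await the previous appendRow() before calling it again');
    }
    this.pendingAppend = true;
    try {
      // ... buffer the row and flush a row group when the buffer is full ...
    } finally {
      this.pendingAppend = false;
    }
  }
}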
However, I'm not a JavaScript expert by any means, so please correct me if I am wrong and there is a better way to improve this.
Thanks for your response.
I checked your response on issue 60 (https://github.com/ironSource/parquetjs/issues/60), but I do not agree that appendFile behaves the same way. E.g. the following code using appendFile works fine - it does not preserve the order, but it keeps the count the same - which is exactly what is expected of this library as well. parquetjs, on the other hand, appends a lot of extra rows, which grows unbounded for very large appends and saturates memory.
const fs = require('fs');
const os = require('os');
const assert = require('assert');
const readline = require('readline');

describe('appendFile Behavior', function() {
  it('write a test file using appendFile', async function() {
    let to_write = [];
    for (let i = 0; i < 1000; i++) {
      to_write.push(`data to append ${i}`);
    }
    // Fire off all appends without awaiting them in order.
    to_write.forEach(async line => await fs.promises.appendFile('message.txt', line + os.EOL));
    // Read it back
    let inStream = fs.createReadStream('message.txt');
    let lineReader = readline.createInterface({ input: inStream });
    let count = 0;
    for await (const line of lineReader) {
      count += 1;
    }
    assert.equal(count, 1000);
  });
});
The issue with the current code in writer.js is in how it handles async/await. I am new to JavaScript as well, and I figured this out the hard way. await looks like syntactic sugar, in that it gives the appearance of "staying" within the context of the same method, when it does not. Normally JavaScript runs a method without interleaving other calls, but when you await a call to another function, other callers can enter the current method before it resumes. So clearing the buffer out after the awaited call is a bad idea. It works when you strictly enforce serial calls, but it does not work when using forEach, where the order is not preserved.
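To make the interleaving concrete, here is a small self-contained sketch (not the actual writer.js code; fakeFlush stands in for envelopeWriter.writeRowGroup and the row group size is assumed to be 2) showing how clearing the buffer only after the awaited call produces duplicate writes, while detaching it before suspending does not:

// Simplified model of the writer, assuming a row group size of 2.
async function fakeFlush(rows) {
  // Stand-in for envelopeWriter.writeRowGroup(): snapshot the buffer, then yield to the event loop.
  const snapshot = rows.slice();
  await new Promise(resolve => setImmediate(resolve));
  console.log('flushed', snapshot.length, 'rows:', snapshot.join(','));
}

class Writer {
  constructor() { this.rowBuffer = []; }

  async appendRowUnsafe(row) {
    this.rowBuffer.push(row);
    if (this.rowBuffer.length >= 2) {
      // While suspended here, other calls can run, push more rows into the
      // same rowBuffer, and flush it again before we get to clear it.
      await fakeFlush(this.rowBuffer);
      this.rowBuffer = [];            // cleared too late: duplicates were already flushed
    }
  }

  async appendRowFixed(row) {
    this.rowBuffer.push(row);
    if (this.rowBuffer.length >= 2) {
      const toWrite = this.rowBuffer;
      this.rowBuffer = [];            // detach the buffer before suspending
      await fakeFlush(toWrite);
    }
  }
}

// Fire off appends without awaiting each one, just like forEach does:
const w = new Writer();
[1, 2, 3, 4].forEach(r => w.appendRowUnsafe(r));
// Flushes [1,2], [1,2,3] and [1,2,3,4]: 9 rows written instead of 4.
// With appendRowFixed the same pattern flushes [1,2] and [3,4]: exactly 4 rows.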
That said, I did not clearly understand your concern about "Instead of races around the rowBuf you would now get races around file writing when called concurrently".
I have verified row counts on all sizes of data after this fix and it seems to hold. The order of insertion is not important. Should it be? If not, can you point me to a way to reproduce the problem you describe above?
I also wanted to request this: Even if you think the fix does not fully address the issue, as long as it does not cause additional harm - will it be possible to get an approval and a merge? It is painful to use the library from forks and we would like to pick up all other fixes.
Yes, I agree that this is a trap that should be fixed quickly. However, if you do not wait for the promise returned by appendRow to resolve before issuing a new call, the order of writes to the filesystem and internal metadata structures will still be undefined. That means that the content of the resulting parquet file is also undefined and potentially corrupt. Sadly, just reordering the two operations in the appendRow method does not generally fix the issue, which is that concurrent calls to appendRow will race on all kinds of changes to internal data structures and the filesystem.
So, for now, the only correct fix is to not issue concurrent calls to appendRow, but to chain the calls instead. The proper medium-term solution, in my opinion, is to:
1) Be explicit in that the ParquetWriter class is a file-backed stateful object, so concurrent modifications of a single instance are not supported. The user needs to ensure that operations are correctly sequenced when using the ParquetWriter API directly, i.e. perform one call to appendRow after another before issuing a final close call to write the file trailer. This is similar and related to how concurrent operations on the same file using the fs API will produce undefined results if you do not await the result of a given operation before starting the next one. Also, we should add a programmatic check for this error condition and raise an exception when a concurrent modification is attempted. That way, instead of a corrupt file, users will then get a nice error message. This change can be done quickly.
2) Add a new convenience method called writeParquetFile(path, data) which is completely stateless and would, from the user's point of view, perform all writing and flushing of all rows as a single atomic operation. This method would internally use the existing ParquetWriter class, but ensure that all operations are executed in a well-defined and correct order. The data argument would have to be either an array containing all rows of the file or some kind of generator/iterator that incrementally produces all rows for the file. We could then change the documentation to point users to this method instead of the lower-level ParquetWriter class. In my opinion, this would completely solve the problem with a minimal amount of work, while still preserving the existing stateful ParquetWriter interface for users that want more control over how the file is written. A nice added bonus is that this new, simplified wrapper interface would also make it impossible to forget to call the close method. (A rough sketch of such a wrapper follows below.)
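For the sake of discussion, a rough sketch of what such a wrapper could look like (this function does not exist in parquetjs yet; the explicit schema parameter and the try/finally are my own assumptions):

const parquet = require('parquetjs');

// Proposed convenience wrapper: writes all rows and closes the file in one call.
async function writeParquetFile(schema, path, data) {
  const writer = await parquet.ParquetWriter.openFile(schema, path);
  try {
    // for await...of accepts both plain arrays and (async) generators/iterators.
    for await (const row of data) {
      await writer.appendRow(row);   // each append is awaited before the next one starts
    }
  } finally {
    await writer.close();            // the file trailer is always written
  }
}

From the user's point of view this is a single call, and forgetting close is no longer possible.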
I still do not think this is an issue. Here's why:
1) I do use await in my code when calling appendRow. That is not the issue.
2) The fix only moves the clearing of this.rowBuffer in writer.js - that is all it does.
3) With or without await you will still get an issue, since the async call to this.envelopeWriter.writeRowGroup(to_write) from appendRow happens in the middle of the method. That allows other calls to come in and write new rows to the same buffer on top of the existing ones (which should have been emptied by then). And once those calls get into this.envelopeWriter.writeRowGroup(this.rowBuffer), it produces a huge number of duplicate writes. You can easily check this by printing to the console the number of rows on each call.
4) When the number of rows is less than rowBufferSize, nothing is flushed from appendRow; all writing happens on close only. So if you try the same rapid insertion (using forEach instead of for - of) with fewer rows than rowBufferSize, you will not see any issues.
5) I agree your second point is a good solution, but it may not work in cases of large data handling where you want streaming behavior.
Thanks again for the continued conversation.
1) Even with the await, the loop in the test case will not only execute the operations in an undefined order, but it will also cause all operations to run concurrently, i.e. it might start to execute the second appendRow operation before the first appendRow operation has completed. This breaks a number of assumptions in the ParquetWriter. At the moment, the only valid usage of that method is the one shown in the README; you have to await the completion of the first appendRow before starting the next call to appendRow.
2) The code could maybe be re-engineered so that the order of actual OS write() system calls would not be a problem in itself, since I think parquet itself never requires us to overwrite existing data, though I'm not sure about that right now; it has been a few years since I last looked at it. However, the process of building up a parquet file is a stateful process where the position of block N depends on the size of the preceding block N-1. This code has to live somewhere, and it definitely relies on having a well-defined order of modifications to the metadata. So we would have to change the writer in such a way that all code that modifies internal state is guaranteed to run synchronously before any execution is deferred via await. This would require a larger refactoring of the code.
But even then, the final write to disk would still be an async operation returning a promise. In other languages we would just block the thread, but in JS we should never do that; we have to return the promise back to the user instead. So we would still (always!) require the user to wait for that returned promise before continuing to insert more data. This is required so that the current block is flushed to disk before we start assembling the next one. Otherwise we would introduce a hidden and unbounded queue of buffers waiting for the disk, giving up the strict bounds on memory usage that we currently have.
That is why I think that forcing the user to wait for the returned promise before issuing more writes, by raising a hard error if they don't, is the better approach. It gives correct results in all scenarios while still guaranteeing that we will never use more than rowGroupSize memory at any given point in time. And of course we should then also hide the ParquetWriter interface behind a simple wrapper function as discussed.
3) You're right in that your change works around this specific issue; the problem is that there are many more issues in the code when you attempt to use it concurrently. Concurrent modifications are currently fundamentally unsupported, because the code changes internal data structures and data on disk all over the place. If this wasn't written in JavaScript I would simply say the code is not reentrant, but I'm not sure what the right term to use is in the JS context.
4) In the simple case you described, you might not see any problems, but that is only because, without any actually asynchronous operations (such as file I/O), nodejs happens to run the continuations in the "right" order, since nothing ever defers. This scheduling is in no way guaranteed; it just works "accidentally" and breaks as soon as any await is performed inside of the parquet writing code.
As for the streaming data: that's why I mentioned the possibility of having this new method accept a generator/iterator as an input. That would allow the user to provide a generator that doesn't keep the full dataset in memory, or one that consumes a streaming source, while still providing a solid abstraction that frees the user from having to sequence a bunch of promises in the right order. For users that want full control, there will always be the fallback option of using the ParquetWriter class directly, but they will have to make sure to correctly serialize all access to the file and to remember to call the close method.
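To illustrate the streaming case, a generator could feed a wrapper like the one sketched above without ever holding the full dataset in memory (the field name and the lineReader source are made up for illustration):

// Hypothetical streaming source: yields one row at a time, e.g. from a readline interface.
async function* rowsFromLines(lineReader) {
  for await (const line of lineReader) {
    yield { value: line };           // map each input line to a row object
  }
}

// e.g. await writeParquetFile(schema, 'out.parquet', rowsFromLines(lineReader));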
Thanks, I understand your concern better now.
Yes, further changes may bring the issue back. But looking at the flow of how the write happens, the only other async call it encounters is the os.write - if I am not mistaken.
So with the fix - as the code appears today - everything from the await this.envelopeWriter.writeRowGroup(to_write); onwards should always be ordered (one write at a time), even if the second appendRow operation starts executing before the first appendRow operation has completed (as you mentioned).
That said, another fix could be to use async.queue to put all write requests into a single queue per writer and then write them one by one from there (https://caolan.github.io/async/v3/docs.html#queue).
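A rough sketch of that idea, assuming the async v3 package (the helper name is invented):

const async = require('async');

// One queue per writer, concurrency 1: appends are written strictly one at a time.
function makeSerializedAppender(writer) {
  const queue = async.queue(async (row) => {
    await writer.appendRow(row);     // only one appendRow is in flight at any time
  }, 1);
  return (row) => queue.push(row);   // enqueue the row for writing
}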
I will see if I find time to use that. For now, I think I have to stick with my fork if the change is not accepted.
I agree that the appendRow method is probably the only entry point that we have to worry about for now.
So how about this for a resolution: we could amend your PR to also use something like async-mutex to force serialization of all calls to appendRow. This will make the result completely correct and ensure it stays correct in the future, without requiring any changes to user code (discounting the undefined order of rows, of course). Could you make that change to your PR? Also, we should use camelCase for variables to be consistent with the rest of the code. Alternatively, I could add the mutex stuff in a separate commit.
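For example, assuming the async-mutex npm package, the serialization could look roughly like this (a sketch, not the final PR code):

const { Mutex } = require('async-mutex');

class ParquetWriter {
  constructor(/* schema, envelopeWriter, opts */) {
    this.appendMutex = new Mutex();
    // ... existing initialization ...
  }

  async appendRow(row) {
    // Every call takes the mutex, so appends are serialized even if the
    // user never awaits the returned promises.
    return this.appendMutex.runExclusive(async () => {
      // ... existing appendRow body: buffer the row, flush a full row group ...
    });
  }
}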
Then, in a separate change, we can add the new wrapper method that sidesteps this issue, and the related issue of having to remember to call close, altogether by providing a simplified "fire and forget" API.
I think that will give us the best of both worlds: full correctness while still not having to change existing user code, as would have been required if we turned concurrent modifications into a hard error. I now see that from a user perspective this is the better solution, since it will just work instead of requiring users to change their code. If the user then forgets to await the returned promises entirely, that might still result in unbounded memory usage, but at least the file will be correct.
I will try to make that change. Will get back to you. Thanks!
Please check if this works. I retained the change in the test file to force the issue of unordered write and make sure it works.
@asmuth - Will it be possible to get a review of this pull request this week and then a merge? Thanks!
Code looks good to me and tests still pass; will go ahead and merge this since there seem to be no objections. Thank you!
Thank you @asmuth. This is very helpful.
@asmuth - will it be possible to get an npm build, or does this have to wait to be made along with other changes?
Although I see the solution here has been merged, it doesn't seem like the best approach. Instead of a mutex, you could probably just keep a variable on the writer that holds the promise of the current row group being written and await it if it is set, e.g.
async appendRow(row) {
  if (this.writeInProgress) await this.writeInProgress;
  // ... add the row to this.rowBuffer ...
  if (this.rowBuffer.rowCount >= this.rowGroupSize) {
    this.writeInProgress = this.envelopeWriter.writeRowGroup(this.rowBuffer);
    this.rowBuffer = {};
    await this.writeInProgress;
    this.writeInProgress = null;
  }
}

async close() {
  if (this.writeInProgress) await this.writeInProgress;
  // ...
}
Can avoid an extra dependency that way.
This is a fix for issues 60 (https://github.com/ironSource/parquetjs/issues/60) and 101 (https://github.com/ironSource/parquetjs/issues/101).