aloneguid / parquet-dotnet

Fully managed Apache Parquet implementation
https://aloneguid.github.io/parquet-dotnet/
MIT License

Reading row groups in parallel #518

Open Pyroluk opened 3 weeks ago

Pyroluk commented 3 weeks ago

Issue description

Hi!

I would like to read parquet files in parallel to significantly speed up a processing pipeline.

using (var parquetReader = await ParquetReader.CreateAsync(File.OpenRead(filePath)))
{
    for (int currentRowGroupIndex = 0; currentRowGroupIndex < parquetReader.RowGroupCount; currentRowGroupIndex++)
    { ... }
}

Works as expected.

using (var parquetReader = await ParquetReader.CreateAsync(File.OpenRead(filePath)))
{
    Parallel.For(0, parquetReader.RowGroupCount, async currentRowGroupIndex =>
    { ... } );
}

But reading the row groups in parallel with Parallel.For or Parallel.ForAsync fails with "random" error messages like System.InvalidOperationException: "don't know how to skip type Double", System.InvalidOperationException: "don't know how to skip type 14", ...

Is there a thread safe way for reading row groups in parallel already implemented?

Cheers!

aloneguid commented 3 weeks ago

File streams are generally not compatible with parallel processing. You can, however, open a file stream per parallel thread, i.e. your Parallel.For should perform the file opening operation. Or you can introduce a lock on file reads, depending on what works better for you. I might be stating the obvious here, but asynchronous and parallel are not the same thing.
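The lock option mentioned above might look roughly like the following. This is only a sketch, not something taken from the library documentation: it assumes .NET 8 (for Parallel.ForAsync) and that ReadEntireRowGroupAsync returns fully materialized column data, so the data can be processed after the lock is released. The file path and the Process helper are hypothetical names used for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Parquet;
using Parquet.Data;

string filePath = "data.parquet"; // hypothetical path, replace with your own file

// One shared reader, so the underlying stream must never be read concurrently.
using ParquetReader reader = await ParquetReader.CreateAsync(filePath);

// SemaphoreSlim is used instead of the lock statement because
// await is not allowed inside a lock block.
using var readGate = new SemaphoreSlim(1, 1);

await Parallel.ForAsync(0, reader.RowGroupCount, async (rowGroupIndex, cancellationToken) =>
{
    DataColumn[] columns;

    // Only one iteration at a time may touch the shared stream.
    await readGate.WaitAsync(cancellationToken);
    try
    {
        columns = await reader.ReadEntireRowGroupAsync(rowGroupIndex);
    }
    finally
    {
        readGate.Release();
    }

    // Processing happens outside the gate, so this part can run in parallel.
    Process(columns);
});

// Hypothetical placeholder for the actual per-row-group work.
static void Process(DataColumn[] columns)
{
    Console.WriteLine($"Read {columns.Length} columns");
}
```

With this approach the reads themselves are serialized and only the processing overlaps, so it probably pays off mainly when processing a row group costs more than reading it. Opening one reader per iteration avoids the gate entirely.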

Pyroluk commented 2 weeks ago

Thanks a lot for the hint.

This works flawlessly:

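// Open the file once just to find out how many row groups it has, then dispose the reader.
int rowGroupCount;
using (var parquetReader = await ParquetReader.CreateAsync(filePath))
{
    rowGroupCount = parquetReader.RowGroupCount;
}
await Parallel.ForAsync(0, rowGroupCount, async (currentRowGroupIndex, cancellationToken) =>
{
    // Each iteration opens its own reader, so no file stream is shared between threads.
    using (var parquetReader = await ParquetReader.CreateAsync(filePath))
    using (var rowGroupReader = parquetReader.OpenRowGroupReader(currentRowGroupIndex))
    { ... }
});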

I think a short section about reading row groups in parallel would be cool in the documentation: https://aloneguid.github.io/parquet-dotnet/reading.html

When a file path (string) is passed to ParquetReader.CreateAsync(), the library might also be able to handle the concurrency itself. That would be a bit more elegant, since it would remove the need for a separate ParquetReader.CreateAsync() call just to find out the row group count beforehand.

Tangential question: is there any way to write row groups in parallel?