codename-hub / php-parquet

PHP implementation for reading and writing Apache Parquet files/streams

Batch Writing - Example #3

Closed: cedricfuturistech closed this issue 2 years ago

cedricfuturistech commented 2 years ago

Hi,

Does this support batch writing?

I have 100k+ rows in my database and am planning to write them to a Parquet file in batches: read 5k rows at a time from the database, then write each chunk to the Parquet file.

I saw an example of appending to files in parquet-dotnet (https://github.com/aloneguid/parquet-dotnet/blob/master/doc/writing.md#appending-to-files), but when I try the same approach with this library I always get this error: not a Parquet file(tail is '\\\')

This is my code:

    public function export($table, $startDate, $endDate)
    {
        $fields = [];
        $columns = [];

        $appendFile = false;

        $fileName = $table . '_' . str_replace(':', '-', $startDate) . '.parquet';
        $filePath = storage_path('/' . $fileName);
        $fileStream = fopen($filePath, 'a+');

        DB::table($table)->whereBetween('created_at', [$startDate, $endDate])->orderBy('id')->chunk(5000, function ($result) use (&$fields, &$columns, &$filePath, $table, $startDate, $endDate, &$parquetWriter, &$groupWriter, &$appendFile, &$fileStream) {
            if ($result) {
                $keyOnly = Arr::first($result);

                foreach ($keyOnly as $key => $value) {
                    $colType = $this->getColumnType($table, $key);
                    $dataColumn = new DataColumn(
                        DataField::createFromType($key, $colType),
                        $result->pluck($key)->toArray()
                    );

                    $columns[] = $dataColumn;
                    $fields[] = $dataColumn->getField();
                }

                $schema = new Schema($fields);

                $parquetWriter = new ParquetWriter($schema, $fileStream, null, $appendFile);

                $appendFile = true;

                // create a new row group in the file
                $groupWriter = $parquetWriter->CreateRowGroup();
                foreach ($columns as $col) {
                    $groupWriter->WriteColumn($col);
                }

                $groupWriter->finish();   // finish inner writer(s)
            }
        });

        $parquetWriter->finish(); // finish the parquet writer last

        return $filePath;
    }

Your help is greatly appreciated! Thanks.
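A note on the first error: a complete Parquet file begins with the 4-byte magic PAR1 and ends with the file metadata (the footer), a 4-byte little-endian footer length, and PAR1 again. The footer is only written when the writer is finished, and in the code above the first ParquetWriter is never finished before a second writer opens the same file in append mode. At that point the file most likely does not end in PAR1, which matches the "not a Parquet file" message. The function below is a hypothetical pure-PHP check (looksLikeFinishedParquet() is not part of php-parquet) that inspects those bytes before appending and restores the stream position afterwards.

```php
<?php

/**
 * Hypothetical helper (not part of php-parquet): reports whether an open
 * stream looks like a finished Parquet file, i.e. it starts with "PAR1" and
 * ends with a 4-byte little-endian footer length followed by "PAR1".
 * The stream position is restored before returning.
 *
 * @param resource $stream a readable, seekable stream
 */
function looksLikeFinishedParquet($stream): bool
{
    $position = ftell($stream);
    try {
        $size = fstat($stream)['size'];
        // Smallest possible layout: "PAR1" + 4-byte footer length + "PAR1".
        if ($size < 12) {
            return false;
        }

        rewind($stream);
        if (fread($stream, 4) !== 'PAR1') {
            return false;
        }

        fseek($stream, -8, SEEK_END);
        $tail = fread($stream, 8);
        if (substr($tail, 4, 4) !== 'PAR1') {
            return false; // roughly the situation the "tail is ..." error describes
        }

        // 'V' = unsigned 32-bit integer, little-endian byte order.
        $footerLength = unpack('V', substr($tail, 0, 4))[1];
        // The footer must fit between the leading magic and the last 8 bytes.
        return $footerLength > 0 && $footerLength <= $size - 12;
    } finally {
        fseek($stream, $position); // leave the stream where the caller had it
    }
}
```

Calling this on the file stream before constructing a writer in append mode would turn the library error into an explicit check in your own code.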

cedricfuturistech commented 2 years ago

I have updated my code to this:

    public function export($table, $startDate, $endDate)
    {
        $fields = [];
        $columns = [];

        $appendFile = false;

        $fileName = $table . '_' . str_replace(':', '-', $startDate) . '.parquet';
        $filePath = storage_path('/' . $fileName);
        $fileStream = fopen($filePath, 'a+');

        DB::table($table)->whereBetween('created_at', [$startDate, $endDate])->orderBy('id')->chunk(5000, function ($result) use (&$fields, &$columns, &$filePath, $table, $startDate, $endDate, &$parquetWriter, &$groupWriter, &$appendFile, $fileStream) {
            dump($result);
            if ($result) {
                $keyOnly = Arr::first($result);

                foreach ($keyOnly as $key => $value) {
                    $colType = $this->getColumnType($table, $key);
                    $dataColumn = new DataColumn(
                        DataField::createFromType($key, $colType),
                        $result->pluck($key)->toArray()
                    );

                    $columns[] = $dataColumn;
                    $fields[] = $dataColumn->getField();
                }

                $schema = new Schema($fields);

                $parquetWriter = new ParquetWriter($schema, $fileStream, null, $appendFile);

                $appendFile = true;

                // create a new row group in the file
                $groupWriter = $parquetWriter->CreateRowGroup();
                foreach ($columns as $col) {
                    $groupWriter->WriteColumn($col);
                }

                $groupWriter->finish();   // finish inner writer(s)
                $parquetWriter->finish(); // finish the parquet writer last
            }
        });

        return $filePath;
    }

Now it throws an error in ParquetWriter.php, line 137:

Call to undefined method jocoon\parquet\data\Schema::GetNotEqualsMessage()
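A likely explanation for why this version reaches a schema comparison at all: $fields and $columns are declared outside the closure and captured by reference, so they are never emptied between chunks. From the second chunk on, every column appears twice in the new Schema, which cannot match the schema already stored in the file; the writer then tries to build a mismatch message and hits the missing GetNotEqualsMessage() method. The sketch below reproduces that capture pattern in plain PHP, with column names standing in for DataField objects; simulateSchemas() and its parameters are hypothetical.

```php
<?php

/**
 * Hypothetical reproduction: builds the field list for each chunk the way the
 * closure above does, either keeping the outer array between chunks
 * (by-reference capture, no reset) or starting from an empty array each time.
 *
 * @param array<int, array<int, array<string, mixed>>> $chunks rows grouped into chunks
 * @return array<int, string[]> the field names each chunk's schema would get
 */
function simulateSchemas(array $chunks, bool $resetPerChunk): array
{
    $fields = [];   // declared outside the closure, as in the code above
    $schemas = [];

    $handleChunk = function (array $rows) use (&$fields, &$schemas, $resetPerChunk): void {
        if ($resetPerChunk) {
            $fields = []; // start this chunk with an empty field list
        }
        foreach (array_keys($rows[0]) as $key) {
            $fields[] = $key; // without the reset, this keeps growing
        }
        $schemas[] = $fields; // copy of the field list used for this chunk
    };

    foreach ($chunks as $rows) {
        $handleChunk($rows);
    }

    return $schemas;
}
```

Declaring the arrays inside the closure, as the next comment does, has the same effect as passing $resetPerChunk = true.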

cedricfuturistech commented 2 years ago

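Looks like I found the issue in my code.

I resolved it by changing the code to this; $fields and $columns are now created inside the closure for every chunk, and the file is opened with w+ instead of a+: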

    public function export($table, $startDate, $endDate)
    {
        $append = false;

        $fileName = $table . '_' . str_replace(':', '-', $startDate) . '.parquet';
        $filePath = storage_path('/' . $fileName);
        $fileStream = fopen($filePath, 'w+');

        DB::table($table)->whereBetween('created_at', [$startDate, $endDate])->orderBy('id')->chunk(5000, function ($result) use ($table, &$parquetWriter, &$groupWriter, &$append, $fileStream) {
            if ($result) {
                $fields = [];
                $columns = [];

                $keyOnly = Arr::first($result);
                foreach ($keyOnly as $key => $value) {
                    $colType = $this->getColumnType($table, $key);
                    $dataColumn = new DataColumn(
                        DataField::createFromType($key, $colType),
                        $result->pluck($key)->toArray()
                    );

                    $columns[] = $dataColumn;
                    $fields[] = $dataColumn->getField();
                }
                $schema = new Schema($fields);
                $parquetWriter = new ParquetWriter($schema, $fileStream, null, $append);

                $append = true;

                // create a new row group in the file
                $groupWriter = $parquetWriter->CreateRowGroup();
                foreach ($columns as $col) {
                    $groupWriter->WriteColumn($col);
                }

                $groupWriter->finish();   // finish inner writer(s)
                $parquetWriter->finish(); // finish the parquet writer last
            }
        });

        return $filePath;
    }
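Parquet stores data column by column, so each chunk of database rows has to be regrouped into one value list per column before it can be wrapped in a DataColumn; the code above does this with Laravel's pluck() once per key. For readers not using Laravel, a framework-free equivalent is sketched below. rowsToColumns() is a hypothetical helper: it takes the column names from the first row (like Arr::first() above) and fills a missing key with null so that all columns keep the same length.

```php
<?php

/**
 * Hypothetical helper: regroups one chunk of rows (arrays or stdClass objects,
 * as returned by a database query) into column name => list of values.
 *
 * @param array<int, array<string, mixed>|object> $rows
 * @return array<string, array<int, mixed>>
 */
function rowsToColumns(array $rows): array
{
    if ($rows === []) {
        return [];
    }

    // Column names come from the first row only.
    $names = array_keys((array) reset($rows));
    $columns = array_fill_keys($names, []);

    foreach ($rows as $row) {
        $row = (array) $row; // stdClass -> array of its public properties
        foreach ($names as $name) {
            // A missing key becomes null, keeping every column the same length.
            $columns[$name][] = $row[$name] ?? null;
        }
    }

    return $columns;
}
```

Each entry of the result could then be passed as the data argument of a DataColumn, next to the DataField built for that column name.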

Katalystical commented 2 years ago

Hey there,

Regarding the error Call to undefined method jocoon\parquet\data\Schema::GetNotEqualsMessage(): that seems to be a porting mistake on my part, sorry about that. The same code path also contains a faulty string interpolation.

As for the writing process in general: as long as you are writing within the same process (e.g. you are still iterating over your data), you can simply create more row groups on the same writer instead of finishing the ParquetWriter after each batch of 5000. If you resume writing records later (in a new PHP process) and no longer have your original writer instance, you can use the appending mechanism.

Feel free to re-open the issue if there is more to this. I'll incorporate a fix for your initial error message into the dev branch asap. Thanks!
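The maintainer's suggestion, one writer with one row group per batch, can be sketched as follows. It relies only on the CreateRowGroup(), WriteColumn() and finish() methods already used in this thread; writeRowGroups() and the $toColumns callback are hypothetical helpers, not php-parquet API. The caller creates the file writer once, each batch becomes one row group, and finish() is called on the file writer a single time after the last batch, so the footer is written once.

```php
<?php

/**
 * Hypothetical helper (not part of php-parquet): writes every batch as a
 * separate row group through ONE writer, then finishes that writer once.
 *
 * @param object   $writer    assumed to behave like php-parquet's ParquetWriter
 *                            (CreateRowGroup() and finish())
 * @param iterable $batches   e.g. 5000-row chunks read from the database
 * @param callable $toColumns hypothetical: maps one batch to its DataColumn
 *                            objects, always in the same schema order
 * @return int number of row groups written
 */
function writeRowGroups(object $writer, iterable $batches, callable $toColumns): int
{
    $rowGroups = 0;
    foreach ($batches as $batch) {
        // One row group per batch; the file writer itself stays open.
        $groupWriter = $writer->CreateRowGroup();
        foreach ($toColumns($batch) as $column) {
            $groupWriter->WriteColumn($column);
        }
        $groupWriter->finish(); // closes this row group only
        $rowGroups++;
    }

    // Finish the file writer exactly once, after the last row group.
    $writer->finish();

    return $rowGroups;
}
```

With Laravel's chunk(), the same structure means building the Schema and calling new ParquetWriter($schema, $fileStream) before the query, doing only the row-group part inside the chunk() callback, and calling $parquetWriter->finish() after chunk() returns. Because a Parquet file has a single schema shared by all row groups, the field list would have to be built once up front (for example from the table's column types) rather than from each chunk.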