apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[Go] How to approach implementing the Parquet file format in Go #35688

Open hkpeaks opened 1 year ago

hkpeaks commented 1 year ago

Describe the usage question you have. Please include as many useful details as possible.

Based on current testing, my Golang dataframe project achieves outstanding performance when the data source is a CSV file. As a next step I want to implement the Parquet file format, since it has become very popular. After some basic research on this topic, I am still not clear how to approach implementing Parquet using your Golang library, as there are few code samples I can find in the community.

I have built an in-memory and streaming engine that uses a simple byte-array data structure similar to CSV, and I have developed a set of algorithms to support direct byte-to-byte conversion for ETL functions such as Read/Write file, Distinct, GroupBy, JoinTable and Select column/row. It also handles JoinTable for a billion rows using 32GB of memory.

My requirement for supporting the Parquet format is simple: I need to know how to read the byte array of particular columns of a Parquet file. Once I can read, I will of course also need to know how to write to disk.

Today I published the pre-release 64-bit runtime of my project for Windows/Linux at https://github.com/hkpeaks/peaks-consolidation/releases

Component(s)

Go

mapleFU commented 1 year ago

Parquet follows the format described in https://github.com/apache/parquet-format

If you're using arrow, you can use pqarrow under https://github.com/apache/arrow/tree/main/go/parquet

hkpeaks commented 1 year ago

I have tested Arrow before. My project uses byte arrays because they consume fewer CPU and memory resources. In fact, I have a long video that shows how my app processes large CSV files faster than Polars, which uses Arrow to process Parquet files: https://youtu.be/1Kn665ADSck So I want to know what processing speed I can get if my app supports reading particular columns of a Parquet file directly.

mapleFU commented 1 year ago

Then just follow the parquet standard :-) https://github.com/apache/parquet-format

zeroshade commented 1 year ago

@hkpeaks You can use the parquet package (https://pkg.go.dev/github.com/apache/arrow/go/v12/parquet) to get direct access and reading of columns and data from the parquet file without going through the arrow conversions if you don't need to do so. The primary interface for interacting with a file directly would be the file package which provides readers for row groups and individual columns.

I did a lot of work trying to get it as performant as possible when i originally wrote it, but as with many things in software I'm sure there are still areas that could be improved and I'll happily review any contributions you make or ideas you have for improving the performance further! Feel free to reach out if you need more help understanding how to use the package.
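
For illustration, a minimal sketch of opening a file with the file package and inspecting it (the file name is a placeholder, and the exact method names should be double-checked against the pkg.go.dev documentation linked above):

package main

import (
    "fmt"
    "log"

    "github.com/apache/arrow/go/v12/parquet/file"
)

func main() {
    // memoryMap=false uses plain file reads, which also works on Windows.
    rdr, err := file.OpenParquetFile("example.parquet", false)
    if err != nil {
        log.Fatal(err)
    }
    defer rdr.Close()

    fmt.Println("row groups:", rdr.NumRowGroups())
    fmt.Println("columns:   ", rdr.MetaData().Schema.NumColumns())
}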

hkpeaks commented 1 year ago

@zeroshade

  1. go get -u github.com/apache/arrow/go/v12/parquet
  2. go install github.com/apache/arrow/go/v12/parquet/cmd/parquet_reader@latest
  3. go install github.com/apache/arrow/go/v12/parquet/cmd/parquet_schema@latest

I have downloaded each of these. Why are the files in 2 and 3 identical? 1 and 2 also have some duplication.

Where can I find a hello-world code example for using these libraries?

hkpeaks commented 1 year ago

Firstly, I need to know where to find the start and end address of each row block. Secondly, I need to know where to find the start and end address of each column and column page contained within a row block. For a particular query that requires 5 columns, I can use goroutines and mmap to read a given set of row blocks for the selected columns in parallel for each batch, e.g.

Row block batch 1: use 20+ threads to read 5 columns of 1~20 blocks

Row block batch 2: use 20+ threads to read 5 columns of 21~40 blocks

Row block batch 3: use 20+ threads to read 5 columns of 41~60 blocks

To determine how many row blocks I should read for each stream, I need to know how many columns and row blocks a file has.

I have implemented streaming for CSV in a similar way and want to extend it to cover the Parquet and JSON file formats. The Parquet file format offers two advantages: 1) it allows reading selected columns directly, and 2) it offers compression.

If the Apache library offers an API to answer the above questions, it would be of great help for my project to support the Parquet format with very high performance.

Data structure of my current project: https://github.com/hkpeaks/peaks-consolidation/blob/main/PeaksFramework/data_structure.go

You can find the runtime for Windows/Linux at https://github.com/hkpeaks/peaks-consolidation/releases. You can test whether its read-query-write of CSV is faster than other software's read-query-write of Parquet. It supports JoinTable for a billion rows using only 32GB of RAM (input 67.2GB CSV, output 90GB CSV).

mapleFU commented 1 year ago

Oops, it seems the parquet Go module in Arrow doesn't provide many examples.

I guess https://github.com/apache/arrow/blob/65520b361941e9abad386614999dbc250295959e/go/parquet/file/file_reader.go#L79 can be a good starting point. You can pay attention to the ReaderProperties.

Once you open the file, you can get the metadata and learn the file schema and row groups from the metadata interface.

Then you can open row groups via https://github.com/apache/arrow/blob/65520b361941e9abad386614999dbc250295959e/go/parquet/file/file_reader.go#L305, and open column chunks within a row group.
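
Putting those pieces together, a rough sketch (the file name is a placeholder; parquet.NewReaderProperties and file.WithReadProps are my assumptions about the option names and are worth verifying against the package docs):

package main

import (
    "fmt"
    "log"

    "github.com/apache/arrow/go/v12/arrow/memory"
    "github.com/apache/arrow/go/v12/parquet"
    "github.com/apache/arrow/go/v12/parquet/file"
)

func main() {
    // Default reader properties; tune these if needed.
    props := parquet.NewReaderProperties(memory.DefaultAllocator)

    rdr, err := file.OpenParquetFile("example.parquet", false, file.WithReadProps(props))
    if err != nil {
        log.Fatal(err)
    }
    defer rdr.Close()

    // Walk every row group and open each column chunk reader in it.
    for i := 0; i < rdr.NumRowGroups(); i++ {
        rg := rdr.RowGroup(i)
        for c := 0; c < rg.NumColumns(); c++ {
            col, err := rg.Column(c)
            if err != nil {
                log.Fatal(err)
            }
            fmt.Printf("row group %d, column %d: physical type %v\n", i, c, col.Type())
        }
    }
}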

hkpeaks commented 1 year ago

I have implemented mmap for CSV that works on Windows. When trying Parquet, I see this error: "error opening parquet file: mmap not implemented on windows"

C:\Users\85292\go\pkg\mod\github.com\apache\arrow\go\v13@v13.0.0-20230520140400-65520b361941\parquet\cmd\parquet_schema>parquet_schema 0.1MillionRows.parquet
required group field_id=-1 root {
  optional int64 field_id=-1 Date;
  optional byte_array field_id=-1 Ledger (String);
  optional int64 field_id=-1 Account;
  optional byte_array field_id=-1 PartNo (String);
  optional byte_array field_id=-1 Project (String);
  optional byte_array field_id=-1 Contact (String);
  optional byte_array field_id=-1 Unit Code (String);
  optional int64 field_id=-1 Quantity;
  optional double field_id=-1 Unit Price;
  optional byte_array field_id=-1 D/C (String);
  optional byte_array field_id=-1 Currency (String);
  optional double field_id=-1 Exchange Rate;
  optional double field_id=-1 Original Amount;
  optional double field_id=-1 Base Amount;
}

C:\Users\85292\go\pkg\mod\github.com\apache\arrow\go\v13@v13.0.0-20230520140400-65520b361941\parquet\cmd\parquet_schema>parquet_reader 0.1Million_reader
error opening parquet file: mmap not implemented on windows

C:\Users\85292\go\pkg\mod\github.com\apache\arrow\go\v13@v13.0.0-20230520140400-65520b361941\parquet\cmd\parquet_schema>

mapleFU commented 1 year ago

https://github.com/apache/arrow/blob/main/go/parquet/file/file_reader_mmap_windows.go

I guess the Go file reader can only use mmap on other OSes. You can disable mmap on Windows (by setting memoryMap = false).
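
For example, a one-line sketch of that call (using a file name that appears later in this thread):

// memoryMap=false: plain os.Open/ReadAt is used instead of mmap, so this works on Windows.
rdr, err := file.OpenParquetFile("0.1MillionRows.parquet", false)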

hkpeaks commented 1 year ago

My app needs to support billions of rows with little memory, so using mmap is essential. My CSV extractor succeeds in using mmap for a billion-row JoinTable.

mapleFU commented 1 year ago

I guess you can just compare them on a Linux machine or WSL. It seems that using mmap needs extra development in this library. You can try on WSL2 or Linux machines, or develop the mmap support yourself if you want.

By the way, I think JoinTable should use spill instead of mmap.

hkpeaks commented 1 year ago

I tried Linux before, but failed to compile it; mmap is used extensively throughout my system. See my source code for how I use mmap to read CSV file partitions to support streaming: https://github.com/hkpeaks/peaks-consolidation/blob/main/PeaksFramework/read_file.go

zeroshade commented 1 year ago

@hkpeaks To answer your questions:

  1. go get -u github.com/apache/arrow/go/v12/parquet

This will just get the whole package and add it to your module. The others are commands in that package, so of course there will be duplication: the two commands both rely on this package.

  2. go install github.com/apache/arrow/go/v12/parquet/cmd/parquet_reader@latest

This is just a simple main program which will dump the metadata, info, and data from a parquet file.

  3. go install github.com/apache/arrow/go/v12/parquet/cmd/parquet_schema@latest

This main program is a further stripped-down command which only dumps the schema from a parquet file; it will not read data.

Firstly, I need to know where to find the start and end address of each row block. Secondly, I need to know where to find the start and end address of each column and column page contained within a row block. For a particular query that requires 5 columns, I can use goroutines and mmap to read a given set of row blocks for the selected columns in parallel for each batch, e.g.

Row block batch 1: use 20+ threads to read 5 columns of 1~20 blocks

Row block batch 2: use 20+ threads to read 5 columns of 21~40 blocks

Row block batch 3: use 20+ threads to read 5 columns of 41~60 blocks

To determine how many row blocks I should read for each stream, I need to know how many columns and row blocks a file has.

Once you open the file (either by OpenParquetFile or NewParquetReader), you can retrieve the information you're looking for:
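
For instance, a rough sketch of the metadata queries involved (the RowGroupMetaData / ColumnChunkMetaData method names below are my best recollection of the v12 API and should be double-checked against the package documentation):

package main

import (
    "fmt"
    "log"

    "github.com/apache/arrow/go/v12/parquet/file"
)

// describeLayout prints how many columns and row groups a file has, how many
// rows each row group holds, and where each column chunk starts in the file.
func describeLayout(path string) {
    rdr, err := file.OpenParquetFile(path, false)
    if err != nil {
        log.Fatal(err)
    }
    defer rdr.Close()

    meta := rdr.MetaData()
    fmt.Println("columns:   ", meta.Schema.NumColumns())
    fmt.Println("row groups:", rdr.NumRowGroups())

    for i := 0; i < rdr.NumRowGroups(); i++ {
        rgMeta := meta.RowGroup(i)
        fmt.Printf("row group %d: %d rows\n", i, rgMeta.NumRows())
        for c := 0; c < rgMeta.NumColumns(); c++ {
            colMeta, err := rgMeta.ColumnChunk(c)
            if err != nil {
                log.Fatal(err)
            }
            fmt.Printf("  column %d: data page offset %d, compressed size %d\n",
                c, colMeta.DataPageOffset(), colMeta.TotalCompressedSize())
        }
    }
}

func main() { describeLayout("example.parquet") }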

I hope that the above answers your questions concerning the API (along with pointing you at the documentation for any other methods you might need). But let me know if there's any other methods/functions you need that you can't find (or don't exist yet).

See my source code how to use mmap to read csv file partition to support streaming https://github.com/hkpeaks/peaks-consolidation/blob/main/PeaksFramework/read_file.go

I looked at the file and I don't see any usage of mmap there at all. You're just using os.Open and the ReadAt method on the file. (If I'm wrong, can you please provide the line number / direct link to the mmap usage?) If you follow @mapleFU's suggestion of passing memoryMap = false to OpenParquetFile, then that's exactly what will be used: it will just use os.Open and ReadAt to read from the file.

zeroshade commented 1 year ago

@hkpeaks All of this being said, it would be amazing if you could make some contributions to the CSV reader in the Arrow package as there's a lot of work there that needs to be done to optimize it. Right now it's just a naive reader that doesn't do any parallelism.

hkpeaks commented 1 year ago

Based on numerous experiments, Golang's ReadAt has achieved results comparable to mmap. As a result, I am much more concerned with performance than with the name "mmap." I created one of the fastest dataframes specialized in CSV files. So I recently investigated whether my project should cover the Parquet format. Other formats, such as JSON and HTML, are simple for me; no research is required.

You can see some of my code because I'm working on reclassifying my Peaks project into open source and proprietary parts. I previously tested Apache Arrow to see how it compared to slices, but I saw no improvement in performance. However, I eventually removed the slice-based in-memory table structure from my project. When supporting ETL functions such as "Distinct", "GroupBy", "Filter", and "JoinTable", the use of a byte array as an in-memory table has been shown to yield significant performance gains. The byte array reduces memory and CPU usage significantly. It avoids unnecessary dataset serialization and de-serialization.

I've been retired for three years as a result of a COVID-19 layoff exercise. I've recently become very active in programming and marketing because I intend to return to work; I'm now 55 years old. One of my goals is to contribute to the open source community. I'll think more about the benefits of using Arrow for CSV; I believe the main benefit is data exchange. However, gRPC is also an excellent way to support very high data exchange performance over the internet.

My strength is innovative system design; when I design something in my head, coding becomes simple. My weakness, however, is reading code. I'll try to replicate what I've learned from https://pkg.go.dev/github.com/apache/arrow/go/v12/arrow/csv#WithChunk. I understand that users want a fast Apache Spark running on their desktop computer. Cloud computing is always expensive and risky when paid with a credit card. As a result, I am unable to use the cloud computing services provided by Databricks, Azure, Google, and AWS. Peaks Consolidation is designed to address this issue by allowing users to process billions of rows on a desktop computer. I will ask cloud companies to support prepayment by PayPal, with a simple model: once the prepaid balance is used up, all VMs are removed automatically.

Thank you for your detailed reply to my question. I will consider whether it is possible to implement the Parquet format in a way that outperforms my current CSV format. I noticed that writing a Parquet file requires much more time than writing a CSV file, as I have seen in other software. And I hope the Apache Foundation can consider byte arrays as one of the best data exchange formats for moving data from one piece of software to another.

hkpeaks commented 1 year ago

After adding --no-memory-map in ..\go\pkg\mod\github.com\apache-reader\arrow\go\v12@v12.0.0\parquet\cmd\parquet_reader\main.go, it can now run on Windows:

run --no-memory-map 0.1MillionRows.parquet

If I instead set config.NoMemoryMap = true after the struct below and run "run 0.1MillionRows.parquet" (without --no-memory-map on the command line), I get: error: --output "" cannot be created, open : The system cannot find the file specified.

var config struct {
    PrintKeyValueMetadata bool
    OnlyMetadata          bool
    NoMetadata            bool
    Output                string
    NoMemoryMap           bool
    JSON                  bool `docopt:"--json"`
    CSV                   bool `docopt:"--csv"`
    ParseInt96AsTimestamp bool `docopt:"--int96-timestamp"`
    Columns               string
    File                  string
}

config.NoMemoryMap = true

mapleFU commented 1 year ago

It seems "The system cannot find the file specified" indicates that you didn't specify the output file.

hkpeaks commented 1 year ago

With config.NoMemoryMap = true:

D:\Go\Parquet\OpenParquetFile>run v0.7.1.parquet
error: --output "" cannot be created, open : The system cannot find the file specified.

After changing to config.NoMemoryMap = false:

D:\Go\Parquet\OpenParquetFile>go build

D:\Go\Parquet\OpenParquetFile>run --no-memory-map v0.7.1.parquet
File name: v0.7.1.parquet
Version: v1.0
Created By: parquet-cpp version 1.3.2-SNAPSHOT
Num Rows: 10
Number of RowGroups: 1
Number of Real Columns: 11

hkpeaks commented 1 year ago

I plan to open source the streaming model for CSV and keep only the databending source code proprietary. I have less than half a year of Go development experience. Prior to using Go, I used C#. C# allows building a DLL for a class library. Today I tried to do the same thing with Go and failed.

I have tested that Rust supports building a library without main(). I will consider using Rust.

hkpeaks commented 1 year ago

I have now open sourced one more very important file: https://github.com/hkpeaks/peaks-consolidation/blob/main/PeaksFramework/peaks/controller.go This is the auto-switching parallel streaming/in-memory model used throughout the E-T-L processes. So now Peaks keeps only one file, "databending.go", proprietary.

zeroshade commented 1 year ago

@hkpeaks I'm not sure what you mean by failing to build a class library with go. You can easily build a shared library with extern "C" functions by using cgo and the buildmode options (see docs).
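
For example, a minimal sketch of the cgo c-shared approach (the exported function name and signature are just placeholders, not anything from the Peaks project):

// Build with: go build -buildmode=c-shared -o databending.dll   (use .so on Linux)
package main

import "C"

//export JoinTableRowCount
func JoinTableRowCount(rows C.longlong) C.longlong {
    // Proprietary logic would live here; the host app calls this through the C ABI.
    return rows
}

// main is required for buildmode=c-shared but is never called by the host.
func main() {}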

Based on numerous experiments, Golang readat has achieved the results of mmap. As a result, I am much more concerned with performance than with the name "mmap."

That's fine, performance is what's important. Just please don't call it "mmap" if it's not actually using "mmap" as that confuses what you're trying to do. There's plenty of situations where mmap isn't necessary or might even slow down performance rather than improve it.

Bytearray reduces memory and CPU usage significantly. It avoids unnecessary dataset serialization and de-serialization.

This is interesting to me and I'd like to see how that is the case. Wouldn't you need to serialize/de-serialize from bytes into something you can actually process like the various integral types, float data, etc?

I'll think more about the benefits of using Arrow for CSV; I believe the main benefit is data exchange.

It's not so much the benefits of using Arrow for CSV, but rather getting CSV data into Arrow format so that other processes/exchange/analytics can be run on it. The current CSV parsing/reading in the Go Arrow lib is very naive and doesn't do any parallelization, so is ripe to be improved. I haven't had the time to do so myself but it would be fantastic to see contributions there from the community.

However, gRPC is also an excellent way to support very high data exchange performance over the internet.

I wholeheartedly agree there, this is why Arrow Flight RPC uses gRPC.

I will consider whether it is possible to implement the Parquet format, which can outperform my current CSV format.

What do you mean by implementing the Parquet format in this case? The Go Parquet library here already has implemented the Parquet format spec. Are you intending to re-implement the parquet spec? Or just use the library provided here to perform the reads (and if you find inefficiencies, then contribute improvements back)?

And I hope Apache Foundation can consider bytearray is one of best data exchange format moving from one software to alternative software.

By "bytearray" do you mean just a literal array of bytes? Or is there an actual data format called "bytearray"? I'm not quite sure what you're referring to here.

hkpeaks commented 1 year ago
  1. I will try these "C" functions to see whether they fit my purpose. I want to keep one file, databending.go, as proprietary software, so I need to build it as a binary file similar to a C# DLL. Without this file, there is no way for users of the open source project I have published to build their own runtime for Windows/Linux. If the full Go project were open source, there would be no way to support this project financially.

  2. If the databending process does not deal with math/statistics, I can use byte-to-byte conversion directly. If a particular column requires filtering by a float type, the byte array of the current cell is converted to float on demand, only to compare two float64 numbers; this only affects whether the current row of the byte array is selected or not.

  3. I do not know whether your Parquet library has built-in parallel streaming. If not, I will do that in my project: I will read different row blocks of the selected columns in parallel. I will calculate how many blocks can fit in memory; if they exceed memory, I will run them in batches, similar to the current CSV streaming.

  4. The byte array is exactly the same as the byte stream read from the CSV file using ReadAt, and it is grouped into different partitions. For a 1-billion-row (67GB) CSV, it is divided into 144 batches of streams by default, and each batch is further divided into 10 partitions that run in parallel, so the 1-billion-row CSV is divided into 1,440 partitions; a sketch of this partitioned ReadAt pattern is shown below. If you have time to try the pre-release, you will see the real performance.
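
For illustration only, a minimal sketch of reading fixed-size partitions of a file in parallel with os.File.ReadAt and goroutines (the file name, partition count and the boundary handling are placeholders, not the actual Peaks code):

package main

import (
    "io"
    "log"
    "os"
    "sync"
)

func main() {
    f, err := os.Open("1BillionRows.csv") // placeholder input file
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()

    info, err := f.Stat()
    if err != nil {
        log.Fatal(err)
    }

    const partitions = 10 // partitions per batch, as described above
    size := info.Size()
    chunk := size / partitions

    var wg sync.WaitGroup
    for p := int64(0); p < partitions; p++ {
        wg.Add(1)
        go func(p int64) {
            defer wg.Done()
            offset := p * chunk
            length := chunk
            if p == partitions-1 {
                length = size - offset // the last partition takes the remainder
            }
            buf := make([]byte, length)
            if _, err := f.ReadAt(buf, offset); err != nil && err != io.EOF {
                log.Println("read partition", p, "failed:", err)
                return
            }
            // A real implementation would align the partition boundary to the
            // nearest newline before parsing rows from buf.
            _ = buf
        }(p)
    }
    wg.Wait()
}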

hkpeaks commented 1 year ago
rdr, err := file.OpenParquetFile(config.File, false)
if err != nil {
    fmt.Fprintln(os.Stderr, "error opening parquet file: ", err)
    os.Exit(1)
}   

rg := rdr.RowGroup(0)

col, err := rg.Column(1)

println("Descriptor: ", col.Descriptor())
println("Err: ", col.Err())
println("HasNext: ", col.HasNext())
println("Type: ", col.Type())
println("Descriptor-ColumnOrder: ", col.Descriptor().ColumnOrder())
println("Descriptor-ColumnPath: ", col.Descriptor().ColumnPath())
println("Descriptor-ConvertedTyp:", col.Descriptor().ConvertedType())

Results:

D:\Go\Parquet\OpenParquetFile>run v0.7.1.parquet
Descriptor:  0xc00005daa0
Err:  (0x0,0x0)
HasNext:  true
Type:  6
Descriptor-ColumnOrder:  0xc00000a028
Descriptor-ColumnPath:  [1/1]0xc00005dac0
Descriptor-ConvertedTyp: 0


How can I read all cell values from RowGroup(0) & Column(1)?

mapleFU commented 1 year ago

https://github.com/apache/arrow/blob/main/go/parquet/file/column_reader_test.go#L227

You can try to follow the style here.
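
Following that style, a rough sketch of draining one column chunk with ReadBatch (the Type value 6 printed above suggests a BYTE_ARRAY column, so the typed reader asserted below and the batch size are assumptions to adjust for your file):

package main

import (
    "fmt"
    "log"

    "github.com/apache/arrow/go/v12/parquet"
    "github.com/apache/arrow/go/v12/parquet/file"
)

func main() {
    rdr, err := file.OpenParquetFile("v0.7.1.parquet", false)
    if err != nil {
        log.Fatal(err)
    }
    defer rdr.Close()

    col, err := rdr.RowGroup(0).Column(1)
    if err != nil {
        log.Fatal(err)
    }

    // Physical type 6 is BYTE_ARRAY, so assert the corresponding typed reader.
    ba, ok := col.(*file.ByteArrayColumnChunkReader)
    if !ok {
        log.Fatalf("column 1 has physical type %v, not BYTE_ARRAY", col.Type())
    }

    values := make([]parquet.ByteArray, 1024)
    defLvls := make([]int16, 1024)
    for ba.HasNext() {
        _, read, err := ba.ReadBatch(int64(len(values)), values, defLvls, nil)
        if err != nil {
            log.Fatal(err)
        }
        for _, v := range values[:read] {
            fmt.Println(string(v))
        }
    }
}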

hkpeaks commented 1 year ago

Today I benchmarked DuckDB (C++) and Polars (Rust) using Parquet and CSV. The time for DuckDB to Filter and GroupBy a Parquet file containing 300 million rows is only 3.9s. Video: https://youtu.be/gnIh6r7Gwh4 So I am considering using DuckDB directly as my Peaks Parquet calc engine, as I am unlikely to be faster than 3.9s.

hkpeaks commented 1 year ago

@zeroshade, now I know who you are: Matt Topol, the author of the Arrow in-memory analytics book.

zeroshade commented 1 year ago

lol yes. That is who I am.

hkpeaks commented 1 year ago

Today I saw this: https://www.nature.com/articles/s41586-023-06004-9, and the Reddit community reacted strongly to the news. This motivates me to continue the CSV development for combined queries, header-body datasets, and sorting billions of rows.

hkpeaks commented 11 months ago

I plan to use Apache Arrow compatible Python libraries to support building a new ETL framework (it looks like an alternative form of SQL statement, specialized in dealing with files, TCP streams, in-memory tables and big data). Implementing file formats other than CSV (i.e. Parquet, Delta Lake) will then become very simple for me, as I can write Python code to call the different Arrow compatible Python libraries. Golang is unlikely to officially support my development of Python bindings, and newer versions of Go generally do not support building a DLL for a Windows app written in Go.

Example:

UserDefineFunctionName: SourceFile.ext/ Table / SQL.Table ~ ResultFile.ext /Table / SQL.Table | Command: Setting

ExpandFile: Fact.csv ~ 1BillionRows.csv | ExpandFactor: 123

JoinScenario1: 1BillionRows.csv ~ Test1Results.csv | JoinTable: Quantity, Unit_Price => InnerJoin(Master)Multiply(Amount) | OrderBy: Date(D) => CreateFolderLake(Shop) | Select: Date,Shop,Style,Product,Quantity,Amount

BuildKeyValueTable: Master.csv ~ KeyValueTable | BuildKeyValue: Product, Style

JoinScenario2: 1BillionRows.csv ~ Test2AResults.csv | JoinKeyValue: Product, Style => AllMatch(KeyValueTable) | AddColumn: Quantity, Unit_Price => Multiply(Amount) | Filter: Amount(Float > 50000) | GroupBy: Product, Style => Count() Sum(Quantity) Sum(Amount) | OrderBy: Shop(A)Product(A)Date(D)

SplitFile: Test1Results.csv ~ FolderLake | CreateFolderLake: Shop

FilterFolder: Outbox/FolderLake/S15/*.csv ~ Result-FilterFolderLake.csv | Filter: Product(222..888) Style(=F)

ReadSample2View: Outbox/Result-FilterFolderLake.csv ~ SampleTable | ReadSample: StartPosition%(0) ByteLength(100000) | View

https://github.com/hkpeaks/pypeaks