hkpeaks opened this issue 1 year ago
Parquet follows the format described in https://github.com/apache/parquet-format
If you're using arrow, you can use pqarrow under https://github.com/apache/arrow/tree/main/go/parquet
I have tested Arrow before. My project uses bytearray because it consumes less CPU and memory. In fact, I have a long video showing how my app processes large CSV files faster than Polars, which uses Arrow to process Parquet files: https://youtu.be/1Kn665ADSck So I want to know what processing speed I can get if my app supports reading particular columns directly from a Parquet file.
Then just follow the parquet standard :-) https://github.com/apache/parquet-format
@hkpeaks You can use the parquet package (https://pkg.go.dev/github.com/apache/arrow/go/v12/parquet) to get direct access to and reading of columns and data from the parquet file, without going through the arrow conversions if you don't need to do so. The primary interface for interacting with a file directly would be the file package, which provides readers for row groups and individual columns.
I did a lot of work trying to get it as performant as possible when I originally wrote it, but as with many things in software, I'm sure there are still areas that could be improved, and I'll happily review any contributions you make or ideas you have for improving the performance further! Feel free to reach out if you need more help understanding how to use the package.
@zeroshade
I have downloaded each of them. Why are the files in 2 and 3 identical? 1 and 2 also have some duplication.
Where can I find a hello-world code example for using these libraries?
Firstly, I need to know where to find the start and end address of each row block. Secondly, I need to know where to find the start and end address of each column and column page contained within a row block. For a particular query which requires 5 columns, I can use goroutines and mmap to read a given set of row blocks for the selected columns in parallel for each batch, e.g.:
Row block batch 1: use 20+ threads to read 5 columns of 1~20 blocks
Row block batch 2: use 20+ threads to read 5 columns of 21~40 blocks
Row block batch 3: use 20+ threads to read 5 columns of 41~60 blocks
To determine how many row blocks I shall read for each stream, I need to know how many columns and row blocks the file has.
I have implemented streaming for CSV in a similar way and want to extend it to cover the Parquet and JSON file formats. The Parquet file format offers two advantages: 1) it allows reading selected columns directly, and 2) it offers compression.
If the Apache library offers an API to answer the above questions, it would greatly help my project support high-performance Parquet reading.
Data Structure my current project: https://github.com/hkpeaks/peaks-consolidation/blob/main/PeaksFramework/data_structure.go
Runtime for Windows/Linux: https://github.com/hkpeaks/peaks-consolidation/releases You can run a test to see whether its read-query-write of CSV is faster than other software's read-query-write of Parquet. It supports JoinTable for a billion rows using only 32GB RAM: input 67.2GB CSV, output 90GB CSV.
Oops, it seems parquet-go in Arrow doesn't provide many examples.
I guess https://github.com/apache/arrow/blob/65520b361941e9abad386614999dbc250295959e/go/parquet/file/file_reader.go#L79 can be a good starting point. You can pay attention to the ReaderProperties interface. Once you open the file, you can get the metadata and learn the file schema and row groups from it.
Then you can open row groups via https://github.com/apache/arrow/blob/65520b361941e9abad386614999dbc250295959e/go/parquet/file/file_reader.go#L305 and open each column chunk within a row group.
I have implemented mmap for CSV in a way that works on Windows. When trying Parquet, I see this error: "error opening parquet file: mmap not implemented on windows"
C:\Users\85292\go\pkg\mod\github.com\apache\arrow\go\v13@v13.0.0-20230520140400-65520b361941\parquet\cmd\parquet_schema>parquet_schema 0.1MillionRows.parquet
required group field_id=-1 root {
  optional int64 field_id=-1 Date;
  optional byte_array field_id=-1 Ledger (String);
  optional int64 field_id=-1 Account;
  optional byte_array field_id=-1 PartNo (String);
  optional byte_array field_id=-1 Project (String);
  optional byte_array field_id=-1 Contact (String);
  optional byte_array field_id=-1 Unit Code (String);
  optional int64 field_id=-1 Quantity;
  optional double field_id=-1 Unit Price;
  optional byte_array field_id=-1 D/C (String);
  optional byte_array field_id=-1 Currency (String);
  optional double field_id=-1 Exchange Rate;
  optional double field_id=-1 Original Amount;
  optional double field_id=-1 Base Amount;
}
C:\Users\85292\go\pkg\mod\github.com\apache\arrow\go\v13@v13.0.0-20230520140400-65520b361941\parquet\cmd\parquet_schema>parquet_reader 0.1Million_reader
error opening parquet file: mmap not implemented on windows
https://github.com/apache/arrow/blob/main/go/parquet/file/file_reader_mmap_windows.go
I guess the Go file reader can only use mmap on other OSes. You can disable mmap on Windows by setting memoryMap = false.
My app needs to support billions of rows with little memory, so using mmap is essential. My CSV extractor succeeds in using mmap for a billion-row JoinTable.
I guess you can just compare them on a Linux machine, or WSL. It seems that using mmap needs extra development in this library. You can try WSL2 or Linux machines, or develop mmap support yourself if you want.
By the way, I think JoinTable should use spill instead of mmap.
I tried Linux before, but failed to compile it. mmap is used extensively throughout my system. See my source code for how I use mmap to read CSV file partitions to support streaming: https://github.com/hkpeaks/peaks-consolidation/blob/main/PeaksFramework/read_file.go
@hkpeaks To answer your questions:
- go get -u github.com/apache/arrow/go/v12/parquet
This will just get the whole package and add it to your module. The others are commands in that package, so of course there'll be duplication: the two commands both rely on this package.
- go install github.com/apache/arrow/go/v12/parquet/cmd/parquet_reader@latest
This is just a simple mainprog which will dump the metadata, info, and data from a parquet file.
- go install github.com/apache/arrow/go/v12/parquet/cmd/parquet_schema@latest
This mainprog is a further stripped-down command which only dumps the schema from a parquet file; it will not read data.
Firstly, I need to know where to find the start and end address of each row block. Secondly, I need to know where to find the start and end address of each column and column page contained within a row block. For a particular query which requires 5 columns, I can use goroutines and mmap to read a given set of row blocks for the selected columns in parallel for each batch, e.g.:
Row block batch 1: use 20+ threads to read 5 columns of 1~20 blocks
Row block batch 2: use 20+ threads to read 5 columns of 21~40 blocks
Row block batch 3: use 20+ threads to read 5 columns of 41~60 blocks
To determine how many row blocks I shall read for each stream, I need to know how many columns and row blocks the file has.
Once you open the file (either by OpenParquetFile or NewParquetReader), you can retrieve the information you're looking for:
- Reader.NumRowGroups() gives you the total number of row groups in the file.
- Reader.Metadata().RowGroup(i int) will get you the metadata for a specific row group (RowGroupMetaData).
- Reader.RowGroup gets you a sub-reader specifically for that row group. This reader can report the total byte size for the row group and provide the metadata for that row group directly. It can also get you page readers for any given column via GetColumnPageReader, and has a Column method to get column readers for reading columns in parallel. In fact, we already do this for the pqarrow package when reading a whole row group into a single record, or a whole file into a table: see https://github.com/apache/arrow/blob/go/v12.0.0/go/parquet/pqarrow/file_reader.go#L300
- RowGroupMetaData.FileOffset() is the location in the file where the data for this row group begins.
- RowGroupMetaData's ColumnChunk method gets you the metadata for a specific column, which contains the offsets for that particular column along with the rest of the column metadata.

I hope that the above answers your questions concerning the API (along with pointing you at the documentation for any other methods you might need). But let me know if there are any other methods/functions you need that you can't find (or don't exist yet).
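Tying those calls together, a minimal sketch might look like the following. The method names are taken from the description above and the v12 Go parquet docs, and `data.parquet` is a placeholder file name, so verify the exact signatures against the version you use:

```go
package main

import (
	"fmt"

	"github.com/apache/arrow/go/v12/parquet/file"
)

func main() {
	// memoryMap=false uses plain os.Open/ReadAt, so it also works on Windows
	rdr, err := file.OpenParquetFile("data.parquet", false)
	if err != nil {
		panic(err)
	}
	defer rdr.Close()

	fmt.Println("row groups:", rdr.NumRowGroups())
	for i := 0; i < rdr.NumRowGroups(); i++ {
		rgMeta := rdr.MetaData().RowGroup(i)
		fmt.Println("row group", i,
			"rows:", rgMeta.NumRows(),
			"bytes:", rgMeta.TotalByteSize(),
			"file offset:", rgMeta.FileOffset())
		for c := 0; c < rgMeta.NumColumns(); c++ {
			colMeta, err := rgMeta.ColumnChunk(c)
			if err != nil {
				panic(err)
			}
			// DataPageOffset locates this column chunk's pages in the file
			fmt.Println("  column", c, "data page offset:", colMeta.DataPageOffset())
		}
	}
}
```

With the per-column offsets in hand, each (row group, column) pair can be handed to its own goroutine, as discussed earlier in the thread.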
See my source code for how I use mmap to read CSV file partitions to support streaming: https://github.com/hkpeaks/peaks-consolidation/blob/main/PeaksFramework/read_file.go
I looked at the file and I don't see any usage of mmap at all there. You're just using os.Open and the ReadAt method on the file. (If I'm wrong, can you please provide the line number / direct link to the mmap?) If you follow @mapleFU's suggestion of passing memoryMap = false to OpenParquetFile, then that's what will be used: it will just use os.Open and ReadAt to read from the file.
@hkpeaks All of this being said, it would be amazing if you could make some contributions to the CSV reader in the Arrow package as there's a lot of work there that needs to be done to optimize it. Right now it's just a naive reader that doesn't do any parallelism.
Based on numerous experiments, Golang's ReadAt has achieved results comparable to mmap. As a result, I am much more concerned with performance than with the name "mmap." I created one of the fastest dataframes specialized in CSV files. So I recently investigated whether my project should cover the Parquet format. Other formats, such as JSON and HTML, are simple for me; no research is required.
You can see some of my code because I'm working on reclassifying my Peaks project into open source and proprietary parts. I previously tested Apache Arrow to see how it compared to slices, but I saw no improvement in performance. However, I eventually removed the slice-based in-memory table structure from my project. When supporting ETL functions such as "Distinct", "GroupBy", "Filter", and "JoinTable", the use of bytearray as an in-memory table has been shown to yield significant performance gains. Bytearray reduces memory and CPU usage significantly. It avoids unnecessary dataset serialization and de-serialization.
I've been retired for three years as a result of a COVID-19 layoff exercise. I've recently become very active in programming and marketing because I intend to return to work now that I'm 55 years old. One of my goals is to contribute to the open-source community. I'll think more about the benefits of using Arrow for CSV; I believe the main benefit is data exchange. However, gRPC is also an excellent way to support very high data-exchange performance over the internet.
My strength is innovative system design; when I design something in my head, coding becomes simple. My weakness, however, is reading code. I'll try to replicate what I've learned from https://pkg.go.dev/github.com/apache/arrow/go/v12/arrow/csv#WithChunk. I understand that users want a fast Apache Spark running on their desktop computer. Cloud computing is always expensive and risky when paid for with a credit card. As a result, I am unable to use the cloud computing services provided by Databricks, Azure, Google, and AWS. Peaks Consolidation is designed to address this issue by allowing users to process billions of rows on a desktop computer. I will ask cloud companies to support prepayment by PayPal. The simple model is: once the prepaid balance is used up, all VMs are removed automatically.
Thank you for your detailed reply to my question. I will consider whether it is possible to implement the Parquet format in a way that outperforms my current CSV format. I noticed that writing a Parquet file requires much more time than writing a CSV file, as I have seen in other software. And I hope the Apache Foundation can consider bytearray as one of the best data exchange formats for moving from one software package to another.
..\go\pkg\mod\github.com\apache-reader\arrow\go\v12@v12.0.0\parquet\cmd\parquet_reader\main.go
After using --no-memory-map, it can now run on Windows:
run --no-memory-map 0.1MillionRows.parquet

If I set config.NoMemoryMap = true after the struct below and run 0.1MillionRows.parquet (without using --no-memory-map on the command line), I get this error: --output "" cannot be created, open : The system cannot find the file specified.
var config struct {
	PrintKeyValueMetadata bool
	OnlyMetadata          bool
	NoMetadata            bool
	Output                string
	NoMemoryMap           bool
	JSON                  bool `docopt:"--json"`
	CSV                   bool `docopt:"--csv"`
	ParseInt96AsTimestamp bool `docopt:"--int96-timestamp"`
	Columns               string
	File                  string
}
config.NoMemoryMap = true
It seems "The system cannot find the file specified" indicates that you didn't specify the output file.
D:\Go\Parquet\OpenParquetFile>run v0.7.1.parquet
config.NoMemoryMap true
error: --output "" cannot be created, open : The system cannot find the file specified.

==============================================
Change to config.NoMemoryMap = false:

D:\Go\Parquet\OpenParquetFile>go build

D:\Go\Parquet\OpenParquetFile>run --no-memory-map v0.7.1.parquet
config.NoMemoryMap false
File name: v0.7.1.parquet
Version: v1.0
Created By: parquet-cpp version 1.3.2-SNAPSHOT
Num Rows: 10
Number of RowGroups: 1
Number of Real Columns: 11
I plan to open source the streaming model of CSV and keep only the databending source code proprietary. I have less than half a year of Go development experience. Prior to using Go, I used C#. C# allows building a DLL for a class library. Today I tried to do the same thing with Go and failed.
I have verified that Rust supports building a non-main() library. I will consider using Rust.
Now I have open-sourced one more very important file: https://github.com/hkpeaks/peaks-consolidation/blob/main/PeaksFramework/peaks/controller.go This is the auto-switch parallel streaming/in-memory model used throughout the E-T-L processes. So now only one Peaks file, "databending.go", remains proprietary.
@hkpeaks I'm not sure what you mean by failing to build a class library with Go. You can easily build a shared library with extern "C" functions by using cgo and the buildmode options (see docs).
Based on numerous experiments, Golang's ReadAt has achieved results comparable to mmap. As a result, I am much more concerned with performance than with the name "mmap."
That's fine, performance is what's important. Just please don't call it "mmap" if it's not actually using mmap, as that confuses what you're trying to do. There are plenty of situations where mmap isn't necessary or might even slow down performance rather than improve it.
Bytearray reduces memory and CPU usage significantly. It avoids unnecessary dataset serialization and de-serialization.
This is interesting to me and I'd like to see how that is the case. Wouldn't you need to serialize/de-serialize from bytes into something you can actually process, like the various integral types, float data, etc.?
I'll think more about the benefits of using Arrow for CSV; I believe the main benefit is data exchange.
It's not so much the benefits of using Arrow for CSV, but rather getting CSV data into Arrow format so that other processes/exchange/analytics can be run on it. The current CSV parsing/reading in the Go Arrow lib is very naive and doesn't do any parallelization, so is ripe to be improved. I haven't had the time to do so myself but it would be fantastic to see contributions there from the community.
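For reference, getting CSV data into Arrow with the current Go reader might look like the sketch below. The file name and the two-field schema are assumptions for illustration; `WithChunk` controls how many rows land in each record batch:

```go
package main

import (
	"os"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/csv"
)

func main() {
	f, err := os.Open("data.csv")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// the schema must match the CSV columns; these fields are assumed
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "id", Type: arrow.PrimitiveTypes.Int64},
		{Name: "name", Type: arrow.BinaryTypes.String},
	}, nil)

	// WithChunk(n) groups n rows into each Arrow record batch
	rdr := csv.NewReader(f, schema, csv.WithHeader(true), csv.WithChunk(1024))
	defer rdr.Release()

	for rdr.Next() {
		rec := rdr.Record()
		_ = rec // hand the record batch to downstream processing
	}
	if err := rdr.Err(); err != nil {
		panic(err)
	}
}
```

Parallelizing the parsing inside this loop (or behind `Next`) is exactly the kind of contribution being invited here.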
However, gRPC is also an excellent way to support very high data exchange performance over the internet.
I wholeheartedly agree there, this is why Arrow Flight RPC uses gRPC.
I will consider whether it is possible to implement the Parquet format, which can outperform my current CSV format.
What do you mean by implementing the Parquet format in this case? The Go Parquet library here already has implemented the Parquet format spec. Are you intending to re-implement the parquet spec? Or just use the library provided here to perform the reads (and if you find inefficiencies, then contribute improvements back)?
And I hope Apache Foundation can consider bytearray is one of best data exchange format moving from one software to alternative software.
By "bytearray" do you mean just a literal array of bytes? Or is there an actual data format called "bytearray"? I'm not quite sure what you're referring to here.
I will try these "C" functions to see whether they fit my purpose. I want to keep one file, databending.go, as proprietary software, so I need to build it as a binary file similar to a C# DLL. Without this file, there is no way for users of the open source project I have published to build their own runtime for Windows/Linux. If the full Go project were open source, there would be no way to support this project financially.
If the databending process does not deal with math/statistics, I can use byte-to-byte conversion directly. If a particular column demands filtering by float type, the bytearray of the current cell is converted to float on demand, just to compare two float64 numbers; it only affects whether the current row of the bytearray is selected or not.
I don't know whether your Parquet library has built-in parallel streaming. If not, I will do that for my project. I will read different row blocks of selected columns in parallel. I will calculate how many blocks can fit in memory; if they exceed memory, it will run in batches similar to the current CSV streaming.
Bytearray is exactly the same as the byte stream read from the CSV file using ReadAt, grouped into different partitions. For a 1-billion-row (67GB) CSV, it is divided into 144 batches of streams by default. Each batch is further divided into 10 partitions run in parallel. If you have time to try the pre-release, you will feel the real performance. This means a 1-billion-row CSV is divided into 1,440 partitions.
rdr, err := file.OpenParquetFile(config.File, false)
if err != nil {
	fmt.Fprintln(os.Stderr, "error opening parquet file: ", err)
	os.Exit(1)
}
rg := rdr.RowGroup(0)
col, err := rg.Column(1)
if err != nil {
	fmt.Fprintln(os.Stderr, "error opening column: ", err)
	os.Exit(1)
}
println("Descriptor: ", col.Descriptor())
println("Err: ", col.Err())
println("HasNext: ", col.HasNext())
println("Type: ", col.Type())
println("Descriptor-ColumnOrder: ", col.Descriptor().ColumnOrder())
println("Descriptor-ColumnPath: ", col.Descriptor().ColumnPath())
println("Descriptor-ConvertedTyp:", col.Descriptor().ConvertedType())
Results:
D:\Go\Parquet\OpenParquetFile>run v0.7.1.parquet
Descriptor:  0xc00005daa0
Err:  (0x0,0x0)
HasNext:  true
Type:  6
Descriptor-ColumnOrder:  0xc00000a028
Descriptor-ColumnPath:  [1/1]0xc00005dac0
Descriptor-ConvertedTyp: 0
How can I read all cell values from RowGroup(0) & Column(1)?
https://github.com/apache/arrow/blob/main/go/parquet/file/column_reader_test.go#L227
You can try to follow the style here.
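Following that test's style, reading every value of RowGroup(0)/Column(1) might look like the sketch below. It assumes column 1 is physically BYTE_ARRAY (e.g. the Ledger string column from the schema dump above) and that the v12 `ReadBatch` signature is as shown; swap the concrete reader type (`Int64ColumnChunkReader`, `Float64ColumnChunkReader`, etc.) to match the column's physical type:

```go
package main

import (
	"fmt"

	"github.com/apache/arrow/go/v12/parquet"
	"github.com/apache/arrow/go/v12/parquet/file"
)

func main() {
	rdr, err := file.OpenParquetFile("data.parquet", false)
	if err != nil {
		panic(err)
	}
	defer rdr.Close()

	col, err := rdr.RowGroup(0).Column(1)
	if err != nil {
		panic(err)
	}

	// assumes column 1 is physically BYTE_ARRAY (a string column)
	ba, ok := col.(*file.ByteArrayColumnChunkReader)
	if !ok {
		panic("column 1 is not BYTE_ARRAY")
	}

	values := make([]parquet.ByteArray, 1024)
	defLvls := make([]int16, 1024) // needed because the column is optional
	for ba.HasNext() {
		// valuesRead is the number of non-null values placed in values
		_, valuesRead, err := ba.ReadBatch(int64(len(values)), values, defLvls, nil)
		if err != nil {
			panic(err)
		}
		for _, v := range values[:valuesRead] {
			fmt.Println(string(v))
		}
	}
}
```

Each loop iteration pulls up to 1024 values; the loop drains the whole column chunk of row group 0.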
Today I did benchmarks of DuckDB (C++) and Polars (Rust) using Parquet and CSV. DuckDB takes only 3.9s to Filter and GroupBy a Parquet file containing 300 million rows. Video: https://youtu.be/gnIh6r7Gwh4 So I am considering using DuckDB directly as my Peaks Parquet calc engine, as I am unlikely to be faster than 3.9s.
@zeroshade, now I know who you are: Matt Topol, the author of In-Memory Analytics with Apache Arrow.
lol yes. That is who I am.
Today I saw this: https://www.nature.com/articles/s41586-023-06004-9, and the Reddit community reacted to the news strongly. This motivates me to continue the CSV development to combine query, header-body datasets, and sorting of billions of rows.
I plan to use Apache Arrow-compatible Python libraries to support building a new ETL framework (it looks like an alternative version of a SQL statement, specialized in dealing with files, TCP streams, in-memory tables, and big data). So implementing file formats other than CSV (i.e. Parquet, Delta Lake) will become very simple for me, as I can write Python code to call different Arrow-compatible Python libraries. Golang is unlikely to officially support my development of Python bindings. Newer versions of Go generally do not support building a DLL for a Windows app written in Go.
Example:
UserDefineFunctionName: SourceFile.ext/ Table / SQL.Table ~ ResultFile.ext /Table / SQL.Table | Command: Setting
ExpandFile: Fact.csv ~ 1BillionRows.csv | ExpandFactor: 123
JoinScenario1: 1BillionRows.csv ~ Test1Results.csv | JoinTable: Quantity, Unit_Price => InnerJoin(Master)Multiply(Amount) | OrderBy: Date(D) => CreateFolderLake(Shop) | Select: Date,Shop,Style,Product,Quantity,Amount
BuildKeyValueTable: Master.csv ~ KeyValueTable | BuildKeyValue: Product, Style
JoinScenario2: 1BillionRows.csv ~ Test2AResults.csv | JoinKeyValue: Product, Style => AllMatch(KeyValueTable) | AddColumn: Quantity, Unit_Price => Multiply(Amount) | Filter: Amount(Float > 50000) | GroupBy: Product, Style => Count() Sum(Quantity) Sum(Amount) | OrderBy: Shop(A)Product(A)Date(D)
SplitFile: Test1Results.csv ~ FolderLake | CreateFolderLake: Shop
FilterFolder: Outbox/FolderLake/S15/*.csv ~ Result-FilterFolderLake.csv | Filter: Product(222..888) Style(=F)
ReadSample2View: Outbox/Result-FilterFolderLake.csv ~ SampleTable | ReadSample: StartPosition%(0) ByteLength(100000) | View
Describe the usage question you have. Please include as many useful details as possible.
Based on current testing, my Golang dataframe project is achieving outstanding performance when the data source is a CSV file. As a next step, I want to implement the Parquet file format, since it has become very popular. After basic research on this topic, I am still not clear on how to implement Parquet using your Golang library, as there is little sample code I can find in the community.
I have built an in-memory and streaming engine using a simple bytearray data structure similar to CSV, and I have developed a set of algorithms to support direct byte-to-byte conversion for ETL functions such as Read/Write file, Distinct, GroupBy, JoinTable, and Select column/row. It also solves JoinTable for billions of rows using 32GB of memory.
My requirement to support the Parquet format is simple. I need to know how to read the bytearray of particular columns of the Parquet format. After I can read, of course, I need to know how to write to disk.
Today I have published the pre-release 64-bit runtime for Windows/Linux of my project at https://github.com/hkpeaks/peaks-consolidation/releases
Component(s)
Go