apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[Python] read_csv from python is slow on some work loads #26299

Open asfimport opened 4 years ago

asfimport commented 4 years ago

Hi!

I've noticed that pyarrow.csv.read_csv can be slow on real workloads, processing data at around 0.5 GiB/s. "Real workloads" means many string, float, and all-null columns, and large file sizes (5-10 GiB), though the file size didn't matter too much.

Moreover, profiling a little bit with py-spy, it seems that maybe 30-50% of the time is spent on shared pointer lock mechanisms (though I'm not sure if this is to be trusted). I've attached the dumps in SVG format.

I've also attached a script and a Dockerfile to run a benchmark, which reproduces the speeds I see. Building the docker image and running it on a large Azure machine, I get speeds around 0.3-1.0 GiB/s, and it's mostly around 0.5GiB/s.

This is all also available here: https://github.com/drorspei/arrow-csv-benchmark

Environment: Azure machine with 48 vcpus and 384 GiB RAM; OS: Ubuntu 18.04; Dockerfile and script: attached, or here: https://github.com/drorspei/arrow-csv-benchmark
Reporter: Dror Speiser


Note: This issue was originally created as ARROW-10308. Please see the migration documentation for further details.

asfimport commented 4 years ago

Antoine Pitrou / @pitrou: Two things:

1) You are using a Python file object (a BytesIO object). This will unnecessarily reduce performance. Instead you should use an Arrow native file object (for example pyarrow.BufferReader).

2) Depending on the CSV file size and structure, it can be beneficial to change the CSV read block size in pyarrow.csv.ReadOptions: https://arrow.apache.org/docs/python/generated/pyarrow.csv.ReadOptions.html
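For illustration, a minimal sketch of both suggestions (the file path and the 16 MiB block size are placeholders, not recommendations):

```python
import pyarrow as pa
import pyarrow.csv as csv

# Read the whole file into an Arrow buffer and wrap it in a native
# BufferReader instead of passing a Python BytesIO object.
with open("/path/to/data.csv", "rb") as f:
    buf = pa.py_buffer(f.read())

# Larger blocks can help wide files; tune per workload.
read_options = csv.ReadOptions(block_size=16 * 1024 * 1024)
table = csv.read_csv(pa.BufferReader(buf), read_options=read_options)
```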

asfimport commented 4 years ago

Dror Speiser: Thanks for the quick response!

1) Sorry, I should have made this more explicit: while the benchmark uses BytesIO, I was experiencing these speeds when calling pyarrow.csv.read_csv("/path/to/my.csv"). Does pyarrow use BufferReader in this case?

2) Thanks for the tip, I'll try this out and report back if the numbers change.

asfimport commented 4 years ago

Antoine Pitrou / @pitrou: 1) No, it uses native file objects in that case.

2) Thank you, don't hesitate to report the numbers!

asfimport commented 4 years ago

Dror Speiser: The bad news: the default block_size of 1MB, and the default use of native file objects, are not so good for my workloads. Moreover, I don't know what's going on with the speeds O_O

The good news: I now know how to consistently get around 1.8GiB/s speed for my workload.

Attached is a csv with all the numbers: 220 runs = (5 rounds) x (2 buffer types) x (11 block sizes) x (2 times everything) arrow-csv-benchmark-times.csv

And also a scatter plot.  arrow-csv-benchmark-plot.png

Note that the x-axis is log in base 2 of the block size.

Do you think there's a place for changing the defaults of block_size and buffer objects for local paths?
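Roughly, a sweep like that can be scripted as follows (a sketch, not the actual benchmark code; the path and the size range are placeholders, and the real scripts are in the repo linked above):

```python
import time
import pyarrow as pa
import pyarrow.csv as csv

path = "/path/to/data.csv"  # placeholder
with open(path, "rb") as f:
    data = f.read()
size_gib = len(data) / 2**30

for exp in range(20, 28):  # block sizes from 1 MiB to 128 MiB
    opts = csv.ReadOptions(block_size=2**exp)
    # Compare an in-memory buffer against a native OS file for each size.
    for source in (pa.BufferReader(data), pa.OSFile(path)):
        start = time.perf_counter()
        csv.read_csv(source, read_options=opts)
        elapsed = time.perf_counter() - start
        print(f"block_size=2**{exp} {type(source).__name__}: {size_gib / elapsed:.2f} GiB/s")
```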

asfimport commented 4 years ago

Antoine Pitrou / @pitrou: The adequate block size is heavily dependent on various characteristics, so it's not really possible to provide a one-size-fits-all default value.

As for "buffer objects for local paths", I guess I don't really understand the question.

Also: when you say "1.8GiB/s speed", this is in single-thread or multi-thread mode? If the latter how many CPU cores are active?

asfimport commented 4 years ago

Dror Speiser: Also, given the results suggested by the profiling I did, there's still the possibility of winning 30-50% performance with the defaults, if it really is about lock synchronisation.

asfimport commented 4 years ago

Dror Speiser: I'm running in multi-thread, with 48 vcpus. htop shows them all lighting up when running the benchmark.

For buffer objects: in most cases it would be faster to read entire files and then use BufferReader, though there's a higher chance of maxing out the available RAM.

asfimport commented 4 years ago

Antoine Pitrou / @pitrou: If you really have 400 columns in your file, you may want to try a much larger block size, e.g. 32 MB.

asfimport commented 4 years ago

Dror Speiser: Yup, the graph confirms that block size in the range 32-100 MB is a good choice for my files.

But it still only gets to 1.8 GiB/s, which is slower than my SSD (2+ GiB/s). Is this reasonable? Are you not expecting the processing to be at least as fast as reading the files?

asfimport commented 4 years ago

Antoine Pitrou / @pitrou: Processing a CSV file can be costly. On a 12-core 24-thread machine with a 64 MiB block size, I get around 1.5 GiB/s.

Profiling at the C++ level, it seems that the main bottlenecks are:

asfimport commented 4 years ago

Antoine Pitrou / @pitrou: Also, if you're interested in only some of the columns, you can also reduce the processing time using ConvertOptions.include_columns: https://arrow.apache.org/docs/python/generated/pyarrow.csv.ConvertOptions.html

But really, consider using Parquet if you can. It's a highly optimized binary format.
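A short sketch of both suggestions; the column names and paths are hypothetical:

```python
import pyarrow.csv as csv
import pyarrow.parquet as pq

# Only materialize the columns that are actually needed.
convert_options = csv.ConvertOptions(include_columns=["timestamp", "price", "volume"])
table = csv.read_csv("/path/to/data.csv", convert_options=convert_options)

# One-time conversion to Parquet, so later reads skip CSV parsing entirely.
pq.write_table(table, "/path/to/data.parquet")
```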

asfimport commented 3 years ago

Dror Speiser: Thanks for the suggestions :) I am indeed getting the files from a third party, and I'm converting them to Parquet on arrival using Arrow. I'm actually content with 0.5 GiB/s. I'm here because I saw a tweet by Wes McKinney saying that the CSV parser in Arrow is "extremely fast". I tweeted back my results and he suggested that I open an issue.

I would like to note that the numbers don't quite add up. If the CPU usage is totally accounted for by the operations of parsing and building arrays, then that would mean that a single processor is doing between 0.06 and 0.13 GiB/s, which is very slow.

When I run the benchmark without threads I get 0.3 GiB/s, which is reasonable for a single processor. But it also means that the 48 vcpus I have are very far from achieving a linear speedup, which is in line with my profiling (though the attached images are for a block size of 1 MB). Do you see a linear speedup on your machine?
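For reference, a minimal sketch of how the two modes can be compared (the path is a placeholder):

```python
import pyarrow.csv as csv

# Multi-threaded read (the default) vs. a single-threaded read of the same
# file, to see how far from linear the speedup is.
multi = csv.read_csv("/path/to/data.csv")

single = csv.read_csv(
    "/path/to/data.csv",
    read_options=csv.ReadOptions(use_threads=False),
)
# pyarrow.set_cpu_count(n) can also be used to cap the size of the thread pool.
```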

As for processing CSVs being costly in general, I'm not familiar enough with other libraries to say, but I am familiar with the simdjson library, which claims to parse JSON files at over 2 GiB/s on a single core. I'm looking at the code of both projects, hoping I'll be able to contribute something from simdjson to the CSV parser in Arrow.

asfimport commented 3 years ago

Wes McKinney / @wesm: I do think we should be doing better here than we are, so it merits some analysis to see if some default options should change. The results do strike me as peculiar.

asfimport commented 3 years ago

Antoine Pitrou / @pitrou: "vcpu" doesn't mean anything precise unfortunately. What is the CPU model and how many physical cores are allocated to the virtual machine?

I am familiar with the simdjson library that claims to parse json files at over 2 GiB/s, on a single core

It all depends on what "parsing" entails, what data it is tested on, and what is done with the data once parsed.

On our internal micro-benchmarks, the Arrow CSV parser runs at around 600 MB/s (on a single core), but that's data-dependent. I tend to test on data with narrow column values since that's what "big data" often looks like, and that's the most difficult case for a CSV parser. It's possible that better speeds can be achieved on larger column values (such as large binary strings).

But parsing isn't sufficient, then you have to convert the data to Arrow format, which also means you switch from a row-oriented format to a column-oriented format. That part probably hits quite hard on the memory and cache subsystem.

asfimport commented 3 years ago

Antoine Pitrou / @pitrou: For the record, on a 12-core 24-thread CPU, I get between 8x and 10x scaling from single-core to multi-core. This is far from linear scaling, but not horrific either.

asfimport commented 3 years ago

Dror Speiser: Yeah, Azure doesn't tell me how many physical cores are at my disposal, which makes it hard to compare between setups. But even if it's 12 cpus with hyperthreading and bad advertising, there is still a gap to be explained between single-thread and multi-thread performance.

I offer to work on a benchmark that measures reading CSVs of different sizes and compositions, for a variety of block sizes, run it on a few different machine sizes on AWS (tiny to xlarge) and Azure, and report the results here.

Antoine, do you think this is a good idea? Do you have input on what CSV compositions are found in the wild? You said that narrow columns are common; how would you quantify this? Personally I work with finance and real estate data; I can create "data profiles" for what I see in my own workloads and share them.

asfimport commented 3 years ago

Antoine Pitrou / @pitrou:

Antoine, do you think this is a good idea? Do you have input on what csv compositions are found in the wild?

Yes, that sounds like a very good idea. Instead of generating data, I think it's better to use actual data. You can find a variety of real-world datasets here: https://github.com/awslabs/open-data-registry

A commonly used dataset for demonstration and benchmarking purposes is the New York taxi dataset: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page

You may also find datasets of Twitter messages, which would be more text-heavy and therefore would stress the CSV reader a bit differently.

Generally, for multi-thread benchmarking, you want files that are at least 1 GB long. It may be possible, though, to take a smaller file and replicate its contents a number of times to reach the desired size.
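A minimal sketch of that replication trick (file names are placeholders; the header row is written only once):

```python
# Grow a small CSV to roughly a target size by repeating its data rows.
target_bytes = 1 << 30  # ~1 GiB

with open("small.csv", "rb") as f:
    header, body = f.read().split(b"\n", 1)

with open("big.csv", "wb") as out:
    out.write(header + b"\n")
    written = len(header) + 1
    while written < target_bytes:
        out.write(body)
        written += len(body)
```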

asfimport commented 3 years ago

Antoine Pitrou / @pitrou: For the record, another set of CSV datasets here:

https://github.com/p-ranav/csv2#results-as-of-23-apr-2020

asfimport commented 3 years ago

Wes McKinney / @wesm: It occurred to me that the 48 vcpu machine is likely a dual-socket machine which may have NUMA issues, so the performance issue may arise from the fact that our software is not NUMA-aware.

asfimport commented 3 years ago

Dror Speiser: How about I run a benchmark of multiple file sizes, compositions, and block sizes, on a few types of machines on AWS (from tiny to some xlarge)?

We might get lucky and there's a simple default that increases speed all around.

asfimport commented 3 years ago

Diana Clarke / @dianaclarke: Hi Dror,

Profiling different file sizes, compositions, and block sizes on various AWS instances would be fantastic. Based on that data, perhaps we could come up with a better default value for ReadOptions.block_size.

I ran your benchmark on my local machine yesterday, experimenting with different block sizes, and for this particular case, defaulting the block_size to None was definitely not optimal.

Incidentally, I'm currently working on a continuous benchmarking framework to run Arrow benchmarks on each commit (to safeguard against regressions), but also to do this kind of research.

The continuous benchmarking repo isn't quite ready to be made public, but once it is, we can collaborate and add your benchmark to it. It'll be fun!

Thanks for taking the time to run these benchmarks, and for sharing your code and results.

Much appreciated,

--diana

PS. Were you able to find out if the 48 vcpu machine was NUMA enabled?

asfimport commented 3 years ago

Dror Speiser: Hi Diana,

Cool!

I've created a small benchmark that spins up EC2 instances, downloads a NY Taxi dataset, and runs read_csv with different block sizes. I'll upload the raw data tomorrow, but meanwhile here is a draft basic analysis notebook on the data that I already have:

https://github.com/drorspei/arrow-csv-benchmark/blob/ec2-block-size/analysis.ipynb

If you look in the containing branch, you will find two files: a script, and a supporting file for boto code.

I'll be super glad to collaborate whenever I can :)

As for NUMA enabled, I think I was running Azure's E48_as_v4, which I see in the table on this page:

https://docs.microsoft.com/en-us/azure/virtual-machines/linux/compute-benchmark-scores

There's a column "NUMA Nodes" which says "6". I'm not familiar with NUMA, so I don't know what this means. Is there something I could run on the machine to check?
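One way to check this from the machine itself, assuming Linux (where sysfs exposes one directory per NUMA node; `lscpu` and `numactl --hardware` report the same information):

```python
import glob

# Each NUMA node shows up as /sys/devices/system/node/nodeN; counting the
# entries gives the number of NUMA nodes visible to the VM.
nodes = glob.glob("/sys/devices/system/node/node[0-9]*")
print(f"{len(nodes)} NUMA node(s): {sorted(nodes)}")
```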

asfimport commented 3 years ago

Antoine Pitrou / @pitrou: "NUMA", as in "non-uniform memory access" means that different CPU cores will have varying latencies to different parts of memory. "Six NUMA nodes" therefore means there are six different groups of cores with distinct memory access latencies.

Note that "memory" is to be taken in a wide sense and can also include some shared caches. For example, on my CPU (a AMD Zen 2 CPU), the L3 cache is private to clusters of 4 CPU cores (and my CPU has 12 CPU cores).

asfimport commented 3 years ago

Antoine Pitrou / @pitrou: [~drorspei] The data is very interesting, thank you. I don't want to create homework for you, but if you're willing to run other benchmarks, there are various open datasets listed here, many of them probably in the form of CSV files: https://github.com/awslabs/open-data-registry

Since the optimal block size most probably depends on the file structure, it would be interesting to have an analysis similar to the one you did, but for different kinds of CSV files.

asfimport commented 3 years ago

Dror Speiser: Yeah for sure; I went into the open registry when you posted it the first time, and it's definitely the thing to do: go over all of them, and measure.

Looking at some random files there, it's not trivial to discern which items are CSV files, so registering each data item from the registry is manual work, which makes the task feel rather Sisyphean, even though in the long run (and even in the short run) most of the time will still be spent on other things...

I've now run the NY taxi benchmark duplicating along the columns axis instead of the rows axis, and I think this changes things a bit. I'm starting to think that the optimal block size is going to depend on the number of columns. Is this something you would expect from the implementation?

Speaking of the implementation, we were talking about the parallelization, so I read the implementation to get a better idea of what is happening. Almost all of the algorithm is parallel, but there is one step that isn't: chunking. Given that it processes data more slowly than reading from disk, on the larger machines this should be the bottleneck of read_csv. I've implemented a second version of the chunker that uses a lookup table and SIMD operations (x86 only) to apply the state changes, basically copying the code from this blog post:

https://branchfree.org/2018/05/25/say-hello-to-my-little-friend-sheng-a-small-but-fast-deterministic-finite-automaton/

The chunker speed indeed went up, but I didn't see performance gains overall, so I let it go for the time being.

I think the next step in boosting the speed (other than an adaptive default block size) is to make the chunker run in parallel. The Sheng blog post references this paper, which shows a way to do this:

https://www.microsoft.com/en-us/research/wp-content/uploads/2016/02/asplos302-mytkowicz.pdf

But, take all of this with a large grain of salt. I don't have a profile output to really show that the chunker is the bottleneck, so maybe it's not.

asfimport commented 3 years ago

Antoine Pitrou / @pitrou: Hmm, I'm a bit surprised by your assessment of chunking. Unless you're enabling ParseOptions::newlines_in_values, chunking is basically a linear search for an end-of-line character (you can look for NewlineBoundaryFinder in src/arrow/util). So it should be extremely fast. I'd be curious of a case where this chunking is the bottleneck.
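For context, a simplified Python illustration of what that chunking step conceptually does (this is not the C++ code; the real NewlineBoundaryFinder handles more than this, and enabling newlines_in_values requires a different, quote-aware path):

```python
def chunk_boundary(block: bytes) -> int:
    """Return the offset just past the last newline in `block`, i.e. the end
    of the last complete row, or 0 if the block holds no complete row."""
    pos = block.rfind(b"\n")
    return pos + 1 if pos >= 0 else 0

# Blocks are cut at row boundaries so each one can be parsed independently.
print(chunk_boundary(b"a,b,c\n1,2,3\n4,5"))  # -> 12
```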

To answer your other question, I do think that optimal block size is going to depend on the number of columns. Basically, you want a single block to contain enough rows so that administration overhead is minimal compared to converting actual column batches. But of course, if the block size grows too much, the CPU caches will be less efficient and parallelization may also suffer from a coarser-grained partition of data. I can't think of a simple formula that can integrate all these concerns and compute the ideal block size for a given situation, so I think it's best to let users experiment.
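A back-of-the-envelope illustration of that trade-off (the numbers are made up, using the ~400 columns mentioned earlier):

```python
# With ~400 columns averaging ~15 bytes per field, one row is ~6 KB, so a
# 1 MB block holds only ~170 rows while a 64 MB block holds ~11,000,
# spreading per-block overhead over far more rows.
n_columns, avg_field_bytes = 400, 15
row_bytes = n_columns * avg_field_bytes
for block_size in (1 << 20, 64 << 20):
    print(f"{block_size >> 20} MB block: ~{block_size // row_bytes} rows")
```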

(that said, we can change the default block size if we decide that another value, e.g. 4 MB, is more likely to be close to the optimum in most cases)

asfimport commented 3 years ago

Antoine Pitrou / @pitrou: That said, feel free to post your chunker reimplementation somewhere. That may be useful for later experiments.