apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[Python][Parquet] Parquet deserialization speeds slower on Linux #38389

Open mrocklin opened 11 months ago

mrocklin commented 11 months ago

Describe the bug, including details regarding any error messages, version, and platform.

I'm debugging slow performance in Dask DataFrame and have tracked things down, I think, to slow parquet deserialization in PyArrow. Based on what I know of Arrow I expect to get GB/s and I'm getting more in the range of 100-200 MB/s. What's more is that this seems to depend strongly on the environment (Linux / OSX) I'm using. I could use help tracking this down.

Experiment

I've isolated the performance difference down to the following simple experiment (notebook here):

# Create dataset
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np
import pandas as pd
import time
import io

x = np.random.randint(0, 100000, size=(1000000, 100))
df = pd.DataFrame(x)
t = pa.Table.from_pandas(df)

# Write to local parquet file

pq.write_table(t, "foo.parquet")

# Time Disk speeds

start = time.time()
with open("foo.parquet", mode="rb") as f:
    bytes = f.read()
    nbytes = len(bytes)
stop = time.time()
print("Disk Bandwidth:", int(nbytes / (stop - start) / 2**20), "MiB/s")

# Time Arrow Parquet Speeds

start = time.time()
_ = pq.read_table("foo.parquet")
stop = time.time()
print("PyArrow Read Bandwidth:", int(nbytes / (stop - start) / 2**20), "MiB/s")

# Time In-Memory Read Speeds

start = time.time()
pq.read_table(io.BytesIO(bytes))
stop = time.time()

print("PyArrow In-Memory Bandwidth:", int(nbytes / (stop - start) / 2**20), "MiB/s")

Results

I've tried this on a variety of cloud machines (intel/arm, VMs/metal, 8-core/64-core, AWS/GCP) and they all get fast disk speeds (probably cached), but only about 150MB/s parquet deserialization speeds. I've tried this on two laptops, one a MBP and one a ThinkPad running Ubuntu and I get ...

In all cases I've installed latest release, PyArrow 13 from conda-forge

Summary

I'm confused by this. I've seen Arrow go way faster than this. I've tried to isolate the problem as much as possible to identify something in my environment that is the cause, but I can't. Everything seems to point to the conclusion that "PyArrow Parquet is just slow on Linux" which doesn't make any sense to me.

I'd welcome any help. Thank you all for your work historically.

Component(s)

Parquet, Python

mrocklin commented 11 months ago

Here is a rendered notebook from my Ubuntu Thinkpad to make the numbers more concrete: https://gist.github.com/mrocklin/526120bb5231cc5d9d4e3ca87fc09c68

This run was actually a bit faster than I usually see. On Cloud VMs (even very nice ones) it's hard to get above 150MB/s on real data.

jorisvandenbossche commented 11 months ago

For reference, I ran your snippet above and repeated the timing part multiple times on a Linux (Ubuntu 20.04) Dell XPS 13 9380 (more than 4 years old, 8th gen Intel Core i7, 4 cores / 8 threads), and I get almost 2 GB/s for disk speed and around 1 GB/s for reading (just under for from file, just above for in-memory).

(so at least it's not a simple mac vs linux issue)

One environment characteristic that will significantly influence those numbers is the parallelization (the Parquet reading will by default use all the available cores). So it might be worth running those timings with and without threads enabled, to check whether single-threaded performance is also bad and to rule out bad scaling on that front. On my laptop, I get the expected 3-4x speedup with threads enabled (the numbers above), as I get around 250-300 MB/s using use_threads=False.

fjetter commented 11 months ago

I ran a couple of pyspy benchmarks on pure pq.read_table downloading from S3. I ran two tests, one with column projection and one with bulk reading. Both show basically the same profile but with different weighting of components.

This profile shows the case where I'm reading a file and am selecting about half its columns (a mix of different dtypes)

image

Note how the read_table request is split into three parts

  1. A HEAD request that infers whether the provided path is a file or a directory https://github.com/apache/arrow/blob/ac2d207611ce25c91fb9fc90d5eaff2933609660/python/pyarrow/parquet/core.py#L2481 This is latency bound which on S3 is typically 100-200ms but can vary quite strongly. This is a request we cannot cache or use to pre-fetch any kind of data. In this example, this alone took 20% of the entire read time
  2. The initialization of the FileSystemDataset object. In native profiles, this points to arrow::dataset::Fragment::ReadPhysicalSchema so I assume this is fetching the footer. This is probably unavoidable, but at least this request could be used to pre-fetch some payload data. I'm not sure if this is actually done (I guess not, since the buffer_size kwarg is zero by default). In this specific example, this is about 10% of the read
  3. The final section is now the actual reading of the file.

So, that's 30% where we're doing nothing/not a lot? I'm not sure at which point the pre_buffering can kick in or how this works. This stuff does not show up in my profile since it's the arrow native threadpool.

At least this initial HEAD request appears to be bad, particularly if we're fetching just a couple of columns from otherwise already small-ish files. The file I was looking at is one of the TPCH lineitem files which in our dataset version is 22.4MiB large.

Edit: This all ran on a Coiled VM and I was basically running the read request in a for loop. No multi threading on my side, just the pyarrow native stuff.

mapleFU commented 11 months ago

About (1) some optimization will be included later, see:

  1. https://github.com/apache/arrow/pull/37868 (This patch should be revisited )

It seems we can enable a larger prefetch depth so that Arrow fetches multiple files concurrently.

fjetter commented 11 months ago

Sorry, I just realized that my comment is also slightly off topic. The OP discusses pure deserialization without S3 in between

fjetter commented 11 months ago

FWIW I slightly modified the above script to run each operation N times since I noticed quite some variance on my machine (M1 2020 MacBook)

```python
# Create dataset
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np
import pandas as pd
import time
import io

x = np.random.randint(0, 100000, size=(1000000, 100))
df = pd.DataFrame(x)
t = pa.Table.from_pandas(df)
niterations = 20

# Write to local parquet file
pq.write_table(t, "foo.parquet")

# Time Disk speeds
start = time.perf_counter()
for _ in range(niterations):
    with open("foo.parquet", mode="rb") as f:
        bytes = f.read()
        nbytes = len(bytes)
stop = time.perf_counter()
print("Disk Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")

# Time Arrow Parquet Speeds
start = time.perf_counter()
for _ in range(niterations):
    _ = pq.read_table("foo.parquet")
stop = time.perf_counter()
print("PyArrow Read Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")

# Time In-Memory Read Speeds
start = time.perf_counter()
for _ in range(niterations):
    pq.read_table(io.BytesIO(bytes))
stop = time.perf_counter()
print("PyArrow In-Memory Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")

# Time In-Memory Read Speeds
start = time.perf_counter()
for _ in range(niterations):
    pq.read_table(io.BytesIO(bytes)).to_pandas()
stop = time.perf_counter()
print("PyArrow (to_pandas) Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")
```

and on the M1 (using pyarrow-13.0.0-py310h382c99a_11_cpu from conda-forge) I get

Disk Bandwidth: 5154 MiB/s
PyArrow Read Bandwidth: 2294 MiB/s
PyArrow In-Memory Bandwidth: 2439 MiB/s
PyArrow (to_pandas) Bandwidth: 1142 MiB/s

while on the cloud using a m6i.xlarge (a coiled based, dask bootstrapped VM, see below) (4vCPUs) it is just (using pyarrow-13.0.0-py310hf9e7431_11_cpu from conda-forge)

Disk Bandwidth: 2173 MiB/s
PyArrow Read Bandwidth: 448 MiB/s
PyArrow In-Memory Bandwidth: 448 MiB/s
PyArrow (to_pandas) Bandwidth: 313 MiB/s

fjetter commented 11 months ago

@jorisvandenbossche have you used the same conda forge build for your measurements or did you build it yourself? It would be nice to rule out any build differences

fjetter commented 11 months ago

Ok, fun experiment. I wrapped the above script in a function run_benchmark and ran this on my machine...

image

Looks like the simple fact that we're running this in the dask environment is slowing us down quite a bit. This also biases most/all Coiled-based cloud benchmarks

Code

```python
# Create dataset
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np
import pandas as pd
import time
import io


def run_benchmark():
    from distributed.worker import print

    x = np.random.randint(0, 100000, size=(1000000, 100))
    df = pd.DataFrame(x)
    t = pa.Table.from_pandas(df)
    niterations = 20

    # Write to local parquet file
    pq.write_table(t, "foo.parquet")

    # Time Disk speeds
    start = time.perf_counter()
    for _ in range(niterations):
        with open("foo.parquet", mode="rb") as f:
            bytes = f.read()
            nbytes = len(bytes)
    stop = time.perf_counter()
    print("Disk Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")

    # Time Arrow Parquet Speeds
    start = time.perf_counter()
    for _ in range(niterations):
        pq.read_table("foo.parquet")
    stop = time.perf_counter()
    print("PyArrow Read Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")

    # Time In-Memory Read Speeds
    start = time.perf_counter()
    for _ in range(niterations):
        pq.read_table(io.BytesIO(bytes))
    stop = time.perf_counter()
    print("PyArrow In-Memory Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")

    # Time In-Memory Read Speeds
    start = time.perf_counter()
    for _ in range(niterations):
        pq.read_table(io.BytesIO(bytes)).to_pandas()
    stop = time.perf_counter()
    print("PyArrow (to_pandas) Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")


run_benchmark()

from distributed import Client

client = Client()
client.submit(run_benchmark).result()
```

fjetter commented 11 months ago

Is pyarrow using either one of OMP_NUM_THREADS, MKL_NUM_THREADS, OPENBLAS_NUM_THREADS to infer how large the threadpool is allowed to be?

Edit: Looking at the code base, I see references and documentation that suggests that OMP_NUM_THREADS controls the size of the internal threadpool

Dask is setting those to one and when I remove that, I'm back to the non-dask speed.

mrocklin commented 11 months ago

To be clear, my experiment in this issue was run without Dask and without Coiled. I wanted to isolate things.

mrocklin commented 11 months ago

Turning threads off I get around 170MB/s on my linux machine, 600 MB/s on my OS-X machine.

as I get around 250-300 MB/s using use_threads=False.

I'm curious, for deserializing integers is this expected performance? I would have thought that for this kind of data (just ints) we'd be closer to GB/s speeds. I'm curious, what is the slow part here? Is there some intense compression or something?

have you used the same conda forge build for your measurements or did you build it yourself? It would be nice to rule out any build differences

I'm also quite curious about this.

jorisvandenbossche commented 11 months ago

I was using a conda-forge install for the above numbers, no custom build.

Turning threads off I get around 170MB/s on my linux machine, 600 MB/s on my OS-X machine.

But for linux you mentioned above a similar number with threads. So that means you see hardly any perf improvement with threads on linux?

jorisvandenbossche commented 11 months ago

Is pyarrow using either one of OMP_NUM_THREADS, MKL_NUM_THREADS, OPENBLAS_NUM_THREADS to infer how large the threadpool is allowed to be?

Yes, it seems we are using OMP_NUM_THREADS (and otherwise check std::thread::hardware_concurrency(), which I think also doesn't always give the correct number, eg in a container), see the relevant code. You can also manually override this with pa.set_cpu_count().

mrocklin commented 11 months ago

But for linux you mentioned above a similar number with threads. So that means you see hardly any perf improvement with threads on linux

Yes. That's correct. To be clear though, I'm currently more confused about only getting 150-200 MB/s deserializing integers on a single thread. That seems very strange to me.

fjetter commented 11 months ago

Yes, it seems we are using OMP_NUM_THREADS (and otherwise check std::thread::hardware_concurrency(), which I think also doesn't always give the correct number, eg in a container), see the relevant code. You can also manually override this with pa.set_cpu_count().

Yes, thanks. I already found that pyarrow is setting the CPU threadpool to one inside of dask regardless of the env settings. I already tested a little with set_cpu_count but so far we haven't seen the hoped-for speedup

mrocklin commented 11 months ago

To find a standardized shared compute environment I tried this on Google Colab.
I found that x.copy() ran in 2 GB/s and pq.read_table(io.BytesIO(bytes)) ran in 180 MB/s.

fjetter commented 11 months ago

I found that x.copy() ran in 2 GB/s and pq.read_table(io.BytesIO(bytes)) ran in 180 MB/s.

I'm not sure if this comparison is actually fair and valid. Parquet -> Arrow has to do a nontrivial amount of work. Even your random data is encoded and compressed. (See pq.ParquetFile("foo.parquet").metadata.to_dict() to inspect the metadata)

image

I also ran this on colab and got something like this

Disk Bandwidth: 1636 MiB/s
PyArrow Read Bandwidth: 231 MiB/s
PyArrow In-Memory Bandwidth: 220 MiB/s

from your benchmark output. I went along and ran

import pickle
pickled_df = pickle.dumps(x)
compressedb = pa.compress(pickled_df, "SNAPPY")
nbytes = len(compressedb)
start = time.time()
pa.decompress(compressedb, decompressed_size=len(pickled_df), codec="SNAPPY")
stop = time.time()
print("SNAPPY Decompress Bandwidth:", int(nbytes / (stop - start) / 2**20), "MiB/s")

which gives me

SNAPPY Decompress Bandwidth: 199 MiB/s

so we're moving in the same vicinity as the parquet read.

mrocklin commented 11 months ago

Cool. What I'm reading from you are a couple of insights:

  1. Arrow uses SNAPPY compression by default
  2. SNAPPY is performing around 200 MB/s on these machines

I'll add my understanding, which is that I also expect SNAPPY to operate at GB/s speeds, but for some reason it's not here.

On Colab I also get 200MB/s as you do, but on my MacBook I get 767 MB/s

This probably allows us to dive a bit deeper into the problem.

jorisvandenbossche commented 11 months ago

But for linux you mentioned above a similar number with threads. So that means you see hardly any perf improvement with threads on linux

Yes. That's correct. To be clear though, I'm currently more confused about only getting 150-200 MB/s deserializing integers on a single thread. That seems very strange to me.

Yes, I understand (and dask uses use_threads=False anyway, so mostly depends on this single threaded performance). But then to not mix too many different issues at once, it might be better to focus the various timings in this issue on single threaded performance.

Parquet -> Arrow has to do a nontrivial amount of work

Parquet is indeed a complex file format. In addition to the decompression, there is also the decoding (although the file here will use dictionary encoding, which I would expect to be quite fast. I also quickly tested plain and delta_binary_packed encodings, and those actually give slower reads than the default in this case).

I was also wondering if we could have an idea which bandwidth one can expect for just the decompression, to have some point of comparison. The snappy readme (https://github.com/google/snappy) itself mentions decompression at 500MB/s for Intel Core i7. Running the snippet of Florian above, I actually only get around 100MB/s for the SNAPPY decompression.

Arrow uses SNAPPY compression by default

Quickly testing with another compression (pq.write_table(t, "foo_lz4.parquet", compression="lz4")), I get consistently faster reads with LZ4 compared to SNAPPY for this dataset, but only around 5-10% faster. That is not a huge difference, but in general one can always tweak the encoding and compression settings for a specific dataset to achieve optimal read performance.

Using no compression at all (compression="none") also gives some speed-up (but of course trading storage size with read speed, and on eg S3 that might not even be beneficial)

jorisvandenbossche commented 11 months ago

Re-running the benchmarks with a slightly adapted script from above (single threaded, different compressions), and ensuring I run it while having no other applications running, I actually get quite decent single threaded performance:

Code

```python
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np
import pandas as pd
import time
import io

# Create datasets
x = np.random.randint(0, 100000, size=(1000000, 100))
df = pd.DataFrame(x)
t = pa.Table.from_pandas(df)
pq.write_table(t, "foo.parquet")
pq.write_table(t, "foo-lz4.parquet", compression="lz4")
pq.write_table(t, "foo-uncompressed.parquet", compression="none")


def run_benchmark(fname):
    niterations = 20

    # Time Disk speeds
    start = time.perf_counter()
    for _ in range(niterations):
        with open(fname, mode="rb") as f:
            bytes = f.read()
            nbytes = len(bytes)
    stop = time.perf_counter()
    print("Disk Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")

    # Time Arrow Parquet Speeds
    start = time.perf_counter()
    for _ in range(niterations):
        pq.read_table(fname, use_threads=False)
    stop = time.perf_counter()
    print("PyArrow Read Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")

    # Time In-Memory Read Speeds
    start = time.perf_counter()
    for _ in range(niterations):
        pq.read_table(io.BytesIO(bytes), use_threads=False)
    stop = time.perf_counter()
    print("PyArrow In-Memory Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")

    # Time In-Memory Read Speeds
    start = time.perf_counter()
    for _ in range(niterations):
        pq.read_table(io.BytesIO(bytes), use_threads=False).to_pandas(use_threads=False)
    stop = time.perf_counter()
    print("PyArrow (to_pandas) Bandwidth:", int(nbytes / ((stop - start) / niterations) / 2**20), "MiB/s")
```

In [3]: run_benchmark("foo.parquet")
Disk Bandwidth: 2052 MiB/s
PyArrow Read Bandwidth: 436 MiB/s
PyArrow In-Memory Bandwidth: 459 MiB/s
PyArrow (to_pandas) Bandwidth: 280 MiB/s

In [4]: run_benchmark("foo-lz4.parquet")
Disk Bandwidth: 2100 MiB/s
PyArrow Read Bandwidth: 516 MiB/s
PyArrow In-Memory Bandwidth: 569 MiB/s
PyArrow (to_pandas) Bandwidth: 323 MiB/s

In [5]: run_benchmark("foo-uncompressed.parquet")
Disk Bandwidth: 2092 MiB/s
PyArrow Read Bandwidth: 667 MiB/s
PyArrow In-Memory Bandwidth: 730 MiB/s
PyArrow (to_pandas) Bandwidth: 409 MiB/s

And the file sizes are 258, 255 and 293 MB, respectively (so the actual speedup for uncompressed is a bit lower than what the above gives, because it's reading more MBs. But it's still faster in terms of seconds to read)

pitrou commented 11 months ago

PyArrow In-Memory Bandwidth: 730 MiB/s

This seems like a rather low number for uncompressed integer data. What is the exact encoding used by the integer column? The metadata display above is a bit ambiguous as it lists three encodings :-)

pitrou commented 11 months ago

Ok, so after investigating this, one factor is that pq.write_table by default tries to use dictionary encoding, and the heuristic to stop using dictionary encoding is when the dictionary page size reaches the dictionary_pagesize_limit parameter, which is set to a very high default of 1MB.

However, the benchmark numbers are also largely skewed by the fact that the MB/s figure is computed relative to the compressed and encoded size, not to the final in-memory size.
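The effect of the denominator can be worked through with numbers already reported in this thread. The sketch below is arithmetic only; it assumes the columns are int64 (8 bytes per value) and treats the reported "258 MB" file size as 258 MiB, both of which are assumptions.

```python
MIB = 2**20


def bandwidth_mib_s(nbytes, seconds):
    """Bandwidth in MiB/s for `nbytes` processed in `seconds`."""
    return nbytes / seconds / MIB


# 1,000,000 rows x 100 columns of int64, as in the experiment (assumed dtype).
in_memory_bytes = 1_000_000 * 100 * 8

# Assumption: the reported "258 MB" file is treated as 258 MiB.
file_bytes = 258 * MIB

# Elapsed time implied by the reported 436 MiB/s measured on file bytes.
seconds = file_bytes / (436 * MIB)

on_file_size = bandwidth_mib_s(file_bytes, seconds)
on_decoded_size = bandwidth_mib_s(in_memory_bytes, seconds)
print(f"on file size:    {on_file_size:.0f} MiB/s")
print(f"on decoded size: {on_decoded_size:.0f} MiB/s")
```

Under these assumptions the same read that scores 436 MiB/s against the file size comes out at roughly three times that against the decoded size.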

pitrou commented 11 months ago

With that in mind, here are updated results against the actual in-memory size:

pitrou commented 11 months ago

And now the file sizes:

... meaning that DELTA_BINARY_PACKED is probably a very good choice for integer columns.

pitrou commented 11 months ago

(here's the benchmark code used for the numbers above: https://gist.github.com/pitrou/1713b6d32111dac9102e87be1b5ac887 )

pitrou commented 11 months ago

Also shows, btw, that MiB/s can be a very misleading metric. In the end, you're interested in reading values, not bytes, so I find it more useful to reason in terms of Mitems/s.
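An items-based metric is straightforward to compute from the table shape and the elapsed time. In this sketch the `mitems_per_second` helper is hypothetical and the 0.5 s timing is a made-up value, not a measurement.

```python
def mitems_per_second(num_rows, num_columns, seconds):
    """Millions of decoded values per second."""
    return num_rows * num_columns / seconds / 1e6


# Illustrative timing: 0.5 s is a made-up value, not a measurement.
rate = mitems_per_second(num_rows=1_000_000, num_columns=100, seconds=0.5)
print(f"{rate:.0f} Mitems/s")
```

Unlike MiB/s, this number does not change when the encoding or compression of the file changes, which makes runs with different write settings easier to compare.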

jorisvandenbossche commented 11 months ago

Summarizing the numbers I get with the edits from Antoine, for the defaults (snappy compression, dictionary encoding): around 1350 MiB/s for reading from disk, around 1450 MiB/s for reading from memory (single threaded performance, using my Linux laptop: Dell XPS 13 9380 with Ubuntu 20.04, 8th gen Intel Core i7)

fjetter commented 10 months ago

Just a heads up. My current working theory is that the parquet deserialization performance is roughly where it is supposed to be (but honestly I don't know) but what we're seeing is rather an artifact of storage latency due to too many small reads. I'll keep that conversation separate from this issue here, see https://github.com/coiled/benchmarks/issues/1125#issuecomment-1778995537


Still, I think the question this issue is asking is whether the performance for the deserialization (w/out IO) we're seeing is what is expected and if the severe slowdown on other hardware is expected.

mrocklin commented 10 months ago

I should probably raise another issue with "how to use arrow to read parquet data from the cloud quickly" and ask for help on the general problem, and point to this issue as a sub-part of that.

pitrou commented 10 months ago

Still, I think the question this issue is asking is whether the performance for the deserialization (w/out IO) we're seeing is what is expected and if the severe slowdown on other hardware is expected.

I think it would be nice if you could re-do your benchmarks with a proper MiB/s metric (i.e., computed on decoded size, not file size). Also, please ensure that the different machines read the same files.

mrocklin commented 10 months ago

I think it would be nice if you could re-do your benchmarks with a proper MiB/s metric (i.e., computed on decoded size, not file size). Also, please ensure that the different machines read the same files.

The reason why I like measuring against file size is that it helps me understand how close we are to S3's limitations. My intuition says that Arrow deserialization should be much faster than S3 download speeds, and so we can stop when we're within ~80% of S3 download speeds. Right now we're not close, and so we should keep working.

Does that make sense? Or maybe I'll reach some Arrow limit first, in which case I agree that looking at decoded size would make sense. Really, there's no reason I can't do both I guess.

pitrou commented 10 months ago

My intuition says that Arrow deserialization should be much faster than S3 download speeds, and so we can stop when we're within ~80% of S3 download speeds

But this is Parquet deserialization, not Arrow deserialization :-) Parquet features many encodings and compression schemes, most of which trade CPU overhead for disk footprint (and network transmission times).

You're also looking at single-thread Parquet decoding speed here, but presumably you're going to download and decode multiple files simultaneously, which will give you a multiple of the decoding speed and may reach S3 download limits.

(also, I don't know what the expected download speeds from S3 are...)

mrocklin commented 10 months ago

also, I don't know what the expected download speeds from S3 are.

Ideally 100 MB/s, but more realistically 50 MB/s, at least when reads are large. From what @fjetter is saying pyarrow.parquet is doing a few smaller reads, and so effective bandwidth is closer to 15-20 MB/s.

But this is Parquet deserialization, not Arrow deserialization

Of course, my mistake, but still, I'm expecting this to be faster than the numbers listed above

mrocklin commented 10 months ago

See also https://gist.github.com/mrocklin/9eca141688d03bbb6c375e53bee35c6d

mapleFU commented 10 months ago

When reading from S3 it might be better to enable PreBuffer and cache the input. It would do the following:

  1. Merge adjacent S3 IO requests under some conditions.
  2. Pre-buffer some column chunks.

Generally, I think S3 has higher latency but might have high bandwidth. So if there is enough memory, PreBuffer would help.
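The idea of merging adjacent requests can be illustrated with a small, self-contained sketch. This is not Arrow's implementation; `coalesce_ranges` and its `hole_size_limit` parameter are hypothetical names used only to show the technique of trading a few wasted bytes for fewer high-latency round trips.

```python
def coalesce_ranges(ranges, hole_size_limit):
    """Merge (offset, length) ranges whose gaps are at most `hole_size_limit` bytes."""
    merged = []
    for offset, length in sorted(ranges):
        if merged:
            last_offset, last_length = merged[-1]
            last_end = last_offset + last_length
            if offset - last_end <= hole_size_limit:
                # Close enough: extend the previous request to cover this range.
                new_end = max(last_end, offset + length)
                merged[-1] = (last_offset, new_end - last_offset)
                continue
        merged.append((offset, length))
    return merged


# Three column chunks; the first two are separated by a 100-byte gap.
requests = [(0, 1_000), (1_100, 500), (50_000, 2_000)]
print(coalesce_ranges(requests, hole_size_limit=512))
```

Here the first two requests collapse into a single 1,600-byte read, while the distant third chunk stays a separate request.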

pitrou commented 10 months ago

See also https://gist.github.com/mrocklin/9eca141688d03bbb6c375e53bee35c6d

For the record, what is the in-memory data size in this example? (table.nbytes)

mapleFU commented 10 months ago

Also, the benchmark is great.

From the perspective of the paper ( https://www.durner.dev/app/media/papers/anyblob-vldb23.pdf ), the IO size is also important.

And some machine might have different bandwidth, this should also be taken into account.

mrocklin commented 10 months ago

I think that those files were relatively small. Unfortunately we've since replaced them with larger files. I'll re-run everything and include table size numbers as well, hopefully within the next hour.

And some machine might have different bandwidth, this should also be taken into account

Certainly this is true. This is why I record S3 bandwidth at the beginning. That gives us a baseline. My goal isn't to get a high MiB/s number. It's to get a full pipeline that is close to S3 bandwidth. I hope that S3, rather than pyarrow.parquet, is the bottleneck.

mapleFU commented 10 months ago

@mrocklin

In the best case, IO and CPU work are pipelined: we wait for the first group of IO to finish and then go ahead with the handling logic.

However, there are a few points that might affect this:

  1. Fetching in a thread pool doesn't ensure priority, so some parts might complete in a different order than expected. For example, say there are 2 column chunks and each chunk has 5 IOs. The 5th IO might finish earlier than the first one.
  2. As a result, the pattern might become: wait for IO -> do CPU things...

To optimize this, the dataset API might split a file into different row groups and have a row-group reader (aka ParquetFragment...). The fragments would then do IO and reading in parallel. This might help a bit.
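The ordering problem described above can be mimicked with a toy pipeline. This is only a sketch using the standard library, not how Arrow schedules reads: `fetch` and `decode` are hypothetical stand-ins, and the sleep durations are chosen so that the first chunk finishes last.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fetch(chunk_id):
    """Stand-in for a high-latency read; later chunks finish sooner here."""
    time.sleep(0.05 * (4 - chunk_id))
    return chunk_id, b"x" * 1024


def decode(payload):
    """Stand-in for CPU-bound decoding."""
    return len(payload)


decoded = []
with ThreadPoolExecutor(max_workers=4) as pool:
    # Issue every read up front so the IO overlaps with decoding.
    futures = [pool.submit(fetch, chunk_id) for chunk_id in range(4)]
    # Consume in submission order: chunk 0 arrives last here, so decoding
    # stalls on it even though chunks 1-3 are already available.
    for future in futures:
        chunk_id, payload = future.result()
        decoded.append((chunk_id, decode(payload)))

print(decoded)
```

When the consumer needs chunks in order but completion order is arbitrary, the pipeline degrades to "wait for IO, then do CPU work", which is the pattern point 2 describes.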

Also, it would help to give me some info about a single file 🤔 Let me check how it reads the file.

lidavidm commented 10 months ago

re: the pipelining/IO discussion, you may find the discussion here interesting: https://lists.apache.org/thread/cdfkm8oflm2zvd25yn4k6gh2o7pc9z88

Some (but not all) of those proposals were implemented in Arrow ("pre-buffering" primarily), though pre-buffering is probably not the ideal way to implement it (too much memory usage). One thing that didn't make it was the global concurrency manager, which would have approximated priority by not actually issuing reads for a file until all reads for previous files have been issued (of course, this only makes sense if there's an ordering between the files in the first place - not necessarily true for dataset)

That said, I believe datasets does parallelize at the row-group level already @mapleFU

mapleFU commented 10 months ago

That said, I believe datasets does parallelize at the row-group level already

I think the user uses the ParquetFile API here, which is separate from the Dataset API. When a file contains multiple row groups, it might be slower (or might not).

However, S3FS being faster than ArrowFS confuses me. Currently I don't know the reason 🤔

lidavidm commented 10 months ago

Hmm, s3fs does some readahead. If you are scanning the whole file, this probably helps. However, Arrow's FS + pre-buffer should be better in general (readahead is actively harmful if you aren't scanning the whole file)

mrocklin commented 10 months ago

Yeah, in practice s3fs actually has some intelligent system buried within it for exactly this use case. See https://developer.nvidia.com/blog/optimizing-access-to-parquet-data-with-fsspec/

(I actually just learned about this)

mapleFU commented 10 months ago

Hmm, s3fs does some readahead. If you are scanning the whole file, this probably helps. However, Arrow's FS + pre-buffer should be better in general (readahead is actively harmful if you aren't scanning the whole file)

As I understand it, when reading the same file with ArrowFS before 14.0, the default option is not lazy, so the reader will pre-buffer all row groups and all columns at once. I don't know how s3fs could be so much faster... Maybe I need to reproduce this myself to take a look 🤔

lidavidm commented 10 months ago

Ah, that means fsspec actually implements the same optimization as PyArrow now.

mapleFU commented 10 months ago

See also https://gist.github.com/mrocklin/9eca141688d03bbb6c375e53bee35c6d

@lidavidm from the user's test, in this case s3fs might be faster 😭:-(

mrocklin commented 10 months ago

Things are still very much up in the air. There's a lot of sensitivity to file size, machine type, etc.

Clearly there is room for improvement with Arrow Filesystem. I think that Dask developers are generally hoping that, with some configuration changes, we can get Arrow filesystem to be fast enough to be our default. For example, I suspect that sometimes s3fs wins because ...

However, s3fs is slow for other reasons, like Python and the GIL (we can't scale it up on larger workers as easily). Arrow seems to have more potential, but is currently harder for us to optimize.

If it makes you feel better, I can easily produce a similar notebook that shows Arrow doing way better 🙂

There's more conversation going on here: https://github.com/coiled/benchmarks/issues/1125

lidavidm commented 10 months ago

We can super-saturate threads (s3 seems to work well with 2-3 threads per core) (I suspect that arrow might be limiting concurrency more than is ideal)

https://github.com/apache/arrow/blob/73589ddd60e4cbcd860102871692541989ea38c6/cpp/src/arrow/io/interfaces.cc#L389-L415

Maybe we should default this to 2*numcpus or similar, instead of hardcoding 8 (!). But you can try with ARROW_IO_THREADS to see if that affects it

mrocklin commented 10 months ago

This may also differ strongly between local / low-latency systems, and cloud-object / high-latency systems.

mrocklin commented 10 months ago

I'm currently hoping that the following has an effect:

import pyarrow as pa
pa.set_io_thread_count(...)

I'm not seeing much response in benchmarks yet (but that could be due to other reasons)