modin-project / modin

Modin: Scale your Pandas workflows by changing a single line of code
http://modin.readthedocs.io
Apache License 2.0

Double reading of data during `read_csv` execution #4770

Open prutskov opened 2 years ago

prutskov commented 2 years ago

Describe the problem

The current implementation of `read_csv` reads the bytes of a CSV file twice in several use cases. In some cases the first read happens in the driver process, while it calculates the start/stop byte pointers for the partitions: https://github.com/modin-project/modin/blob/a28e319d9756c9383afa9efd67e32eb04e6b848c/modin/core/io/text/text_file_dispatcher.py#L206

The second read happens in the workers.

The driver process reads the file in the following use cases:

  1. Using `read_csv` with the `nrows` parameter. To calculate correct start/stop pointers, we read the input file row by row (with or without quote tracking) so that the workers receive start/stop pointers covering the correct number of rows. https://github.com/modin-project/modin/blob/a28e319d9756c9383afa9efd67e32eb04e6b848c/modin/core/io/text/text_file_dispatcher.py#L292-L307
  2. Using `read_csv` with `quoting != csv.QUOTE_NONE` (quote tracking). Tracking quotes in the file is the default behavior of `read_csv`. In this case we read the file offset by offset, checking quotes, and calculate the start/stop pointers for reading in partitions. This happens in the following block: https://github.com/modin-project/modin/blob/a28e319d9756c9383afa9efd67e32eb04e6b848c/modin/core/io/text/text_file_dispatcher.py#L314-L325 The read in the driver process happens here: https://github.com/modin-project/modin/blob/a28e319d9756c9383afa9efd67e32eb04e6b848c/modin/core/io/text/text_file_dispatcher.py#L183 and is followed by a `_read_rows` call that closes any open quote or incomplete line in the byte sequence.
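
To make the second case concrete, here is a minimal sketch (not Modin's actual code) of why the driver has to scan the bytes: a boundary chosen by offset alone can land inside a quoted field that contains a newline. `split_offsets` is a hypothetical helper written for this illustration; it assumes `"` as the quote character, `\n` line endings and well-formed quoting.

```python
import io


def split_offsets(f, chunk_size, quotechar=b'"', track_quotes=True):
    """Return (start, stop) byte ranges that each end on a complete CSV row.

    Hypothetical helper for illustration only; every f.read()/f.readline()
    here is a read performed by the driver process.
    """
    size = f.seek(0, io.SEEK_END)
    ranges = []
    start = 0
    while start < size:
        f.seek(start)
        last = f.read(chunk_size)
        # Each range starts on a row boundary, so an odd quote count
        # means the chunk ends inside a quoted field.
        in_quotes = track_quotes and last.count(quotechar) % 2 == 1
        # Extend line by line until the row is complete and quotes are closed.
        while f.tell() < size and (in_quotes or not last.endswith(b"\n")):
            last = f.readline()
            if track_quotes and last.count(quotechar) % 2 == 1:
                in_quotes = not in_quotes
        stop = f.tell()
        ranges.append((start, stop))
        start = stop
    return ranges


# The second row holds a quoted field with an embedded newline.
data = b'a,b\n1,"x\ny"\n2,z\n'
print(split_offsets(io.BytesIO(data), 6))                      # quote tracking on
print(split_offsets(io.BytesIO(data), 6, track_quotes=False))  # quoting ignored
```

With quote tracking the first range is extended past the embedded newline; without it the first range ends in the middle of the quoted field, which is only safe when the caller guarantees there are no quoted newlines.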

In these cases the entire file is read twice. The slowdown is most pronounced when reading from remote buckets (e.g. S3).

Benchmark for S3:

```python
import time

import pandas as pd
# import modin.pandas as pd
import modin.config as cfg

cfg.Engine.put("ray")

s3_path = "s3://dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.csv"


def get_time(op):
    start = time.time()
    result = op()
    return time.time() - start


if __name__ == "__main__":
    n_runs = 3
    all_time = 0

    def operation():
        # df = pd.read_csv(s3_path, parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"], nrows=50_000, storage_options={"anon": True})
        df = pd.read_csv(s3_path, parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"], storage_options={"anon": True})
        # df = pd.read_csv(s3_path, parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"], quoting=3, storage_options={"anon": True})
        return df

    print(f"\tZero call: {get_time(operation)} s")
    for i in range(n_runs):
        t = get_time(operation)
        print(f"\tIteration #{i}: {t} s")
        all_time += t
    print(f'Time read_csv op: {all_time / n_runs} s')
```

Performance numbers from the benchmark:

| Case | Driver reading time, s | All reading time, s |
|---|---|---|
| `read_csv(s3_path, nrows=50k)` | 62.4 | 90.5 |
| `read_csv(s3_path, quoting=csv.QUOTE_NONE)` | 74.9 | 336.54 |
| `read_csv(s3_path)` | 473.9 | 718.27 |
| `read_csv(s3_path)`, prototype (details are below) | 225.1 | 474.09 |
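
A quick calculation on the numbers above shows how much of each run is spent in the driver. The values are copied from the benchmark results; `driver_share` is a helper name made up for this sketch.

```python
# (driver reading time, all reading time) in seconds, copied from the results above
timings = {
    "nrows=50k": (62.4, 90.5),
    "quoting=csv.QUOTE_NONE": (74.9, 336.54),
    "default": (473.9, 718.27),
    "default, prototype": (225.1, 474.09),
}


def driver_share(driver_s, total_s):
    """Fraction of the total read time spent in the driver process."""
    return driver_s / total_s


# Time spent outside the driver (workers, scheduling, and so on).
rest = {name: total - driver for name, (driver, total) in timings.items()}

for name, (driver, total) in timings.items():
    print(f"{name}: driver {driver_share(driver, total):.0%}, rest {rest[name]:.1f} s")
```

For the three full-file reads the time outside the driver comes out roughly constant (about 244 to 262 s), so the difference between those runs is almost entirely driver-side reading.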

As we can see, we spend a huge amount of time reading in the driver process in the default `read_csv` use case. We probably need to find a way to avoid reading the file in the driver process, or to parallelize that read. (I've implemented a draft prototype here that parallelizes it using `multiprocessing.Pool`, but this implementation doesn't give any significant performance gain; see the table above.)

We should find a way to redesign the text file reading approach so that bytes are not read twice. The best way I see is something like this:

  1. Read bytes from any type of filesystem (e.g. S3, local, HTTPS) in parallel in the engine workers.
  2. Compute metadata for the read byte chunks in parallel in the workers (e.g. whether the chunk starts on a new line, whether its number of quotes is even). I did something like that here: https://github.com/prutskov/modin/blob/40653e5e9fdeba6b8710f5761d870ee8e47b5eea/modin/core/io/text/text_file_dispatcher.py#L211-L230
  3. Collect this metadata in the driver process and calculate how many bytes each worker should send to the previous/next worker in order.
  4. Align the chunks in parallel so that each one has correct line endings and quoting.
  5. Continue the usual flow, creating partitions from byte chunks that are already placed in distributed storage, without any additional reading.
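
The steps above can be sketched roughly as follows. This is only an illustration of the idea, not the linked prototype: `chunk_meta`, `plan_cuts` and `align` are hypothetical names, a thread pool stands in for the engine workers, and the code assumes `"` as the quote character, `\n` line endings and well-formed quoting. The alignment loop runs sequentially here for brevity; in the proposed flow each worker would do its own part.

```python
from concurrent.futures import ThreadPoolExecutor

QUOTE = ord('"')
NEWLINE = ord("\n")


def chunk_meta(chunk):
    """Worker side: small metadata describing one byte chunk.

    first_cut[p] is the offset just past the first newline that ends a row,
    given that the quote count of all preceding chunks has parity p.
    """
    first_cut = {0: None, 1: None}
    quotes = 0
    for i, byte in enumerate(chunk):
        if byte == QUOTE:
            quotes += 1
        elif byte == NEWLINE and first_cut[quotes % 2] is None:
            # A newline ends a row only if (incoming + local) quotes is even.
            first_cut[quotes % 2] = i + 1
    return quotes, first_cut, chunk.endswith(b"\n")


def plan_cuts(metas):
    """Driver side: number of leading bytes each chunk hands to its predecessor."""
    cuts = []
    parity, at_row_start = 0, True
    for quotes, first_cut, ends_with_newline in metas:
        # None means the chunk contains no row boundary at all.
        cuts.append(0 if at_row_start else first_cut[parity])
        parity = (parity + quotes) % 2
        at_row_start = ends_with_newline and parity == 0
    return cuts


def align(chunks, cuts):
    """Move the leading bytes of each chunk to the chunk that owns the open row."""
    aligned = [b""] * len(chunks)
    owner = 0
    for i, (chunk, cut) in enumerate(zip(chunks, cuts)):
        if cut is None:
            aligned[owner] += chunk
        else:
            aligned[owner] += chunk[:cut]
            aligned[i] = chunk[cut:]
            owner = i
    return aligned


data = b'a,b\n1,"x\ny"\n2,z\n3,w\n'
chunks = [data[i:i + 8] for i in range(0, len(data), 8)]  # blind fixed-size split
with ThreadPoolExecutor() as pool:
    metas = list(pool.map(chunk_meta, chunks))            # computed "in workers"
print(align(chunks, plan_cuts(metas)))
```

Only the metadata tuples travel to the driver; the bytes moved between neighbours are the partial rows at chunk boundaries.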

In this flow I see the following pros:

  1. A single, parallel read. All bytes are placed in the distributed memory of the execution engine at once.
  2. There are no huge data transfers between workers. Only the small byte ranges needed to complete chunk alignment are transferred (this will probably be important for Dask, because it has storage in each worker).

Cons:

  1. This approach covers only the `quoting` performance case. The `nrows` case won't be covered, because we still have to check how many lines have already been read (I have no idea how to do this in parallel).
  2. This approach will significantly change the current code of `text_file_dispatcher`. The task looks complex.

Does anyone have ideas on how we can handle this performance issue? Please share your thoughts.

cc @modin-project/modin-core, @modin-project/modin-contributors, @devin-petersohn, @RehanSD

prutskov commented 2 years ago

Connected with https://github.com/modin-project/modin/issues/3222

anmyachev commented 2 years ago

@prutskov When reading from remote storage is this expensive, wouldn't it be better to put the blocks read by the driver process directly into the object store? This will increase the data transfer between workers, but the throughput between them should in most cases be greater than the throughput between the Ray cluster and remote storage. What do you think?

prutskov commented 2 years ago

> @prutskov When reading from remote storage is this expensive, wouldn't it be better to put the blocks read by the driver process directly into the object store? This will increase the data transfer between workers, but the throughput between them should in most cases be greater than the throughput between the Ray cluster and remote storage. What do you think?

But I don't see any parallel work in your suggestion. According to the table above, reading in the driver process takes ~474 s, and after that you suggest putting the bytes from the driver process into object storage, right? In this case we will probably have some performance gain, but I don't think it will be huge.

I assume that reading bytes directly into object storage memory will be faster than single-threaded reading in the driver process (the parallel implementation that reads bytes in the driver process using `mp.Pool` takes 225 s, about twice as fast as single-threaded reading). I don't think the throughput to remote storage differs significantly between the Ray cluster and `multiprocessing.Pool`. And for parallel reading I think we should use the execution engine (Ray/Dask), so as not to mix different parallelization techniques such as multiprocessing and Ray in one execution flow.

vnlitvinov commented 2 years ago

The bandwidth to some remote S3 storage and to local (in most cases, in-memory on the node) object storage are vastly different. That said, I think what Anatoly suggests is probably the least painful to implement and would address at least part of the slowness.

However, I feel that the merge of the two approaches would give us an even better result. My proposal is to do the following:

  1. Do not read the file in the driver, but split the offsets like we do now and send those offsets to the workers (like Alexey suggests).
  2. The worker reads its chunk, finds the first full line and the last line (much like the driver does now), then sends the half-lines (the incomplete first and last lines of the chunk) to the driver and parses its own chunk.
  3. The driver concatenates the half-lines, parses them (it should be NPartition/2 lines, so not a lot) and creates new 1-row partitions from them.

The only problem I see with my proposal is heavily unbalanced partitions...

Another option is to again read chunks in workers, find the half-lines, then push the whole chunks and half-lines to the storage. Then the driver would start new tasks, each of which would consume two half-lines and one full-line chunk (as bytearrays), combine and parse. This is probably closer to what Anatoly suggests and might be easy enough to implement, though it would use twice as much bandwidth from the object store, which might be a bottleneck.
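
One possible reading of this second option, as a minimal sketch: each chunk is split into an incomplete first line, a body of full lines and an incomplete last line, and each follow-up task combines the previous chunk's tail with its own head and body before parsing. The names `split_half_lines`, `parse_task` and `read_rows` are made up for this illustration, the standard `csv` module stands in for the pandas parser, and the sketch deliberately ignores newlines inside quotes and assumes every chunk contains at least one `\n`.

```python
import csv
import io


def split_half_lines(chunk, is_first, is_last):
    """Worker side: split a chunk into (head, body, tail).

    head and tail are the incomplete first and last lines of the chunk.
    """
    first_newline = chunk.find(b"\n")
    if first_newline == -1:
        raise ValueError("this sketch assumes every chunk contains a newline")
    head_end = 0 if is_first else first_newline + 1
    tail_start = len(chunk) if is_last else chunk.rfind(b"\n") + 1
    return chunk[:head_end], chunk[head_end:tail_start], chunk[tail_start:]


def parse_task(prev_tail, head, body):
    """Follow-up task: join two half-lines with the full-line body and parse."""
    text = (prev_tail + head + body).decode()
    return list(csv.reader(io.StringIO(text, newline="")))


def read_rows(data, chunk_size):
    """Sequential stand-in for the driver scheduling one parse task per chunk."""
    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
    last = len(chunks) - 1
    parts = [split_half_lines(c, i == 0, i == last) for i, c in enumerate(chunks)]
    rows, prev_tail = [], b""
    for head, body, tail in parts:
        rows.extend(parse_task(prev_tail, head, body))
        prev_tail = tail
    return rows


print(read_rows(b"a,b\n1,x\n2,y\n3,z\n", 6))
```

In a real engine the three pieces would stay in the object store and only references to them would be handed to the parse tasks.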

prutskov commented 2 years ago

> 2. The worker reads its chunk, finds the first full line and the last line (much like the driver does now), then sends the half-lines (the incomplete first and last lines of the chunk) to the driver and parses its own chunk.

This is probably not quite right, because we must also check whether quotes are closed. In some cases we can encounter a \n between quotes that are placed in different chunks.

> 3. The driver concatenates the half-lines, parses them (it should be NPartition/2 lines, so not a lot) and creates new 1-row partitions from them.
>
> The only problem I see with my proposal is heavily unbalanced partitions...

I agree, this looks heavily unbalanced. Also, the number of partitions in this case will be larger than the original `MODIN_NPARTITIONS` (if I understand correctly what you mean).

> Another option is to again read chunks in workers, find the half-lines, then push the whole chunks and half-lines to the storage. Then the driver would start new tasks, each of which would consume two half-lines and one full-line chunk (as bytearrays),

But in this case we would have to `ray.get` the whole chunks/half-lines from object storage to be able to process them in the driver process.

I agree that Anatoly's suggestion is not difficult to implement, but in that case we read all the data in the driver process and just put the bytes into object storage (I don't see any parallelism here except creating pandas DataFrames from byte chunks in the workers).

In your last idea, as I understand it, we need to do a lot of full data transfers: csv->workers->driver->workers

vnlitvinov commented 2 years ago

> This is probably not quite right, because we must also check whether quotes are closed. In some cases we can encounter a \n between quotes that are placed in different chunks.

It isn't any different from the current approach, or rather, it should not be.

> In your last idea, as I understand it, we need to do a lot of full data transfers: csv->workers->driver->workers

There isn't any data transfer from workers to driver to workers, only direct workers->workers (so it's not 3x the usual traffic, only 2x). The driver doesn't need to read the byte arrays from the object store; it just passes them as `ObjectRef`s to tasks in the workers.