apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[C++][Python][R][Dataset] Control overwriting vs appending when writing to existing dataset #28157

Open asfimport opened 3 years ago

asfimport commented 3 years ago

Currently, dataset writing (e.g. with pyarrow.dataset.write_dataset) uses a fixed filename template ("part{i}.ext"). This means that when you are writing to an existing dataset, you de facto overwrite previous data when using this default template.

There is some discussion in ARROW-10695 about how the user can avoid this by ensuring the file names are unique (the user can specify the basename_template to be something unique). There is also ARROW-7706 about silently doubling data (so not overwriting existing data) with the legacy parquet.write_to_dataset implementation.

It could be good to have a "mode" when writing datasets that controls the different possible behaviours. Erroring when there is pre-existing data in the target directory is maybe the safest default, because both silently appending and silently overwriting can be surprising behaviour depending on your expectations.
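
For illustration, a minimal sketch of that hazard with pyarrow.dataset.write_dataset, assuming an older release where the default behaviour silently reuses the generated file names (the path and table are made up):


import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({"x": [1, 2, 3]})

# Both writes use the default basename_template, so they produce the same
# file name and the second write clobbers the first.
ds.write_dataset(table, "/tmp/mydataset", format="parquet")
ds.write_dataset(table, "/tmp/mydataset", format="parquet")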

Reporter: Joris Van den Bossche / @jorisvandenbossche

Note: This issue was originally created as ARROW-12358. Please see the migration documentation for further details.

asfimport commented 3 years ago

Joris Van den Bossche / @jorisvandenbossche: As mentioned by @ldacey in ARROW-10695 (and also in the comment on my SO answer), one of the consequences of the current default behaviour is that it will sometimes overwrite and sometimes append data, depending on what files are already present and how many parts you are writing.
It would probably be useful to be able to either fully overwrite or always append.

Taking inspiration for possible "modes" from ARROW-7706:

asfimport commented 3 years ago

Lance Dacey / @ldacey: I think that having an "overwrite" option would satisfy my need for partition_filename_cb (https://issues.apache.org/jira/browse/ARROW-12365) if we can replace all data inside the partition. This would be great for file compaction as well - we could read a dataset with a lot of tiny file fragments and then overwrite it.

Overwriting a specific file is also useful. For example, my basename_template is usually f"{task-id}-{schedule-timestamp}-{file-count}-{i}.parquet". I am able to clear a task and overwrite a file which already exists. The only flaw here is that we cannot control the {i} variable, so I guess it is not guaranteed. I could live without this.

For "append", is it possible for the counter to be per partition instead (potential race conditions if multiple tasks write to the same partition in parallel perhaps, and it seems to be a more demanding step for large datasets..)? Or could the {i} variable optionally be a uuid instead of the fragment count?

"error" makes sense. 

asfimport commented 3 years ago

Antoine Pitrou / @pitrou: cc @westonpace  @bkietz

asfimport commented 3 years ago

Weston Pace / @westonpace: tl;dr: Do what @jorisvandenbossche said and interpret "overwrite" as "overwrite the entire partition".

https://stackoverflow.com/questions/27033823/how-to-overwrite-the-output-directory-in-spark is related (it talks about this issue and how it is handled in Spark). Even reading through all the answers, however, I cannot tell if "overwrite" replaces the entire partition or the entire dataset. It does appear to do one or the other, and not just replace some of a partition. Only replacing some of a partition does not seem like it would ever be useful.

Overwriting the entire table could always be easily achieved without pyarrow by simply removing the dataset beforehand, so I don't see much value in adding that capability. It does bring up the question of repartitioning, which would require deleting the old data as it is read, but I think that is a different topic (and related to the update topic I mention below). Deleting a partition isn't very hard for the user either. The tricky part, though, is knowing which partition to delete.

With that in mind I'd suggest the following:

Overwrite-partition: If the dataset write will write to partition X then delete all data in partition X first.

Append: Same as @jorisvandenbossche mentioned. Similar to how we behave today, but add logic to make sure we never overwrite a file that happens to have the same counter (e.g. detect the max counter value before we start writing and continue from there).

Error: Same as @jorisvandenbossche mentioned.

The overwrite-partition mode is useful for the case of "Load the entire dataset (or an entire partition), modify it, write it back out".

However, I think the use case that is still missing is:

asfimport commented 3 years ago

Lance Dacey / @ldacey: Being able to update and replace specific rows would be very powerful. For my use case, I am basically overwriting the entire partition in order to update a (sometimes tiny) subset of rows. That means I need to read both the existing data for that partition, which was saved previously, and the new data with updated or new rows. Then I need to sort and drop duplicates (I use pandas because there is no simple .drop_duplicates() for a pyarrow table, but adding a pandas step can sometimes complicate things with data types), and finally I need to overwrite the partition (I use partition_filename_cb to guarantee that the final file name for the partition is the same).

asfimport commented 3 years ago

Weston Pace / @westonpace: Looking at this with fresh eyes, the "overwrite mode" feature is fairly different from an "update" feature, so I don't think update-related topics are relevant for this ticket. Update generally (and specifically in @ldacey's case) implies reading and writing to the same set of files. Overwrite-partition mode wouldn't allow for that. Overwrite-partition mode could be useful in some limited circumstances (e.g. somehow someone regenerates an entire new set of data for one or more partitions), but I think those are rare enough, and would be handled by a general "update" feature anyway, that I don't see much benefit in creating a separate feature; the complexity would just confuse users.

So I'll walk back my earlier comment.  I'd now argue that dataset write should only allow "append" and "error" options.

Dataset update could be created as a separate Jira ticket (I'll go ahead and draft one).  Dataset update would mean scanning and rewriting a dataset (or parts thereof).

asfimport commented 3 years ago

Lance Dacey / @ldacey: What is the common workflow pattern for folks trying to imitate something similar to a view in a database?

In many of my sources I have a dataset which is append only (using UUIDs in the basename template), normally partitioned by date. If this data is downloaded frequently or is generated from multiple sources (for example, several endpoints or servers), then each partition might have many files. Most likely there are also different versions of each row (one ID will have a row for each time it was updated, for example).

I then write to a new dataset which is used for reporting and visualization. 

  1. Get the list of files which were saved to the append-only dataset during the most recent schedule
  2. Create a dataset from the list of paths which were just saved and use .get_fragments() and ds._get_partition_keys(fragment.partition_expression) to generate a filter expression (this allows me to query for all of the data in each relevant partition which was recently modified - so if only a single row was modified in the 2021-08-05 partition, then I still need to read all of the other data in that partition in order to finalize it)
  3. Create a dataframe, sort the data and drop duplicates on a primary key, convert back to a table (it would be nice to be able to do this purely in a pyarrow table so I could leave out pandas!) - a sketch of this step follows after this list
  4. Use pq.write_to_dataset() with partition_filename_cb=lambda x: str(x[-1]) + ".parquet" to write to a final dataset. This allows me to overwrite the relevant partitions because the filenames are the same. I can be certain that I only have the latest version of each row.

     

This is my approach to come close to what I would achieve with a view in a database. It works fine, but the storage is essentially doubled since I am maintaining two datasets (append-only and final). Our visualization tool connects directly to these parquet files, so there is some benefit in having fewer files (one per partition instead of potentially hundreds) as well.
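
A sketch of the sort-and-deduplicate step from item 3 above, using the pandas round-trip described there; the column names ("id" as the primary key, "updated_at" as the version column) are illustrative, not taken from the actual pipeline:


import pyarrow as pa

def latest_rows(table: pa.Table) -> pa.Table:
    # Round-trip through pandas: keep only the most recent version of each id.
    df = table.to_pandas()
    df = df.sort_values("updated_at").drop_duplicates(subset="id", keep="last")
    return pa.Table.from_pandas(df, preserve_index=False)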

asfimport commented 3 years ago

Weston Pace / @westonpace: Do you clear your append only dataset after step 4? In other words, is it just a temporary staging area (which stays rather small) or are you wanting to keep the duplicate rows in your base dataset?

So, to check my understanding, I think what you are describing is a materialized view with incremental refresh. Does this sound right?

In other words, the results of a query (in your case the query is sort of a "group by date" where you take the latest row instead of aggregating anything) are saved off (saved off meaning you don't have to recompute the query each time) as a view and you want to update the view when new data arrives but should only have to read the new data when computing the update.

Some thoughts on your current approach...

asfimport commented 3 years ago

Lance Dacey / @ldacey:

I do not clear my append dataset, but I need to add tasks to consolidate the small files someday. If I download a source every hour, I will have a minimum of 24 files in a single daily partition and some of them might be small.

But yes, I am basically describing a materialized view. I cannot rely on an incremental refresh in many cases because I partition data based on the created_at date and not the updated_at date.

Here is an example where the data was all updated today, but there were some rows originally created days or even months ago.


import pyarrow as pa

table = pa.table(
    {
        "date_id": [20210114, 20210811, 20210812, 20210813],  # based on the created_at timestamp
        "created_at": ["2021-01-14 16:45:18", "2021-08-11 15:10:00", "2021-08-12 11:19:26", "2021-08-13 23:01:47"],
        "updated_at": ["2021-08-13 00:04:12", "2021-08-13 02:16:23", "2021-08-13 09:55:44", "2021-08-13 22:36:01"],
        "category": ["cow", "sheep", "dog", "cat"],
        "value": [0, 99, 17, 238],
    }
)

Partitioning this by date_id would save the following files in my "append" dataset. Note that this has one row which is from January, so I cannot do an incremental refresh from the minimum date because it would be too much data in a real world scenario.


written_paths = [
    "dev/test/date_id=20210812/test-20210813114024-2.parquet",
    "dev/test/date_id=20210813/test-20210813114024-3.parquet",
    "dev/test/date_id=20210811/test-20210813114024-1.parquet",
    "dev/test/date_id=20210114/test-20210813114024-0.parquet",
]

During my next task, I create a new dataset from the written_paths above (so a dataset of only the new/changed data). Using .get_fragments() and partition expressions, I ultimately generate a filter expression:


import pyarrow.dataset as ds

fragments = ds.dataset(written_paths, filesystem=fs).get_fragments()
for frag in fragments:
    partitions = ds._get_partition_keys(frag.partition_expression)
# ... other stuff that builds filter_expression from the partition keys

# The resulting filter expression looks like:
# <pyarrow.dataset.Expression is_in(date_id, {value_set=int32:[
#   20210114,
#   20210811,
#   20210812,
#   20210813
# ], skip_nulls=true})>

Finally, I use that filter to query my "append" dataset, which has all of the historical data, so I read all of the data in each affected partition:


df = ds.dataset(source, filesystem=fs).to_table(filter=filter_expression).to_pandas()

Then I convert the table to pandas, sort and drop duplicates, convert back to a table, and save to my "final" dataset with partition_filename_cb to overwrite whatever was there. This means that if even a single row was updated within a partition, I will read all of the data in that partition and recompute the final version of each row. This also requires me to use the "use_legacy_dataset" flag to support overwriting the existing partitions.

I found a custom implementation of drop_duplicates (https://github.com/TomScheffers/pyarrow_ops/blob/main/pyarrow_ops/ops.py) using pyarrow Tables, but I am still just using pandas for now. I keep a close eye on the pyarrow.compute() docs and have been slowly replacing stuff I do with pandas directly in the pyarrow tables, which is great.
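
For what it's worth, some individual steps can already move off pandas; for example, a pure-pyarrow sort (assuming an "updated_at" column, reusing the table from the example above) might look like this:


import pyarrow.compute as pc

# Sort the table by updated_at descending using compute kernels only;
# deduplication itself still needs pandas or a custom helper here.
sorted_table = table.take(
    pc.sort_indices(table, sort_keys=[("updated_at", "descending")])
)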

Your mention of a temporary staging area made me realize that I could replace my messy staging append dataset (many small files) with something temporary that I delete each schedule, and then read from it to create a consolidated historical append-only dataset similar to what I am doing in the example above (one file per partition instead of potentially hundreds).

asfimport commented 3 years ago

Weston Pace / @westonpace: I've added some customization here in https://github.com/apache/arrow/pull/10955 via "existing_data_behavior". This will provide the options...

kError - Raise an error if there are any files or directories in `base_dir` (the new default)
kOverwriteOrIgnore - Existing files will be ignored unless the filename is one of those chosen by the dataset writer, in which case they will be overwritten (the old default)
kDeleteMatchingPartitions - This is similar to the dynamic partition overwrite mode in parquet. The first time a directory is written to, it will delete any existing data.

This was based partially on discussion in ARROW-7706. I think kDeleteMatchingPartitions might simplify your step 4 but I'm not entirely sure. Feedback welcome!
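
On the Python side these options surface as string values of the existing_data_behavior argument used later in this thread; a usage sketch with an illustrative table and path:


import pyarrow as pa
import pyarrow.dataset as ds

#   kError                    -> existing_data_behavior="error"
#   kOverwriteOrIgnore        -> existing_data_behavior="overwrite_or_ignore"
#   kDeleteMatchingPartitions -> existing_data_behavior="delete_matching"
table = pa.table({"date_id": [20210813], "value": [1]})
ds.write_dataset(table, "/tmp/final",
                 partitioning=["date_id"], partitioning_flavor="hive",
                 format="parquet",
                 existing_data_behavior="delete_matching")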

asfimport commented 3 years ago

Lance Dacey / @ldacey: kDeleteMatchingPartitions - So this only deletes the individual partitions and not the entire dataset correct? So if I save a dataset made up of hundreds of partitions but only 4 of them are written to, then only those 4 partitions will have their existing files cleared? If so, then yes that should work for me.

asfimport commented 3 years ago

Weston Pace / @westonpace: Yes, that is correct. It will only delete a partition that will be updated / modified as part of the write_dataset operation.

asfimport commented 2 years ago

Lance Dacey / @ldacey: I was not able to install 6.0.1 until the latest version of turbodbc supported it. Finally have it up and running and I see that the existing_data_behavior argument has been added.

Is this the proper way to use the "delete_matching" feature? When I tried to use it directly, there was a FileNotFoundError (because the base_dir did not exist at all).

EDIT - using the try/except below does not really work. I need to save the dataset with "overwrite_or_ignore" first, then save the dataset again with "delete_matching"; a sketch of that two-step approach follows the code below.


try:
    ds.write_dataset(
        data=table,
        existing_data_behavior="error",
    )
except pa.lib.ArrowInvalid:
    ds.write_dataset(
        data=table,
        ...,
        existing_data_behavior="delete_matching",
    )
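
A sketch of that two-step workaround, with base_dir, table, fs and the partitioning arguments as placeholders:


import pyarrow.dataset as ds

base_dir = "dev/final"  # placeholder path; table and fs come from the surrounding pipeline

# First pass creates base_dir and the partition directories.
ds.write_dataset(data=table, base_dir=base_dir, filesystem=fs,
                 partitioning=["date_id"], partitioning_flavor="hive",
                 format="parquet",
                 existing_data_behavior="overwrite_or_ignore")

# Second pass can now clear and rewrite the matching partitions.
ds.write_dataset(data=table, base_dir=base_dir, filesystem=fs,
                 partitioning=["date_id"], partitioning_flavor="hive",
                 format="parquet",
                 existing_data_behavior="delete_matching")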

I created a dataset using my old method (use_legacy_dataset = True with a partition_filename_cb to overwrite partitions) and the output matched the new "delete_matching" dataset. I believe I can completely retire the use_legacy_dataset code now. Really amazing, thank you.

asfimport commented 2 years ago

Lance Dacey / @ldacey: Any thoughts on "delete_matching" creating the partition if it does not exist already?

asfimport commented 2 years ago

Weston Pace / @westonpace: If "delete_matching" is not creating the base directory or the partition directories in the same way as the other methods then I would consider that a bug. Let's leave this open to fix that.

asfimport commented 2 years ago

Lance Dacey / @ldacey: @westonpace Just wanted to check if this issue with "delete_matching" not creating the partition directory is still on the radar. I am currently using "overwrite_or_ignore", and then writing the same table again with "delete_matching" which is a bit redundant.

asfimport commented 2 years ago

Weston Pace / @westonpace: Thanks for checking in. I did some testing on this today. I might not be understanding what you are after. I just tested with the following:


import shutil

import pyarrow as pa
import pyarrow.dataset as ds

# Make sure the /tmp/newdataset directory does not exist                                                                                                                                                           
shutil.rmtree('/tmp/newdataset', ignore_errors=True)

tab = pa.Table.from_pydict({ 'part': [0, 0, 1, 1], 'value': [0, 1, 2, 3] })
ds.write_dataset(tab,
                 '/tmp/newdataset',
                 partitioning_flavor='hive',
                 partitioning=['part'],
                 existing_data_behavior='delete_matching',
                 format='parquet')

I used the 6.0.1 release and did not run into any issues. Am I misunderstanding the use case? Or is it possible you are using a certain filesystem? Or maybe you are on a particular OS?

asfimport commented 2 years ago

Lance Dacey / @ldacey: Ah, so it must be related to the filesystem. I am using adlfs / fsspec to save datasets on Azure Blob:


import pyarrow as pa
import pyarrow.dataset as ds

# fs is an adlfs.spec.AzureBlobFileSystem instance created elsewhere
print(type(fs))
tab = pa.Table.from_pydict({ 'part': [0, 0, 1, 1], 'value': [0, 1, 2, 3] })
ds.write_dataset(data=tab,
                 base_dir='/dev/newdataset',
                 partitioning_flavor='hive',
                 partitioning=['part'],
                 existing_data_behavior='delete_matching',
                 format='parquet',
                 filesystem=fs)

Output:


<class 'adlfs.spec.AzureBlobFileSystem'>

[2022-01-14 12:45:44,076] {api.py:76} WARNING - Given content is empty, stopping the process very early, returning empty utf_8 str match
[2022-01-14 12:45:44,090] {api.py:76} WARNING - Given content is empty, stopping the process very early, returning empty utf_8 str match
[2022-01-14 12:45:44,093] {api.py:76} WARNING - Given content is empty, stopping the process very early, returning empty utf_8 str match
[2022-01-14 12:45:44,109] {api.py:76} WARNING - Given content is empty, stopping the process very early, returning empty utf_8 str match
[2022-01-14 12:45:44,121] {api.py:76} WARNING - Given content is empty, stopping the process very early, returning empty utf_8 str match
[2022-01-14 12:45:44,124] {api.py:76} WARNING - Given content is empty, stopping the process very early, returning empty utf_8 str match
---------------------------------------------------------------------------
FileNotFoundError                         Traceback (most recent call last)
/tmp/ipykernel_47/3075266795.py in <module>
      4 print(type(fs))
      5 tab = pa.Table.from_pydict({ 'part': [0, 0, 1, 1], 'value': [0, 1, 2, 3] })
----> 6 ds.write_dataset(data=tab,
      7                  base_dir='/dev/newdataset',
      8                  partitioning_flavor='hive',

/opt/conda/envs/airflow/lib/python3.9/site-packages/pyarrow/dataset.py in write_dataset(data, base_dir, basename_template, format, partitioning, partitioning_flavor, schema, filesystem, file_options, use_threads, max_partitions, file_visitor, existing_data_behavior)
    876         scanner = data
    877 
--> 878     _filesystemdataset_write(
    879         scanner, base_dir, basename_template, filesystem, partitioning,
    880         file_options, max_partitions, file_visitor, existing_data_behavior

/opt/conda/envs/airflow/lib/python3.9/site-packages/pyarrow/_dataset.pyx in pyarrow._dataset._filesystemdataset_write()

/opt/conda/envs/airflow/lib/python3.9/site-packages/pyarrow/_fs.pyx in pyarrow._fs._cb_delete_dir_contents()

/opt/conda/envs/airflow/lib/python3.9/site-packages/pyarrow/fs.py in delete_dir_contents(self, path)
    357             raise ValueError(
    358                 "delete_dir_contents called on path '", path, "'")
--> 359         self._delete_dir_contents(path)
    360 
    361     def delete_root_dir_contents(self):

/opt/conda/envs/airflow/lib/python3.9/site-packages/pyarrow/fs.py in _delete_dir_contents(self, path)
    347 
    348     def _delete_dir_contents(self, path):
--> 349         for subpath in self.fs.listdir(path, detail=False):
    350             if self.fs.isdir(subpath):
    351                 self.fs.rm(subpath, recursive=True)

/opt/conda/envs/airflow/lib/python3.9/site-packages/fsspec/spec.py in listdir(self, path, detail, **kwargs)
   1221     def listdir(self, path, detail=True, **kwargs):
   1222         """Alias of `AbstractFileSystem.ls`."""
-> 1223         return self.ls(path, detail=detail, **kwargs)
   1224 
   1225     def cp(self, path1, path2, **kwargs):

/opt/conda/envs/airflow/lib/python3.9/site-packages/adlfs/spec.py in ls(self, path, detail, invalidate_cache, delimiter, return_glob, **kwargs)
    753     ):
    754 
--> 755         files = sync(
    756             self.loop,
    757             self._ls,

/opt/conda/envs/airflow/lib/python3.9/site-packages/fsspec/asyn.py in sync(loop, func, timeout, *args, **kwargs)
     69         raise FSTimeoutError from return_result
     70     elif isinstance(return_result, BaseException):
---> 71         raise return_result
     72     else:
     73         return return_result

/opt/conda/envs/airflow/lib/python3.9/site-packages/fsspec/asyn.py in _runner(event, coro, result, timeout)
     23         coro = asyncio.wait_for(coro, timeout=timeout)
     24     try:
---> 25         result[0] = await coro
     26     except Exception as ex:
     27         result[0] = ex

/opt/conda/envs/airflow/lib/python3.9/site-packages/adlfs/spec.py in _ls(self, path, invalidate_cache, delimiter, return_glob, **kwargs)
    875                     if not finalblobs:
    876                         if not await self._exists(target_path):
--> 877                             raise FileNotFoundError
    878                         return []
    879                     cache[target_path] = finalblobs

FileNotFoundError: 

Do you think I should raise this as an issue on the adlfs project instead?

asfimport commented 2 years ago

Weston Pace / @westonpace: Ah, I think I see. We call something like...


fs.CreateDir(partition_dir);
if (delete_matching) {
  fs.DeleteDirContents(partition_dir);
}

My guess is that ADLFS doesn't handle empty directories very well (I think we have to create an empty file or something when working with S3) so the fs.CreateDir operation is basically a no-op. Then, when we try to do DeleteDirContents it cannot find the directory.

This is a bit of a tricky one. I wonder if we can come up with some kind of workaround.
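
A rough reproduction of that failure at the filesystem layer, using the fsspec wrapper visible in the traceback above (paths are illustrative; fs is the adlfs filesystem from the earlier example):


from pyarrow.fs import PyFileSystem, FSSpecHandler

# On object stores that treat directories as implicit, create_dir is
# effectively a no-op, so the directory "does not exist" when
# delete_dir_contents tries to list it.
pa_fs = PyFileSystem(FSSpecHandler(fs))
pa_fs.create_dir("dev/newdataset/part=0")
pa_fs.delete_dir_contents("dev/newdataset/part=0")  # may raise FileNotFoundError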

asfimport commented 2 years ago

Weston Pace / @westonpace: This (https://github.com/apache/arrow/compare/master...westonpace:feature/ARROW-12358--only-call-delete-contents-if-needed) would be one possible fix but not easily regressible (I suppose I could mock out a filesystem to make sure we don't call delete contents on an empty directory but that seems like a lot of complexity)

This (https://github.com/apache/arrow/compare/master...westonpace:feature/ARROW-12358--pass-delete-contents-if-dir-not-there) would be another possible fix and it's easily regressible but does lead to different filesystems acting differently (although we already have some of this)

@jorisvandenbossche @lidavidm opinions?

asfimport commented 2 years ago

David Li / @lidavidm: Hmm, what about calling DeleteDirContents and swallowing a not found error? Or is the error too difficult to distinguish from other errors?

asfimport commented 2 years ago

Weston Pace / @westonpace: The "not found" error is thrown from python and then I'd be catching it in C++. I'm not sure how well that would work. I don't think we have a specific NotFoundError in C++ so I'd need to examine the message content which is a little icky.

asfimport commented 2 years ago

David Li / @lidavidm: I think we use KeyError for such things (or else we could use StatusDetail to propagate a more detailed error code) but yes, that would be a little icky if we only had the message content.

I would probably prefer the second fix (delete and ignore FileNotFound) or perhaps just specifying that DeleteDirContents ignores not-found errors.

asfimport commented 2 years ago

Weston Pace / @westonpace:

or perhaps just specifying that DeleteDirContents ignores not-found errors.

That would mean adding the behavior to the local filesystem and the native S3 filesystem, which wouldn't be too much trouble. I'll wait and let Joris weigh in.

asfimport commented 2 years ago

Antoine Pitrou / @pitrou: We could perhaps add an option to ignore not-found errors.

asfimport commented 2 years ago

Lance Dacey / @ldacey: Is this slated for a fix in 7.0.0? I am writing a dataset using "overwrite_or_ignore" and then writing it again with "delete_matching", since the initial save with "delete_matching" fails (FileNotFoundError).

asfimport commented 2 years ago

Weston Pace / @westonpace: No, this was not fixed as part of 7.0.0 (that build is pretty much finalized). I think we have some consensus here on how to fix it (optionally ignoring not-found errors in DeleteDirContents). I will assign it to myself and expect I will find some time for it in 8.0.0.

asfimport commented 2 years ago

Weston Pace / @westonpace: If anyone wants to grab it in the meantime I don't expect I will be getting to it immediately.

asfimport commented 2 years ago

Lance Dacey / @ldacey: Is this issue sufficient to track this? In the meantime, is there a more efficient way to create the partitions than using "overwrite_or_ignore" and then "delete_matching" if the first attempt failed?

asfimport commented 2 years ago

Lance Dacey / @ldacey: Is this on the radar to be fixed for the next release?

asfimport commented 2 years ago

Weston Pace / @westonpace: Yes, I will tackle this on Friday.

asfimport commented 2 years ago

Weston Pace / @westonpace: I created ARROW-16159 (and a PR) to address the "allow DeleteDirContents to succeed if the dir is not found" issue. Once that is merged in we can test this again.

asfimport commented 2 years ago

Weston Pace / @westonpace: @ldacey now that ARROW-16159 has merged this is probably ready to test again. Are you able to test with the nightly builds? Or do you want to wait for the release?
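
For reference, a sketch of how this could look on the Python filesystem API, assuming the option is exposed as a missing_dir_ok flag on delete_dir_contents:


from pyarrow import fs

local = fs.LocalFileSystem()
# With missing_dir_ok=True the call would succeed even if the directory is
# absent, which is the behaviour the "delete_matching" writer path needs.
local.delete_dir_contents("/tmp/does-not-exist", missing_dir_ok=True)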

asfimport commented 2 years ago

Lance Dacey / @ldacey: Nice, thanks. I can try to test with a nightly build this weekend.

asfimport commented 1 year ago

Apache Arrow JIRA Bot: This issue was last updated over 90 days ago, which may be an indication it is no longer being actively worked. To better reflect the current state, the issue is being unassigned per project policy. Please feel free to re-take assignment of the issue if it is being actively worked, or if you plan to start that work soon.