ranaroussi / pystore

Fast data store for Pandas time-series data
Apache License 2.0

Append creates new parquet file #17

Closed cevans3098 closed 5 years ago

cevans3098 commented 5 years ago

Not sure if this is the intended functionality, but I wanted to verify before reporting an issue. I am appending daily minute-data files together using the default chunksize. I would have expected the parquet file to grow until the chunksize or npartitions limit was met before another parquet file was created. However, a new parquet file is created for each append call.
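For reference, the pattern I'm describing is roughly the following (a minimal sketch; the store, collection, and item names and the file paths are placeholders, not my actual code):

    import pandas as pd
    import pystore

    pystore.set_path("~/pystore")                    # placeholder storage path
    store = pystore.store("mydatastore")             # placeholder store name
    collection = store.collection("NASDAQ.MINUTE")   # placeholder collection name

    # the first daily file creates the item...
    first_day = pd.read_csv("day_001.csv", index_col="datetime", parse_dates=True)
    collection.write("AAPL", first_day, metadata={"source": "csv"})

    # ...and every subsequent daily file is appended with the default chunksize
    for path in ["day_002.csv", "day_003.csv"]:
        daily = pd.read_csv(path, index_col="datetime", parse_dates=True)
        collection.append("AAPL", daily)             # each call ends up as another parquet file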

Is this the intended functionality? I can look into the code further and/or provide my code if this is not your intention.

ranaroussi commented 5 years ago

Actually, this behavior is not being caused by PyStore but rather by dask.dataframe.
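To illustrate (a rough sketch using dask.dataframe directly; the frames and output path are placeholders): Dask writes one part file per partition, and an append adds new part files rather than growing the existing ones.

    import pandas as pd
    import dask.dataframe as dd

    df1 = pd.DataFrame({"x": range(5)}, index=pd.date_range("2019-01-01", periods=5, freq="min"))
    df2 = pd.DataFrame({"x": range(5)}, index=pd.date_range("2019-01-02", periods=5, freq="min"))

    # one partition -> one part file inside the dataset directory
    dd.from_pandas(df1, npartitions=1).to_parquet("example.parquet", engine="fastparquet")

    # appending adds a second part file instead of growing the first one
    dd.from_pandas(df2, npartitions=1).to_parquet("example.parquet", engine="fastparquet", append=True)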

While it makes sense for the parquet file to grow until the chunksize or npartitions limit is met before another parquet file is created, one has to wonder whether it's wise to interfere with Dask's behavior.

In any case - if you have a way around this, please post it here or as a pull request and I'll run some benchmarks.

Thanks!

cevans3098 commented 5 years ago

Ran,

I worked around this issue two different ways:

1) I read in as many data files (*.csv) as possible (as much as memory will allow). I have done this either with:

2) The other method I had good success with was not using append at all, but instead reading the existing parquet file, appending the new dataframe, repartitioning, and then rewriting to parquet. Below is some of the test code I used for benchmarking:

    # requires: import dask.dataframe as dd
    def __update_pystore_option2(self, item_name, data, new_metadata={}):
        if item_name not in self.collection.items:
            self.collection.write(item=item_name, data=data, metadata=new_metadata, reload_items=False)
        else:
            # read the existing item, append the new data, collapse everything
            # back into a single partition, and overwrite the item in place
            item = self.collection.item(item_name)
            olddata = item.data
            newdata = dd.from_pandas(data, npartitions=1)

            new_df = olddata.append(newdata)
            new_df = new_df.repartition(npartitions=1)
            self.collection.write(item=item_name, data=new_df, metadata=new_metadata, reload_items=False, overwrite=True)

While this might look like more work, it actually ended up being faster on both read and write, and it produced a significantly smaller file. I ran a test where I read in 30 data files, each consisting of 401 rows and 15 columns, and wrote them to PyStore two different ways. The first was the conventional approach: write if the item did not exist, append if it did. The second method was as illustrated in the code above.
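For reference, the per-call timings below were collected with a simple wall-clock timer around each write/append/rewrite, roughly like this (a sketch, not my exact harness; the label and arguments are placeholders):

    import time

    def timed_call(label, fn, *args, **kwargs):
        # times a single write/append/rewrite call and reports it in milliseconds
        start = time.perf_counter()
        result = fn(*args, **kwargs)
        elapsed_ms = (time.perf_counter() - start) * 1000
        print(f"***** Parquet file {label} time {elapsed_ms:.3f} ms")
        return result

    # usage (placeholder names): timed_call("write/append", collection.append, "AAPL", daily_df)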

The results:

**Starting Parquet file test - append, method 1**
***** Parquet file write/append time 1230.032 ms
***** Parquet file write/append time 3354.0 ms
***** Parquet file write/append time 100.104 ms
***** Parquet file write/append time 113.012 ms
***** Parquet file write/append time 118.976 ms
***** Parquet file write/append time 116.994 ms
***** Parquet file write/append time 137.012 ms
***** Parquet file write/append time 142.997 ms
***** Parquet file write/append time 162.995 ms
***** Parquet file write/append time 158.06 ms
***** Parquet file write/append time 159.003 ms
***** Parquet file write/append time 153.001 ms
***** Parquet file write/append time 150.001 ms
***** Parquet file write/append time 157.999 ms
***** Parquet file write/append time 180.994 ms
***** Parquet file write/append time 192.002 ms
***** Parquet file write/append time 181.998 ms
***** Parquet file write/append time 185.942 ms
***** Parquet file write/append time 179.013 ms
***** Parquet file write/append time 200.999 ms
***** Parquet file write/append time 192.999 ms
***** Parquet file write/append time 215.997 ms
***** Parquet file write/append time 211.01 ms
***** Parquet file write/append time 199.998 ms
***** Parquet file write/append time 210.998 ms
***** Parquet file write/append time 231.099 ms
***** Parquet file write/append time 266.005 ms
***** Parquet file write/append time 235.014 ms
***** Parquet file write/append time 225.014 ms
***** Parquet file write/append time 238.997 ms

Summary
** Time to read final parquet file: 38.027 ms
** The length of the dataframe is: 12295 rows
** Number of Parquet files created: 30
** Final file size on disk: 1.23 MB

Starting Parquet file test - write and rewrite, method 2
***** Parquet file write/rewrite time 74.994 ms
***** Parquet file write/rewrite time 221.997 ms
***** Parquet file write/rewrite time 204.998 ms
***** Parquet file write/rewrite time 218.994 ms
***** Parquet file write/rewrite time 207.001 ms
***** Parquet file write/rewrite time 219.031 ms
***** Parquet file write/rewrite time 203.995 ms
***** Parquet file write/rewrite time 205.051 ms
***** Parquet file write/rewrite time 223.0 ms
***** Parquet file write/rewrite time 216.997 ms
***** Parquet file write/rewrite time 219.031 ms
***** Parquet file write/rewrite time 211.033 ms
***** Parquet file write/rewrite time 211.085 ms
***** Parquet file write/rewrite time 226.992 ms
***** Parquet file write/rewrite time 225.999 ms
***** Parquet file write/rewrite time 214.001 ms
***** Parquet file write/rewrite time 214.03 ms
***** Parquet file write/rewrite time 215.955 ms
***** Parquet file write/rewrite time 249.998 ms
***** Parquet file write/rewrite time 237.998 ms
***** Parquet file write/rewrite time 245.034 ms
***** Parquet file write/rewrite time 244.001 ms
***** Parquet file write/rewrite time 226.031 ms
***** Parquet file write/rewrite time 244.001 ms
***** Parquet file write/rewrite time 250.994 ms
***** Parquet file write/rewrite time 202.032 ms
***** Parquet file write/rewrite time 286.991 ms
***** Parquet file write/rewrite time 210.0 ms
***** Parquet file write/rewrite time 235.002 ms
***** Parquet file write/rewrite time 270.029 ms

Summary
** Time to read final parquet file: 26.032 ms
** The length of the dataframe is: 12295
** Number of Parquet files created: 1
** Final file size on disk: 0.464 MB

I have to do some more testing to see how it scales with larger files, but I believe it will work well given the parquet file structure.

I'd welcome comments and suggestions.

Craig

ranaroussi commented 5 years ago

This is actually similar to the solution I had in mind. I need to run some benchmarking tests to confirm the results before merging this solution into the next release.

Thanks!!

ranaroussi commented 5 years ago

I've pushed a new version to the dev branch. It should result in faster and more consistent behavior when using append. By default, PyStore will aim for partitions of ~99MB each (per Dask's recommendation).
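Conceptually, the idea is to combine the existing data with the new data and then let Dask size the partitions, instead of accumulating one partition per append. A rough sketch of that idea (not the exact dev-branch code, and assuming a Dask version whose repartition supports the partition_size argument):

    import pandas as pd
    import dask.dataframe as dd

    old = dd.from_pandas(
        pd.DataFrame({"x": range(1000)}, index=pd.date_range("2019-01-01", periods=1000, freq="min")),
        npartitions=3,
    )
    new = dd.from_pandas(
        pd.DataFrame({"x": range(1000)}, index=pd.date_range("2019-01-02", periods=1000, freq="min")),
        npartitions=1,
    )

    # combine, then let Dask choose the partition count for a ~99MB target size
    combined = dd.concat([old, new]).repartition(partition_size="99MB")
    print(combined.npartitions)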

LMK.

ranaroussi commented 5 years ago

@cevans3098 - Did you get a chance to test PyStore 0.1.13?

cevans3098 commented 5 years ago

@ranaroussi I have not had a chance to test this yet, but I will take a look this weekend

cevans3098 commented 5 years ago

@ranaroussi I benchmarked version 0.1.13 and I think something is still missing. I ran the same benchmarking code as I did previously.

Write Speed:

Read Speed:

Parquet files:

I took a quick look at the code updates and compared them to my code, and the one difference I see is that you did not repartition the dataframe after combining them. This is probably why we still see 30 individual files and why the append times are slow; if you do not repartition, a new parquet file is created for each partition:

    new_df = new_df.repartition(npartitions=1)

See my full code in the benchmarking example above: https://github.com/ranaroussi/pystore/issues/17#issuecomment-518542390

Below are the results from my benchmarking with 0.1.13:

Starting Parquet file test - append (0.1.13)
***** Parquet file write/append time 2354.0 ms
***** Parquet file write/append time 8288.604 ms
***** Parquet file write/append time 901.984 ms
***** Parquet file write/append time 1089.964 ms
***** Parquet file write/append time 1373.026 ms
***** Parquet file write/append time 1621.995 ms
***** Parquet file write/append time 1744.019 ms
***** Parquet file write/append time 2059.012 ms
***** Parquet file write/append time 2602.975 ms
***** Parquet file write/append time 2849.058 ms
***** Parquet file write/append time 2115.003 ms
***** Parquet file write/append time 2481.992 ms
***** Parquet file write/append time 2324.999 ms
***** Parquet file write/append time 2397.006 ms
***** Parquet file write/append time 2516.004 ms
***** Parquet file write/append time 2699.001 ms
***** Parquet file write/append time 2815.002 ms
***** Parquet file write/append time 3098.004 ms
***** Parquet file write/append time 3461.000 ms
***** Parquet file write/append time 3992.004 ms
***** Parquet file write/append time 3991.999 ms
***** Parquet file write/append time 3798.998 ms
***** Parquet file write/append time 4221.463 ms
***** Parquet file write/append time 4054.033 ms
***** Parquet file write/append time 4175.036 ms
***** Parquet file write/append time 4574.992 ms
***** Parquet file write/append time 4595.996 ms
***** Parquet file write/append time 4989.004 ms
***** Parquet file write/append time 5095.996 ms
***** Parquet file write/append time 5696.228 ms

** Time to read final parquet file: 62.999 ms
** The length of the dataframe is: 12296

-----------------------------------------------------

Starting Parquet file test - write and rewrite - original benchmark code
***** Parquet file write/rewrite time 109.993 ms
***** Parquet file write/rewrite time 453.998 ms
***** Parquet file write/rewrite time 444.996 ms
***** Parquet file write/rewrite time 604.609 ms
***** Parquet file write/rewrite time 558.000 ms
***** Parquet file write/rewrite time 458.029 ms
***** Parquet file write/rewrite time 460.990 ms
***** Parquet file write/rewrite time 462.001 ms
***** Parquet file write/rewrite time 433.000 ms
***** Parquet file write/rewrite time 493.006 ms
***** Parquet file write/rewrite time 446.000 ms
***** Parquet file write/rewrite time 470.000 ms
***** Parquet file write/rewrite time 457.035 ms
***** Parquet file write/rewrite time 465.030 ms
***** Parquet file write/rewrite time 480.034 ms
***** Parquet file write/rewrite time 480.001 ms
***** Parquet file write/rewrite time 461.998 ms
***** Parquet file write/rewrite time 486.033 ms
***** Parquet file write/rewrite time 458.033 ms
***** Parquet file write/rewrite time 485.034 ms
***** Parquet file write/rewrite time 470.995 ms
***** Parquet file write/rewrite time 444.005 ms
***** Parquet file write/rewrite time 495.998 ms
***** Parquet file write/rewrite time 457.000 ms
***** Parquet file write/rewrite time 489.000 ms
***** Parquet file write/rewrite time 477.998 ms
***** Parquet file write/rewrite time 480.998 ms
***** Parquet file write/rewrite time 477.000 ms
***** Parquet file write/rewrite time 496.999 ms
***** Parquet file write/rewrite time 483.001 ms

** Time to read final parquet file: 35.005 ms
** The length of the dataframe is: 12296

ranaroussi commented 5 years ago

That’s really weird... you really shouldn’t get multiple parquet files...

What version of Dask/fastparquet are you using?

cevans3098 commented 5 years ago

@ranaroussi I am currently running:

Dask: 1.2.2
fastparquet: 0.3.0
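For anyone reproducing this, the installed versions can be confirmed with a quick check like the following (assuming each package exposes __version__):

    import dask
    import fastparquet
    import pystore

    print("dask:", dask.__version__)
    print("fastparquet:", fastparquet.__version__)
    print("pystore:", pystore.__version__)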

Do you think it is the Dask version? I could upgrade it to 2.1 or 2.3.

ranaroussi commented 5 years ago

Yes, please upgrade Dask and let me know.

Thanks!

cevans3098 commented 5 years ago

@ranaroussi I benchmarked my code with Dask 2.1.0 and fastparquet 0.3.0 and had the same experience as in my previous note: I saw 30 individual parquet files when I used append.

When I updated to Dask 2.3.0 and fastparquet 0.3.3, the code did not appear to work correctly. While it did generate only one parquet file, that file did not contain all of the data. I have included my results below.
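The quickest way to see the data loss is to read the item back after the run and compare the row and partition counts (a sketch; the storage path, store, collection, and item names are placeholders):

    import pystore

    pystore.set_path("~/pystore")                                           # placeholder path
    collection = pystore.store("mydatastore").collection("NASDAQ.MINUTE")   # placeholder names

    item = collection.item("AAPL")                  # placeholder item name
    print("rows:", len(item.to_pandas()))           # expected 12296 after all appends, but only 406 survive
    print("partitions:", item.data.npartitions)     # roughly one parquet part file per partition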

Key points to review:

Starting Parquet file test - append
***** Parquet file write/append time 5092.581 ms
***** Parquet file write/append time 758.398 ms
***** Parquet file write/append time 108.925 ms
***** Parquet file write/append time 108.202 ms
***** Parquet file write/append time 107.065 ms
***** Parquet file write/append time 106.967 ms
***** Parquet file write/append time 106.984 ms
***** Parquet file write/append time 106.909 ms
***** Parquet file write/append time 106.28 ms
***** Parquet file write/append time 107.919 ms
***** Parquet file write/append time 108.047 ms
***** Parquet file write/append time 108.019 ms
***** Parquet file write/append time 107.954 ms
***** Parquet file write/append time 108.936 ms
***** Parquet file write/append time 106.121 ms
***** Parquet file write/append time 107.019 ms
***** Parquet file write/append time 105.975 ms
***** Parquet file write/append time 107.104 ms
***** Parquet file write/append time 106.977 ms
***** Parquet file write/append time 107.97 ms
***** Parquet file write/append time 107.933 ms
***** Parquet file write/append time 107.959 ms
***** Parquet file write/append time 106.938 ms
***** Parquet file write/append time 108.937 ms
***** Parquet file write/append time 105.961 ms
***** Parquet file write/append time 107.393 ms
***** Parquet file write/append time 105.971 ms
***** Parquet file write/append time 107.08 ms
***** Parquet file write/append time 107.02 ms
***** Parquet file write/append time 108.025 ms

** Time to read final parquet file: 114.963 ms
** The length of the dataframe is: 406
** Parquet file size: 141 KB

-----------------------------------------------------

Starting Parquet file test - write and rewrite
***** Parquet file write/rewrite time 206.019 ms
***** Parquet file write/rewrite time 827.078 ms
***** Parquet file write/rewrite time 766.924 ms
***** Parquet file write/rewrite time 776.102 ms
***** Parquet file write/rewrite time 782.925 ms
***** Parquet file write/rewrite time 777.097 ms
***** Parquet file write/rewrite time 777.984 ms
***** Parquet file write/rewrite time 775.982 ms
***** Parquet file write/rewrite time 775.894 ms
***** Parquet file write/rewrite time 781.079 ms
***** Parquet file write/rewrite time 776.978 ms
***** Parquet file write/rewrite time 778.916 ms
***** Parquet file write/rewrite time 775.073 ms
***** Parquet file write/rewrite time 778.062 ms
***** Parquet file write/rewrite time 781.933 ms
***** Parquet file write/rewrite time 803.956 ms
***** Parquet file write/rewrite time 789.002 ms
***** Parquet file write/rewrite time 797.077 ms
***** Parquet file write/rewrite time 790.992 ms
***** Parquet file write/rewrite time 782.915 ms
***** Parquet file write/rewrite time 810.095 ms
***** Parquet file write/rewrite time 800.992 ms
***** Parquet file write/rewrite time 790.071 ms
***** Parquet file write/rewrite time 792.007 ms
***** Parquet file write/rewrite time 803.833 ms
***** Parquet file write/rewrite time 795.182 ms
***** Parquet file write/rewrite time 792.062 ms
***** Parquet file write/rewrite time 813.949 ms
***** Parquet file write/rewrite time 836.115 ms
***** Parquet file write/rewrite time 847.99 ms

** Time to read final parquet file: 121.925 ms
** The length of the dataframe is: 12296
** Parquet file size: 1020 KB

cevans3098 commented 5 years ago

@ranaroussi For reference, here are the relevant parts of my test code:

To test append:

    def __update_pystore_option1(self, item_name, data, new_metadata={}):
        ''' Adds new option data to the collection.  If the contract item does not exist, the item is created
        and the data is added.  Otherwise the data is appended to the existing item. '''

        if item_name not in self.collection.items:
            self.collection.write(item=item_name, data=data, metadata=new_metadata, reload_items=False)
        else:
            self.collection.append(item=item_name, data=data)

The code I proposed earlier (write/rewrite):

    def __update_pystore_option2(self, item_name, data, new_metadata={}):
        ''' Adds new option data to the collection.  If the contract item does not exist, the item is created
        and the data is added.  Otherwise the existing item is read, the new data is appended, and the
        item is rewritten as a single partition.  Requires: import dask.dataframe as dd '''

        if item_name not in self.collection.items:
            self.collection.write(item=item_name, data=data, metadata=new_metadata, reload_items=False)
        else:
            item = self.collection.item(item_name)
            olddata = item.data
            newdata = dd.from_pandas(data, npartitions=1)

            new_df = olddata.append(newdata)
            new_df = new_df.repartition(npartitions=1)
            self.collection.write(item=item_name, data=new_df, metadata=new_metadata, reload_items=False, overwrite=True)

ranaroussi commented 5 years ago

Fixed in 0.1.15 (I hope 😄). Please reopen the issue if the problem persists.