pandas-dev / pandas

Flexible and powerful data analysis / manipulation library for Python, providing labeled data structures similar to R data.frame objects, statistical functions, and much more
https://pandas.pydata.org
BSD 3-Clause "New" or "Revised" License
43.2k stars 17.77k forks source link

ENH: allow column oriented table storage in HDFStore #4454

Open jreback opened 11 years ago

jreback commented 11 years ago

renamed carray package: https://github.com/Blosc/bcolz

Soliciting any comments on this proposal to create a columnar access table in HDFStore.

This is actually very straightforward to do.

need a new kw argument to describe the type of format for storage: `format='s|t|c' (also allows expansion in the future to other formats)

so will essentially deprecate append=,table= keywords (or just translate them) to a format= kw.

df.to_hdf('test.h5','df',format='c')

Will have a master node which holds the structure. Will store a format with a single column from a DataFrame in a sub-node of the master.

advantages:

disadvantages:

There are actually 2 different formats that could be used here, I propose just the single-file for now. However, The sub-nodes could be spread out in a directory and stored as separate files. This allows concurrent access with some concurrent reads allowed (this is pretty tricky, so hold off on this for now).

This CTable format will use the existing PyTables infrastructure under the hood; it is possible to use the ctable module however http://carray.pytables.org/docs/manual/ (this is basically what BLAZE uses under the hood for its storage backend)

cpcloud commented 11 years ago

what does the running time of syncing of indices depend on other than possibly the obvious len(index)?

:+1: sounds interesting

i also think should default to Table format, but that's not really back compat...I just find table much more useful

jreback commented 11 years ago

when I say syncing indices I really mean its an append operation is a function of the number of columns

e.g.

DataFrame(randn(100,100)) will have to traverse 100 columns and append its 100 rows to each

while DataFrame(randn(100,10)) will have to traverse 10 columns to append its 100 rows

in Table format these would take the same amount of work

selection is the same principle (subject of course to the number of columns actually selected. That is the big benefit here, you could have a wide table, but only select a few columns at a time quickly)

Adding a column needs a compare of the index column of the table with the index of the new column (for integrity checking), as everything is indexed by the row number.

dragoljub commented 11 years ago

I like this idea. In many cases such as feature selection you really only want a few columns out if many but across all rows.

CarstVaartjes commented 10 years ago

Hi, it sounds like a great idea, I would love to understand better how the pandas - hdf5 works and this triggers some questions around how this it would work:

If these are all stupid questions / all these are already implemented and the separate columns would use that automatically, I apologize, I just never really found any explanation of how pandas & hdf5 indices work outside from a functional perspective. I will look at the source code in the coming weeks to estimate if I could be of any help here (would be good practice for my cython skills too I guess)

jreback commented 10 years ago

There are basically 2 approaches here:

The wave of the future is the CTable like approach (e.g. Blaze and @wesm new creation 'badger').

In a row-oriented table (the current Table storage mechanism), the query is delegated to numexpr which figures out which row coordinates are selected, then they are selected.

In approach (1), this would have to change as the query would have to be evaluated for the columns, then anded (it would be quite performant as usually the number of querands is much less than the number of columns in the table). That's the main reason in fact to use a column-oriented approach. Oftentimes you only need a small subset of columns back, while currently (in a row-oriented table), you get the entire row.

Approach (2) will again delegate the CTable mechanism (which uses numexpr under the hood), so would be pretty transparent.

So I think that a simple implementation of (1) would be straightforward. (2) offers the possibily of allowing transparent frames (kind of like memmap for numpy arrays).

I don't think this would actually need any API change, you could just specify format='column' instead of format=table'` and handle it internally.

CarstVaartjes commented 10 years ago

Thanks! I will experiment with approach 2 as it's also really a good learning experience for me. I guess it would also be nice to have optional stuff like pre-aggregation in a select performed inside ctable before loading the end result into pandas. Me experimenting here should not stop anyone else to jump on this too if they have time & urge. I'm estimating that this will take me quite a bit of time (2-3 months to be honest) as I'm swamped in my normal work + will have a learning curve.

jreback commented 10 years ago

@CarstVaartjes that would be gr8!. lmk of any questions/issues.

hussainsultan commented 10 years ago

@jreback I know this hasn't been active much but i wanted to run by my approach to this problem and see if i can get some feedback. I am trying to prototype how something like option 1 will work. I am using pandas HDFStore machinery to write each column as an array (writes will also be faster) with index as a separate group, where columns are sharded by number of rows (should be calculated somehow). This results in each store having a separate group for each column and within each group there are row partitioned columns (1 :N).

<class 'pandas.io.pytables.HDFStore'>
File path: column_store.h5
/f0/__0__             frame        (shape->[100000,1])
/f0/__1__             frame        (shape->[100000,1])
/f0/__2__             frame        (shape->[100000,1])
/f0/__3__             frame        (shape->[100000,1])

Here is the prototype code:

from collections import OrderedDict
import pandas as pd
import tables as tb
import numpy as np

#dataframe to store as columnar HDF
df = pd.DataFrame(np.random.randn(1000000,100))

with pd.get_store('column_store.h5',complib='blosc',complevel=6) as store:
    k=100000 #arbitrary right now for column sizees to be stored
    i=0
    total=len(df.index)
    _cols =OrderedDict()
    for each in df.columns:
        gen=(pd.DataFrame(df[each])[pos:pos + k] for pos in xrange(0, total, k))

        _chunks =OrderedDict()

        for i,chunk in enumerate(gen): 

            _chunks['f'+ str(each)+'/'+'__{0}__'.format(i)]=chunk.index
            store['f'+ str(each)+'/'+'__{0}__'.format(i)]=chunk

        _cols['f'+ str(each)]=_chunks

CPU times: user 30.6 s, sys: 717 ms, total: 31.4 s
Wall time: 32.6 s

There is some metadata here that i am saving in the dictionary _cols preserving the order of the columns and column chunks. This metadata can actually help in only reading the columns where a particular condition is already met once the new index is already calculated. Obviously there needs to be a better way to calcuclate the where conditions better and also for the case of appends the dataframe could be reindexed the same way using the metadata. I think it can also be stored in the pytables attributes.

And this can be read back with where conditions: naive example, with where on the first column >2:

%%time
with pd.get_store('column_store.h5') as store:
    l=[]
    rows=[]
    for each in _cols:
        if each =='f0':
            for i in _cols[each]:
                a = store[i]

                rows.append(a[a>2].dropna().index)

    for i,each in enumerate(_cols):
        df_container=[]
        for k,val in enumerate(_cols[each].keys()):
            df_container.append(store[val][store[val].index.isin(rows[k])])

        l.append(pd.concat(df_container))
    result=pd.concat(l,axis=1)

CPU times: user 18.6 s, sys: 359 ms, total: 19 s
Wall time: 19 s

This approach reads in the column for where conditions and applies the returned index on all subsequent columns.

The reason i am chunking the columns here is because fixed array format can only be read in full and its better for the memory to read chunk by chunk.

Is this a reasonable approach to the approach that you have proposed in Pytables option? Please let me know if you have any questions or concerns.

jreback commented 10 years ago

why would you chunk the columns at all? just store each column as a separate group under a master group

hussainsultan commented 10 years ago

There are some other benefits for column chunking here:

These maybe out of scope for this enhancement. If so, i can go ahead and implement a version something without the chunking option. Please let me know.

jreback commented 10 years ago

well chunking is already supported in using a low level pytables mechanism. so that is not necessary at all.

as far as sharding goes - I am not sure that makes sense at all - in practice hdf5 is performant because of the locality of data distributing local data defeats the purpose in its entirety

i would like to see what kind or use case that you have in mind for a columnar store and benchmark vs a standard row store

it's easy to implement a simple columnar store - bug an efficient one for specific use cases may not be so trivial

jreback commented 10 years ago

all that said

simple implementation as a demonstration / experimental is certainly welcome

start simple

needs a good amount of testing as well

I think it's simple to hijack the format kw and just allow 'ctable'

then should be transparent to the user

hussainsultan commented 10 years ago

Thanks. My use case at the end of the day is to do out-of-core group by reading in certain columns and computing sum/counts on other columns based on the indices for grouped var. This maybe my lack of understanding of HDF5/Pytables, but from what i understood the array format can only be read-in in its entirety and are not enlargeable. is there a way to read in an array using pytables chunk by chunk? Also,how would you append new data onto a column.

The reason i was proposing a chunked column was that i could easily append new data in a new chunk with refactoring, if needed. I understand your point about data locality and sharding, that makes sense, it wont be as performant.

jreback commented 10 years ago

http://pandas.pydata.org/pandas-docs/stable/io.html#hdf5-pytables

you are talking about the fixed format; much more power in the table format however which is queryable and appendable

I would only implement a ctable which is a table stored in sub groups

this is conceptually equivalent to doing append_to_multiple and select_from_multiple with each column as a separate table

hussainsultan commented 10 years ago

that will be extremely slow to write.

here are quick tests"

def write_column_store(name,df,f):
    pd.set_option('io.hdf.default_format',f)
    with pd.get_store(name,complib='blosc',complevel=6) as store:   

        for each in df.columns:
                store['f'+ str(each)]=pd.DataFrame(df[each])
#fixed format
%time write_column_store('column_fixed.h5',df,'fixed')
CPU times: user 7.67 s, sys: 676 ms, total: 8.34 s
Wall time: 9.64 s
#table
%time write_column_store('column_table.h5',df,'table')
CPU times: user 1min 1s, sys: 6.51 s, total: 1min 8s
Wall time: 1min 2s

%%time 
#row oriented table
store = pd.HDFStore('table1.h5')
store.append('data',df,format='table',complib='blosc',complevel=6)
store.close()

CPU times: user 8.62 s, sys: 3.9 s, total: 12.5 s
Wall time: 9.95 s

I am reading over the pytables docs and there is an EArray format(http://www.pytables.org/docs/manual-1.3.3/x4072.html), i dont think it supports chunking or searching, but its at least append able.

jreback commented 10 years ago

that is a very naive impl because you are writing an index column for each column and your are indexing each table

you don't need to do either except you prob do need to index the data_columns

normally you only specify as many data columns as you need

don't reinvent the wheel

understand the wheel

jreback commented 10 years ago

also you would not normally compress as you write - very inefficient that way

see this discussion : http://stackoverflow.com/questions/22934996/pandas-pytables-append-performance-and-increase-in-file-size

hussainsultan commented 10 years ago

Sorry, i did not think about indexes and data_columns. Thanks. Write times are much better now with all columns specified as data_columns.

def write_column_table(name,df,f):
    pd.set_option('io.hdf.default_format',f)
    with pd.get_store(name,complib='blosc',complevel=6) as store:   
        for each in df.columns:
                store.append('f'+str(each),pd.DataFrame(df[each]),format=f,index=False,data_columns=str(each))
%time write_column_store('column_table11.h5',df,'table')

CPU times: user 15.1 s, sys: 3.26 s, total: 18.4 s
Wall time: 13 s

This makes much more sense now. thanks.I will go ahead and play around this with append_to_multiple and select_from_multiple approaches.

jreback commented 10 years ago

ok great

FYI

normally u don't create more data columns that u need but in this case everything effectively is a data column anyhow

jzwinck commented 8 years ago

I would find this very useful as well--specifically "Option 1" above, which is to simply store the DataFrame index and each column as named, separate HDF5 DataSets under a master node.

The reason I like Option 1 is that it's more compatible with other systems which may want to read the data (just as format='table', data_columns=True is now). More elaborate approaches may offer benefits to Pandas-only users, but others like me want to be able to easily read the data using existing tools outside of Pandas too.

nmusolino commented 6 years ago

@jreback, could I ask, is this feature still desired today? Some new column-oriented storage capabilities have been introduced in pandas (Parquet, Feather/PyArrow) since this issue was opened.

I ask because I had some interest in working on this.

dragoljub commented 6 years ago

The other option here is to allow users to specify the write order for existing hdf5 table blocks and compression chunk size/shape. Last I checkd it’s currently using the default row-major with hdf5 determining block shape. It was storing and compressing chunks of rows and not columns by default.