vaexio / vaex

Out-of-Core hybrid Apache Arrow/NumPy DataFrame for Python, ML, visualization and exploration of big tabular data at a billion rows per second 🚀
https://vaex.io
MIT License

[BUG-REPORT] problem using data from parquet/pyarrow in correlation function #1336

Closed: lastephey closed this issue 1 year ago

lastephey commented 3 years ago

Dear Vaex developers,

Description

I would like to use the Vaex correlation function to calculate the correlation coefficient between two columns of a dataframe (one of type int64, the other of type str). I am loading the dataframe from a parquet file generated using the pyarrow engine. It seems that correlation may not work correctly with data in this format.

Here is a reproducer:

import pandas as pd
import vaex

#use pandas to create and save dataframe as parquet
data = [[0,'hello'], [1,'world'], [3,'this'], [5,'example'], [0,'hello']]
pdf = pd.DataFrame(data, columns=['a', 'b'], dtype=int)
pdf.to_parquet('test', engine='pyarrow')

#now load into vaex
vdf = vaex.open('test')
#try correlation without .values
out = vdf.correlation(vdf.a, vdf.b)

This fails with a traceback that ends in ValueError: could not convert string to float: 'hello'.

I thought I might need to explicitly request .values:

#try correlation with .values
out = vdf.correlation(vdf.a.values, vdf.b.values)

This fails with a traceback that ends in

ValueError: <pyarrow.lib.Int64Array object at 0x2aaae733da60>
[
  0,
  1,
  3,
  5,
  0
] is not of string or Expression type, but <class 'pyarrow.lib.Int64Array'>

Software information

Apologies if I am using Vaex or this function incorrectly.

Thank you very much, Laurie

kmcentush commented 3 years ago

Hi Laurie,

I imagine that the correlation function is not built to correlate non-numeric datatypes. I'd be curious whether label encoding your strings prior to correlating would work.

Might be good to try it on a smaller dataset to see if the result is what you're after.
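
Something along these lines is what I had in mind -- an untested sketch that reuses your reproducer and vaex's ordinal_encode (which maps each unique string to an integer code):

import vaex

vdf = vaex.open('test')
# replace the string column 'b' with integer codes, one per unique string
vdf = vdf.ordinal_encode('b')
# correlate the integer codes against column 'a'
out = vdf.correlation(vdf.a, vdf.b)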

lastephey commented 3 years ago

Hi @kmcentush,

Thank you for your quick reply. Pandas corr supports strings, so I wondered if Vaex intended to do the same. If not, it would be nice to have a more informative error message like "not supported", or to mention in the docs that the data must be numeric.

Yes I will try label encoding-- thank you for the suggestion.

Please feel free to close if there is nothing actionable here.

Thanks again, Laurie

kmcentush commented 3 years ago

I'd be curious to see if label encoding in Vaex then correlating returns the same results as doing it in Pandas. Please let me know what you find!

JovanVeljanoski commented 3 years ago

Hi,

I don't understand: what is the mathematical background for calculating a correlation between a numeric and a non-numeric data type? Also, I think the error message is quite clear: vaex assumes that one is using numerical columns for correlation and, if not, will try to convert them to floats. In this case the conversion fails, and the message reflects that.

I also had a (very) brief look at pandas.corr and could not see how it supports strings. Also, simply running

pdf.corr()

on your example from the OP, I see that column b is ignored, as it is non-numeric.

I am happy to get corrected if anything I wrote here is wrong in any way!

lastephey commented 3 years ago

Hi @JovanVeljanoski,

Thanks for your reply. I should apologize-- I was mistaken about what I said earlier.

The data I used with pandas.corr was in a different format. I had forgotten that I used pandas.pivot_table to produce columns with str column names and int values, for example:

data = [[0,1,0], [0,0,1], [1,1,1], [1,0,1], [0,0,0]]
pdf = pd.DataFrame(data, columns=['a', 'b', 'c'])
>>> pdf.head()
   a  b  c
0  0  1  0
1  0  0  1
2  1  1  1
3  1  0  1
4  0  0  0

pdf_corr = pdf.corr()
>>> pdf_corr.head()
          a         b         c
a  1.000000  0.166667  0.666667
b  0.166667  1.000000 -0.166667
c  0.666667 -0.166667  1.000000

I should have gone back and looked more carefully at what I had done before opening an issue with you-- I am sorry about this. I would still ultimately be interested in doing this in Vaex (if this is possible?) but I realize this is now a different question.

I appreciate your and @kmcentush's fast responses. My apologies for providing the wrong information earlier. Thank you for Vaex -- so far I am finding it very powerful.

Best, Laurie

kmcentush commented 3 years ago

If the values are unique, you can try vaex's OneHotEncoder, although based on your data it looks like you can have multiple 1 values for the same record.

If you have a sample of the input data (prior to encoding, as you do with the pandas pivot), I can take a look. I have various pivot snippets that might work.
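
For reference, this is roughly what I mean by one-hot encoding in vaex -- an untested sketch assuming vaex.ml is installed, with made-up example values:

import numpy as np
import vaex
import vaex.ml

df = vaex.from_arrays(job_id=np.array([1, 1, 2]),
                      subservice=np.array(['numpy', 'h5py', 'numpy']))
# adds one 0/1 column per unique value, e.g. subservice_numpy, subservice_h5py
encoder = vaex.ml.OneHotEncoder(features=['subservice'])
df = encoder.fit_transform(df)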

lastephey commented 3 years ago

@kmcentush,

Thank you. Yes, tl;dr we are tracking Python libraries used in jobs at NERSC. Each dataframe row represents one job and each column represents one library. As you might expect one job can have many libraries. Here is a snippet:

>>> jobiddata = jobiddata.sum(axis=1, level=0)
>>> jobiddata.head()
subservice  ROOT.std  astropy  astropy.io.fits  bokeh  cudf  cupy  cupyx  cv2  dask  ...  scipy  seaborn  sklearn  spacy  statsmodels  tensorflow  theano  torch  xarray
job_id                                                                               ...                                                                                
                 1.0      1.0              1.0    1.0   1.0   1.0    1.0  1.0   1.0  ...    1.0      1.0      1.0    1.0          1.0         1.0     1.0    1.0     1.0
0                0.0      0.0              0.0    0.0   0.0   0.0    0.0  0.0   0.0  ...    0.0      0.0      0.0    0.0          0.0         0.0     0.0    0.0     0.0
1710433          0.0      0.0              0.0    0.0   0.0   0.0    0.0  0.0   0.0  ...    4.0      0.0      4.0    0.0          0.0         4.0     0.0    0.0     0.0
1710443          0.0      0.0              0.0    0.0   0.0   0.0    0.0  0.0   0.0  ...    4.0      0.0      4.0    0.0          0.0         4.0     0.0    0.0     0.0
1720091          0.0      0.0              0.0    0.0   0.0   0.0    0.0  0.0   0.0  ...    2.0      0.0      2.0    0.0          0.0         2.0     0.0    0.0     0.0

We are using these data to figure out which libraries are correlated.

Thank you very much, Laurie

kmcentush commented 3 years ago

Cool! I did some undergrad work at LBNL in the electrochemical space. My background is in chemical engineering. 😄

Do you have a snippet of some example rows prior to the pivoted result you're showing there? That looks like a good goal but I'm happy to help you get there from a pre-pivoted sample.

JovanVeljanoski commented 3 years ago

Hey @lastephey

No worries! :) I know it is a challenge to do correlation between numerical and categorical (or string) data types. Depending on the problem there are various approaches, some of which are as @kmcentush describes.

Btw - you are doing interesting work! If you publish a paper or something like that, I'd be interested in the results :).

lastephey commented 3 years ago

Hi @kmcentush and @JovanVeljanoski,

Thanks-- here are the relevant pre-pivoted columns:

cdata[['job_id','subservice']].head(20)
                       job_id       subservice
__null_dask_index__                           
4                    41236074            numpy
6                    41236072            numpy
12                             multiprocessing
13                                        h5py
14                   41236032            numpy
15                   41202258            numpy
16                                       keras
17                   41236371  multiprocessing
18                   41236053            numpy
19                   41236061            numpy
20                                       numpy
21                             multiprocessing
22                             multiprocessing
23                                   fireworks
24                   41236060            numpy
25                   41236041            numpy
26                   41236046            numpy
27                   41201717            numpy
28                   41235995            numpy
29                   41202248  multiprocessing

This is only a small fraction but we have many rows for the same job, each corresponding to a single library. We pivot to get everything grouped together according to job. If this isn't possible in Vaex, we are also using dask-cudf so we can explore options there, too.
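
For reference, the pandas step is roughly the following (a sketch only: cdata is the pre-pivoted frame above, and in practice we used pandas.pivot_table rather than crosstab):

import pandas as pd

# one row per job_id, one column per library, cell = number of occurrences
wide = pd.crosstab(cdata['job_id'], cdata['subservice'])
# count each library at most once per job
wide = (wide > 0).astype(int)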

Yes we are working on a talk/paper for SciPy 2021:

TITLE: Monitoring Scientific Python Usage on a Supercomputer
AUTHORS: Rollin Thomas, Laurie Stephey, Annette Greiner and Brandon Cook

As you can see the analysis is still in progress. :)

Thank you very much, Laurie

kmcentush commented 3 years ago

Okay, you managed to get me to dig up my vaex utility code from last year! I believe this should do the trick. It works on the latest version of vaex and was originally written for v4.0 prereleases - so there's definitely an opportunity to make things more efficient!

I wasn't sure if you wanted to count, e.g., numpy appearing twice for a job ID as 2 (used twice) or 1 (used at all), so for now I did not deduplicate your data prior to pivoting, meaning 2 is returned. Deduping would return 1 here.

#!/usr/bin/env python
# coding: utf-8

# In[1]:

import numpy as np
import vaex as vx

# In[2]:

job_ids = np.array([41236074, 41236072, 41236072, 41236072, 41236032, 41202258, 41202258, 41236371, 41236053, 41236061, 41236061, 41236061, 41236061, 41236061, 41236060, 41236041, 41236046, 41201717, 41235995, 41202248], dtype='int32')
services = np.array(['numpy', 'numpy', 'multiprocessing', 'h5py', 'numpy', 'numpy', 'keras', 'multiprocessing', 'numpy', 'numpy', 'numpy', 'multiprocessing', 'multiprocessing', 'fireworks', 'numpy', 'numpy', 'numpy', 'numpy', 'numpy', 'multiprocessing'], dtype=str)

dt = vx.from_arrays(job_id=job_ids, service=services)
dt

# In[3]:

def concat_cols(dt, cols, name='concat_col', divider='|'):
    # Create the join column
    for i, col in enumerate(cols):
        if i == 0:
            dt[name] = dt[col].astype('string').fillna('')
        else:
            dt[name] = dt[name] + divider + dt[col].astype('string').fillna('')

    # Ensure it's a string; on rare occasions it's an object
    dt[name] = dt[name].astype('string')

    return dt, name

def vx_dedupe(dt, dedupe_cols=None, concat_first=True):
    # Make a shallow copy
    dt = dt.copy()

    # Get and join columns
    init_cols = dt.get_column_names()
    if dedupe_cols is None:
        dedupe_cols = init_cols
    if concat_first:
        dt, concat_col = concat_cols(dt, dedupe_cols)
        col_names = [concat_col]
    else:
        col_names = dedupe_cols

    # Add named sets
    sets = [dt._set(col_name) for col_name in col_names]
    counts = [set.count for set in sets]
    set_names = [dt.add_variable('set_{}'.format(col_name), set, unique=True) for col_name, set in zip(col_names, sets)]

    # Create 'row_id' column that gives each unique row the same ID
    expression = dt['_ordinal_values({}, {})'.format(col_names[0], set_names[0])].astype('int64')
    product_count = 1
    for col_name, set_name, count in zip(col_names[1:], set_names[1:], counts[:-1]):
        product_count *= count
        expression = expression + dt['_ordinal_values({}, {})'.format(col_name, set_name)].astype('int64') * product_count
    dt['row_id'] = expression

    # This is not 'stable'; because it is multithreaded, we may get a different id each time
    index = dt._index('row_id')
    unique_row_ids = dt.row_id.unique()
    indices = index.map_index(unique_row_ids)

    # Dedupe
    deduped = dt.take(indices)
    deduped = deduped[init_cols]

    return deduped

# In[4]:

# Dedupe data in case a job ID lists the same service multiple times; although maybe you don't want to do this?
# source: https://github.com/vaexio/vaex/issues/746
# deduped = vx_dedupe(dt)
# deduped

# In[5]:

def vx_set_fixed_value(data, col, val, dtype):
    # Note: uses a hacky way to set the default column value: https://github.com/vaexio/vaex/issues/802
    data[col] = vx.vrange(0, len(data))
    if dtype == 'str':
        data[col] = (data[col] * 0).astype(dtype).str.replace('0.0', val)
    else:
        data[col] = (data[col] * 0).astype(dtype) + val

    return data

# In[6]:

# Label values as 1 (i.e. present in the job)
dt_fixed = vx_set_fixed_value(dt, 'flag', 1, 'int32')

dt_fixed

# In[7]:

# Never-before-shared code :)
def vx_pivot_agg_sum(dt, join_col, pivot_col, agg_cols, drop_duplicates=True, drop_join_col=True):
    # Make a shallow copy
    dt = dt.copy()

    # Loop over pivot values
    to_join = []
    pivot_vals = dt[pivot_col].unique()
    for i, pivot_val in enumerate(pivot_vals):
        # Ensure str dtype
        pivot_val = str(pivot_val)

        # Subset
        subset = dt[dt[pivot_col] == pivot_val]
        orig_dtypes = subset.dtypes.to_dict()

        # Pivot
        aggs = []
        decoders = {}
        for agg_col in agg_cols:
            # Encode strings
            is_str = subset[agg_col].dtype.is_string
            if is_str:
                subset = subset.ordinal_encode(agg_col)
                labels = subset._categories[agg_col]['labels']
                decoder = {k: labels[k] for k in range(len(labels))}
                decoders[agg_col] = decoder

            # Save the agg
            aggs.append(vx.agg.sum(agg_col))  # note: I have yet to parameterize this; in the past I've used vx.agg.first(agg_col, ts_col) where ts_col is another function argument. Changing the agg here affects the output column name in the next renaming section
        pivoted = subset.groupby(join_col, agg=aggs)

        # Rename columns
        for agg_col in agg_cols:
            pivoted.rename(agg_col + '_sum', pivot_val)

        # Decode columns
        for col, decoder in decoders.items():
            col = pivot_val
            pivoted[col] = pivoted[col].map(decoder, allow_missing=True)

        # Fix any null dtypes; happens if everything in the original column was NA
        for col in agg_cols:
            col = pivot_val
            if str(pivoted[col].dtype) == 'null':
                pivoted[col] = pivoted[col].astype(orig_dtypes[agg_col])

        # Save pivots
        to_join.append(pivoted)

    # Drop the same columns
    dt.drop(agg_cols, inplace=True)

    # Join
    for pivoted in to_join:
        dt = dt.join(pivoted, left_on=join_col, right_on=join_col, how='left')

    # Drop duplicates
    if drop_duplicates:
        dt = vx_dedupe(dt, [join_col])

    # Drop columns
    dt.drop(pivot_col, inplace=True)
    if drop_join_col:
        dt.drop(join_col, inplace=True)

    return dt

# In[8]:

# Pivot with magic :)
res = vx_pivot_agg_sum(dt_fixed, 'job_id', 'service', ['flag'], drop_join_col=False)

# In[9]:

res.head()

# In[10]:

res.tail()
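
And if you do want each library counted at most once per job, a rough (untested) variant reusing the helpers above:

# keep each (job_id, service) pair only once, then pivot as before
deduped = vx_dedupe(dt_fixed, dedupe_cols=['job_id', 'service'])
res_unique = vx_pivot_agg_sum(deduped, 'job_id', 'service', ['flag'], drop_join_col=False)
res_unique.head()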

lastephey commented 3 years ago

Wow, thank you so much!! I was hoping there would be something quick that wouldn't take you much effort 😱 -- but thank you, I really appreciate it. I'll test this out and report back.

Indeed, only counting numpy and other libraries once per job is our objective. I'm sorry I didn't clarify that.

lastephey commented 3 years ago

I've tested this. It seems to work well to produce a dataframe in the form I needed (first column is job_id, followed by one column for each service). Thank you very much for this, especially the custom pivot. I hope some of this will be useful for Vaex, too.

I do have a few questions. I'd like to use vaex.correlation to find the correlation between all columns for each row (job_id), but am struggling to figure out the right way to do that.

This works for a single pair:

res.correlation(res.matplotlib, res.scipy)
array(0.9984304)

If I try the following to use all columns, I get:

names = res.get_column_names()
res.correlation(names, names)

RecursionError: maximum recursion depth exceeded

My real data has shape (122179, 45), so maybe that's just too large to handle at once?

I'm sorry if I'm just misunderstanding the API.

Thank you very much, Laurie

kmcentush commented 3 years ago

Can you try running that on just a subset of the data? I.e. try running the correlation of all columns but not all of your rows. If that still happens even with just a few rows, it's most likely some (unintentional) evaluation limit that you're butting up against.

Does your stacktrace show which call is actually recursive? The correlation function uses a lot of other functions internally that may actually be the offenders.

Regardless, any sort of test case you can share will help. :)

lastephey commented 3 years ago

Hi @kmcentush,

I tried with 10 rows and still hit the limit:

res_small = res.head(10)
res_small.shape
(10, 44)
names = res_small.get_column_names()
res_small.correlation(names,names)

The stacktrace is quite long. Here is just a subset of what I hope is important:

ERROR:MainThread:vaex.execution:error in task, flush task queue
Traceback (most recent call last):
  File "/opt/miniconda3/lib/python3.8/site-packages/vaex/execution.py", line 171, in execute_async
    variables |= run.df._expr(expression).expand().variables(ourself=True)
  File "/opt/miniconda3/lib/python3.8/site-packages/vaex/expression.py", line 406, in variables
    expresso.translate(self.ast, record)
  File "/opt/miniconda3/lib/python3.8/site-packages/vaex/expression.py", line 228, in ast
    self._ast = expresso.parse_expression(self.expression)
  File "/opt/miniconda3/lib/python3.8/site-packages/vaex/expression.py", line 244, in expression
    self._expression = expresso.node_to_string(self.ast)
  File "/opt/miniconda3/lib/python3.8/site-packages/vaex/expression.py", line 228, in ast
    self._ast = expresso.parse_expression(self.expression)
  File "/opt/miniconda3/lib/python3.8/site-packages/vaex/expression.py", line 244, in expression
    self._expression = expresso.node_to_string(self.ast)
  File "/opt/miniconda3/lib/python3.8/site-packages/vaex/expression.py", line 228, in ast

...

RecursionError: maximum recursion depth exceeded

---------------------------------------------------------------------------
RecursionError                            Traceback (most recent call last)
<ipython-input-21-b4af7cc1482d> in <module>
      1 names = res_small.get_column_names()
----> 2 res_small.correlation(names,names)

/opt/miniconda3/lib/python3.8/site-packages/vaex/dataframe.py in correlation(self, x, y, binby, limits, shape, sort, sort_key, selection, delay, progress)
   1120             value = np.array(vaex.utils.unlistify(waslist, correlations))
   1121             return value
-> 1122         return self._delay(delay, finish(delayed_list(correlations)))
   1123 
   1124     @docsubst

/opt/miniconda3/lib/python3.8/site-packages/vaex/dataframe.py in _delay(self, delay, task, progressbar)
   1497             return task
   1498         else:
-> 1499             self.execute()
   1500             return task.get()
   1501 

/opt/miniconda3/lib/python3.8/site-packages/vaex/dataframe.py in execute(self)
    306         '''Execute all delayed jobs.'''
    307         from .asyncio import just_run
--> 308         just_run(self.execute_async())
    309 
    310     async def execute_async(self):

/opt/miniconda3/lib/python3.8/site-packages/vaex/asyncio.py in just_run(coro)
     33             nest_asyncio.apply()
     34             check_patch_tornado()
---> 35         return loop.run_until_complete(coro)
     36     finally:
     37         if not had_loop:  # remove loop if we did not have one

/opt/miniconda3/lib/python3.8/site-packages/nest_asyncio.py in run_until_complete(self, future)
     68                 raise RuntimeError(
     69                     'Event loop stopped before Future completed.')
---> 70             return f.result()
     71 
     72     def _run_once(self):

/opt/miniconda3/lib/python3.8/asyncio/futures.py in result(self)
    176         self.__log_traceback = False
    177         if self._exception is not None:
--> 178             raise self._exception
    179         return self._result
    180 

/opt/miniconda3/lib/python3.8/asyncio/tasks.py in __step(***failed resolving arguments***)
    278                 # We use the `send` method directly, because coroutines
    279                 # don't have `__iter__` and `__next__` methods.
--> 280                 result = coro.send(None)
    281             else:
    282                 result = coro.throw(exc)

/opt/miniconda3/lib/python3.8/site-packages/vaex/dataframe.py in execute_async(self)
    311         '''Async version of execute'''
    312         # no need to clear _task_aggs anymore, since they will be removed for the executors' task list
--> 313         await self.executor.execute_async()
    314 
    315     @property

/opt/miniconda3/lib/python3.8/site-packages/vaex/execution.py in execute_async(self)
    169                 variables = set()
    170                 for expression in run.expressions:
--> 171                     variables |= run.df._expr(expression).expand().variables(ourself=True)
    172                 columns = list(variables - set(run.df.variables) - set(run.df.virtual_columns))
    173                 logger.debug('Using columns %r from dataset', columns)

/opt/miniconda3/lib/python3.8/site-packages/vaex/expression.py in variables(self, ourself, expand_virtual, include_virtual)
    404                 variables.add(varname)
    405 
--> 406         expresso.translate(self.ast, record)
    407         # df is a buildin, don't record it, if df is a column name, it will be collected as
    408         # df['df']

/opt/miniconda3/lib/python3.8/site-packages/vaex/expression.py in ast(self)
    226         """Returns the abstract syntax tree (AST) of the expression"""
    227         if self._ast is None:
--> 228             self._ast = expresso.parse_expression(self.expression)
    229         return self._ast
    230 

/opt/miniconda3/lib/python3.8/site-packages/vaex/expression.py in expression(self)
    242     def expression(self):
    243         if self._expression is None:
--> 244             self._expression = expresso.node_to_string(self.ast)
    245         return self._expression
    246 

... last 2 frames repeated, from the frame below ...

/opt/miniconda3/lib/python3.8/site-packages/vaex/expression.py in ast(self)
    226         """Returns the abstract syntax tree (AST) of the expression"""
    227         if self._ast is None:
--> 228             self._ast = expresso.parse_expression(self.expression)
    229         return self._ast
    230 

RecursionError: maximum recursion depth exceeded

Are you able to reproduce on your end? Would you like me to send you a file/dataframe with 10 rows to test?

I am happy to close this issue, as we're now very far off topic, and open a new one to capture the recursion problem if that's helpful. Please let me know.

Thank you again for your help, Laurie

kmcentush commented 3 years ago

Hi Laurie,

That subset looks like it shows the issue! In the expression.py file, line 228 calls line 244 which calls line 228 etc. @JovanVeljanoski I might be able to give this a shot over the weekend, but I haven't dug into these internals yet. Any ideas for a quick fix?

I find it interesting that this would be caused by the correlation() function but nothing else in vaex. Perhaps there's something that can be addressed there to avoid this recursion loop?

JovanVeljanoski commented 3 years ago

Hi

Regarding the usage of correlation (same goes for the mutual_information method): it is not a bug, it is a feature :)

This is part of an older part of vaex that has not been updated in a long time, and the API is a bit different. You can use it in two ways: one, as @lastephey found out, by passing single x and y arguments which are expressions.

If you want to calculate more correlations at once, you need to pass a list of tuples to the x argument, like this:

# assume the dataframe has x, y, z columns:
df.correlation(x=[('x', 'y'), ('x', 'z'), ('y', 'z')])
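
For all pairwise combinations you can build that list programmatically, e.g. (a sketch using the res dataframe from earlier; you probably want to exclude job_id first):

from itertools import combinations

names = [n for n in res.get_column_names() if n != 'job_id']
pairs = list(combinations(names, 2))
corrs = res.correlation(x=pairs)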

@kmcentush don't bother with this, it touches a very very old part of vaex that might be tricky to understand. I think @maartenbreddels has basically updated this to a more modern API, but it lives in a branch somewhere and needs a final bit of ironing out before it is merged. I expect it to happen soon-ish.

lastephey commented 3 years ago

Hi,

I just wanted to let you all know that our SciPy paper is published here. We acknowledged you (the Vaex developers)-- thank you for your help in this effort.

You are free to close this issue if it's not useful for you. We were able to perform the calculation we needed in Dask-cuDF.

Thank you again, Laurie

lastephey commented 1 year ago

Thanks for all your help. Closing in the spirit of Closember.