pandas-dev / pandas

Flexible and powerful data analysis / manipulation library for Python, providing labeled data structures similar to R data.frame objects, statistical functions, and much more
https://pandas.pydata.org
BSD 3-Clause "New" or "Revised" License

DOC: groupby aligns Series when passed as groupands #15244

Closed jcrist closed 7 years ago

jcrist commented 7 years ago

xref #15338

Calling groupby with an unaligned index on the same frame in multiple threads can result in incorrect results. The following example demonstrates:

import numpy as np
import pandas as pd
from multiprocessing.pool import ThreadPool

def build_frame(N, seed):
    rs = np.random.RandomState(seed)
    amounts = rs.exponential(0.1, N) * 100
    ids = rs.randint(0, 500, N)  # draw from the seeded RandomState so the frame is reproducible
    df = pd.DataFrame({'amount': amounts, 'id': ids})
    # XXX: calling groupby once beforehand makes everything pass. Note that the
    # pre-filter by amount is necessary; without it, things still fail.
    #df[df.amount < 0].groupby(df.id)
    return df

def f(x):
    return x[x.amount < 20].groupby(x.id).amount.count()

N = 100000
NTHREADS = 8
SEED = 100

df = build_frame(N, SEED)

pool = ThreadPool(NTHREADS)
args = [df] * NTHREADS * 2
r1 = pool.map(f, args)
r2 = pool.map(f, args)

# Print out lengths, which don't always match
print([len(i) for i in r1])
print([len(i) for i in r2])

# Check that results are equivalent
matches = [a.equals(b) for (a, b) in zip(r1, r2)]
assert all(matches)

On my machine, running python test.py results in:

[45, 471, 500, 429, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500]
[500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500, 500]
Traceback (most recent call last):
  File "test.py", line 38, in <module>
    assert all(matches)
AssertionError

A few notes:

Tested with pandas master, as well as pandas 0.19.0 and up.

jreback commented 7 years ago
def f(x):
    df = x[x.amount < 20]
    return df.groupby(df.id).amount.count()

In your original function you are passing x.id, which comes from the unfiltered frame: the filtered frame is a new object, but the grouper still carries the original index. That is very unsafe in general (though pandas may accept it).
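A minimal single-threaded sketch of the mismatch described above, assuming a tiny hand-made frame in place of the random one (`x`, `grouper`, `f_original` and `f_suggested` are illustrative names, not from the thread). It shows that `x.id` keeps all of the original rows while the filtered frame keeps only some, and that without concurrency both versions of `f` agree.

```python
import pandas as pd

x = pd.DataFrame({"amount": [5.0, 30.0, 10.0, 50.0, 15.0],
                  "id": [1, 1, 2, 2, 1]})

filtered = x[x.amount < 20]   # new frame holding rows 0, 2, 4
grouper = x.id                # still indexed by all rows 0..4

print(len(filtered), len(grouper))           # 3 5
print(filtered.index.equals(grouper.index))  # False


def f_original(x):
    # Grouper comes from the unfiltered frame, so pandas must align it.
    return x[x.amount < 20].groupby(x.id).amount.count()


def f_suggested(x):
    # Grouper comes from the filtered frame, so the indexes already match.
    df = x[x.amount < 20]
    return df.groupby(df.id).amount.count()


print(f_original(x).equals(f_suggested(x)))  # True
```

Agreement in a single thread says nothing about thread safety; it only confirms that the suggested rewrite computes the same thing without needing alignment.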

jorisvandenbossche commented 7 years ago

I can't seem to reproduce it on my machine.

However, you are passing a grouper with a different shape. I actually didn't know that pandas aligns the grouper. You can also pass the column name as a string: df.groupby('id').amount.count()
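A small sketch, assuming the same kind of toy frame (all variable names are illustrative), comparing three ways of expressing the grouping: an unaligned Series that pandas aligns, a Series taken from the filtered frame, and the plain column-name string. The string form avoids handing groupby a separate Series at all.

```python
import pandas as pd

x = pd.DataFrame({"amount": [5.0, 30.0, 10.0, 50.0, 15.0],
                  "id": [1, 1, 2, 2, 1]})
filtered = x[x.amount < 20]

# Unaligned Series: its index covers rows 0..4, pandas aligns it to rows 0, 2, 4.
by_series = filtered.groupby(x.id).amount.count()

# Series taken from the filtered frame: indexes already match.
by_column = filtered.groupby(filtered.id).amount.count()

# Column name: groupby looks the key up on the frame it is called on.
by_name = filtered.groupby("id").amount.count()

print(by_series.equals(by_column), by_column.equals(by_name))  # True True
```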

jreback commented 7 years ago

I think @jorisvandenbossche has it right.

.groupby aligns the groupand (x.id). So this 'works', but I suspect it is an unsafe operation in general (from a thread-safety point of view).

If you don't align, then I think this will be OK (IOW, if you pass in the groupand from the same frame).

certainly no guarantees :<
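A sketch of the "groupand from the same frame" variant run under the original thread-pool setup, assuming `concurrent.futures.ThreadPoolExecutor` in place of `multiprocessing.pool.ThreadPool` and keeping the report's sizes (`build_frame`, `f_same_frame` and `mismatches` are illustrative names). It counts how many threaded results differ from a single-threaded reference. As noted above there are no guarantees: a clean run is evidence, not proof of thread safety.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pandas as pd


def build_frame(n, seed):
    # Both columns draw from the same seeded RandomState, so runs are repeatable.
    rs = np.random.RandomState(seed)
    return pd.DataFrame({"amount": rs.exponential(0.1, n) * 100,
                         "id": rs.randint(0, 500, n)})


def f_same_frame(x):
    # The groupand comes from the filtered frame, so its index already
    # matches and groupby has nothing to align.
    df = x[x.amount < 20]
    return df.groupby(df.id).amount.count()


df = build_frame(100_000, 100)

# Share one frame across 8 worker threads, as in the original report.
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(f_same_frame, [df] * 16))

reference = f_same_frame(df)
mismatches = sum(not r.equals(reference) for r in results)
print("mismatching results:", mismatches)
```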

jcrist commented 7 years ago

Yeah, I think this is why it didn't show up until now: I didn't know pandas supported passing unaligned indices to groupby. It showed up in a user bug report: https://github.com/dask/dask/issues/1876.

I can fix this in dask by manually aligning beforehand, but the thread-safety issue still stands. Out of curiosity, why does this fail only on the first call? Is some index structure being built up and then cached for later calls?

jorisvandenbossche commented 7 years ago

The passed series (df.id) gets reindexed (https://github.com/pandas-dev/pandas/blob/ba057443d1b69bb3735a4b18a18e4e4897231867/pandas/core/groupby.py#L2580), but that still shouldn't modify the original object?
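A sketch of the alignment step done by hand, assuming that what groupby does internally is equivalent to `Series.reindex` onto the frame's index (the thread links the reindex call but this code does not call pandas internals; names are illustrative). It checks that `reindex` returns a new Series, leaves the original grouper untouched, and produces the same groups as the implicit alignment.

```python
import pandas as pd

x = pd.DataFrame({"amount": [5.0, 30.0, 10.0, 50.0, 15.0],
                  "id": [1, 1, 2, 2, 1]})
filtered = x[x.amount < 20]
grouper = x.id
before = grouper.copy()  # snapshot for comparison afterwards

# Explicit alignment: a new Series indexed exactly like the filtered frame.
aligned = grouper.reindex(filtered.index)

implicit = filtered.groupby(grouper).amount.count()  # pandas aligns internally
explicit = filtered.groupby(aligned).amount.count()  # nothing left to align

print(aligned is grouper)         # False
print(grouper.equals(before))     # True: original Series is unchanged
print(implicit.equals(explicit))  # True
```

The values of the original Series are not modified; any shared state that does change would have to live somewhere less visible, such as lazily built structures on the shared index, which this sketch does not inspect.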

jcrist commented 7 years ago

Hmmm, well something is getting modified, as it only fails the first time.

jreback commented 7 years ago

It shouldn't ever modify the original object; only the groupby object itself has state, but that could be the problem. IOW, this is a cached_property, which I suppose could be interrupted, and if the groupby object is shared...
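To illustrate how a lazily filled cache can race, here is a generic sketch of a lock-guarded cached property. It is not pandas' own caching decorator, and the thread does not confirm that this is where the pandas race lives; `locked_cached_property` and `Demo` are hypothetical names. Without the lock, several threads could each see an empty cache and run the computation concurrently; with it, the value is computed exactly once.

```python
import threading
import time


class locked_cached_property:
    """Compute an attribute once per instance, guarded by a lock.

    Illustrative sketch only; this is not pandas' own caching decorator.
    One lock is shared by all instances, which is simple but coarse.
    """

    def __init__(self, func):
        self.func = func
        self.name = func.__name__
        self.lock = threading.Lock()

    def __get__(self, obj, owner=None):
        if obj is None:
            return self
        cache = obj.__dict__
        with self.lock:
            # Re-check under the lock: another thread may have filled the
            # cache while this one was waiting.
            if self.name not in cache:
                cache[self.name] = self.func(obj)
            return cache[self.name]


class Demo:
    # Hypothetical class standing in for an object with lazily computed state.
    def __init__(self, values):
        self.values = values
        self.calls = 0

    @locked_cached_property
    def uniques(self):
        self.calls += 1
        time.sleep(0.01)  # widen the window in which threads could collide
        return sorted(set(self.values))


demo = Demo([3, 1, 2, 3, 1])
threads = [threading.Thread(target=lambda: demo.uniques) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(demo.uniques, demo.calls)  # [1, 2, 3] 1
```

Because the descriptor only defines `__get__`, once the value is stored in the instance `__dict__` later lookups bypass the descriptor and the lock entirely.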

jcrist commented 7 years ago

This fails in the code above, though, where only the original frame is shared (neither the groupby object nor the filtered frame is shared). But df.id is the same object for all calls; could the reindex modify it? Or am I misunderstanding what's being cached here?

jcrist commented 7 years ago

This seems to fail only if the grouper's index is longer than the grouped frame. Moving the filter onto the grouper passes every time (not that this is recommended):

def f(x):
    return x.groupby(x[x.amount < 20].id).amount.count()
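A single-threaded sketch of why the swapped version still gives the same counts, using a toy frame (names are illustrative). When the grouper is shorter than the frame, alignment fills the missing rows with NaN, and groupby excludes NaN keys by default, so the excluded rows are exactly the ones the filter removed. Because NaN forces a float dtype, the group keys may come back as floats, so the comparison below converts them.

```python
import pandas as pd

x = pd.DataFrame({"amount": [5.0, 30.0, 10.0, 50.0, 15.0],
                  "id": [1, 1, 2, 2, 1]})

short_grouper = x[x.amount < 20].id            # index: rows 0, 2, 4 only
# Aligning to x.index leaves NaN for rows 1 and 3, which groupby excludes.
print(short_grouper.reindex(x.index).tolist())  # [1.0, nan, 2.0, nan, 1.0]

swapped = x.groupby(short_grouper).amount.count()
original = x[x.amount < 20].groupby(x.id).amount.count()

# Compare keys as floats, since the NaN-padded grouper may yield float keys.
print([float(k) for k in swapped.index] == [float(k) for k in original.index])  # True
print(swapped.tolist() == original.tolist())  # True
```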