Categorical colorizing broken for census parquet file #1202

jbednar commented 1 year ago

If I create an outdated environment with `conda create -n censusold python=3.7 notebook 'dask<2022.6.2' datashader 'fastparquet<2023.2.0' python-snappy 'pandas<2'`, categorical colormapping works fine for http://s3.amazonaws.com/datashader-data/census2010.parq.zip, unpacked and used with this code:

```python
import datashader as ds, dask.dataframe as dd

df  = dd.read_parquet('./data/census2010.parq')
cvs = ds.Canvas(plot_width=900, plot_height=525, x_range=[-14E6, -7.4E6], y_range=[2.7E6, 6.4E6])
agg = cvs.points(df, 'easting', 'northing', ds.count_cat('race'))
img = ds.tf.shade(agg, how='eq_hist')
```



However, with the latest environment from conda-forge (`conda create -n censusnew -c conda-forge python=3.9 notebook dask datashader fastparquet python-snappy pandas`), the colors are completely mangled, in a way that suggests each dask partition is getting different categories:



hoxbro commented 1 year ago

I get the right coloring when using engine="fastparquet" and get the wrong coloring with engine="pyarrow" in dd.read_parquet.


jbednar commented 1 year ago

Thanks, @Hoxbro and @martindurant! It looks like those crazy patterns are indeed due to pyarrow, presumably because it uses different category encodings in each partition.
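One way to test the per-partition hypothesis is to compare the category order of each partition directly. The sketch below works on plain pandas DataFrames; the helper names `category_orders` and `orders_consistent` and the toy data are hypothetical, not part of dask or datashader.

```python
import pandas as pd

def category_orders(partitions, col):
    """Return the category order of `col` (as a tuple) for each pandas partition."""
    return [tuple(part[col].cat.categories) for part in partitions]

def orders_consistent(partitions, col):
    """True if every partition lists the same categories in the same order."""
    return len(set(category_orders(partitions, col))) <= 1

# Hypothetical partitions: same labels, but a different category order in each.
p0 = pd.DataFrame({"race": pd.Categorical(["w", "b", "a"], categories=["w", "b", "a"])})
p1 = pd.DataFrame({"race": pd.Categorical(["w", "b", "a"], categories=["a", "b", "w"])})

print(category_orders([p0, p1], "race"))
print(orders_consistent([p0, p1], "race"))
```

With dask, the partitions could be supplied one at a time, e.g. `(ddf.get_partition(i).compute() for i in range(ddf.npartitions))`, at the cost of loading each partition in full.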

Unfortunately, it looks like there is still an issue even with fastparquet: when I specify the color mapping, the output in the newer environment doesn't match the color key:

```python
import datashader as ds, dask.dataframe as dd
color_key = {'w': 'aqua', 'b': 'lime', 'a': 'red', 'h': 'fuchsia', 'o': 'yellow'}

df  = dd.read_parquet('./data/census2010.parq', engine='fastparquet')
cvs = ds.Canvas(plot_width=900, plot_height=525, x_range=[-14E6, -7.4E6], y_range=[2.7E6, 6.4E6])
agg = cvs.points(df, 'easting', 'northing', ds.count_cat('race'))
img = ds.tf.shade(agg, how='eq_hist', color_key=color_key)
```


The older versions in censusold do match the color key for the same code, with e.g. Maine being colored aqua:


@martindurant, any idea how that could have happened (given that the user code and the datashader version are the same in both environments, unless I got confused)?

martindurant commented 1 year ago

I'm not sure what's involved in converting values in parquet to colours or what "count_cat" does. I think I would do a simple .value_counts() to ensure that the gross totals are roughly right for the categories before going further.

jbednar commented 1 year ago

Looks right:


(same listing for both censusold and censusnew)

jbednar commented 1 year ago

To be concrete: the same code and the same Datashader version give different output, even though the dataframe seemingly has the same counts:

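A `.value_counts()` check can pass even when the category encoding differs, because it is keyed by label rather than by integer code. A minimal pandas sketch with made-up values (not the census data):

```python
import pandas as pd

# The same labels encoded with two different category orders.
values = ["w", "w", "b", "a", "h", "w"]
s1 = pd.Series(pd.Categorical(values, categories=["a", "b", "h", "w"]))
s2 = pd.Series(pd.Categorical(values, categories=["w", "b", "a", "h"]))

# value_counts is keyed by label, so the totals agree...
print(dict(s1.value_counts()) == dict(s2.value_counts()))  # True

# ...but the integer codes, which depend on category order, do not.
print(s1.cat.codes.tolist())  # [3, 3, 1, 0, 2, 3]
print(s2.cat.codes.tolist())  # [0, 0, 1, 2, 3, 0]
```

Any aggregation that indexes by `cat.codes` therefore depends on the category order, which `value_counts` does not reveal.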
jbednar commented 1 year ago
ianthomas23 commented 1 year ago

Upon investigation, there is an assumption in datashader's handling of categorical columns that every partition has its categories in the same order. A categorical aggregation is 3D, of shape (ny, nx, ncat) where ncat is the number of categories, and internally we don't use a category directly but its index into the sequence of categories. Each partition is internally consistent, but when the results from multiple partitions are combined along the category axis, the differing indexes combine the wrong categories, resulting in different colors.
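A simplified sketch of that combine step, assuming a 1D count per category in place of the real (ny, nx, ncat) array and two hypothetical partitions with different category orders (the helper names are illustrative, not datashader's code):

```python
import numpy as np

def count_by_code(codes, ncat):
    """Per-partition counts along the category axis (1D stand-in for (ny, nx, ncat))."""
    return np.bincount(codes, minlength=ncat)

def to_common(agg, cats, common):
    """Reorder a partition's category axis from its local order into `common`."""
    idx = [common.index(c) for c in cats]
    out = np.zeros(len(common), dtype=agg.dtype)
    out[idx] = agg
    return out

# Hypothetical partitions, each encoding labels with its own category order.
cats0, labels0 = ["a", "b", "c"], ["a", "b", "c", "a"]
cats1, labels1 = ["b", "a", "c"], ["b", "b", "b"]
agg0 = count_by_code(np.array([cats0.index(v) for v in labels0]), 3)
agg1 = count_by_code(np.array([cats1.index(v) for v in labels1]), 3)

common = ["a", "b", "c"]
naive = agg0 + agg1  # adds index i to index i, regardless of which category it is
fixed = to_common(agg0, cats0, common) + to_common(agg1, cats1, common)
print(dict(zip(common, naive.tolist())))  # {'a': 5, 'b': 1, 'c': 1}
print(dict(zip(common, fixed.tolist())))  # {'a': 2, 'b': 4, 'c': 1}
```

The naive sum reproduces the symptom: the 'b' counts from the second partition land on 'a'. Remapping each partition's category axis to one shared order before summing is one possible fix; it is not necessarily how datashader resolves this.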

For the 2010 US census data loaded using pyarrow, the category order varies across the partitions (but is repeatable). Using fastparquet, the category order is the same across all partitions, but it differs from the order of categories used for the colormapping (which happens at the dask dataframe level, not at the individual partition level).

There is a related issue on dask: https://github.com/dask/dask/issues/9467

We need to solve this within datashader, but fortunately there is a workaround. After the dask.dataframe.read_parquet call, add

```python
df = df.categorize('race')
```

and the output is correct using either fastparquet 2023.2.0 or pyarrow 10.0.1.

martindurant commented 1 year ago

I don't think that fastparquet should be recoding the column on load - it must be showing the real encoding in the files, the same across all of them. So what is pyarrow doing? No idea.

> df = df.categorize('race')

Is there no cost associated with this?

ianthomas23 commented 1 year ago

> Is there no cost associated with this?

There is significant cost in doing this.

jbednar commented 1 year ago

Damn. I guess we need to move forward with the fix in Datashader, then.

ianthomas23 commented 1 year ago

The example works fine for dask <= 2022.7.0 and fails for dask >= 2022.7.1. The explanation is in the dask documentation at the bottom of this page: https://docs.dask.org/en/stable/dataframe-categoricals.html. The important quote is "If you write and read to parquet, Dask will forget known categories. This happens because, due to performance concerns, all the categories are saved in every partition rather than in the parquet metadata". This is followed by an explanation of how to deal with it, along the lines of

```python
if not ddf['col'].cat.known:
    ddf['col'] = ddf['col'].cat.set_categories(ddf['col'].head(1).cat.categories)
```

where col is a categorical column that we want to use.
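The same idea at the pandas level: take the category order from the first partition and recode every partition to it with `Series.cat.set_categories`. This sketch assumes, as the dask docs note for parquet, that every partition already lists all categories; `align_to_first` and the data are hypothetical.

```python
import pandas as pd

def align_to_first(partitions, col):
    """Recode `col` in every partition to the first partition's category order.

    Assumes the first partition lists every category: any label missing
    from that list would become NaN after set_categories.
    """
    categories = partitions[0][col].cat.categories
    return [part.assign(**{col: part[col].cat.set_categories(categories)})
            for part in partitions]

# Hypothetical partitions: both list all categories, but in different orders.
p0 = pd.DataFrame({"col": pd.Categorical(["a", "b", "c"], categories=["a", "b", "c"])})
p1 = pd.DataFrame({"col": pd.Categorical(["b", "b", "a"], categories=["b", "a", "c"])})

aligned = align_to_first([p0, p1], "col")
print(p1["col"].cat.codes.tolist())          # [0, 0, 1]
print(aligned[1]["col"].cat.codes.tolist())  # [1, 1, 0]
```

Labels are preserved (`['b', 'b', 'a']`); only the integer codes change, so they now agree across partitions.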

We can replicate the error using the US census data, but it is too large for a repeatable test. We can also replicate it with a cycle of saving to parquet and reading back from parquet. But here is a simpler reproducer that we'll be able to add to the datashader test suite:

```python
import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame(data=dict(col=['a', 'b', 'c', 'a', 'b', 'c', 'b', 'b', 'b', 'b', 'b', 'b']))
ddf = dd.from_pandas(df, npartitions=2)
# Without explicit categories, each partition derives its categories from its own data.
ddf['col'] = ddf['col'].astype('category')

for i in range(ddf.npartitions):
    partition = ddf.get_partition(i)
    print("Partition counts", i, dict(partition['col'].value_counts().compute()))
```

which produces

```
Partition counts 0 {'a': 2, 'b': 2, 'c': 2}
Partition counts 1 {'b': 6}
```

If you use this in datashader, all the partition 1 'b' counts are assigned to categorical index 0, so they are combined with the partition 0 'a' counts, which is incorrect. Adding the recommended code from the dask docs before the loop:

```python
if not ddf['col'].cat.known:
    ddf['col'] = ddf['col'].cat.set_categories(ddf['col'].head(1).cat.categories)
```

the loop then produces

```
Partition counts 0 {'a': 2, 'b': 2, 'c': 2}
Partition counts 1 {'b': 6, 'a': 0, 'c': 0}
```

which works as expected. (The printed order of entries differs because `value_counts` sorts by count, but both partitions now share the same category order, which gives the correct datashader output.)

martindurant commented 1 year ago

So the difference between fastparquet and pyarrow is that fastparquet saves the pandas categories as-is, using the existing coding, whereas arrow presumably re-codes them on save.