huggingface / datasets

🤗 The largest hub of ready-to-use datasets for ML models with fast, easy-to-use and efficient data manipulation tools
https://huggingface.co/docs/datasets
Apache License 2.0

Add a GROUP BY operator #3644

Open felix-schneider opened 2 years ago

felix-schneider commented 2 years ago

Is your feature request related to a problem? Please describe.

Using batch mapping, we can easily split examples. However, we lack an appropriate option for merging them back together by some key. Consider this example:

import datasets

# features:
# {
#    "example_id": datasets.Value("int32"),
#    "text": datasets.Value("string")
# }

ds = datasets.Dataset()  # placeholder for a dataset with the features above

def split(examples):
    # Split each text into sentences, remembering which example each sentence
    # came from and its position within that example.
    sentences = [text.split(".") for text in examples["text"]]
    return {
        "example_id": [
            example_id
            for example_id, sents in zip(examples["example_id"], sentences)
            for _ in sents
        ],
        "sentence": [sent for sents in sentences for sent in sents],
        "sentence_id": [i for sents in sentences for i in range(len(sents))],
    }

# "text" must be removed because the mapped batches have a different number of rows
split_ds = ds.map(split, batched=True, remove_columns=["text"])

def process(examples):
    outputs = some_neural_network_that_works_on_sentences(examples["sentence"])
    return {"outputs": outputs}

split_ds = split_ds.map(process, batched=True)

I have a dataset consisting of texts that I would like to process sentence by sentence in a batched way. Afterwards, I would like to reassemble the examples as they were, merging the per-sentence outputs.

Describe the solution you'd like

Ideally, it would look something like this:

import numpy as np

def join(examples):
    # Reassemble one group (all sentences of one example) in the original order
    order = np.argsort(examples["sentence_id"])
    text = ".".join(examples["sentence"][i] for i in order)
    outputs = [examples["outputs"][i] for i in order]
    return {"text": text, "outputs": outputs}

ds = split_ds.group_by("example_id", join)

Describe alternatives you've considered

Right now, we can do this:

def merge(example):
    example_id = example["example_id"]
    parts = split_ds.filter(lambda x: x["example_id"] == example_id).sort("sentence_id")
    return {"outputs": list(parts["outputs"])}

ds = ds.map(merge)

Of course, we could process the dataset like this:

def process(example):
    # the sentences of a single example form one batch for the network
    outputs = some_neural_network_that_works_on_sentences(example["text"].split("."))
    return {"outputs": outputs}

ds = ds.map(process)

However, that does not allow choosing an arbitrary batch size for the network: the effective batch size becomes the number of sentences in a single example, which can lead to very inefficient use of resources when the hardware could handle a much larger batch.

I would very much appreciate some kind of group by operator to merge examples based on the value of one column.

lhoestq commented 2 years ago

Hi! At the moment you can use to_pandas() to get a pandas DataFrame that supports groupby operations (make sure your dataset fits in memory, though).
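A minimal sketch of that pandas route, assuming the split_ds columns from the example above (example_id, sentence_id, sentence, outputs) and purely illustrative data:

from datasets import Dataset

# illustrative stand-in for split_ds after the per-sentence map
split_ds = Dataset.from_dict({
    "example_id": [0, 0, 1, 1, 1],
    "sentence_id": [0, 1, 0, 1, 2],
    "sentence": ["a", "b", "c", "d", "e"],
    "outputs": [[0.1], [0.2], [0.3], [0.4], [0.5]],
})

df = split_ds.to_pandas()

def join_group(group):
    # restore the original sentence order within one example
    group = group.sort_values("sentence_id")
    return {"text": ".".join(group["sentence"]), "outputs": list(group["outputs"])}

joined = df.groupby("example_id").apply(join_group)
# joined is a Series mapping each example_id to its merged {"text", "outputs"} dict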

We use Arrow as a back-end for datasets and it doesn't have native group by (see https://github.com/apache/arrow/issues/2189) unfortunately.

I just drafted what it could look like to have group_by in datasets:

from datasets import concatenate_datasets

def group_by(d, col, join): 
    """from: https://github.com/huggingface/datasets/issues/3644"""
    # Get the indices of each group
    groups = {key: [] for key in d.unique(col)} 
    def create_groups_indices(key, i): 
        groups[key].append(i) 
    d.map(create_groups_indices, with_indices=True, input_columns=col) 
    # Get one dataset object per group
    groups = {key: d.select(indices) for key, indices in groups.items()} 
    # Apply join function
    groups = {
        key: dataset_group.map(join, batched=True, batch_size=len(dataset_group), remove_columns=d.column_names)
        for key, dataset_group in groups.items()
    } 
    # Return concatenation of all the joined groups
    return concatenate_datasets(list(groups.values()))

Example of usage:

from datasets import Dataset

def join(batch):
    # take the batch of all the examples of a group, and return a batch with one aggregated example
    # (we could aggregate examples into several rows instead of one, if you want)
    return {"total": [batch["i"]]}

d = Dataset.from_dict({
    "i": [i for i in range(50)],
    "group_key": [i % 4 for i in range(50)],
})
print(group_by(d, "group_key", join).to_pandas())
#                                                total
# 0  [0, 4, 8, 12, 16, 20, 24, 28, 32, 36, 40, 44, 48]
# 1  [1, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45, 49]
# 2     [2, 6, 10, 14, 18, 22, 26, 30, 34, 38, 42, 46]
# 3     [3, 7, 11, 15, 19, 23, 27, 31, 35, 39, 43, 47]
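Applied to the use case from the first post, the helper could presumably be plugged in like this (just a sketch: split_ds and the column names come from the original example, and join now returns lists because it is called as a batched map):

import numpy as np

def join(examples):
    # aggregate one group (all sentences of one example) back into a single row
    order = np.argsort(examples["sentence_id"])
    text = ".".join(examples["sentence"][i] for i in order)
    outputs = [examples["outputs"][i] for i in order]
    return {"text": [text], "outputs": [outputs]}

merged_ds = group_by(split_ds, "example_id", join)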

Let me know if that helps!

cc @albertvillanova @mariosasko for visibility

mariosasko commented 2 years ago

@lhoestq As of PyArrow 7.0.0, pa.Table has the group_by method, so we should also consider using that function for grouping.
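For reference, a rough sketch of what that looks like directly at the Arrow level (column names are made up for illustration; converting the grouped pa.Table back into a Dataset is left out):

import pyarrow as pa

# requires pyarrow >= 7.0.0 for Table.group_by
table = pa.table({
    "group_key": [0, 1, 0, 1, 0],
    "i": [10, 11, 12, 13, 14],
})

# one row per group, with the values of "i" collected into a list column
grouped = table.group_by("group_key").aggregate([("i", "list")])
print(grouped.to_pydict())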

felix-schneider commented 2 years ago

Any update on this?

lhoestq commented 2 years ago

You can use https://github.com/mariosasko/datasets_sql by @mariosasko to do group by operations using SQL queries
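A rough sketch of that route, assuming the query() helper shown in the datasets_sql README (the exact API may differ):

from datasets import Dataset
from datasets_sql import query  # assumed entry point, see the project's README

d = Dataset.from_dict({
    "i": list(range(50)),
    "group_key": [i % 4 for i in range(50)],
})

# the dataset is referenced by its variable name inside the SQL query
grouped = query("SELECT group_key, SUM(i) AS total FROM d GROUP BY group_key")
print(grouped.to_pandas())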

jeremylhour commented 1 year ago

Hi, I have a similar issue as OP but the suggested solutions do not work for my case. Basically, I process documents through a model to extract the last_hidden_state, using the "map" method on a Dataset object, but would like to average the result over a categorical column at the end (i.e. groupby this column).

davanstrien commented 1 year ago

> Hi, I have a similar issue as OP but the suggested solutions do not work for my case. Basically, I process documents through a model to extract the last_hidden_state, using the "map" method on a Dataset object, but would like to average the result over a categorical column at the end (i.e. groupby this column).

If you haven't yet, you could explore using Polars for this. It's a newer DataFrame library written in Rust with Python bindings. It is Pandas-like in many ways, but has some fairly big differences in syntax and approach, so it's definitely not a drop-in replacement.

Polars also uses Arrow as a backend and additionally supports out-of-memory operations; in this case, it's probably easiest to write your dataset out to Parquet and then use Polars' scan_parquet method (this will read from the Parquet file lazily). What you get back is a LazyFrame, i.e. nothing is loaded into memory until you specify a query and call a collect method.

Example below of doing a groupby on a dataset which definitely wouldn't fit into memory on my machine:

from datasets import load_dataset
import polars as pl

ds = load_dataset("blbooks")
ds['train'].to_parquet("test.parquet")
df = pl.scan_parquet("test.parquet")
df.groupby('date').agg([pl.count()]).collect()

> datasets_sql seems to not like the fact that I'm averaging np.arrays.

I am not certain how Polars will handle this either. It does have NumPy support (https://pola-rs.github.io/polars-book/user-guide/howcani/interop/numpy.html), but I assume Polars will need at least enough memory to hold each group you want to average over, so you may still end up needing more memory depending on the size of your dataset/groups.

jeremylhour commented 1 year ago

Hi @davanstrien, thanks a lot, I didn't know about this library and the answer works! I need to try it on the full dataset now, but I'm hopeful. Here's what my code looks like:

# average each of the 768 components of "hidden_state" within each date group
list_size = 768
df.groupby("date").agg(
    pl.concat_list(
        [
            pl.col("hidden_state")
            .arr.slice(n, 1)
            .arr.first()
            .mean()
            for n in range(0, list_size)
        ]
    )
).collect()

For some reason, the following code was giving me a "mean() got unexpected argument 'axis'" error:

df2 = df.groupby('date').agg(
    pl.col("hidden_state").map(np.mean).alias("average_hidden_state")
).collect()

EDIT: The solution works on my large dataset, it does not run out of memory and the runtime is reasonable, thanks a lot again!

davanstrien commented 1 year ago

@jeremylhour glad this worked for you :)

kuchenrolle commented 1 year ago

I find this functionality missing in my workflow as well and the workarounds with SQL and Polars unsatisfying. Since PyArrow has exposed this functionality, I hope this soon makes it into a release. (:

zudi-lin commented 6 months ago

Any update on this feature?

lhoestq commented 6 months ago

We added proper Polars integration in #3334, in case it helps:

>>> from datasets import load_dataset
>>> ds = load_dataset("TheBritishLibrary/blbooks", "1700_1799", split="train")
>>> ds.to_polars().groupby('date').len()
┌─────────────────────┬──────┐
│ date                ┆ len  │
│ ---                 ┆ ---  │
│ datetime[ms]        ┆ u32  │
╞═════════════════════╪══════╡
│ 1796-01-01 00:00:00 ┆ 5831 │
│ 1775-01-01 00:00:00 ┆ 4697 │
│ 1749-01-01 00:00:00 ┆ 1118 │
│ 1740-01-01 00:00:00 ┆ 713  │
│ 1714-01-01 00:00:00 ┆ 865  │
│ …                   ┆ …    │
│ 1795-01-01 00:00:00 ┆ 5930 │
│ 1754-01-01 00:00:00 ┆ 1373 │
│ 1780-01-01 00:00:00 ┆ 1970 │
│ 1734-01-01 00:00:00 ┆ 1047 │
│ 1719-01-01 00:00:00 ┆ 1235 │
└─────────────────────┴──────┘
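If the grouped result needs to go back into a Dataset, recent versions of datasets also provide Dataset.from_polars, so a round trip could look roughly like this (a sketch; the mean aggregation is just an example):

import polars as pl
from datasets import Dataset

# toy stand-in for a real dataset
ds = Dataset.from_dict({
    "group_key": [0, 1, 0, 1, 0],
    "value": [1.0, 2.0, 3.0, 4.0, 5.0],
})

# group with Polars (group_by in recent Polars, groupby in older versions),
# then convert the aggregated frame back into a Dataset
grouped_df = ds.to_polars().group_by("group_key").agg(pl.col("value").mean())
grouped_ds = Dataset.from_polars(grouped_df)
print(grouped_ds.to_pandas())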