iterative / datachain

AI-data warehouse to enrich, transform and analyze data from cloud storages
https://docs.datachain.ai
Apache License 2.0

Introduce group_by #228

Closed dmpetrov closed 1 week ago

dmpetrov commented 2 months ago

This has to work inside the DB:

from datachain import Column, func

chain.group_by(
    name=func.first(Column("name")),
    total=func.sum(Column("num")),
    cnt=func.count(),
    partition_by=Column("class"),
)
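For comparison, the proposed call computes roughly the same thing as a pandas named aggregation. This is purely an illustration of the intended semantics on made-up data, not part of the proposal:

```python
import pandas as pd

# Toy data standing in for the chain's rows
df = pd.DataFrame({
    "class": ["a", "a", "b"],
    "name": ["x", "y", "z"],
    "num": [1, 2, 3],
})

# first/sum/count per "class", mirroring the group_by() sketch above
result = (
    df.groupby("class")
    .agg(name=("name", "first"), total=("num", "sum"), cnt=("name", "count"))
    .reset_index()
)
print(result)
```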

Open question:

Functions to implement:

Later:

EdwardLi-coder commented 2 months ago

Hi @dmpetrov. I think group_by should be implemented as a separate method, rather than as part of agg(). This approach would provide a clearer API and maintain consistency with the conventional usage in most data processing libraries (such as pandas and SQL).
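For reference, the conventional SQL shape this mirrors looks like the following sketch (illustrative only; SQLite has no FIRST() aggregate, so MIN() stands in for it here):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (class TEXT, name TEXT, num INTEGER)")
conn.executemany(
    "INSERT INTO items VALUES (?, ?, ?)",
    [("a", "x", 1), ("a", "y", 2), ("b", "z", 3)],
)
# MIN(name) is a stand-in for a "first" aggregate
rows = conn.execute(
    "SELECT class, MIN(name), SUM(num), COUNT(*) "
    "FROM items GROUP BY class ORDER BY class"
).fetchall()
print(rows)  # [('a', 'x', 3, 2), ('b', 'z', 3, 1)]
```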

dmpetrov commented 2 months ago

@EdwardLi-coder Agreed, it seems like a cleaner API. In general, I like the idea of separating DB/CPU compute from application/GPU compute, like mutate() vs. map().

dreadatour commented 1 month ago

Intermediate results:

group_by.py:

import os
from datachain import DataChain, func

def path_ext(path):
    # Extension without the leading dot, returned as a one-element
    # tuple to match the declared "path_ext" output column.
    _, ext = os.path.splitext(path)
    return (ext.lstrip("."),)

(
    DataChain.from_storage("s3://dql-50k-laion-files/")
    .map(
        path_ext,
        params=["file.path"],
        output={"path_ext": str},
    )
    .group_by(
        total_size=func.sum("file.size"),
        cnt=func.count(),
        partition_by="path_ext",
    )
    .show()
)

Running:

~/playground $ python group_by.py
  path_ext  total_size    cnt
0      jpg  1079645149  43042
1     json    29743128  43047
2  parquet    15378208      5
3      txt     2927814  43042
~/playground $
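The per-extension aggregation in the script above can be sketched in pure Python, to show what group_by is computing. The (path, size) pairs below are made up for illustration:

```python
import os
from collections import defaultdict

# Hypothetical (path, size) pairs standing in for the bucket listing
files = [
    ("a/0001.jpg", 100), ("a/0001.json", 40),
    ("a/0002.jpg", 120), ("meta.parquet", 500),
]

totals = defaultdict(lambda: [0, 0])  # ext -> [total_size, cnt]
for path, size in files:
    ext = os.path.splitext(path)[1].lstrip(".")
    totals[ext][0] += size
    totals[ext][1] += 1

for ext in sorted(totals):
    print(ext, *totals[ext])
```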

TBD: clean up the code, add more aggregate functions, add tests, and create a PR. Draft PR: https://github.com/iterative/datachain/pull/482

dreadatour commented 1 week ago

Merged. Closing this issue; work will continue in the follow-up issue https://github.com/iterative/datachain/issues/523.