iterative / datachain

AI-data warehouse to enrich, transform and analyze data from cloud storages
https://docs.datachain.ai
Apache License 2.0

Clarify ordering semantics #477

Open rlamy opened 4 weeks ago

rlamy commented 4 weeks ago

We need to specify clearly how .order_by() interacts with other methods.

For instance, in SQL, the order of the results from a SELECT query is undefined unless there is an ORDER BY clause. Currently, chain.order_by("a").select("a") generates SQL that looks like SELECT a FROM (SELECT a FROM some_table ORDER BY a), which is therefore technically unordered, even though it will usually return sorted results. While it might be fine to say that .select() doesn't preserve ordering, it seems obvious that chain.order_by("a").collect("a") should return the "a" column in sorted order, yet chain.collect("a") is currently implemented as chain.select("a").collect()...
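The subquery-ordering pitfall can be reproduced with plain sqlite3 (a sketch of the SQL shapes involved, not DataChain's actual query generation):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE some_table (a INTEGER)")
conn.executemany("INSERT INTO some_table VALUES (?)", [(3,), (1,), (2,)])

# The generated SQL keeps ORDER BY inside the subquery, so the outer
# SELECT is technically unordered; engines usually preserve the order,
# but nothing in the SQL standard requires them to.
nested = conn.execute(
    "SELECT a FROM (SELECT a FROM some_table ORDER BY a)"
).fetchall()

# Only an ORDER BY on the outermost query guarantees the order.
guaranteed = conn.execute("SELECT a FROM some_table ORDER BY a").fetchall()
assert guaranteed == [(1,), (2,), (3,)]
```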

dmpetrov commented 3 weeks ago

Good point. We need to keep information about the order and use it in the following statements when it makes sense. We should use the underlying IDs for that.

mattseddon commented 3 weeks ago

> Good point. We need to keep information about the order and use it in the following statements when it makes sense. We should use the underlying IDs for that.

It does not seem possible to generate a sequential list of IDs in BigQuery (more details here: https://github.com/iterative/studio/issues/10635#issuecomment-2381829809). I am currently stuck on how we can provide deterministic results from all chains in an efficient way.

With that said, is there a clear use case for ordering an intermediate step in a chain? I can find superficial examples in tests such as this:

    def name_len(path):
        return (len(posixpath.basename(path)),)

    DataChain.from_storage(path, session=session).order_by("file.path").map(
        name_len, params=["file.path"], output={"name_len": int}
    ).save(ds_name)

but do we support cumulative values being calculated across rows? Wouldn't that be done via group_by? What is the use case we are trying to support, and can we justify the complexity added to support it? Note: I can understand order_by preceding limit (this should be handled by the generated SQL anyway).
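For the order_by-preceding-limit case, the generated SQL itself already pins the result; a minimal sqlite3 sketch of that SQL shape:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (a INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(5,), (2,), (9,), (1,)])

# ORDER BY ... LIMIT is well-defined in SQL: here, the two smallest values.
rows = conn.execute("SELECT a FROM t ORDER BY a LIMIT 2").fetchall()
assert rows == [(1,), (2,)]
```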

> For instance, in SQL, the order of the results from a SELECT query is undefined unless there is an ORDER BY clause.

We could patch order_by before select or select_except in the following way:

    @detach
    def select(self, *args, **kwargs) -> "Self":
        named_args = [v.label(k) for k, v in kwargs.items()]
        query = self.clone()

        last_step = query.steps[-1] if query.steps else None

        query.steps.append(SQLSelect((*args, *named_args)))

        # If the step immediately before this select was an order_by,
        # re-apply it after the select so the ordering survives.
        if isinstance(last_step, SQLOrderBy):
            query.steps.append(SQLOrderBy(last_step.args))

        return query

but I think it would be better to explicitly state that .select() doesn't preserve ordering (as suggested by @rlamy) and then patch collect directly.

mattseddon commented 3 weeks ago

It does not seem possible to generate a sequential list of IDs in BigQuery (more details https://github.com/iterative/studio/issues/10635#issuecomment-2381829809). I am currently stuck on how we can provide deterministic results from all chains in an efficient way.

I'd also like to understand why we are trying to stamp out the following SQL behaviour:

> For instance, in SQL, the order of the results from a SELECT query is undefined unless there is an ORDER BY clause.

Why must the order of queries be deterministic in DataChain even when an order has not been specified?

Edit: There are remnants in the system of this ordering behaviour but it looks like it is completely untested (see https://github.com/iterative/datachain/pull/489).

dmpetrov commented 2 weeks ago

It looks like there is some misunderstanding in the ordering.

First, let's clarify basic assumptions:

  1. Rows are not ordered if the order is not specified. However, a unique sys.id is present in every table and is used for UDFs.
    • it's like a row number without ordering - ROW_NUMBER() OVER () AS row_num
  2. If the last statement in a chain before saving/persisting was order_by, sys.id represents the order.
    • it's like a row number with ordering - ROW_NUMBER() OVER (ORDER BY aaaa DESC) AS row_num
  3. chain.order_by("a").select("a").save("my_ds") is not ordered, since order_by is not the last statement.
  4. We use order by sys.id (like SELECT * FROM t ORDER BY sys__id LIMIT 20) for showing datasets in the UI to make the result look stable even for unordered datasets.
  5. (I'm not sure it's implemented yet) If a dataset was ordered, chain.order_by("val").save("ds_ordered"), then the order can be reused in later queries: DataChain.from_dataset("ds_ordered").limit(3) returns the top 3 with respect to that order.
    • Another good use case: streaming a dataset to ML training in a given order
    • It makes me think that the fact of ordering, as well as the ordering criteria (columns, DESC/ASC), must be stored together with the dataset during save()
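Assumption 2 can be sketched with sqlite3 window functions (SQLite 3.25+): assign sys__id via ROW_NUMBER() over the order_by criteria, so ordering by sys__id later recovers the order. This is an illustration of the idea, not the current implementation:

```python
import sqlite3  # requires SQLite >= 3.25 for window functions

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (val INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(30,), (10,), (20,)])

# sys__id encodes the order_by; ORDER BY sys__id then recovers it.
rows = conn.execute(
    "SELECT ROW_NUMBER() OVER (ORDER BY val) AS sys__id, val "
    "FROM t ORDER BY sys__id"
).fetchall()
assert rows == [(1, 10), (2, 20), (3, 30)]
```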

Please LMK if you disagree with some of these or you'd like to add something.

mattseddon commented 1 week ago

Thanks for clarifying those basic assumptions. I think I have identified some discrepancies between the assumptions and the current implementation.

>   1. Rows are not ordered if the order is not specified. However, a unique sys.id is present in every table and is used for UDFs.
>     • it's like a row number without ordering - ROW_NUMBER() OVER () AS row_num
>   2. If the last statement in a chain before saving/persisting was order_by, sys.id represents the order.
>     • it's like a row number with ordering - ROW_NUMBER() OVER (ORDER BY aaaa DESC) AS row_num

I don't think this is strictly true on the ClickHouse side because we have this definition for sys.id:

    Column("sys__id", UInt64, primary_key=True, server_default=func.rand64()),

which means that if it is not explicitly set we end up with a random value. We do use rowNumberInAllBlocks() + 1 (close to row_number() over()) when copying a query into a table but this is not generally how sys.id is created.
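The consequence for assumption 2 can be sketched in plain Python: random rand64()-style ids carry no positional information, while sequential row-number-style ids do (a simulation, not the actual id-assignment code):

```python
import random

values = ["a", "b", "c", "d"]  # rows in their intended order

random_ids = [random.getrandbits(64) for _ in values]  # rand64() analogue
sequential_ids = [i + 1 for i in range(len(values))]   # rowNumberInAllBlocks() + 1 analogue

# Sorting by sequential ids reproduces the original order...
assert [v for _, v in sorted(zip(sequential_ids, values))] == values
# ...while sorting by random ids only guarantees the same *set* of rows.
assert sorted(v for _, v in sorted(zip(random_ids, values))) == values
```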

> 4. We use order by sys.id (like SELECT * FROM t ORDER BY sys__id LIMIT 20) for showing datasets in the UI to make the result look stable even for unordered datasets.

The preview will always appear stable now because we persist the dataset's first 20 rows in the metastore against the dataset version and use that instead of loading the dataset each time. The following test passes for both SQLite and ClickHouse:

test_dataset_preview_order:

    def test_dataset_preview_order(test_session):
        ids = list(range(10000))
        order = ids[::-1]
        catalog = test_session.catalog
        dataset_name = "test"
        DataChain.from_values(id=ids, order=order, session=test_session).order_by(
            "order"
        ).save(dataset_name)
        preview_values = []
        for r in catalog.get_dataset(dataset_name).get_version(1).preview:
            id = ids.pop()
            o = order.pop()
            entry = (id, o)
            preview_values.append((id, o))
            assert (r["id"], r["order"]) == entry
        DataChain.from_dataset(dataset_name, session=test_session).save(dataset_name)
        for r in catalog.get_dataset(dataset_name).get_version(2).preview:
            assert (r["id"], r["order"]) == preview_values.pop(0)
        DataChain.from_dataset(dataset_name, 2, session=test_session).order_by(
            "id"
        ).save(dataset_name)
        for r in catalog.get_dataset(dataset_name).get_version(3).preview:
            assert r["id"] == ids.pop(0)
            assert r["order"] == order.pop(0)

But it seems that it only passes by chance, because to create the preview we do this:

        if not dataset_version.preview:
            values["preview"] = (
                DatasetQuery(name=dataset.name, version=version, catalog=self)
                .limit(20)
                .to_db_records()
            )

which roughly translates to SELECT * FROM table LIMIT 20, so this is not technically ordered either.
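A sqlite3 sketch of the difference: LIMIT without ORDER BY may return any 20 rows, while pinning the query on sys__id (one hypothetical fix) makes the persisted preview deterministic:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tbl (sys__id INTEGER, val TEXT)")
conn.executemany(
    "INSERT INTO tbl VALUES (?, ?)", [(i, f"v{i}") for i in range(100)]
)

# SELECT * FROM tbl LIMIT 20 is free to return any 20 rows; an explicit
# ORDER BY sys__id fixes which rows the preview contains and their order.
preview = conn.execute(
    "SELECT * FROM tbl ORDER BY sys__id LIMIT 20"
).fetchall()
assert [r[0] for r in preview] == list(range(20))
```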

> 5. (I'm not sure it's implemented yet) If a dataset was ordered, chain.order_by("val").save("ds_ordered"), then the order can be reused in later queries: DataChain.from_dataset("ds_ordered").limit(3) returns the top 3 with respect to that order.

I have tested this and it seems to work but only by chance as well (no ordering provided in the SQL).

As you can see from https://github.com/iterative/datachain/pull/489 we have some logic in the code base where we order by sys.id but this is not tested and seems broken in places after going through the above.

> It makes me think that the fact of ordering, as well as the ordering criteria (columns, DESC/ASC), must be stored together with the dataset during save()

IMO, this is worth investigating, but as stated elsewhere it seems like a difficult problem to solve, especially across databases and at scale (see https://github.com/iterative/studio/issues/10635#issuecomment-2403630921 for some context). Supporting the obvious use cases, like "streaming dataset to ML training with a given order", would be a great start, and agreeing that we do not guarantee the order of "unordered chains" would be very helpful.

mattseddon commented 1 week ago

From a long discussion in the planning meeting:

For now, the order of a dataset will only be guaranteed when an order_by statement is present at the end of a chain. We may return to optimising this later.

shcheklein commented 5 days ago

Perfect, thanks @mattseddon . It should simplify things a lot. Thanks for pushing this forward.

shcheklein commented 5 days ago

Can we close this / repurpose this btw?