dask / dask

Parallel computing with task scheduling
https://dask.org
BSD 3-Clause "New" or "Revised" License

Suggesting updates on the doc of `dask.dataframe.read_sql_query` #11339

Open ParsifalXu opened 2 months ago

ParsifalXu commented 2 months ago

Describe the issue: There is an incorrect description in the documentation of dask.dataframe.read_sql_query. The description of the divisions parameter says:

divisions: sequence Values of the index column to split the table by. If given, this will override npartitions and bytes_per_chunk.

However, the corresponding part of the code is:

if divisions is None:
    if limits is None:
        # calculate max and min for given index
        q = sa.sql.select(
            sa.sql.func.max(index), sa.sql.func.min(index)
        ).select_from(sql.subquery())
        minmax = pd.read_sql(q, engine)
        maxi, mini = minmax.iloc[0]
        dtype = minmax.dtypes["max_1"]
    else:
        mini, maxi = limits
        dtype = pd.Series(limits).dtype

    if npartitions is None:
        q = sa.sql.select(sa.sql.func.count(index)).select_from(sql.subquery())
        count = pd.read_sql(q, engine)["count_1"][0]
        npartitions = (
            int(round(count * bytes_per_row / parse_bytes(bytes_per_chunk))) or 1
        )
    if dtype.kind == "M":
        divisions = methods.tolist(
            pd.date_range(
                start=mini,
                end=maxi,
                freq="%is" % ((maxi - mini).total_seconds() / npartitions),
            )
        )
        divisions[0] = mini
        divisions[-1] = maxi
    elif dtype.kind in ["i", "u", "f"]:
        divisions = np.linspace(mini, maxi, npartitions + 1, dtype=dtype).tolist()
    else:
        raise TypeError(
            'Provided index column is of type "{}".  If divisions is not provided the '
            "index column type must be numeric or datetime.".format(dtype)
        )

Apparently, the correct description should be "If not given, then this will override npartitions". Meanwhile, there doesn't seem to be any place where bytes_per_chunk is overridden.

Could you check it?

phofl commented 2 months ago

Investigations are welcome!

benrutter commented 2 months ago

Hey both - thought I'd have a brief look into this. The logic on overriding npartitions looks sound (i.e. it matches the docs) to me. If you do give a divisions keyword, it isn't None, so that whole if divisions is None: block won't be executed.

After which the return is this:

return from_delayed(parts, meta, divisions=divisions)

(i.e. it uses divisions and ignores npartitions)

If you don't give a divisions argument, that block will get executed, and specifically one of these two branches will run:

        if dtype.kind == "M":
            divisions = methods.tolist(
                pd.date_range(
                    start=mini,
                    end=maxi,
                    freq="%is" % ((maxi - mini).total_seconds() / npartitions),
                )
            )
            divisions[0] = mini
            divisions[-1] = maxi
        elif dtype.kind in ["i", "u", "f"]:
            divisions = np.linspace(mini, maxi, npartitions + 1, dtype=dtype).tolist()

Essentially divisions is being set based on some logic making use of the npartitions keyword.
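As a rough illustration of the datetime branch quoted above, here is a plain-Python sketch. It is not dask's code: `datetime_divisions` is a hypothetical helper, the `while` loop is a stand-in for `pd.date_range(start=..., end=..., freq=...)`, and the `ValueError` guard is an assumption added for this sketch only.

```python
from datetime import datetime, timedelta


def datetime_divisions(mini, maxi, npartitions):
    """Hypothetical helper mirroring the quoted datetime branch."""
    # freq="%is" % (...) formats the step with "%i", which truncates it
    # to a whole number of seconds.
    step_seconds = int((maxi - mini).total_seconds() / npartitions)
    if step_seconds < 1:
        # Guard added for this sketch; the quoted code has no such check.
        raise ValueError("index span is too short for this many partitions")
    step = timedelta(seconds=step_seconds)

    # Stand-in for pd.date_range: points from mini up to and including maxi.
    divisions = []
    point = mini
    while point <= maxi:
        divisions.append(point)
        point += step

    # As in the quoted code, pin the end points to the exact limits.
    divisions[0] = mini
    divisions[-1] = maxi
    return divisions


start = datetime(2024, 1, 1)
end = datetime(2024, 1, 2)
for boundary in datetime_divisions(start, end, npartitions=4):
    print(boundary)
```

With a one-day span and npartitions=4 the step is six hours, so the sketch yields five boundaries, i.e. four partitions.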

Later on the return is based on the now overwritten divisions:

return from_delayed(parts, meta, divisions=divisions)

This might look like it's using the divisions argument, but it's using divisions that the code has computed from npartitions.

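So in short:

- If divisions is given, the if divisions is None: block is skipped, so npartitions is never used.
- If divisions is not given, divisions is computed from npartitions.

Same with bytes_per_chunk - that's only used on the branch executed when divisions is None (the "override" logic is essentially that none of that branch gets executed).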
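The precedence discussed in this thread can be sketched in plain Python. This is a simplified illustration, not dask's implementation: `resolve_divisions` is a hypothetical helper, the index limits, row count, and `bytes_per_row` are passed in directly instead of being queried from the database, `bytes_per_chunk` is assumed to be an integer number of bytes rather than a string such as "256 MiB", and only the numeric branch is modelled.

```python
def resolve_divisions(mini, maxi, row_count, bytes_per_row,
                      divisions=None, npartitions=None,
                      bytes_per_chunk=256 * 2**20):
    """Hypothetical helper showing which argument wins."""
    if divisions is not None:
        # Explicit divisions: npartitions and bytes_per_chunk are never read.
        return list(divisions)

    if npartitions is None:
        # bytes_per_chunk only matters when neither divisions nor
        # npartitions was supplied.
        npartitions = int(round(row_count * bytes_per_row / bytes_per_chunk)) or 1

    # Evenly spaced boundaries between mini and maxi (numeric index only).
    step = (maxi - mini) / npartitions
    return [mini + i * step for i in range(npartitions)] + [maxi]


# divisions given: npartitions=10 is ignored
print(resolve_divisions(0, 100, 1000, 1024, divisions=[0, 50, 100], npartitions=10))
# divisions omitted: boundaries are derived from npartitions
print(resolve_divisions(0, 100, 1000, 1024, npartitions=4))
# both omitted: npartitions is estimated from bytes_per_chunk
print(resolve_divisions(0, 100, 1000, 1024, bytes_per_chunk=256 * 1024))
```

In this sketch "override" means exactly what the comment above describes: when divisions is supplied, the code path that reads npartitions and bytes_per_chunk is simply never reached.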

@ParsifalXu does that make sense from your point of view? (assuming it does and you're not seeing an issue, I think this issue is ok to close?)