amotl opened this issue 2 years ago
Getting support for async SQLAlchemy would be super useful.
I have a few queries that can take slightly over one second to execute, and being able to not block would be HUGE.
Dear Robert,
support for asynchronous communication with SQLAlchemy, based on the asyncpg and psycopg3 drivers, is being evaluated at crate/crate-python#532. Please note that this is experimental, and we currently have no schedule for when or how this will be released.
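For context, this is roughly what non-blocking usage could look like once that work lands, using SQLAlchemy's standard asyncio API. The `crate+asyncpg://` URL scheme is an assumption on my side; the actual naming and capabilities depend on the evaluation in crate/crate-python#532.

```python
import asyncio

from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine


async def main():
    # Hypothetical connection URL; the final scheme depends on the driver evaluation.
    engine = create_async_engine("crate+asyncpg://localhost:4200/")
    async with engine.connect() as conn:
        # Awaiting keeps the event loop free while a long-running query executes.
        result = await conn.execute(text("SELECT 1"))
        print(result.scalar())
    await engine.dispose()


asyncio.run(main())
```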
With kind regards, Andreas.
Coming from https://github.com/crate/crate-python/pull/553#issuecomment-1545704082 and https://github.com/crate/crate-python/pull/553#discussion_r1192554584, there are a few additional backlog items for SQLAlchemy/pandas/Dask:
- Support `UPDATE` and `DELETE` statements as offered by the CrateDB bulk operations endpoint, when this is sensible within a pandas/Dask environment.
- Beyond `to_sql()`, also cover optimal `read_sql()` techniques within the documentation, when possible.
- Recommend inquiring the number of available CPU cores using the `os.cpu_count()` or `multiprocessing.cpu_count()` functions, and/or the cpu-count package. [^1]
- Explore the `chunksize=` parameter of the `dask.from_pandas()` function, which is mutually exclusive with the `npartitions=` argument. [^2]
```python
ddf = dask.from_pandas(df, chunksize=CHUNKSIZE)
```
This way, you don't specify the number of partitions, but instead use the chunk size as the parameter that configures the workload scheduler. This concept might fit even better with typical ETL tasks from/to database systems, which we are exploring here to make more efficient with CrateDB.
You'd probably use the same `chunksize` value here that is also used for configuring the outbound batch chunker towards the database, but I am not sure about that yet. If it works out, it would make usage even easier across different scenarios, because a user would only need to configure a good chunk size, no different from basic pandas usage, and would not need to give much specific thought to compute resources at all.
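To tie these items together, here is a minimal sketch of how a single chunk size could drive both partitioning and database I/O. Everything specific in it is an assumption: the `crate://` URI, the `demo` table, and the chunk size of 5,000 rows are placeholders, and the `to_sql()`/`read_sql()` calls show the generic pandas/Dask APIs rather than a settled recommendation.

```python
import dask.dataframe as dd
import pandas as pd

CHUNKSIZE = 5_000                  # assumed batch size; tune per workload
DB_URI = "crate://localhost:4200"  # placeholder URI; needs the CrateDB SQLAlchemy dialect

df = pd.DataFrame({"x": range(100_000)})

# Partition by row count. `chunksize=` and `npartitions=` are mutually
# exclusive; partitioning by core count would use `npartitions=os.cpu_count()`.
ddf = dd.from_pandas(df, chunksize=CHUNKSIZE)

# Reuse the same figure for the outbound batches written to the database.
ddf.to_sql(
    "demo",
    uri=DB_URI,
    if_exists="replace",
    index=False,
    chunksize=CHUNKSIZE,
    parallel=True,
)

# Reading back: with `chunksize=`, pandas returns an iterator of DataFrames,
# which keeps memory bounded for large result sets.
for frame in pd.read_sql("SELECT * FROM demo", DB_URI, chunksize=CHUNKSIZE):
    print(len(frame))
```

The only tunable a user would then have to think about is `CHUNKSIZE` itself.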
[^1]: Rationale: If you are professionally scheduling cluster workloads, you will probably know the number of cores in advance anyway. Still, I think inquiring the number of available cores and using that figure on demand makes total sense if your program is meant to run in different environments, for example down to Jupyter notebooks which don't reach out to a cluster and just process fractions of the whole workload(s) on smaller workstations, but still aim to utilize their resources as well as possible, i.e. to avoid using only 4 cores while 16 would be available.
[^2]: `dask.from_pandas()` function:
```python
def from_pandas(
    data: pd.DataFrame | pd.Series,
    npartitions: int | None = None,
    chunksize: int | None = None,
    sort: bool = True,
    name: str | None = None,
) -> DataFrame | Series:
    """
    Construct a Dask DataFrame from a Pandas DataFrame

    This splits an in-memory Pandas dataframe into several parts and constructs
    a dask.dataframe from those parts on which Dask.dataframe can operate in
    parallel. By default, the input dataframe will be sorted by the index to
    produce cleanly-divided partitions (with known divisions). To preserve the
    input ordering, make sure the input index is monotonically-increasing. The
    ``sort=False`` option will also avoid reordering, but will not result in
    known divisions.

    Note that, despite parallelism, Dask.dataframe may not always be faster
    than Pandas. We recommend that you stay with Pandas for as long as
    possible before switching to Dask.dataframe.

    Parameters
    ----------
    npartitions : int, optional
        The number of partitions of the index to create. Note that if there
        are duplicate values or insufficient elements in ``data.index``, the
        output may have fewer partitions than requested.
    chunksize : int, optional
        The desired number of rows per index partition to use. Note that
        depending on the size and index of the dataframe, actual partition
        sizes may vary.
    """
```
Hi there,
while working on crate/crate-python#391, I have accumulated a few backlog items, which I will gather within this ticket.
Internals
We've identified a few shortcomings in the internal implementation of the CrateDB SQLAlchemy dialect. While it seems to work in general, those spots can be improved in order to better align with SQLAlchemy's internal API hooks and with how the CrateDB dialect interacts with them.
- The dialect currently rewrites `UPDATE` statements using the `before_execute` event. Mike Bayer advised us to hook into `ExecutionContext.pre_exec()` for rewriting `UPDATE` statements instead of using any kind of engine- or dialect-level events, like our current implementation does (a sketch of that hook follows below). Discussions:
  - `CompileError: Unconsumed column names`. Issue: crate/sqlalchemy-cratedb#78.
- Evaluate whether the `MutableDict` implementation can be used for implementing CrateDB's `OBJECT` type, see https://github.com/crate/crate-python/pull/561#pullrequestreview-1514116649 (a generic stand-in sketch follows below).
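To make the first item more concrete, here is a minimal, hypothetical sketch of hooking into `ExecutionContext.pre_exec()` from a custom dialect instead of registering engine- or dialect-level events. The class names and the `rewrite_update()` helper are invented for this sketch and do not exist in the current codebase.

```python
from sqlalchemy.engine.default import DefaultDialect, DefaultExecutionContext


def rewrite_update(statement: str) -> str:
    # Placeholder for the actual UPDATE rewriting logic.
    return statement


class ExampleExecutionContext(DefaultExecutionContext):
    def pre_exec(self):
        # `pre_exec()` runs after statement compilation and right before
        # cursor execution, so the compiled statement can still be adjusted.
        if self.isupdate:
            self.statement = rewrite_update(self.statement)


class ExampleDialect(DefaultDialect):
    # Wiring the custom context into the dialect replaces engine/dialect
    # event hooks such as `before_execute`.
    execution_ctx_cls = ExampleExecutionContext
```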
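For the `MutableDict` item, the following is a small sketch of SQLAlchemy's generic mutation-tracking pattern, `MutableDict.as_mutable()`, wrapped around a dict-backed column type. Whether this maps cleanly onto CrateDB's `OBJECT` type is exactly the open question; `JSON` is used here purely as a stand-in.

```python
from sqlalchemy import Column, Integer, MetaData, Table
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.types import JSON  # stand-in for CrateDB's OBJECT type

metadata = MetaData()

# `MutableDict.as_mutable()` wraps the column type so that in-place changes
# to the dictionary (item assignment, deletion) mark the parent object as
# dirty and are picked up on flush.
documents = Table(
    "documents",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("payload", MutableDict.as_mutable(JSON)),
)
```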
More

- The `visit_update_14` method. Reference: https://github.com/crate/crate-python/pull/391#pullrequestreview-991961268

With kind regards, Andreas.