thetadweller opened this issue 1 year ago
Thanks for raising this issue. I agree that async support would be a huge asset in the growing world of asyncio Python.

This is a nontrivial project, though, chiefly because we use urllib3 for our HTTP handling and urllib3 doesn't support async. In the short term you can work around this with stdlib threading rather than multiple processes, although the ergonomics there aren't great. We have an example of this in examples/query_cancel.py.
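For illustration, a minimal sketch of that thread-based approach (the hostname, HTTP path, and token below are placeholders, and error handling is omitted):

```python
from concurrent.futures import ThreadPoolExecutor

from databricks import sql

# Placeholder connection details -- substitute your own.
HOST = "your-workspace.cloud.databricks.com"
HTTP_PATH = "/sql/1.0/warehouses/your-warehouse-id"
TOKEN = "your-personal-access-token"

def run_query(query: str):
    # One connection per thread: connections and cursors aren't shared across threads.
    with sql.connect(server_hostname=HOST, http_path=HTTP_PATH, access_token=TOKEN) as conn:
        with conn.cursor() as cursor:
            cursor.execute(query)
            return cursor.fetchall()

queries = ["SELECT 1", "SELECT 2", "SELECT 3"]
with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(run_query, queries))
```

Each worker opens its own connection, and because the threads spend most of their time blocked on network I/O, the GIL isn't a real constraint here.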
I'm self-assigning this issue for now. I'd like to see more signal from other users so we can gauge demand for this feature. Feature requests for this connector overwhelmingly concern single-threaded usage (the connector is 18 months old and this request has only come up once before, here). The more people ask for it, the easier it will be to prioritise.
Similar request here. My key concern is handling timeouts while the cluster boots up.
I need to run 30 queries in parallel, each with a 1-minute timeout, to determine whether they parse in Unity Catalog (UC). I'm using concurrent.futures.ProcessPoolExecutor but I'm getting the following error:
> _, error_list = execute_sql_queries_async(list(tables.values()))
tests/infrastructure/database/test_dbx_connect.py:25:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
sql_utils.py:240: in execute_sql_queries_async
_, error = future.result()
/root/.pyenv/versions/3.10.12/lib/python3.10/concurrent/futures/_base.py:451: in result
return self.__get_result()
/root/.pyenv/versions/3.10.12/lib/python3.10/concurrent/futures/_base.py:403: in __get_result
raise self._exception
/root/.pyenv/versions/3.10.12/lib/python3.10/multiprocessing/queues.py:244: in _feed
obj = _ForkingPickler.dumps(obj)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'multiprocessing.reduction.ForkingPickler'>
obj = <concurrent.futures.process._CallItem object at 0x7fffe7e99420>, protocol = None
@classmethod
def dumps(cls, obj, protocol=None):
buf = io.BytesIO()
> cls(buf, protocol).dump(obj)
E TypeError: cannot pickle 'SSLContext' object
@vhrichfernandez that error is pretty straightforward. Here's a related StackOverflow answer.
As for the topic of this issue: we're actively planning the introduction of both async/await support and a blocking API with asynchronous query execution for this connector.
@susodapop
Adding the following code to my module:
import copyreg, ssl

def save_sslcontext(obj):
    # Rebuild the context from its protocol, per the StackOverflow answer.
    return obj.__class__, (obj.protocol,)

# Register the reducer so SSLContext instances can be pickled.
copyreg.pickle(ssl.SSLContext, save_sslcontext)

context = ssl.create_default_context()
results in the following error:
sql_utils.py:248: in execute_sql_queries_async
_, error = future.result()
/root/.pyenv/versions/3.10.12/lib/python3.10/concurrent/futures/_base.py:451: in result
return self.__get_result()
/root/.pyenv/versions/3.10.12/lib/python3.10/concurrent/futures/_base.py:403: in __get_result
raise self._exception
/root/.pyenv/versions/3.10.12/lib/python3.10/multiprocessing/queues.py:244: in _feed
obj = _ForkingPickler.dumps(obj)
/root/.pyenv/versions/3.10.12/lib/python3.10/multiprocessing/reduction.py:51: in dumps
cls(buf, protocol).dump(obj)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <ssl.SSLSocket fd=16, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.17.0.2', 56568), raddr=('3.237.73.239', 443)>
def __getstate__(self):
> raise TypeError(f"cannot pickle {self.__class__.__name__!r} object")
E TypeError: cannot pickle 'SSLSocket' object
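For anyone hitting the same wall: the usual fix is to avoid pickling the connection at all and instead create it inside each worker process, so that only the query string and the result cross the process boundary. A rough sketch, assuming placeholder connection details and using EXPLAIN as a stand-in parse check:

```python
from concurrent.futures import ProcessPoolExecutor

from databricks import sql

def check_query(query: str):
    # The connection is created inside the worker process, so no object
    # holding an SSLContext/SSLSocket ever needs to be pickled.
    with sql.connect(server_hostname="...", http_path="...", access_token="...") as conn:
        with conn.cursor() as cursor:
            try:
                cursor.execute(f"EXPLAIN {query}")
                return query, None
            except Exception as exc:
                return query, str(exc)

if __name__ == "__main__":
    queries = ["SELECT 1", "SELECT * FROM some_table"]
    with ProcessPoolExecutor(max_workers=8) as pool:
        futures = [pool.submit(check_query, q) for q in queries]
        # A 60-second ceiling per result, matching the 1-minute parse budget.
        results = [f.result(timeout=60) for f in futures]
```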
I would also like to have async support.
Same here
Also interested in async/await support.
> The more people ask for it the easier it will be to prioritise
Our team is also interested
Thanks for the signal, everyone. This feature is now being developed. I'll post updates on this issue as we move closer to release. Pull requests implementing this behaviour should begin to pop up in the next couple weeks.
Hello @susodapop, could you please share the latest updates on this?
Hello @susodapop, an InMobi customer and MSFT have been following up on this. Can you please let me know if this issue has been fixed?
Echoing the interest here. This would be a huge add!
I have a requirement for this: I'm building an LLM web app with 400+ concurrent users.
> The more people ask for it the easier it will be to prioritise
Just chiming in here that I'd also really like async support.
As a user of the trio async framework, it would be great if you could also support it (trio is supported by httpx).

To support users of both asyncio and trio, you need to avoid using any asyncio primitives directly (async/await themselves are fine) and instead use the equivalent functionality provided by the AnyIO library (this is how httpx supports both frameworks).

Using AnyIO automatically gives you support for both async frameworks, and it has the additional benefit of providing a clean structured-concurrency (SC) API, which should be easier to develop against than asyncio.

Many of the ideas developed by trio are now considered best practice for writing async code. Notably, asyncio.TaskGroup is modelled on trio's Nursery construct. asyncio.TaskGroup is only available on Python 3.11+, but by using AnyIO you can use modern async constructs in earlier Python versions.
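To make that concrete, here is a small AnyIO sketch; the run_query coroutine below is just a stand-in for whatever async API the connector eventually exposes:

```python
import anyio

async def run_query(query: str) -> None:
    # Stand-in for a future async connector call.
    await anyio.sleep(0.1)
    print(f"finished: {query}")

async def main() -> None:
    # Structured concurrency: the task group waits for every child task
    # and propagates exceptions, identically on asyncio and trio.
    async with anyio.create_task_group() as tg:
        for q in ["SELECT 1", "SELECT 2", "SELECT 3"]:
            tg.start_soon(run_query, q)

anyio.run(main)                    # asyncio backend (default)
# anyio.run(main, backend="trio")  # the same code running on trio
```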
> Thanks for the signal, everyone. This feature is now being developed. I'll post updates on this issue as we move closer to release. Pull requests implementing this behaviour should begin to pop up in the next couple weeks.
@susodapop checking in to see if you can share the latest status/ETA?
Hi there! Any updates on this?
Adding async support is not trivial. We need to prioritize and do the design. There is no ETA to provide at this time.
Also interested in async/await support 🙏🏻
Hi @yunbodeng-db, I understand async support is non-trivial, but is it even being considered at this time?
> Hi @yunbodeng-db, I understand async support is non-trivial, but is it even being considered at this time?
Not the async APIs at this moment, but it's possible to expose an async handler for the client to poll the status of a long-running query. I cannot provide an ETA yet.
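For reference, that polling pattern can be approximated today with the blocking API by running the query on a worker thread and treating the future as the handle (everything here is illustrative, not a shipped connector API):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def long_query():
    # Stand-in for a blocking cursor.execute(...) call.
    time.sleep(10)
    return "rows"

pool = ThreadPoolExecutor(max_workers=1)
handle = pool.submit(long_query)   # returns immediately

while not handle.done():           # poll instead of blocking
    print("still running...")
    time.sleep(2)

print(handle.result())
```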
I am writing a web app that needs to run multiple concurrent queries against a Databricks SQL warehouse. Because the existing library is synchronous, my processes stay blocked for the duration of each SQL query, so subsequent calls from other clients end up queued. As a result, I am forced to run multiple Python processes to handle concurrent calls, even though they are all I/O bound and could have been served by a handful of processes had I been able to write the queries using async/await.
I tried to find a workaround using SQLAlchemy and its asyncio wrappers, but it returned a message saying the connection is not asynchronous:
InvalidRequestError: The asyncio extension requires an async driver to be used. The loaded 'databricks-sql-python' is not async.
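Until native async lands, a common stopgap for this situation is to push each blocking query onto a worker thread from the event loop, e.g. with asyncio.to_thread (Python 3.9+); the sketch below uses a dummy run_query_blocking in place of the real connector call:

```python
import asyncio
import time

def run_query_blocking(query: str) -> str:
    # Stand-in for the synchronous databricks-sql call.
    time.sleep(1)
    return f"result of {query}"

async def handle_request(query: str) -> str:
    # The event loop stays free while the worker thread blocks on I/O,
    # so one process can serve many concurrent clients.
    return await asyncio.to_thread(run_query_blocking, query)

async def main() -> None:
    results = await asyncio.gather(*(handle_request(q) for q in ["SELECT 1", "SELECT 2"]))
    print(results)

asyncio.run(main())
```

This is still bounded by the thread pool size rather than being truly async, but it avoids running one process per concurrent request.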