Issue (open) — reported by MennoNij, 4 years ago:
import dask
from dask.distributed import Client
import asyncpg
import asyncio
# Your asyncpg-specific query helper
async def query_database(pool, parameter):
    """Fetch every row of your_table whose column matches *parameter*.

    Borrows a connection from *pool* for the duration of the query and
    returns the list of asyncpg.Record rows produced by fetch().
    """
    async with pool.acquire() as connection:
        # $1 is a server-side bind parameter — asyncpg handles quoting.
        return await connection.fetch(
            'SELECT * FROM your_table WHERE column = $1', parameter
        )
# Create a Dask Client — with no arguments this starts a local scheduler
# and worker processes on this machine and connects to them.
client = Client()
# Wrap your asyncpg-specific code with dask.delayed
# Wrap your asyncpg-specific code with dask.delayed
@dask.delayed
def asyncpg_task(parameter):
    """Run query_database(parameter) on a Dask worker and return plain rows.

    Each invocation owns its own event loop and connection pool, because
    Pool/Connection objects cannot be pickled across workers — only the
    query *parameter* travels to the worker, and only picklable dicts
    travel back.

    Returns a list of dicts (one per row). asyncpg.Record is not
    picklable, which is exactly the serialization error the original
    version hit; dict(record) preserves the column-name -> value mapping.
    """
    async def _run():
        # Create the pool inside the coroutine so its entire lifecycle is
        # owned by this worker's event loop.
        pool = await asyncpg.create_pool(
            database='your_db', user='your_user', password='your_password'
        )
        try:
            rows = await query_database(pool, parameter)
            # Convert Records to dicts so the result can be pickled back
            # to the client.
            return [dict(row) for row in rows]
        finally:
            # Original code leaked the pool (and the loop) on every task.
            await pool.close()

    # asyncio.run() creates a fresh event loop and tears it down when the
    # coroutine finishes — replaces the leaked new_event_loop /
    # run_until_complete pattern.
    return asyncio.run(_run())
# Input values to query for
input_parameters = [1, 2, 3]

# Build one lazy (delayed) task per input value — nothing runs yet.
results = [asyncpg_task(value) for value in input_parameters]

# Trigger the whole graph: Dask runs the tasks in parallel on the workers.
computed_results = dask.compute(*results)

# Print or process the computed results as needed
print(computed_results)
I'm trying to use asyncpg in combination with Dask, but I'm running into the problem that Pool, Connection or asyncpg.Record objects cannot be serialized (pickled) to and from my workers. (I need to supply a Pool or Connection to a worker, and expect Record objects back)
Any suggestions? Regards,