laughingman7743 / PyAthena

PyAthena is a Python DB API 2.0 (PEP 249) client for Amazon Athena.
MIT License

larger than memory queries? #61

Open mckeown12 opened 5 years ago

mckeown12 commented 5 years ago

I'm hoping to use Athena with Dask, performing queries that return 10-30 GB and then training some distributed ML algorithms. Any suggestions for concurrent/distributed I/O for such a task? I've been quite happy with the pandas cursor for smaller local use, following the examples in the PyAthena documentation, but I still have no idea what I am actually doing-- does the pandas cursor do concurrent I/O, or is it limited to one core?

I apologize in advance if this question belongs on some other forum-- let me know and I'll gladly move the conversation there. Thanks!

laughingman7743 commented 5 years ago

I think it would be nice to create a DaskCursor similar to PandasCursor: a cursor that returns a Dask DataFrame instead of a pandas DataFrame. https://docs.dask.org/en/latest/dataframe.html

I have not set up any other forum, so a GitHub issue is fine.
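
A minimal sketch of what such a DaskCursor could do under the hood, assuming the regular PyAthena cursor exposes the S3 location of the query result via cursor.output_location and that s3fs is installed so Dask can read from S3 (the staging directory and table name below are placeholders): run the query with the plain cursor, then let Dask read the CSV that Athena writes to S3 lazily instead of pulling it all into local memory.

# Hypothetical sketch, not part of PyAthena: hand the Athena CSV result to Dask.
import dask.dataframe as dd
from pyathena import connect

cursor = connect(
    s3_staging_dir="s3://my_bucket/athena-staging/",  # placeholder
    region_name="us-west-2",
).cursor()
cursor.execute("SELECT * FROM some_large_table")  # placeholder query

# Athena writes the full result set as a CSV at output_location; Dask reads it
# in blocks, so the whole thing never has to fit in RAM at once.
# (Block-wise reading assumes no newlines inside quoted fields.)
ddf = dd.read_csv(cursor.output_location, blocksize="64MB")
print(ddf.npartitions)

Note that a single large CSV limits how well the read parallelises; the CTAS/Parquet route discussed below tends to scale better for the 10-30 GB case.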

rpanai commented 5 years ago

@mckeown12 Isn't it better to save the query results as Parquet in S3 (possibly nicely partitioned) and then read them from S3 with Dask?

mckeown12 commented 5 years ago

@rpanai I was unaware that there was a way to save query results as Parquet, or to partition them. Do you have a reference/code example? If that is possible, I would be very interested!

rpanai commented 5 years ago

@mckeown12 Have a look at the examples here.

A typical example is:

CREATE TABLE new_table
WITH (
    format = 'PARQUET',
    external_location = 's3://my_bucket/',
    partitioned_by = ARRAY['key1']
) AS
SELECT ...

I might need to do something in this direction myself. Maybe we can add it as a feature to this project.
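
A rough sketch of how the CTAS route could be wired up from Python with PyAthena and Dask; the bucket, database, table, and column names are placeholders, and s3fs/pyarrow are assumed to be installed for the S3 Parquet read:

# Have Athena write partitioned Parquet to S3, then point Dask at that prefix.
import dask.dataframe as dd
from pyathena import connect

cursor = connect(
    s3_staging_dir="s3://my_bucket/athena-staging/",  # placeholder
    region_name="us-west-2",
).cursor()

# The heavy lifting happens inside Athena; nothing is pulled down locally here.
cursor.execute("""
CREATE TABLE my_database.new_table
WITH (
    format = 'PARQUET',
    external_location = 's3://my_bucket/new_table/',
    partitioned_by = ARRAY['key1']
) AS
SELECT col_a, col_b, key1          -- placeholder columns (partition key last)
FROM my_database.source_table      -- placeholder source
""")

# Dask then reads the Parquet files lazily, one partition at a time.
ddf = dd.read_parquet("s3://my_bucket/new_table/")
print(ddf.npartitions)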

rogerganga commented 4 years ago

I work on large datasets with Athena. CTAS (CREATE TABLE AS SELECT) queries are the way to go, as you can store the results in S3 in the right format.

Lucas-Cerqueira commented 2 years ago

@rogerganga can you elaborate on that? I am running into the same issue as @mckeown12: I am using PandasCursor and the memory usage keeps growing as it reads the result set from S3, until it raises a MemoryError. So your solution is to create a table in Athena by saving the result as Parquet in S3 and then to read it from there using Dask?

rogerganga commented 2 years ago


Hey @Lucas-Cerqueira. You are right (I am unsure about Dask, though). A plain SELECT query in Athena retrieves the data from S3, and the pandas cursor stores it on your local machine so that it can later be viewed as a pandas DataFrame. This is not a good idea for large datasets, as your local machine might not have enough memory/storage to hold all of it.

On the other hand, a CTAS query runs entirely in the cloud and stores its result in S3, where it can later be queried via Athena. No data is stored on your local machine.

You can check @rpanai's code above or follow this link to learn more about how to create a CTAS query :)
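
To illustrate the point with a hedged sketch: once the CTAS result is partitioned in S3, Dask can prune partitions at read time, so a filtered read only touches the S3 objects for the matching key values. The path, column name, and partition value below are placeholders, and the pyarrow engine is assumed:

import dask.dataframe as dd

# Read only the matching key1 partitions instead of the full result set.
ddf = dd.read_parquet(
    "s3://my_bucket/new_table/",             # CTAS external_location (placeholder)
    filters=[("key1", "==", "2023-01-01")],  # placeholder partition value
)

# Aggregations are computed partition by partition, out of core.
print(ddf.groupby("key1").size().compute())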

ericson-cepeda commented 1 year ago

Dependency conflicts in my environment did not allow me to use Dask. Hence, as an additional note, it is possible to use a combination of CTAS, AVRO, and boto:

  1. Embed the SELECT query in a CTAS query using AVRO format.
  2. Retrieve the temporary Athena table S3 location.
  3. Download all the AVRO partitions in parallel (boto).
  4. Read each partition sequentially (fastavro) and trigger the garbage collector.
  5. Use pandas to append each partition, as it is read, to a single CSV.

This process is meant to optimise for memory but not storage, and the output goes directly to a CSV. It was not necessary to load the whole result set in memory to perform additional calculations.

I found this process interesting because, for a 25GB CSV, the resulting behaviour for the memory utilisation looked as follows:

[memory utilisation chart for the 25 GB export]

Why AVRO? Parquet does not accept empty columns, so a result set with an empty column will trigger an exception when executing the CTAS query. AVRO does not have that limitation.

Why not stream from S3 with fastavro? The connection could fail, and recovering the state is another problem to solve.
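
For concreteness, here is a condensed, hedged sketch of steps 2-5 above. The CTAS query with format='AVRO' is assumed to have already written its output under the placeholder bucket/prefix; boto3, fastavro, and pandas are required:

import gc
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor

import boto3
import pandas as pd
from fastavro import reader

BUCKET = "my_bucket"        # placeholder
PREFIX = "tmp_export/"      # S3 prefix of the temporary Athena table (placeholder)
OUTPUT_CSV = "result.csv"

s3 = boto3.client("s3")

# 2-3. List and download all AVRO partitions in parallel.
keys = [
    obj["Key"]
    for page in s3.get_paginator("list_objects_v2").paginate(Bucket=BUCKET, Prefix=PREFIX)
    for obj in page.get("Contents", [])
]
tmp_dir = tempfile.mkdtemp()

def download(key):
    local = os.path.join(tmp_dir, os.path.basename(key))
    s3.download_file(BUCKET, key, local)
    return local

with ThreadPoolExecutor(max_workers=8) as pool:
    local_files = list(pool.map(download, keys))

# 4-5. Read each partition sequentially and append it to the CSV, keeping the
# peak memory footprint to roughly one partition at a time.
header_written = False
for path in local_files:
    with open(path, "rb") as fo:
        records = list(reader(fo))
    pd.DataFrame.from_records(records).to_csv(
        OUTPUT_CSV, mode="a", header=not header_written, index=False
    )
    header_written = True
    del records
    gc.collect()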

References:
https://skeptric.com/export-athena/
https://stackoverflow.com/a/49214514/1705127
https://towardsdatascience.com/python-garbage-collection-article-4a530b0992e3