laughingman7743 / PyAthena

PyAthena is a Python DB API 2.0 (PEP 249) client for Amazon Athena.
MIT License

PandasCursor interfering with pandas.DataFrame writing to s3 #465

Closed: louis-vines closed this issue 6 months ago

louis-vines commented 1 year ago

I've found that importing PandasCursor interferes with pandas using s3fs/fsspec to write to s3.

Minimal reproducible example:

import pandas as pd
from pyathena.pandas.cursor import PandasCursor

my_df = pd.DataFrame(dict(a=[1], b=[1]))
s3_bucket = ""  # add a valid s3 bucket
output_path = f"s3://{s3_bucket}/my_prefix/file.csv"
my_df.to_csv(output_path)

and I get the following error:

Traceback (most recent call last):
  File "/app/pipelines/bug_example.py", line 12, in <module>
    my_df.to_csv(output_path)
  File "/usr/local/lib/python3.10/site-packages/pandas/core/generic.py", line 3772, in to_csv
    return DataFrameRenderer(formatter).to_csv(
  File "/usr/local/lib/python3.10/site-packages/pandas/io/formats/format.py", line 1186, in to_csv
    csv_formatter.save()
  File "/usr/local/lib/python3.10/site-packages/pandas/io/formats/csvs.py", line 240, in save
    with get_handle(
  File "/usr/local/lib/python3.10/site-packages/pandas/io/common.py", line 138, in __exit__
    self.close()
  File "/usr/local/lib/python3.10/site-packages/pandas/io/common.py", line 130, in close
    handle.close()
  File "/usr/local/lib/python3.10/site-packages/pyathena/filesystem/s3.py", line 518, in close
    super(S3File, self).close()
  File "/usr/local/lib/python3.10/site-packages/fsspec/spec.py", line 1876, in close
    self.flush(force=True)
  File "/usr/local/lib/python3.10/site-packages/fsspec/spec.py", line 1742, in flush
    self._initiate_upload()
  File "/usr/local/lib/python3.10/site-packages/pyathena/filesystem/s3.py", line 522, in _initiate_upload
    raise NotImplementedError  # pragma: no cover

If you remove the PandasCursor import, this works fine:

import pandas as pd
# from pyathena.pandas.cursor import PandasCursor

my_df = pd.DataFrame(dict(a=[1], b=[1]))
s3_bucket = ""  # add a valid s3 bucket
output_path = f"s3://{s3_bucket}/my_prefix/file.csv"
my_df.to_csv(output_path)

laughingman7743 commented 1 year ago

https://github.com/laughingman7743/PyAthena/blob/master/pyathena/filesystem/s3.py#L403-L416 I do not currently support writing DataFrames to S3 with PandasCursor.

louis-vines commented 1 year ago

Thanks for the swift response. I totally understand that your package doesn't support writing to s3. The issue is that once we import PandasCursor, it interferes with how pandas DataFrames are written to s3. Notice in my example that my_df.to_csv(output_path) has nothing to do with PyAthena. From the stack trace, it seems that importing PandasCursor interferes with how pandas interacts with s3fs and fsspec. This is a problem because it means we can't use pyathena and then write our resulting DataFrame back to s3 using pandas.

laughingman7743 commented 1 year ago

https://github.com/laughingman7743/PyAthena/blob/master/pyathena/pandas/__init__.py#L4-L5 Importing PandasCursor executes this code, which overwrites the s3fs implementation registered with fsspec.
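
For illustration, a minimal sketch of what that kind of global registration looks like (fsspec.register_implementation is fsspec's public registry API; the exact lines live in the file linked above):

import fsspec

from pyathena.filesystem.s3 import S3FileSystem

# Registering a class for the "s3" protocol with clobber=True replaces whatever
# implementation previously handled s3:// URLs (e.g. s3fs) for the whole process,
# which is why a plain my_df.to_csv("s3://...") ends up in PyAthena's S3File.
fsspec.register_implementation("s3", S3FileSystem, clobber=True)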

louis-vines commented 1 year ago

Hmmmm, this seems like quite a problematic side effect of using the pandas cursor: it means you can't really use pyathena for data pipelines, since you can't read data from athena and then write the results back to s3. I've thought of a workaround: being explicit about which s3 filesystem I want when writing to s3:

import s3fs

s3 = s3fs.S3FileSystem()

with s3.open(f"{s3_bucket}/my_prefix/file.csv", "w") as file_out:
    my_df.to_csv(file_out, index=False)
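
Another possible angle (a sketch that assumes fsspec lets you re-register a protocol at runtime) is to put s3fs back as the "s3" handler after the PandasCursor import:

import fsspec
import s3fs

from pyathena.pandas.cursor import PandasCursor  # noqa: F401 (this import clobbers the "s3" registration)

# Restore s3fs as the handler for s3:// URLs, overwriting the S3FileSystem
# that the PandasCursor import registered.
fsspec.register_implementation("s3", s3fs.S3FileSystem, clobber=True)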

But would addressing this issue be something you are willing to think about? Either by using s3fs, by not registering your s3 implementation globally with fsspec (not sure if this is possible), or by extending your implementation to support writing?

laughingman7743 commented 1 year ago

I used to use S3fs, but stopped using it because S3fs depends on aiobotocore, and aiobotocore pins botocore to specific versions, making it difficult to install dependencies. https://github.com/aio-libs/aiobotocore/blob/master/setup.py#L10 I welcome pull requests, you know.

louis-vines commented 1 year ago

> I used to use S3fs, but stopped using it because S3fs depends on aiobotocore, and aiobotocore pins botocore to specific versions, making it difficult to install dependencies.

Yes this is fair enough - it's also a problem we've been encountering in projects.

> I welcome pull requests, you know.

Touché. Given that you don't want to include s3fs as a dependency, do you think the best fix is to implement an s3 writer in this package? That seems like a slightly odd thing for this package to be implementing. The alternative would be to avoid registering your s3 implementation globally with fsspec. I'll admit I've not looked at the fsspec code at all and don't know if there is a way to implement things without the global registration.

laughingman7743 commented 1 year ago

I am not sure what is odd about it. If you want to save the DataFrame as an Athena table, you can also use the to_sql method. https://github.com/laughingman7743/PyAthena#to-sql
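
For reference, a minimal sketch along the lines of the README's to_sql example (bucket, schema, and table names are placeholders):

import pandas as pd

from pyathena import connect
from pyathena.pandas.util import to_sql

conn = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", region_name="us-west-2")
df = pd.DataFrame({"a": [1, 2, 3], "b": [10.0, 20.0, 30.0]})

# Uploads the DataFrame to S3 and creates (or replaces) an Athena table over it.
to_sql(df, "YOUR_TABLE", conn, "s3://YOUR_S3_BUCKET/path/to/",
       schema="YOUR_SCHEMA", index=False, if_exists="replace")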

graydenshand commented 1 year ago

As a workaround, you can convert query results to pandas yourself instead of using the as_pandas function or PandasCursor.

E.g. this succeeds:

from pyathena import connect
import pandas as pd

with connect(s3_staging_dir="s3://...") as conn:
    with conn.cursor() as cursor:
        cursor.execute("select * from my_table limit 10")        
        df = pd.DataFrame(cursor, columns=[c[0] for c in cursor.description])
        # later...
        df.to_csv("s3://...")

I'll suggest that it would be preferable for this library not to support pandas conversion at all, rather than to support it in a way that breaks typical pandas usage in other contexts.

gvogel-hh commented 8 months ago

Many users will need to solve the dependency problems surrounding s3fs/aiobotocore anyway, so I would not consider this a sufficient reason to take the decision out of the user's hands. The usual way of handling such a choice would be to offer an extra like pyathena[replace_s3fs] that pulls in your s3 filesystem implementation as a separate package.
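
For illustration, a hypothetical sketch of what opt-in registration could look like (register_s3_filesystem is an invented helper, not part of PyAthena today):

import fsspec

from pyathena.filesystem.s3 import S3FileSystem


def register_s3_filesystem(clobber: bool = False) -> None:
    """Hypothetical helper: register PyAthena's filesystem for s3:// URLs only
    when the user explicitly opts in, instead of as a side effect of importing
    pyathena.pandas."""
    fsspec.register_implementation("s3", S3FileSystem, clobber=clobber)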