Closed vikalfc closed 6 months ago
I have checked the CSV upload with the following simple code and it is fine. https://github.com/laughingman7743/PyAthena/blob/master/tests/pyathena/filesystem/test_s3.py#L736-L747
import logging
import uuid
import fsspec
import pandas
# Send PyAthena's DEBUG logs to a stream handler so every upload step is printed.
pyathena_logger = logging.getLogger("pyathena")
pyathena_logger.addHandler(logging.StreamHandler())
pyathena_logger.setLevel(logging.DEBUG)

# Register PyAthena's S3FileSystem for both URL schemes; clobber=True replaces
# any implementation already registered for that scheme.
for protocol in ("s3", "s3a"):
    fsspec.register_implementation(
        protocol, "pyathena.filesystem.s3.S3FileSystem", clobber=True
    )

bucket = "laughingman7743-athena"

# small file: 2**20 rows of "1,2\n" is roughly 4 MiB of CSV.
# NOTE(review): intended to stay below the filesystem block size so the
# non-multipart path is exercised -- confirm against the default block size.
path_small = f"s3://{bucket}/tmp/{uuid.uuid4()}"
df = pandas.DataFrame({"a": [1] * 2**20, "b": [2] * 2**20})
df.to_csv(path_small, index=False)

# large file: 100 * 2**20 rows is roughly 400 MiB of CSV.
path_large = f"s3://{bucket}/tmp/{uuid.uuid4()}"
df = pandas.DataFrame({"a": [1] * (100 * 2**20), "b": [2] * (100 * 2**20)})
df.to_csv(path_large, index=False)
Please share the code that reproduces the error and the stack trace of the error.
Versions
pandas==2.2.2
PyAthena==3.8.2
s3fs==2024.5.0
Note that the same code works fine with PyAthena==2.17.0.
Okay, so I have a connection:
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
bucket = "my_test_bucket"
region_name = "eu-central-1"

# Open a connection whose cursors are PandasCursor instances, then create a
# cursor bound to the "my_schema" schema. The staging dir is where query
# results are written under the bucket.
cursor = connect(
    s3_staging_dir=f"s3://{bucket}/tmp/outputs/",  # closing quote was missing
    region_name=region_name,
    cursor_class=PandasCursor,
).cursor(schema_name="my_schema")
I run a query:
# Execute the query and convert the result to a pandas DataFrame.
# The last SELECT item must not be followed by a comma: a trailing comma
# directly before FROM is a SQL syntax error.
data = cursor.execute("""
SELECT
column_1,
column_2,
column_3,
column_4,
column_5,
column_6,
column_7,
column_8,
column_9,
column_10,
column_11,
column_12,
column_13,
column_14,
column_15,
column_16,
column_17
FROM my_columns AS l
LEFT JOIN my_extra_columns AS cm ON l.id=cm.id
LEFT JOIN my_extra_extra_columns AS rel ON l.id=rel.id
""").as_pandas()
I got my data with shape
719303 rows × 17 columns
This is what I do next:
# Write the query result to S3 as CSV, using pandas' default to_csv options.
export_path = f"s3://{bucket}/tmp/exports/test.csv"
data.to_csv(export_path)
And here is what I got
ClientError Traceback (most recent call last)
/var/folders/7h/vnq03l3s05q6p4pdzmhdnk200000gn/T/ipykernel_21465/3217657796.py in <module>
----> 1 data.to_csv("s3://my_test_bucket/tmp/exports/test.csv")
~/opt/anaconda3/lib/python3.9/site-packages/pandas/util/_decorators.py in wrapper(*args, **kwargs)
331 stacklevel=find_stack_level(),
332 )
--> 333 return func(*args, **kwargs)
334
335 # error: "Callable[[VarArg(Any), KwArg(Any)], Any]" has no
~/opt/anaconda3/lib/python3.9/site-packages/pandas/core/generic.py in to_csv(self, path_or_buf, sep, na_rep, float_format, columns, header, index, index_label, mode, encoding, compression, quoting, quotechar, lineterminator, chunksize, date_format, doublequote, escapechar, decimal, errors, storage_options)
3965 )
3966
-> 3967 return DataFrameRenderer(formatter).to_csv(
3968 path_or_buf,
3969 lineterminator=lineterminator,
~/opt/anaconda3/lib/python3.9/site-packages/pandas/io/formats/format.py in to_csv(self, path_or_buf, encoding, sep, columns, index_label, mode, compression, quoting, quotechar, lineterminator, chunksize, date_format, doublequote, escapechar, errors, storage_options)
1012 formatter=self.fmt,
1013 )
-> 1014 csv_formatter.save()
1015
1016 if created_buffer:
~/opt/anaconda3/lib/python3.9/site-packages/pandas/io/formats/csvs.py in save(self)
268 )
269
--> 270 self._save()
271
272 def _save(self) -> None:
~/opt/anaconda3/lib/python3.9/site-packages/pandas/io/common.py in __exit__(self, exc_type, exc_value, traceback)
155 traceback: TracebackType | None,
156 ) -> None:
--> 157 self.close()
158
159
~/opt/anaconda3/lib/python3.9/site-packages/pandas/io/common.py in close(self)
142 self.created_handles.remove(self.handle)
143 for handle in self.created_handles:
--> 144 handle.close()
145 self.created_handles = []
146 self.is_wrapped = False
~/opt/anaconda3/lib/python3.9/site-packages/pyathena/filesystem/s3.py in close(self)
1021
1022 def close(self) -> None:
-> 1023 super().close()
1024 self._executor.shutdown()
1025
~/opt/anaconda3/lib/python3.9/site-packages/fsspec/spec.py in close(self)
1947 else:
1948 if not self.forced:
-> 1949 self.flush(force=True)
1950
1951 if self.fs is not None:
~/opt/anaconda3/lib/python3.9/site-packages/fsspec/spec.py in flush(self, force)
1811 raise
1812
-> 1813 if self._upload_chunk(final=force) is not False:
1814 self.offset += self.buffer.seek(0, 2)
1815 self.buffer = io.BytesIO()
~/opt/anaconda3/lib/python3.9/site-packages/pyathena/filesystem/s3.py in _upload_chunk(self, final)
1098
1099 if self.autocommit and final:
-> 1100 self.commit()
1101 return True
1102
~/opt/anaconda3/lib/python3.9/site-packages/pyathena/filesystem/s3.py in commit(self)
1131 )
1132 parts.sort(key=lambda x: x["PartNumber"])
-> 1133 self.fs._complete_multipart_upload(
1134 bucket=self.bucket,
1135 key=self.key,
~/opt/anaconda3/lib/python3.9/site-packages/pyathena/filesystem/s3.py in _complete_multipart_upload(self, bucket, key, upload_id, parts, **kwargs)
930
931 _logger.debug(f"Complete multipart upload {upload_id} to s3://{bucket}/{key}.")
--> 932 response = self._call(
933 self._client.complete_multipart_upload,
934 **request,
~/opt/anaconda3/lib/python3.9/site-packages/pyathena/filesystem/s3.py in _call(self, method, **kwargs)
942 else:
943 func = method
--> 944 response = retry_api_call(
945 func, config=self._retry_config, logger=_logger, **kwargs, **self.request_kwargs
946 )
~/opt/anaconda3/lib/python3.9/site-packages/pyathena/util.py in retry_api_call(func, config, logger, *args, **kwargs)
82 reraise=True,
83 )
---> 84 return retry(func, *args, **kwargs)
~/opt/anaconda3/lib/python3.9/site-packages/tenacity/__init__.py in __call__(self, fn, *args, **kwargs)
377 retry_state = RetryCallState(retry_object=self, fn=fn, args=args, kwargs=kwargs)
378 while True:
--> 379 do = self.iter(retry_state=retry_state)
380 if isinstance(do, DoAttempt):
381 try:
~/opt/anaconda3/lib/python3.9/site-packages/tenacity/__init__.py in iter(self, retry_state)
312 is_explicit_retry = fut.failed and isinstance(fut.exception(), TryAgain)
313 if not (is_explicit_retry or self.retry(retry_state)):
--> 314 return fut.result()
315
316 if self.after is not None:
~/opt/anaconda3/lib/python3.9/concurrent/futures/_base.py in result(self, timeout)
437 raise CancelledError()
438 elif self._state == FINISHED:
--> 439 return self.__get_result()
440
441 self._condition.wait(timeout)
~/opt/anaconda3/lib/python3.9/concurrent/futures/_base.py in __get_result(self)
389 if self._exception:
390 try:
--> 391 raise self._exception
392 finally:
393 # Break a reference cycle with the exception in self._exception
~/opt/anaconda3/lib/python3.9/site-packages/tenacity/__init__.py in __call__(self, fn, *args, **kwargs)
380 if isinstance(do, DoAttempt):
381 try:
--> 382 result = fn(*args, **kwargs)
383 except BaseException: # noqa: B902
384 retry_state.set_exception(sys.exc_info()) # type: ignore[arg-type]
~/opt/anaconda3/lib/python3.9/site-packages/botocore/client.py in _api_call(self, *args, **kwargs)
563 )
564 # The "self" in this scope is referring to the BaseClient.
--> 565 return self._make_api_call(operation_name, kwargs)
566
567 _api_call.__name__ = str(py_operation_name)
~/opt/anaconda3/lib/python3.9/site-packages/botocore/client.py in _make_api_call(self, operation_name, api_params)
1019 )
1020 error_class = self.exceptions.from_code(error_code)
-> 1021 raise error_class(parsed_response, operation_name)
1022 else:
1023 return parsed_response
ClientError: An error occurred (EntityTooSmall) when calling the CompleteMultipartUpload operation: Your proposed upload is smaller than the minimum allowed size
With a very similar workflow to above, I'm getting essentially the same error and stack trace on v3.8.2. Behavior is strange since I have no problems uploading other dataframes of similar size. I'll add more details if I'm able.
https://github.com/laughingman7743/PyAthena/blob/master/pyathena/filesystem/s3.py#L1024-L1027 If the file size is smaller than the block size, the file is not uploaded in multipart. https://github.com/laughingman7743/PyAthena/blob/master/pyathena/filesystem/s3.py#L994-L997 When writing, the block size is validated to be no smaller than the minimum size for multipart uploads.
I am not sure in what case this error occurs. 🤔
For now, I can confirm that my data with a shape of
34158 rows × 17 columns
is okay (probably because the file size is smaller than the block size).
However, 127434 rows × 17 columns
throws the same error as before.
If I can help with something, I'll be glad to do it.
@laughingman7743 Not sure if this helps, but I used your logging code to try and understand where my upload is going wrong. Surprisingly, I do get a message logging that the multipart upload completed, but immediately after that the EntityTooSmall error is thrown. I can confirm that the csv I attempted to save is not present. We may end up just downgrading PyAthena versions but I can try to provide more information if possible.
https://github.com/fsspec/s3fs/blob/main/s3fs/core.py#L2343-L2351 The error is probably caused by the last part of the multipart request being less than 5 MB. If the last part is less than 5 MB, it will need to be merged with the previous part and uploaded.
Hi, after closing this issue https://github.com/laughingman7743/PyAthena/issues/483, I've increased my PyAthena version to 3.8.2.
But now I'm having trouble when I want to use data.to_csv(), and the data is large, resulting in the following error: "An error occurred (EntityTooSmall) when calling the CompleteMultipartUpload operation: Your proposed upload is smaller than the minimum allowed size."
Note: My file is around 60.1 MB, and I had no trouble writing with version 2.17.0.
Can you please check? TY :)