dask / fastparquet

Python implementation of the Parquet columnar file format.
Apache License 2.0

Option to not close() after write() when writing to buffer #914

Closed luukburger closed 8 months ago

luukburger commented 8 months ago

Hi there,

I'm building an AWS Lambda that calls an API and writes the output to an AWS S3 bucket in Parquet format. The relevant code snippets look like this:

import io
from datetime import datetime

import boto3

# df is a pandas DataFrame built from the API response.
write_buffer = io.BytesIO()
df.to_parquet(path=write_buffer, engine='pyarrow', index=False)

# Define an AWS S3 resource.
s3_resource = boto3.resource(
    service_name='s3',
    region_name=aws_region,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key
)

# Set path and filename.
timestamp_str = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
file_name = f"{timestamp_str}.parquet"

# Upload to AWS S3 bucket.
s3_bucket = s3_resource.Bucket(bucket_name)
s3_bucket.put_object(Body=write_buffer.getvalue(), Key=file_name)

So I first create a buffer, and later write it to S3.

Now as you can see, I use 'pyarrow' in this case, but I just switched to 'fastparquet' because I need to package the dependencies for the Lambda, and the footprint of 'pyarrow' is huge. When I switch the above code to 'fastparquet', though, it doesn't work:

--> 283 s3_bucket.put_object(Body=write_buffer.getvalue(), Key=file_name)

ValueError: I/O operation on closed file.

Now I'm not exactly an expert on this, but I read online that this happens because the buffer gets closed automatically after writing, and that it can be worked around by overriding the close() method.

class customBytesIO(io.BytesIO):
    def close(self):
        # No-op: keep the buffer readable after fastparquet
        # calls close() at the end of write().
        pass

write_buffer = customBytesIO()
df.to_parquet(path=write_buffer, engine='fastparquet', index=False)

So I did that, and it does indeed work now, but it seems a bit of a hack to me, and I guess it would be great if 'pyarrow' and 'fastparquet' were compatible on this? Maybe something to look into?

martindurant commented 8 months ago

May I ask why you write locally before uploading?

I read online that this happens because the buffer gets closed automatically after writing, and that it can be worked around by overriding the close() method

I'm pretty sure this was my recommendation when something like this came up in the past. You could also use the memory:// filesystem in fsspec, which contains file-like objects that don't close (and fsspec's rsync might then be a natural choice to send the file to the remote store).
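
Untested, but roughly like this, reusing the s3_bucket and file_name from your snippet and a made-up memory:// path:

import fsspec

# Write with fastparquet to fsspec's in-memory filesystem; closing the
# in-memory file is a no-op, so the bytes remain available afterwards.
df.to_parquet("memory://staging/output.parquet", engine="fastparquet", index=False)

# Pull the bytes back out and upload them with boto3 as before.
memfs = fsspec.filesystem("memory")
data = memfs.cat_file("memory://staging/output.parquet")
s3_bucket.put_object(Body=data, Key=file_name)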

I am reluctant to add yet another argument to write(), which has so many already. We could in theory tailor the code to only close files that we had opened, but that's not necessarily what the caller wants, so I think the workaround is not that bad.

luukburger commented 8 months ago

I'm pretty sure this was my recommendation when something like this came up in the past.

Could well be! I think I found it in another place on GitHub, or maybe on Stack Overflow...

May I ask why you write locally before uploading?

To the buffer, you mean? I did not want to write the file to a local filesystem and then copy it to S3, since I'm in a Lambda here. And with limited expertise in these kinds of things, I just figured writing to a buffer was the best way to do what I wanted to do.

I can live with the workaround, but I just thought it would be nice if writing to Parquet from Pandas could be identical for PyArrow and Fastparquet, though I understand that, after a change on your end, this would also need to be applied in Pandas.

martindurant commented 8 months ago

I can live with the workaround, but I just thought it would be nice if writing to Parquet from Pandas could be identical for PyArrow and Fastparquet

I don't intend to spend time on this right now, but contributions are of course welcome. It feels niche enough not to need a dedicated option, especially given both the workaround you found and the memory: filesystem. Furthermore, with simplecache::s3://bucket/file you would also get the same behaviour of writing to a local file (not memory) and uploading when done.
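
Again untested, with a placeholder bucket/key, your credentials passed per protocol via storage_options, and s3fs installed:

# simplecache:: writes to a local temporary file and uploads it to S3
# when the file is closed, so no buffer handling is needed at all.
df.to_parquet(
    "simplecache::s3://some-bucket/output.parquet",  # placeholder bucket/key
    engine="fastparquet",
    index=False,
    storage_options={"s3": {"key": aws_access_key_id, "secret": aws_secret_access_key}},
)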