terricain / aioboto3

Wrapper to use boto3 resources with the aiobotocore async backend
Apache License 2.0

feature request #191

Closed Sanchoyzer closed 4 years ago

Sanchoyzer commented 4 years ago

aioboto3/s3/inject.py

This is the current method for async (and sync) streaming upload of a file-like object:

async def upload_fileobj(self, Fileobj: BinaryIO, Bucket: str, Key: str, ExtraArgs: Optional[Dict[str, Any]] = None,
                         Callback: Optional[Callable[[int], None]] = None,
                         Config: Optional[S3TransferConfig] = None):
...
    async def file_reader() -> None:
...
        while not eof:
...
                if asyncio.iscoroutinefunction(Fileobj.read):  # handles if we pass in aiofiles obj
                    # noinspection PyUnresolvedReferences
                    data = await Fileobj.read(io_chunksize)
                else:
                    data = Fileobj.read(io_chunksize)
                    await asyncio.sleep(0.0)
...
                multipart_payload += data
...
            if not multipart_payload:
                break

            await io_queue.put({'Body': multipart_payload, 'Bucket': Bucket, 'Key': Key, 'PartNumber': part, 'UploadId': upload_id})
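
For context, a minimal usage sketch of the current method, assuming an aiofiles handle so the async read() branch above is taken (the file path, bucket and key names are placeholders):

import aiofiles
import aioboto3

async def copy_local_file_to_s3():
    async with aioboto3.client('s3') as s3:
        # aiofiles gives Fileobj.read as a coroutine, so upload_fileobj awaits it
        async with aiofiles.open('/tmp/big-file.bin', 'rb') as f:
            await s3.upload_fileobj(f, 'my-bucket', 'my-key')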

It would be great to be able to pass a callable for stream-processing each chunk. Example:

async def upload_fileobj(self, Fileobj: BinaryIO, Bucket: str, Key: str, ExtraArgs: Optional[Dict[str, Any]] = None,
                         Callback: Optional[Callable[[int], None]] = None,
                         Config: Optional[S3TransferConfig] = None,
                         Processing: Optional[Callable[[bytes], bytes]] = None):
...
    async def file_reader() -> None:
...
        while not eof:
...
                if asyncio.iscoroutinefunction(Fileobj.read):  # handles if we pass in aiofiles obj
                    # noinspection PyUnresolvedReferences
                    data = await Fileobj.read(io_chunksize)
                else:
                    data = Fileobj.read(io_chunksize)
                    await asyncio.sleep(0.0)
...
                multipart_payload += data
...
            if not multipart_payload:
                break
            elif Processing:
                multipart_payload = Processing(multipart_payload)

            await io_queue.put({'Body': multipart_payload, 'Bucket': Bucket, 'Key': Key, 'PartNumber': part, 'UploadId': upload_id})

Example usage: it would be useful to be able to copy a very large file from one bucket to another and transform it on the fly, without holding the whole object in RAM:

    async with aioboto3.client('s3') as s3:
        s3_ob = await s3.get_object(Bucket=bucket_src, Key=s3_key)
        async with s3_ob['Body'] as stream:
            await s3.upload_fileobj(
                Fileobj=stream,
                Bucket=bucket_dest,
                Key=s3_key,
                Processing=lambda x: x.lower(),
            )
Sanchoyzer commented 4 years ago

@terrycain what do you think about it?

terricain commented 4 years ago

Hmm, I'm undecided, as obviously you could just wrap s3_ob.read yourself. I guess it wouldn't hurt, but then we'd need to document the whole function. Feel free to make a PR with that and some docs and I'll merge it
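
For reference, a minimal sketch of that workaround: wrap the source stream's read() so each chunk is transformed before upload_fileobj sees it. The ProcessingReader class and the lower() transform are illustrative, not part of aioboto3; it is used exactly like the copy example above.

class ProcessingReader:
    """Wraps an async-readable stream and applies a bytes -> bytes transform per chunk."""

    def __init__(self, stream, processing):
        self._stream = stream          # e.g. the StreamingBody returned by get_object
        self._processing = processing  # bytes -> bytes callable

    async def read(self, size=-1):
        data = await self._stream.read(size)
        return self._processing(data) if data else data


    async with aioboto3.client('s3') as s3:
        s3_ob = await s3.get_object(Bucket=bucket_src, Key=s3_key)
        async with s3_ob['Body'] as stream:
            await s3.upload_fileobj(
                Fileobj=ProcessingReader(stream, lambda x: x.lower()),
                Bucket=bucket_dest,
                Key=s3_key,
            )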