terricain / aioboto3

Wrapper to use boto3 resources with the aiobotocore async backend
Apache License 2.0

aioboto3 to copy files in a bucket #305

Closed eduard93 closed 1 year ago

eduard93 commented 1 year ago

Description

I'm trying to copy files within S3. My Lambda does the following:

  1. Iterates source bucket/prefix.
  2. For each file copies it to destination bucket/prefix.

Each file is below 1 KB in size. I expected this code to be much (or at least somewhat) faster than the basic boto3 version, but it's really slow (~10.6 files per second on 1 vCPU, 1769 MB RAM). What am I doing wrong? Files can be copied in any order.

What I Did

import asyncio
import logging
import aioboto3
from timeit import default_timer as timer

def lambda_handler(event, context):
    source = event.get("source", "my-bucket")
    source_prefix = event.get("source_prefix", "in/")
    destination = event.get("destination", source)
    destination_prefix = event.get("destination_prefix", "out/")

    if not source_prefix.endswith("/"):
        source_prefix += "/"
    if not destination_prefix.endswith("/"):
        destination_prefix += "/"

    asyncio.run(main(source, source_prefix, destination, destination_prefix))

async def main(source, source_prefix, destination, destination_prefix):
    start = timer()
    count = 0

    session = aioboto3.Session()
    async with session.client("s3") as s3:
        paginator = s3.get_paginator("list_objects_v2")
        async for page in paginator.paginate(Bucket=source, Prefix=source_prefix):
            for file in page.get("Contents", []):
                count += 1
                source_key = file["Key"]
                copy_source = {'Bucket': source, 'Key': source_key}
                await s3.copy(copy_source, destination, destination_prefix + source_key.split("/")[-1])

    end = timer()
    timing = end - start
    print(f"Sent files: {count}")
    print(f"Took: {timing:.2f} s")
    print(f"Speed: {count/timing:.2f} files/s")
terricain commented 1 year ago

Well, I wouldn't expect much from that: s3.copy effectively downloads the file and uploads it to the destination. See if s3.copy_object helps; that should in theory copy objects without downloading and uploading them.
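
For reference, a single server-side copy with copy_object might look roughly like this (just a sketch; the bucket and key names are placeholders):

import asyncio
import aioboto3

async def copy_one():
    session = aioboto3.Session()
    async with session.client("s3") as s3:
        # CopyObject is performed server-side by S3, so the object body
        # never passes through this client.
        await s3.copy_object(
            CopySource={"Bucket": "source-bucket", "Key": "in/file.txt"},
            Bucket="destination-bucket",
            Key="out/file.txt",
        )

asyncio.run(copy_one())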

Also, you're not performing the copies in parallel. Look into creating an async queue, pushing the copy args onto it, and running say 4 coroutines in the background that all read off the queue and perform the copies; then you can do await queue.join() to wait until they're finished and exit.

eduard93 commented 1 year ago

Thank you, I rewrote it like this:

import asyncio
import logging
import aioboto3
from timeit import default_timer as timer

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

def lambda_handler(event, context):
    source = event.get("source", "my-bucket")
    source_prefix = event.get("source_prefix", "in/")
    destination = event.get("destination", source)
    destination_prefix = event.get("destination_prefix", "out/")

    if not source_prefix.endswith("/"):
        source_prefix += "/"
    if not destination_prefix.endswith("/"):
        destination_prefix += "/"

    asyncio.run(main(source, source_prefix, destination, destination_prefix))

async def main(source, source_prefix, destination, destination_prefix):
    start = timer()
    count = 0

    session = aioboto3.Session()

    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    async with session.client("s3") as s3:
        paginator = s3.get_paginator("list_objects_v2")
        async for page in paginator.paginate(Bucket=source, Prefix=source_prefix):
            for file in page.get("Contents", []):
                count += 1
                source_key = file["Key"]
                copy_source = {'Bucket': source, 'Key': source_key}
                queue.put_nowait({
                    "s3": s3,
                    "copy_source": copy_source,
                    "destination": destination,
                    "destination_key": destination_prefix + source_key.split("/")[-1],
                })

        end = timer()
        LOGGER.info(f"Pagination: {end-start:.2f} s")

        # Create four worker tasks to process the queue concurrently.
        tasks = []
        for i in range(4):
            task = asyncio.create_task(worker(f'worker-{i}', queue))
            tasks.append(task)

        # Wait until the queue is fully processed.
        await queue.join()

    end2 = timer()
    LOGGER.info(f"Sending: {end2-end:.2f} s")

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()

    end3 = timer()
    timing = end3 - start
    LOGGER.info(f"Sent files: {count}")
    LOGGER.info(f"Took: {timing:.2f} s")
    LOGGER.info(f"Speed: {count/timing:.2f} files/s")

async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        args = await queue.get()

        try:
            # Copy the object server-side with copy_object.
            await args["s3"].copy_object(CopySource=args["copy_source"], Bucket=args["destination"], Key=args["destination_key"])
        except Exception as e:
            print(e)
        finally:
            # Notify the queue that the "work item" has been processed,
            # even if the copy failed, so queue.join() doesn't hang.
            queue.task_done()

With 4 workers: 43 files/s. With 10 workers: 10 files/s.

Looks better.

terricain commented 1 year ago

Might be worth starting the workers before pushing to the queue, and changing the put_nowait to an await queue.put(...), so that they can start processing while you're still paginating.
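
Roughly, the reordered main() could look like this (a sketch that reuses worker() from the comment above; the maxsize is an extra assumption on my part so that await queue.put() also provides some backpressure):

import asyncio
import aioboto3

async def main(source, source_prefix, destination, destination_prefix):
    session = aioboto3.Session()
    # maxsize is an assumption on top of the suggestion: with a bound,
    # await queue.put() applies backpressure instead of buffering the
    # entire listing in memory.
    queue = asyncio.Queue(maxsize=100)

    async with session.client("s3") as s3:
        # Start the workers first so they copy while pagination is still running.
        # worker() is the same coroutine as in the comment above.
        tasks = [asyncio.create_task(worker(f"worker-{i}", queue)) for i in range(4)]

        paginator = s3.get_paginator("list_objects_v2")
        async for page in paginator.paginate(Bucket=source, Prefix=source_prefix):
            for file in page.get("Contents", []):
                source_key = file["Key"]
                await queue.put({
                    "s3": s3,
                    "copy_source": {"Bucket": source, "Key": source_key},
                    "destination": destination,
                    "destination_key": destination_prefix + source_key.split("/")[-1],
                })

        # Wait until every queued copy has been processed, then stop the workers.
        await queue.join()
        for task in tasks:
            task.cancel()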

Doesn't matter much here as it's a lambda, but calling task.cancel() doesn't immediately cancel the worker coroutines, so for a "proper" cleanup you'd do

for task in tasks:
    task.cancel()
    await task

That will yield to the event loop so the coroutine can process the cancellation (which raises a CancelledError inside the worker, which you'd normally catch specifically and return from).
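
For example, a worker along these lines would handle that (a sketch only; since cancel() is called after queue.join(), the cancellation lands while the worker is idle in queue.get()):

import asyncio

async def worker(name, queue):
    while True:
        try:
            # The cancellation is delivered here, while the worker is idle
            # waiting for the next item.
            args = await queue.get()
        except asyncio.CancelledError:
            return

        try:
            await args["s3"].copy_object(
                CopySource=args["copy_source"],
                Bucket=args["destination"],
                Key=args["destination_key"],
            )
        except Exception as e:
            print(e)
        finally:
            # Mark the item done even if the copy failed, so queue.join()
            # can't hang.
            queue.task_done()

With that, the task.cancel() / await task loop above completes normally, because the worker swallows the CancelledError and returns.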