hail-is / hail

Cloud-native genomic dataframes and batch computing
https://hail.is
MIT License

[hailtop/fs] Make sync / copy tools take advantage of cloud specific apis as much as possible #14601

Open chrisvittal opened 4 months ago

chrisvittal commented 4 months ago

As part of our work generating All of Us datasets, we needed to copy around a million GCS objects. Our Copier infrastructure should be able to handle that, but it kept failing due to robustness issues. What finally worked was using GCS's rewrite API. This allowed us to copy data without reading it, so the copies completed in a fraction of the time and needed far less bandwidth.

There are two components to this:

  1. Research what specific APIs we can take advantage of
  2. Update our code to use them when we can, for the Copier, and the new sync tool (#14248)
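As a rough illustration of item 2, the copy tools could pick a strategy per file: a server-side copy when both URLs are on a cloud that offers one, and a streaming copy otherwise. This is a minimal sketch only. `CopyStrategy`, `SERVER_SIDE_SCHEMES`, and `choose_strategy` are hypothetical names, not part of hailtop, and the scheme table assumes only GCS has been confirmed so far.

```python
from enum import Enum
from urllib.parse import urlparse


class CopyStrategy(Enum):
    SERVER_SIDE = 'server_side'  # e.g. the GCS rewrite API; no bytes pass through the client
    STREAM = 'stream'            # read from the source and write to the destination


# Hypothetical table: URL schemes assumed to support a server-side copy.
# Item 1 (the research step) would decide what else belongs here.
SERVER_SIDE_SCHEMES = {'gs'}


def choose_strategy(src: str, dst: str) -> CopyStrategy:
    """Pick a server-side copy only when both URLs use the same supported scheme."""
    src_scheme = urlparse(src).scheme
    dst_scheme = urlparse(dst).scheme
    if src_scheme == dst_scheme and src_scheme in SERVER_SIDE_SCHEMES:
        return CopyStrategy.SERVER_SIDE
    return CopyStrategy.STREAM
```

Local paths have an empty scheme, so they always fall back to streaming.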

Here's the code I used to make the rewrite requests when merging a set of matrix tables together. The progress bar code was for visibility.


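import urllib.parse
from typing import Optional

import rich.progress

# Import paths below are assumed from the hailtop package layout; adjust if they differ.
from hailtop.aiocloud.aiogoogle import GoogleStorageAsyncFS
from hailtop.aiotools.fs import IsABucketError
from hailtop.utils import retry_transient_errors


async def rewrite(
    gfs: GoogleStorageAsyncFS,
    src: str,
    dst: str,
    progress: Optional[rich.progress.Progress] = None,
    file_tid: Optional[rich.progress.TaskID] = None,
    requests_tid: Optional[rich.progress.TaskID] = None,
):
    # The progress bar and both of its task ids must be supplied together or not at all.
    assert (progress is None) == (file_tid is None) == (requests_tid is None)
    src_bkt, src_name = gfs.get_bucket_and_name(src)
    dst_bkt, dst_name = gfs.get_bucket_and_name(dst)
    if not src_name:
        raise IsABucketError(src)
    if not dst_name:
        raise IsABucketError(dst)
    client = gfs._storage_client
    # Object names are fully percent-encoded (safe="") so that '/' in a name
    # is not treated as a path separator in the request URL.
    path = (
        f'/b/{src_bkt}/o/{urllib.parse.quote(src_name, safe="")}/rewriteTo'
        f'/b/{dst_bkt}/o/{urllib.parse.quote(dst_name, safe="")}'
    )
    kwargs = {'json': '', 'params': {}}
    client._update_params_with_user_project(kwargs, src_bkt)
    response = await retry_transient_errors(client.post, path, **kwargs)
    if progress is not None:
        progress.update(requests_tid, advance=1)
    # A rewrite may take several requests: repeat with the returned
    # rewriteToken until the response reports done.
    while not response['done']:
        kwargs['params']['rewriteToken'] = response['rewriteToken']
        response = await retry_transient_errors(client.post, path, **kwargs)
        if progress is not None:
            progress.update(requests_tid, advance=1)
    if progress is not None:
        progress.update(file_tid, advance=1)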
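
For around a million objects, a per-object coroutine like the one above needs a driver that limits how many requests are in flight and records failures instead of aborting the whole run. The following is a minimal sketch of one way to do that with `asyncio.Semaphore`. `copy_all`, `copy_one`, and `max_concurrency` are hypothetical names for illustration, not part of hailtop or the script that was actually used.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, Tuple


async def copy_all(
    pairs: Iterable[Tuple[str, str]],
    copy_one: Callable[[str, str], Awaitable[None]],
    max_concurrency: int = 100,
) -> List[Tuple[str, str, BaseException]]:
    """Run copy_one(src, dst) for every pair, at most max_concurrency at a time.

    Returns the (src, dst, exception) triples that failed so they can be retried.
    """
    sem = asyncio.Semaphore(max_concurrency)
    failures: List[Tuple[str, str, BaseException]] = []

    async def bounded(src: str, dst: str) -> None:
        async with sem:
            try:
                await copy_one(src, dst)
            except Exception as exc:
                # Record the failure and keep going with the other copies.
                failures.append((src, dst, exc))

    await asyncio.gather(*(bounded(s, d) for s, d in pairs))
    return failures
```

Assuming `gfs` is an open `GoogleStorageAsyncFS`, it could be driven with something like `failures = await copy_all(pairs, lambda s, d: rewrite(gfs, s, d))`. This version creates one task per pair up front, so for very large lists it may be worth feeding the pairs in chunks.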

danking commented 4 months ago

Were you using the old copier or the new (not yet merged) hailctl fs sync? I had hoped the latter was finally robust enough for real use. hailtop.aiotools.copy is indeed not very reliable. Regardless, using the rewrite action when the source and destination agree is the correct move.

chrisvittal commented 4 months ago

We used a one-off script. We did try Copier.copy, but it wasn't reliable enough. We also needed to rename destination files beyond what the sync (or copy) tool is capable of.
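
To illustrate the kind of renaming meant here, a one-off script could compute each destination from the source path with a caller-supplied function before issuing any copies. This is a hypothetical sketch: `plan_copies` and its parameters are invented for illustration and are not the actual script or a hailtop API.

```python
from typing import Callable, Iterable, Iterator, Tuple


def plan_copies(
    sources: Iterable[str],
    src_prefix: str,
    dst_prefix: str,
    rename: Callable[[str], str],
) -> Iterator[Tuple[str, str]]:
    """Yield (src, dst) pairs, renaming each path relative to src_prefix."""
    for src in sources:
        if not src.startswith(src_prefix):
            raise ValueError(f'{src!r} is not under {src_prefix!r}')
        relative = src[len(src_prefix):]
        # The rename callable sees only the path relative to the source prefix.
        yield src, dst_prefix + rename(relative)
```

The resulting pairs can then be handed to whatever performs the copy, one request per pair.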