epam / cloud-pipeline

Cloud agnostic genomics analysis, scientific computation and storage platform
https://cloud-pipeline.com
Apache License 2.0

Implement parallel transferring in pipe cli for Google storages #1374

Closed by tcibinan 2 years ago

tcibinan commented 4 years ago

Background

Currently, pipe cli uses the default single-threaded approach for transferring data to and from Google Cloud storages.

Approach

It would be quite useful to support parallel transferring the same way it is done for AWS S3 in the boto3 library: splitting a file into chunks and transferring the chunks concurrently.
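A minimal local sketch of the idea (`parallel_transfer`, `_transfer_chunk` and the chunk size are illustrative names and values, not pipe cli internals; plain file copies stand in for the per-chunk storage calls):

```python
import os
from concurrent.futures import ThreadPoolExecutor

CHUNK_SIZE = 4 * 1024 * 1024  # illustrative; real chunk sizes are tuned per provider


def _transfer_chunk(src_path, dst_path, offset, length):
    """Copy one byte range; a stand-in for a per-chunk upload/download call."""
    with open(src_path, 'rb') as src, open(dst_path, 'r+b') as dst:
        src.seek(offset)
        dst.seek(offset)
        dst.write(src.read(length))


def parallel_transfer(src_path, dst_path, workers=4):
    """Split the source file into fixed-size chunks and copy them concurrently."""
    size = os.path.getsize(src_path)
    # Pre-allocate the destination so chunks can be written at any offset.
    with open(dst_path, 'wb') as dst:
        dst.truncate(size)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(_transfer_chunk, src_path, dst_path, offset,
                               min(CHUNK_SIZE, size - offset))
                   for offset in range(0, size, CHUNK_SIZE)]
        for future in futures:
            future.result()  # propagate any chunk failure
```

Each worker opens its own file handles, so no locking is needed; the chunks land at their own offsets in the pre-allocated destination.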

tcibinan commented 4 years ago

Benchmarks

Average values over 5 attempts are shown for each operation below.

| Operation | Size | Before, s | After, s | Faster, times |
| --- | --- | --- | --- | --- |
| Upload | 10M | 1 | 1 | - |
| Upload | 100M | 2 | 2 | - |
| Upload | 1G | 8 | 5 | ▲ 1.6 |
| Upload | 10G | 66 | 35 | ▲ 1.89 |
| Upload | 100G | 674 | 336 | ▲ 2.01 |
| Upload | 200G | 1422 | 670 | ▲ 2.12 |
| Download | 10M | 1 | 1 | - |
| Download | 100M | 2 | 2 | - |
| Download | 1G | 8 | 7 | ▲ 1.14 |
| Download | 10G | 63 | 53 | ▲ 1.19 |
| Download | 100G | 656 | 539 | ▲ 1.22 |
| Download | 200G | 1272 | 1000 | ▲ 1.27 |
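The "Faster, times" column is the ratio of the truncated per-executable means; for example, for the 200G upload, using the raw timings from measured_performance.tsv:

```python
import statistics

# Raw per-attempt timings for the 200G upload, taken from measured_performance.tsv.
before = [1427, 1436, 1406, 1387, 1457]  # ./pipe_old
after = [667, 709, 658, 657, 660]        # ./pipe_new

before_mean = int(statistics.mean(before))   # 1422
after_mean = int(statistics.mean(after))     # 670
speedup = round(before_mean / after_mean, 2) # 2.12
print(before_mean, after_mean, speedup)
```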

Environment

- OS: Ubuntu 18.04
- Disk: 1000GB
- Instance: n2-standard-4 (cpu: 4, ram: 16)
- Cloud region: europe-west1-b
- Deployment cloud provider: GCP
- Before commit: bc7f5fce8f6e9d77ce1aa1172a6ba5c6b31e0148
- After commit: 8de45b4f73da5cbaea5f31a97a6c325691b60d59

Source code

```
./measure_performance.sh > measured_performance.tsv
python3 collect_summary.py
```

measure_performance.sh

```bash
executables=("./pipe_old" "./pipe_new")
operations=("upload" "download")
sizes=(10M 100M 1G 10G 100G 200G)
destination="gs://fuse-dev-storage-1"
attempts=5

# Generate local test files of each size.
for size in "${sizes[@]}"
do
    file=file_$size
    fallocate -l $size $file
done

echo -e "operation\texecutable\tsize\telapsed, s"
for executable in "${executables[@]}"
do
    for operation in "${operations[@]}"
    do
        if [[ $operation == "upload" ]]
        then
            for size in "${sizes[@]}"
            do
                file=file_$size
                for ((n=0;n<attempts;n++))
                do
                    # Remove the previous remote copy before the timed run
                    # (assumed cleanup step).
                    $executable storage rm -y "$destination/$file" > /dev/null 2>&1
                    start_time="$(date -u +%s)"
                    $executable storage cp -q $file $destination/$file
                    if [[ $? -eq 0 ]]
                    then
                        end_time="$(date -u +%s)"
                        elapsed="$(($end_time-$start_time))"
                        echo -e "upload\t$executable\t$size\t$elapsed"
                    else
                        end_time="$(date -u +%s)"
                        elapsed="$(($end_time-$start_time))"
                        echo -e "FAIL_upload\t$executable\t$size\t$elapsed"
                    fi
                    sleep 1
                done
            done
        fi
        if [[ $operation == "download" ]]
        then
            for size in "${sizes[@]}"
            do
                file=file_$size
                for ((n=0;n<attempts;n++))
                do
                    # Remove the previous local copy before the timed run
                    # (assumed cleanup step).
                    rm -f $file > /dev/null 2>&1
                    start_time="$(date -u +%s)"
                    $executable storage cp -q $destination/$file $file
                    if [[ $? -eq 0 ]]
                    then
                        end_time="$(date -u +%s)"
                        elapsed="$(($end_time-$start_time))"
                        echo -e "download\t$executable\t$size\t$elapsed"
                    else
                        end_time="$(date -u +%s)"
                        elapsed="$(($end_time-$start_time))"
                        echo -e "FAIL_download\t$executable\t$size\t$elapsed"
                    fi
                    sleep 1
                done
            done
        fi
    done
done
```

collect_summary.py

```python
from collections import OrderedDict
import statistics

measured_performance = 'measured_performance.tsv'
aggregating_function = statistics.mean
allowed_prefixes = ['upload', 'download']

with open(measured_performance) as f:
    lines = f.readlines()

# Group measurements as operation -> executable -> size -> [elapsed, ...],
# skipping the header and any FAIL_* rows.
operations = OrderedDict()
for line in lines:
    if not any(line.startswith(allowed_prefix) for allowed_prefix in allowed_prefixes):
        continue
    operation, executable, size, elapsed = line.split('\t')
    executables = operations.get(operation, OrderedDict())
    operations[operation] = executables
    sizes = executables.get(executable, {})
    executables[executable] = sizes
    measurements = sizes.get(size, [])
    sizes[size] = measurements
    measurements.append(int(elapsed))

# Aggregate the measurements of each executable for every operation and size.
header = ['operation', 'size']
operation_outputs = {}
for operation, executables in operations.items():
    outputs = operation_outputs.get(operation, {})
    operation_outputs[operation] = outputs
    for executable, sizes in executables.items():
        if executable not in header:
            header.append(executable)
        for size, measurements in sizes.items():
            out = outputs.get(size, [])
            outputs[size] = out
            out.append(int(aggregating_function(measurements)))
header.append('times')

# Print the summary with a speedup column.
print('\t'.join(header))
for operation, outputs in operation_outputs.items():
    for size, out in outputs.items():
        a, b = out
        print('\t'.join(map(str, [operation, size, a, b, round(a / b, 2)])))
```

measured_performance.tsv

```tsv
operation	executable	size	elapsed, s
upload	./pipe_old	10M	2
upload	./pipe_old	10M	2
upload	./pipe_old	10M	1
upload	./pipe_old	10M	1
upload	./pipe_old	10M	2
upload	./pipe_old	100M	2
upload	./pipe_old	100M	2
upload	./pipe_old	100M	2
upload	./pipe_old	100M	2
upload	./pipe_old	100M	2
upload	./pipe_old	1G	9
upload	./pipe_old	1G	8
upload	./pipe_old	1G	9
upload	./pipe_old	1G	8
upload	./pipe_old	1G	8
upload	./pipe_old	10G	71
upload	./pipe_old	10G	66
upload	./pipe_old	10G	65
upload	./pipe_old	10G	63
upload	./pipe_old	10G	65
upload	./pipe_old	100G	664
upload	./pipe_old	100G	669
upload	./pipe_old	100G	686
upload	./pipe_old	100G	681
upload	./pipe_old	100G	673
upload	./pipe_old	200G	1427
upload	./pipe_old	200G	1436
upload	./pipe_old	200G	1406
upload	./pipe_old	200G	1387
upload	./pipe_old	200G	1457
upload	./pipe_old	1T	10295
upload	./pipe_new	10M	2
upload	./pipe_new	10M	1
upload	./pipe_new	10M	1
upload	./pipe_new	10M	2
upload	./pipe_new	10M	2
upload	./pipe_new	100M	2
upload	./pipe_new	100M	2
upload	./pipe_new	100M	3
upload	./pipe_new	100M	2
upload	./pipe_new	100M	2
upload	./pipe_new	1G	6
upload	./pipe_new	1G	5
upload	./pipe_new	1G	5
upload	./pipe_new	1G	5
upload	./pipe_new	1G	5
upload	./pipe_new	10G	37
upload	./pipe_new	10G	34
upload	./pipe_new	10G	35
upload	./pipe_new	10G	35
upload	./pipe_new	10G	34
upload	./pipe_new	100G	332
upload	./pipe_new	100G	337
upload	./pipe_new	100G	334
upload	./pipe_new	100G	340
upload	./pipe_new	100G	337
upload	./pipe_new	200G	667
upload	./pipe_new	200G	709
upload	./pipe_new	200G	658
upload	./pipe_new	200G	657
upload	./pipe_new	200G	660
upload	./pipe_new	1T	3280
download	./pipe_old	10M	2
download	./pipe_old	10M	1
download	./pipe_old	10M	2
download	./pipe_old	10M	2
download	./pipe_old	10M	1
download	./pipe_old	100M	2
download	./pipe_old	100M	2
download	./pipe_old	100M	2
download	./pipe_old	100M	4
download	./pipe_old	100M	2
download	./pipe_old	1G	8
download	./pipe_old	1G	7
download	./pipe_old	1G	9
download	./pipe_old	1G	7
download	./pipe_old	1G	9
download	./pipe_old	10G	53
download	./pipe_old	10G	66
download	./pipe_old	10G	71
download	./pipe_old	10G	70
download	./pipe_old	10G	57
download	./pipe_old	100G	715
download	./pipe_old	100G	811
download	./pipe_old	100G	504
download	./pipe_old	100G	481
download	./pipe_old	100G	772
download	./pipe_old	200G	876
download	./pipe_old	200G	1239
download	./pipe_old	200G	1262
download	./pipe_old	200G	1558
download	./pipe_old	200G	1427
download	./pipe_new	10M	2
download	./pipe_new	10M	1
download	./pipe_new	10M	2
download	./pipe_new	10M	2
download	./pipe_new	10M	2
download	./pipe_new	100M	2
download	./pipe_new	100M	2
download	./pipe_new	100M	3
download	./pipe_new	100M	2
download	./pipe_new	100M	3
download	./pipe_new	1G	7
download	./pipe_new	1G	7
download	./pipe_new	1G	7
download	./pipe_new	1G	7
download	./pipe_new	1G	7
download	./pipe_new	10G	52
download	./pipe_new	10G	51
download	./pipe_new	10G	52
download	./pipe_new	10G	51
download	./pipe_new	10G	63
download	./pipe_new	100G	508
download	./pipe_new	100G	529
download	./pipe_new	100G	591
download	./pipe_new	100G	529
download	./pipe_new	100G	539
download	./pipe_new	200G	1038
download	./pipe_new	200G	1047
download	./pipe_new	200G	977
download	./pipe_new	200G	975
download	./pipe_new	200G	967
```

Stress benchmarks

A single attempt is shown for each operation below.

| Operation | Size | Before, s | After, s | Faster, times |
| --- | --- | --- | --- | --- |
| Upload | 1T | 11229 | 4595 | ▲ 2.44 |
| Download | 1T | 7525 | 4992 | ▲ 1.51 |

Environment

- OS: Ubuntu 18.04
- Disk: 2000GB
- Instance: n2-standard-4 (cpu: 4, ram: 16)
- Cloud region: europe-west1-b
- Deployment cloud provider: GCP
- Before commit: 34ecf35a63029d671cf67ea97807dfcfc293f6d5
- After commit: df8b63e096cc2fe8b50ffd362a32d1caeb0c00a8