ome / ome2024-ngff-challenge

Project planning and material repository for the 2024 challenge to generate 1 PB of OME-Zarr data
https://pypi.org/project/ome2024-ngff-challenge/
BSD 3-Clause "New" or "Revised" License

Parallel writing and other fixes #23

Closed: will-moore closed this 3 weeks ago

will-moore commented 1 month ago

This PR fixes #28 by writing batches of chunks in parallel, with a configurable number of threads (--output-threads).
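Here is a minimal, library-free sketch of batched parallel writing. The array is split into block-sized regions, the regions are grouped into batches of "threads" items, and each batch is copied concurrently before the next one starts. This is not the PR's code, which schedules each batch inside a tensorstore transaction. The names chunk_iter, batched, copy_blocks and the copy_block callback are illustrative assumptions. resave.py does have helpers called chunk_iter and Batched, but their implementations may differ.

```python
import itertools
from concurrent.futures import ThreadPoolExecutor


def chunk_iter(shape, blocks):
    """Yield one tuple of slices per block; edge blocks are clipped to the array bounds."""
    starts = [range(0, size, step) for size, step in zip(shape, blocks)]
    for origin in itertools.product(*starts):
        yield tuple(
            slice(o, min(o + step, size))
            for o, step, size in zip(origin, blocks, shape)
        )


def batched(iterable, n):
    """Yield lists of at most n items (similar to itertools.batched in Python 3.12)."""
    it = iter(iterable)
    while batch := list(itertools.islice(it, n)):
        yield batch


def copy_blocks(shape, blocks, threads, copy_block):
    """Call copy_block on every block, at most `threads` at a time, batch by batch."""
    copied = 0
    with ThreadPoolExecutor(max_workers=threads) as pool:
        for batch in batched(chunk_iter(shape, blocks), threads):
            # Draining pool.map waits for every block in this batch, so the next
            # batch is not submitted until the current one has finished.
            for _ in pool.map(copy_block, batch):
                copied += 1
    return copied
```

For the 4,25,2048,2048 image with 4,1,2048,2048 blocks, chunk_iter yields 25 blocks. With 16 threads, that means one batch of 16 and one batch of 9.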


This also fixes an issue I hit when working with larger images, e.g. https://uk1s3.embassy.ebi.ac.uk/idr/zarr/v0.4/idr0047A/4496763.zarr (shape 4,25,2048,2048).

I wanted to use --output-write-details to generate parameters.json, either with a guessed shard shape or with one provided via --output-shards 4,1,2048,2048. However, in both cases guess_shards() is called and fails because the size is above its threshold:

(bf2raw_env) [wmoore@pilot-zarr3-dev ~]$ ome2024-ngff-challenge --input-bucket=idr --input-endpoint=https://uk1s3.embassy.ebi.ac.uk --input-anon zarr/v0.4/idr0047A/4496763.zarr params_4496763.json --output-write-details --output-shards 4,1,2048,2048

Traceback (most recent call last):
  File "/home/wmoore/miniconda3/envs/bf2raw_env/bin/ome2024-ngff-challenge", line 8, in <module>
    sys.exit(cli())
             ^^^^^
  File "/home/wmoore/miniconda3/envs/bf2raw_env/lib/python3.12/site-packages/ome2024_ngff_challenge/resave.py", line 712, in cli
    converted = main(ns, rocrate)
                ^^^^^^^^^^^^^^^^^
  File "/home/wmoore/miniconda3/envs/bf2raw_env/lib/python3.12/site-packages/ome2024_ngff_challenge/resave.py", line 544, in main
    convert_image(
  File "/home/wmoore/miniconda3/envs/bf2raw_env/lib/python3.12/site-packages/ome2024_ngff_challenge/resave.py", line 406, in convert_image
    ds_shards = guess_shards(ds_shape, ds_chunks)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/wmoore/miniconda3/envs/bf2raw_env/lib/python3.12/site-packages/ome2024_ngff_challenge/resave.py", line 42, in guess_shards
    raise ValueError(f"no shard guess: shape={shape}, chunks={chunks}")

This PR performs this validation only IF we're actually using the guessed shard_shape to convert an image, NOT when we're just generating parameters.json. We also skip validation when the user runs the conversion with a shard_shape supplied via --output-shards 4,1,2048,2048 or via --output-read-details=parameters.json.
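The following sketch shows the intended control flow, using a simplified stand-in for guess_shards(). The guess strategy (one shard spanning the whole array) and the MAX_SHARD_ELEMENTS limit are hypothetical placeholders, because the real threshold is not shown here. Only the branching follows the description above: user-supplied shards are trusted, and a guessed shape is validated only when it is actually used for conversion.

```python
import math

# Hypothetical element limit: the real threshold used by guess_shards() in
# resave.py is not shown in this PR.
MAX_SHARD_ELEMENTS = 2**24


def guess_shards(shape, chunks):
    """Hypothetical guess: one shard covering the whole array (chunks unused here)."""
    return tuple(shape)


def check_shards(shards):
    """Raise if a shard shape is larger than the assumed limit."""
    n = math.prod(shards)
    if n > MAX_SHARD_ELEMENTS:
        raise ValueError(f"no shard guess: shards={shards} ({n} elements)")


def choose_shards(shape, chunks, user_shards=None, details_only=False):
    """Pick a shard shape, validating only a guess that will drive a conversion."""
    if user_shards is not None:
        # Supplied via --output-shards or --output-read-details: trust it.
        return tuple(user_shards)
    shards = guess_shards(shape, chunks)
    if not details_only:
        # Converting with the guessed shape: validate it first.
        check_shards(shards)
    # When only writing parameters.json, return the unvalidated guess so the
    # user can edit it by hand.
    return shards
```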

To test...

ome2024-ngff-challenge --input-bucket=idr --input-endpoint=https://uk1s3.embassy.ebi.ac.uk --input-anon zarr/v0.4/idr0047A/4496763.zarr params_4496763.json --output-write-details

vi params_4496763.json
# edit each "shards" to: [4, 1, sizeY, sizeX]
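When there are many resolution levels, the manual edit in vi could be scripted instead. The layout of params_4496763.json is not shown in this PR. This sketch therefore assumes, hypothetically, that every object holding a "shards" entry also holds a "shape" whose last two values are sizeY and sizeX. Check the real file before relying on it. Both set_shards and edit_params are illustrative names.

```python
import json


def set_shards(node, leading=(4, 1)):
    """Recursively set each "shards" list to [*leading, sizeY, sizeX].

    Assumes (hypothetically) that every dict containing "shards" also has a
    "shape" whose last two entries are sizeY and sizeX.
    """
    if isinstance(node, dict):
        if "shards" in node and "shape" in node:
            node["shards"] = [*leading, *node["shape"][-2:]]
        for value in node.values():
            set_shards(value, leading)
    elif isinstance(node, list):
        for item in node:
            set_shards(item, leading)
    return node


def edit_params(path, leading=(4, 1)):
    """Load a parameters file, rewrite its "shards" entries, and save it in place."""
    with open(path) as f:
        params = json.load(f)
    set_shards(params, leading)
    with open(path, "w") as f:
        json.dump(params, f, indent=2)
```

For example, run edit_params("params_4496763.json") before the --output-read-details step.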

ome2024-ngff-challenge --input-bucket=idr --input-endpoint=https://uk1s3.embassy.ebi.ac.uk --input-anon zarr/v0.4/idr0047A/4496763.zarr 4496763.zarr --output-read-details params_4496763.json

imagesc-bot commented 1 month ago

This pull request has been mentioned on Image.sc Forum. There might be relevant details there:

https://forum.image.sc/t/ome2024-ngff-challenge/97363/34

joshmoore commented 3 weeks ago

I've now pushed the multithreading improvements from @JoOkuma. Here is the CPU/memory usage with 16 threads:

[Plot (plot-16threads): CPU/memory usage with 16 threads]

and with 64 threads:

[Plot (plot-64threads): CPU/memory usage with 64 threads]

If I use this code (i.e. everything in one transaction):

diff --git a/src/ome2024_ngff_challenge/resave.py b/src/ome2024_ngff_challenge/resave.py
index e4a8fe1..b5ff4e1 100755
--- a/src/ome2024_ngff_challenge/resave.py
+++ b/src/ome2024_ngff_challenge/resave.py
@@ -418,22 +418,19 @@ def convert_array(

     # read & write a chunk (or shard) at a time:
     blocks = shards if shards is not None else chunks
-    for idx, batch in enumerate(Batched(chunk_iter(read.shape, blocks), threads)):
-        start = time.time()
-        with ts.Transaction() as txn:
-            LOGGER.log(5, f"batch {idx:03d}: scheduling transaction size={len(batch)}")
-            for slice_tuple in batch:
-                write.with_transaction(txn)[slice_tuple] = read[slice_tuple]
-                LOGGER.log(
-                    5, f"batch {idx:03d}: {slice_tuple} scheduled in transaction"
-                )
-            LOGGER.log(5, f"batch {idx:03d}: waiting on transaction size={len(batch)}")
-        stop = time.time()
-        elapsed = stop - start
-        avg = float(elapsed) / len(batch)
-        LOGGER.debug(
-            f"batch {idx:03d}: completed transaction size={len(batch)} in {stop-start:0.2f}s (avg={avg:0.2f})"
-        )
+    with ts.Transaction() as txn:
+        for idx, slice_tuple in enumerate(chunk_iter(read.shape, blocks), threads):
+            write.with_transaction(txn)[slice_tuple] = read[slice_tuple]
+            LOGGER.log(
+                5, f"batch {idx:03d}: {slice_tuple} scheduled in transaction"
+            )
+        LOGGER.log(5, f"batch {idx:03d}: waiting on transaction size={idx}")
+    stop = time.time()
+    elapsed = stop - start
+    avg = float(elapsed) / idx
+    LOGGER.debug(
+        f"batch {idx:03d}: completed transaction size={idx} in {stop-start:0.2f}s (avg={avg:0.2f})"

then this is the usage:

[Plot (plot-all): CPU/memory usage with everything in one transaction]

(the run exited early because of a small typo in the quick diff above)
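For reference, here is a sketch of the single-transaction variant with consistent loop bookkeeping. The start time is taken inside the function. enumerate() counts from 1 instead of receiving threads as its start value. The average also guards against an empty iterator. read and write are assumed to behave like tensorstore.TensorStore objects, and make_transaction like ts.Transaction (a context manager that commits on exit), mirroring the calls in the diff. This is not claimed to be the exact typo that stopped the run.

```python
import time


def copy_in_one_transaction(read, write, slices, make_transaction):
    """Schedule every block copy in one transaction and commit once on exit.

    Assumes tensorstore-like objects: write.with_transaction(txn)[idx] = value,
    and make_transaction() returning a context manager that commits on exit.
    """
    start = time.time()
    count = 0
    with make_transaction() as txn:
        # start=1 so that `count` ends up equal to the number of blocks written.
        for count, slice_tuple in enumerate(slices, start=1):
            write.with_transaction(txn)[slice_tuple] = read[slice_tuple]
    elapsed = time.time() - start
    avg = elapsed / count if count else 0.0
    return count, elapsed, avg
```

With tensorstore this would be called roughly as copy_in_one_transaction(read, write, chunk_iter(read.shape, blocks), ts.Transaction).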

Finally, the original code (write.write(read).result()) had this usage:

[Plot (plot-brutal): CPU/memory usage with the original write.write(read).result()]

The command for all of these is more or less:

time ome2024-ngff-challenge --log=trace 9846151.zarr/0 9846151.zarr-v3 --output-chunks=1,1,1,472,421 --output-shards=1,1,1,5192,2947 --output-overwrite --output-threads=16

and the graphs are recorded with:

psrecord 52813 --include-children --plot plot-all.png

joshmoore commented 3 weeks ago

Waiting on a quick review from @will-moore; then I'll release this as 0.0.6 and move on to the other PRs.

will-moore commented 3 weeks ago

Thanks @joshmoore - Looks good 👍

imagesc-bot commented 3 weeks ago

This pull request has been mentioned on Image.sc Forum. There might be relevant details there:

https://forum.image.sc/t/ome2024-ngff-challenge-memory-issues-with-tensorstore/100636/7