zarr-developers / zarr-python

An implementation of chunked, compressed, N-dimensional arrays for Python.
https://zarr.readthedocs.io
MIT License

Add __dask_tokenize__ methods to Zarr objects #202

Open jakirkham opened 6 years ago

jakirkham commented 6 years ago

Would be good to have __dask_tokenize__ methods added to Array and possibly Group classes. These methods are used to generate unique identifiers for different Dask objects. By default, they will be totally random if the objects in question don't define this method or register some function for making this determination with Dask. Having these methods will allow reuse of Zarr Arrays that are loaded repeatedly. Also this will be helpful for some applications like caching where deterministic hashes are needed.

ref: http://dask.pydata.org/en/latest/custom-collections.html#deterministic-hashing
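
As a rough illustration of the protocol described above, here is a minimal sketch. `TokenizedArray` is a hypothetical stand-in, not the real `zarr.Array`, and hashing the full contents is only one possible choice of token. The Dask import is optional so the snippet runs without it.

```python
import hashlib


class TokenizedArray:
    """Hypothetical stand-in for zarr.Array; not the real class."""

    def __init__(self, path, data):
        self.path = path
        self.data = bytes(data)

    def __dask_tokenize__(self):
        # Return something deterministic that identifies the contents.
        digest = hashlib.sha1(self.data).hexdigest()
        return (type(self).__name__, self.path, digest)


a = TokenizedArray("/data/example.zarr", b"\x00" * 16)
b = TokenizedArray("/data/example.zarr", b"\x00" * 16)

try:
    from dask.base import tokenize  # optional; only used for the last check
except ImportError:
    tokenize = None

# With Dask installed, equal contents should now give equal tokens.
dask_tokens_match = tokenize(a) == tokenize(b) if tokenize else None
```

Without the method, two separately loaded but identical objects would generally not share a token, so Dask could not reuse one for the other.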

jakirkham commented 6 years ago

Should add that this crosses over with issue #98.

Edit: implemented in #203

jakirkham commented 6 years ago

In master, there is an option to compute hashes on Zarr Arrays. So this could naively just reuse that. On a couple GB array, runtime is a few seconds using NFS. This may be fine for some use cases. Though runtime will obviously increase as size does. One can tell Dask to do this for all Zarr Arrays by using the code below. This will work at least until we come up with a __dask_tokenize__ solution in Zarr.

dask.base.normalize_token.register(zarr.Array)(lambda v: v.hexdigest("sha1"))

jakirkham commented 6 years ago

An alternative might be to use a weaker hash than sha1 like md5. This will compute more quickly. Dask already uses md5 within the graph and would reduce the value we return here with md5 anyways. Though it does mean the probability of collisions would increase. Not sure of the practical significance offhand.

Another option to consider would be hashes that are not cryptographically secure, but sufficient for distinguishing values and significantly faster. Some techniques Dask tries are CityHash, xxHash, and MurmurHash. More details in PR ( https://github.com/dask/dask/pull/2377 ). Of these, I only saw support for incremental hashing in the Python library xxhash, which matters for larger-than-memory data. Should add that xxHash is one of the faster ones.
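
To make the incremental-hashing point concrete, here is a small sketch. It uses `hashlib.md5` from the standard library as a stand-in so it runs anywhere. The assumption is that an `xxhash.xxh64()` object, which offers the same `update()`/`hexdigest()` interface, could be passed as `hasher` instead. `hash_stream` is a made-up helper name.

```python
import hashlib
import io


def hash_stream(fileobj, hasher=None, blocksize=1 << 20):
    """Hash a file-like object block by block so it never has to fit in memory."""
    hasher = hashlib.md5() if hasher is None else hasher
    # read() returns b"" at end of file, which stops the iteration.
    for block in iter(lambda: fileobj.read(blocksize), b""):
        hasher.update(block)
    return hasher.hexdigest()


payload = b"zarr" * 1000
incremental = hash_stream(io.BytesIO(payload), blocksize=256)
one_shot = hashlib.md5(payload).hexdigest()
```

Feeding the data in blocks gives the same digest as hashing it all at once, which is what makes the approach usable on arrays that do not fit in memory.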

Could also look at CRC algorithms. The crc32 function (i.e. CRC32) is available from the standard library (zlib and binascii) in Python 2/3. Modern Intel CPUs also support CRC32C with intrinsics, so hardware support is available (likely requires an external library). Another option to consider would be adler32, which is faster than crc32.
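
Both checksums can also be computed incrementally with the standard library. A minimal sketch, where `running_checksums` is a hypothetical helper and the blocks stand in for chunk bytes:

```python
import zlib


def running_checksums(blocks):
    """Fold an iterable of byte blocks into CRC32 and Adler-32 values."""
    crc = 0    # default starting value for zlib.crc32
    adler = 1  # default starting value for zlib.adler32
    for block in blocks:
        crc = zlib.crc32(block, crc)
        adler = zlib.adler32(block, adler)
    return crc, adler


blocks = [b"chunk-0", b"chunk-1", b"chunk-2"]
crc, adler = running_checksums(blocks)
```

Both values are only 32 bits wide, so collisions are far more likely than with md5 or sha1.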

Finally it is worth looking at hash trees like Merkle trees (what Dask uses 😉). These compute hashes for leaf nodes and then compute hashes for all other nodes based on the hash of the concatenated hashes of the children. As can be imagined this can be much faster than simply computing the hash of everything for two reasons. First, parallelism is now an option. Second, hashes typically pass a rolling window over the data, but breaking the data into chunks and hashing chunks separately effectively removes some of the regions this window would roll over. This could be combined with any of the techniques above. We could even get Dask to do this work for us by loading chunks directly into Dask.
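
A minimal sketch of the hash-tree idea, assuming chunks are available as byte strings. `leaf_digest`, `tree_digest`, and the `fanout` parameter are illustrative names, and md5 is just one possible node hash. Leaves are hashed in a thread pool, which can help because hashlib releases the GIL while hashing large buffers.

```python
import hashlib
from concurrent.futures import ThreadPoolExecutor


def leaf_digest(chunk):
    """Hash a single chunk (a leaf of the tree)."""
    return hashlib.md5(chunk).digest()


def tree_digest(chunks, fanout=2):
    """Hash chunks independently, then hash concatenated child digests upward."""
    with ThreadPoolExecutor() as pool:
        level = list(pool.map(leaf_digest, chunks))
    if not level:
        return hashlib.md5(b"").hexdigest()
    while len(level) > 1:
        # Each parent is the hash of its children's concatenated digests.
        level = [
            hashlib.md5(b"".join(level[i:i + fanout])).digest()
            for i in range(0, len(level), fanout)
        ]
    return level[0].hex()


chunks = [bytes([i]) * 4096 for i in range(5)]
root = tree_digest(chunks)
changed = tree_digest(chunks[:4] + [b"\xff" * 4096])
```

Changing any one chunk changes the root, but only that leaf and its ancestors would need rehashing if the intermediate digests were kept around.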

jakirkham commented 6 years ago

Any thoughts on how best to tackle this one, @mrocklin?

mrocklin commented 6 years ago

In cases where zarr arrays point to very large datasets, hashing all of the data is probably not feasible.

In such cases you would probably want to defer to the underlying storage mechanism, which often has hashes pre-computed (S3 and GS do this). This would probably require that we defer this operation to the underlying MutableMapping, which would require changes in a number of upstream libraries.

jakirkham commented 6 years ago

That's a good point! Currently lack a clear picture on what scale other people use Zarr for.

Getting precomputed hashes from cloud storage services (when used) sounds like a great idea. Fixing multiple upstream libraries sounds tricky, how would you propose tackling that? Do you have an API in mind?

How do we handle hashing of things like DirectoryStore or ZipStore (assuming it is not cloud hosted somehow)? Any tricks that you know of?

On a related note, I had been thinking we might be able to hash chunks as they are written. Hashing of the full array would be delayed until it is read, but would use the precomputed chunk hashes. This avoids having to hash everything from scratch each time. Not sure how well this scales offhand.
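
One way the write-time hashing idea could look, as a sketch only. `DigestingStore` and `array_digest` are hypothetical names, the digests live in memory here, and where they would be persisted is left open.

```python
import hashlib
from collections.abc import MutableMapping


class DigestingStore(MutableMapping):
    """Hypothetical wrapper that records a digest for every value as it is written."""

    def __init__(self, inner=None):
        self.inner = {} if inner is None else inner
        self.digests = {}

    def __setitem__(self, key, value):
        self.inner[key] = value
        self.digests[key] = hashlib.md5(bytes(value)).hexdigest()

    def __getitem__(self, key):
        return self.inner[key]

    def __delitem__(self, key):
        del self.inner[key]
        self.digests.pop(key, None)

    def __iter__(self):
        return iter(self.inner)

    def __len__(self):
        return len(self.inner)

    def array_digest(self, prefix=""):
        """Combine the stored chunk digests; no chunk data is re-read."""
        combined = hashlib.md5()
        for key in sorted(self.digests):
            if key.startswith(prefix):
                combined.update(key.encode())
                combined.update(self.digests[key].encode())
        return combined.hexdigest()


store = DigestingStore()
store["x/0.0"] = b"aaaa"
store["x/0.1"] = b"bbbb"
before = store.array_digest("x/")
store["x/0.1"] = b"cccc"
after = store.array_digest("x/")
```

The cost at read time then grows with the number of chunks rather than with the number of bytes.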

mrocklin commented 6 years ago

I would implement __dask_tokenize__ on the MutableMapping classes within s3fs, gcsfs, hdfs3, and so on. I would then have zarr.Array.__dask_tokenize__ just call tokenize on its underlying MutableMapping.

Each of the upstream libraries (s3fs, gcsfs, hdfs3) would implement __dask_tokenize__ using whatever mechanisms are common in that storage system.
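
A sketch of that delegation, under stated assumptions: `ETagStore` and `ArrayLike` are hypothetical stand-ins for an upstream mapping and for `zarr.Array`, and the md5 computed on write merely imitates a hash the storage service would supply. The store's method is called directly here to avoid a Dask dependency; with Dask available, `dask.base.tokenize(self.store)` would be the call described above.

```python
import hashlib


class ETagStore(dict):
    """Hypothetical store that already knows a cheap per-object hash (like an ETag)."""

    def __init__(self):
        super().__init__()
        self.etags = {}

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        # Stand-in for a hash the storage service would compute for us.
        self.etags[key] = hashlib.md5(value).hexdigest()

    def __dask_tokenize__(self):
        return tuple(sorted(self.etags.items()))


class ArrayLike:
    """Hypothetical stand-in for zarr.Array that defers to its store."""

    def __init__(self, store, path=""):
        self.store = store
        self.path = path

    def __dask_tokenize__(self):
        # The array does no hashing itself; the store decides how.
        return (type(self).__name__, self.path, self.store.__dask_tokenize__())


store = ETagStore()
store["0.0"] = b"abc"
token_before = ArrayLike(store).__dask_tokenize__()
store["0.0"] = b"abd"
token_after = ArrayLike(store).__dask_tokenize__()
```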

For DirectoryStore I would probably tokenize by the filenames contained within the directory and the modification times. For ZipStore I would probably tokenize by the filename, and the modification time.
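
For the DirectoryStore case, a minimal sketch of tokenizing by file names and modification times. `tokenize_directory` is a made-up helper, not part of Zarr or Dask, and `os.utime` is used only to make the example deterministic.

```python
import hashlib
import os
import tempfile


def tokenize_directory(root):
    """Build a token from relative file names and modification times only."""
    entries = []
    for dirpath, dirnames, filenames in os.walk(root):
        dirnames.sort()  # make the traversal order deterministic
        for name in sorted(filenames):
            full = os.path.join(dirpath, name)
            mtime = os.stat(full).st_mtime_ns
            entries.append((os.path.relpath(full, root), mtime))
    return hashlib.md5(repr(entries).encode()).hexdigest()


with tempfile.TemporaryDirectory() as root:
    path = os.path.join(root, "0.0")
    with open(path, "wb") as f:
        f.write(b"chunk")
    os.utime(path, ns=(1_000_000_000, 1_000_000_000))
    first = tokenize_directory(root)
    again = tokenize_directory(root)
    # Simulate a later write by bumping the modification time.
    os.utime(path, ns=(2_000_000_000, 2_000_000_000))
    touched = tokenize_directory(root)
```

No file contents are read, so the cost depends on the number of files rather than their size. A ZipStore version could apply the same `os.stat` call to the single zip file.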

I imagine you could store hashes within Zarr metadata itself. This would presumably be an extension of the protocol.

jakirkham commented 6 years ago

Alright, thinking about how to consolidate these cases a bit. What if we simply add the modified time to .zarray when we write to the array? As a default, we can hash this modification time and the path to the array. This should work for any storage's MutableMapping and be reasonably fast as it's independent of array size. However, the MutableMapping implementation can define __dask_tokenize__ at the array level as an alternative and we can use it instead.

Side note: We can add the modified time to .zgroup too. Just shouldn't have any weight here.
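
A rough sketch of that default, assuming a plain dict as the store. The `modified` field is hypothetical and not part of the Zarr spec, and `write_chunk` and `default_token` are illustrative names rather than Zarr functions.

```python
import hashlib
import json
import time


def write_chunk(store, array_path, chunk_key, data, now=None):
    """Write a chunk and stamp the array metadata with a modification time."""
    store[f"{array_path}/{chunk_key}"] = data
    meta_key = f"{array_path}/.zarray"
    meta = json.loads(store.get(meta_key, b"{}"))
    # "modified" is a hypothetical field, not part of the Zarr spec.
    meta["modified"] = time.time_ns() if now is None else now
    store[meta_key] = json.dumps(meta).encode()


def default_token(store, array_path):
    """Token from the array path and its recorded modification time only."""
    meta = json.loads(store[f"{array_path}/.zarray"])
    raw = f"{array_path}:{meta['modified']}".encode()
    return hashlib.md5(raw).hexdigest()


store = {}
write_chunk(store, "x", "0.0", b"aaaa", now=1)
token_first = default_token(store, "x")
write_chunk(store, "x", "0.1", b"bbbb", now=2)
token_second = default_token(store, "x")
```

The token only reads one small metadata key, so its cost does not grow with the array.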

jakirkham commented 6 years ago

We might also want to give some thought to how __dask_tokenize__ looks API-wise for a mutable mapping as it would somehow have to take the relevant key as an argument as well. Perhaps its name has to change as well ( __dask_tokenize_kv__? ).

mrocklin commented 6 years ago

> What if we simply add the modified time to .zarray when we write to the array?

Does this mean that all Zarr operations would be responsible for updating the modification time metadata in the zarr dataset?

jakirkham commented 6 years ago

Any writing operations, yes. Shouldn't be too expensive since we would be writing anyways and this is just the time. Though if we do this in .zarray, we would need to lock for each write operation, which might not be great. We could correct this last issue by tracking this modification time per chunk in separate files. Though that might hurt scaling if there are lots of chunks.

mrocklin commented 6 years ago

Yeah, I can imagine that this would be difficult to accomplish safely and efficiently at scale.