meggart / DiskArrays.jl

Other
72 stars 13 forks source link

optimization for var[:,:,[1,2]], wrong result with indexing with two vectors and the type-stability issue #131

Closed Alexander-Barth closed 7 months ago

Alexander-Barth commented 10 months ago

In NCDatasets 0.12.17 I had optimization for var[:,:,[1,2]] converting it to var[:,:,1:2] and var[:,:,[1,2,10,11]] into two load operation v[:,:,1:2] and v[:,:,10:11]. This was implemented here:

https://github.com/Alexander-Barth/NCDatasets.jl/blob/v0.12.17/src/cfvariable.jl#L359-L436

Now a call to v[:,:,[1,2]] loads every element individually (which quite slow).

In fact, the relevant code is still in NDatasets 0.13 but no longer called.

Maybe this functionality should go into DiskArrays?

rafaqz commented 10 months ago

PRs always welcome

Alexander-Barth commented 10 months ago

OK, DiskArrays has an implementation of "batch" getindex.jl https://github.com/meggart/DiskArrays.jl/blob/7d297f8b4a0067f7c93cb84e8c2d42b88754e54e/src/batchgetindex.jl

Because of a too restrictive type declaration in CommonDataSet, I was not using it. This is now fixed. Now I am using batchgetindex.jl but there are two issues in its implementation:

With the _DiskArray type from runtests.jl I made these two tests (to reproduce issues in NCDatasets):

a1 = _DiskArray(zeros(2, 6000));
@show getindex_count(a1)
a1[:,[1,6000]]
@show getindex_count(a1)

output

getindex_count(a1) = 0
┌ Info: readblock! of _DiskArray
└   i = (1:2, 1:6000)
getindex_count(a1) = 1

The i variable is the parameter of readblock!. Surprisingly, here I get only 1 call the getindex but loading quite a lot of data for this case (in fact all). I wondering if this is intentional?

The other issue is that the output can be wrong in some circumstances:

a1 = _DiskArray(zeros(5,5,3));
size(a1[[1,2],[2,3],:])
# output (2, 3, 2)
# expected (2,2,3)

The code in batchindex is really complicated (for me at least :-)). I am wondering if we should replace it with my implementation (loads less data and works if there are two indices as vectors). It is also shorter but the current batchgetindex.jl may also do something else that I don't know.

Alexander-Barth commented 10 months ago

Another issue I just came accross is that with DiskArray the indexing is no-longer type stable (in this case):

Currently NCDatasets + DiskArrays:

julia> v = NCDataset("/tmp/sample2.nc")["data"].var;

julia> foo(v) = v[:,:,[1]];

julia> @code_warntype foo(v)
MethodInstance for foo(::NCDatasets.Variable{Float32, 3, NCDataset{Nothing}})
  from foo(v) @ Main REPL[6]:1
Arguments
  #self#::Core.Const(foo)
  v::NCDatasets.Variable{Float32, 3, NCDataset{Nothing}}
Body::Any
1 ─ %1 = Main.:(:)::Core.Const(Colon())
│   %2 = Main.:(:)::Core.Const(Colon())
│   %3 = Base.vect(1)::Vector{Int64}
│   %4 = Base.getindex(v, %1, %2, %3)::Any
└──      return %4

julia> @which v[:,:,[1]]
getindex(a::NCDatasets.Variable, i...)
     @ NCDatasets ~/.julia/packages/DiskArrays/QlfRF/src/diskarray.jl:211

NCDataset 0.12.17

ulia> foo(v) = v[:,:,[1]];

julia> @code_warntype foo(v)
MethodInstance for foo(::NCDatasets.Variable{Float32, 3, NCDataset{Nothing}})
  from foo(v) @ Main REPL[15]:1
Arguments
  #self#::Core.Const(foo)
  v::NCDatasets.Variable{Float32, 3, NCDataset{Nothing}}
Body::Array{Float32, 3}
1 ─ %1 = Main.:(:)::Core.Const(Colon())
│   %2 = Main.:(:)::Core.Const(Colon())
│   %3 = Base.vect(1)::Vector{Int64}
│   %4 = Base.getindex(v, %1, %2, %3)::Array{Float32, 3}
└──      return %4

julia> @which v[:,:,[1]]
getindex(v::Union{CommonDataModel.CFVariable, NCDatasets.MFVariable, NCDatasets.SubVariable, NCDatasets.Variable}, indices::Union{Colon, Int64, AbstractRange{<:Integer}, Vector{Int64}}...)
     @ NCDatasets ~/.julia/packages/NCDatasets/st9Jz/src/cfvariable.jl:406

Should I try to make a PR that replace the current batchgetindex implementation by the one from NCDatasets 0.12 ?

rafaqz commented 10 months ago

You can certainly try!

batchgetindex may be doing a bit more than you think, such as fast StepRange indexing and indexing with arbitrary vectors. But @meggart wrote most of that so I'm not 100% sure.

(by the way sharing all of this code across all backends is a very nice outcome of using DiskArrays.jl everywhere - there will likely be other things that NCDatasets.jl previously did better than DiskArrays.jl too, we certainly haven't got everything right yet)

meggart commented 10 months ago

Thanks a lot for sharing the example. Here it is indeed intended behavior to read the whole array, because in your case the chunk size equals the size of the array. So for tests, please experiment a bit with different chunkings of the _DiskArray to test if DiskArrays is doing the "right thing" for you. In general, with every operation we define in DiskArrays we try to minimize the number of chunk accesses on the array. When you have a compressed on-disk array it literally does not matter if you only read a single value or the whole chunk because partial decompression is very hard to impossible for most compression formats.

However, I agree that many users are also dealing with arrays that are not explicitly chunked or compressed, for these we currently usually generate some fake chunks (through eachchunk_fallback) which would quite often result in non-optimised behavior. Currently there is a bias in DiskArrays to optimise chunked+compressed arrays.

So, having special implementations for arrays where haschunks(a)=Unchunked() would be greatly appreciated. Please feel free to dispatch on this trait and provide better methods for arrays that are not explicitly chunked, probably not only in batchgetindex but also in other operations.

Alexander-Barth commented 10 months ago

When you have a compressed on-disk array it literally does not matter if you only read a single value or the whole chunk because partial decompression is very hard to impossible for most compression formats.

I would guess that for uncompressed chunks this case still make a difference. Also we can make some more intelligence choices about how much memory should be really allocated. Some compression algorithm can also work on stream and do not need to decompres the whole objects to get the first bytes for example.

From my understanding of this post, the widely used gzip algorithm has this capability.

rafaqz commented 10 months ago

Some compression algorithm can also work on stream and do not need to decompres the whole objects to get the first bytes for example

Do netcdf or gdal take advantage of that when reading partial chunks?

I would guess that for uncompressed chunks this case still make a difference

Benchmarks on a PR would be interesting

Alexander-Barth commented 10 months ago

Yes, it makes a difference. In the worst-case scenario (reading a time series for spatial chunked data), one would read a single value from each chunk:

Test with NCDatasets 0.12.17:

using BenchmarkTools                                                                                                                                                  
using NCDatasets                                                                                                                                                      

sz = (1024,1024,64)                                                                                                                                                   
ds = NCDataset("/tmp/tmp.nc","c")                                                                                                                                     
defDim(ds,"lon",sz[1])                                                                                                                                                
defDim(ds,"lat",sz[2])                                                                                                                                                
defDim(ds,"time",sz[3])                                                                                                                                               

defVar(ds,"data",Float64,("lon","lat","time"),chunksizes=[sz[1:2]...,1])                                                                                               
defVar(ds,"datac",Float64,("lon","lat","time"),chunksizes=[sz[1:2]...,1],deflatelevel = 9)                                                                            

ds["data"][:] = randn(sz);                                                                                                                                            
ds["datac"][:] = randn(sz);                                                                                                                                           

@btime ds["data"][1,1,1];                                                                                                                                             
@btime ds["data"][:,:,1];                                                                                                                                             

@btime ds["datac"][1,1,1];                                                                                                                                            
@btime ds["datac"][:,:,1];

output:

julia> @btime ds["data"][1,1,1];                                                                                                                                      
  15.069 μs (34 allocations: 1.62 KiB)                                                                                                                                

julia> @btime ds["data"][:,:,1];                                                                                                                                      
  1.014 ms (35 allocations: 8.00 MiB)                                                                                                                                 

julia> @btime ds["datac"][1,1,1];                                                                                                                                     
  13.518 μs (34 allocations: 1.62 KiB)                                                                                                                                

julia> @btime ds["datac"][:,:,1];                                                                                                                                     
  470.025 μs (35 allocations: 8.00 MiB)                                                                                                                               

So reading an entire chunk (if you do not need it) is 67 times slower for uncompressed data and 36 times slower for compressed data. But we read the same data multiple times in @btime. The netCDF library is likely cache the data. Some application might indeed read the same data multiple times but this is less typical. If the data is read for the first time, the differences are not as large:

sz = (1024,1024,64)                                                                                                                                                   
isfile("/tmp/tmp3.nc") && rm("/tmp/tmp3.nc")                                                                                                                          
ds = NCDataset("/tmp/tmp3.nc","c")                                                                                                                                    
defDim(ds,"lon",sz[1])                                                                                                                                                
defDim(ds,"lat",sz[2])                                                                                                                                                
defDim(ds,"time",sz[3])                                                                                                                                               

defVar(ds,"data",Float64,("lon","lat","time"),chunksizes=[sz[1:2]...,1])                                                                                              
defVar(ds,"datac",Float64,("lon","lat","time"),chunksizes=[sz[1:2]...,1],deflatelevel = 9)                                                                            

ds["data"][:] = randn(sz);                                                                                                                                            
ds["datac"][:] = randn(sz);                                                                                                                                           
close(ds)                                                                                                                                                             

ds = NCDataset("/tmp/tmp3.nc")                                                                                                                                        
# warm-up                                                                                                                                                             
ds["data"][1,1,1];                                                                                                                                                    
ds["data"][:,:,2];                                                                                                                                                    

@time ds["data"][1,1,3];                                                                                                                                              
@time ds["data"][:,:,4];                                                                                                                                              

@time ds["datac"][1,1,3];                                                                                                                                             
@time ds["datac"][:,:,4];           

output

  0.001231 seconds (34 allocations: 1.625 KiB)                                                                                                                        
  0.001682 seconds (35 allocations: 8.002 MiB)                                                                                                                        
  0.031734 seconds (34 allocations: 1.625 KiB)                                                                                                                        
  0.038516 seconds (35 allocations: 8.002 MiB)                                                                                                                        

So a difference of about 20 % in this case.

rafaqz commented 10 months ago

Interesting. May be good to compare the middle index in the chunk as well, as the first one is the extreme (fastest possible) case.

Alexander-Barth commented 10 months ago

For:

@time ds["data"][512,512,3];
@time ds["data"][:,:,4];

@time ds["datac"][512,512,3];
@time ds["datac"][:,:,4];

I get (loading the first time)

  0.026237 seconds (36 allocations: 1.656 KiB)
  0.045077 seconds (35 allocations: 8.002 MiB)
  0.033074 seconds (36 allocations: 1.656 KiB)
  0.038558 seconds (35 allocations: 8.002 MiB)

If the data is in the libnetcdf chunk cache (using @btime), I get:

julia> @btime ds["data"][512,512,3];
  15.549 μs (36 allocations: 1.66 KiB)

julia> @btime ds["data"][:,:,4];
  421.606 μs (35 allocations: 8.00 MiB)

julia> @btime ds["datac"][512,512,3];
  15.361 μs (36 allocations: 1.66 KiB)

julia> @btime ds["datac"][:,:,4];
  417.527 μs (35 allocations: 8.00 MiB)

Feel free to make other benchmarks with other indices. Another use case is to load data over the network via opendap or HTTP byte ranges request, you certainly do not want to transfer more data than necessary.

rafaqz commented 10 months ago

How is this one faster ? :smile:

That does look worthwhile having.

meggart commented 10 months ago

We are again running into the situation where benchmarking is extremely difficult because as you say @btime will only measure the speed of copying data from netcdf cache to some other piece of memory.

0.026237 seconds (36 allocations: 1.656 KiB) 0.045077 seconds (35 allocations: 8.002 MiB) 0.033074 seconds (36 allocations: 1.656 KiB) 0.038558 seconds (35 allocations: 8.002 MiB)

I am also running the benchmarks right now and still trying to get a better picture, but I guess it will be very complicated to come up with good heuristics.

Another use case is to load data over the network via opendap or HTTP byte ranges request, you certainly do not want to transfer more data than necessary.

You are right, but for e.g remote datasets stored in Zarr it will be the complete opposite. When doing multiple reads into the same chunk, you would have to transfer the whole chunk multiple times to the local machine, while the current DiskArrays.jl implementation makes sure that every chunk is touched exactly once for any data. As I said, DiskArrays.jl is really tailored with the Zarr model in mind where there simply is no libary-in-the-middle that does some efficient pre-filtering for you.

It would be very important to discuss the different use cases, and probably we will have to define additional backend traits in DiskArrays.jl to be able to find the best heuristics that would fit most/all of the real-world use cases.

Alexander-Barth commented 10 months ago

This start to sound like a big change. If the approach in my PR is not suitable, and can somebody else have a look how to fix the wrong result with indexing with two vectors and the type-stability issue?

Alexander-Barth commented 9 months ago

Following our meeting, I tried to implement DiskArrays.readblock!(A::Variable, A_ret, indices::AbstractVector...) and skipping the macro DiskArrays.@implement_batchgetindex. Unfortunately, the internal DiskArrays.batchgetindex still gets called here:

https://github.com/meggart/DiskArrays.jl/blob/7d297f8b4a0067f7c93cb84e8c2d42b88754e54e/src/diskarray.jl#L36

But when I define a more specific DiskArrays.batchgetindex function in NCDatasets, it seems to work:

using NCDatasets

filename_src = tempname()
ds = NCDataset(filename_src, "c")
data = zeros(Int,5,5,3)
ncv = defVar(ds,"data",data,("lon","lat","time"))

# typeof(ncv.var) is NCDatasets.Variable which implements DiskArrays
@show size(ncv.var[[1,2],[2,3],:])
# previously output (2, 3, 2)
# now (2, 2, 3)

(Let me know if you see a better way :-))

Alexander-Barth commented 9 months ago

unfortunately, the type instability persists:

julia> @code_warntype DiskArrays.getindex_disk(ncv.var,:,:,[1])
MethodInstance for DiskArrays.getindex_disk(::NCDatasets.Variable{Int64, 1, NCDataset{Nothing}}, ::Colon, ::Colon, ::Vector{Int64})
  from getindex_disk(a, i...) @ DiskArrays ~/.julia/dev/DiskArrays/src/diskarray.jl:33
Arguments
  #self#::Core.Const(DiskArrays.getindex_disk)
  a::NCDatasets.Variable{Int64, 1, NCDataset{Nothing}}
  i::Tuple{Colon, Colon, Vector{Int64}}
Locals
  @_4::Int64
  data::Vector{Int64}
  trans::DiskArrays.Reshaper
  inds::Tuple{Base.OneTo{Int64}}
Body::Any
1 ─       Core.NewvarNode(:(@_4))
│         Core.NewvarNode(:(data))
│         Core.NewvarNode(:(trans))
│         Core.NewvarNode(:(inds))
│         DiskArrays.checkscalar(i)
│   %6  = DiskArrays.any(DiskArrays.is_batch_arg, i)::Bool
└──       goto #3 if not %6
2 ─ %8  = DiskArrays.batchgetindex::Core.Const(DiskArrays.batchgetindex)
│   %9  = Core.tuple(a)::Tuple{NCDatasets.Variable{Int64, 1, NCDataset{Nothing}}}
│         Core._apply_iterate(Base.iterate, %8, %9, i)
└──       Core.Const(:(return %10))
3 ┄ %12 = DiskArrays.interpret_indices_disk(a, i)::Tuple{Tuple{Base.OneTo{Int64}}, DiskArrays.Reshaper}                                  
│   %13 = Base.indexed_iterate(%12, 1)::Core.PartialStruct(Tuple{Tuple{Base.OneTo{Int64}}, Int64}, Any[Tuple{Base.OneTo{Int64}}, Core.Const(2)])
│         (inds = Core.getfield(%13, 1))
│         (@_4 = Core.getfield(%13, 2))
│   %16 = Base.indexed_iterate(%12, 2, @_4::Core.Const(2))::Core.PartialStruct(Tuple{DiskArrays.Reshaper, Int64}, Any[DiskArrays.Reshaper, Core.Const(3)])
│         (trans = Core.getfield(%16, 1))
│   %18 = DiskArrays.Array::Core.Const(Array)
│   %19 = DiskArrays.eltype(a)::Core.Const(Int64)
│   %20 = Core.apply_type(%18, %19)::Core.Const(Array{Int64})
│   %21 = Core.tuple(DiskArrays.undef)::Core.Const((UndefInitializer(),))
│   %22 = DiskArrays.map(DiskArrays.length, inds)::Tuple{Int64}
│         (data = Core._apply_iterate(Base.iterate, %20, %21, %22))
│   %24 = DiskArrays.readblock!::Core.Const(DiskArrays.readblock!)
│   %25 = Core.tuple(a, data)::Tuple{NCDatasets.Variable{Int64, 1, NCDataset{Nothing}}, Vector{Int64}}               
│         Core._apply_iterate(Base.iterate, %24, %25, inds)
│   %27 = (trans)(data)::Any
└──       return %27

even is this is type-stable:

julia> @code_warntype NCDatasets.batchgetindex(ncv.var,:,:,[1])
MethodInstance for DiskArrays.batchgetindex(::NCDatasets.Variable{Int64, 3, NCDataset{Nothing}}, ::Colon, ::Colon, ::Vector{Int64})
  from batchgetindex(v::NCDatasets.Variable, indices::Union{Colon, AbstractVector{<:Integer}, var"#s245"} where var"#s245"<:Integer...) @ NCDatasets ~/.julia/dev/NCDatasets/src/variable.jl:589                                             
Arguments
  #self#::Core.Const(DiskArrays.batchgetindex)
  v::NCDatasets.Variable{Int64, 3, NCDataset{Nothing}}
  indices::Tuple{Colon, Colon, Vector{Int64}}
Body::Array{Int64, 3}
1 ─ %1 = NCDatasets._batchgetindex::Core.Const(NCDatasets._batchgetindex)
│   %2 = Core.tuple(v)::Tuple{NCDatasets.Variable{Int64, 3, NCDataset{Nothing}}}
│   %3 = Core._apply_iterate(Base.iterate, %1, %2, indices)::Array{Int64, 3}
└──      return %3
rafaqz commented 9 months ago

Is this a PR somewhere? Would be good to read it/run it

To me it looks loke the instability is in the other half of the is_batch_arg conditional

Alexander-Barth commented 9 months ago

Here is the PR that brings "my" batchgetindex to DiskArrays

https://github.com/meggart/DiskArrays.jl/pull/132

rafaqz commented 9 months ago

Ok. If the instabilty is not in your code or batchgetindex (no time to check yet, but really looks like its not), we can merge the PR and address the instability separately?

Alexander-Barth commented 9 months ago

OK, can you or @meggart look at the comment need_batch = false in the PR?

Alexander-Barth commented 7 months ago

Given the comment here, maybe this example helps to clarify:

NetCDF.jl + DiskArray

julia> using NetCDF

julia> v = NetCDF.open("http://tds.hycom.org/thredds/dodsC/GLBy0.08/expt_93.0","water_temp");

julia> size(v)
(4500, 4251, 40, 14969)

julia> v1 = @time v[1000,1000,1,1:10];
  1.478774 seconds (3.71 M allocations: 250.669 MiB, 3.42% gc time, 59.32% compilation time)

julia> v1 = @time v[1000,1000,1,1:10];
  0.467219 seconds (45 allocations: 1.781 KiB)

julia> v1 = @time v[1000,1000,1,collect(1:10)];
739.847348 seconds (1.32 M allocations: 819.944 MiB, 0.03% gc time, 0.10% compilation time)

NCDatasets 0.12 without DiskArray

julia> using NCDatasets

julia> ds = NCDataset("http://tds.hycom.org/thredds/dodsC/GLBy0.08/expt_93.0"); v = ds["water_temp"];

julia> v1 = @time v[1000,1000,1,1:10];
  0.726571 seconds (134.15 k allocations: 8.849 MiB, 14.90% compilation time)

julia> v1 = @time v[1000,1000,1,1:10];
  0.218793 seconds (62 allocations: 1.984 KiB)

julia> v1 = @time v[1000,1000,1,collect(1:10)];
  0.879592 seconds (982.55 k allocations: 66.829 MiB, 9.49% gc time, 65.79% compilation time)

So 12 minutes versus 1 seconds for this example. Yes, I know that I can bypass DiskArrays by providing my own batchgetindex. But I don't think that DiskArray should make the current assumptions if it aims to be generic (if this is the goal).

Can we have the current assumptions (i.e. reading a chunk similarly fast than reading a subset of chunk) documented?

Concerning the API, another think I am wondering if we need a function named batchgetindex at all. Why not having getindex simply pass the indices to DiskArrays.readblock! and let DiskArrays.readblock! figure out how to best load the data. DiskArrays.readblock! is already specific to the storage format.

rafaqz commented 7 months ago

Thanks for writing this up, but probably this needs a specific issue, how does it relate to this one?

(this issue already mixes two separate issues into one... we need to organise this process a little I find myself not understanding what the purpose is)

I think we need to make three separate issues and discuss the details of each to contain this a little. Does this make sense:

  1. type stability problems
  2. incorrect result when indexing with vectors
  3. slow reads for a collected vector.
  4. all data in a block is loaded every time

With separate MWEs and a clear path to fixing them.

Currently I don't know what to work on to help move this forward.

rafaqz commented 7 months ago

Ok I've made separate issues for these problems. Lets work through them and get it fixed.