nantiamak closed this pull request 10 months ago
@Drvi I pushed a first version of the MultipartUploadStream struct with two tests, one for S3 and one for Azure. I guess we could also integrate with GzipCompressorStream, but I think it's not necessary for starters.
Attention: 3 lines in your changes are missing coverage. Please review.

Comparison is base (328b427) 83.13% compared to head (0790f79) 84.21%.

:exclamation: Current head 0790f79 differs from pull request most recent head c18952b. Consider uploading reports for the commit c18952b to get more accurate results.
Files | Patch % | Lines |
---|---|---|
src/object.jl | 94.91% | 3 Missing :warning: |
Hi @Drvi, thanks for the feedback! It's very helpful.
One question I have is about the number of spawned tasks for the upload. For download/prefetching this depends on prefetch_multipart_size and the size of the input, but for upload we cannot know the total upload size upfront. How should the number of spawned tasks be determined in this case?
I think this is also related to how we'll know when all the tasks are done and the parts uploaded, as for _download_task we decrease io.cond.ntasks by 1 each time a task is done, if I understand correctly.
@nantiamak Ah, good point. I think the design space is a bit larger than I initially thought :) A couple of options for how many tasks to spawn on write:

- Spawn up to a limit as needed (e.g. uploading tasks grab a semaphore; if there is a new buffer to upload and the semaphore is not fully acquired, we check whether we've already spawned the maximum number of tasks). This is probably trickier to implement.
- Do what CloudStore.jl's put does and just always spawn a new task for each buffer during write, probably the simplest.

Either way, we should target each buffer being 8MiB (MULTIPART_SIZE). For PrefetchedDownloadStream I had to experiment quite a bit on EC2 to figure out which buffer sizes and number-of-tasks combos worked well; usually it was a good idea to follow the behavior of CloudStore.get.
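The semaphore option could be sketched roughly like this; note that `upload_part` and `MAX_UPLOAD_TASKS` are hypothetical names used only for illustration, not part of CloudStore.jl:

```julia
# Hypothetical sketch: throttle the number of concurrent upload tasks with a
# semaphore instead of unconditionally spawning one task per buffer.
const MAX_UPLOAD_TASKS = 4 * Threads.nthreads()
const upload_sem = Base.Semaphore(MAX_UPLOAD_TASKS)

function spawn_upload(buf::Vector{UInt8}, part_n::Int)
    Base.acquire(upload_sem)           # blocks while MAX_UPLOAD_TASKS are in flight
    Threads.@spawn begin
        try
            upload_part(buf, part_n)   # stand-in for the actual part upload
        finally
            Base.release(upload_sem)   # always free the slot, even on error
        end
    end
end
```

The `try`/`finally` around `release` matters: a failed part upload must not permanently consume a semaphore slot, or the writer would eventually deadlock.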
I think this is also related to how we'll know when all the tasks are done and the parts uploaded
Yes, so for PrefetchedDownloadStream I used TaskCondition as a counter, but since you are already using the OrderedSynchronizer, I think you could use the counter it uses internally (https://github.com/JuliaServices/ConcurrentUtilities.jl/blob/main/src/synchronizer.jl#L120C1-L120C20) together with the condition. Maybe we should rethink the API of OrderedSynchronizer so we wouldn't have to touch internals like this...
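For context, the public OrderedSynchronizer API already serializes out-of-order completions; a minimal sketch (the `eTags`/`etag-` names are illustrative, not from the PR):

```julia
using ConcurrentUtilities: OrderedSynchronizer

# Each task calls put! with its part number; the closures run strictly in
# part order, so results can be collected with a plain push!.
sync = OrderedSynchronizer()  # next expected index is 1
eTags = String[]

@sync for part_n in 1:4
    Threads.@spawn begin
        eTag = "etag-$part_n"      # stand-in for the part-upload response
        put!(sync, part_n) do
            push!(eTags, eTag)     # runs only when it's this part's turn
        end
    end
end
```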
CI/Julia 1.6 is failing with UndefVarError: @atomic not defined. How do we get past this error? Is it an incompatibility issue with older Julia versions?
@Drvi About your comment "we should target each buffer being 8MiB (MULTIPART_SIZE)", the buffer is currently constructed outside write(). Inside write we only put it in the channel. Do you mean that buffers should be created inside write(), or that wherever they're created, the batch size passed to a buffer should be MULTIPART_SIZE?
Hey @nantiamak, sorry for the delay.
CI/Julia 1.6 is failing with UndefVarError: @atomic not defined. How to go past this error? Is it an incompatibility issue with older Julia versions?
You can see, e.g. in the OrderedSynchronizer code, how Jacob dealt with the issue -- https://github.com/JuliaServices/ConcurrentUtilities.jl/blob/main/src/synchronizer.jl
There are @static if VERSION < v"1.7" version checks which use the older API closed::Threads.Atomic{Bool} instead of @atomic closed::Bool etc.
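That pattern could be adapted here roughly as follows (`UploadState` and `close_state!` are made-up names for illustration):

```julia
# Select the atomics API at parse time so the same file loads on Julia 1.6
# (Threads.Atomic) and on 1.7+ (@atomic fields). @static discards the dead
# branch before @atomic would need to be resolved, so 1.6 never sees it.
@static if VERSION < v"1.7"
    mutable struct UploadState
        closed::Threads.Atomic{Bool}
    end
    UploadState() = UploadState(Threads.Atomic{Bool}(false))
    close_state!(s::UploadState) = (s.closed[] = true)
else
    mutable struct UploadState
        @atomic closed::Bool
    end
    UploadState() = UploadState(false)
    close_state!(s::UploadState) = (@atomic s.closed = true)
end
```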
@Drvi About your comment "we should target each buffer being 8MiB (MULTIPART_SIZE)", the buffer is currently constructed outside write(). Inside write we only put it in the channel. Do you mean that buffers should be created inside write() or that wherever they're created, the batch size passed to a buffer should be of size MULTIPART_SIZE?
I meant that in our usage code we should target roughly 8MiB chunks to be given to the MultipartUploadStream. On the other hand, I agree it would be useful for the MultipartUploadStream to have the ability to do the chunking internally, but I don't know how to do that without it being more complicated than just chunking externally. I'd say this is an open design question worth some experimenting.
@Drvi Thanks for the pointer! Indeed, I agree that chunking is more straightforward to be done externally for the time being.
How long is CI / Julia 1.6 supposed to take? I made changes to the code to be compatible with Julia 1.6 and now the job doesn't fail, but instead takes forever.
@nantiamak Ehm, not sure what the timeout is, but this indicates a deadlock is happening -- best to try it locally. With juliaup you could simply call juliaup add 1.6 and then julia +1.6 to get a Julia 1.6 REPL etc.
@Drvi As promised, here are some benchmark results on a t2.medium EC2 instance - 2CPUs (up to 3.3 GHz Intel Xeon Scalable processor), 4GB:
Method | Filename | Schema | Size (MB) | Time (s) | Allocations |
---|---|---|---|---|---|
MultipartUploadStream | csv_ints.csv | Tuple{Int64,Int64,Int64,Int64} | 39.6 | 1.17 | (17.64 k allocations: 38.339 MiB) |
Regular Put | csv_ints.csv | Tuple{Int64,Int64,Int64,Int64} | 39.6 | 1.17 | (17.84 k allocations: 38.347 MiB) |
MultipartUploadStream | csv_various.csv | Tuple{Int64,Int64,Float64,Float64,VS,VS} | 57.4 | 1.81 | (25.75 k allocations: 55.611 MiB) |
Regular Put | csv_various.csv | Tuple{Int64,Int64,Float64,Float64,VS,VS} | 57.4 | 1.47 | (25.36 k allocations: 55.557 MiB) |
@Drvi Could you please take another look at this PR to see if we can merge it?
@nantiamak Sorry for the late reply, in short:

- put! does throttle the number of tasks in flight and we should not deviate from that.
- I think we should document that the chunks need to be written in order, which makes me think that we don't really need the OrderedSynchronizer; when we get the (part_n, parteTag), all we need to do is io.eTags[part_n] = parteTag behind the lock (making sure io.eTags is grown as needed).
- The close method does two things -- it waits for the submitted chunks to be written, and then it closes the channel and calls completeMultipartUpload. I think we should separate the two into wait and close.
- wait should actually throw the error if there was one.

Also, could you try larger files for the benchmark, say 700MiB, and use a semaphore?
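The suggested wait/close split could look roughly like this; all field names (cond_wait, ntasks, exc, upload_queue) and the complete_upload helper are assumptions about the stream's internals, not the actual implementation:

```julia
# Sketch: wait blocks until all submitted parts are uploaded and rethrows
# any stored task failure; the close step only finalizes the upload.
function wait_for_parts(io)
    lock(io.cond_wait) do
        while io.ntasks > 0
            wait(io.cond_wait)          # woken as each upload task finishes
        end
        io.exc === nothing || throw(io.exc)  # surface the first task error
    end
end

function finish_upload(io)
    close(io.upload_queue)    # stop accepting new parts
    complete_upload(io)       # stand-in for the completeMultipartUpload call
    return nothing
end
```

Separating the two also lets callers decide whether a failed wait should still attempt an abort of the multipart upload before closing.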
A couple more results on a larger file and with a semaphore added for MultipartUploadStream.

Method | Filename | Schema | Size (MB) | Time (s) | Allocations |
---|---|---|---|---|---|
MultipartUploadStream | csv_various_larger.csv | Tuple{Int64,Int64,Float64,Float64,VS,VS} | 860.3 | 140.94 | (362.30 k allocations: 834.048 MiB, 0.01% gc time) |
Regular Put | csv_various_larger.csv | Tuple{Int64,Int64,Float64,Float64,VS,VS} | 860.3 | 21.16 | (356.52 k allocations: 831.520 MiB, 0.04% gc time) |
There is a big difference now between MultipartUploadStream and put, which could be because I'm not configuring the semaphore correctly. I've currently set the default value to 4 * Threads.nthreads(), similar to defaultBatchSize().
Hmm, the performance difference seems problematic, we should investigate. Can you share the benchmarking code again?
@Drvi Regarding the following:
I think we should document that the chunks need to be written in order, which makes me think that we don't really need the OrderedSynchronizer, when we get the (part_n, parteTag) all we need to do is io.eTags[part_n] = parteTag behind the lock (and making sure the io.eTags is grown as needed).
Why should we change this behaviour for MultipartUploadStream? putObjectImpl(), which also does a multipart upload, works with an OrderedSynchronizer.
Why should we change this behaviour for MultipartUploadStream? putObjectImpl() that also does a multipart upload works with an OrderedSynchronizer.
I just think it is simpler to use one synchronization mechanism than two. Since we already do the locking for the condition, we might as well assign the eTag to the eTags vector without involving the OrderedSynchronizer.
Ah, I get your point now. But what do you mean by "making sure the io.eTags is grown as needed"? I only know of push! to grow a vector without knowing its size beforehand, but if I'm not mistaken you mean something different here.
@Drvi Do you maybe mean to use resize!?
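A sketch of growing io.eTags on demand with resize!; unlike push!, this lets a part that finishes out of order store its eTag in the right slot. The store_etag! name and the field layout are assumptions for illustration:

```julia
# Grow the eTags vector lazily and record each part's eTag behind the lock.
function store_etag!(io, part_n::Int, eTag::String)
    lock(io.cond_wait) do
        if part_n > length(io.eTags)
            resize!(io.eTags, part_n)  # new slots are #undef until assigned
        end
        io.eTags[part_n] = eTag
    end
end
```

Note that resize! on a Vector{String} leaves the new entries undefined, which is fine here because every slot is eventually assigned before the upload is completed.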
@Drvi I think I've addressed all of your feedback.
Btw, I think the benchmark results are heavily influenced by the performance of copyto!. I added some logging to it:
@time "copy " copyto!(buf, 1, csv, i, nb)
And the copy got progressively slower over time:
copy : 0.016634 seconds
copy : 0.028683 seconds
copy : 0.041162 seconds
copy : 0.052870 seconds
copy : 0.065678 seconds
copy : 0.077296 seconds
copy : 0.089498 seconds
copy : 0.101765 seconds
copy : 0.113838 seconds
copy : 0.126034 seconds
copy : 0.139463 seconds
copy : 0.151689 seconds
copy : 0.163353 seconds
copy : 0.175153 seconds
copy : 0.187306 seconds
copy : 0.199294 seconds
copy : 0.212154 seconds
copy : 0.223732 seconds
copy : 0.236243 seconds
copy : 0.248252 seconds
copy : 0.260920 seconds
copy : 0.272577 seconds
copy : 0.284779 seconds
copy : 0.297669 seconds
copy : 0.309319 seconds
copy : 0.322311 seconds
copy : 0.333299 seconds
copy : 0.347037 seconds
copy : 0.357561 seconds
copy : 0.371530 seconds
copy : 0.382138 seconds
copy : 0.395172 seconds
copy : 0.408420 seconds
copy : 0.420083 seconds
copy : 0.430917 seconds
copy : 0.445276 seconds
copy : 0.455702 seconds
copy : 0.468660 seconds
copy : 0.481685 seconds
copy : 0.493671 seconds
copy : 0.504613 seconds
copy : 0.516266 seconds
copy : 0.528877 seconds
copy : 0.550385 seconds
copy : 0.554715 seconds
copy : 0.565004 seconds
copy : 0.577645 seconds
copy : 0.590031 seconds
copy : 0.603796 seconds
copy : 0.615178 seconds
copy : 0.625942 seconds
copy : 0.638203 seconds
copy : 0.650485 seconds
copy : 0.663207 seconds
copy : 0.674765 seconds
copy : 0.688626 seconds
copy : 0.699776 seconds
copy : 0.712376 seconds
copy : 0.724644 seconds
copy : 0.737175 seconds
copy : 0.748462 seconds
copy : 0.760385 seconds
copy : 0.777539 seconds
copy : 0.784269 seconds
Wow! Good catch. I didn't expect it to be something in the benchmark code.
Then, I'll remove the comment about performance getting worse with larger files for now, as it might be misleading, but I've mentioned that the API is experimental.
This PR adds a MultipartUploadStream IO object in CloudStore.jl, which facilitates uploading a stream of data chunks to blob storage. After creating a MultipartUploadStream object, we can repeatedly call write() to upload each part until there are no more chunks, at which point we can complete the upload and close the stream.
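A hedged usage sketch based on that description; the constructor signature, the wait/close split, and the bucket/data/credentials variables are assumptions about the final API, not confirmed by this PR:

```julia
using CloudStore

# Create the stream, write ~8MiB chunks, then wait for all part uploads and
# finalize. Each write() hands one chunk to an upload task.
io = CloudStore.MultipartUploadStream(bucket, "data.csv"; credentials)
for chunk in Iterators.partition(data, 8 * 1024 * 1024)  # MULTIPART_SIZE
    write(io, chunk)
end
wait(io)   # block until all parts finish; rethrows any task error
close(io)  # completes the multipart upload
```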