Open jgiunti-coatue opened 3 years ago
Thank you for the report. I will look into it.
Release v0.3.6 is on its way.
@jgiunti-coatue please don't hesitate to reopen the ticket if you find other problems. Thanks again.
Thank you!
The fix was wrong
It would be great to be able to reproduce this error. It looks like we are accumulating the whole data in memory, which is weird since we are streaming the data.
@jgiunti-coatue Can you show how you created the stream (content) that you pass as a parameter to the putObject function?
def putObject[R](
  bucketName: String,
  key: String,
  contentLength: Long,
  content: ZStream[R, Throwable, Byte],
  options: UploadOptions
)
@regis-leray we have since moved to using multiPartUpload, as it seems to truly use the stream to send the data to S3, and we haven't had the issue since. If I'm not mistaken, converting the ZStream in the putObject function by calling toPublisher consumes the entire stream, which is why I think all of the data is being accumulated in memory.
Check out the streamToPublisher method in the reactive streams interop:
def streamToPublisher[R, E <: Throwable, O](stream: ZStream[R, E, O]): ZIO[R, Nothing, Publisher[O]] =
  ZIO.runtime.map { runtime => subscriber =>
    if (subscriber == null) {
      throw new NullPointerException("Subscriber must not be null.")
    } else {
      runtime.unsafeRunAsync_(
        for {
          demand <- Queue.unbounded[Long]
          _      <- UIO(subscriber.onSubscribe(createSubscription(subscriber, demand, runtime)))
          _      <- stream
                      .run(demandUnfoldSink(subscriber, demand))
                      .catchAll(e => UIO(subscriber.onError(e)))
                      .forkDaemon
        } yield ()
      )
    }
  }
The stream is consumed using a sink in order to create the publisher. I think this is why all the data is accumulated in memory.
Glad to hear you were able to solve it.
Thank you, your findings really help. I will try to see how we can build a safer publisher, or else we will be forced to change the signature to take a ByteBuffer.
We need to investigate whether building the ZIO => Reactive Streams bridge requires evaluating the whole stream in memory.
I'm still looking into what could cause this issue. Based on the discussion with the ZIO team on the Discord channel, we didn't find any clue or problem that could lead to data accumulating in memory.
@regis-leray This article was very helpful for understanding the issue with using ByteBuffers. The problem we encountered came from using many threads to upload multiple large files to S3. This caused OOM issues even though our heap usage was relatively low, because the buffers are allocated in native memory. The implementation using byte buffers isn't wrong per se, but it can cause non-obvious OOM issues for users who don't set specific Java options to limit the amount of native memory and the size of the buffers that are cached per thread.
The usage of HeapByteBuffer in the putObject method can lead to a pretty subtle OOM issue that is tricky to figure out. If you are using many threads to do IO, in this case putting an object in S3, the JVM is going to cache one or more of these ByteBuffers per thread, and by default there is no limit on the size or number of these buffers. As a result, if you create many threads for IO and these buffers are large, the app can use a lot of additional native memory that looks like a leak. For us, when running in production, this presented itself as the Java process consuming several more GB of memory than we had allocated for the heap and thus getting killed when it ran out of memory.
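To illustrate the mechanism, here is a minimal sketch of one generic way to keep those per-thread buffers small: write a large heap buffer to a channel in bounded slices, so that any temporary native buffer the JDK allocates for a single write is no larger than one slice. The object name BoundedWrites, the method writeInSlices, and the maxSlice parameter are hypothetical names made up for this example. This is not how zio-s3 or the AWS SDK perform their writes, only a sketch of the idea using plain java.nio.

```scala
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

object BoundedWrites {

  /** Writes everything remaining in `src` to `channel`, at most `maxSlice`
    * bytes per write call. Returns the total number of bytes written.
    * (Hypothetical helper, for illustration only.)
    */
  def writeInSlices(channel: WritableByteChannel, src: ByteBuffer, maxSlice: Int): Long = {
    require(maxSlice > 0, "maxSlice must be positive")
    var written = 0L
    while (src.hasRemaining) {
      // A duplicate shares the bytes of `src` but has its own position and limit.
      val slice = src.duplicate()
      slice.limit(src.position() + math.min(maxSlice, src.remaining()))
      // A channel may write fewer bytes than requested, so loop until the slice is drained.
      while (slice.hasRemaining) written += channel.write(slice)
      // Advance the source buffer past the bytes that were just written.
      src.position(slice.position())
    }
    written
  }
}
```

The trade-off is more write calls per upload in exchange for a bounded per-thread native buffer size; whether that is acceptable depends on the workload.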
This issue can be mitigated by defining jdk.nio.maxCachedBufferSize so that large buffers are not cached, but maybe it makes sense to consider using a direct buffer or something else.
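For anyone who wants to try this, the property is normally passed on the command line, for example java -Djdk.nio.maxCachedBufferSize=262144 -jar app.jar (the value, in bytes, and the jar name are only illustrative). As far as I know the JDK reads it once when the NIO classes are initialized, so setting it later with System.setProperty should not be relied upon. The sketch below is a hypothetical startup check that reports whether a limit is configured; BufferCacheCheck, parseLimit and describe are made-up names, not part of zio-s3 or the JDK.

```scala
import scala.util.Try

object BufferCacheCheck {
  // Real JDK system property; its value is a size in bytes.
  val PropertyName = "jdk.nio.maxCachedBufferSize"

  /** Parses a raw property value. Returns None when the property is
    * unset or is not a non-negative whole number.
    */
  def parseLimit(raw: Option[String]): Option[Long] =
    raw.flatMap(s => Try(s.trim.toLong).toOption).filter(_ >= 0)

  /** Hypothetical helper: builds a message suitable for logging at startup. */
  def describe(raw: Option[String]): String =
    parseLimit(raw) match {
      case Some(limit) =>
        s"$PropertyName=$limit: buffers larger than $limit bytes are not cached per thread"
      case None =>
        s"$PropertyName is not set (or not a valid size): no limit on cached buffer size"
    }

  def main(args: Array[String]): Unit =
    println(describe(sys.props.get(PropertyName)))
}
```

Logging this once at startup makes it easier to confirm that a production process was actually launched with the intended option.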