gkolakowski-ias closed this issue 4 years ago
This part of the jstack output might be useful as well:
"main" #1 prio=5 os_prio=31 tid=0x00007f9296809000 nid=0xf03 waiting on condition [0x00007000074c8000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000078d0bb158> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353)
at alex.mojaki.s3upload.MultiPartOutputStream.putCurrentStream(MultiPartOutputStream.java:129)
at alex.mojaki.s3upload.MultiPartOutputStream.checkSize(MultiPartOutputStream.java:110)
at alex.mojaki.s3upload.MultiPartOutputStream.write(MultiPartOutputStream.java:145)
I followed your steps but I can't reproduce this. I just get an exception and a traceback as expected. Can you make a branch that reproduces the problem when running tests? Or does it only happen with real S3? Does it happen consistently or only here and there?
Make sure your version is up to date, the line numbers in your jstack don't match the code in master.
I guess the reason for the problem is that the data producer does not run in a separate thread. I had something like this:
final StreamTransferManager manager = getStreamTransferManager(outputKey);
try (final FileInputStream fis = new FileInputStream(inputPath.toFile());
     final MultiPartOutputStream mpos = manager.getMultiPartOutputStreams().get(0)) {
    // The copy runs on the calling thread, so the producer and the code
    // driving the upload share a single thread
    IOUtils.copy(fis, mpos);
}
When I moved the copying to a Runnable submitted to an ExecutorService, it works fine.
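For reference, a rough sketch of the working variant (same manager, inputPath and outputKey as in the snippet above; Commons IO does the copy; the complete()/abort() usage follows the library's README pattern, so treat this as an assumption rather than the exact code I ran):

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> producer = executor.submit(() -> {
    try (FileInputStream fis = new FileInputStream(inputPath.toFile());
         MultiPartOutputStream mpos = manager.getMultiPartOutputStreams().get(0)) {
        // The copy now runs on its own thread, so the upload threads
        // inside StreamTransferManager can drain the queue concurrently
        IOUtils.copy(fis, mpos);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
});
try {
    producer.get();      // wait for the copy to finish
    manager.complete();  // finalize the multipart upload
} catch (Exception e) {
    manager.abort(e);    // abort all uploads on failure
} finally {
    executor.shutdown();
}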
> Make sure your version is up to date, the line numbers in your jstack don't match the code in master.
I used the latest version. The code lines do not match since I modified the code a bit; that is, I added an exception in uploadStreamPart().
Here is a branch in which I have tried to follow your steps: https://github.com/alexmojaki/s3-stream-upload/compare/stuck-test?expand=1
I put the exception where you said. No matter what else I do, it just results in an ugly traceback. It never gets stuck. It's all running in one thread.
uploadStreamPart is in StreamTransferManager.java. I'm referring to this in your stack:
at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353)
at alex.mojaki.s3upload.MultiPartOutputStream.putCurrentStream(MultiPartOutputStream.java:129)
which is off by one because queue.put is called on line 128.
@alexmojaki Not enough parts are created in the test. testTransferManager(1000000); generates exactly 2 parts, which is equal to the queue size. Please use testTransferManager(10000000); (×10, that is, 20 parts are generated) and it gets blocked:
"main" #1 prio=5 os_prio=31 tid=0x00007fb7e4808800 nid=0x1003 waiting on condition [0x00007000043c9000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006c07e3cc8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353)
at alex.mojaki.s3upload.MultiPartOutputStream.putCurrentStream(MultiPartOutputStream.java:128)
at alex.mojaki.s3upload.MultiPartOutputStream.checkSize(MultiPartOutputStream.java:110)
at alex.mojaki.s3upload.MultiPartOutputStream.write(MultiPartOutputStream.java:143)
at alex.mojaki.s3upload.MultiPartOutputStream.write(MultiPartOutputStream.java:148)
at alex.mojaki.s3upload.test.StreamTransferManagerTest.testTransferManager(StreamTransferManagerTest.java:197)
at alex.mojaki.s3upload.test.StreamTransferManagerTest.testTransferManager(StreamTransferManagerTest.java:156)
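To see why the part count relative to the queue capacity matters, here is a minimal standalone illustration (the capacity of 2 mirrors the test's queue size; this is not code from the project):

import java.util.concurrent.ArrayBlockingQueue;

public class QueueCapacityDemo {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        queue.put("part 1");
        queue.put("part 2");  // queue is now full, but neither put() blocked
        System.out.println("2 parts fit without any consumer running");
        // queue.put("part 3");  // with no consumer, this parks forever, as in the jstack
    }
}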
Thanks, that did the trick.
I've updated the branch - please take a look and tell me if it seems like a reasonable solution. Basically, the queue is 'closed' when the upload is aborted, to prevent producers from trying to write to the stream. My Java is fading away and my concurrency has never been very good, so I might be missing something obvious.
What if a producer is much faster than a consumer? The producer may already be blocked on queue.put() when the consumer throws AmazonS3Exception.
I think the producer needs to periodically call queue.offer() with some small but reasonable timeout, until offer() succeeds or the closed flag is true. WDYT?
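A minimal sketch of that loop, combining the 'closed' flag from the branch with the timed offer() (the queue, flag, and method names here are hypothetical stand-ins, not the real MultiPartOutputStream fields):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class ClosableQueueSketch {
    private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(2);
    private volatile boolean closed;  // set by the consumer when the upload aborts

    void putPart(Object part) throws InterruptedException {
        // Bounded offer() instead of put(): the producer rechecks 'closed'
        // between attempts, so it cannot block forever on a dead consumer
        while (!queue.offer(part, 100, TimeUnit.MILLISECONDS)) {
            if (closed) {
                throw new IllegalStateException("Upload aborted; stream is closed");
            }
        }
    }
}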
Great idea. I've opened a proper PR in #28, take a look.
Fix released in 2.2.1; upgrade once it becomes available.
I used StreamTransferManager with the following properties.
If a part upload fails, STM may block forever on putting an element into the queue:
I also encountered a similar problem with the following settings:
STM blocked in the close() method while putting the poison pill into the queue. I recreated the issue by adding:
in alex/mojaki/s3upload/StreamTransferManager.java:552