eclipse-rdf4j / rdf4j

Eclipse RDF4J: scalable RDF for Java
https://rdf4j.org/
BSD 3-Clause "New" or "Revised" License

TimeLimitIteration calls close() asynchronously from another thread #4806

Open kenwenzel opened 11 months ago

kenwenzel commented 11 months ago

Current Behavior

Instead of just setting the isInterrupted flag, interrupt() calls close() directly. Because interrupt() is executed on a separate, asynchronous thread, this leads to race conditions whenever the iterator expects single-threaded use (which is the usual case).

This seems to affect LmdbRecordIterator, where it causes an "A heap has been corrupted" exception, at least under Windows.

Expected Behavior

The close() method should not be called asynchronously from another thread. Maybe a global ThreadLocal should be introduced that provides access to isInterrupted, or directly to a checkInterrupted() method that throws an exception. Then it's up to the individual iterators to check the interrupted state and stop execution.

Another solution would be to explicitly support the asynchronous invocation of close() in LmdbRecordIterator.
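
As a rough sketch of the first idea (a flag that the iterators poll themselves), the following Java outline has the timer thread only set a flag, while the consuming thread checks it and stays in charge of calling close(). All names here (InterruptFlag, CheckedIterator, QueryInterruptedException) are hypothetical and not part of RDF4J, and the flag is passed in explicitly rather than through a ThreadLocal to keep the example short.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical exception type, used only in this sketch.
final class QueryInterruptedException extends RuntimeException {
    QueryInterruptedException(String message) {
        super(message);
    }
}

// Shared flag: the timer thread only sets it and never touches the iterator.
final class InterruptFlag {
    private final AtomicBoolean interrupted = new AtomicBoolean(false);

    void interrupt() {
        interrupted.set(true);
    }

    void checkInterrupted() {
        if (interrupted.get()) {
            throw new QueryInterruptedException("query evaluation was interrupted");
        }
    }
}

// Iterator that polls the flag on the consuming thread before doing any work.
final class CheckedIterator<T> implements Iterator<T>, AutoCloseable {
    private final Iterator<T> delegate;
    private final InterruptFlag flag;
    private boolean closed;

    CheckedIterator(Iterator<T> delegate, InterruptFlag flag) {
        this.delegate = delegate;
        this.flag = flag;
    }

    @Override
    public boolean hasNext() {
        flag.checkInterrupted();
        return !closed && delegate.hasNext();
    }

    @Override
    public T next() {
        flag.checkInterrupted();
        if (closed) {
            throw new NoSuchElementException();
        }
        return delegate.next();
    }

    // Only ever called by the thread that consumes the iterator.
    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}
```

With this shape, a timeout never closes anything by itself: the consumer sees the exception on its next hasNext() or next() call and closes the iterator on its own thread, for example through try-with-resources.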

Steps To Reproduce

No response

Version

4.2.2

Are you interested in contributing a solution yourself?

Perhaps?

Anything else?

No response

kenwenzel commented 11 months ago

A possibly related stack trace:

"TimeLimitIteration" #347 daemon prio=5 tid=0x00000162c5e1e800 nid=0x314c runnable [0x0000000408afe000]
   java.lang.Thread.State: RUNNABLE
   JavaThread state: _thread_in_native
 - org.lwjgl.util.lmdb.LMDB.nmdb_cursor_close(long) @bci=0 (Compiled frame; information may be imprecise)
 - org.lwjgl.util.lmdb.LMDB.mdb_cursor_close(long) @bci=12, line=1765 (Compiled frame)
 - org.eclipse.rdf4j.sail.lmdb.LmdbRecordIterator.close() @bci=11, line=192 (Compiled frame)
 - org.eclipse.rdf4j.sail.lmdb.LmdbStatementIterator.handleClose() @bci=8, line=87 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.Iterations.closeCloseable(org.eclipse.rdf4j.common.iteration.Iteration) @bci=11, line=189 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.UnionIteration.handleClose() @bci=98, line=113 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.DualUnionIteration.close() @bci=28, line=142 (Interpreted frame)
 - org.eclipse.rdf4j.sail.TripleSourceIterationWrapper.close() @bci=16, line=114 (Compiled frame)
 - org.eclipse.rdf4j.query.algebra.evaluation.impl.evaluationsteps.StatementPatternQueryEvaluationStep$JoinStatementWithBindingSetIterator.close() @bci=16, line=570 (Compiled frame)
 - org.eclipse.rdf4j.query.algebra.evaluation.iterator.JoinIterator.handleClose() @bci=8, line=104 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.query.algebra.evaluation.iterator.JoinIterator.handleClose() @bci=17, line=106 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.query.algebra.evaluation.iterator.JoinIterator.handleClose() @bci=17, line=106 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.query.algebra.evaluation.iterator.JoinIterator.handleClose() @bci=17, line=106 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.query.algebra.evaluation.iterator.JoinIterator.handleClose() @bci=17, line=106 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.query.algebra.evaluation.iterator.LeftJoinIterator.handleClose() @bci=8, line=162 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.Iterations.closeCloseable(org.eclipse.rdf4j.common.iteration.Iteration) @bci=11, line=189 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.ConvertingIteration.handleClose() @bci=8, line=112 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.query.algebra.evaluation.iterator.OrderIterator.handleClose() @bci=8, line=358 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.Iterations.closeCloseable(org.eclipse.rdf4j.common.iteration.Iteration) @bci=11, line=189 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.IterationWrapper.handleClose() @bci=8, line=127 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.FilterIteration.handleClose() @bci=1, line=105 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.Iterations.closeCloseable(org.eclipse.rdf4j.common.iteration.Iteration) @bci=11, line=189 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.IterationWrapper.handleClose() @bci=8, line=127 (Compiled frame)
 - org.eclipse.rdf4j.sail.base.SailClosingIteration.handleClose() @bci=1, line=120 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.Iterations.closeCloseable(org.eclipse.rdf4j.common.iteration.Iteration) @bci=11, line=189 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.IterationWrapper.handleClose() @bci=8, line=127 (Compiled frame)
 - org.eclipse.rdf4j.sail.helpers.SailBaseIteration.handleClose() @bci=1, line=55 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.sail.helpers.CleanerIteration$CleanableState.close() @bci=9, line=82 (Compiled frame)
 - org.eclipse.rdf4j.sail.helpers.CleanerIteration.close() @bci=4, line=36 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.Iterations.closeCloseable(org.eclipse.rdf4j.common.iteration.Iteration) @bci=11, line=189 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.IterationWrapper.handleClose() @bci=8, line=127 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.TimeLimitIteration.handleClose() @bci=9, line=99 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.TimeLimitIteration.interrupt() @bci=9, line=138 (Interpreted frame)
 - org.eclipse.rdf4j.common.iteration.InterruptTask.run() @bci=16, line=33 (Interpreted frame)
 - java.util.TimerThread.mainLoop() @bci=221, line=556 (Compiled frame)
 - java.util.TimerThread.run() @bci=1, line=506 (Interpreted frame)

kenwenzel commented 11 months ago

Potential fix for LmdbRecordIterator:

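    @Override
    public void close() throws IOException {
        if (!closed) {
            long stamp;
            // A foreign thread (e.g. the timer thread) must take the exclusive lock first.
            if (ownerThread != Thread.currentThread()) {
                stamp = txnLock.writeLock();
            } else {
                // The owner thread closes without locking; 0 marks "no lock held".
                stamp = 0;
            }
            try {
                // Re-check: another thread may have closed the iterator while we waited for the lock.
                if (!closed) {
                    mdb_cursor_close(cursor);
                    pool.free(keyData);
                    pool.free(valueData);
                    if (minKeyBuf != null) {
                        pool.free(minKeyBuf);
                    }
                    if (maxKey != null) {
                        pool.free(maxKeyBuf);
                        pool.free(maxKey);
                    }
                }
            } finally {
                closed = true;
                if (stamp != 0) {
                    txnLock.unlockWrite(stamp);
                }
            }
        }
    }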

The code uses the already existing txnLock and checks if the calling thread is different from the owner thread of the iterator. If this is the case then an exclusive write-lock is acquired.
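
For the write lock in close() to help, the owner thread presumably has to hold a read lock on the same txnLock while it uses the cursor, and has to re-check closed once it holds that lock. The self-contained sketch below illustrates this interplay under that assumption. GuardedCursor is a hypothetical stand-in for LmdbRecordIterator, and a plain counter replaces the native cursor.

```java
import java.util.concurrent.locks.StampedLock;

// Hypothetical stand-in for LmdbRecordIterator; no LMDB calls are made.
final class GuardedCursor implements AutoCloseable {
    private final StampedLock txnLock;
    private final Thread ownerThread = Thread.currentThread();
    private volatile boolean closed;
    private int position;     // stands in for the native cursor state
    private int releaseCount; // counts how often the "native" resources were freed

    GuardedCursor(StampedLock txnLock) {
        this.txnLock = txnLock;
    }

    // Returns the next value, or -1 once the cursor has been closed.
    int next() {
        long stamp = txnLock.readLock();
        try {
            // Re-check under the lock: a foreign close() may have completed in the meantime.
            if (closed) {
                return -1;
            }
            return position++;
        } finally {
            txnLock.unlockRead(stamp);
        }
    }

    @Override
    public void close() {
        if (closed) {
            return;
        }
        // A foreign thread waits for the exclusive lock, so it cannot free
        // resources while next() is running under a read lock.
        long stamp = ownerThread != Thread.currentThread() ? txnLock.writeLock() : 0;
        try {
            if (!closed) {
                releaseCount++; // the real code would free the cursor and buffers here
            }
        } finally {
            closed = true;
            if (stamp != 0) {
                txnLock.unlockWrite(stamp);
            }
        }
    }

    int releaseCount() {
        return releaseCount;
    }
}
```

Note that this sketch does not protect against the owner thread and a foreign thread entering close() at the very same moment; it only keeps a foreign close() from overlapping with a read that is already in progress.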

hmottestad commented 11 months ago

Are we sure that any other read or write locks will be released, or is there a chance that some other code would wait to release its lock until this iterator is closed?

kenwenzel commented 11 months ago

The txnLock is only necessary to implement the auto-grow functionality. While growing the database no thread is allowed to read or write data. This lock does not depend on closing the iterator.

My colleague is currently running a load test and it seems that the above code fixes the problem and at least prevents the JVM from completely crashing.

Should we also think about changing the logic of TimeLimitIteration?

kenwenzel commented 11 months ago

The problem is bigger than first anticipated. Closing the dataset triggers a flush on the SAIL store from the wrong thread. I think that this could also be a problem for the NativeStore.

java.io.IOException: Invalid argument
    at org.eclipse.rdf4j.sail.lmdb.LmdbUtil.E(LmdbUtil.java:61) ~[bundleFile:?]
    at org.eclipse.rdf4j.sail.lmdb.TxnManager$Txn.setActive(TxnManager.java:235) ~[bundleFile:?]
    at org.eclipse.rdf4j.sail.lmdb.TxnManager.activate(TxnManager.java:135) ~[bundleFile:?]
    at org.eclipse.rdf4j.sail.lmdb.TripleStore.endTransaction(TripleStore.java:884) ~[bundleFile:?]
    at org.eclipse.rdf4j.sail.lmdb.TripleStore.commit(TripleStore.java:902) ~[bundleFile:?]
    at org.eclipse.rdf4j.sail.lmdb.LmdbSailStore$LmdbSailSink.flush(LmdbSailStore.java:452) [bundleFile:?]
    at org.eclipse.rdf4j.sail.base.SailSourceBranch.flush(SailSourceBranch.java:302) [bundleFile:?]
    at org.eclipse.rdf4j.sail.base.SailSourceBranch.autoFlush(SailSourceBranch.java:394) [bundleFile:?]
    at org.eclipse.rdf4j.sail.base.SailSourceBranch$2.close(SailSourceBranch.java:255) [bundleFile:?]
    at org.eclipse.rdf4j.sail.base.DelegatingSailDataset.close(DelegatingSailDataset.java:47) [bundleFile:?]
    at org.eclipse.rdf4j.sail.base.SailSourceBranch$2.close(SailSourceBranch.java:250) [bundleFile:?]
    at org.eclipse.rdf4j.sail.base.UnionSailDataset.close(UnionSailDataset.java:59) [bundleFile:?]
    at org.eclipse.rdf4j.sail.base.SailClosingIteration.handleClose(SailClosingIteration.java:127) [bundleFile:?]
    at org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close(AbstractCloseableIteration.java:52) [bundleFile:?]
    at org.eclipse.rdf4j.common.iteration.Iterations.closeCloseable(Iterations.java:189) [bundleFile:?]
    at org.eclipse.rdf4j.common.iteration.IterationWrapper.handleClose(IterationWrapper.java:127) [bundleFile:?]
    at org.eclipse.rdf4j.sail.helpers.SailBaseIteration.handleClose(SailBaseIteration.java:55) [bundleFile:?]
    at org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close(AbstractCloseableIteration.java:52) [bundleFile:?]
    at org.eclipse.rdf4j.sail.helpers.CleanerIteration$CleanableState.close(CleanerIteration.java:82) [bundleFile:?]
    at org.eclipse.rdf4j.sail.helpers.CleanerIteration.close(CleanerIteration.java:36) [bundleFile:?]
    at org.eclipse.rdf4j.common.iteration.Iterations.closeCloseable(Iterations.java:189) [bundleFile:?]
    at org.eclipse.rdf4j.common.iteration.IterationWrapper.handleClose(IterationWrapper.java:127) [bundleFile:?]
    at org.eclipse.rdf4j.common.iteration.TimeLimitIteration.handleClose(TimeLimitIteration.java:99) [bundleFile:?]
    at org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close(AbstractCloseableIteration.java:52) [bundleFile:?]
    at org.eclipse.rdf4j.common.iteration.TimeLimitIteration.interrupt(TimeLimitIteration.java:138) [bundleFile:?]
    at org.eclipse.rdf4j.common.iteration.InterruptTask.run(InterruptTask.java:33) [bundleFile:?]
    at java.util.TimerThread.mainLoop(Timer.java:556) [?:?]
    at java.util.TimerThread.run(Timer.java:506) [?:?]

hmottestad commented 11 months ago

If I understand correctly, thread A created the iteration, it times out and is interrupted by the monitoring thread B. Thread B calls interrupt and also calls the close() method, which causes issues because it should be thread A that calls close(). Is that correct?

We could have thread B only interrupt thread A, and not close the iteration. For safety we could always have thread B close the iteration anyway after a specified amount of time.

So, thread A starts the iteration, it times out so thread B interrupts thread A. Thread B sleeps for 30 seconds (or until interrupted) and checks if the iteration is closed, if not then it closes the iteration. If things work out the way they are supposed to, then thread A is interrupted and calls close(), then interrupts thread B that wakes up and sees that the iteration is already closed and then does nothing.
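
A minimal sketch of this two-phase scheme, under some assumptions: instead of a sleeping thread B that gets interrupted, it uses a ScheduledExecutorService whose fallback task is cancelled when thread A closes in time. TwoPhaseTimeLimit and its methods are hypothetical names, not RDF4J API, and the grace period is a parameter rather than a fixed 30 seconds.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; class and method names are not part of RDF4J.
final class TwoPhaseTimeLimit implements AutoCloseable {
    private final Thread owner = Thread.currentThread(); // thread A
    private final AtomicReference<Thread> closedBy = new AtomicReference<>();
    private final ScheduledExecutorService scheduler;    // does thread B's work
    private final long graceMillis;
    private volatile ScheduledFuture<?> pending;

    TwoPhaseTimeLimit(ScheduledExecutorService scheduler, long graceMillis) {
        this.scheduler = scheduler;
        this.graceMillis = graceMillis;
    }

    // Arms the time limit; call once from the owner thread.
    void start(long timeoutMillis) {
        pending = scheduler.schedule(this::onTimeout, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Phase 1 (on the scheduler thread): only signal the owner, do not close anything.
    private void onTimeout() {
        if (isClosed()) {
            return;
        }
        owner.interrupt();
        // Phase 2: safety net in case the owner never reacts to the interrupt.
        pending = scheduler.schedule(this::close, graceMillis, TimeUnit.MILLISECONDS);
    }

    boolean isClosed() {
        return closedBy.get() != null;
    }

    // The thread that performed the close, or null if still open.
    Thread closedBy() {
        return closedBy.get();
    }

    @Override
    public void close() {
        // Only the first caller performs the close; later calls are no-ops.
        if (closedBy.compareAndSet(null, Thread.currentThread())) {
            ScheduledFuture<?> task = pending;
            if (task != null) {
                task.cancel(false);
            }
            // The wrapped iteration's resources would be released here.
        }
    }
}
```

In the normal case the owner notices the interrupt, calls close() itself, and the fallback never runs. The fallback still closes from a foreign thread, so it would hit the same LMDB restrictions, just far less often.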

kenwenzel commented 11 months ago

> If I understand correctly, thread A created the iteration, it times out and is interrupted by the monitoring thread B. Thread B calls interrupt and also calls the close() method, which causes issues because it should be thread A that calls close(). Is that correct?

Yes, that is correct. The interrupt method of TimeLimitIteration directly calls close on the underlying iteration.

> We could have thread B only interrupt thread A, and not close the iteration. For safety we could always have thread B close the iteration anyway after a specified amount of time.

Yes, that should work. The method Thread.isInterrupted() could be used to check this state from nested iterations. If isInterrupted() returns true, then an InterruptedException or something similar could perhaps be thrown.

> So, thread A starts the iteration, it times out so thread B interrupts thread A. Thread B sleeps for 30 seconds (or until interrupted) and checks if the iteration is closed, if not then it closes the iteration. If things work out the way they are supposed to, then thread A is interrupted and calls close(), then interrupts thread B that wakes up and sees that the iteration is already closed and then does nothing.

Yes, this could work. It is only a problem with LMDB because, for example, only one thread may use a write transaction. If close() ends up calling flush(), then an error like the one in the stack trace above could be triggered.

kenwenzel commented 11 months ago

IterationWrapper already checks Thread.currentThread().isInterrupted() within hasNext(), next() and remove().

The same logic could be added to LookAheadIteration. This should already cover many cases.
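
A simplified sketch of what such a check could look like in a look-ahead style iteration. InterruptAwareLookAhead is a hypothetical class, not RDF4J's actual LookAheadIteration, and the reaction to the interrupt flag (close and report no further elements) is an assumption of this sketch.

```java
import java.util.NoSuchElementException;

// Hypothetical simplified look-ahead iteration, for illustration only.
abstract class InterruptAwareLookAhead<E> implements AutoCloseable {
    private E nextElement;
    private boolean closed;

    // Returns the next element, or null when the source is exhausted.
    protected abstract E getNextElement();

    public final boolean hasNext() {
        if (closed) {
            return false;
        }
        // Checked on the consuming thread, so close() also runs on that thread.
        if (Thread.currentThread().isInterrupted()) {
            close();
            return false;
        }
        if (nextElement == null) {
            nextElement = getNextElement();
            if (nextElement == null) {
                close();
            }
        }
        return nextElement != null;
    }

    public final E next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        E result = nextElement;
        nextElement = null;
        return result;
    }

    public final boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true;
        nextElement = null;
    }
}
```

The check uses isInterrupted() rather than Thread.interrupted(), so the flag stays set and outer iterations on the same thread can observe it as well.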