Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TimeLimitIteration calls close() asynchronously from another thread #4806

Open
kenwenzel opened this issue Oct 10, 2023 · 8 comments
Open

TimeLimitIteration calls close() asynchronously from another thread #4806

kenwenzel opened this issue Oct 10, 2023 · 8 comments
Labels
🐞 bug issue is a bug

Comments

@kenwenzel
Copy link
Contributor

kenwenzel commented Oct 10, 2023

Current Behavior

Instead of just setting the isInterrupted flag, the close() method is directly called from interrupt().
The latter method is executed within an asynchronous thread which leads to race conditions if the iterator expects non-multithreaded usage (which is the usual case).

This seems to affect the LmdbRecordIterator that causes a A heap has been corrupted exception at least under Windows.

Expected Behavior

The close() method should not be called asynchronously from another thread.
Maybe a global ThreadLocal should be introduced which provides access to isInterrupted or directly a checkInterrupted() method that throws an exception. Then its up to the individual iterators to check the interrupted state and stop the execution.

Another solution would be to explicitly support the asynchronous invocation of close() in LmdbRecordIterator.

Steps To Reproduce

No response

Version

4.2.2

Are you interested in contributing a solution yourself?

Perhaps?

Anything else?

No response

@kenwenzel kenwenzel added the 🐞 bug issue is a bug label Oct 10, 2023
@kenwenzel
Copy link
Contributor Author

A possibly related stack trace:

"TimeLimitIteration" #347 daemon prio=5 tid=0x00000162c5e1e800 nid=0x314c runnable [0x0000000408afe000]
   java.lang.Thread.State: RUNNABLE
   JavaThread state: _thread_in_native
 - org.lwjgl.util.lmdb.LMDB.nmdb_cursor_close(long) @bci=0 (Compiled frame; information may be imprecise)
 - org.lwjgl.util.lmdb.LMDB.mdb_cursor_close(long) @bci=12, line=1765 (Compiled frame)
 - org.eclipse.rdf4j.sail.lmdb.LmdbRecordIterator.close() @bci=11, line=192 (Compiled frame)
 - org.eclipse.rdf4j.sail.lmdb.LmdbStatementIterator.handleClose() @bci=8, line=87 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.Iterations.closeCloseable(org.eclipse.rdf4j.common.iteration.Iteration) @bci=11, line=189 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.UnionIteration.handleClose() @bci=98, line=113 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.DualUnionIteration.close() @bci=28, line=142 (Interpreted frame)
 - org.eclipse.rdf4j.sail.TripleSourceIterationWrapper.close() @bci=16, line=114 (Compiled frame)
 - org.eclipse.rdf4j.query.algebra.evaluation.impl.evaluationsteps.StatementPatternQueryEvaluationStep$JoinStatementWithBindingSetIterator.close() @bci=16, line=570 (Compiled frame)
 - org.eclipse.rdf4j.query.algebra.evaluation.iterator.JoinIterator.handleClose() @bci=8, line=104 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.query.algebra.evaluation.iterator.JoinIterator.handleClose() @bci=17, line=106 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.query.algebra.evaluation.iterator.JoinIterator.handleClose() @bci=17, line=106 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.query.algebra.evaluation.iterator.JoinIterator.handleClose() @bci=17, line=106 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.query.algebra.evaluation.iterator.JoinIterator.handleClose() @bci=17, line=106 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.query.algebra.evaluation.iterator.LeftJoinIterator.handleClose() @bci=8, line=162 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.Iterations.closeCloseable(org.eclipse.rdf4j.common.iteration.Iteration) @bci=11, line=189 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.ConvertingIteration.handleClose() @bci=8, line=112 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.query.algebra.evaluation.iterator.OrderIterator.handleClose() @bci=8, line=358 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.Iterations.closeCloseable(org.eclipse.rdf4j.common.iteration.Iteration) @bci=11, line=189 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.IterationWrapper.handleClose() @bci=8, line=127 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.FilterIteration.handleClose() @bci=1, line=105 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.Iterations.closeCloseable(org.eclipse.rdf4j.common.iteration.Iteration) @bci=11, line=189 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.IterationWrapper.handleClose() @bci=8, line=127 (Compiled frame)
 - org.eclipse.rdf4j.sail.base.SailClosingIteration.handleClose() @bci=1, line=120 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.Iterations.closeCloseable(org.eclipse.rdf4j.common.iteration.Iteration) @bci=11, line=189 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.IterationWrapper.handleClose() @bci=8, line=127 (Compiled frame)
 - org.eclipse.rdf4j.sail.helpers.SailBaseIteration.handleClose() @bci=1, line=55 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.sail.helpers.CleanerIteration$CleanableState.close() @bci=9, line=82 (Compiled frame)
 - org.eclipse.rdf4j.sail.helpers.CleanerIteration.close() @bci=4, line=36 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.Iterations.closeCloseable(org.eclipse.rdf4j.common.iteration.Iteration) @bci=11, line=189 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.IterationWrapper.handleClose() @bci=8, line=127 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.TimeLimitIteration.handleClose() @bci=9, line=99 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close() @bci=13, line=52 (Compiled frame)
 - org.eclipse.rdf4j.common.iteration.TimeLimitIteration.interrupt() @bci=9, line=138 (Interpreted frame)
 - org.eclipse.rdf4j.common.iteration.InterruptTask.run() @bci=16, line=33 (Interpreted frame)
 - java.util.TimerThread.mainLoop() @bci=221, line=556 (Compiled frame)
 - java.util.TimerThread.run() @bci=1, line=506 (Interpreted frame)

@kenwenzel kenwenzel changed the title TimeLimitIteration calls close() asynchronous from another thread TimeLimitIteration calls close() asynchronously from another thread Oct 10, 2023
@kenwenzel
Copy link
Contributor Author

Potential fix for LmdbRecordIterator:

	@Override
	public void close() throws IOException {
		if (!closed) {
			long stamp;
			if (ownerThread != Thread.currentThread()) {
				stamp = txnLock.writeLock();
			} else {
				stamp = 0;
			}
			try {
				if (!closed) {
					mdb_cursor_close(cursor);
					pool.free(keyData);
					pool.free(valueData);
					if (minKeyBuf != null) {
						pool.free(minKeyBuf);
					}
					if (maxKey != null) {
						pool.free(maxKeyBuf);
						pool.free(maxKey);
					}
				}
			} finally {
				closed = true;
				if (stamp != 0) {
					txnLock.unlockWrite(stamp);
				}
			}
		}
	}

The code uses the already existing txnLock and checks if the calling thread is different from the owner thread of the iterator. If this is the case then an exclusive write-lock is acquired.

@hmottestad
Copy link
Contributor

Are we sure that any other read or write locks will be released, or is there a chance that some other code would wait to release its lock until this iterator is closed?

@kenwenzel
Copy link
Contributor Author

kenwenzel commented Oct 11, 2023

The txnLock is only necessary to implement the auto-grow functionality. While growing the database no thread is allowed to read or write data. This lock does not depend on closing the iterator.

My colleague is currently running a load test and it seems that the above code fixes the problem and at least prevents the JVM from completely crashing.

Should we also think about changing the logic of TimeLimitIteration?

@kenwenzel
Copy link
Contributor Author

The problem is bigger than first anticipated. Closing the data set triggers flush on the SAIL store from the wrong thread.
I think that this could also be a problem for the NativeStore.

java.io.IOException: Invalid argument
	at org.eclipse.rdf4j.sail.lmdb.LmdbUtil.E(LmdbUtil.java:61) ~[bundleFile:?]
	at org.eclipse.rdf4j.sail.lmdb.TxnManager$Txn.setActive(TxnManager.java:235) ~[bundleFile:?]
	at org.eclipse.rdf4j.sail.lmdb.TxnManager.activate(TxnManager.java:135) ~[bundleFile:?]
	at org.eclipse.rdf4j.sail.lmdb.TripleStore.endTransaction(TripleStore.java:884) ~[bundleFile:?]
	at org.eclipse.rdf4j.sail.lmdb.TripleStore.commit(TripleStore.java:902) ~[bundleFile:?]
	at org.eclipse.rdf4j.sail.lmdb.LmdbSailStore$LmdbSailSink.flush(LmdbSailStore.java:452) [bundleFile:?]
	at org.eclipse.rdf4j.sail.base.SailSourceBranch.flush(SailSourceBranch.java:302) [bundleFile:?]
	at org.eclipse.rdf4j.sail.base.SailSourceBranch.autoFlush(SailSourceBranch.java:394) [bundleFile:?]
	at org.eclipse.rdf4j.sail.base.SailSourceBranch$2.close(SailSourceBranch.java:255) [bundleFile:?]
	at org.eclipse.rdf4j.sail.base.DelegatingSailDataset.close(DelegatingSailDataset.java:47) [bundleFile:?]
	at org.eclipse.rdf4j.sail.base.SailSourceBranch$2.close(SailSourceBranch.java:250) [bundleFile:?]
	at org.eclipse.rdf4j.sail.base.UnionSailDataset.close(UnionSailDataset.java:59) [bundleFile:?]
	at org.eclipse.rdf4j.sail.base.SailClosingIteration.handleClose(SailClosingIteration.java:127) [bundleFile:?]
	at org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close(AbstractCloseableIteration.java:52) [bundleFile:?]
	at org.eclipse.rdf4j.common.iteration.Iterations.closeCloseable(Iterations.java:189) [bundleFile:?]
	at org.eclipse.rdf4j.common.iteration.IterationWrapper.handleClose(IterationWrapper.java:127) [bundleFile:?]
	at org.eclipse.rdf4j.sail.helpers.SailBaseIteration.handleClose(SailBaseIteration.java:55) [bundleFile:?]
	at org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close(AbstractCloseableIteration.java:52) [bundleFile:?]
	at org.eclipse.rdf4j.sail.helpers.CleanerIteration$CleanableState.close(CleanerIteration.java:82) [bundleFile:?]
	at org.eclipse.rdf4j.sail.helpers.CleanerIteration.close(CleanerIteration.java:36) [bundleFile:?]
	at org.eclipse.rdf4j.common.iteration.Iterations.closeCloseable(Iterations.java:189) [bundleFile:?]
	at org.eclipse.rdf4j.common.iteration.IterationWrapper.handleClose(IterationWrapper.java:127) [bundleFile:?]
	at org.eclipse.rdf4j.common.iteration.TimeLimitIteration.handleClose(TimeLimitIteration.java:99) [bundleFile:?]
	at org.eclipse.rdf4j.common.iteration.AbstractCloseableIteration.close(AbstractCloseableIteration.java:52) [bundleFile:?]
	at org.eclipse.rdf4j.common.iteration.TimeLimitIteration.interrupt(TimeLimitIteration.java:138) [bundleFile:?]
	at org.eclipse.rdf4j.common.iteration.InterruptTask.run(InterruptTask.java:33) [bundleFile:?]
	at java.util.TimerThread.mainLoop(Timer.java:556) [?:?]
	at java.util.TimerThread.run(Timer.java:506) [?:?]

@hmottestad
Copy link
Contributor

hmottestad commented Oct 16, 2023

If I understand correctly, thread A created the iteration, it times out and is interrupted by the monitoring thread B. Thread B calls interrupt and also calls the close() method, which causes issues because it should be thread A that calls close(). Is that correct?

We could have thread B only interrupt thread A, and not close the iteration. For safety we could always have thread B close the iteration anyways after a specified amount of time.

So, thread A starts the iteration, it times out so thread B interrupts thread A. Thread B sleeps for 30 seconds (or until interrupted) and checks if the iteration is closed, if not then it closes the iteration. If things work out they way they are supposed to, then thread A is interrupted and calls close(), then interrupts thread B that wakes up and sees that the iteration is already closed and then does nothing.

@kenwenzel
Copy link
Contributor Author

If I understand correctly, thread A created the iteration, it times out and is interrupted by the monitoring thread B. Thread B calls interrupt and also calls the close() method, which causes issues because it should be thread A that calls close(). Is that correct?

Yes, that is correct. The interrupt method of TimeLimitIteration directly calls close on the underlying iteration.

We could have thread B only interrupt thread A, and not close the iteration. For safety we could always have thread B close the iteration anyways after a specified amount of time.

Yes, that should work. The method Thread.isInterrrupted() could be used to check this state from nested iterations.
If isInterrupted() is true then an InterruptedException or something like this could be thrown?!

So, thread A starts the iteration, it times out so thread B interrupts thread A. Thread B sleeps for 30 seconds (or until interrupted) and checks if the iteration is closed, if not then it closes the iteration. If things work out they way they are supposed to, then thread A is interrupted and calls close(), then interrupts thread B that wakes up and sees that the iteration is already closed and then does nothing.

Yes, this could work. It is only a problem with LMDB as only one thread may use a write transaction for example. If close() ends up calling flush() then an error like in the stacktrace above could be triggered.

@kenwenzel
Copy link
Contributor Author

IterationWrapper already checks Thread.currentThread().isInterrupted() within hasNext(), next() and remove().

The same logic could be added to LookAheadIteration. This should cover already many cases.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
🐞 bug issue is a bug
Projects
None yet
Development

No branches or pull requests

2 participants