Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #2435 Transactions are not removed in LiteDB 5.0.18 #2436

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
146 changes: 70 additions & 76 deletions LiteDB/Engine/Query/QueryExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,103 +76,97 @@ internal BsonDataReader ExecuteQuery(bool executionPlan)

IEnumerable<BsonDocument> RunQuery()
{
var snapshot = transaction.CreateSnapshot(_query.ForUpdate ? LockMode.Write : LockMode.Read, _collection, false);

// no collection, no documents
if (snapshot.CollectionPage == null && _source == null)
try
{
// if query use Source (*) need runs with empty data source
if (_query.Select.UseSource)
{
yield return _query.Select.ExecuteScalar(_pragmas.Collation).AsDocument;
}
var snapshot = transaction.CreateSnapshot(_query.ForUpdate ? LockMode.Write : LockMode.Read, _collection, false);

transaction.OpenCursors.Remove(_cursor);

if (isNew)
// no collection, no documents
if (snapshot.CollectionPage == null && _source == null)
{
_monitor.ReleaseTransaction(transaction);
// if query use Source (*) need runs with empty data source
if (_query.Select.UseSource)
{
yield return _query.Select.ExecuteScalar(_pragmas.Collation).AsDocument;
}
yield break;
}

yield break;
}

// execute optimization before run query (will fill missing _query properties instance)
var optimizer = new QueryOptimization(snapshot, _query, _source, _pragmas.Collation);
// execute optimization before run query (will fill missing _query properties instance)
var optimizer = new QueryOptimization(snapshot, _query, _source, _pragmas.Collation);

var queryPlan = optimizer.ProcessQuery();
var queryPlan = optimizer.ProcessQuery();

var plan = queryPlan.GetExecutionPlan();
var plan = queryPlan.GetExecutionPlan();

// if execution is just to get explan plan, return as single document result
if (executionPlan)
{
yield return queryPlan.GetExecutionPlan();

transaction.OpenCursors.Remove(_cursor);

if (isNew)
// if execution is just to get explan plan, return as single document result
if (executionPlan)
{
_monitor.ReleaseTransaction(transaction);
yield return queryPlan.GetExecutionPlan();
yield break;
}

yield break;
}

// get node list from query - distinct by dataBlock (avoid duplicate)
var nodes = queryPlan.Index.Run(snapshot.CollectionPage, new IndexService(snapshot, _pragmas.Collation, _disk.MAX_ITEMS_COUNT));
// get node list from query - distinct by dataBlock (avoid duplicate)
var nodes = queryPlan.Index.Run(snapshot.CollectionPage, new IndexService(snapshot, _pragmas.Collation, _disk.MAX_ITEMS_COUNT));

// get current query pipe: normal or groupby pipe
var pipe = queryPlan.GetPipe(transaction, snapshot, _sortDisk, _pragmas, _disk.MAX_ITEMS_COUNT);
// get current query pipe: normal or groupby pipe
var pipe = queryPlan.GetPipe(transaction, snapshot, _sortDisk, _pragmas, _disk.MAX_ITEMS_COUNT);

// start cursor elapsed timer
_cursor.Elapsed.Start();

using (var enumerator = pipe.Pipe(nodes, queryPlan).GetEnumerator())
{
var read = false;
// start cursor elapsed timer
_cursor.Elapsed.Start();

try
{
read = enumerator.MoveNext();
}
catch (Exception ex)
{
_state.Handle(ex);
throw ex;
}

while (read)
{
_cursor.Fetched++;
_cursor.Elapsed.Stop();

yield return enumerator.Current;

if (transaction.State != TransactionState.Active) throw new LiteException(0, $"There is no more active transaction for this cursor: {_cursor.Query.ToSQL(_cursor.Collection)}");

_cursor.Elapsed.Start();

try
{
read = enumerator.MoveNext();
}
catch (Exception ex)
using (var enumerator = pipe.Pipe(nodes, queryPlan).GetEnumerator())
{
_state.Handle(ex);
throw ex;
var read = false;

try
{
read = enumerator.MoveNext();
}
catch (Exception ex)
{
_state.Handle(ex);
throw ex;
}

while (read)
{
_cursor.Fetched++;
_cursor.Elapsed.Stop();

yield return enumerator.Current;

if (transaction.State != TransactionState.Active) throw new LiteException(0, $"There is no more active transaction for this cursor: {_cursor.Query.ToSQL(_cursor.Collection)}");

_cursor.Elapsed.Start();

try
{
read = enumerator.MoveNext();
}
catch (Exception ex)
{
_state.Handle(ex);
throw ex;
}
}
}
}
finally
{
// stop cursor elapsed
_cursor.Elapsed.Stop();
}
}

// stop cursor elapsed
_cursor.Elapsed.Stop();

transaction.OpenCursors.Remove(_cursor);

if (isNew)
finally
{
_monitor.ReleaseTransaction(transaction);
transaction.OpenCursors.Remove(_cursor);

if (isNew)
{
_monitor.ReleaseTransaction(transaction);
}
}
};
}
Expand Down