Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring query executor to ensure that transaction is released #2024

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
86 changes: 38 additions & 48 deletions LiteDB/Engine/Query/QueryExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,75 +63,65 @@ internal BsonDataReader ExecuteQuery(bool executionPlan)

IEnumerable<BsonDocument> RunQuery()
{
var snapshot = transaction.CreateSnapshot(_query.ForUpdate ? LockMode.Write : LockMode.Read, _collection, false);

// no collection, no documents
if (snapshot.CollectionPage == null && _source == null)
try
{
// if query use Source (*) need runs with empty data source
if (_query.Select.UseSource)
{
yield return _query.Select.ExecuteScalar(_pragmas.Collation).AsDocument;
}
var snapshot = transaction.CreateSnapshot(_query.ForUpdate ? LockMode.Write : LockMode.Read, _collection, false);

transaction.OpenCursors.Remove(_cursor);

if (isNew)
// no collection, no documents
if (snapshot.CollectionPage == null && _source == null)
{
_monitor.ReleaseTransaction(transaction);
// if query use Source (*) need runs with empty data source
if (_query.Select.UseSource)
{
yield return _query.Select.ExecuteScalar(_pragmas.Collation).AsDocument;
}
yield break;
}

yield break;
}
// execute optimization before run query (will fill missing _query properties instance)
var optimizer = new QueryOptimization(snapshot, _query, _source, _pragmas.Collation);

// execute optimization before run query (will fill missing _query properties instance)
var optimizer = new QueryOptimization(snapshot, _query, _source, _pragmas.Collation);
var queryPlan = optimizer.ProcessQuery();

var queryPlan = optimizer.ProcessQuery();
// if execution is just to get explan plan, return as single document result
if (executionPlan)
{
yield return queryPlan.GetExecutionPlan();
yield break;
}

// if execution is just to get explan plan, return as single document result
if (executionPlan)
{
yield return queryPlan.GetExecutionPlan();
// get node list from query - distinct by dataBlock (avoid duplicate)
var nodes = queryPlan.Index.Run(snapshot.CollectionPage, new IndexService(snapshot, _pragmas.Collation));

transaction.OpenCursors.Remove(_cursor);
// get current query pipe: normal or groupby pipe
var pipe = queryPlan.GetPipe(transaction, snapshot, _sortDisk, _pragmas);

if (isNew)
try
{
_monitor.ReleaseTransaction(transaction);
}
// start cursor elapsed timer
_cursor.Elapsed.Start();

yield break;
}
// call safepoint just before return each document
foreach (var doc in pipe.Pipe(nodes, queryPlan))
{
_cursor.Fetched++;
_cursor.Elapsed.Stop();

// get node list from query - distinct by dataBlock (avoid duplicate)
var nodes = queryPlan.Index.Run(snapshot.CollectionPage, new IndexService(snapshot, _pragmas.Collation));
yield return doc;

// get current query pipe: normal or groupby pipe
var pipe = queryPlan.GetPipe(transaction, snapshot, _sortDisk, _pragmas);
if (transaction.State != TransactionState.Active) throw new LiteException(0, $"There is no more active transaction for this cursor: {_cursor.Query.ToSQL(_cursor.Collection)}");

try
{
// start cursor elapsed timer
_cursor.Elapsed.Start();

// call safepoint just before return each document
foreach (var doc in pipe.Pipe(nodes, queryPlan))
_cursor.Elapsed.Start();
}
}
finally
{
_cursor.Fetched++;
// stop cursor elapsed
_cursor.Elapsed.Stop();

yield return doc;

if (transaction.State != TransactionState.Active) throw new LiteException(0, $"There is no more active transaction for this cursor: {_cursor.Query.ToSQL(_cursor.Collection)}");

_cursor.Elapsed.Start();
}
}
finally
{
// stop cursor elapsed
_cursor.Elapsed.Stop();

transaction.OpenCursors.Remove(_cursor);

Expand Down