Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

6 1 #2470

Draft
wants to merge 16 commits into
base: master
Choose a base branch
from
Draft

6 1 #2470

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions LiteDB/Client/Database/LiteDatabase.cs
Expand Up @@ -25,6 +25,11 @@ public partial class LiteDatabase : ILiteDatabase
/// </summary>
public BsonMapper Mapper => _mapper;

/// <summary>
/// Get current instance of ILiteEngine used in this database instance
/// </summary>
public ILiteEngine Engine => _engine;

#endregion

#region Ctor
Expand Down
4 changes: 2 additions & 2 deletions LiteDB/Client/Mapper/BsonMapper.cs
Expand Up @@ -179,7 +179,7 @@ public EntityBuilder<T> Entity<T>()

var expr = visitor.Resolve(typeof(K) == typeof(bool));

LOG($"`{predicate.ToString()}` -> `{expr.Source}`", "LINQ");
Logging.LOG($"`{predicate.ToString()}` -> `{expr.Source}`", "LINQ");

return expr;
}
Expand All @@ -193,7 +193,7 @@ public EntityBuilder<T> Entity<T>()

var expr = visitor.Resolve(false);

LOG($"`{predicate.ToString()}` -> `{expr.Source}`", "LINQ");
Logging.LOG($"`{predicate.ToString()}` -> `{expr.Source}`", "LINQ");

return expr;
}
Expand Down
5 changes: 5 additions & 0 deletions LiteDB/Client/Shared/SharedEngine.cs
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
#if NETFRAMEWORK
using System.Security.AccessControl;
using System.Security.Principal;
Expand Down Expand Up @@ -86,6 +87,10 @@ private void CloseDatabase()
// Release Mutex on every call to close DB.
_mutex.ReleaseMutex();
}

public bool IsDisposed => _engine.IsDisposed;

public Task<bool> Closed => _engine.Closed;

#region Transaction Operations

Expand Down
2 changes: 1 addition & 1 deletion LiteDB/Client/SqlParser/SqlParser.cs
Expand Up @@ -29,7 +29,7 @@ public IBsonDataReader Execute()
{
var ahead = _tokenizer.LookAhead().Expect(TokenType.Word);

LOG($"executing `{ahead.Value.ToUpper()}`", "SQL");
Logging.LOG($"executing `{ahead.Value.ToUpper()}`", "SQL");

switch (ahead.Value.ToUpper())
{
Expand Down
9 changes: 3 additions & 6 deletions LiteDB/Engine/Disk/DiskService.cs
@@ -1,10 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static LiteDB.Constants;

namespace LiteDB.Engine
Expand All @@ -29,7 +26,7 @@ internal class DiskService : IDisposable
private long _logLength;

public DiskService(
EngineSettings settings,
EngineSettings settings,
EngineState state,
int[] memorySegmentSizes)
{
Expand All @@ -52,7 +49,7 @@ internal class DiskService : IDisposable
// create new database if not exist yet
if (isNew)
{
LOG($"creating new database: '{Path.GetFileName(_dataFactory.Name)}'", "DISK");
Logging.LOG($"creating new database: '{Path.GetFileName(_dataFactory.Name)}'", "DISK");

this.Initialize(_dataPool.Writer, settings.Collation, settings.InitialSize);
}
Expand Down Expand Up @@ -261,7 +258,7 @@ public IEnumerable<PageBuffer> ReadFull(FileOrigin origin)

var bytesRead = stream.Read(buffer, 0, PAGE_SIZE);

ENSURE(bytesRead == PAGE_SIZE, $"ReadFull must read PAGE_SIZE bytes [{bytesRead}]");
ENSURE(bytesRead == PAGE_SIZE, "ReadFull must read PAGE_SIZE bytes [{0}]", bytesRead);

yield return new PageBuffer(buffer, 0, 0)
{
Expand Down
41 changes: 29 additions & 12 deletions LiteDB/Engine/Disk/DiskWriterQueue.cs
Expand Up @@ -8,6 +8,8 @@

namespace LiteDB.Engine
{
using LiteDB.Utils.Extensions;

/// <summary>
/// Implement disk write queue and async writer thread - used only for write on LOG file
/// [ThreadSafe]
Expand All @@ -23,7 +25,7 @@ internal class DiskWriterQueue : IDisposable

private readonly ConcurrentQueue<PageBuffer> _queue = new ConcurrentQueue<PageBuffer>();
private readonly object _queueSync = new object();
private readonly AsyncManualResetEvent _queueHasItems = new AsyncManualResetEvent();
private readonly ManualResetEventSlim _queueHasItems = new ManualResetEventSlim();
private readonly ManualResetEventSlim _queueIsEmpty = new ManualResetEventSlim(true);

private Exception _exception = null; // store last exception in async running task
Expand All @@ -48,7 +50,7 @@ public void EnqueuePage(PageBuffer page)
ENSURE(page.Origin == FileOrigin.Log, "async writer must use only for Log file");

// throw last exception that stop running queue
if (_exception != null) throw _exception;
if (_exception != null) throw new LiteException(0, _exception, "DiskWriterQueue error");

lock (_queueSync)
{
Expand All @@ -58,7 +60,7 @@ public void EnqueuePage(PageBuffer page)

if (_task == null)
{
_task = Task.Factory.StartNew(ExecuteQueue, TaskCreationOptions.LongRunning);
_task = Task.Factory.StartNew(ExecuteQueue, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
}
}
Expand Down Expand Up @@ -100,14 +102,23 @@ private async Task ExecuteQueue()

_stream.FlushToDisk();

await _queueHasItems.WaitAsync();
await _queueHasItems.WaitHandle.WaitAsync().ConfigureAwait(false);
}
}
}
catch (Exception ex)
{
_state.Handle(ex);
_state.Handle(LiteException.InvalidDatafileState(ex, "DiskWriterQueue failed"));
Comment on lines -109 to +111
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make DiskWriterQueue exceptions "fatal" - these can't be survived and it is best to tear down the entire engine and let the outside observer to restart the engine (possibly with a rebuild)

_exception = ex;
ExhaustQueue();
}
}

private void ExhaustQueue()
{
while (_queue.TryDequeue(out var page))
{
page.Release();
}
}
Comment on lines +113 to 123
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If anything fails here we need to drop this object - this means all pages must be released (including the one that caused the failure - see the finally case below)


Expand All @@ -117,22 +128,28 @@ private void WritePageToStream(PageBuffer page)

ENSURE(page.ShareCounter > 0, "page must be shared at least 1");

// set stream position according to page
_stream.Position = page.Position;
try
{
// set stream position according to page
_stream.Position = page.Position;

#if DEBUG
_state.SimulateDiskWriteFail?.Invoke(page);
_state.SimulateDiskWriteFail?.Invoke(page);
#endif

_stream.Write(page.Array, page.Offset, PAGE_SIZE);
_stream.Write(page.Array, page.Offset, PAGE_SIZE);

// release page here (no page use after this)
page.Release();
}
finally
{
// release page here (no page use after this)
page.Release();
}
}

public void Dispose()
{
LOG($"disposing disk writer queue (with {_queue.Count} pages in queue)", "DISK");
Logging.LOG($"disposing disk writer queue (with {_queue.Count} pages in queue)", "DISK");

_shouldClose = true;
_queueHasItems.Set(); // unblock the running loop in case there are no items
Expand Down
4 changes: 2 additions & 2 deletions LiteDB/Engine/Disk/MemoryCache.cs
Expand Up @@ -343,7 +343,7 @@ private void Extend()
}
}

LOG($"re-using cache pages (flushing {_free.Count} pages)", "CACHE");
Logging.LOG($"re-using cache pages (flushing {_free.Count} pages)", "CACHE");
}
else
{
Expand All @@ -359,7 +359,7 @@ private void Extend()

_extends++;

LOG($"extending memory usage: (segments: {_extends})", "CACHE");
Logging.LOG($"extending memory usage: (segments: {_extends})", "CACHE");
}
}

Expand Down
8 changes: 4 additions & 4 deletions LiteDB/Engine/Disk/Streams/AesStream.cs
Expand Up @@ -22,7 +22,7 @@ public class AesStream : Stream

private readonly byte[] _decryptedZeroes = new byte[16];

private static readonly byte[] _emptyContent = new byte[PAGE_SIZE - 1 - 16]; // 1 for aes indicator + 16 for salt
private static readonly byte[] _emptyContent = new byte[PAGE_SIZE - 1 - 16]; // 1 for aes indicator + 16 for salt

public byte[] Salt { get; }

Expand Down Expand Up @@ -111,7 +111,7 @@ public AesStream(string password, Stream stream)
// check whether bytes 32 to 64 is empty. This indicates LiteDb was unable to write encrypted 1s during last attempt.
_stream.Read(checkBuffer, 0, checkBuffer.Length);
isNew = checkBuffer.All(x => x == 0);

// reset checkBuffer and stream position
Array.Clear(checkBuffer, 0, checkBuffer.Length);
_stream.Position = 32;
Expand Down Expand Up @@ -160,7 +160,7 @@ public AesStream(string password, Stream stream)
/// </summary>
public override int Read(byte[] array, int offset, int count)
{
ENSURE(this.Position % PAGE_SIZE == 0, $"AesRead: position must be in PAGE_SIZE module. Position={this.Position}, File={_name}");
ENSURE(this.Position % PAGE_SIZE == 0, "AesRead: position must be in PAGE_SIZE module. Position={0}, File={1}", this.Position, _name);

var r = _reader.Read(array, offset, count);

Expand All @@ -181,7 +181,7 @@ public override int Read(byte[] array, int offset, int count)
public override void Write(byte[] array, int offset, int count)
{
ENSURE(count == PAGE_SIZE || count == 1, "buffer size must be PAGE_SIZE");
ENSURE(this.Position == HeaderPage.P_INVALID_DATAFILE_STATE || this.Position % PAGE_SIZE == 0, $"AesWrite: position must be in PAGE_SIZE module. Position={this.Position}, File={_name}");
ENSURE(this.Position == HeaderPage.P_INVALID_DATAFILE_STATE || this.Position % PAGE_SIZE == 0, "AesWrite: position must be in PAGE_SIZE module. Position={0}, File={1}", this.Position, _name);

_writer.Write(array, offset, count);
}
Expand Down
2 changes: 1 addition & 1 deletion LiteDB/Engine/Engine/Collection.cs
Expand Up @@ -33,7 +33,7 @@ public bool DropCollection(string name)
// if collection do not exist, just exit
if (snapshot.CollectionPage == null) return false;

LOG($"drop collection `{name}`", "COMMAND");
Logging.LOG($"drop collection `{name}`", "COMMAND");

// call drop collection service
snapshot.DropCollection(transaction.Safepoint);
Expand Down
2 changes: 1 addition & 1 deletion LiteDB/Engine/Engine/Delete.cs
Expand Up @@ -24,7 +24,7 @@ public int Delete(string collection, IEnumerable<BsonValue> ids)

if (collectionPage == null) return 0;

LOG($"delete `{collection}`", "COMMAND");
Logging.LOG($"delete `{collection}`", "COMMAND");

var count = 0;
var pk = collectionPage.PK;
Expand Down
2 changes: 1 addition & 1 deletion LiteDB/Engine/Engine/Index.cs
Expand Up @@ -44,7 +44,7 @@ public bool EnsureIndex(string collection, string name, BsonExpression expressio
return false;
}

LOG($"create index `{collection}.{name}`", "COMMAND");
Logging.LOG($"create index `{collection}.{name}`", "COMMAND");

// create index head
var index = indexer.CreateIndex(name, expression.Source, unique);
Expand Down
2 changes: 1 addition & 1 deletion LiteDB/Engine/Engine/Insert.cs
Expand Up @@ -24,7 +24,7 @@ public int Insert(string collection, IEnumerable<BsonDocument> docs, BsonAutoId
var indexer = new IndexService(snapshot, _header.Pragmas.Collation, _disk.MAX_ITEMS_COUNT);
var data = new DataService(snapshot, _disk.MAX_ITEMS_COUNT);

LOG($"insert `{collection}`", "COMMAND");
Logging.LOG($"insert `{collection}`", "COMMAND");

foreach (var doc in docs)
{
Expand Down
2 changes: 1 addition & 1 deletion LiteDB/Engine/Engine/Transaction.cs
Expand Up @@ -22,7 +22,7 @@ public bool BeginTrans()

if (transacion.OpenCursors.Count > 0) throw new LiteException(0, "This thread contains an open cursors/query. Close cursors before Begin()");

LOG(isNew, $"begin trans", "COMMAND");
Logging.LOG(isNew, $"begin trans", "COMMAND");

return isNew;
}
Expand Down
2 changes: 1 addition & 1 deletion LiteDB/Engine/Engine/Update.cs
Expand Up @@ -25,7 +25,7 @@ public int Update(string collection, IEnumerable<BsonDocument> docs)

if (collectionPage == null) return 0;

LOG($"update `{collection}`", "COMMAND");
Logging.LOG($"update `{collection}`", "COMMAND");

foreach (var doc in docs)
{
Expand Down
2 changes: 1 addition & 1 deletion LiteDB/Engine/Engine/Upsert.cs
Expand Up @@ -24,7 +24,7 @@ public int Upsert(string collection, IEnumerable<BsonDocument> docs, BsonAutoId
var data = new DataService(snapshot, _disk.MAX_ITEMS_COUNT);
var count = 0;

LOG($"upsert `{collection}`", "COMMAND");
Logging.LOG($"upsert `{collection}`", "COMMAND");

foreach (var doc in docs)
{
Expand Down
2 changes: 1 addition & 1 deletion LiteDB/Engine/EngineState.cs
Expand Up @@ -37,7 +37,7 @@ public void Validate()

public bool Handle(Exception ex)
{
LOG(ex.Message, "ERROR");
Logging.LOG(ex, "ERROR");

if (ex is IOException ||
(ex is LiteException lex && lex.ErrorCode == LiteException.INVALID_DATAFILE_STATE))
Expand Down