#2470 (Draft): wants to merge 16 commits into base: master

Changes from 13 commits
4 changes: 2 additions & 2 deletions LiteDB/Client/Mapper/BsonMapper.cs
@@ -179,7 +179,7 @@ public EntityBuilder<T> Entity<T>()

var expr = visitor.Resolve(typeof(K) == typeof(bool));

LOG($"`{predicate.ToString()}` -> `{expr.Source}`", "LINQ");
Logging.LOG($"`{predicate.ToString()}` -> `{expr.Source}`", "LINQ");

return expr;
}
@@ -193,7 +193,7 @@ public EntityBuilder<T> Entity<T>()

var expr = visitor.Resolve(false);

LOG($"`{predicate.ToString()}` -> `{expr.Source}`", "LINQ");
Logging.LOG($"`{predicate.ToString()}` -> `{expr.Source}`", "LINQ");

return expr;
}
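The recurring edit in this PR replaces bare `LOG(...)` calls (previously resolved via `using static LiteDB.Constants`) with explicit `Logging.LOG(...)` calls. The `Logging` class itself is not visible in these 13 commits, so the following is only a sketch of what such an entry point could look like, inferred from the call sites in this diff; the event hook is an assumption, not the PR's actual code:

```csharp
using System;

namespace LiteDB
{
    // Hypothetical shape of the centralized logger the new call sites imply.
    internal static class Logging
    {
        // Assumed hook so an embedding application can observe log lines.
        public static event Action<string, string> LogCallback;

        public static void LOG(string message, string category)
        {
            LogCallback?.Invoke(category, message);
        }

        // Conditional overload used by call sites like BeginTrans:
        // logs only when the supplied condition is true.
        public static void LOG(bool condition, string message, string category)
        {
            if (condition) LOG(message, category);
        }
    }
}
```

Moving the methods off `Constants` gives callers an explicit `Logging.` prefix, which reads more clearly than an ambient static import.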
2 changes: 1 addition & 1 deletion LiteDB/Client/SqlParser/SqlParser.cs
@@ -29,7 +29,7 @@ public IBsonDataReader Execute()
{
var ahead = _tokenizer.LookAhead().Expect(TokenType.Word);

LOG($"executing `{ahead.Value.ToUpper()}`", "SQL");
Logging.LOG($"executing `{ahead.Value.ToUpper()}`", "SQL");

switch (ahead.Value.ToUpper())
{
9 changes: 3 additions & 6 deletions LiteDB/Engine/Disk/DiskService.cs
@@ -1,10 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static LiteDB.Constants;

namespace LiteDB.Engine
@@ -29,7 +26,7 @@ internal class DiskService : IDisposable
private long _logLength;

public DiskService(
-EngineSettings settings,
+EngineSettings settings,
EngineState state,
int[] memorySegmentSizes)
{
@@ -52,7 +49,7 @@ internal class DiskService : IDisposable
// create new database if not exist yet
if (isNew)
{
LOG($"creating new database: '{Path.GetFileName(_dataFactory.Name)}'", "DISK");
Logging.LOG($"creating new database: '{Path.GetFileName(_dataFactory.Name)}'", "DISK");

this.Initialize(_dataPool.Writer, settings.Collation, settings.InitialSize);
}
@@ -261,7 +258,7 @@ public IEnumerable<PageBuffer> ReadFull(FileOrigin origin)

var bytesRead = stream.Read(buffer, 0, PAGE_SIZE);

-ENSURE(bytesRead == PAGE_SIZE, $"ReadFull must read PAGE_SIZE bytes [{bytesRead}]");
+ENSURE(bytesRead == PAGE_SIZE, "ReadFull must read PAGE_SIZE bytes [{0}]", bytesRead);

yield return new PageBuffer(buffer, 0, 0)
{
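This file also shows the PR's second recurring pattern: `ENSURE` assertions switch from interpolated strings to a `string.Format`-style format plus arguments (the same change appears in `AesStream` and `FileReaderV8` below). With interpolation, the message string is built on every call even when the check passes; with format arguments, formatting is deferred until the assertion actually fails. A sketch of the overload these call sites imply, assuming the failure still surfaces as an exception:

```csharp
using System;

internal static class Constants
{
    // Sketch of the format/args overload implied by call sites such as
    // ENSURE(bytesRead == PAGE_SIZE, "ReadFull must read PAGE_SIZE bytes [{0}]", bytesRead).
    public static void ENSURE(bool condition, string format, params object[] args)
    {
        if (condition) return; // passing check: no message string is ever formatted

        // Build the message only on failure.
        var message = args.Length > 0 ? string.Format(format, args) : format;

        // Assumption: LiteDB would throw its own exception type here.
        throw new InvalidOperationException("ENSURE failed: " + message);
    }
}
```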
14 changes: 8 additions & 6 deletions LiteDB/Engine/Disk/DiskWriterQueue.cs
@@ -8,6 +8,8 @@

namespace LiteDB.Engine
{
+using LiteDB.Utils.Extensions;

/// <summary>
/// Implement disk write queue and async writer thread - used only for write on LOG file
/// [ThreadSafe]
@@ -23,7 +25,7 @@ internal class DiskWriterQueue : IDisposable

private readonly ConcurrentQueue<PageBuffer> _queue = new ConcurrentQueue<PageBuffer>();
private readonly object _queueSync = new object();
-private readonly AsyncManualResetEvent _queueHasItems = new AsyncManualResetEvent();
+private readonly ManualResetEventSlim _queueHasItems = new ManualResetEventSlim();
private readonly ManualResetEventSlim _queueIsEmpty = new ManualResetEventSlim(true);

private Exception _exception = null; // store last exception in async running task
@@ -48,7 +50,7 @@ public void EnqueuePage(PageBuffer page)
ENSURE(page.Origin == FileOrigin.Log, "async writer must use only for Log file");

// throw last exception that stop running queue
-if (_exception != null) throw _exception;
+if (_exception != null) throw new LiteException(0, _exception, "DiskWriterQueue error");

lock (_queueSync)
{
@@ -58,7 +60,7 @@ public void EnqueuePage(PageBuffer page)

if (_task == null)
{
-_task = Task.Factory.StartNew(ExecuteQueue, TaskCreationOptions.LongRunning);
+_task = Task.Factory.StartNew(ExecuteQueue, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
}
}
@@ -100,13 +102,13 @@ private async Task ExecuteQueue()

_stream.FlushToDisk();

-await _queueHasItems.WaitAsync();
+await _queueHasItems.WaitHandle.WaitAsync().ConfigureAwait(false);
}
}
}
catch (Exception ex)
{
-_state.Handle(ex);
+_state.Handle(LiteException.InvalidDatafileState(ex, "DiskWriterQueue failed"));
Comment on lines -109 to +111 (Contributor Author): make DiskWriterQueue exceptions "fatal" - these can't be survived, and it is best to tear down the entire engine and let an outside observer restart it (possibly with a rebuild).

_exception = ex;
}
}
@@ -132,7 +134,7 @@ private void WritePageToStream(PageBuffer page)

public void Dispose()
{
LOG($"disposing disk writer queue (with {_queue.Count} pages in queue)", "DISK");
Logging.LOG($"disposing disk writer queue (with {_queue.Count} pages in queue)", "DISK");

_shouldClose = true;
_queueHasItems.Set(); // unblock the running loop in case there are no items
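Three related changes land in `DiskWriterQueue`: the custom `AsyncManualResetEvent` is replaced by the framework's `ManualResetEventSlim`, whose `WaitHandle` is awaited through an extension imported from `LiteDB.Utils.Extensions`; the writer task is started with an explicit `CancellationToken` and `TaskScheduler`; and queue failures are wrapped in `LiteException` so they surface as engine-fatal (see the author's comment above). The `WaitAsync` extension itself is not shown in this diff; below is a common way to bridge a `WaitHandle` into async code, offered as a sketch rather than the PR's actual implementation:

```csharp
using System.Threading;
using System.Threading.Tasks;

namespace LiteDB.Utils.Extensions
{
    internal static class WaitHandleExtensions
    {
        // Completes a Task when the handle is signaled, without blocking
        // a thread-pool thread for the duration of the wait.
        public static Task WaitAsync(this WaitHandle handle)
        {
            var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

            // The callback fires on a pool thread once the handle is set.
            var registration = ThreadPool.RegisterWaitForSingleObject(
                handle,
                (state, timedOut) => ((TaskCompletionSource<bool>)state).TrySetResult(!timedOut),
                tcs,
                Timeout.Infinite,
                executeOnlyOnce: true);

            // Clean up the registration once the wait completes.
            tcs.Task.ContinueWith(
                (_, state) => ((RegisteredWaitHandle)state).Unregister(null),
                registration,
                TaskScheduler.Default);

            return tcs.Task;
        }
    }
}
```

The explicit `TaskScheduler.Default` in the `StartNew` call also pins the long-running writer to the default scheduler regardless of whatever ambient scheduler happens to be current at the first enqueue.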
4 changes: 2 additions & 2 deletions LiteDB/Engine/Disk/MemoryCache.cs
@@ -343,7 +343,7 @@ private void Extend()
}
}

LOG($"re-using cache pages (flushing {_free.Count} pages)", "CACHE");
Logging.LOG($"re-using cache pages (flushing {_free.Count} pages)", "CACHE");
}
else
{
@@ -359,7 +359,7 @@ private void Extend()

_extends++;

LOG($"extending memory usage: (segments: {_extends})", "CACHE");
Logging.LOG($"extending memory usage: (segments: {_extends})", "CACHE");
}
}

8 changes: 4 additions & 4 deletions LiteDB/Engine/Disk/Streams/AesStream.cs
@@ -22,7 +22,7 @@ public class AesStream : Stream

private readonly byte[] _decryptedZeroes = new byte[16];

-private static readonly byte[] _emptyContent = new byte[PAGE_SIZE - 1 - 16]; // 1 for aes indicator + 16 for salt
+private static readonly byte[] _emptyContent = new byte[PAGE_SIZE - 1 - 16]; // 1 for aes indicator + 16 for salt

public byte[] Salt { get; }

@@ -111,7 +111,7 @@ public AesStream(string password, Stream stream)
// check whether bytes 32 to 64 is empty. This indicates LiteDb was unable to write encrypted 1s during last attempt.
_stream.Read(checkBuffer, 0, checkBuffer.Length);
isNew = checkBuffer.All(x => x == 0);

// reset checkBuffer and stream position
Array.Clear(checkBuffer, 0, checkBuffer.Length);
_stream.Position = 32;
@@ -160,7 +160,7 @@ public AesStream(string password, Stream stream)
/// </summary>
public override int Read(byte[] array, int offset, int count)
{
-ENSURE(this.Position % PAGE_SIZE == 0, $"AesRead: position must be in PAGE_SIZE module. Position={this.Position}, File={_name}");
+ENSURE(this.Position % PAGE_SIZE == 0, "AesRead: position must be in PAGE_SIZE module. Position={0}, File={1}", this.Position, _name);

var r = _reader.Read(array, offset, count);

@@ -181,7 +181,7 @@ public override int Read(byte[] array, int offset, int count)
public override void Write(byte[] array, int offset, int count)
{
ENSURE(count == PAGE_SIZE || count == 1, "buffer size must be PAGE_SIZE");
-ENSURE(this.Position == HeaderPage.P_INVALID_DATAFILE_STATE || this.Position % PAGE_SIZE == 0, $"AesWrite: position must be in PAGE_SIZE module. Position={this.Position}, File={_name}");
+ENSURE(this.Position == HeaderPage.P_INVALID_DATAFILE_STATE || this.Position % PAGE_SIZE == 0, "AesWrite: position must be in PAGE_SIZE module. Position={0}, File={1}", this.Position, _name);

_writer.Write(array, offset, count);
}
2 changes: 1 addition & 1 deletion LiteDB/Engine/Engine/Collection.cs
@@ -33,7 +33,7 @@ public bool DropCollection(string name)
// if collection do not exist, just exit
if (snapshot.CollectionPage == null) return false;

LOG($"drop collection `{name}`", "COMMAND");
Logging.LOG($"drop collection `{name}`", "COMMAND");

// call drop collection service
snapshot.DropCollection(transaction.Safepoint);
2 changes: 1 addition & 1 deletion LiteDB/Engine/Engine/Delete.cs
@@ -24,7 +24,7 @@ public int Delete(string collection, IEnumerable<BsonValue> ids)

if (collectionPage == null) return 0;

LOG($"delete `{collection}`", "COMMAND");
Logging.LOG($"delete `{collection}`", "COMMAND");

var count = 0;
var pk = collectionPage.PK;
2 changes: 1 addition & 1 deletion LiteDB/Engine/Engine/Index.cs
@@ -44,7 +44,7 @@ public bool EnsureIndex(string collection, string name, BsonExpression expressio
return false;
}

LOG($"create index `{collection}.{name}`", "COMMAND");
Logging.LOG($"create index `{collection}.{name}`", "COMMAND");

// create index head
var index = indexer.CreateIndex(name, expression.Source, unique);
2 changes: 1 addition & 1 deletion LiteDB/Engine/Engine/Insert.cs
@@ -24,7 +24,7 @@ public int Insert(string collection, IEnumerable<BsonDocument> docs, BsonAutoId
var indexer = new IndexService(snapshot, _header.Pragmas.Collation, _disk.MAX_ITEMS_COUNT);
var data = new DataService(snapshot, _disk.MAX_ITEMS_COUNT);

LOG($"insert `{collection}`", "COMMAND");
Logging.LOG($"insert `{collection}`", "COMMAND");

foreach (var doc in docs)
{
2 changes: 1 addition & 1 deletion LiteDB/Engine/Engine/Transaction.cs
@@ -22,7 +22,7 @@ public bool BeginTrans()

if (transacion.OpenCursors.Count > 0) throw new LiteException(0, "This thread contains an open cursors/query. Close cursors before Begin()");

LOG(isNew, $"begin trans", "COMMAND");
Logging.LOG(isNew, $"begin trans", "COMMAND");

return isNew;
}
2 changes: 1 addition & 1 deletion LiteDB/Engine/Engine/Update.cs
@@ -25,7 +25,7 @@ public int Update(string collection, IEnumerable<BsonDocument> docs)

if (collectionPage == null) return 0;

LOG($"update `{collection}`", "COMMAND");
Logging.LOG($"update `{collection}`", "COMMAND");

foreach (var doc in docs)
{
2 changes: 1 addition & 1 deletion LiteDB/Engine/Engine/Upsert.cs
@@ -24,7 +24,7 @@ public int Upsert(string collection, IEnumerable<BsonDocument> docs, BsonAutoId
var data = new DataService(snapshot, _disk.MAX_ITEMS_COUNT);
var count = 0;

LOG($"upsert `{collection}`", "COMMAND");
Logging.LOG($"upsert `{collection}`", "COMMAND");

foreach (var doc in docs)
{
2 changes: 1 addition & 1 deletion LiteDB/Engine/EngineState.cs
@@ -37,7 +37,7 @@ public void Validate()

public bool Handle(Exception ex)
{
-LOG(ex.Message, "ERROR");
+Logging.LOG(ex.Message, "ERROR");

if (ex is IOException ||
(ex is LiteException lex && lex.ErrorCode == LiteException.INVALID_DATAFILE_STATE))
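`EngineState.Handle` is where the fatal-error policy from the `DiskWriterQueue` comment comes together: an `IOException`, or a `LiteException` carrying `INVALID_DATAFILE_STATE`, tears the engine down and leaves restart (and possibly a rebuild) to the host. As a purely hypothetical illustration of that division of responsibility, using only the public `LiteDatabase` surface, a host could wrap its work like this (the single-retry policy is illustrative, not part of the PR):

```csharp
using System;
using LiteDB;

internal static class EngineSupervisor
{
    // Hypothetical host-side wrapper: if the engine declares its datafile
    // state invalid, dispose it, reopen, and retry the work once.
    public static T RunWithRestart<T>(Func<LiteDatabase> open, Func<LiteDatabase, T> work)
    {
        using (var db = open())
        {
            try
            {
                return work(db);
            }
            catch (LiteException ex) when (ex.ErrorCode == LiteException.INVALID_DATAFILE_STATE)
            {
                // Engine is unusable; fall through to one restart below.
            }
        }

        using (var db = open())
        {
            return work(db); // a real host might rebuild the datafile first
        }
    }
}
```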
34 changes: 17 additions & 17 deletions LiteDB/Engine/FileReader/FileReaderV8.cs
@@ -117,7 +117,7 @@ public IEnumerable<BsonDocument> GetDocuments(string collection)
var colID = _collections[collection];

if (!_collectionsDataPages.ContainsKey(colID)) yield break;

var dataPages = _collectionsDataPages[colID];
var uniqueIDs = new HashSet<BsonValue>();

@@ -156,8 +156,8 @@ public IEnumerable<BsonDocument> GetDocuments(string collection)
// empty slot
if (position == 0) continue;

-ENSURE(position > 0 && length > 0, $"Invalid footer ref position {position} with length {length}");
-ENSURE(position + length < PAGE_SIZE, $"Invalid footer ref position {position} with length {length}");
+ENSURE(position > 0 && length > 0, "Invalid footer ref position {0} with length {1}", position, length);
+ENSURE(position + length < PAGE_SIZE, "Invalid footer ref position {0} with length {1}", position, length);

// get segment slice
var segment = buffer.Slice(position, length);
@@ -183,8 +183,8 @@ public IEnumerable<BsonDocument> GetDocuments(string collection)
var nextBuffer = nextPage.Value.Buffer;

// make page validations
-ENSURE(nextPage.Value.PageType == PageType.Data, $"Invalid PageType (excepted Data, get {nextPage.Value.PageType})");
-ENSURE(nextPage.Value.ColID == colID, $"Invalid ColID in this page (expected {colID}, get {nextPage.Value.ColID})");
+ENSURE(nextPage.Value.PageType == PageType.Data, "Invalid PageType (excepted Data, get {0})", nextPage.Value.PageType);
+ENSURE(nextPage.Value.ColID == colID, "Invalid ColID in this page (expected {0}, get {1})", colID, nextPage.Value.ColID);
ENSURE(nextPage.Value.ItemsCount > 0, "Page with no items count");

// read slot address
Expand All @@ -196,15 +196,15 @@ public IEnumerable<BsonDocument> GetDocuments(string collection)
length = nextBuffer.ReadUInt16(lengthAddr);

// empty slot
-ENSURE(length > 0, $"Last DataBlock request a next extend to {nextBlock}, but this block are empty footer");
+ENSURE(length > 0, "Last DataBlock request a next extend to {0}, but this block are empty footer", nextBlock);

// get segment slice
segment = nextBuffer.Slice(position, length);
extend = segment.ReadBool(DataBlock.P_EXTEND);
nextBlock = segment.ReadPageAddress(DataBlock.P_NEXT_BLOCK);
data = segment.Slice(DataBlock.P_BUFFER, segment.Count - DataBlock.P_BUFFER);

-ENSURE(extend == true, $"Next datablock always be an extend. Invalid data block {nextBlock}");
+ENSURE(extend == true, "Next datablock always be an extend. Invalid data block {0}", nextBlock);

// write data on memorystream

@@ -219,8 +219,8 @@ public IEnumerable<BsonDocument> GetDocuments(string collection)
var docResult = r.ReadDocument();
var id = docResult.Value["_id"];

-ENSURE(!(id == BsonValue.Null || id == BsonValue.MinValue || id == BsonValue.MaxValue), $"Invalid _id value: {id}");
-ENSURE(uniqueIDs.Contains(id) == false, $"Duplicated _id value: {id}");
+ENSURE(!(id == BsonValue.Null || id == BsonValue.MinValue || id == BsonValue.MaxValue), "Invalid _id value: {0}", id);
+ENSURE(uniqueIDs.Contains(id) == false, "Duplicated _id value: {0}", id);

uniqueIDs.Add(id);

@@ -279,7 +279,7 @@ private void LoadDataPages()
var header = this.ReadPage(0, out var pageInfo).GetValue();
var lastPageID = header.Buffer.ReadUInt32(HeaderPage.P_LAST_PAGE_ID); //TODO: try not to use this value as the reference (scan everything instead)

-ENSURE(lastPageID <= _maxPageID, $"LastPageID {lastPageID} should be less or equals to maxPageID {_maxPageID}");
+ENSURE(lastPageID <= _maxPageID, "LastPageID {0} should be less or equals to maxPageID {1}", lastPageID, _maxPageID);

for (uint i = 0; i <= lastPageID; i++)
{
@@ -398,8 +398,8 @@ private void LoadIndexes()

position += 15; // head 5 bytes, tail 5 bytes, reserved 1 byte, freeIndexPageList 4 bytes

-ENSURE(!string.IsNullOrEmpty(name), $"Index name can't be empty (collection {collection.Key} - index: {i})");
-ENSURE(!string.IsNullOrEmpty(expr), $"Index expression can't be empty (collection {collection.Key} - index: {i})");
+ENSURE(!string.IsNullOrEmpty(name), "Index name can't be empty (collection {0} - index: {1})", collection.Key, i);
+ENSURE(!string.IsNullOrEmpty(expr), "Index expression can't be empty (collection {0} - index: {1})", collection.Key, i);

var indexInfo = new IndexInfo
{
@@ -481,7 +481,7 @@ private void LoadIndexMap()
pageInfo.PageID = pageID;
pageInfo.ColID = buffer.ReadUInt32(BasePage.P_COL_ID);

-ENSURE(read == PAGE_SIZE, $"Page position {_logStream} read only than {read} bytes (instead {PAGE_SIZE})");
+ENSURE(read == PAGE_SIZE, "Page position {0} read only than {1} bytes (instead {2})", _logStream, read, PAGE_SIZE);

var position = new PagePosition(pageID, currentPosition);

@@ -515,7 +515,7 @@ private void LoadIndexMap()
{
var mapIndexPages = transactions[transactionID];

-// update
+// update
foreach (var page in mapIndexPages)
{
_logIndexMap[page.PageID] = page.Position;
@@ -532,7 +532,7 @@ private Result<BasePage> ReadPage(uint pageID, out PageInfo pageInfo)

try
{
-ENSURE(pageID <= _maxPageID, $"PageID: {pageID} should be less then or equals to maxPageID: {_maxPageID}");
+ENSURE(pageID <= _maxPageID, "PageID: {0} should be less then or equals to maxPageID: {1}", pageID, _maxPageID);

var pageBuffer = new PageBuffer(new byte[PAGE_SIZE], 0, PAGE_SIZE);
Stream stream;
@@ -556,13 +556,13 @@ private Result<BasePage> ReadPage(uint pageID, out PageInfo pageInfo)

read = stream.Read(pageBuffer.Array, pageBuffer.Offset, pageBuffer.Count);

-ENSURE(read == PAGE_SIZE, $"Page position {stream.Position} read only than {read} bytes (instead {PAGE_SIZE})");
+ENSURE(read == PAGE_SIZE, "Page position {0} read only than {1} bytes (instead {2})", stream.Position, read, PAGE_SIZE);

var page = new BasePage(pageBuffer);

pageInfo.ColID = page.ColID;

-ENSURE(page.PageID == pageID, $"Expect read pageID: {pageID} but header contains pageID: {page.PageID}");
+ENSURE(page.PageID == pageID, "Expect read pageID: {0} but header contains pageID: {1}", pageID, page.PageID);

return page;
}