Skip to content

Commit

Permalink
RavenDB-17587 Incremental time-series import should overwrite existin…
Browse files Browse the repository at this point in the history
…g values
  • Loading branch information
karmeli87 authored and ppekrol committed Nov 28, 2021
1 parent 3884219 commit 6807792
Show file tree
Hide file tree
Showing 10 changed files with 427 additions and 35 deletions.
8 changes: 7 additions & 1 deletion src/Raven.Server/Documents/Handlers/TimeSeriesHandler.cs
Expand Up @@ -1107,6 +1107,12 @@ public override TransactionOperationsMerger.IReplayableCommandDto<TransactionOpe
}
}

private static readonly TimeSeriesStorage.AppendOptions AppendOptionsForSmuggler = new TimeSeriesStorage.AppendOptions
{
VerifyName = false,
FromSmuggler = true
};

internal class SmugglerTimeSeriesBatchCommand : TransactionOperationsMerger.MergedTransactionCommand
{
private readonly DocumentDatabase _database;
Expand Down Expand Up @@ -1144,7 +1150,7 @@ protected override long ExecuteCmd(DocumentsOperationContext context)
}

var values = item.Segment.YieldAllValues(context, context.Allocator, item.Baseline);
tss.AppendTimestamp(context, docId, item.Collection, item.Name, values, verifyName: false);
tss.AppendTimestamp(context, docId, item.Collection, item.Name, values, AppendOptionsForSmuggler);
}

changes += items.Count;
Expand Down
7 changes: 6 additions & 1 deletion src/Raven.Server/Documents/Patch/ScriptRunner.cs
Expand Up @@ -422,6 +422,11 @@ private JsValue GetStatsTimeSeries(JsValue document, JsValue name, JsValue[] arg
return tsStats;
}

private static readonly TimeSeriesStorage.AppendOptions AppendOptionsForScript = new TimeSeriesStorage.AppendOptions
{
AddNewNameToMetadata = false
};

private JsValue AppendTimeSeries(JsValue document, JsValue name, JsValue[] args)
{
AssertValidDatabaseContext("timeseries(doc, name).append");
Expand Down Expand Up @@ -500,7 +505,7 @@ private JsValue AppendTimeSeries(JsValue document, JsValue name, JsValue[] args)
CollectionName.GetCollectionName(doc),
timeSeries,
new[] { toAppend },
addNewNameToMetadata: false);
AppendOptionsForScript);

if (DebugMode)
{
Expand Down
Expand Up @@ -1236,9 +1236,15 @@ protected override long ExecuteCmd(DocumentsOperationContext context)
context.LastDatabaseChangeVector = ChangeVectorUtils.MergeVectors(databaseChangeVector, segment.ChangeVector);
continue;
}

var options = new TimeSeriesStorage.AppendOptions
{
VerifyName = false,
ChangeVectorFromReplication = segment.ChangeVector
};

var values = segment.Segment.YieldAllValues(context, context.Allocator, baseline);
var changeVector = tss.AppendTimestamp(context, docId, segment.Collection, segment.Name, values, segment.ChangeVector, verifyName: false);
var changeVector = tss.AppendTimestamp(context, docId, segment.Collection, segment.Name, values, options);
context.LastDatabaseChangeVector = ChangeVectorUtils.MergeVectors(changeVector, segment.ChangeVector);

break;
Expand Down
7 changes: 6 additions & 1 deletion src/Raven.Server/Documents/TimeSeries/TimeSeriesRollups.cs
Expand Up @@ -493,6 +493,11 @@ protected override long ExecuteCmd(DocumentsOperationContext context)
return RolledUp;
}

private readonly TimeSeriesStorage.AppendOptions _appendOptionsForRollups = new TimeSeriesStorage.AppendOptions
{
VerifyName = false
};

private void RollupOne(DocumentsOperationContext context, Table table, RollupState item, TimeSeriesPolicy policy, TimeSeriesCollectionConfiguration config)
{
var tss = context.DocumentDatabase.DocumentsStorage.TimeSeriesStorage;
Expand Down Expand Up @@ -589,7 +594,7 @@ private void RollupOne(DocumentsOperationContext context, Table table, RollupSta
}

var before = context.LastDatabaseChangeVector;
var after = tss.AppendTimestamp(context, item.DocId, item.Collection, intoTimeSeries, values, verifyName: false);
var after = tss.AppendTimestamp(context, item.DocId, item.Collection, intoTimeSeries, values, _appendOptionsForRollups);
if (before != after)
RolledUp++;
MarkForNextPolicyAfterRollup(context, table, item, policy, tss, rollupEnd);
Expand Down
75 changes: 47 additions & 28 deletions src/Raven.Server/Documents/TimeSeries/TimeSeriesStorage.cs
Expand Up @@ -845,6 +845,7 @@ public class TimeSeriesSegmentHolder : IDisposable
private readonly DocumentsOperationContext _context;
public readonly TimeSeriesSliceHolder SliceHolder;
public readonly bool FromReplication;
public readonly bool FromSmuggler;
private readonly string _docId;
private readonly CollectionName _collection;
private readonly string _name;
Expand All @@ -863,50 +864,53 @@ public class TimeSeriesSegmentHolder : IDisposable

private AllocatedMemoryData _clonedReadonlySegment;

public TimeSeriesSegmentHolder(
private TimeSeriesSegmentHolder(
TimeSeriesStorage tss,
DocumentsOperationContext context,
string docId,
string name,
CollectionName collection,
DateTime timeStamp,
string fromReplicationChangeVector = null
)
string fromReplicationChangeVector
)
{
_tss = tss;
_context = context;
_collection = collection;
_docId = docId;
_name = name;

SliceHolder = new TimeSeriesSliceHolder(_context, docId, name, _collection.Name).WithBaseline(timeStamp);
SliceHolder.CreateSegmentBuffer();

FromReplication = fromReplicationChangeVector != null;
_tss.GenerateChangeVector(_context, fromReplicationChangeVector); // update the database change vector
}

public TimeSeriesSegmentHolder(
TimeSeriesStorage tss,
DocumentsOperationContext context,
string docId,
string name,
CollectionName collection,
DateTime timeStamp,
string fromReplicationChangeVector = null
) : this(tss, context, docId, name, collection, fromReplicationChangeVector)
{
SliceHolder = new TimeSeriesSliceHolder(_context, docId, name, _collection.Name).WithBaseline(timeStamp);
SliceHolder.CreateSegmentBuffer();
}

public TimeSeriesSegmentHolder(
TimeSeriesStorage tss,
DocumentsOperationContext context,
TimeSeriesSliceHolder allocator,
string docId,
string name,
CollectionName collection,
string fromReplicationChangeVector)
AppendOptions options
) : this(tss, context, docId, name, collection, options.ChangeVectorFromReplication)
{
_tss = tss;
_context = context;
FromSmuggler = options.FromSmuggler;
SliceHolder = allocator;
_collection = collection;
_docId = docId;
_name = name;

FromReplication = fromReplicationChangeVector != null;

BaselineDate = allocator.CurrentBaseline;
allocator.CreateSegmentBuffer();
_tss.GenerateChangeVector(_context, fromReplicationChangeVector); // update the database change vector
}

private void Initialize()
Expand Down Expand Up @@ -1225,7 +1229,12 @@ public void Dispose()

var holder = new SingleResult();

return AppendTimestamp(context, documentId, collection, name, toAppend.Select(ToResult), changeVectorFromReplication);
var options = new AppendOptions
{
ChangeVectorFromReplication = changeVectorFromReplication
};

return AppendTimestamp(context, documentId, collection, name, toAppend.Select(ToResult), options);

SingleResult ToResult(TimeSeriesOperation.AppendOperation element)
{
Expand Down Expand Up @@ -1575,16 +1584,26 @@ public void Dispose()
}
}

public class AppendOptions
{
public string ChangeVectorFromReplication = null;
public bool VerifyName = true;
public bool AddNewNameToMetadata = true;
public bool FromSmuggler = false;
}

private static readonly AppendOptions DefaultAppendOptions = new AppendOptions();

public string AppendTimestamp(
DocumentsOperationContext context,
string documentId,
string collection,
string name,
IEnumerable<SingleResult> toAppend,
string changeVectorFromReplication = null,
bool verifyName = true,
bool addNewNameToMetadata = true)
AppendOptions options = null)
{
options ??= DefaultAppendOptions;

if (context.Transaction == null)
{
DocumentPutAction.ThrowRequiresTransaction();
Expand All @@ -1593,14 +1612,14 @@ public void Dispose()

var collectionName = _documentsStorage.ExtractCollectionName(context, collection);
var newSeries = Stats.GetStats(context, documentId, name).Count == 0;
if (newSeries && verifyName)
if (newSeries && options.VerifyName)
{
VerifyLegalName(name);
}

var appendEnumerator = TimeSeriesHandler.CheckIfIncrementalTs(name)
? new IncrementalEnumerator(documentId, name, toAppend, changeVectorFromReplication != null)
: new AppendEnumerator(documentId, name, toAppend, changeVectorFromReplication != null);
? new IncrementalEnumerator(documentId, name, toAppend, options.ChangeVectorFromReplication != null)
: new AppendEnumerator(documentId, name, toAppend, options.ChangeVectorFromReplication != null);

using (appendEnumerator)
{
Expand All @@ -1613,15 +1632,15 @@ public void Dispose()
var current = appendEnumerator.Current;
Debug.Assert(current != null);

if (changeVectorFromReplication == null)
if (options.ChangeVectorFromReplication == null)
{
// not from replication
AssertNoNanValue(current);
}

using (var slicer = new TimeSeriesSliceHolder(context, documentId, name, collection).WithBaseline(current.Timestamp))
{
var segmentHolder = new TimeSeriesSegmentHolder(this, context, slicer, documentId, name, collectionName, changeVectorFromReplication);
var segmentHolder = new TimeSeriesSegmentHolder(this, context, slicer, documentId, name, collectionName, options);
if (segmentHolder.LoadCurrentSegment() == false)
{
// no matches for this series at all, need to create new segment
Expand Down Expand Up @@ -1667,7 +1686,7 @@ public void Dispose()
}
}

if (newSeries && addNewNameToMetadata)
if (newSeries && options.AddNewNameToMetadata)
{
AddTimeSeriesNameToMetadata(context, documentId, name);
}
Expand Down Expand Up @@ -1946,7 +1965,7 @@ private static CompareResult Compare(DateTime localTime, Span<double> localValue
int compareTags = localTag.SequenceCompareTo(remote.Tag.AsSpan());
if (compareTags == 0)
{
if (holder.FromReplication == false)
if (holder.FromReplication == false && holder.FromSmuggler == false)
return CompareResult.Addition;

bool isIncrement = localTag.StartsWith(IncrementPrefixBuffer);
Expand Down
Expand Up @@ -91,7 +91,7 @@ public async Task RavenEtlWithTimeSeries_WhenEtlNodeTryToProcessTimeSeriesWithou
Values = new Memory<double>(new []{value})
},
};
tsStorage.AppendTimestamp(context, tsOwnerId, "Users", timeSeriesName.ToLower(), toAppend, null, verifyName: false);
tsStorage.AppendTimestamp(context, tsOwnerId, "Users", timeSeriesName.ToLower(), toAppend, EtlTimeSeriesTests.AppendOptionsForEtlTest);
tr.Commit();
}

Expand Down
9 changes: 7 additions & 2 deletions test/SlowTests/Server/Documents/ETL/EtlTimeSeriesTests.cs
Expand Up @@ -1828,7 +1828,7 @@ string script
Values = new Memory<double>(new []{value})
},
};
tsStorage.AppendTimestamp(context, documentId, "Users", timeSeriesName.ToLower(), toAppend, null, verifyName: false);
tsStorage.AppendTimestamp(context, documentId, "Users", timeSeriesName.ToLower(), toAppend, AppendOptionsForEtlTest);
tr.Commit();
}

Expand Down Expand Up @@ -1888,7 +1888,7 @@ await using (OpenEtlOffArea(src, etlResult.TaskId))
Values = new Memory<double>(new []{value})
},
};
tsStorage.AppendTimestamp(context, users[0].Id, "Users", timeSeriesName.ToLower(), toAppend, null, verifyName: false);
tsStorage.AppendTimestamp(context, users[0].Id, "Users", timeSeriesName.ToLower(), toAppend, AppendOptionsForEtlTest);
tr.Commit();
}

Expand Down Expand Up @@ -2170,5 +2170,10 @@ private async Task<TimeSeriesEntry> AssertWaitForIncTimeSeriesEntry(IDocumentSto
return timeSeriesEntries?.FirstOrDefault();
}, interval: 1000);
}

public static TimeSeriesStorage.AppendOptions AppendOptionsForEtlTest = new TimeSeriesStorage.AppendOptions
{
VerifyName = false
};
}
}

0 comments on commit 6807792

Please sign in to comment.