From 68077920afa302fbc1069cf78004bcea6d9c4fdd Mon Sep 17 00:00:00 2001 From: Karmel Date: Sun, 28 Nov 2021 09:18:59 +0200 Subject: [PATCH] RavenDB-17587 Incremental time-series import should overwrite existing values --- .../Documents/Handlers/TimeSeriesHandler.cs | 8 +- .../Documents/Patch/ScriptRunner.cs | 7 +- .../Replication/IncomingReplicationHandler.cs | 8 +- .../Documents/TimeSeries/TimeSeriesRollups.cs | 7 +- .../Documents/TimeSeries/TimeSeriesStorage.cs | 75 +++++--- .../ETL/ClusterEtlTimeSeriesTests.cs | 2 +- .../Documents/ETL/EtlTimeSeriesTests.cs | 9 +- .../PeriodicBackup/PeriodicBackupSlowTests.cs | 176 ++++++++++++++++++ test/SlowTests/Smuggler/SmugglerApiTests.cs | 150 +++++++++++++++ test/Tests.Infrastructure/BackupTestBase.cs | 20 ++ 10 files changed, 427 insertions(+), 35 deletions(-) diff --git a/src/Raven.Server/Documents/Handlers/TimeSeriesHandler.cs b/src/Raven.Server/Documents/Handlers/TimeSeriesHandler.cs index dd23a5638f5d..d46f4a41757c 100644 --- a/src/Raven.Server/Documents/Handlers/TimeSeriesHandler.cs +++ b/src/Raven.Server/Documents/Handlers/TimeSeriesHandler.cs @@ -1107,6 +1107,12 @@ public override TransactionOperationsMerger.IReplayableCommandDto toAppend, - string changeVectorFromReplication = null, - bool verifyName = true, - bool addNewNameToMetadata = true) + AppendOptions options = null) { + options ??= DefaultAppendOptions; + if (context.Transaction == null) { DocumentPutAction.ThrowRequiresTransaction(); @@ -1593,14 +1612,14 @@ public void Dispose() var collectionName = _documentsStorage.ExtractCollectionName(context, collection); var newSeries = Stats.GetStats(context, documentId, name).Count == 0; - if (newSeries && verifyName) + if (newSeries && options.VerifyName) { VerifyLegalName(name); } var appendEnumerator = TimeSeriesHandler.CheckIfIncrementalTs(name) - ? new IncrementalEnumerator(documentId, name, toAppend, changeVectorFromReplication != null) - : new AppendEnumerator(documentId, name, toAppend, changeVectorFromReplication != null); + ? new IncrementalEnumerator(documentId, name, toAppend, options.ChangeVectorFromReplication != null) + : new AppendEnumerator(documentId, name, toAppend, options.ChangeVectorFromReplication != null); using (appendEnumerator) { @@ -1613,7 +1632,7 @@ public void Dispose() var current = appendEnumerator.Current; Debug.Assert(current != null); - if (changeVectorFromReplication == null) + if (options.ChangeVectorFromReplication == null) { // not from replication AssertNoNanValue(current); @@ -1621,7 +1640,7 @@ public void Dispose() using (var slicer = new TimeSeriesSliceHolder(context, documentId, name, collection).WithBaseline(current.Timestamp)) { - var segmentHolder = new TimeSeriesSegmentHolder(this, context, slicer, documentId, name, collectionName, changeVectorFromReplication); + var segmentHolder = new TimeSeriesSegmentHolder(this, context, slicer, documentId, name, collectionName, options); if (segmentHolder.LoadCurrentSegment() == false) { // no matches for this series at all, need to create new segment @@ -1667,7 +1686,7 @@ public void Dispose() } } - if (newSeries && addNewNameToMetadata) + if (newSeries && options.AddNewNameToMetadata) { AddTimeSeriesNameToMetadata(context, documentId, name); } @@ -1946,7 +1965,7 @@ private static CompareResult Compare(DateTime localTime, Span localValue int compareTags = localTag.SequenceCompareTo(remote.Tag.AsSpan()); if (compareTags == 0) { - if (holder.FromReplication == false) + if (holder.FromReplication == false && holder.FromSmuggler == false) return CompareResult.Addition; bool isIncrement = localTag.StartsWith(IncrementPrefixBuffer); diff --git a/test/SlowTests/Server/Documents/ETL/ClusterEtlTimeSeriesTests.cs b/test/SlowTests/Server/Documents/ETL/ClusterEtlTimeSeriesTests.cs index d497e78b6076..78652434eba4 100644 --- a/test/SlowTests/Server/Documents/ETL/ClusterEtlTimeSeriesTests.cs +++ b/test/SlowTests/Server/Documents/ETL/ClusterEtlTimeSeriesTests.cs @@ -91,7 +91,7 @@ public async Task RavenEtlWithTimeSeries_WhenEtlNodeTryToProcessTimeSeriesWithou Values = new Memory(new []{value}) }, }; - tsStorage.AppendTimestamp(context, tsOwnerId, "Users", timeSeriesName.ToLower(), toAppend, null, verifyName: false); + tsStorage.AppendTimestamp(context, tsOwnerId, "Users", timeSeriesName.ToLower(), toAppend, EtlTimeSeriesTests.AppendOptionsForEtlTest); tr.Commit(); } diff --git a/test/SlowTests/Server/Documents/ETL/EtlTimeSeriesTests.cs b/test/SlowTests/Server/Documents/ETL/EtlTimeSeriesTests.cs index 5f22f1bbb42e..e89a990d0257 100644 --- a/test/SlowTests/Server/Documents/ETL/EtlTimeSeriesTests.cs +++ b/test/SlowTests/Server/Documents/ETL/EtlTimeSeriesTests.cs @@ -1828,7 +1828,7 @@ string script Values = new Memory(new []{value}) }, }; - tsStorage.AppendTimestamp(context, documentId, "Users", timeSeriesName.ToLower(), toAppend, null, verifyName: false); + tsStorage.AppendTimestamp(context, documentId, "Users", timeSeriesName.ToLower(), toAppend, AppendOptionsForEtlTest); tr.Commit(); } @@ -1888,7 +1888,7 @@ await using (OpenEtlOffArea(src, etlResult.TaskId)) Values = new Memory(new []{value}) }, }; - tsStorage.AppendTimestamp(context, users[0].Id, "Users", timeSeriesName.ToLower(), toAppend, null, verifyName: false); + tsStorage.AppendTimestamp(context, users[0].Id, "Users", timeSeriesName.ToLower(), toAppend, AppendOptionsForEtlTest); tr.Commit(); } @@ -2170,5 +2170,10 @@ private async Task AssertWaitForIncTimeSeriesEntry(IDocumentSto return timeSeriesEntries?.FirstOrDefault(); }, interval: 1000); } + + public static TimeSeriesStorage.AppendOptions AppendOptionsForEtlTest = new TimeSeriesStorage.AppendOptions + { + VerifyName = false + }; } } diff --git a/test/SlowTests/Server/Documents/PeriodicBackup/PeriodicBackupSlowTests.cs b/test/SlowTests/Server/Documents/PeriodicBackup/PeriodicBackupSlowTests.cs index ba66ae0f94cd..6c2e32738de4 100644 --- a/test/SlowTests/Server/Documents/PeriodicBackup/PeriodicBackupSlowTests.cs +++ b/test/SlowTests/Server/Documents/PeriodicBackup/PeriodicBackupSlowTests.cs @@ -1307,6 +1307,182 @@ public async Task periodic_backup_with_timeseries_should_export_starting_from_la } } + [Fact, Trait("Category", "Smuggler")] + public async Task periodic_backup_with_incremental_timeseries_should_export_starting_from_last_etag() + { + var backupPath = NewDataPath(suffix: "BackupFolder"); + using (var store = GetDocumentStore()) + { + var baseline = RavenTestHelper.UtcToday; + + using (var session = store.OpenAsyncSession()) + { + await session.StoreAsync(new User { Name = "oren" }, "users/1"); + await session.SaveChangesAsync(); + } + + using (var session = store.OpenAsyncSession()) + { + for (int i = 0; i < 360; i++) + { + session.IncrementalTimeSeriesFor("users/1", "INC:Heartrate").Increment(baseline.AddSeconds(i * 10), new[] { i % 60d }); + } + + await session.SaveChangesAsync(); + } + + var config = Backup.CreateBackupConfiguration(backupPath); + var backupTaskId = await Backup.UpdateConfigAndRunBackupAsync(Server, config, store); + + var exportPath = GetBackupPath(store, backupTaskId, incremental: false); + + using (var store2 = GetDocumentStore()) + { + var op = await store2.Smuggler.ImportAsync(new DatabaseSmugglerImportOptions(), exportPath); + await op.WaitForCompletionAsync(TimeSpan.FromMinutes(1)); + + var stats = await store2.Maintenance.SendAsync(new GetStatisticsOperation()); + Assert.Equal(1, stats.CountOfDocuments); + Assert.Equal(1, stats.CountOfTimeSeriesSegments); + + using (var session = store2.OpenAsyncSession()) + { + var user1 = await session.LoadAsync("users/1"); + Assert.Equal("oren", user1.Name); + + var values = (await session.IncrementalTimeSeriesFor("users/1", "INC:Heartrate") + .GetAsync()) + .ToList(); + + Assert.Equal(360, values.Count); + + for (int i = 0; i < values.Count; i++) + { + Assert.Equal(baseline.AddSeconds(i * 10), values[i].Timestamp, RavenTestHelper.DateTimeComparer.Instance); + Assert.Equal(i % 60, values[i].Values[0]); + } + } + } + + using (var session = store.OpenAsyncSession()) + { + await session.StoreAsync(new User { Name = "ayende" }, "users/2"); + for (int i = 0; i < 180; i++) + { + session.IncrementalTimeSeriesFor("users/2", "INC:Heartrate") + .Increment(baseline.AddSeconds(i * 10), new[] { i % 60d }); + } + await session.SaveChangesAsync(); + } + + var lastEtag = store.Maintenance.Send(new GetStatisticsOperation()).LastDocEtag; + var status = await Backup.RunBackupAndReturnStatusAsync(Server, backupTaskId, store, isFullBackup: false, expectedEtag: lastEtag); + + exportPath = GetBackupPath(store, backupTaskId); + + using (var store3 = GetDocumentStore()) + { + // importing to a new database, in order to verify that + // periodic backup imports only the changed documents (and timeseries) + + var op = await store3.Smuggler.ImportAsync(new DatabaseSmugglerImportOptions(), exportPath); + await op.WaitForCompletionAsync(TimeSpan.FromMinutes(15)); + + var stats = await store3.Maintenance.SendAsync(new GetStatisticsOperation()); + Assert.Equal(1, stats.CountOfDocuments); + + Assert.Equal(1, stats.CountOfTimeSeriesSegments); + + using (var session = store3.OpenAsyncSession()) + { + var user2 = await session.LoadAsync("users/2"); + + Assert.Equal("ayende", user2.Name); + + var values = (await session.IncrementalTimeSeriesFor(user2, "INC:Heartrate") + .GetAsync()) + .ToList(); + + Assert.Equal(180, values.Count); + + for (int i = 0; i < values.Count; i++) + { + Assert.Equal(baseline.AddSeconds(i * 10), values[i].Timestamp, RavenTestHelper.DateTimeComparer.Instance); + Assert.Equal(i % 60, values[i].Values[0]); + } + } + } + } + } + + [Fact, Trait("Category", "Smuggler")] + public async Task IncrementTimeSeriesBackup() + { + var backupPath = NewDataPath(suffix: "BackupFolder"); + var config = Backup.CreateBackupConfiguration(backupPath); + + using (var store = GetDocumentStore()) + { + var baseline = RavenTestHelper.UtcToday; + + using (var session = store.OpenAsyncSession()) + { + await session.StoreAsync(new User { Name = "oren" }, "users/1"); + await session.SaveChangesAsync(); + } + + using (var session = store.OpenAsyncSession()) + { + for (int i = 0; i < 10; i++) + { + session.IncrementalTimeSeriesFor("users/1", "INC:Heartrate").Increment(baseline.AddSeconds(i * 10), 1); + } + + await session.SaveChangesAsync(); + } + + var backupTaskId = await Backup.UpdateConfigAndRunBackupAsync(Server, config, store); + + using (var session = store.OpenAsyncSession()) + { + for (int i = 10; i < 20; i++) + { + session.IncrementalTimeSeriesFor("users/1", "INC:Heartrate").Increment(baseline.AddSeconds(i * 10), 1); + } + + await session.SaveChangesAsync(); + } + + await Backup.RunBackupAsync(Server, backupTaskId, store, isFullBackup: false); + + using (var restored = RestoreAndGetStore(store, backupPath)) + { + var stats = await restored.Maintenance.SendAsync(new GetStatisticsOperation()); + Assert.Equal(1, stats.CountOfDocuments); + Assert.Equal(1, stats.CountOfTimeSeriesSegments); + + using (var session = restored.OpenAsyncSession()) + { + var user1 = await session.LoadAsync("users/1"); + Assert.Equal("oren", user1.Name); + + var values = (await session.IncrementalTimeSeriesFor("users/1", "INC:Heartrate") + .GetAsync()) + .ToList(); + + Assert.Equal(20, values.Count); + + for (int i = 0; i < values.Count; i++) + { + Assert.Equal(baseline.AddSeconds(i * 10), values[i].Timestamp, RavenTestHelper.DateTimeComparer.Instance); + Assert.Equal(1, values[i].Values[0]); + } + } + } + } + } + + [Fact, Trait("Category", "Smuggler")] public async Task BackupTaskShouldStayOnTheOriginalNode() { diff --git a/test/SlowTests/Smuggler/SmugglerApiTests.cs b/test/SlowTests/Smuggler/SmugglerApiTests.cs index 297303dda15a..c652aace9581 100644 --- a/test/SlowTests/Smuggler/SmugglerApiTests.cs +++ b/test/SlowTests/Smuggler/SmugglerApiTests.cs @@ -1106,6 +1106,156 @@ public async Task CanExportAndImportTimeSeries() } } + [Fact] + public async Task CanExportAndImportIncrementalTimeSeries() + { + var file = GetTempFileName(); + var baseline = RavenTestHelper.UtcToday; + + try + { + using (var store1 = GetDocumentStore()) + using (var store2 = GetDocumentStore()) + { + using (var session = store1.OpenAsyncSession()) + { + await session.StoreAsync(new User { Name = "Name1" }, "users/1"); + await session.StoreAsync(new User { Name = "Name2" }, "users/2"); + + await session.SaveChangesAsync(); + } + + using (var session = store1.OpenAsyncSession()) + { + for (int i = 0; i < 360; i++) + { + session.IncrementalTimeSeriesFor("users/1", "INC:Heartrate").Increment(baseline.AddSeconds(i * 10), new[] { i % 60d }); + session.IncrementalTimeSeriesFor("users/2", "INC:Heartrate").Increment(baseline.AddSeconds(i * 10), new[] { i % 60d, i % 60d + 5 }); + session.IncrementalTimeSeriesFor("users/1", "INC:Heartrate2").Increment(baseline.AddSeconds(i * 10), new[] { i % 60d, i % 60d + 5, i % 60d + 10 }); + } + + await session.SaveChangesAsync(); + } + + var operation = await store1.Smuggler.ExportAsync(new DatabaseSmugglerExportOptions(), file); + await operation.WaitForCompletionAsync(TimeSpan.FromMinutes(1)); + + operation = await store2.Smuggler.ImportAsync(new DatabaseSmugglerImportOptions(), file); + await operation.WaitForCompletionAsync(TimeSpan.FromMinutes(1)); + + var stats = await store2.Maintenance.SendAsync(new GetStatisticsOperation()); + Assert.Equal(2, stats.CountOfDocuments); + Assert.Equal(4, stats.CountOfTimeSeriesSegments); + + using (var session = store2.OpenAsyncSession()) + { + var user1 = await session.LoadAsync("users/1"); + var user2 = await session.LoadAsync("users/2"); + + Assert.Equal("Name1", user1.Name); + Assert.Equal("Name2", user2.Name); + + var values = await session.IncrementalTimeSeriesFor("users/1", "INC:Heartrate").GetAsync(DateTime.MinValue, DateTime.MaxValue); + + var count = 0; + foreach (var val in values) + { + Assert.Equal(baseline.AddSeconds(count * 10), val.Timestamp, RavenTestHelper.DateTimeComparer.Instance); + Assert.Equal(1, val.Values.Length); + Assert.Equal(count++ % 60, val.Values[0]); + } + + Assert.Equal(360, count); + + values = await session.IncrementalTimeSeriesFor("users/2", "INC:Heartrate").GetAsync(DateTime.MinValue, DateTime.MaxValue); + + count = 0; + foreach (var val in values) + { + Assert.Equal(baseline.AddSeconds(count * 10), val.Timestamp, RavenTestHelper.DateTimeComparer.Instance); + Assert.Equal(2, val.Values.Length); + Assert.Equal(count % 60, val.Values[0]); + Assert.Equal(count++ % 60 + 5, val.Values[1]); + } + + Assert.Equal(360, count); + + values = await session.IncrementalTimeSeriesFor("users/1", "INC:Heartrate2").GetAsync(DateTime.MinValue, DateTime.MaxValue); + + count = 0; + foreach (var val in values) + { + Assert.Equal(baseline.AddSeconds(count * 10), val.Timestamp, RavenTestHelper.DateTimeComparer.Instance); + Assert.Equal(3, val.Values.Length); + Assert.Equal(count % 60, val.Values[0]); + Assert.Equal(count % 60 + 5, val.Values[1]); + Assert.Equal(count++ % 60 + 10, val.Values[2]); + } + + Assert.Equal(360, count); + } + } + } + finally + { + File.Delete(file); + } + } + + + [Fact] + public async Task ImportIncrementalTimeSeriesTwice() + { + var file = GetTempFileName(); + var baseline = RavenTestHelper.UtcToday; + + try + { + using (var store1 = GetDocumentStore()) + using (var store2 = GetDocumentStore()) + { + using (var session = store1.OpenAsyncSession()) + { + await session.StoreAsync(new User { Name = "Name1" }, "users/1"); + await session.SaveChangesAsync(); + } + + using (var session = store1.OpenAsyncSession()) + { + session.IncrementalTimeSeriesFor("users/1", "INC:Heartrate").Increment(baseline, 1); + await session.SaveChangesAsync(); + } + + var operation = await store1.Smuggler.ExportAsync(new DatabaseSmugglerExportOptions(), file); + await operation.WaitForCompletionAsync(TimeSpan.FromMinutes(1)); + + operation = await store2.Smuggler.ImportAsync(new DatabaseSmugglerImportOptions(), file); + await operation.WaitForCompletionAsync(TimeSpan.FromMinutes(1)); + + // import twice + operation = await store2.Smuggler.ImportAsync(new DatabaseSmugglerImportOptions(), file); + await operation.WaitForCompletionAsync(TimeSpan.FromMinutes(1)); + + using (var session = store2.OpenAsyncSession()) + { + var user1 = await session.LoadAsync("users/1"); + Assert.Equal("Name1", user1.Name); + + var values = await session.IncrementalTimeSeriesFor("users/1", "INC:Heartrate").GetAsync(); + var entry = values.Single(); + Assert.Equal(baseline, entry.Timestamp, RavenTestHelper.DateTimeComparer.Instance); + Assert.Equal(1, entry.Values.Length); + Assert.Equal(1, entry.Values[0]); + } + } + } + finally + { + File.Delete(file); + } + } + + [Fact] public async Task CanExportAndImportTimeSeriesWithMultipleSegments() { diff --git a/test/Tests.Infrastructure/BackupTestBase.cs b/test/Tests.Infrastructure/BackupTestBase.cs index 6176c0bf94fa..b6420e997895 100644 --- a/test/Tests.Infrastructure/BackupTestBase.cs +++ b/test/Tests.Infrastructure/BackupTestBase.cs @@ -1,5 +1,7 @@ using System; using System.Diagnostics; +using System.IO; +using System.Linq; using System.Text; using System.Threading.Tasks; using Raven.Client.Documents; @@ -391,5 +393,23 @@ private static async Task CheckBackupOperationStatus(OperationStatus expected, O } } } + + public IDocumentStore RestoreAndGetStore(IDocumentStore store, string backupPath, TimeSpan? timeout = null) + { + var restoredDatabaseName = GetDatabaseName(); + + Backup.RestoreDatabase(store, new RestoreBackupConfiguration + { + BackupLocation = Directory.GetDirectories(backupPath).First(), + DatabaseName = restoredDatabaseName + }, timeout); + + return GetDocumentStore(new Options + { + ModifyDatabaseName = s => restoredDatabaseName, + CreateDatabase = false, + DeleteDatabaseOnDispose = true + }); + } } }