Skip to content

Commit

Permalink
add CancellationToken support
Browse files Browse the repository at this point in the history
Co-authored-by: Adam Ralph <adam@adamralph.com>
Co-authored-by: Andreas Öhlund <andreas.ohlund@particular.net>
  • Loading branch information
3 people committed Feb 18, 2021
1 parent 5f99806 commit 53f9265
Show file tree
Hide file tree
Showing 243 changed files with 1,861 additions and 1,129 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
namespace NServiceBus.AcceptanceTesting
{
using System.Threading;
using System.Threading.Tasks;
using Extensibility;
using Persistence;

class AcceptanceTestingSynchronizedStorage : ISynchronizedStorage
{
public Task<CompletableSynchronizedStorageSession> OpenSession(ContextBag contextBag)
public Task<CompletableSynchronizedStorageSession> OpenSession(ContextBag contextBag, CancellationToken cancellationToken)
{
var session = (CompletableSynchronizedStorageSession)new AcceptanceTestingSynchronizedStorageSession();
return Task.FromResult(session);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace NServiceBus.AcceptanceTesting
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Persistence;

Expand All @@ -24,7 +25,7 @@ public void Dispose()
Transaction = null;
}

public Task CompleteAsync()
public Task CompleteAsync(CancellationToken cancellationToken)
{
if (ownsTransaction)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace NServiceBus.AcceptanceTesting
{
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
using Extensibility;
Expand All @@ -10,7 +11,7 @@ namespace NServiceBus.AcceptanceTesting

class AcceptanceTestingTransactionalSynchronizedStorageAdapter : ISynchronizedStorageAdapter
{
public Task<CompletableSynchronizedStorageSession> TryAdapt(OutboxTransaction transaction, ContextBag context)
public Task<CompletableSynchronizedStorageSession> TryAdapt(OutboxTransaction transaction, ContextBag context, CancellationToken cancellationToken)
{
if (transaction is AcceptanceTestingOutboxTransaction inMemOutboxTransaction)
{
Expand All @@ -20,7 +21,7 @@ public Task<CompletableSynchronizedStorageSession> TryAdapt(OutboxTransaction tr
return EmptyTask;
}

public Task<CompletableSynchronizedStorageSession> TryAdapt(TransportTransaction transportTransaction, ContextBag context)
public Task<CompletableSynchronizedStorageSession> TryAdapt(TransportTransaction transportTransaction, ContextBag context, CancellationToken cancellationToken)
{
if (transportTransaction.TryGet(out Transaction ambientTransaction))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ public OutboxCleaner(AcceptanceTestingOutboxStorage storage, TimeSpan timeToKeep
acceptanceTestingOutboxStorage = storage;
}

protected override Task OnStart(IMessageSession session)
protected override Task OnStart(IMessageSession session, CancellationToken cancellationToken)
{
cleanupTimer = new Timer(PerformCleanup, null, TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
return Task.CompletedTask;
}

protected override Task OnStop(IMessageSession session)
protected override Task OnStop(IMessageSession session, CancellationToken cancellationToken)
{
using (var waitHandle = new ManualResetEvent(false))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
{
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Extensibility;
using NServiceBus.Outbox;

class AcceptanceTestingOutboxStorage : IOutboxStorage
{
public Task<OutboxMessage> Get(string messageId, ContextBag context)
public Task<OutboxMessage> Get(string messageId, ContextBag context, CancellationToken cancellationToken)
{
if (!storage.TryGetValue(messageId, out var storedMessage))
{
Expand All @@ -18,12 +19,12 @@ public Task<OutboxMessage> Get(string messageId, ContextBag context)
return Task.FromResult(new OutboxMessage(messageId, storedMessage.TransportOperations));
}

public Task<OutboxTransaction> BeginTransaction(ContextBag context)
public Task<OutboxTransaction> BeginTransaction(ContextBag context, CancellationToken cancellationToken)
{
return Task.FromResult<OutboxTransaction>(new AcceptanceTestingOutboxTransaction());
}

public Task Store(OutboxMessage message, OutboxTransaction transaction, ContextBag context)
public Task Store(OutboxMessage message, OutboxTransaction transaction, ContextBag context, CancellationToken cancellationToken)
{
var tx = (AcceptanceTestingOutboxTransaction)transaction;
tx.Enlist(() =>
Expand All @@ -36,7 +37,7 @@ public Task Store(OutboxMessage message, OutboxTransaction transaction, ContextB
return Task.CompletedTask;
}

public Task SetAsDispatched(string messageId, ContextBag context)
public Task SetAsDispatched(string messageId, ContextBag context, CancellationToken cancellationToken)
{
if (!storage.TryGetValue(messageId, out var storedMessage))
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace NServiceBus.AcceptanceTesting
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Outbox;

Expand All @@ -18,7 +19,7 @@ public void Dispose()
Transaction = null;
}

public Task Commit()
public Task Commit(CancellationToken cancellationToken)
{
Transaction.Commit();
return Task.CompletedTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace NServiceBus.AcceptanceTesting
using System.Collections.Generic;
using System.Reflection;
using System.Reflection.Emit;
using System.Threading;
using System.Threading.Tasks;
using Extensibility;
using Persistence;
Expand All @@ -20,7 +21,7 @@ public AcceptanceTestingSagaPersister()
byCorrelationIdCollection = byCorrelationId;
}

public Task Complete(IContainSagaData sagaData, SynchronizedStorageSession session, ContextBag context)
public Task Complete(IContainSagaData sagaData, SynchronizedStorageSession session, ContextBag context, CancellationToken cancellationToken)
{
((AcceptanceTestingSynchronizedStorageSession)session).Enlist(() =>
{
Expand All @@ -42,7 +43,7 @@ public Task Complete(IContainSagaData sagaData, SynchronizedStorageSession sessi
return Task.CompletedTask;
}

public Task<TSagaData> Get<TSagaData>(Guid sagaId, SynchronizedStorageSession session, ContextBag context)
public Task<TSagaData> Get<TSagaData>(Guid sagaId, SynchronizedStorageSession session, ContextBag context, CancellationToken cancellationToken)
where TSagaData : class, IContainSagaData
{
if (sagas.TryGetValue(sagaId, out var value))
Expand All @@ -56,21 +57,21 @@ public Task<TSagaData> Get<TSagaData>(Guid sagaId, SynchronizedStorageSession se
return CachedSagaDataTask<TSagaData>.Default;
}

public Task<TSagaData> Get<TSagaData>(string propertyName, object propertyValue, SynchronizedStorageSession session, ContextBag context)
public Task<TSagaData> Get<TSagaData>(string propertyName, object propertyValue, SynchronizedStorageSession session, ContextBag context, CancellationToken cancellationToken)
where TSagaData : class, IContainSagaData
{
var key = new CorrelationId(typeof(TSagaData), propertyName, propertyValue);

if (byCorrelationId.TryGetValue(key, out var id))
{
// this isn't updated atomically and may return null for an entry that has been indexed but not inserted yet
return Get<TSagaData>(id, session, context);
return Get<TSagaData>(id, session, context, cancellationToken);
}

return CachedSagaDataTask<TSagaData>.Default;
}

public Task Save(IContainSagaData sagaData, SagaCorrelationProperty correlationProperty, SynchronizedStorageSession session, ContextBag context)
public Task Save(IContainSagaData sagaData, SagaCorrelationProperty correlationProperty, SynchronizedStorageSession session, ContextBag context, CancellationToken cancellationToken)
{
((AcceptanceTestingSynchronizedStorageSession)session).Enlist(() =>
{
Expand All @@ -94,7 +95,7 @@ public Task Save(IContainSagaData sagaData, SagaCorrelationProperty correlationP
return Task.CompletedTask;
}

public Task Update(IContainSagaData sagaData, SynchronizedStorageSession session, ContextBag context)
public Task Update(IContainSagaData sagaData, SynchronizedStorageSession session, ContextBag context, CancellationToken cancellationToken)
{
((AcceptanceTestingSynchronizedStorageSession)session).Enlist(() =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,23 @@ namespace NServiceBus.AcceptanceTesting
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Extensibility;
using Unicast.Subscriptions;
using Unicast.Subscriptions.MessageDrivenSubscriptions;

class AcceptanceTestingSubscriptionStorage : ISubscriptionStorage
{
public Task Subscribe(Subscriber subscriber, MessageType messageType, ContextBag context)
public Task Subscribe(Subscriber subscriber, MessageType messageType, ContextBag context, CancellationToken cancellationToken)
{
var dict = storage.GetOrAdd(messageType, type => new ConcurrentDictionary<string, Subscriber>(StringComparer.OrdinalIgnoreCase));

dict.AddOrUpdate(subscriber.TransportAddress, _ => subscriber, (_, __) => subscriber);
return Task.CompletedTask;
}

public Task Unsubscribe(Subscriber subscriber, MessageType messageType, ContextBag context)
public Task Unsubscribe(Subscriber subscriber, MessageType messageType, ContextBag context, CancellationToken cancellationToken)
{
if (storage.TryGetValue(messageType, out var dict))
{
Expand All @@ -27,7 +28,7 @@ public Task Unsubscribe(Subscriber subscriber, MessageType messageType, ContextB
return Task.CompletedTask;
}

public Task<IEnumerable<Subscriber>> GetSubscriberAddressesForMessage(IEnumerable<MessageType> messageTypes, ContextBag context)
public Task<IEnumerable<Subscriber>> GetSubscriberAddressesForMessage(IEnumerable<MessageType> messageTypes, ContextBag context, CancellationToken cancellationToken)
{
var result = new HashSet<Subscriber>();
foreach (var m in messageTypes)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace NServiceBus
{
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using Routing;
Expand All @@ -13,7 +14,7 @@ public AcceptanceTestingTransport(bool enableNativeDelayedDelivery = true, bool
{
}

public override async Task<TransportInfrastructure> Initialize(HostSettings hostSettings, ReceiveSettings[] receivers, string[] sendingAddresses)
public override async Task<TransportInfrastructure> Initialize(HostSettings hostSettings, ReceiveSettings[] receivers, string[] sendingAddresses, CancellationToken cancellationToken)
{
Guard.AgainstNull(nameof(hostSettings), hostSettings);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Transport;

Expand Down Expand Up @@ -74,7 +75,7 @@ public void ConfigureDispatcher()
Dispatcher = new LearningTransportDispatcher(storagePath, int.MaxValue / 1024);
}

public override Task Shutdown()
public override Task Shutdown(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public async Task Initialize(RunDescriptor run, EndpointBehavior endpointBehavio

void TrackFailingMessages(string endpointName, EndpointConfiguration endpointConfiguration)
{
endpointConfiguration.Recoverability().Failed(settings => settings.OnMessageSentToErrorQueue(m =>
endpointConfiguration.Recoverability().Failed(settings => settings.OnMessageSentToErrorQueue((m, _) =>
{
scenarioContext.FailedMessages.AddOrUpdate(
endpointName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
Expand All @@ -17,7 +18,7 @@ public async Task Should_call_critical_error_action_for_every_error_that_occurre
{
var exceptions = new ConcurrentDictionary<string, Exception>();

Func<ICriticalErrorContext, Task> addCritical = criticalContext =>
Func<ICriticalErrorContext, CancellationToken, Task> addCritical = (criticalContext, _) =>
{
exceptions.TryAdd(criticalContext.Error, criticalContext.Exception);
return Task.FromResult(0);
Expand Down Expand Up @@ -85,18 +86,18 @@ public CriticalErrorStartupFeatureTask(CriticalError criticalError, TestContext
this.testContext = testContext;
}

protected override Task OnStart(IMessageSession session)
protected override Task OnStart(IMessageSession session, CancellationToken cancellationToken)
{
criticalError.Raise("critical error 1", new SimulatedException());
criticalError.Raise("critical error 1", new SimulatedException(), cancellationToken);
testContext.CriticalErrorsRaised++;

criticalError.Raise("critical error 2", new SimulatedException());
criticalError.Raise("critical error 2", new SimulatedException(), cancellationToken);
testContext.CriticalErrorsRaised++;

return Task.FromResult(0);
}

protected override Task OnStop(IMessageSession session)
protected override Task OnStop(IMessageSession session, CancellationToken cancellationToken)
{
return Task.FromResult(0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
Expand All @@ -17,7 +18,7 @@ public async Task Should_trigger_critical_error_action()
{
var exceptions = new ConcurrentDictionary<string, Exception>();

Func<ICriticalErrorContext, Task> addCritical = criticalContext =>
Func<ICriticalErrorContext, CancellationToken, Task> addCritical = (criticalContext, _) =>
{
exceptions.TryAdd(criticalContext.Error, criticalContext.Exception);
return Task.FromResult(0);
Expand Down Expand Up @@ -95,18 +96,18 @@ public CriticalErrorStartupFeatureTask(CriticalError criticalError, TestContext
this.testContext = testContext;
}

protected override Task OnStart(IMessageSession session)
protected override Task OnStart(IMessageSession session, CancellationToken cancellationToken)
{
criticalError.Raise("critical error 1", new SimulatedException());
criticalError.Raise("critical error 1", new SimulatedException(), cancellationToken);
testContext.CriticalErrorsRaised++;

criticalError.Raise("critical error 2", new SimulatedException());
criticalError.Raise("critical error 2", new SimulatedException(), cancellationToken);
testContext.CriticalErrorsRaised++;

return Task.FromResult(0);
}

protected override Task OnStop(IMessageSession session)
protected override Task OnStop(IMessageSession session, CancellationToken cancellationToken)
{
return Task.FromResult(0);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace NServiceBus.AcceptanceTests.Core.DependencyInjection
{
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
Expand Down Expand Up @@ -85,12 +86,12 @@ public class StartupFeatureWithDependencies : FeatureStartupTask
testContext.DependencyBeforeEndpointStart = dependencyBeforeEndpointStart;
}

protected override Task OnStart(IMessageSession session)
protected override Task OnStart(IMessageSession session, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}

protected override Task OnStop(IMessageSession session)
protected override Task OnStop(IMessageSession session, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public EndpointWithLocalCallback()
fakeTransport.RaiseCriticalErrorOnReceiverStart(new AggregateException("Startup task failed to complete.", new InvalidOperationException("ExceptionInBusStarts")));
builder.UseTransport(fakeTransport);
builder.DefineCriticalErrorAction(errorContext =>
builder.DefineCriticalErrorAction((errorContext, _) =>
{
var aggregateException = (AggregateException)errorContext.Exception;
var context = builder.GetSettings().Get<Context>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
namespace NServiceBus.AcceptanceTests.Core.FakeTransport
{
using System.Threading;
using System.Threading.Tasks;
using Transport;

class FakeDispatcher : IMessageDispatcher
{
public Task Dispatch(TransportOperations outgoingMessages, TransportTransaction transaction)
public Task Dispatch(TransportOperations outgoingMessages, TransportTransaction transaction, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
Expand Down

0 comments on commit 53f9265

Please sign in to comment.