eventuate-tram/eventuate-tram-core-dotnet

Eventuate Tram (Transactional Messaging) framework

A .NET port of version 0.21.0.RELEASE of https://github.com/eventuate-tram/eventuate-tram-core.

Usage

Register Services

Register the core Eventuate Tram services in Startup.cs. You will usually want to configure Eventuate Tram to use the same DbConnection that your application uses when persisting application state. This allows events to be published and consumed within the same transaction in which application state changes are saved. If your application is using Entity Framework, this can be done as follows:

services.AddEventuateTramSqlKafkaTransport(dbSchemaName, bootstrapServers,
	eventuateKafkaConsumerConfigurationProperties, (provider, o) =>
	{
		var applicationDbContext = provider.GetRequiredService<ApplicationDbContext>();
		o.UseSqlServer(applicationDbContext.Database.GetDbConnection());
	});

To use the default Kafka consumer configuration, pass EventuateKafkaConsumerConfigurationProperties.Empty() as the eventuateKafkaConsumerConfigurationProperties argument.

The AddEventuateTramSqlKafkaTransport() method will register the core services for both publishing and consuming messages. Alternatively, if your application only needs to publish messages you can use the AddEventuateTramSqlProducer() method:

services.AddEventuateTramSqlProducer(dbSchemaName, (provider, o) =>
	{
		var applicationDbContext = provider.GetRequiredService<ApplicationDbContext>();
		o.UseSqlServer(applicationDbContext.Database.GetDbConnection());
	});

Kafka Consumer Configuration Properties

The Kafka consumer configuration can be modified using the EventuateKafkaConsumerConfigurationProperties object that is passed in when registering the services. To use the default configuration, you can use EventuateKafkaConsumerConfigurationProperties.Empty().

PollTimeout

The maximum time, in milliseconds, that the Kafka client waits for a new message on each poll within the Kafka consume loop.

Type: long
Required: No
Default Value: 100

BackPressure

Customizes the backpressure behavior. Message consumption will be paused once the size of the unprocessed message queue exceeds the PauseThreshold and resumed once it drops back below the ResumeThreshold.

BackPressure.PauseThreshold

Type: uint
Required: No
Default Value: uint.MaxValue

BackPressure.ResumeThreshold

Type: uint
Required: No
Default Value: 0
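
The pause and resume behavior described above can be pictured as a simple hysteresis gate. The following is a minimal, self-contained sketch and not the library's implementation: BackPressureGate and its members are hypothetical names, and the sketch assumes that consumption resumes once the backlog is at or below the ResumeThreshold.

```csharp
using System;

// Hypothetical illustration of pause/resume hysteresis; not part of Eventuate Tram.
public sealed class BackPressureGate
{
    private readonly uint _pauseThreshold;
    private readonly uint _resumeThreshold;

    public BackPressureGate(uint pauseThreshold, uint resumeThreshold)
    {
        if (resumeThreshold > pauseThreshold)
            throw new ArgumentException("resumeThreshold must not exceed pauseThreshold");
        _pauseThreshold = pauseThreshold;
        _resumeThreshold = resumeThreshold;
    }

    public bool IsPaused { get; private set; }

    // Updates the gate for the current number of unprocessed messages
    // and returns whether consumption should be paused.
    public bool Update(uint backlog)
    {
        if (!IsPaused && backlog > _pauseThreshold)
            IsPaused = true;   // backlog exceeded the pause threshold
        else if (IsPaused && backlog <= _resumeThreshold)
            IsPaused = false;  // assumption: resume at or below the resume threshold
        return IsPaused;
    }
}

public static class Program
{
    public static void Main()
    {
        var gate = new BackPressureGate(pauseThreshold: 20, resumeThreshold: 5);
        foreach (uint backlog in new uint[] { 10, 21, 15, 6, 5, 10 })
            Console.WriteLine($"backlog={backlog} paused={gate.Update(backlog)}");
    }
}
```

In this run the gate pauses at a backlog of 21, stays paused at 15 and 6, and resumes at 5. Keeping a gap between the two thresholds avoids toggling between paused and resumed on every small change in queue size.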

Properties

A dictionary of property names and values that can be used to override the default configuration of the underlying Kafka client. Refer to https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md for a list of possible property names.

Type: IDictionary<string, string>
Required: No
Default Value: Empty

Database Setup

Run the database initialization script, setting its $(TRAM_SCHEMA) variable to the database schema name you configured when registering the services (dbSchemaName).

Publishing Events

To enable domain event publishing, register the DomainEventPublisher service in Startup.cs:

services.AddEventuateTramEventsPublisher();

To publish a domain event, inject an IDomainEventPublisher into your service and then call IDomainEventPublisher.Publish:

var @event = new ExampleEvent
{
    ...
};
    
_domainEventPublisher.Publish(aggregateType, aggregateId, new List<IDomainEvent> {@event});

The aggregateType will be used as the Kafka topic, and the aggregateId as the partition ID. Your event type needs to implement IDomainEvent. If you want to publish events atomically along with persisting application state changes, call Publish() within the same transaction that saves the changes to your application state. If your application is using Entity Framework, this can be done as follows:

using (var scope = new TransactionScope(TransactionScopeOption.Required,
    new TransactionOptions {IsolationLevel = IsolationLevel.ReadCommitted}))
{
    _applicationDbContext.SaveChanges();
    _domainEventPublisher.Publish(aggregateType, aggregateId, new List<IDomainEvent> {@event});
    scope.Complete();
}

Alternatively, there is also an asynchronous publish method available:

using (var scope = new TransactionScope(TransactionScopeOption.Required,
    new TransactionOptions { IsolationLevel = IsolationLevel.ReadCommitted },
    TransactionScopeAsyncFlowOption.Enabled))
{
    await _applicationDbContext.SaveChangesAsync();
    await _domainEventPublisher.PublishAsync(aggregateType, aggregateId, new List<IDomainEvent> {@event});
    scope.Complete();
}

If EnableRetryOnFailure is enabled on your DbContext, you will need to wrap the transaction in a call to the execution strategy's Execute() method:

IExecutionStrategy strategy = _applicationDbContext.Database.CreateExecutionStrategy();
strategy.Execute(() =>
{
    using (var scope = new TransactionScope(TransactionScopeOption.Required,
        new TransactionOptions {IsolationLevel = IsolationLevel.ReadCommitted}))
    {
        // Defer accepting changes until after transaction is committed so that
        // the changes are saved again on an execution strategy retry
        _applicationDbContext.SaveChanges(false);
        _domainEventPublisher.Publish(aggregateType, aggregateId, new List<IDomainEvent> {@event});
        scope.Complete();
        _applicationDbContext.ChangeTracker.AcceptAllChanges();
    }
});

Finally, the Eventuate CDC Service will need to be running and monitoring your database for new events to publish. The Eventuate Tram Core Basic examples project has an example docker-compose.yml file. See also the Eventuate CDC Service configuration documentation.

Message ID Generation

Eventuate Tram generates message IDs using the network interface's MAC address by default when publishing messages. This can result in duplicate message IDs being generated if multiple processes using Eventuate Tram are running on the same machine. To avoid this, the MAC address used for the purpose of ID generation can be overridden by setting the IdGeneratorOptions in Startup.cs. For example, the MAC address can be set from a configuration property value:

services.Configure<IdGeneratorOptions>(o =>
{
    o.MacAddress = _configuration.GetValue<string>("EventuateMacAddress");
});

This would allow overriding the MAC address via configuration (e.g. by setting the EventuateMacAddress property within the appsettings.json file or via an environment variable).

Note that the "MAC address" value must be given as a base 10 integer no greater than 140737488355327 (2^47 - 1).
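
A configured value can be checked against this limit before it is handed to IdGeneratorOptions. The following is a minimal sketch under stated assumptions: MacAddressValue and IsValid are hypothetical names that are not part of Eventuate Tram, and accepting 0 as a valid value is an assumption of the sketch.

```csharp
using System;
using System.Globalization;

// Hypothetical helper for validating the "MAC address" configuration value.
public static class MacAddressValue
{
    // The documented maximum: 140737488355327, i.e. 2^47 - 1.
    public const long Max = (1L << 47) - 1;

    // Returns true when the text consists only of base 10 digits and the
    // resulting number is no greater than Max. NumberStyles.None rejects
    // signs, whitespace, and hex notation such as "00:1A:2B".
    public static bool IsValid(string text)
    {
        return long.TryParse(text, NumberStyles.None, CultureInfo.InvariantCulture, out long value)
            && value <= Max;
    }
}

public static class Program
{
    public static void Main()
    {
        foreach (string candidate in new[] { "73588229205", "140737488355327", "140737488355328", "00:1A:2B:3C:4D:5E" })
            Console.WriteLine($"{candidate}: {MacAddressValue.IsValid(candidate)}");
    }
}
```

The first two candidates pass the check; the third is one above the maximum and the fourth is not a base 10 integer, so both are rejected.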

Consuming Events (Option 1)

Create one or more event handler services that implement IDomainEventHandler. You could have one handler service per event type, or handle multiple event types in the same handler service:

public class ExampleEventHandler : IDomainEventHandler<Example1Event>, IDomainEventHandler<Example2Event>
{
	private readonly ExampleDbContext _dbContext;
	private readonly ILogger _logger;

	public ExampleEventHandler(ExampleDbContext dbContext, ILogger<ExampleEventHandler> logger)
	{
		_dbContext = dbContext;
		_logger = logger;
	}
    
	public async Task HandleAsync(IDomainEventEnvelope<Example1Event> @event, CancellationToken cancellationToken)
	{
		_logger.LogDebug("Got message Example1Event with id={EventId} and value={Event}", @event.EventId,
			@event.Event.ToString());
		_dbContext.DoSomething(...);
		await _dbContext.SaveChangesAsync(cancellationToken);
	}

	public async Task HandleAsync(IDomainEventEnvelope<Example2Event> @event, CancellationToken cancellationToken)
	{
		_logger.LogDebug("Got message Example2Event with id={EventId} and value={Event}", @event.EventId,
			@event.Event.ToString());
		_dbContext.DoSomething(...);
		await _dbContext.SaveChangesAsync(cancellationToken);
	}
}

Register your event handler service(s) as well as a DomainEventDispatcher in Startup.cs:

services.AddScoped<ExampleEventHandler>();
services.AddEventuateTramDomainEventDispatcher(subscriberId,
    provider => DomainEventHandlersBuilder.ForAggregateType("ExampleAggregate")
        .OnEvent<Example1Event, ExampleEventHandler>()
        .OnEvent<Example2Event, ExampleEventHandler>()
        .Build());

The aggregate type will be used as the Kafka topic and the subscriberId will be used as the Kafka consumer group.

Handling with DbContext EnableRetryOnFailure

Note that the HandleAsync method is executed within a transaction scope and a new row is inserted into the received_messages table within that scope (used for message deduplication). If EnableRetryOnFailure is configured on a DbContext used by your handler, you will get the following exception:

System.InvalidOperationException: The configured execution strategy 'SqlServerRetryingExecutionStrategy' does not 
support user initiated transactions. Use the execution strategy returned by 'DbContext.Database.CreateExecutionStrategy()' 
to execute all the operations in the transaction as a retriable unit.

To resolve this issue, the transaction scope needs to be wrapped in a call to the execution strategy's Execute() method. This can be accomplished by creating a message handler decorator and configuring it to run before the DuplicateDetectingMessageHandlerDecorator:

public class EventuateExecutionStrategyMessageHandlerDecorator : IMessageHandlerDecorator, IOrdered
{
	public Func<SubscriberIdAndMessage, IServiceProvider, IMessageHandlerDecoratorChain, CancellationToken, Task> Accept =>
		async (subscriberIdAndMessage, serviceProvider, chain, cancellationToken) =>
		{
			var dbContext = serviceProvider.GetRequiredService<ApplicationDbContext>();
			IExecutionStrategy executionStrategy = dbContext.Database.CreateExecutionStrategy();
			await executionStrategy.ExecuteAsync(async () =>
			{
				await chain.InvokeNextAsync(subscriberIdAndMessage, serviceProvider, cancellationToken);
			});
		};

	public int Order => BuiltInMessageHandlerDecoratorOrder.DuplicateDetectingMessageHandlerDecorator - 1;
}

You will also need to register the EventuateExecutionStrategyMessageHandlerDecorator as a singleton:

services.AddSingleton<EventuateExecutionStrategyMessageHandlerDecorator>();

Finally, note that if EnableRetryOnFailure is configured, your handler may get called multiple times if there is a transient failure that causes a retry to occur.
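
The effect of a retry on handler invocations can be illustrated without any database. The following is a minimal, self-contained sketch: ExecuteWithRetry is a hypothetical stand-in for a retrying execution strategy and is not an Entity Framework or Eventuate Tram API, and treating TimeoutException as the transient failure is an assumption made for the example.

```csharp
using System;

public static class Program
{
    // Hypothetical stand-in for a retrying execution strategy: re-runs the
    // whole operation when it throws, up to maxAttempts attempts in total.
    public static void ExecuteWithRetry(Action operation, int maxAttempts)
    {
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                operation();
                return;
            }
            catch (TimeoutException) when (attempt < maxAttempts)
            {
                // Treated as transient here: loop and run the operation again.
            }
        }
    }

    public static void Main()
    {
        int handlerCalls = 0;
        bool failOnce = true;

        ExecuteWithRetry(() =>
        {
            handlerCalls++; // the handler body runs on every attempt
            if (failOnce)
            {
                failOnce = false;
                throw new TimeoutException("simulated transient failure");
            }
        }, maxAttempts: 3);

        Console.WriteLine($"handler invoked {handlerCalls} time(s)");
    }
}
```

Here the handler body runs twice for a single message. Any side effect a handler performs outside the database transaction (for example, calling an external service) would therefore be repeated, so such side effects are best kept idempotent.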

Consuming Events (Option 2)

Create an event consumer service and implement handlers for the different event types:

public class ExampleEventConsumer
{
	private readonly ILogger<ExampleEventConsumer> _logger;

	public ExampleEventConsumer(ILogger<ExampleEventConsumer> logger)
	{
		_logger = logger;
	}
	
	public DomainEventHandlers DomainEventHandlers()
	{
		return DomainEventHandlersBuilder.ForAggregateType("ExampleAggregate")
			.OnEvent<Example1Event>(HandleExample1EventAsync)
			.OnEvent<Example2Event>(HandleExample2EventAsync)
			.Build();
	}

	private async Task HandleExample1EventAsync(IDomainEventEnvelope<Example1Event> @event)
	{
		_logger.LogDebug("Got Example1Event with id={EventId} and value={Event}", @event.EventId,
			@event.Event.ToString());
	}
	
	private async Task HandleExample2EventAsync(IDomainEventEnvelope<Example2Event> @event)
	{
		_logger.LogDebug("Got Example2Event with id={EventId} and value={Event}", @event.EventId,
			@event.Event.ToString());
	}
}

The aggregate type will be used as the Kafka topic.

Register your event consumer service as well as a DomainEventDispatcher in Startup.cs:

services.AddSingleton<ExampleEventConsumer>();
services.AddEventuateTramDomainEventDispatcher(subscriberId,
    provider =>
    {
        var consumer = provider.GetRequiredService<ExampleEventConsumer>();
        return consumer.DomainEventHandlers();
    });

The subscriberId will be used as the Kafka consumer group.

If EnableRetryOnFailure is configured on a DbContext used by your handler, see Handling with DbContext EnableRetryOnFailure.

Running Integration Tests

Services Required

The integration tests require a set of support services to be running and accessible. The IO.Eventuate.Tram.IntegrationTests folder contains a docker-compose.yml that can be used to start the support services and run the tests.

You can use the following steps to run the tests:

$ cd IO.Eventuate.Tram.IntegrationTests
$ dotnet build -c Release
$ export CDC_SERVICE_DOCKER_VERSION=<CDC docker tag>
$ ./test.sh

Test results will be written to ./bin/Release/net8.0/TestResults.

Environment Variable Configuration

If running the tests from an IDE, set the following environment variables:

ConnectionStrings__EventuateTramDbConnection identifies where to find the database for the outbox messages.
KafkaBootstrapServers identifies where the Kafka server is running.
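
When running from an IDE, it can help to confirm that both variables are visible to the process before starting the tests. The following is a minimal sketch of such a check; Program and FindMissingVariables are hypothetical names and are not part of the test project.

```csharp
using System;
using System.Collections.Generic;

public static class Program
{
    // The two variables named in this section.
    private static readonly string[] RequiredVariables =
    {
        "ConnectionStrings__EventuateTramDbConnection",
        "KafkaBootstrapServers"
    };

    // Returns the names of required variables that are unset or blank.
    public static List<string> FindMissingVariables()
    {
        var missing = new List<string>();
        foreach (string name in RequiredVariables)
        {
            if (string.IsNullOrWhiteSpace(Environment.GetEnvironmentVariable(name)))
                missing.Add(name);
        }
        return missing;
    }

    public static void Main()
    {
        List<string> missing = FindMissingVariables();
        Console.WriteLine(missing.Count == 0
            ? "All required environment variables are set."
            : "Missing: " + string.Join(", ", missing));
    }
}
```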