Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi-tenant support #2045

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 25 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@ updates:
- "m1l4n54v1c"
schananas marked this conversation as resolved.
Show resolved Hide resolved
- "saratry"
- "smcvb"
# Updates for `master`
- package-ecosystem: maven
directory: "/"
schedule:
interval: daily
ignore:
- dependency-name: "*"
update-types: [ "version-update:semver-patch" ]
open-pull-requests-limit: 5
labels:
- "Type: Dependency Upgrade"
Expand All @@ -30,3 +34,24 @@ updates:
- "m1l4n54v1c"
- "saratry"
- "smcvb"
# Patch and security updates for patch branches
- package-ecosystem: maven
directory: "/"
schedule:
interval: daily
ignore:
- dependency-name: "*"
update-types: [ "version-update:semver-major", "version-update:semver-minor" ]
labels:
- "Type: Dependency Upgrade"
- "Priority 1: Must"
- "Status: In Progress"
- "Target: 4.5.4"
milestone: 65
open-pull-requests-limit: 5
reviewers:
- "lfgcampos"
- "m1l4n54v1c"
- "saratry"
- "smcvb"
target-branch: "axon-4.5.x"
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2020. Axon Framework
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,6 +35,7 @@
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.common.Registration;
import org.axonframework.common.StringUtils;
import org.axonframework.lifecycle.Lifecycle;
import org.axonframework.lifecycle.Phase;
import org.axonframework.lifecycle.ShutdownLatch;
Expand All @@ -54,6 +55,7 @@
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

import static org.axonframework.common.BuilderUtils.assertNonEmpty;
import static org.axonframework.common.BuilderUtils.assertNonNull;
import static org.axonframework.common.ObjectUtils.getOrDefault;

Expand All @@ -76,7 +78,7 @@ public class AxonServerCommandBus implements CommandBus, Distributed<CommandBus>
private final RoutingStrategy routingStrategy;
private final CommandPriorityCalculator priorityCalculator;
private final CommandLoadFactorProvider loadFactorProvider;

private final String context;
private final DispatchInterceptors<CommandMessage<?>> dispatchInterceptors;
private final TargetContextResolver<? super CommandMessage<?>> targetContextResolver;
private final CommandCallback<Object, Object> defaultCommandCallback;
Expand Down Expand Up @@ -114,7 +116,11 @@ public AxonServerCommandBus(Builder builder) {
this.priorityCalculator = builder.priorityCalculator;
this.defaultCommandCallback = builder.defaultCommandCallback;
this.loadFactorProvider = builder.loadFactorProvider;
String context = configuration.getContext();


String context = StringUtils.nonEmptyOrNull(builder.defaultContext) ? builder.defaultContext : configuration.getContext();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This operation actually makes me wonder, what if the getContext method returns null or empty?
We don't have any validation against this right now.

Not so much a concern for this PR I'm afraid, but it would be good if the Axon Server buses contain a BuilderUtils#assertNonEmpty invocation as part of the validate() step.
Maybe a nice addition in another small PR? ;-)


this.context = context;
this.targetContextResolver = builder.targetContextResolver.orElse(m -> context);

this.executorService = builder.executorServiceBuilder.apply(
Expand Down Expand Up @@ -191,7 +197,7 @@ public Registration subscribe(String commandName, MessageHandler<? super Command
+ "Expect similar logging on the local segment.", commandName);
Registration localRegistration = localSegment.subscribe(commandName, messageHandler);
io.axoniq.axonserver.connector.Registration serverRegistration =
axonServerConnectionManager.getConnection()
axonServerConnectionManager.getConnection(context)
schananas marked this conversation as resolved.
Show resolved Hide resolved
.commandChannel()
.registerCommandHandler(
c -> {
Expand Down Expand Up @@ -231,8 +237,8 @@ public Registration registerDispatchInterceptor(
* handlers. This shutdown operation is performed in the {@link Phase#INBOUND_COMMAND_CONNECTOR} phase.
*/
public CompletableFuture<Void> disconnect() {
if (axonServerConnectionManager.isConnected(axonServerConnectionManager.getDefaultContext())) {
return axonServerConnectionManager.getConnection().commandChannel().prepareDisconnect();
if (axonServerConnectionManager.isConnected(context)) {
return axonServerConnectionManager.getConnection(context).commandChannel().prepareDisconnect();
}
return CompletableFuture.completedFuture(null);
}
Expand Down Expand Up @@ -306,11 +312,12 @@ public static class Builder {
private RoutingStrategy routingStrategy;
private CommandPriorityCalculator priorityCalculator =
CommandPriorityCalculator.defaultCommandPriorityCalculator();
private TargetContextResolver<? super CommandMessage<?>> targetContextResolver =
c -> configuration.getContext();
private ExecutorServiceBuilder executorServiceBuilder =
ExecutorServiceBuilder.defaultCommandExecutorServiceBuilder();
private CommandLoadFactorProvider loadFactorProvider = command -> CommandLoadFactorProvider.DEFAULT_VALUE;
private String defaultContext;
private TargetContextResolver<? super CommandMessage<?>> targetContextResolver =
c -> StringUtils.nonEmptyOrNull(defaultContext) ? defaultContext : configuration.getContext();

/**
* Sets the {@link AxonServerConnectionManager} used to create connections between this application and an Axon
Expand Down Expand Up @@ -458,6 +465,18 @@ public Builder loadFactorProvider(CommandLoadFactorProvider loadFactorProvider)
return this;
}

/**
* Sets the default context for this command bus to connect to.
*
* @param defaultContext for this bus to connect to.
* @return the current Builder instance, for fluent interfacing
*/
public Builder defaultContext(String defaultContext) {
assertNonEmpty(defaultContext, "The context may not be null or empty");
this.defaultContext = defaultContext;
return this;
}

/**
* Initializes a {@link AxonServerCommandBus} as specified through this Builder.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2021. Axon Framework
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@
import org.axonframework.axonserver.connector.util.GrpcMetaDataConverter;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.StringUtils;
import org.axonframework.common.jdbc.PersistenceExceptionResolver;
import org.axonframework.common.stream.BlockingStream;
import org.axonframework.eventhandling.DomainEventData;
Expand Down Expand Up @@ -75,6 +76,7 @@
import java.util.stream.StreamSupport;

import static java.util.Spliterator.*;
import static org.axonframework.common.BuilderUtils.assertNonEmpty;
import static org.axonframework.common.BuilderUtils.assertNonNull;
import static org.axonframework.common.ObjectUtils.getOrDefault;

Expand Down Expand Up @@ -177,6 +179,7 @@ public static class Builder extends AbstractEventStore.Builder {
private Supplier<Serializer> eventSerializer;
private EventUpcaster upcasterChain = NoOpEventUpcaster.INSTANCE;
private SnapshotFilter snapshotFilter;
private String defaultContext;

@Override
public Builder storageEngine(EventStorageEngine storageEngine) {
Expand Down Expand Up @@ -259,6 +262,18 @@ public Builder eventSerializer(Serializer eventSerializer) {
return this;
}

/**
* Sets the default context for this event store to connect to.
*
* @param defaultContext for this event store to connect to.
* @return the current Builder instance, for fluent interfacing
*/
public Builder defaultContext(String defaultContext) {
assertNonEmpty(defaultContext, "The default context may not be null");
this.defaultContext = defaultContext;
return this;
}

/**
* Sets the {@link Predicate} used to filter snapshots when returning aggregate events. When not set all
* snapshots are used.
Expand Down Expand Up @@ -353,6 +368,7 @@ private void buildStorageEngine() {
.snapshotFilter(snapshotFilter)
.eventSerializer(eventSerializer.get())
.configuration(configuration)
.defaultContext(defaultContext)
.eventStoreClient(axonServerConnectionManager)
.converter(new GrpcMetaDataConverter(eventSerializer.get()))
.build());
Expand Down Expand Up @@ -391,7 +407,7 @@ private static Builder builder() {
}

private AxonIQEventStorageEngine(Builder builder) {
this(builder, builder.configuration.getContext());
this(builder, StringUtils.nonEmptyOrNull(builder.defaultContext) ? builder.defaultContext : builder.configuration.getContext());
}

private AxonIQEventStorageEngine(Builder builder, String context) {
Expand Down Expand Up @@ -709,6 +725,7 @@ private static class Builder extends AbstractEventStorageEngine.Builder {
private AxonServerConfiguration configuration;
private AxonServerConnectionManager connectionManager;
private GrpcMetaDataConverter converter;
private String defaultContext;

@Override
public Builder snapshotSerializer(Serializer snapshotSerializer) {
Expand Down Expand Up @@ -770,6 +787,12 @@ private Builder configuration(AxonServerConfiguration configuration) {
return this;
}


private Builder defaultContext(String defaultContext) {
this.defaultContext = defaultContext;
return this;
}

private Builder eventStoreClient(AxonServerConnectionManager eventStoreClient) {
assertNonNull(eventStoreClient, "AxonServerEventStoreClient may not be null");
this.connectionManager = eventStoreClient;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2020. Axon Framework
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,13 +37,10 @@
*/
public class EventProcessorInfoConfiguration implements ModuleConfiguration {

private Configuration config;

private final Component<EventProcessingConfiguration> eventProcessingConfiguration;
private final Component<AxonServerConnectionManager> connectionManager;
private final Component<AxonServerConfiguration> axonServerConfiguration;
private final Component<EventProcessorControlService> eventProcessorControlService;

private Configuration config;
schananas marked this conversation as resolved.
Show resolved Hide resolved
schananas marked this conversation as resolved.
Show resolved Hide resolved

/**
* Create an default EventProcessorInfoConfiguration, which uses the {@link Configuration} as a means to retrieve
* the {@link EventProcessingConfiguration}, {@link AxonServerConnectionManager} and {@link
Expand All @@ -55,41 +52,48 @@ public EventProcessorInfoConfiguration() {
c -> c.getComponent(AxonServerConfiguration.class));
}


/**
* Creates an EventProcessorInfoConfiguration using the provided functions to retrieve the {@link
* EventProcessingConfiguration}, {@link AxonServerConnectionManager} and {@link AxonServerConfiguration}.
*
* @param eventProcessingConfiguration a Function taking in the {@link Configuration} and providing a {@link
* EventProcessingConfiguration}
* @param connectionManager a Function taking in the {@link Configuration} and providing a {@link
* AxonServerConnectionManager}
* @param axonServerConfiguration a Function taking in the {@link Configuration} and providing a {@link
* AxonServerConfiguration}
* @param eventProcessingConfigurationBuilder a Function taking in the {@link Configuration} and providing a {@link
* EventProcessingConfiguration}
* @param connectionManagerBuilder a Function taking in the {@link Configuration} and providing a {@link
* AxonServerConnectionManager}
* @param axonServerConfigurationBuilder a Function taking in the {@link Configuration} and providing a {@link
* AxonServerConfiguration}
*/
public EventProcessorInfoConfiguration(
Function<Configuration, EventProcessingConfiguration> eventProcessingConfiguration,
Function<Configuration, AxonServerConnectionManager> connectionManager,
Function<Configuration, AxonServerConfiguration> axonServerConfiguration) {
this.eventProcessingConfiguration = new Component<>(
() -> config, "eventProcessingConfiguration", eventProcessingConfiguration
);
this.connectionManager = new Component<>(() -> config, "connectionManager", connectionManager);
this.axonServerConfiguration = new Component<>(() -> config, "connectionManager", axonServerConfiguration);
Function<Configuration, EventProcessingConfiguration> eventProcessingConfigurationBuilder,
Function<Configuration, AxonServerConnectionManager> connectionManagerBuilder,
Function<Configuration, AxonServerConfiguration> axonServerConfigurationBuilder
) {
this(c -> new EventProcessorControlService(
connectionManagerBuilder.apply(c),
eventProcessingConfigurationBuilder.apply(c),
axonServerConfigurationBuilder.apply(c)
));
}

/**
* Create a default EventProcessorInfoConfiguration, which uses the {@link EventProcessorControlService}
*
* @param eventProcessorControlService a Function taking in the {@link Configuration} and providing a {@link
* EventProcessorControlService}
*/
public EventProcessorInfoConfiguration(
Function<Configuration, EventProcessorControlService> eventProcessorControlService
) {
this.eventProcessorControlService = new Component<>(
() -> config, "eventProcessorControlService",
c -> new EventProcessorControlService(
this.connectionManager.get(),
this.eventProcessingConfiguration.get(),
this.axonServerConfiguration.get()
)
() -> config, "eventProcessorControlService", eventProcessorControlService
);
}

@Override
public void initialize(Configuration config) {
this.config = config;
// if there are no event handlers registered, there is may be no EventProcessingConfiguration at all.
// if there are no event handlers registered, there may be no EventProcessingConfiguration at all.
if (config.eventProcessingConfiguration() != null) {
this.config.onStart(Phase.INBOUND_EVENT_CONNECTORS, eventProcessorControlService::get);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ public class EventProcessorControlService implements Lifecycle {
private static final String SUBSCRIBING_EVENT_PROCESSOR_MODE = "Subscribing";
private static final String UNKNOWN_EVENT_PROCESSOR_MODE = "Unknown";

private final AxonServerConnectionManager axonServerConnectionManager;
private final EventProcessingConfiguration eventProcessingConfiguration;
private final String context;
protected final AxonServerConnectionManager axonServerConnectionManager;
protected final EventProcessingConfiguration eventProcessingConfiguration;
protected final String context;

/**
* Initialize a {@link EventProcessorControlService} which adds {@link java.util.function.Consumer}s to the given
Expand Down Expand Up @@ -141,7 +141,7 @@ private EventProcessorInfo unknownProcessorTypeInfo(EventProcessor eventProcesso
}


private static class AxonProcessorInstructionHandler implements ProcessorInstructionHandler {
protected static class AxonProcessorInstructionHandler implements ProcessorInstructionHandler {

private final EventProcessor processor;
private final String name;
Expand Down