diff --git a/bucket4j-hazelcast-all/bucket4j-hazelcast/src/main/java/io/github/bucket4j/grid/hazelcast/HazelcastOffloadableEntryProcessor.java b/bucket4j-hazelcast-all/bucket4j-hazelcast/src/main/java/io/github/bucket4j/grid/hazelcast/HazelcastOffloadableEntryProcessor.java new file mode 100644 index 00000000..5de16196 --- /dev/null +++ b/bucket4j-hazelcast-all/bucket4j-hazelcast/src/main/java/io/github/bucket4j/grid/hazelcast/HazelcastOffloadableEntryProcessor.java @@ -0,0 +1,46 @@ +/*- + * ========================LICENSE_START================================= + * Bucket4j + * %% + * Copyright (C) 2015 - 2020 Vladimir Bukhtoyarov + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * =========================LICENSE_END================================== + */ +package io.github.bucket4j.grid.hazelcast; + +import com.hazelcast.core.Offloadable; +import io.github.bucket4j.distributed.remote.Request; + +public class HazelcastOffloadableEntryProcessor extends HazelcastEntryProcessor implements Offloadable { + + private static final long serialVersionUID = 1L; + + private final String executorName; + + public HazelcastOffloadableEntryProcessor(Request request, String executorName) { + super(request); + this.executorName = executorName; + } + + public HazelcastOffloadableEntryProcessor(byte[] requestBytes, String executorName) { + super(requestBytes); + this.executorName = executorName; + } + + @Override + public String getExecutorName() { + return executorName; + } + +} diff --git a/bucket4j-hazelcast-all/bucket4j-hazelcast/src/main/java/io/github/bucket4j/grid/hazelcast/HazelcastProxyManager.java b/bucket4j-hazelcast-all/bucket4j-hazelcast/src/main/java/io/github/bucket4j/grid/hazelcast/HazelcastProxyManager.java index 2720a727..953e7f7a 100644 --- a/bucket4j-hazelcast-all/bucket4j-hazelcast/src/main/java/io/github/bucket4j/grid/hazelcast/HazelcastProxyManager.java +++ b/bucket4j-hazelcast-all/bucket4j-hazelcast/src/main/java/io/github/bucket4j/grid/hazelcast/HazelcastProxyManager.java @@ -45,6 +45,7 @@ import io.github.bucket4j.distributed.remote.Request; import io.github.bucket4j.distributed.versioning.Version; import io.github.bucket4j.grid.hazelcast.serialization.HazelcastEntryProcessorSerializer; +import io.github.bucket4j.grid.hazelcast.serialization.HazelcastOffloadableEntryProcessorSerializer; import io.github.bucket4j.grid.hazelcast.serialization.SimpleBackupProcessorSerializer; import io.github.bucket4j.distributed.serialization.InternalSerializationHelper; @@ -60,6 +61,7 @@ public class HazelcastProxyManager extends AbstractProxyManager { private final IMap map; + private final String offloadableExecutorName; public HazelcastProxyManager(IMap map) { this(map, ClientSideConfig.getDefault()); @@ -68,11 +70,20 @@ public HazelcastProxyManager(IMap map) { public HazelcastProxyManager(IMap map, ClientSideConfig clientSideConfig) { super(clientSideConfig); this.map = Objects.requireNonNull(map); + this.offloadableExecutorName = null; + } + + public HazelcastProxyManager(IMap map, ClientSideConfig clientSideConfig, String offlodableExecutorName) { + super(clientSideConfig); + this.map = Objects.requireNonNull(map); + this.offloadableExecutorName = Objects.requireNonNull(offlodableExecutorName); } @Override public CommandResult execute(K key, Request request) { - HazelcastEntryProcessor entryProcessor = new HazelcastEntryProcessor<>(request); + HazelcastEntryProcessor entryProcessor = offloadableExecutorName == null? + new HazelcastEntryProcessor<>(request) : + new HazelcastOffloadableEntryProcessor<>(request, offloadableExecutorName); byte[] response = map.executeOnKey(key, entryProcessor); Version backwardCompatibilityVersion = request.getBackwardCompatibilityVersion(); return deserializeResult(response, backwardCompatibilityVersion); @@ -85,7 +96,9 @@ public boolean isAsyncModeSupported() { @Override public CompletableFuture> executeAsync(K key, Request request) { - HazelcastEntryProcessor entryProcessor = new HazelcastEntryProcessor<>(request); + HazelcastEntryProcessor entryProcessor = offloadableExecutorName == null? + new HazelcastEntryProcessor<>(request) : + new HazelcastOffloadableEntryProcessor<>(request, offloadableExecutorName); CompletionStage future = map.submitToKey(key, entryProcessor); Version backwardCompatibilityVersion = request.getBackwardCompatibilityVersion(); return (CompletableFuture) future.thenApply((byte[] bytes) -> InternalSerializationHelper.deserializeResult(bytes, backwardCompatibilityVersion)); @@ -133,6 +146,13 @@ public static void addCustomSerializers(SerializationConfig serializationConfig, .setImplementation(new SimpleBackupProcessorSerializer(typeIdBase + 1)) .setTypeClass(SimpleBackupProcessor.class) ); + + serializationConfig.addSerializerConfig( + new SerializerConfig() + .setImplementation(new HazelcastOffloadableEntryProcessorSerializer(typeIdBase + 2)) + .setTypeClass(HazelcastOffloadableEntryProcessor.class) + ); + } } diff --git a/bucket4j-hazelcast-all/bucket4j-hazelcast/src/main/java/io/github/bucket4j/grid/hazelcast/serialization/HazelcastOffloadableEntryProcessorSerializer.java b/bucket4j-hazelcast-all/bucket4j-hazelcast/src/main/java/io/github/bucket4j/grid/hazelcast/serialization/HazelcastOffloadableEntryProcessorSerializer.java new file mode 100644 index 00000000..cf096fba --- /dev/null +++ b/bucket4j-hazelcast-all/bucket4j-hazelcast/src/main/java/io/github/bucket4j/grid/hazelcast/serialization/HazelcastOffloadableEntryProcessorSerializer.java @@ -0,0 +1,76 @@ +/*- + * ========================LICENSE_START================================= + * Bucket4j + * %% + * Copyright (C) 2015 - 2020 Vladimir Bukhtoyarov + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * =========================LICENSE_END================================== + */ +package io.github.bucket4j.grid.hazelcast.serialization; + +import com.hazelcast.nio.ObjectDataInput; +import com.hazelcast.nio.ObjectDataOutput; +import com.hazelcast.nio.serialization.StreamSerializer; +import com.hazelcast.nio.serialization.TypedStreamDeserializer; +import io.github.bucket4j.grid.hazelcast.HazelcastEntryProcessor; +import io.github.bucket4j.grid.hazelcast.HazelcastOffloadableEntryProcessor; + +import java.io.IOException; + + +public class HazelcastOffloadableEntryProcessorSerializer implements StreamSerializer, TypedStreamDeserializer { + + private final int typeId; + + public HazelcastOffloadableEntryProcessorSerializer(int typeId) { + this.typeId = typeId; + } + + public Class getSerializableType() { + return HazelcastEntryProcessor.class; + } + + @Override + public int getTypeId() { + return typeId; + } + + @Override + public void destroy() { + + } + + @Override + public void write(ObjectDataOutput out, HazelcastOffloadableEntryProcessor serializable) throws IOException { + out.writeByteArray(serializable.getRequestBytes()); + out.writeString(serializable.getExecutorName()); + } + + @Override + public HazelcastOffloadableEntryProcessor read(ObjectDataInput in) throws IOException { + return read0(in); + } + + @Override + public HazelcastOffloadableEntryProcessor read(ObjectDataInput in, Class aClass) throws IOException { + return read0(in); + } + + private HazelcastOffloadableEntryProcessor read0(ObjectDataInput in) throws IOException { + byte[] commandBytes = in.readByteArray(); + String executorName = in.readString(); + return new HazelcastOffloadableEntryProcessor(commandBytes, executorName); + } + +} diff --git a/bucket4j-hazelcast-all/bucket4j-hazelcast/src/test/java/io/github/bucket4j/hazelcast/HazelcastWithCustomSerializersAndOffloadableExecutorTest.java b/bucket4j-hazelcast-all/bucket4j-hazelcast/src/test/java/io/github/bucket4j/hazelcast/HazelcastWithCustomSerializersAndOffloadableExecutorTest.java new file mode 100644 index 00000000..d7ce9577 --- /dev/null +++ b/bucket4j-hazelcast-all/bucket4j-hazelcast/src/test/java/io/github/bucket4j/hazelcast/HazelcastWithCustomSerializersAndOffloadableExecutorTest.java @@ -0,0 +1,81 @@ +package io.github.bucket4j.hazelcast; + +import com.hazelcast.config.Config; +import com.hazelcast.config.JoinConfig; +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.map.IMap; +import io.github.bucket4j.distributed.proxy.ClientSideConfig; +import io.github.bucket4j.distributed.proxy.ProxyManager; +import io.github.bucket4j.grid.hazelcast.HazelcastProxyManager; +import io.github.bucket4j.tck.AbstractDistributedBucketTest; +import org.gridkit.nanocloud.Cloud; +import org.gridkit.nanocloud.CloudFactory; +import org.gridkit.nanocloud.VX; +import org.gridkit.vicluster.ViNode; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.Serializable; +import java.util.UUID; + +public class HazelcastWithCustomSerializersAndOffloadableExecutorTest extends AbstractDistributedBucketTest { + + private static IMap map; + private static Cloud cloud; + private static ViNode server; + + private static HazelcastInstance hazelcastInstance; + + @BeforeClass + public static void setup() { + // start separated JVM on current host + cloud = CloudFactory.createCloud(); + cloud.node("**").x(VX.TYPE).setLocal(); + server = cloud.node("stateful-hazelcast-server"); + + server.exec((Runnable & Serializable) () -> { + Config config = new Config(); + HazelcastProxyManager.addCustomSerializers(config.getSerializationConfig(), 10_000); + JoinConfig joinConfig = config.getNetworkConfig().getJoin(); + joinConfig.getMulticastConfig().setEnabled(false); + joinConfig.getTcpIpConfig().setEnabled(true); + joinConfig.getTcpIpConfig().addMember("127.0.0.1:5702"); + config.setLiteMember(false); + HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance(config); + hazelcastInstance.getMap("my_buckets"); + }); + + // start hazelcast client which works inside current JVM and does not hold data + Config config = new Config(); + JoinConfig joinConfig = config.getNetworkConfig().getJoin(); + joinConfig.getMulticastConfig().setEnabled(false); + joinConfig.getTcpIpConfig().setEnabled(true); + joinConfig.getTcpIpConfig().addMember("127.0.0.1:5701"); + HazelcastProxyManager.addCustomSerializers(config.getSerializationConfig(), 10_000); + config.setLiteMember(true); + hazelcastInstance = Hazelcast.newHazelcastInstance(config); + map = hazelcastInstance.getMap("my_buckets"); + } + + @AfterClass + public static void shutdown() { + if (hazelcastInstance != null) { + hazelcastInstance.shutdown(); + } + if (cloud != null) { + cloud.shutdown(); + } + } + + @Override + protected ProxyManager getProxyManager() { + return new HazelcastProxyManager<>(map, ClientSideConfig.getDefault(), "my-executor"); + } + + @Override + protected String generateRandomKey() { + return UUID.randomUUID().toString(); + } + +}