Skip to content

Commit

Permalink
#251: ability to specify offloadable executor for Hazelcast
Browse files Browse the repository at this point in the history
  • Loading branch information
vladimir-bukhtoyarov committed May 2, 2022
1 parent 41cc370 commit a226652
Show file tree
Hide file tree
Showing 4 changed files with 225 additions and 2 deletions.
@@ -0,0 +1,46 @@
/*-
* ========================LICENSE_START=================================
* Bucket4j
* %%
* Copyright (C) 2015 - 2020 Vladimir Bukhtoyarov
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =========================LICENSE_END==================================
*/
package io.github.bucket4j.grid.hazelcast;

import com.hazelcast.core.Offloadable;
import io.github.bucket4j.distributed.remote.Request;

/**
 * An {@link HazelcastEntryProcessor} that additionally implements {@link Offloadable},
 * so that Hazelcast offloads its execution to the named executor instead of running it
 * on a partition thread.
 */
public class HazelcastOffloadableEntryProcessor<K, T> extends HazelcastEntryProcessor<K, T> implements Offloadable {

    private static final long serialVersionUID = 1L;

    // name of the Hazelcast executor this processor should be offloaded to
    private final String executorName;

    /**
     * @param request the request to apply to the bucket state
     * @param executorName name of the Hazelcast executor to offload execution to
     */
    public HazelcastOffloadableEntryProcessor(Request<T> request, String executorName) {
        super(request);
        this.executorName = executorName;
    }

    /**
     * @param requestBytes the pre-serialized request
     * @param executorName name of the Hazelcast executor to offload execution to
     */
    public HazelcastOffloadableEntryProcessor(byte[] requestBytes, String executorName) {
        super(requestBytes);
        this.executorName = executorName;
    }

    @Override
    public String getExecutorName() {
        return executorName;
    }

}
Expand Up @@ -45,6 +45,7 @@
import io.github.bucket4j.distributed.remote.Request;
import io.github.bucket4j.distributed.versioning.Version;
import io.github.bucket4j.grid.hazelcast.serialization.HazelcastEntryProcessorSerializer;
import io.github.bucket4j.grid.hazelcast.serialization.HazelcastOffloadableEntryProcessorSerializer;
import io.github.bucket4j.grid.hazelcast.serialization.SimpleBackupProcessorSerializer;
import io.github.bucket4j.distributed.serialization.InternalSerializationHelper;

Expand All @@ -60,6 +61,7 @@
public class HazelcastProxyManager<K> extends AbstractProxyManager<K> {

private final IMap<K, byte[]> map;
private final String offloadableExecutorName;

/**
 * Creates a proxy manager over the given map with default client-side configuration
 * and without an offloadable executor (entry processors run on partition threads).
 *
 * @param map the map that stores serialized bucket state
 */
public HazelcastProxyManager(IMap<K, byte[]> map) {
this(map, ClientSideConfig.getDefault());
Expand All @@ -68,11 +70,20 @@ public HazelcastProxyManager(IMap<K, byte[]> map) {
/**
 * Creates a proxy manager over the given map without an offloadable executor,
 * so entry processors are executed on Hazelcast partition threads.
 *
 * @param map the map that stores serialized bucket state
 * @param clientSideConfig client-side configuration
 */
public HazelcastProxyManager(IMap<K, byte[]> map, ClientSideConfig clientSideConfig) {
    super(clientSideConfig);
    // no executor configured -> plain (non-offloadable) entry processors will be used
    this.offloadableExecutorName = null;
    this.map = Objects.requireNonNull(map);
}

/**
 * Creates a proxy manager whose entry processors implement {@link com.hazelcast.core.Offloadable},
 * so Hazelcast executes them on the executor with the given name instead of blocking
 * a partition thread.
 *
 * @param map the map that stores serialized bucket state
 * @param clientSideConfig client-side configuration
 * @param offloadableExecutorName name of the Hazelcast executor to offload processing to
 */
public HazelcastProxyManager(IMap<K, byte[]> map, ClientSideConfig clientSideConfig, String offloadableExecutorName) {
    super(clientSideConfig);
    this.map = Objects.requireNonNull(map);
    // parameter name fixed: was misspelled "offlodableExecutorName" in the original
    this.offloadableExecutorName = Objects.requireNonNull(offloadableExecutorName);
}

/**
 * Applies the request to the bucket stored under the given key by executing an
 * entry processor on the owning member. If an offloadable executor name was
 * configured, an Offloadable processor is used so Hazelcast runs it on that
 * executor instead of the partition thread.
 */
@Override
public <T> CommandResult<T> execute(K key, Request<T> request) {
    // pick processor type depending on whether executor offloading was requested
    HazelcastEntryProcessor<K, T> entryProcessor = offloadableExecutorName == null?
        new HazelcastEntryProcessor<>(request) :
        new HazelcastOffloadableEntryProcessor<>(request, offloadableExecutorName);
    // blocking execution on the member that owns the key
    byte[] response = map.executeOnKey(key, entryProcessor);
    Version backwardCompatibilityVersion = request.getBackwardCompatibilityVersion();
    return deserializeResult(response, backwardCompatibilityVersion);
Expand All @@ -85,7 +96,9 @@ public boolean isAsyncModeSupported() {

/**
 * Asynchronous counterpart of execute(): submits the entry processor to the key's
 * owning member and deserializes the resulting bytes when the future completes.
 * Uses the Offloadable processor variant when an executor name was configured.
 */
@Override
public <T> CompletableFuture<CommandResult<T>> executeAsync(K key, Request<T> request) {
    // pick processor type depending on whether executor offloading was requested
    HazelcastEntryProcessor<K, T> entryProcessor = offloadableExecutorName == null?
        new HazelcastEntryProcessor<>(request) :
        new HazelcastOffloadableEntryProcessor<>(request, offloadableExecutorName);
    CompletionStage<byte[]> future = map.submitToKey(key, entryProcessor);
    Version backwardCompatibilityVersion = request.getBackwardCompatibilityVersion();
    return (CompletableFuture) future.thenApply((byte[] bytes) -> InternalSerializationHelper.deserializeResult(bytes, backwardCompatibilityVersion));
Expand Down Expand Up @@ -133,6 +146,13 @@ public static void addCustomSerializers(SerializationConfig serializationConfig,
.setImplementation(new SimpleBackupProcessorSerializer(typeIdBase + 1))
.setTypeClass(SimpleBackupProcessor.class)
);

serializationConfig.addSerializerConfig(
new SerializerConfig()
.setImplementation(new HazelcastOffloadableEntryProcessorSerializer(typeIdBase + 2))
.setTypeClass(HazelcastOffloadableEntryProcessor.class)
);

}

}
@@ -0,0 +1,76 @@
/*-
* ========================LICENSE_START=================================
* Bucket4j
* %%
* Copyright (C) 2015 - 2020 Vladimir Bukhtoyarov
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =========================LICENSE_END==================================
*/
package io.github.bucket4j.grid.hazelcast.serialization;

import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.StreamSerializer;
import com.hazelcast.nio.serialization.TypedStreamDeserializer;
import io.github.bucket4j.grid.hazelcast.HazelcastEntryProcessor;
import io.github.bucket4j.grid.hazelcast.HazelcastOffloadableEntryProcessor;

import java.io.IOException;


/**
 * Hazelcast {@link StreamSerializer} for {@link HazelcastOffloadableEntryProcessor}.
 * Wire format: the pre-serialized request bytes followed by the executor name.
 */
public class HazelcastOffloadableEntryProcessorSerializer implements StreamSerializer<HazelcastOffloadableEntryProcessor>, TypedStreamDeserializer<HazelcastOffloadableEntryProcessor> {

    // unique Hazelcast serializer type id, assigned by the registering code
    private final int typeId;

    /**
     * @param typeId unique Hazelcast serializer type id for this serializer
     */
    public HazelcastOffloadableEntryProcessorSerializer(int typeId) {
        this.typeId = typeId;
    }

    /**
     * Returns the type handled by this serializer.
     * Fixed: the original returned {@code HazelcastEntryProcessor.class} — a copy-paste
     * from the parent serializer — which did not match the type this serializer
     * actually serializes and deserializes.
     */
    public Class<HazelcastOffloadableEntryProcessor> getSerializableType() {
        return HazelcastOffloadableEntryProcessor.class;
    }

    @Override
    public int getTypeId() {
        return typeId;
    }

    @Override
    public void destroy() {
        // no resources to release
    }

    @Override
    public void write(ObjectDataOutput out, HazelcastOffloadableEntryProcessor serializable) throws IOException {
        // keep field order in sync with read0()
        out.writeByteArray(serializable.getRequestBytes());
        out.writeString(serializable.getExecutorName());
    }

    @Override
    public HazelcastOffloadableEntryProcessor read(ObjectDataInput in) throws IOException {
        return read0(in);
    }

    @Override
    public HazelcastOffloadableEntryProcessor read(ObjectDataInput in, Class aClass) throws IOException {
        return read0(in);
    }

    // shared deserialization path for both read(...) overloads
    private HazelcastOffloadableEntryProcessor read0(ObjectDataInput in) throws IOException {
        byte[] commandBytes = in.readByteArray();
        String executorName = in.readString();
        return new HazelcastOffloadableEntryProcessor(commandBytes, executorName);
    }

}
@@ -0,0 +1,81 @@
package io.github.bucket4j.hazelcast;

import com.hazelcast.config.Config;
import com.hazelcast.config.JoinConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import io.github.bucket4j.distributed.proxy.ClientSideConfig;
import io.github.bucket4j.distributed.proxy.ProxyManager;
import io.github.bucket4j.grid.hazelcast.HazelcastProxyManager;
import io.github.bucket4j.tck.AbstractDistributedBucketTest;
import org.gridkit.nanocloud.Cloud;
import org.gridkit.nanocloud.CloudFactory;
import org.gridkit.nanocloud.VX;
import org.gridkit.vicluster.ViNode;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.io.Serializable;
import java.util.UUID;

/**
 * Runs the distributed-bucket TCK against a two-node Hazelcast cluster with the
 * custom bucket4j serializers registered, using a proxy manager configured with
 * an offloadable executor name.
 */
public class HazelcastWithCustomSerializersAndOffloadableExecutorTest extends AbstractDistributedBucketTest<String> {

    private static IMap<String, byte[]> map;
    private static Cloud cloud;
    private static ViNode server;

    private static HazelcastInstance hazelcastInstance;

    @BeforeClass
    public static void setup() {
        // spawn a dedicated JVM on the local host that acts as the data-holding member
        cloud = CloudFactory.createCloud();
        cloud.node("**").x(VX.TYPE).setLocal();
        server = cloud.node("stateful-hazelcast-server");

        server.exec((Runnable & Serializable) () -> {
            Config memberConfig = new Config();
            HazelcastProxyManager.addCustomSerializers(memberConfig.getSerializationConfig(), 10_000);
            JoinConfig memberJoin = memberConfig.getNetworkConfig().getJoin();
            // discover the peer via TCP/IP instead of multicast
            memberJoin.getMulticastConfig().setEnabled(false);
            memberJoin.getTcpIpConfig().setEnabled(true);
            memberJoin.getTcpIpConfig().addMember("127.0.0.1:5702");
            memberConfig.setLiteMember(false);
            HazelcastInstance member = Hazelcast.newHazelcastInstance(memberConfig);
            member.getMap("my_buckets");
        });

        // lite member inside the current JVM: joins the cluster but holds no data
        Config liteConfig = new Config();
        JoinConfig liteJoin = liteConfig.getNetworkConfig().getJoin();
        liteJoin.getMulticastConfig().setEnabled(false);
        liteJoin.getTcpIpConfig().setEnabled(true);
        liteJoin.getTcpIpConfig().addMember("127.0.0.1:5701");
        // same serializers and type-id base as on the server side
        HazelcastProxyManager.addCustomSerializers(liteConfig.getSerializationConfig(), 10_000);
        liteConfig.setLiteMember(true);
        hazelcastInstance = Hazelcast.newHazelcastInstance(liteConfig);
        map = hazelcastInstance.getMap("my_buckets");
    }

    @AfterClass
    public static void shutdown() {
        // tear down the local lite member first, then the remote JVM
        if (hazelcastInstance != null) {
            hazelcastInstance.shutdown();
        }
        if (cloud != null) {
            cloud.shutdown();
        }
    }

    @Override
    protected ProxyManager<String> getProxyManager() {
        // exercises the offloadable code path via the named executor
        return new HazelcastProxyManager<>(map, ClientSideConfig.getDefault(), "my-executor");
    }

    @Override
    protected String generateRandomKey() {
        return UUID.randomUUID().toString();
    }

}

0 comments on commit a226652

Please sign in to comment.