Skip to content

Commit

Permalink
SerializationException propagated to client when response unserializable
Browse files Browse the repository at this point in the history
When an unserializableResponse is trying to be send to client as response
from executor service tasks, the exception was logged on the server side,
and there was no response returned back to the client.

This pr, removes the logging in such case and sends related expcetion,
(HazelcastSerializationException) to the client.

Similar behaviour is also tested for the core side and verified it
already works as expected.

fixes #13639

(cherry picked from commit 8fbdc58)
  • Loading branch information
sancar committed Sep 17, 2018
1 parent 9b4e978 commit 8db66a5
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 2 deletions.
Expand Up @@ -28,6 +28,7 @@
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.DataSerializable;
import com.hazelcast.nio.serialization.HazelcastSerializationException;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelTest;
Expand All @@ -40,6 +41,7 @@
import org.junit.runner.RunWith;

import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
Expand All @@ -48,7 +50,9 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import static com.hazelcast.test.HazelcastTestSupport.assertOpenEventually;
import static com.hazelcast.test.HazelcastTestSupport.assertTrueEventually;
import static com.hazelcast.test.HazelcastTestSupport.randomString;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -249,8 +253,10 @@ public void testCallableSerializedOnce_submitToAddress() throws ExecutionExcepti
assertEquals(2, future.get());
}


static class SerializedCounterCallable implements Callable, DataSerializable {


int counter;

public SerializedCounterCallable() {
Expand All @@ -271,4 +277,46 @@ public void readData(ObjectDataInput in) throws IOException {
counter = in.readInt() + 1;
}
}

@Test(expected = HazelcastSerializationException.class)
public void testUnserializableResponse_exceptionPropagatesToClient() throws Throwable {
IExecutorService service = client.getExecutorService("executor");
TaskWithUnserialazableResponse counterCallable = new TaskWithUnserialazableResponse();
Future future = service.submit(counterCallable);
try {
future.get();
} catch (ExecutionException e) {
throw e.getCause();
}
}

@Test(expected = HazelcastSerializationException.class)
public void testUnserializableResponse_exceptionPropagatesToClientCallback() throws Throwable {
IExecutorService service = client.getExecutorService("executor");
TaskWithUnserialazableResponse counterCallable = new TaskWithUnserialazableResponse();
final CountDownLatch countDownLatch = new CountDownLatch(1);
final AtomicReference<Throwable> throwable = new AtomicReference<Throwable>();
service.submit(counterCallable, new ExecutionCallback() {
@Override
public void onResponse(Object response) {

}

@Override
public void onFailure(Throwable t) {
countDownLatch.countDown();
throwable.set(t.getCause());
}
});
assertOpenEventually(countDownLatch);
throw throwable.get();
}

private static class TaskWithUnserialazableResponse implements Callable, Serializable {
@Override
public Object call() throws Exception {
return new Object();
}
}

}
Expand Up @@ -185,8 +185,12 @@ private void checkPermissions(ClientEndpoint endpoint) {
protected abstract void processMessage() throws Throwable;

protected void sendResponse(Object response) {
ClientMessage clientMessage = encodeResponse(response);
sendClientMessage(clientMessage);
try {
ClientMessage clientMessage = encodeResponse(response);
sendClientMessage(clientMessage);
} catch (Exception e) {
handleProcessingFailure(e);
}
}

protected void sendClientMessage(ClientMessage resultClientMessage) {
Expand Down
Expand Up @@ -67,6 +67,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static com.hazelcast.config.ExecutorConfig.DEFAULT_POOL_SIZE;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -1097,4 +1098,55 @@ public Object getPartitionKey() {
return "key";
}
}


@Test(expected = HazelcastSerializationException.class)
public void testUnserializableResponse_exceptionPropagatesToCaller() throws Throwable {
TestHazelcastInstanceFactory factory = createHazelcastInstanceFactory(2);
HazelcastInstance instance1 = factory.newHazelcastInstance();
HazelcastInstance instance2 = factory.newHazelcastInstance();

IExecutorService service = instance1.getExecutorService("executor");
TaskWithUnserialazableResponse counterCallable = new TaskWithUnserialazableResponse();
Future future = service.submitToMember(counterCallable, instance2.getCluster().getLocalMember());
try {
future.get();
} catch (ExecutionException e) {
throw e.getCause();
}
}

@Test(expected = HazelcastSerializationException.class)
public void testUnserializableResponse_exceptionPropagatesToCallerCallback() throws Throwable {
TestHazelcastInstanceFactory factory = createHazelcastInstanceFactory(2);
HazelcastInstance instance1 = factory.newHazelcastInstance();
HazelcastInstance instance2 = factory.newHazelcastInstance();


IExecutorService service = instance1.getExecutorService("executor");
TaskWithUnserialazableResponse counterCallable = new TaskWithUnserialazableResponse();
final CountDownLatch countDownLatch = new CountDownLatch(1);
final AtomicReference<Throwable> throwable = new AtomicReference<Throwable>();
service.submitToMember(counterCallable, instance2.getCluster().getLocalMember(), new ExecutionCallback() {
@Override
public void onResponse(Object response) {

}

@Override
public void onFailure(Throwable t) {
countDownLatch.countDown();
throwable.set(t);
}
});
assertOpenEventually(countDownLatch);
throw throwable.get();
}

private static class TaskWithUnserialazableResponse implements Callable, Serializable {
@Override
public Object call() throws Exception {
return new Object();
}
}
}

0 comments on commit 8db66a5

Please sign in to comment.