From 53cebdbcc21f3cb058776c3a7b207fdb469676ff Mon Sep 17 00:00:00 2001 From: sancar Date: Fri, 7 Sep 2018 16:34:08 +0300 Subject: [PATCH] SerializationException propagated to client when response unserializable When an unserializableResponse is trying to be send to client as response from executor service tasks, the exception was logged on the server side, and there was no response returned back to the client. This pr, removes the logging in such case and sends related expcetion, (HazelcastSerializationException) to the client. Similar behaviour is also tested for the core side and verified it already works as expected. fixes https://github.com/hazelcast/hazelcast/issues/13639 (cherry picked from commit 8fbdc581f3b18568f9cc3769ce8b3074b9bdff20) --- .../executor/ClientExecutorServiceTest.java | 48 +++++++++++++++++ .../protocol/task/AbstractMessageTask.java | 8 ++- .../executor/ExecutorServiceTest.java | 52 +++++++++++++++++++ 3 files changed, 106 insertions(+), 2 deletions(-) diff --git a/hazelcast-client/src/test/java/com/hazelcast/client/executor/ClientExecutorServiceTest.java b/hazelcast-client/src/test/java/com/hazelcast/client/executor/ClientExecutorServiceTest.java index 72a815617d11..396a2800c059 100644 --- a/hazelcast-client/src/test/java/com/hazelcast/client/executor/ClientExecutorServiceTest.java +++ b/hazelcast-client/src/test/java/com/hazelcast/client/executor/ClientExecutorServiceTest.java @@ -28,6 +28,7 @@ import com.hazelcast.nio.ObjectDataInput; import com.hazelcast.nio.ObjectDataOutput; import com.hazelcast.nio.serialization.DataSerializable; +import com.hazelcast.nio.serialization.HazelcastSerializationException; import com.hazelcast.test.AssertTask; import com.hazelcast.test.HazelcastParallelClassRunner; import com.hazelcast.test.annotation.ParallelTest; @@ -40,6 +41,7 @@ import org.junit.runner.RunWith; import java.io.IOException; +import java.io.Serializable; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; @@ -48,7 +50,9 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import static com.hazelcast.test.HazelcastTestSupport.assertOpenEventually; import static com.hazelcast.test.HazelcastTestSupport.assertTrueEventually; import static com.hazelcast.test.HazelcastTestSupport.randomString; import static org.junit.Assert.assertEquals; @@ -249,8 +253,10 @@ public void testCallableSerializedOnce_submitToAddress() throws ExecutionExcepti assertEquals(2, future.get()); } + static class SerializedCounterCallable implements Callable, DataSerializable { + int counter; public SerializedCounterCallable() { @@ -271,4 +277,46 @@ public void readData(ObjectDataInput in) throws IOException { counter = in.readInt() + 1; } } + + @Test(expected = HazelcastSerializationException.class) + public void testUnserializableResponse_exceptionPropagatesToClient() throws Throwable { + IExecutorService service = client.getExecutorService("executor"); + TaskWithUnserialazableResponse counterCallable = new TaskWithUnserialazableResponse(); + Future future = service.submit(counterCallable); + try { + future.get(); + } catch (ExecutionException e) { + throw e.getCause(); + } + } + + @Test(expected = HazelcastSerializationException.class) + public void testUnserializableResponse_exceptionPropagatesToClientCallback() throws Throwable { + IExecutorService service = client.getExecutorService("executor"); + TaskWithUnserialazableResponse counterCallable = new TaskWithUnserialazableResponse(); + final CountDownLatch countDownLatch = new CountDownLatch(1); + final AtomicReference throwable = new AtomicReference(); + service.submit(counterCallable, new ExecutionCallback() { + @Override + public void onResponse(Object response) { + + } + + @Override + public void onFailure(Throwable t) { + throwable.set(t.getCause()); + countDownLatch.countDown(); + } + }); + assertOpenEventually(countDownLatch); + throw throwable.get(); + } + + private static class TaskWithUnserialazableResponse implements Callable, Serializable { + @Override + public Object call() throws Exception { + return new Object(); + } + } + } diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/AbstractMessageTask.java b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/AbstractMessageTask.java index 0c72fba39957..0b56cab442ba 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/AbstractMessageTask.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/AbstractMessageTask.java @@ -185,8 +185,12 @@ private void checkPermissions(ClientEndpoint endpoint) { protected abstract void processMessage() throws Throwable; protected void sendResponse(Object response) { - ClientMessage clientMessage = encodeResponse(response); - sendClientMessage(clientMessage); + try { + ClientMessage clientMessage = encodeResponse(response); + sendClientMessage(clientMessage); + } catch (Exception e) { + handleProcessingFailure(e); + } } protected void sendClientMessage(ClientMessage resultClientMessage) { diff --git a/hazelcast/src/test/java/com/hazelcast/executor/ExecutorServiceTest.java b/hazelcast/src/test/java/com/hazelcast/executor/ExecutorServiceTest.java index 478efbff2081..8e115f8a3615 100644 --- a/hazelcast/src/test/java/com/hazelcast/executor/ExecutorServiceTest.java +++ b/hazelcast/src/test/java/com/hazelcast/executor/ExecutorServiceTest.java @@ -67,6 +67,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static com.hazelcast.config.ExecutorConfig.DEFAULT_POOL_SIZE; import static org.junit.Assert.assertEquals; @@ -1097,4 +1098,55 @@ public Object getPartitionKey() { return "key"; } } + + + @Test(expected = HazelcastSerializationException.class) + public void testUnserializableResponse_exceptionPropagatesToCaller() throws Throwable { + TestHazelcastInstanceFactory factory = createHazelcastInstanceFactory(2); + HazelcastInstance instance1 = factory.newHazelcastInstance(); + HazelcastInstance instance2 = factory.newHazelcastInstance(); + + IExecutorService service = instance1.getExecutorService("executor"); + TaskWithUnserialazableResponse counterCallable = new TaskWithUnserialazableResponse(); + Future future = service.submitToMember(counterCallable, instance2.getCluster().getLocalMember()); + try { + future.get(); + } catch (ExecutionException e) { + throw e.getCause(); + } + } + + @Test(expected = HazelcastSerializationException.class) + public void testUnserializableResponse_exceptionPropagatesToCallerCallback() throws Throwable { + TestHazelcastInstanceFactory factory = createHazelcastInstanceFactory(2); + HazelcastInstance instance1 = factory.newHazelcastInstance(); + HazelcastInstance instance2 = factory.newHazelcastInstance(); + + + IExecutorService service = instance1.getExecutorService("executor"); + TaskWithUnserialazableResponse counterCallable = new TaskWithUnserialazableResponse(); + final CountDownLatch countDownLatch = new CountDownLatch(1); + final AtomicReference throwable = new AtomicReference(); + service.submitToMember(counterCallable, instance2.getCluster().getLocalMember(), new ExecutionCallback() { + @Override + public void onResponse(Object response) { + + } + + @Override + public void onFailure(Throwable t) { + throwable.set(t); + countDownLatch.countDown(); + } + }); + assertOpenEventually(countDownLatch); + throw throwable.get(); + } + + private static class TaskWithUnserialazableResponse implements Callable, Serializable { + @Override + public Object call() throws Exception { + return new Object(); + } + } }