diff --git a/hazelcast-client/src/test/java/com/hazelcast/client/executor/ClientExecutorServiceTest.java b/hazelcast-client/src/test/java/com/hazelcast/client/executor/ClientExecutorServiceTest.java index 72a815617d11..396a2800c059 100644 --- a/hazelcast-client/src/test/java/com/hazelcast/client/executor/ClientExecutorServiceTest.java +++ b/hazelcast-client/src/test/java/com/hazelcast/client/executor/ClientExecutorServiceTest.java @@ -28,6 +28,7 @@ import com.hazelcast.nio.ObjectDataInput; import com.hazelcast.nio.ObjectDataOutput; import com.hazelcast.nio.serialization.DataSerializable; +import com.hazelcast.nio.serialization.HazelcastSerializationException; import com.hazelcast.test.AssertTask; import com.hazelcast.test.HazelcastParallelClassRunner; import com.hazelcast.test.annotation.ParallelTest; @@ -40,6 +41,7 @@ import org.junit.runner.RunWith; import java.io.IOException; +import java.io.Serializable; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; @@ -48,7 +50,9 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import static com.hazelcast.test.HazelcastTestSupport.assertOpenEventually; import static com.hazelcast.test.HazelcastTestSupport.assertTrueEventually; import static com.hazelcast.test.HazelcastTestSupport.randomString; import static org.junit.Assert.assertEquals; @@ -249,8 +253,10 @@ public void testCallableSerializedOnce_submitToAddress() throws ExecutionExcepti assertEquals(2, future.get()); } + static class SerializedCounterCallable implements Callable, DataSerializable { + int counter; public SerializedCounterCallable() { @@ -271,4 +277,46 @@ public void readData(ObjectDataInput in) throws IOException { counter = in.readInt() + 1; } } + + @Test(expected = HazelcastSerializationException.class) + public void testUnserializableResponse_exceptionPropagatesToClient() throws Throwable { + IExecutorService service = client.getExecutorService("executor"); + TaskWithUnserialazableResponse counterCallable = new TaskWithUnserialazableResponse(); + Future future = service.submit(counterCallable); + try { + future.get(); + } catch (ExecutionException e) { + throw e.getCause(); + } + } + + @Test(expected = HazelcastSerializationException.class) + public void testUnserializableResponse_exceptionPropagatesToClientCallback() throws Throwable { + IExecutorService service = client.getExecutorService("executor"); + TaskWithUnserialazableResponse counterCallable = new TaskWithUnserialazableResponse(); + final CountDownLatch countDownLatch = new CountDownLatch(1); + final AtomicReference throwable = new AtomicReference(); + service.submit(counterCallable, new ExecutionCallback() { + @Override + public void onResponse(Object response) { + + } + + @Override + public void onFailure(Throwable t) { + throwable.set(t.getCause()); + countDownLatch.countDown(); + } + }); + assertOpenEventually(countDownLatch); + throw throwable.get(); + } + + private static class TaskWithUnserialazableResponse implements Callable, Serializable { + @Override + public Object call() throws Exception { + return new Object(); + } + } + } diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/AbstractMessageTask.java b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/AbstractMessageTask.java index 0c72fba39957..0b56cab442ba 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/AbstractMessageTask.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/AbstractMessageTask.java @@ -185,8 +185,12 @@ private void checkPermissions(ClientEndpoint endpoint) { protected abstract void processMessage() throws Throwable; protected void sendResponse(Object response) { - ClientMessage clientMessage = encodeResponse(response); - sendClientMessage(clientMessage); + try { + ClientMessage clientMessage = encodeResponse(response); + sendClientMessage(clientMessage); + } catch (Exception e) { + handleProcessingFailure(e); + } } protected void sendClientMessage(ClientMessage resultClientMessage) { diff --git a/hazelcast/src/test/java/com/hazelcast/executor/ExecutorServiceTest.java b/hazelcast/src/test/java/com/hazelcast/executor/ExecutorServiceTest.java index 478efbff2081..8e115f8a3615 100644 --- a/hazelcast/src/test/java/com/hazelcast/executor/ExecutorServiceTest.java +++ b/hazelcast/src/test/java/com/hazelcast/executor/ExecutorServiceTest.java @@ -67,6 +67,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static com.hazelcast.config.ExecutorConfig.DEFAULT_POOL_SIZE; import static org.junit.Assert.assertEquals; @@ -1097,4 +1098,55 @@ public Object getPartitionKey() { return "key"; } } + + + @Test(expected = HazelcastSerializationException.class) + public void testUnserializableResponse_exceptionPropagatesToCaller() throws Throwable { + TestHazelcastInstanceFactory factory = createHazelcastInstanceFactory(2); + HazelcastInstance instance1 = factory.newHazelcastInstance(); + HazelcastInstance instance2 = factory.newHazelcastInstance(); + + IExecutorService service = instance1.getExecutorService("executor"); + TaskWithUnserialazableResponse counterCallable = new TaskWithUnserialazableResponse(); + Future future = service.submitToMember(counterCallable, instance2.getCluster().getLocalMember()); + try { + future.get(); + } catch (ExecutionException e) { + throw e.getCause(); + } + } + + @Test(expected = HazelcastSerializationException.class) + public void testUnserializableResponse_exceptionPropagatesToCallerCallback() throws Throwable { + TestHazelcastInstanceFactory factory = createHazelcastInstanceFactory(2); + HazelcastInstance instance1 = factory.newHazelcastInstance(); + HazelcastInstance instance2 = factory.newHazelcastInstance(); + + + IExecutorService service = instance1.getExecutorService("executor"); + TaskWithUnserialazableResponse counterCallable = new TaskWithUnserialazableResponse(); + final CountDownLatch countDownLatch = new CountDownLatch(1); + final AtomicReference throwable = new AtomicReference(); + service.submitToMember(counterCallable, instance2.getCluster().getLocalMember(), new ExecutionCallback() { + @Override + public void onResponse(Object response) { + + } + + @Override + public void onFailure(Throwable t) { + throwable.set(t); + countDownLatch.countDown(); + } + }); + assertOpenEventually(countDownLatch); + throw throwable.get(); + } + + private static class TaskWithUnserialazableResponse implements Callable, Serializable { + @Override + public Object call() throws Exception { + return new Object(); + } + } }