From 0c373363bf1fcb5e115da81b2fc922e80447f272 Mon Sep 17 00:00:00 2001 From: Marcel Prestel Date: Tue, 11 Jun 2019 20:45:16 +0200 Subject: [PATCH 1/3] Synchronize access during broadcast Fix ConcurrentModificationException found during the investigation of #879 --- .../server/WebSocketServer.java | 18 +- .../java_websocket/issues/Issue879Test.java | 170 ++++++++++++++++++ 2 files changed, 180 insertions(+), 8 deletions(-) create mode 100644 src/test/java/org/java_websocket/issues/Issue879Test.java diff --git a/src/main/java/org/java_websocket/server/WebSocketServer.java b/src/main/java/org/java_websocket/server/WebSocketServer.java index 19331e96..b61c2975 100644 --- a/src/main/java/org/java_websocket/server/WebSocketServer.java +++ b/src/main/java/org/java_websocket/server/WebSocketServer.java @@ -924,14 +924,16 @@ private void doBroadcast(Object data, Collection clients) { return; } Map> draftFrames = new HashMap>(); - for( WebSocket client : clients ) { - if( client != null ) { - Draft draft = client.getDraft(); - fillFrames(draft, draftFrames, sData, bData); - try { - client.sendFrame( draftFrames.get( draft ) ); - } catch ( WebsocketNotConnectedException e ) { - //Ignore this exception in this case + synchronized (clients) { + for (WebSocket client : clients) { + if (client != null) { + Draft draft = client.getDraft(); + fillFrames(draft, draftFrames, sData, bData); + try { + client.sendFrame(draftFrames.get(draft)); + } catch (WebsocketNotConnectedException e) { + //Ignore this exception in this case + } } } } diff --git a/src/test/java/org/java_websocket/issues/Issue879Test.java b/src/test/java/org/java_websocket/issues/Issue879Test.java new file mode 100644 index 00000000..e95b3dcd --- /dev/null +++ b/src/test/java/org/java_websocket/issues/Issue879Test.java @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2010-2019 Nathan Rajlich + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + * + */ + +package org.java_websocket.issues; + +import org.java_websocket.WebSocket; +import org.java_websocket.client.WebSocketClient; +import org.java_websocket.handshake.ClientHandshake; +import org.java_websocket.handshake.ServerHandshake; +import org.java_websocket.server.WebSocketServer; +import org.java_websocket.util.SocketUtil; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.net.BindException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.ConcurrentModificationException; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import static org.junit.Assert.assertFalse; + +@RunWith(Parameterized.class) +public class Issue879Test { + + private static final int NUMBER_OF_TESTS = 40; + + @Parameterized.Parameter + public int numberOfConnections; + + + @Test(timeout= 10000) + public void QuickStopTest() throws IOException, InterruptedException, URISyntaxException { + final boolean[] wasBindException = {false}; + final boolean[] wasConcurrentException = new boolean[1]; + final CountDownLatch countDownLatch = new CountDownLatch(1); + + class SimpleServer extends WebSocketServer { + public SimpleServer(InetSocketAddress address) { + super(address); + } + + @Override + public void onOpen(WebSocket conn, ClientHandshake handshake) { + broadcast("new connection: " + handshake.getResourceDescriptor()); //This method sends a message to all clients connected + } + + @Override + public void onClose(WebSocket conn, int code, String reason, boolean remote) { + } + + @Override + public void onMessage(WebSocket conn, String message) { + } + + @Override + public void onMessage(WebSocket conn, ByteBuffer message) { + } + + @Override + public void onError(WebSocket conn, Exception ex) { + if (ex instanceof BindException) { + wasBindException[0] = true; + } + if (ex instanceof ConcurrentModificationException) { + wasConcurrentException[0] = true; + } + } + + @Override + public void onStart() { + countDownLatch.countDown(); + } + } + int port = SocketUtil.getAvailablePort(); + SimpleServer serverA = new SimpleServer(new InetSocketAddress( port)); + SimpleServer serverB = new SimpleServer(new InetSocketAddress( port)); + serverA.start(); + countDownLatch.await(); + List clients = startNewConnections(numberOfConnections, port); + Thread.sleep(100); + int numberOfConnected = 0; + for (WebSocketClient client : clients) { + if (client.isOpen()) + numberOfConnected++; + } + // Number will differ since we use connect instead of connectBlocking + // System.out.println(numberOfConnected + " " + numberOfConnections); + + serverA.stop(); + serverB.start(); + clients.clear(); + assertFalse("There was a BindException", wasBindException[0]); + assertFalse("There was a ConcurrentModificationException", wasConcurrentException[0]); + } + + @Parameterized.Parameters + public static Collection data() { + List ret = new ArrayList(NUMBER_OF_TESTS); + for (int i = 0; i < NUMBER_OF_TESTS; i++) ret.add(new Integer[]{25+ i*25}); + return ret; + } + + private List startNewConnections(int numberOfConnections, int port) throws URISyntaxException, InterruptedException { + List clients = new ArrayList(numberOfConnections); + for (int i = 0; i < numberOfConnections; i++) { + WebSocketClient client = new SimpleClient(new URI("ws://localhost:" + port)); + client.connect(); + Thread.sleep(1); + clients.add(client); + } + return clients; + } + class SimpleClient extends WebSocketClient { + + public SimpleClient(URI serverUri) { + super(serverUri); + } + + @Override + public void onOpen(ServerHandshake handshakedata) { + + } + + @Override + public void onMessage(String message) { + + } + + @Override + public void onClose(int code, String reason, boolean remote) { + + } + + @Override + public void onError(Exception ex) { + + } + } +} From 23a105543ee83fee7d3ccaa952d42ddf6518b759 Mon Sep 17 00:00:00 2001 From: Marcel Prestel Date: Tue, 11 Jun 2019 20:47:14 +0200 Subject: [PATCH 2/3] Update Issue879Test.java Reduce the number of tests --- src/test/java/org/java_websocket/issues/Issue879Test.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/org/java_websocket/issues/Issue879Test.java b/src/test/java/org/java_websocket/issues/Issue879Test.java index e95b3dcd..bd30cf99 100644 --- a/src/test/java/org/java_websocket/issues/Issue879Test.java +++ b/src/test/java/org/java_websocket/issues/Issue879Test.java @@ -53,7 +53,7 @@ @RunWith(Parameterized.class) public class Issue879Test { - private static final int NUMBER_OF_TESTS = 40; + private static final int NUMBER_OF_TESTS = 20; @Parameterized.Parameter public int numberOfConnections; From d634714ddc6656dd33db340cd35039f98236b26e Mon Sep 17 00:00:00 2001 From: Marcel Prestel Date: Wed, 12 Jun 2019 21:58:00 +0200 Subject: [PATCH 3/3] Rework after review Copy the clients into a local list --- .../server/WebSocketServer.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/java_websocket/server/WebSocketServer.java b/src/main/java/org/java_websocket/server/WebSocketServer.java index b61c2975..ceba5da6 100644 --- a/src/main/java/org/java_websocket/server/WebSocketServer.java +++ b/src/main/java/org/java_websocket/server/WebSocketServer.java @@ -924,16 +924,18 @@ private void doBroadcast(Object data, Collection clients) { return; } Map> draftFrames = new HashMap>(); + List clientCopy; synchronized (clients) { - for (WebSocket client : clients) { - if (client != null) { - Draft draft = client.getDraft(); - fillFrames(draft, draftFrames, sData, bData); - try { - client.sendFrame(draftFrames.get(draft)); - } catch (WebsocketNotConnectedException e) { - //Ignore this exception in this case - } + clientCopy = new ArrayList(clients); + } + for (WebSocket client : clientCopy) { + if (client != null) { + Draft draft = client.getDraft(); + fillFrames(draft, draftFrames, sData, bData); + try { + client.sendFrame(draftFrames.get(draft)); + } catch (WebsocketNotConnectedException e) { + //Ignore this exception in this case } } }