Skip to content

Commit

Permalink
Replace TimerTask with ScheduledExecutorService (#878)
Browse files Browse the repository at this point in the history
Replace TimerTask with ScheduledExecutorService
  • Loading branch information
marci4 committed Apr 23, 2019
2 parents 8a74a87 + a911973 commit 73c6805
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 32 deletions.
59 changes: 31 additions & 28 deletions src/main/java/org/java_websocket/AbstractWebSocket.java
Expand Up @@ -26,13 +26,16 @@
package org.java_websocket;

import org.java_websocket.framing.CloseFrame;
import org.java_websocket.util.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;


/**
Expand Down Expand Up @@ -60,21 +63,21 @@ public abstract class AbstractWebSocket extends WebSocketAdapter {
private boolean reuseAddr;

/**
* Attribute for a timer allowing to check for lost connections
* @since 1.3.4
* Attribute for a service that triggers lost connection checking
* @since 1.4.1
*/
private Timer connectionLostTimer;
private ScheduledExecutorService connectionLostCheckerService;
/**
* Attribute for a timertask allowing to check for lost connections
* @since 1.3.4
* Attribute for a task that checks for lost connections
* @since 1.4.1
*/
private TimerTask connectionLostTimerTask;
private ScheduledFuture connectionLostCheckerFuture;

/**
* Attribute for the lost connection check interval
* Attribute for the lost connection check interval in nanoseconds
* @since 1.3.4
*/
private int connectionLostTimeout = 60;
private long connectionLostTimeout = TimeUnit.SECONDS.toNanos(60);

/**
* Attribute to keep track if the WebSocket Server/Client is running/connected
Expand All @@ -89,12 +92,12 @@ public abstract class AbstractWebSocket extends WebSocketAdapter {
/**
* Get the interval checking for lost connections
* Default is 60 seconds
* @return the interval
* @return the interval in seconds
* @since 1.3.4
*/
public int getConnectionLostTimeout() {
synchronized (syncConnectionLost) {
return connectionLostTimeout;
return (int) TimeUnit.NANOSECONDS.toSeconds(connectionLostTimeout);
}
}

Expand All @@ -107,7 +110,7 @@ public int getConnectionLostTimeout() {
*/
public void setConnectionLostTimeout( int connectionLostTimeout ) {
synchronized (syncConnectionLost) {
this.connectionLostTimeout = connectionLostTimeout;
this.connectionLostTimeout = TimeUnit.SECONDS.toNanos(connectionLostTimeout);
if (this.connectionLostTimeout <= 0) {
log.trace("Connection lost timer stopped");
cancelConnectionLostTimer();
Expand Down Expand Up @@ -139,7 +142,7 @@ public void setConnectionLostTimeout( int connectionLostTimeout ) {
*/
protected void stopConnectionLostTimer() {
synchronized (syncConnectionLost) {
if (connectionLostTimer != null || connectionLostTimerTask != null) {
if (connectionLostCheckerService != null || connectionLostCheckerFuture != null) {
this.websocketRunning = false;
log.trace("Connection lost timer stopped");
cancelConnectionLostTimer();
Expand Down Expand Up @@ -168,8 +171,8 @@ protected void startConnectionLostTimer() {
*/
private void restartConnectionLostTimer() {
cancelConnectionLostTimer();
connectionLostTimer = new Timer("WebSocketTimer");
connectionLostTimerTask = new TimerTask() {
connectionLostCheckerService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("connectionLostChecker"));
Runnable connectionLostChecker = new Runnable() {

/**
* Keep the connections in a separate list to not cause deadlocks
Expand All @@ -180,31 +183,31 @@ public void run() {
connections.clear();
try {
connections.addAll( getConnections() );
long current = ( System.currentTimeMillis() - ( connectionLostTimeout * 1500 ) );
long minimumPongTime = (long) (System.nanoTime() - ( connectionLostTimeout * 1.5 ));
for( WebSocket conn : connections ) {
executeConnectionLostDetection(conn, current);
executeConnectionLostDetection(conn, minimumPongTime);
}
} catch ( Exception e ) {
//Ignore this exception
}
connections.clear();
}
};
connectionLostTimer.scheduleAtFixedRate( connectionLostTimerTask,1000L*connectionLostTimeout , 1000L*connectionLostTimeout );

connectionLostCheckerFuture = connectionLostCheckerService.scheduleAtFixedRate(connectionLostChecker, connectionLostTimeout, connectionLostTimeout, TimeUnit.NANOSECONDS);
}

/**
* Send a ping to the endpoint or close the connection since the other endpoint did not respond with a ping
* @param webSocket the websocket instance
* @param current the current time in milliseconds
* @param minimumPongTime the lowest/oldest allowable last pong time (in nanoTime) before we consider the connection to be lost
*/
private void executeConnectionLostDetection(WebSocket webSocket, long current) {
private void executeConnectionLostDetection(WebSocket webSocket, long minimumPongTime) {
if (!(webSocket instanceof WebSocketImpl)) {
return;
}
WebSocketImpl webSocketImpl = (WebSocketImpl) webSocket;
if( webSocketImpl.getLastPong() < current ) {
if( webSocketImpl.getLastPong() < minimumPongTime ) {
log.trace("Closing connection due to no pong received: {}", webSocketImpl);
webSocketImpl.closeConnection( CloseFrame.ABNORMAL_CLOSE, "The connection was closed because the other endpoint did not respond with a pong in time. For more information check: https://github.com/TooTallNate/Java-WebSocket/wiki/Lost-connection-detection" );
} else {
Expand All @@ -228,13 +231,13 @@ private void executeConnectionLostDetection(WebSocket webSocket, long current) {
* @since 1.3.4
*/
private void cancelConnectionLostTimer() {
if( connectionLostTimer != null ) {
connectionLostTimer.cancel();
connectionLostTimer = null;
if( connectionLostCheckerService != null ) {
connectionLostCheckerService.shutdownNow();
connectionLostCheckerService = null;
}
if( connectionLostTimerTask != null ) {
connectionLostTimerTask.cancel();
connectionLostTimerTask = null;
if( connectionLostCheckerFuture != null ) {
connectionLostCheckerFuture.cancel(false);
connectionLostCheckerFuture = null;
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/java_websocket/WebSocketImpl.java
Expand Up @@ -151,7 +151,7 @@ public class WebSocketImpl implements WebSocket {
/**
* Attribute, when the last pong was recieved
*/
private long lastPong = System.currentTimeMillis();
private long lastPong = System.nanoTime();

/**
* Attribut to synchronize the write
Expand Down Expand Up @@ -802,7 +802,7 @@ long getLastPong() {
* Update the timestamp when the last pong was received
*/
public void updateLastPong() {
this.lastPong = System.currentTimeMillis();
this.lastPong = System.nanoTime();
}

/**
Expand Down
47 changes: 47 additions & 0 deletions src/main/java/org/java_websocket/util/NamedThreadFactory.java
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2010-2019 Nathan Rajlich
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following
* conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/

package org.java_websocket.util;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {
private final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String threadPrefix;

public NamedThreadFactory(String threadPrefix) {
this.threadPrefix = threadPrefix;
}

@Override
public Thread newThread(Runnable runnable) {
Thread thread = defaultThreadFactory.newThread(runnable);
thread.setName(threadPrefix + "-" + threadNumber);
return thread;
}
}
4 changes: 2 additions & 2 deletions src/test/java/org/java_websocket/issues/Issue666Test.java
Expand Up @@ -80,7 +80,7 @@ public void onStart() {
}
for( Thread thread : mapAfter.values() ) {
String name = thread.getName();
if( !name.startsWith( "WebSocketSelector-" ) && !name.startsWith( "WebSocketWorker-" ) && !name.equals( "WebSocketTimer" ) ) {
if( !name.startsWith( "WebSocketSelector-" ) && !name.startsWith( "WebSocketWorker-" ) && !name.startsWith( "connectionLostChecker-" ) ) {
Assert.fail( "Thread not correctly named! Is: " + name );
}
}
Expand Down Expand Up @@ -145,7 +145,7 @@ public void onStart() {
}
for( Thread thread : mapAfter.values() ) {
String name = thread.getName();
if( !name.equals( "WebSocketTimer" ) && !name.startsWith( "WebSocketWriteThread-" ) && !name.startsWith( "WebSocketConnectReadThread-" )) {
if( !name.startsWith( "connectionLostChecker-" ) && !name.startsWith( "WebSocketWriteThread-" ) && !name.startsWith( "WebSocketConnectReadThread-" )) {
Assert.fail( "Thread not correctly named! Is: " + name );
}
}
Expand Down

0 comments on commit 73c6805

Please sign in to comment.