diff --git a/modules/admin/admin-event/build.gradle b/modules/admin/admin-event/build.gradle index 57c04c2fe1e..434e60e6f2a 100644 --- a/modules/admin/admin-event/build.gradle +++ b/modules/admin/admin-event/build.gradle @@ -2,6 +2,9 @@ apply from: "$rootDir/gradle/osgi.gradle" dependencies { compile project( ':web:web-api' ) + implementation project( ':core:core-internal' ) + + testImplementation( testFixtures( project(":web:web-jetty") ) ) } jar { diff --git a/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/EventEndpoint.java b/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/EventEndpoint.java new file mode 100644 index 00000000000..0af04dc769a --- /dev/null +++ b/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/EventEndpoint.java @@ -0,0 +1,96 @@ +package com.enonic.xp.admin.event.impl; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.websocket.CloseReason; +import javax.websocket.Endpoint; +import javax.websocket.EndpointConfig; +import javax.websocket.Session; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class EventEndpoint + extends Endpoint +{ + private static final Logger LOG = LoggerFactory.getLogger( EventEndpoint.class ); + + private static final int INFLIGHT_MAX = 100_000; + + private final WebsocketManager webSocketManager; + + private final AtomicInteger inflightCounter = new AtomicInteger( 0 ); + + private volatile Session session; + + public EventEndpoint( final WebsocketManager webSocketManager ) + { + this.webSocketManager = webSocketManager; + } + + @Override + public void onOpen( final Session session, final EndpointConfig config ) + { + this.session = session; + this.webSocketManager.registerSocket( this ); + LOG.debug( "Opened websocket {}", session.getId() ); + } + + @Override + public void onClose( final Session session, final CloseReason closeReason ) + { + unregister(); + LOG.debug( "Closed websocket {}", session.getId() ); + } + + @Override + public void onError( final Session session, final Throwable error ) + { + unregister(); + LOG.warn( "Errored websocket {}", session.getId(), error ); + } + + public void sendMessage( final String message ) + { + final Session session = this.session; + if ( isSessionOpen( session ) ) + { + final int inflight = inflightCounter.getAndIncrement(); + + if ( inflight < INFLIGHT_MAX ) + { + session.getAsyncRemote().sendText( message, result -> inflightCounter.decrementAndGet() ); + } + else if ( inflight == INFLIGHT_MAX ) + { + unregister(); + LOG.warn( "Websocket client is too slow. Closing websocket {}", session.getId() ); + try + { + session.close( new CloseReason( CloseReason.CloseCodes.TRY_AGAIN_LATER, "Websocket client is too slow" ) ); + } + catch ( IOException e ) + { + LOG.error( "Failed to close slow websocket", e ); + } + } + } + } + + public boolean isOpen() + { + return isSessionOpen( this.session ); + } + + private void unregister() + { + this.webSocketManager.unregisterSocket( this ); + this.session = null; + } + + private static boolean isSessionOpen( final Session session ) + { + return session != null && session.isOpen(); + } +} diff --git a/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/EventEndpointFactory.java b/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/EventEndpointFactory.java new file mode 100644 index 00000000000..583f6c803d7 --- /dev/null +++ b/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/EventEndpointFactory.java @@ -0,0 +1,38 @@ +package com.enonic.xp.admin.event.impl; + +import java.util.List; + +import javax.websocket.Endpoint; + +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Reference; + +import com.enonic.xp.web.websocket.EndpointFactory; + +@Component +public class EventEndpointFactory + implements EndpointFactory +{ + private static final List SUB_PROTOCOLS = List.of( "text" ); + + private final WebsocketManager webSocketManager; + + @Activate + public EventEndpointFactory( @Reference final WebsocketManager webSocketManager ) + { + this.webSocketManager = webSocketManager; + } + + @Override + public Endpoint newEndpoint() + { + return new EventEndpoint( webSocketManager ); + } + + @Override + public List getSubProtocols() + { + return SUB_PROTOCOLS; + } +} diff --git a/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/EventHandler.java b/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/EventHandler.java index 33d6e95ea06..3eb8a2d8647 100644 --- a/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/EventHandler.java +++ b/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/EventHandler.java @@ -1,9 +1,6 @@ package com.enonic.xp.admin.event.impl; import java.io.IOException; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; import javax.servlet.Servlet; import javax.servlet.ServletException; @@ -11,32 +8,32 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import javax.websocket.Endpoint; +import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Reference; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.enonic.xp.annotation.Order; import com.enonic.xp.security.RoleKeys; import com.enonic.xp.web.websocket.EndpointFactory; import com.enonic.xp.web.websocket.WebSocketService; -@Component(immediate = true, service = {Servlet.class, WebSocketManager.class}, property = {"connector=xp"}) -@Order( -100 ) +@Component(immediate = true, service = {Servlet.class}, property = {"connector=xp"}) +@Order(-100) @WebServlet("/admin/event") public final class EventHandler extends HttpServlet - implements WebSocketManager, EndpointFactory { - private final static Logger LOG = LoggerFactory.getLogger( EventHandler.class ); + private final WebSocketService webSocketService; - private static final String PROTOCOL = "text"; + private final EndpointFactory endpointFactory; - private final Set sockets = new CopyOnWriteArraySet<>(); - - private WebSocketService webSocketService; + @Activate + public EventHandler( @Reference final WebSocketService webSocketService, @Reference final EndpointFactory endpointFactory ) + { + this.webSocketService = webSocketService; + this.endpointFactory = endpointFactory; + } @Override protected void doGet( final HttpServletRequest req, final HttpServletResponse res ) @@ -54,52 +51,6 @@ protected void doGet( final HttpServletRequest req, final HttpServletResponse re return; } - this.webSocketService.acceptWebSocket( req, res, this ); - } - - @Override - public Endpoint newEndpoint() - { - return new EventWebSocket( this ); - } - - @Override - public List getSubProtocols() - { - return List.of( PROTOCOL ); - } - - @Override - public void registerSocket( final EventWebSocket webSocket ) - { - this.sockets.add( webSocket ); - } - - @Override - public void unregisterSocket( final EventWebSocket webSocket ) - { - this.sockets.remove( webSocket ); - } - - @Override - public void sendToAll( final String message ) - { - for ( final EventWebSocket socket : this.sockets ) - { - try - { - socket.sendMessage( message ); - } - catch ( IOException e ) - { - LOG.warn( "Failed to send message via web socket", e ); - } - } - } - - @Reference - public void setWebSocketService( final WebSocketService webSocketService ) - { - this.webSocketService = webSocketService; + this.webSocketService.acceptWebSocket( req, res, endpointFactory ); } } diff --git a/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/EventListenerImpl.java b/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/EventListenerImpl.java deleted file mode 100644 index c9514721c1f..00000000000 --- a/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/EventListenerImpl.java +++ /dev/null @@ -1,42 +0,0 @@ -package com.enonic.xp.admin.event.impl; - -import org.osgi.service.component.annotations.Component; -import org.osgi.service.component.annotations.Reference; - -import com.fasterxml.jackson.databind.node.ObjectNode; - -import com.enonic.xp.admin.event.impl.json.EventJsonSerializer; -import com.enonic.xp.event.Event; -import com.enonic.xp.event.EventListener; - -@Component(immediate = true) -public final class EventListenerImpl - implements EventListener -{ - private WebSocketManager webSocketManager; - - private final EventJsonSerializer serializer; - - public EventListenerImpl() - { - this.serializer = new EventJsonSerializer(); - } - - @Override - public void onEvent( final Event event ) - { - final ObjectNode json = this.serializer.toJson( event ); - if ( json == null ) - { - return; - } - - this.webSocketManager.sendToAll( json.toString() ); - } - - @Reference - public void setWebSocketManager( final WebSocketManager webSocketManager ) - { - this.webSocketManager = webSocketManager; - } -} diff --git a/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/EventWebSocket.java b/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/EventWebSocket.java deleted file mode 100644 index 04fb381e79c..00000000000 --- a/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/EventWebSocket.java +++ /dev/null @@ -1,56 +0,0 @@ -package com.enonic.xp.admin.event.impl; - -import java.io.IOException; - -import javax.websocket.CloseReason; -import javax.websocket.Endpoint; -import javax.websocket.EndpointConfig; -import javax.websocket.Session; - -public final class EventWebSocket - extends Endpoint -{ - private Session session; - - private final WebSocketManager webSocketManager; - - public EventWebSocket( final WebSocketManager webSocketManager ) - { - this.webSocketManager = webSocketManager; - } - - @Override - public void onOpen( final Session session, final EndpointConfig config ) - { - this.session = session; - this.webSocketManager.registerSocket( this ); - } - - @Override - public void onClose( final Session session, final CloseReason closeReason ) - { - this.webSocketManager.unregisterSocket( this ); - this.session = null; - } - - @Override - public void onError( final Session session, final Throwable error ) - { - this.webSocketManager.unregisterSocket( this ); - this.session = null; - } - - public boolean isOpen() - { - return ( this.session != null ) && this.session.isOpen(); - } - - public void sendMessage( final String message ) - throws IOException - { - if ( isOpen() ) - { - this.session.getBasicRemote().sendText( message ); - } - } -} diff --git a/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/WebSocketManager.java b/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/WebSocketManager.java deleted file mode 100644 index c24502181c9..00000000000 --- a/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/WebSocketManager.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.enonic.xp.admin.event.impl; - -public interface WebSocketManager -{ - void registerSocket( EventWebSocket webSocket ); - - void unregisterSocket( EventWebSocket webSocket ); - - void sendToAll( String message ); -} diff --git a/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/WebsocketEventExecutor.java b/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/WebsocketEventExecutor.java new file mode 100644 index 00000000000..0314e9e11f1 --- /dev/null +++ b/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/WebsocketEventExecutor.java @@ -0,0 +1,8 @@ +package com.enonic.xp.admin.event.impl; + +import java.util.concurrent.Executor; + +public interface WebsocketEventExecutor + extends Executor +{ +} diff --git a/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/WebsocketEventExecutorImpl.java b/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/WebsocketEventExecutorImpl.java new file mode 100644 index 00000000000..36006606ce5 --- /dev/null +++ b/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/WebsocketEventExecutorImpl.java @@ -0,0 +1,51 @@ +package com.enonic.xp.admin.event.impl; + +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.enonic.xp.core.internal.concurrent.SimpleExecutor; + +@Component +public class WebsocketEventExecutorImpl + implements WebsocketEventExecutor +{ + private static final Logger LOG = LoggerFactory.getLogger( WebsocketEventExecutorImpl.class ); + + private final SimpleExecutor simpleExecutor; + + @Activate + public WebsocketEventExecutorImpl() + { + final Function executorServiceSupplier = + ( threadFactory ) -> new ThreadPoolExecutor( 0, Runtime.getRuntime().availableProcessors(), 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>( 200 ), threadFactory, + new ThreadPoolExecutor.CallerRunsPolicy() ); + + this.simpleExecutor = + new SimpleExecutor( executorServiceSupplier, "websocket-event-thread-%d", e -> LOG.error( "Websocket event failed", e ) ); + } + + @Deactivate + public void deactivate() + { + simpleExecutor.shutdownAndAwaitTermination( Duration.ofSeconds( 5 ), + neverCommenced -> LOG.warn( "Not all websocket events were delivered" ) ); + } + + @Override + public void execute( final Runnable command ) + { + simpleExecutor.execute( command ); + } +} diff --git a/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/WebsocketEventListener.java b/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/WebsocketEventListener.java new file mode 100644 index 00000000000..874087fd35f --- /dev/null +++ b/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/WebsocketEventListener.java @@ -0,0 +1,30 @@ +package com.enonic.xp.admin.event.impl; + +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Reference; + +import com.enonic.xp.admin.event.impl.json.EventJsonSerializer; +import com.enonic.xp.event.Event; +import com.enonic.xp.event.EventListener; + +@Component(immediate = true) +public final class WebsocketEventListener + implements EventListener +{ + private final EventJsonSerializer serializer = new EventJsonSerializer(); + + private final WebsocketManager websocketManager; + + @Activate + public WebsocketEventListener( @Reference final WebsocketManager websocketManager ) + { + this.websocketManager = websocketManager; + } + + @Override + public void onEvent( final Event event ) + { + this.websocketManager.sendToAll( serializer.toJson( event ) ); + } +} diff --git a/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/WebsocketManager.java b/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/WebsocketManager.java new file mode 100644 index 00000000000..9a7c9e97952 --- /dev/null +++ b/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/WebsocketManager.java @@ -0,0 +1,10 @@ +package com.enonic.xp.admin.event.impl; + +public interface WebsocketManager +{ + void registerSocket( EventEndpoint eventEndpoint ); + + void unregisterSocket( EventEndpoint eventEndpoint ); + + void sendToAll( String message ); +} diff --git a/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/WebsocketManagerImpl.java b/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/WebsocketManagerImpl.java new file mode 100644 index 00000000000..70ef66409dc --- /dev/null +++ b/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/WebsocketManagerImpl.java @@ -0,0 +1,41 @@ +package com.enonic.xp.admin.event.impl; + +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.Executor; + +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Reference; + +@Component +public class WebsocketManagerImpl + implements WebsocketManager +{ + private final CopyOnWriteArraySet sockets = new CopyOnWriteArraySet<>(); + + private final Executor executor; + + @Activate + public WebsocketManagerImpl( @Reference(service = WebsocketEventExecutor.class) final Executor websocketEventExecutor ) + { + this.executor = websocketEventExecutor; + } + + @Override + public void registerSocket( final EventEndpoint eventEndpoint ) + { + this.sockets.add( eventEndpoint ); + } + + @Override + public void unregisterSocket( final EventEndpoint eventEndpoint ) + { + this.sockets.remove( eventEndpoint ); + } + + @Override + public void sendToAll( final String message ) + { + sockets.forEach( s -> executor.execute( () -> s.sendMessage( message ) ) ); + } +} diff --git a/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/json/EventJsonSerializer.java b/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/json/EventJsonSerializer.java index 5118d2ab7b4..3eb12d749a7 100644 --- a/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/json/EventJsonSerializer.java +++ b/modules/admin/admin-event/src/main/java/com/enonic/xp/admin/event/impl/json/EventJsonSerializer.java @@ -14,17 +14,13 @@ public final class EventJsonSerializer { - public ObjectNode toJson( final Event event ) + public String toJson( final Event event ) { - if ( event != null ) - { - final ObjectNode json = JsonNodeFactory.instance.objectNode(); - json.put( "type", event.getType() ); - json.put( "timestamp", event.getTimestamp() ); - json.set( "data", toJsonNode( event.getData() ) ); - return json; - } - return null; + final ObjectNode json = JsonNodeFactory.instance.objectNode(); + json.put( "type", event.getType() ); + json.put( "timestamp", event.getTimestamp() ); + json.set( "data", toJsonNode( event.getData() ) ); + return json.toString(); } private JsonNode toJsonNode( Object value ) diff --git a/modules/admin/admin-event/src/test/java/com/enonic/xp/admin/event/impl/EventEndpointTest.java b/modules/admin/admin-event/src/test/java/com/enonic/xp/admin/event/impl/EventEndpointTest.java new file mode 100644 index 00000000000..2623571f39d --- /dev/null +++ b/modules/admin/admin-event/src/test/java/com/enonic/xp/admin/event/impl/EventEndpointTest.java @@ -0,0 +1,91 @@ +package com.enonic.xp.admin.event.impl; + +import java.util.stream.IntStream; + +import javax.websocket.RemoteEndpoint; +import javax.websocket.SendHandler; +import javax.websocket.Session; + +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.notNull; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; + +class EventEndpointTest +{ + @Test + void sendMessage() + { + final Session session1 = mock( Session.class ); + final RemoteEndpoint.Async remoteEndpoint1 = mock( RemoteEndpoint.Async.class ); + when( session1.isOpen() ).thenReturn( true ); + when( session1.getAsyncRemote() ).thenReturn( remoteEndpoint1 ); + + final String testMessage = "test message"; + + final EventEndpoint eventWebSocket = new EventEndpoint( mock( WebsocketManager.class, withSettings().stubOnly() ) ); + eventWebSocket.onOpen( session1, null ); + eventWebSocket.sendMessage( testMessage ); + + verify( remoteEndpoint1 ).sendText( eq( testMessage ), notNull() ); + } + + @Test + void sendMessage_more_than_max_inflight_not_sent() + throws Exception + { + final Session session1 = mock( Session.class ); + final RemoteEndpoint.Async remoteEndpoint1 = mock( RemoteEndpoint.Async.class ); + when( session1.isOpen() ).thenReturn( true ); + when( session1.getAsyncRemote() ).thenReturn( remoteEndpoint1 ); + + final String testMessage = "test message"; + + final EventEndpoint eventWebSocket = new EventEndpoint( mock( WebsocketManager.class, withSettings().stubOnly() ) ); + eventWebSocket.onOpen( session1, null ); + + final int maxInflight = 100_000; + final int exceededCalls = 100; + IntStream.rangeClosed( 1, maxInflight + exceededCalls ).parallel().forEach( i -> eventWebSocket.sendMessage( testMessage ) ); + + verify( remoteEndpoint1, times( maxInflight ) ).sendText( eq( testMessage ), notNull() ); + verify( session1, times( 1 ) ).close( notNull() ); + } + + @Test + void sendMessage_more_than_max_inflight_some_sent() + { + final Session session1 = mock( Session.class ); + final RemoteEndpoint.Async remoteEndpoint1 = mock( RemoteEndpoint.Async.class ); + when( session1.isOpen() ).thenReturn( true ); + when( session1.getAsyncRemote() ).thenReturn( remoteEndpoint1 ); + + final ArgumentCaptor sendHandlerArgumentCaptor = ArgumentCaptor.forClass( SendHandler.class ); + doNothing().when( remoteEndpoint1 ).sendText( any(), sendHandlerArgumentCaptor.capture() ); + + final String testMessage = "test message"; + + final EventEndpoint eventWebSocket = new EventEndpoint( mock( WebsocketManager.class, withSettings().stubOnly() ) ); + eventWebSocket.onOpen( session1, null ); + + final int maxInflight = 100_000; + + IntStream.rangeClosed( 1, maxInflight / 2 ).parallel().forEach( i -> eventWebSocket.sendMessage( testMessage ) ); + + sendHandlerArgumentCaptor.getAllValues().forEach( sh -> sh.onResult( null ) ); + + final int exceededCalls = 100; + IntStream.rangeClosed( 1, maxInflight / 2 + exceededCalls ).parallel().forEach( i -> eventWebSocket.sendMessage( testMessage ) ); + + verify( remoteEndpoint1, atLeast( maxInflight ) ).sendText( eq( testMessage ), notNull() ); + } +} \ No newline at end of file diff --git a/modules/admin/admin-event/src/test/java/com/enonic/xp/admin/event/impl/EventHandlerTest.java b/modules/admin/admin-event/src/test/java/com/enonic/xp/admin/event/impl/EventHandlerTest.java index 35c6722fb16..a9c906ad29b 100644 --- a/modules/admin/admin-event/src/test/java/com/enonic/xp/admin/event/impl/EventHandlerTest.java +++ b/modules/admin/admin-event/src/test/java/com/enonic/xp/admin/event/impl/EventHandlerTest.java @@ -1,7 +1,8 @@ package com.enonic.xp.admin.event.impl; +import java.util.List; + import javax.websocket.Endpoint; -import javax.websocket.RemoteEndpoint; import javax.websocket.Session; import org.junit.jupiter.api.BeforeEach; @@ -23,6 +24,8 @@ public class EventHandlerTest { private EventHandler handler; + private EventEndpointFactory endpointFactory; + private WebSocketService webSocketService; private MockHttpServletRequest req; @@ -33,10 +36,9 @@ public class EventHandlerTest public void setup() throws Exception { - this.handler = new EventHandler(); - this.webSocketService = Mockito.mock( WebSocketService.class ); - this.handler.setWebSocketService( this.webSocketService ); + this.endpointFactory = new EventEndpointFactory( Mockito.mock( WebsocketManager.class ) ); + this.handler = new EventHandler( this.webSocketService, this.endpointFactory ); this.req = new MockHttpServletRequest(); this.res = new MockHttpServletResponse(); @@ -45,17 +47,17 @@ public void setup() @Test public void testSubProtocols() { - assertEquals( "[text]", this.handler.getSubProtocols().toString() ); + assertEquals( List.of( "text" ), endpointFactory.getSubProtocols() ); } @Test public void testNewEndpoint() throws Exception { - final Endpoint e1 = this.handler.newEndpoint(); + final Endpoint e1 = endpointFactory.newEndpoint(); assertNotNull( e1 ); - final Endpoint e2 = this.handler.newEndpoint(); + final Endpoint e2 = endpointFactory.newEndpoint(); assertNotNull( e2 ); assertNotSame( e1, e2 ); @@ -64,7 +66,7 @@ public void testNewEndpoint() @Test public void openCloseSocket() { - final EventWebSocket socket = (EventWebSocket) this.handler.newEndpoint(); + final EventEndpoint socket = (EventEndpoint) endpointFactory.newEndpoint(); assertFalse( socket.isOpen() ); final Session session = mockSession(); @@ -82,28 +84,6 @@ public void openCloseSocket() assertFalse( socket.isOpen() ); } - @Test - public void sendToAll() - throws Exception - { - final EventWebSocket socket1 = (EventWebSocket) this.handler.newEndpoint(); - final Session session1 = mockSession(); - final RemoteEndpoint.Basic basic1 = Mockito.mock( RemoteEndpoint.Basic.class ); - Mockito.when( session1.getBasicRemote() ).thenReturn( basic1 ); - socket1.onOpen( session1, null ); - - final EventWebSocket socket2 = (EventWebSocket) this.handler.newEndpoint(); - final Session session2 = mockSession(); - final RemoteEndpoint.Basic basic2 = Mockito.mock( RemoteEndpoint.Basic.class ); - Mockito.when( session2.getBasicRemote() ).thenReturn( basic2 ); - socket2.onOpen( session2, null ); - - this.handler.sendToAll( "hello" ); - - Mockito.verify( basic1, Mockito.times( 1 ) ).sendText( "hello" ); - Mockito.verify( basic2, Mockito.times( 1 ) ).sendText( "hello" ); - } - private Session mockSession() { final Session session = Mockito.mock( Session.class ); @@ -145,6 +125,6 @@ public void testUpgrade() this.handler.service( this.req, this.res ); assertEquals( 200, this.res.getStatus() ); - Mockito.verify( this.webSocketService, Mockito.times( 1 ) ).acceptWebSocket( this.req, this.res, this.handler ); + Mockito.verify( this.webSocketService, Mockito.times( 1 ) ).acceptWebSocket( this.req, this.res, this.endpointFactory ); } } diff --git a/modules/admin/admin-event/src/test/java/com/enonic/xp/admin/event/impl/WebsocketEventExecutorImplTest.java b/modules/admin/admin-event/src/test/java/com/enonic/xp/admin/event/impl/WebsocketEventExecutorImplTest.java new file mode 100644 index 00000000000..179a8f2699b --- /dev/null +++ b/modules/admin/admin-event/src/test/java/com/enonic/xp/admin/event/impl/WebsocketEventExecutorImplTest.java @@ -0,0 +1,19 @@ +package com.enonic.xp.admin.event.impl; + +import java.util.concurrent.Phaser; + +import org.junit.jupiter.api.Test; + +public class WebsocketEventExecutorImplTest +{ + @Test + void lifecycle() + { + final Phaser phaser = new Phaser( 2 ); + final WebsocketEventExecutorImpl executor = new WebsocketEventExecutorImpl(); + executor.execute( phaser::arriveAndAwaitAdvance ); + + phaser.arriveAndAwaitAdvance(); + executor.deactivate(); + } +} diff --git a/modules/admin/admin-event/src/test/java/com/enonic/xp/admin/event/impl/EventListenerImplTest.java b/modules/admin/admin-event/src/test/java/com/enonic/xp/admin/event/impl/WebsocketEventListenerTest.java similarity index 73% rename from modules/admin/admin-event/src/test/java/com/enonic/xp/admin/event/impl/EventListenerImplTest.java rename to modules/admin/admin-event/src/test/java/com/enonic/xp/admin/event/impl/WebsocketEventListenerTest.java index 423ab2640f6..881155e7574 100644 --- a/modules/admin/admin-event/src/test/java/com/enonic/xp/admin/event/impl/EventListenerImplTest.java +++ b/modules/admin/admin-event/src/test/java/com/enonic/xp/admin/event/impl/WebsocketEventListenerTest.java @@ -11,20 +11,19 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -public class EventListenerImplTest +public class WebsocketEventListenerTest { - private EventListenerImpl eventListener; + private WebsocketEventListener eventListener; - private WebSocketManager webSocketManager; + private WebsocketManager webSocketManager; @BeforeEach public final void setUp() throws Exception { - this.webSocketManager = mock( WebSocketManager.class ); + this.webSocketManager = mock( WebsocketManager.class ); - this.eventListener = new EventListenerImpl(); - this.eventListener.setWebSocketManager( this.webSocketManager ); + this.eventListener = new WebsocketEventListener( this.webSocketManager ); } @Test diff --git a/modules/admin/admin-event/src/test/java/com/enonic/xp/admin/event/impl/WebsocketManagerImplTest.java b/modules/admin/admin-event/src/test/java/com/enonic/xp/admin/event/impl/WebsocketManagerImplTest.java new file mode 100644 index 00000000000..9f1ef29558f --- /dev/null +++ b/modules/admin/admin-event/src/test/java/com/enonic/xp/admin/event/impl/WebsocketManagerImplTest.java @@ -0,0 +1,70 @@ +package com.enonic.xp.admin.event.impl; + +import javax.websocket.RemoteEndpoint; +import javax.websocket.Session; + +import org.junit.jupiter.api.Test; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.notNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class WebsocketManagerImplTest +{ + @Test + void sendToAll() + { + final WebsocketManagerImpl webSocketManager = new WebsocketManagerImpl( Runnable::run ); + + final EventEndpoint eventWebSocket1 = new EventEndpoint( webSocketManager ); + final Session session1 = mock( Session.class ); + final RemoteEndpoint.Async remoteEndpoint1 = mock( RemoteEndpoint.Async.class ); + when( session1.isOpen() ).thenReturn( true ); + when( session1.getAsyncRemote() ).thenReturn( remoteEndpoint1 ); + + final Session session2 = mock( Session.class ); + final EventEndpoint eventWebSocket2 = new EventEndpoint( webSocketManager ); + final RemoteEndpoint.Async remoteEndpoint2 = mock( RemoteEndpoint.Async.class ); + when( session2.isOpen() ).thenReturn( true ); + when( session2.getAsyncRemote() ).thenReturn( remoteEndpoint2 ); + + eventWebSocket1.onOpen( session1, null ); + + eventWebSocket2.onOpen( session2, null ); + + final String testMessage = "test message"; + + webSocketManager.sendToAll( testMessage ); + + verify( remoteEndpoint1 ).sendText( eq( testMessage ), notNull() ); + verify( remoteEndpoint2 ).sendText( eq( testMessage ), notNull() ); + } + + @Test + void unregister() + { + final WebsocketManagerImpl webSocketManager = new WebsocketManagerImpl( Runnable::run ); + + final EventEndpoint eventWebSocket1 = new EventEndpoint( webSocketManager ); + final Session session1 = mock( Session.class ); + final RemoteEndpoint.Async remoteEndpoint1 = mock( RemoteEndpoint.Async.class ); + when( session1.isOpen() ).thenReturn( true ); + when( session1.getAsyncRemote() ).thenReturn( remoteEndpoint1 ); + + eventWebSocket1.onOpen( session1, null ); + + final String testMessage = "test message"; + + webSocketManager.sendToAll( testMessage ); + + eventWebSocket1.onClose( session1, null ); + + webSocketManager.sendToAll( testMessage ); + + verify( remoteEndpoint1, times( 1 ) ).sendText( eq( testMessage ), notNull() ); + } + +} diff --git a/modules/admin/admin-event/src/test/java/com/enonic/xp/admin/event/impl/json/EventJsonSerializerTest.java b/modules/admin/admin-event/src/test/java/com/enonic/xp/admin/event/impl/json/EventJsonSerializerTest.java index 6e92f0b975f..a0b41037d2d 100644 --- a/modules/admin/admin-event/src/test/java/com/enonic/xp/admin/event/impl/json/EventJsonSerializerTest.java +++ b/modules/admin/admin-event/src/test/java/com/enonic/xp/admin/event/impl/json/EventJsonSerializerTest.java @@ -2,6 +2,7 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.Objects; import org.junit.jupiter.api.BeforeEach; @@ -13,7 +14,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.enonic.xp.event.Event; import com.enonic.xp.json.ObjectMapperHelper; @@ -58,22 +58,23 @@ public void testEvent() value( "double", 6.7d ). value( "boolean", true ). value( "string", "test" ). + value( "collection", List.of( "a", "b" ) ). build(); - final ObjectNode json = this.serializer.toJson( event ); + final String json = this.serializer.toJson( event ); assertNotNull( json ); assertJson( "testEvent.json", json ); } - private void assertJson( final String fileName, final JsonNode actualNode ) + private void assertJson( final String fileName, final String actual ) throws Exception { final JsonNode expectedNode = MAPPER.readTree( readFromFile( fileName ) ); + final JsonNode actualNode = MAPPER.readTree( actual ); final String expectedStr = OBJECT_WRITER.writeValueAsString( expectedNode ); final String actualStr = OBJECT_WRITER.writeValueAsString( actualNode ); - assertEquals( expectedStr, actualStr ); } diff --git a/modules/admin/admin-event/src/test/resources/com/enonic/xp/admin/event/impl/json/testEvent.json b/modules/admin/admin-event/src/test/resources/com/enonic/xp/admin/event/impl/json/testEvent.json index ed7f19030c6..0a0833daafd 100644 --- a/modules/admin/admin-event/src/test/resources/com/enonic/xp/admin/event/impl/json/testEvent.json +++ b/modules/admin/admin-event/src/test/resources/com/enonic/xp/admin/event/impl/json/testEvent.json @@ -9,6 +9,10 @@ "float": 5.6, "double": 6.7, "boolean": true, - "string": "test" + "string": "test", + "collection": [ + "a", + "b" + ] } } diff --git a/modules/runtime/src/home/config/com.enonic.xp.web.jetty.cfg b/modules/runtime/src/home/config/com.enonic.xp.web.jetty.cfg index 22fb6872e91..21a6bbd2371 100644 --- a/modules/runtime/src/home/config/com.enonic.xp.web.jetty.cfg +++ b/modules/runtime/src/home/config/com.enonic.xp.web.jetty.cfg @@ -15,7 +15,7 @@ # multipart.store = # multipart.maxFileSize = -1 # multipart.maxRequestSize = -1 -# multipart.fileSizeTreshold = 1000 +# multipart.fileSizeThreshold = 1000 # gzip.enabled = true # gzip.minSize = 23 @@ -30,3 +30,5 @@ # threadPool.maxThreads = 200 # threadPool.minThreads = 8 # threadPool.idleTimeout = 60000 + +# websocket.idleTimeout = 300000 diff --git a/modules/web/web-api/build.gradle b/modules/web/web-api/build.gradle index f6f83c9abcc..4d344308f9c 100644 --- a/modules/web/web-api/build.gradle +++ b/modules/web/web-api/build.gradle @@ -5,7 +5,7 @@ dependencies { compile 'javax.ws.rs:javax.ws.rs-api:2.1.1' compile "jakarta.annotation:jakarta.annotation-api:${jakartaAnnotationApiVersion}" compile 'javax.servlet:javax.servlet-api:3.1.0' - compile 'javax.websocket:javax.websocket-api:1.1' + compile 'javax.websocket:javax.websocket-api:1.0' testFixturesImplementation "org.junit.jupiter:junit-jupiter-api:${junitJupiterVersion}" } diff --git a/modules/web/web-jetty/src/main/java/com/enonic/xp/web/jetty/impl/JettyConfig.java b/modules/web/web-jetty/src/main/java/com/enonic/xp/web/jetty/impl/JettyConfig.java index 127c73bb7db..4e084ce28d3 100644 --- a/modules/web/web-jetty/src/main/java/com/enonic/xp/web/jetty/impl/JettyConfig.java +++ b/modules/web/web-jetty/src/main/java/com/enonic/xp/web/jetty/impl/JettyConfig.java @@ -135,4 +135,9 @@ * Thread Idle Timeout (in milliseconds). */ int threadPool_idleTimeout() default 60000; + + /** + * The time in milliseconds that a websocket may be idle before closing. + */ + long websocket_idleTimeout() default 300000; } diff --git a/modules/web/web-jetty/src/main/java/com/enonic/xp/web/jetty/impl/websocket/WebSocketServiceImpl.java b/modules/web/web-jetty/src/main/java/com/enonic/xp/web/jetty/impl/websocket/WebSocketServiceImpl.java index f24c0e0b54e..bf0fa579874 100644 --- a/modules/web/web-jetty/src/main/java/com/enonic/xp/web/jetty/impl/websocket/WebSocketServiceImpl.java +++ b/modules/web/web-jetty/src/main/java/com/enonic/xp/web/jetty/impl/websocket/WebSocketServiceImpl.java @@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory; import com.enonic.xp.web.dispatch.DispatchConstants; +import com.enonic.xp.web.jetty.impl.JettyConfig; import com.enonic.xp.web.websocket.EndpointFactory; import com.enonic.xp.web.websocket.WebSocketService; @@ -43,9 +44,11 @@ public final class WebSocketServiceImpl private final WebSocketServerFactory serverFactory; @Activate - public WebSocketServiceImpl( @Reference(target = SERVLET_CONTEXT_TARGET) final ServletContext servletContext ) + public WebSocketServiceImpl( final JettyConfig config, @Reference(target = SERVLET_CONTEXT_TARGET) final ServletContext servletContext ) { - this.serverFactory = new WebSocketServerFactory( servletContext, WebSocketPolicy.newServerPolicy() ); + final WebSocketPolicy webSocketPolicy = WebSocketPolicy.newServerPolicy(); + webSocketPolicy.setIdleTimeout( config.websocket_idleTimeout() ); + this.serverFactory = new WebSocketServerFactory( servletContext, webSocketPolicy ); } @SuppressWarnings("WeakerAccess") diff --git a/modules/web/web-jetty/src/test/java/com/enonic/xp/web/jetty/impl/websocket/WebSocketServiceImplTest.java b/modules/web/web-jetty/src/test/java/com/enonic/xp/web/jetty/impl/websocket/WebSocketServiceImplTest.java index cc61214ccf6..e38a970731a 100644 --- a/modules/web/web-jetty/src/test/java/com/enonic/xp/web/jetty/impl/websocket/WebSocketServiceImplTest.java +++ b/modules/web/web-jetty/src/test/java/com/enonic/xp/web/jetty/impl/websocket/WebSocketServiceImplTest.java @@ -35,7 +35,7 @@ protected void configure() this.server.setVirtualHosts( new String[]{DispatchConstants.VIRTUAL_HOST_PREFIX + DispatchConstants.XP_CONNECTOR} ); - this.service = new WebSocketServiceImpl( this.server.getHandler().getServletContext() ); + this.service = new WebSocketServiceImpl( config, this.server.getHandler().getServletContext() ); this.service.activate(); this.servlet = new TestWebSocketServlet(); diff --git a/modules/web/web-jetty/src/testFixtures/java/com/enonic/xp/web/jetty/impl/JettyTestSupport.java b/modules/web/web-jetty/src/testFixtures/java/com/enonic/xp/web/jetty/impl/JettyTestSupport.java index 8b2178b2093..ace37c41bf1 100644 --- a/modules/web/web-jetty/src/testFixtures/java/com/enonic/xp/web/jetty/impl/JettyTestSupport.java +++ b/modules/web/web-jetty/src/testFixtures/java/com/enonic/xp/web/jetty/impl/JettyTestSupport.java @@ -19,10 +19,13 @@ public abstract class JettyTestSupport protected String baseUrl; + protected JettyConfig config; + @BeforeEach public final void startServer() throws Exception { + this.config = new JettyConfigMockFactory().newConfig(); this.server = new JettyTestServer(); this.server.start(); configure(); diff --git a/modules/web/web-jetty/src/test/java/com/enonic/xp/web/jetty/impl/websocket/TestEndpoint.java b/modules/web/web-jetty/src/testFixtures/java/com/enonic/xp/web/jetty/impl/websocket/TestEndpoint.java similarity index 100% rename from modules/web/web-jetty/src/test/java/com/enonic/xp/web/jetty/impl/websocket/TestEndpoint.java rename to modules/web/web-jetty/src/testFixtures/java/com/enonic/xp/web/jetty/impl/websocket/TestEndpoint.java diff --git a/modules/web/web-jetty/src/test/java/com/enonic/xp/web/jetty/impl/websocket/TestWebSocketServlet.java b/modules/web/web-jetty/src/testFixtures/java/com/enonic/xp/web/jetty/impl/websocket/TestWebSocketServlet.java similarity index 100% rename from modules/web/web-jetty/src/test/java/com/enonic/xp/web/jetty/impl/websocket/TestWebSocketServlet.java rename to modules/web/web-jetty/src/testFixtures/java/com/enonic/xp/web/jetty/impl/websocket/TestWebSocketServlet.java