Skip to content

Commit

Permalink
Notify web-socket event listeners independently #8042
Browse files Browse the repository at this point in the history
  • Loading branch information
rymsha authored and vbradnitski committed May 5, 2020
1 parent 23617ce commit d93cb6e
Show file tree
Hide file tree
Showing 28 changed files with 519 additions and 226 deletions.
3 changes: 3 additions & 0 deletions modules/admin/admin-event/build.gradle
Expand Up @@ -2,6 +2,9 @@ apply from: "$rootDir/gradle/osgi.gradle"

dependencies {
compile project( ':web:web-api' )
implementation project( ':core:core-internal' )

testImplementation( testFixtures( project(":web:web-jetty") ) )
}

jar {
Expand Down
@@ -0,0 +1,96 @@
package com.enonic.xp.admin.event.impl;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import javax.websocket.CloseReason;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class EventEndpoint
extends Endpoint
{
private static final Logger LOG = LoggerFactory.getLogger( EventEndpoint.class );

private static final int INFLIGHT_MAX = 100_000;

private final WebsocketManager webSocketManager;

private final AtomicInteger inflightCounter = new AtomicInteger( 0 );

private volatile Session session;

public EventEndpoint( final WebsocketManager webSocketManager )
{
this.webSocketManager = webSocketManager;
}

@Override
public void onOpen( final Session session, final EndpointConfig config )
{
this.session = session;
this.webSocketManager.registerSocket( this );
LOG.debug( "Opened websocket {}", session.getId() );
}

@Override
public void onClose( final Session session, final CloseReason closeReason )
{
unregister();
LOG.debug( "Closed websocket {}", session.getId() );
}

@Override
public void onError( final Session session, final Throwable error )
{
unregister();
LOG.warn( "Errored websocket {}", session.getId(), error );
}

public void sendMessage( final String message )
{
final Session session = this.session;
if ( isSessionOpen( session ) )
{
final int inflight = inflightCounter.getAndIncrement();

if ( inflight < INFLIGHT_MAX )
{
session.getAsyncRemote().sendText( message, result -> inflightCounter.decrementAndGet() );
}
else if ( inflight == INFLIGHT_MAX )
{
unregister();
LOG.warn( "Websocket client is too slow. Closing websocket {}", session.getId() );
try
{
session.close( new CloseReason( CloseReason.CloseCodes.TRY_AGAIN_LATER, "Websocket client is too slow" ) );
}
catch ( IOException e )
{
LOG.error( "Failed to close slow websocket", e );
}
}
}
}

public boolean isOpen()
{
return isSessionOpen( this.session );
}

private void unregister()
{
this.webSocketManager.unregisterSocket( this );
this.session = null;
}

private static boolean isSessionOpen( final Session session )
{
return session != null && session.isOpen();
}
}
@@ -0,0 +1,38 @@
package com.enonic.xp.admin.event.impl;

import java.util.List;

import javax.websocket.Endpoint;

import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;

import com.enonic.xp.web.websocket.EndpointFactory;

@Component
public class EventEndpointFactory
implements EndpointFactory
{
private static final List<String> SUB_PROTOCOLS = List.of( "text" );

private final WebsocketManager webSocketManager;

@Activate
public EventEndpointFactory( @Reference final WebsocketManager webSocketManager )
{
this.webSocketManager = webSocketManager;
}

@Override
public Endpoint newEndpoint()
{
return new EventEndpoint( webSocketManager );
}

@Override
public List<String> getSubProtocols()
{
return SUB_PROTOCOLS;
}
}
@@ -1,42 +1,39 @@
package com.enonic.xp.admin.event.impl;

import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.servlet.Servlet;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.websocket.Endpoint;

import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.enonic.xp.annotation.Order;
import com.enonic.xp.security.RoleKeys;
import com.enonic.xp.web.websocket.EndpointFactory;
import com.enonic.xp.web.websocket.WebSocketService;

@Component(immediate = true, service = {Servlet.class, WebSocketManager.class}, property = {"connector=xp"})
@Order( -100 )
@Component(immediate = true, service = {Servlet.class}, property = {"connector=xp"})
@Order(-100)
@WebServlet("/admin/event")
public final class EventHandler
extends HttpServlet
implements WebSocketManager, EndpointFactory
{
private final static Logger LOG = LoggerFactory.getLogger( EventHandler.class );
private final WebSocketService webSocketService;

private static final String PROTOCOL = "text";
private final EndpointFactory endpointFactory;

private final Set<EventWebSocket> sockets = new CopyOnWriteArraySet<>();

private WebSocketService webSocketService;
@Activate
public EventHandler( @Reference final WebSocketService webSocketService, @Reference final EndpointFactory endpointFactory )
{
this.webSocketService = webSocketService;
this.endpointFactory = endpointFactory;
}

@Override
protected void doGet( final HttpServletRequest req, final HttpServletResponse res )
Expand All @@ -54,52 +51,6 @@ protected void doGet( final HttpServletRequest req, final HttpServletResponse re
return;
}

this.webSocketService.acceptWebSocket( req, res, this );
}

@Override
public Endpoint newEndpoint()
{
return new EventWebSocket( this );
}

@Override
public List<String> getSubProtocols()
{
return List.of( PROTOCOL );
}

@Override
public void registerSocket( final EventWebSocket webSocket )
{
this.sockets.add( webSocket );
}

@Override
public void unregisterSocket( final EventWebSocket webSocket )
{
this.sockets.remove( webSocket );
}

@Override
public void sendToAll( final String message )
{
for ( final EventWebSocket socket : this.sockets )
{
try
{
socket.sendMessage( message );
}
catch ( IOException e )
{
LOG.warn( "Failed to send message via web socket", e );
}
}
}

@Reference
public void setWebSocketService( final WebSocketService webSocketService )
{
this.webSocketService = webSocketService;
this.webSocketService.acceptWebSocket( req, res, endpointFactory );
}
}

This file was deleted.

This file was deleted.

This file was deleted.

@@ -0,0 +1,8 @@
package com.enonic.xp.admin.event.impl;

import java.util.concurrent.Executor;

public interface WebsocketEventExecutor
extends Executor
{
}

0 comments on commit d93cb6e

Please sign in to comment.