Skip to content

Commit

Permalink
Use static exceptions for closing websocket flushers.
Browse files Browse the repository at this point in the history
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts committed Jun 10, 2022
1 parent b1c19c0 commit 657222f
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 22 deletions.
@@ -0,0 +1,32 @@
//
// ========================================================================
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//

package org.eclipse.jetty.websocket.core.exception;

/**
* Suppressed exceptions are disabled for this implementation,
* meaning calling {@link #addSuppressed(Throwable)} has no effect.
* This means instances of {@link SentinelWebSocketCloseException} are suitable to be kept as static fields.
*/
public class SentinelWebSocketCloseException extends Exception
{
public SentinelWebSocketCloseException()
{
this(null);
}

public SentinelWebSocketCloseException(String message)
{
super(message, null, false, true);
}
}
Expand Up @@ -23,6 +23,7 @@
import org.eclipse.jetty.websocket.core.Extension;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.IncomingFrames;
import org.eclipse.jetty.websocket.core.exception.SentinelWebSocketCloseException;

/**
* <p>This flusher can be used to mutated and fragment {@link Frame}s and forwarded them on towards the application using the
Expand All @@ -38,6 +39,8 @@
*/
public abstract class DemandingFlusher extends IteratingCallback implements DemandChain
{
private static final Throwable SENTINEL_CLOSE_EXCEPTION = new SentinelWebSocketCloseException();

private final IncomingFrames _emitFrame;
private final AtomicLong _demand = new AtomicLong();
private final AtomicReference<Throwable> _failure = new AtomicReference<>();
Expand Down Expand Up @@ -101,6 +104,18 @@ public void onFrame(Frame frame, Callback callback)
succeeded();
}

/**
* Used to close this flusher when there is no explicit failure.
*/
public void closeFlusher()
{
if (_failure.compareAndSet(null, SENTINEL_CLOSE_EXCEPTION))
{
failed(SENTINEL_CLOSE_EXCEPTION);
iterate();
}
}

/**
* Used to fail this flusher possibly from an external event such as a callback.
* @param t the failure.
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.exception.SentinelWebSocketCloseException;
import org.eclipse.jetty.websocket.core.exception.WebSocketException;
import org.eclipse.jetty.websocket.core.exception.WebSocketWriteTimeoutException;
import org.slf4j.Logger;
Expand All @@ -44,6 +45,7 @@ public class FrameFlusher extends IteratingCallback
{
public static final Frame FLUSH_FRAME = new Frame(OpCode.BINARY);
private static final Logger LOG = LoggerFactory.getLogger(FrameFlusher.class);
private static final Throwable CLOSED_CHANNEL = new SentinelWebSocketCloseException();

private final AutoLock lock = new AutoLock();
private final LongAdder messagesOut = new LongAdder();
Expand Down Expand Up @@ -184,15 +186,7 @@ public void onClose(Throwable cause)
{
try (AutoLock l = lock.lock())
{
// TODO: find a way to not create exception if cause is null.
closedCause = cause == null ? new ClosedChannelException()
{
@Override
public Throwable fillInStackTrace()
{
return this;
}
} : cause;
closedCause = cause == null ? CLOSED_CHANNEL : cause;
}
iterate();
}
Expand Down
Expand Up @@ -14,7 +14,6 @@
package org.eclipse.jetty.websocket.core.internal;

import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -150,17 +149,8 @@ public void init(final ExtensionConfig config, WebSocketComponents components)
@Override
public void close()
{
// TODO: use IteratingCallback.close() instead of creating exception with failFlusher methods.
ClosedChannelException exception = new ClosedChannelException()
{
@Override
public Throwable fillInStackTrace()
{
return this;
}
};
incomingFlusher.failFlusher(exception);
outgoingFlusher.failFlusher(exception);
incomingFlusher.closeFlusher();
outgoingFlusher.closeFlusher();
releaseInflater();
releaseDeflater();
}
Expand Down
Expand Up @@ -20,6 +20,7 @@
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.exception.SentinelWebSocketCloseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -33,6 +34,7 @@
public abstract class TransformingFlusher
{
private final Logger log = LoggerFactory.getLogger(this.getClass());
private static final Throwable SENTINEL_CLOSE_EXCEPTION = new SentinelWebSocketCloseException();

private final AutoLock lock = new AutoLock();
private final Queue<FrameEntry> entries = new ArrayDeque<>();
Expand Down Expand Up @@ -77,13 +79,20 @@ public final void sendFrame(Frame frame, Callback callback, boolean batch)
notifyCallbackFailure(callback, failure);
}

/**
* Used to close this flusher when there is no explicit failure.
*/
public void closeFlusher()
{
failFlusher(SENTINEL_CLOSE_EXCEPTION);
}

/**
* Used to fail this flusher possibly from an external event such as a callback.
* @param t the failure.
*/
public void failFlusher(Throwable t)
{
// TODO: find a way to close the flusher in non error case without exception.
boolean failed = false;
try (AutoLock l = lock.lock())
{
Expand Down

0 comments on commit 657222f

Please sign in to comment.