Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use static exceptions for closing websocket flushers and in ContentProducer #8155

Merged
merged 5 commits into from Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,32 @@
//
// ========================================================================
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//

package org.eclipse.jetty.websocket.core.exception;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this should be here or in an internal implementation package.


/**
* Suppressed exceptions are disabled for this implementation,
* meaning calling {@link #addSuppressed(Throwable)} has no effect.
* This means instances of {@link SentinelWebSocketCloseException} are suitable to be kept as static fields.
*/
public class SentinelWebSocketCloseException extends Exception
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As @lorban suggested, perhaps create a util class called StaticException that forces the false,true args of the super constructor.

{
public SentinelWebSocketCloseException()
{
this(null);
}

public SentinelWebSocketCloseException(String message)
{
super(message, null, false, true);
}
}
Expand Up @@ -23,6 +23,7 @@
import org.eclipse.jetty.websocket.core.Extension;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.IncomingFrames;
import org.eclipse.jetty.websocket.core.exception.SentinelWebSocketCloseException;

/**
* <p>This flusher can be used to mutated and fragment {@link Frame}s and forwarded them on towards the application using the
Expand All @@ -38,6 +39,8 @@
*/
public abstract class DemandingFlusher extends IteratingCallback implements DemandChain
{
private static final Throwable SENTINEL_CLOSE_EXCEPTION = new SentinelWebSocketCloseException();

private final IncomingFrames _emitFrame;
private final AtomicLong _demand = new AtomicLong();
private final AtomicReference<Throwable> _failure = new AtomicReference<>();
Expand Down Expand Up @@ -101,6 +104,18 @@ public void onFrame(Frame frame, Callback callback)
succeeded();
}

/**
* Used to close this flusher when there is no explicit failure.
*/
public void closeFlusher()
{
if (_failure.compareAndSet(null, SENTINEL_CLOSE_EXCEPTION))
{
failed(SENTINEL_CLOSE_EXCEPTION);
iterate();
}
}

/**
* Used to fail this flusher possibly from an external event such as a callback.
* @param t the failure.
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.exception.SentinelWebSocketCloseException;
import org.eclipse.jetty.websocket.core.exception.WebSocketException;
import org.eclipse.jetty.websocket.core.exception.WebSocketWriteTimeoutException;
import org.slf4j.Logger;
Expand All @@ -44,6 +45,7 @@ public class FrameFlusher extends IteratingCallback
{
public static final Frame FLUSH_FRAME = new Frame(OpCode.BINARY);
private static final Logger LOG = LoggerFactory.getLogger(FrameFlusher.class);
private static final Throwable CLOSED_CHANNEL = new SentinelWebSocketCloseException();

private final AutoLock lock = new AutoLock();
private final LongAdder messagesOut = new LongAdder();
Expand Down Expand Up @@ -184,15 +186,7 @@ public void onClose(Throwable cause)
{
try (AutoLock l = lock.lock())
{
// TODO: find a way to not create exception if cause is null.
closedCause = cause == null ? new ClosedChannelException()
{
@Override
public Throwable fillInStackTrace()
{
return this;
}
} : cause;
closedCause = cause == null ? CLOSED_CHANNEL : cause;
}
iterate();
}
Expand Down
Expand Up @@ -14,7 +14,6 @@
package org.eclipse.jetty.websocket.core.internal;

import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -150,17 +149,8 @@ public void init(final ExtensionConfig config, WebSocketComponents components)
@Override
public void close()
{
// TODO: use IteratingCallback.close() instead of creating exception with failFlusher methods.
ClosedChannelException exception = new ClosedChannelException()
{
@Override
public Throwable fillInStackTrace()
{
return this;
}
};
incomingFlusher.failFlusher(exception);
outgoingFlusher.failFlusher(exception);
incomingFlusher.closeFlusher();
outgoingFlusher.closeFlusher();
releaseInflater();
releaseDeflater();
}
Expand Down
Expand Up @@ -20,6 +20,7 @@
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.exception.SentinelWebSocketCloseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -33,6 +34,7 @@
public abstract class TransformingFlusher
{
private final Logger log = LoggerFactory.getLogger(this.getClass());
private static final Throwable SENTINEL_CLOSE_EXCEPTION = new SentinelWebSocketCloseException();

private final AutoLock lock = new AutoLock();
private final Queue<FrameEntry> entries = new ArrayDeque<>();
Expand Down Expand Up @@ -77,13 +79,20 @@ public final void sendFrame(Frame frame, Callback callback, boolean batch)
notifyCallbackFailure(callback, failure);
}

/**
* Used to close this flusher when there is no explicit failure.
*/
public void closeFlusher()
{
failFlusher(SENTINEL_CLOSE_EXCEPTION);
}

/**
* Used to fail this flusher possibly from an external event such as a callback.
* @param t the failure.
*/
public void failFlusher(Throwable t)
{
// TODO: find a way to close the flusher in non error case without exception.
boolean failed = false;
try (AutoLock l = lock.lock())
{
Expand Down