Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use static exceptions for closing websocket flushers and in ContentProducer #8155

Merged
merged 5 commits into from Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -19,6 +19,7 @@

import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.util.StaticException;
import org.eclipse.jetty.util.component.Destroyable;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
Expand All @@ -31,8 +32,8 @@
class AsyncContentProducer implements ContentProducer
{
private static final Logger LOG = LoggerFactory.getLogger(AsyncContentProducer.class);
private static final HttpInput.ErrorContent RECYCLED_ERROR_CONTENT = new HttpInput.ErrorContent(new IllegalStateException("ContentProducer has been recycled"));
private static final Throwable UNCONSUMED_CONTENT_EXCEPTION = new IOException("Unconsumed content")
private static final HttpInput.ErrorContent RECYCLED_ERROR_CONTENT = new HttpInput.ErrorContent(new StaticException("ContentProducer has been recycled"));
private static final Throwable UNCONSUMED_CONTENT_EXCEPTION = new StaticException("Unconsumed content")
{
@Override
public Throwable fillInStackTrace()
Expand Down Expand Up @@ -190,7 +191,7 @@ public boolean consumeAll()
Throwable x = UNCONSUMED_CONTENT_EXCEPTION;
if (LOG.isDebugEnabled())
{
x = new IOException("Unconsumed content");
x = new StaticException("Unconsumed content");
lorban marked this conversation as resolved.
Show resolved Hide resolved
LOG.debug("consumeAll {}", this, x);
}
failCurrentContent(x);
Expand Down
@@ -0,0 +1,34 @@
//
// ========================================================================
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//

package org.eclipse.jetty.util;

/**
* This exception can safely be stored in a static variable as suppressed exceptions are disabled,
* meaning calling {@link #addSuppressed(Throwable)} has no effect.
* This prevents potential memory leaks where a statically-stored exception would accumulate
* suppressed exceptions added to them.
*/
public class StaticException extends Exception
{
public StaticException()
{
this(null);
}

public StaticException(String message)
{
// Make sure to call the super constructor that disables suppressed exception.
super(message, null, false, true);
lorban marked this conversation as resolved.
Show resolved Hide resolved
}
}
lorban marked this conversation as resolved.
Show resolved Hide resolved
Expand Up @@ -20,6 +20,7 @@
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CountingCallback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.StaticException;
import org.eclipse.jetty.websocket.core.Extension;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.IncomingFrames;
Expand All @@ -38,6 +39,8 @@
*/
public abstract class DemandingFlusher extends IteratingCallback implements DemandChain
{
private static final Throwable SENTINEL_CLOSE_EXCEPTION = new StaticException("Closed");

private final IncomingFrames _emitFrame;
private final AtomicLong _demand = new AtomicLong();
private final AtomicReference<Throwable> _failure = new AtomicReference<>();
Expand Down Expand Up @@ -101,6 +104,18 @@ public void onFrame(Frame frame, Callback callback)
succeeded();
}

/**
* Used to close this flusher when there is no explicit failure.
*/
public void closeFlusher()
{
if (_failure.compareAndSet(null, SENTINEL_CLOSE_EXCEPTION))
{
failed(SENTINEL_CLOSE_EXCEPTION);
iterate();
}
}

/**
* Used to fail this flusher possibly from an external event such as a callback.
* @param t the failure.
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.StaticException;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Scheduler;
Expand All @@ -44,6 +45,7 @@ public class FrameFlusher extends IteratingCallback
{
public static final Frame FLUSH_FRAME = new Frame(OpCode.BINARY);
private static final Logger LOG = LoggerFactory.getLogger(FrameFlusher.class);
private static final Throwable CLOSED_CHANNEL = new StaticException("Closed");

private final AutoLock lock = new AutoLock();
private final LongAdder messagesOut = new LongAdder();
Expand Down Expand Up @@ -184,15 +186,7 @@ public void onClose(Throwable cause)
{
try (AutoLock l = lock.lock())
{
// TODO: find a way to not create exception if cause is null.
closedCause = cause == null ? new ClosedChannelException()
{
@Override
public Throwable fillInStackTrace()
{
return this;
}
} : cause;
closedCause = cause == null ? CLOSED_CHANNEL : cause;
}
iterate();
}
Expand Down
Expand Up @@ -14,7 +14,6 @@
package org.eclipse.jetty.websocket.core.internal;

import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -150,17 +149,8 @@ public void init(final ExtensionConfig config, WebSocketComponents components)
@Override
public void close()
{
// TODO: use IteratingCallback.close() instead of creating exception with failFlusher methods.
ClosedChannelException exception = new ClosedChannelException()
{
@Override
public Throwable fillInStackTrace()
{
return this;
}
};
incomingFlusher.failFlusher(exception);
outgoingFlusher.failFlusher(exception);
incomingFlusher.closeFlusher();
outgoingFlusher.closeFlusher();
releaseInflater();
releaseDeflater();
}
Expand Down
Expand Up @@ -18,6 +18,7 @@

import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.StaticException;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.websocket.core.Frame;
import org.slf4j.Logger;
Expand All @@ -33,6 +34,7 @@
public abstract class TransformingFlusher
{
private final Logger log = LoggerFactory.getLogger(this.getClass());
private static final Throwable SENTINEL_CLOSE_EXCEPTION = new StaticException("Closed");

private final AutoLock lock = new AutoLock();
private final Queue<FrameEntry> entries = new ArrayDeque<>();
Expand Down Expand Up @@ -77,13 +79,20 @@ public final void sendFrame(Frame frame, Callback callback, boolean batch)
notifyCallbackFailure(callback, failure);
}

/**
* Used to close this flusher when there is no explicit failure.
*/
public void closeFlusher()
{
failFlusher(SENTINEL_CLOSE_EXCEPTION);
}

/**
* Used to fail this flusher possibly from an external event such as a callback.
* @param t the failure.
*/
public void failFlusher(Throwable t)
{
// TODO: find a way to close the flusher in non error case without exception.
boolean failed = false;
try (AutoLock l = lock.lock())
{
Expand Down