Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #7635 - implement maxBlockedStreams logic in QpackDecoder #7660

Merged
Expand Up @@ -16,7 +16,11 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.MetaData;
Expand Down Expand Up @@ -50,6 +54,7 @@ public class QpackDecoder implements Dumpable
private final List<EncodedFieldSection> _encodedFieldSections = new ArrayList<>();
private final NBitIntegerParser _integerDecoder = new NBitIntegerParser();
private final InstructionHandler _instructionHandler = new InstructionHandler();
private final Map<Long, AtomicInteger> _blockedStreams = new HashMap<>();
private int _maxHeaderSize;
private int _maxBlockedStreams;

Expand Down Expand Up @@ -100,7 +105,6 @@ public void setMaxHeaderSize(int maxHeaderSize)

public int getMaxBlockedStreams()
{
// TODO: implement logic about blocked streams by calling this method.
return _maxBlockedStreams;
}

Expand Down Expand Up @@ -172,6 +176,10 @@ public boolean decode(long streamId, ByteBuffer buffer, Handler handler) throws
{
if (LOG.isDebugEnabled())
LOG.debug("Deferred Decoding: streamId={}, encodedFieldSection={}", streamId, encodedFieldSection);
AtomicInteger blockedFields = _blockedStreams.computeIfAbsent(streamId, id -> new AtomicInteger(0));
blockedFields.incrementAndGet();
if (_blockedStreams.size() > _maxBlockedStreams)
throw new QpackException.SessionException(QPACK_DECOMPRESSION_FAILED, "exceeded max blocked streams");
_encodedFieldSections.add(encodedFieldSection);
}

Expand Down Expand Up @@ -226,6 +234,7 @@ public void parseInstructions(ByteBuffer buffer) throws QpackException
public void streamCancellation(long streamId)
{
_encodedFieldSections.removeIf(encodedFieldSection -> encodedFieldSection.getStreamId() == streamId);
_blockedStreams.remove(streamId);
_metaDataNotifications.removeIf(notification -> notification._streamId == streamId);
_instructions.add(new StreamCancellationInstruction(streamId));
notifyInstructionHandler();
Expand All @@ -234,12 +243,17 @@ public void streamCancellation(long streamId)
private void checkEncodedFieldSections() throws QpackException
{
int insertCount = _context.getDynamicTable().getInsertCount();
for (EncodedFieldSection encodedFieldSection : _encodedFieldSections)
Iterator<EncodedFieldSection> iterator = _encodedFieldSections.iterator();
while (iterator.hasNext())
{
EncodedFieldSection encodedFieldSection = iterator.next();
if (encodedFieldSection.getRequiredInsertCount() <= insertCount)
{
iterator.remove();
long streamId = encodedFieldSection.getStreamId();
MetaData metaData = encodedFieldSection.decode(_context, _maxHeaderSize);
if (_blockedStreams.get(streamId).decrementAndGet() <= 0)
_blockedStreams.remove(streamId);
if (LOG.isDebugEnabled())
LOG.debug("Decoded: streamId={}, metadata={}", streamId, metaData);

Expand Down