Skip to content

Commit

Permalink
Issue #7635 - implement maxBlockedStreams logic in QpackDecoder
Browse files Browse the repository at this point in the history
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts committed Feb 28, 2022
1 parent dcdda2a commit 39ff60f
Showing 1 changed file with 16 additions and 2 deletions.
Expand Up @@ -16,7 +16,11 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.MetaData;
Expand Down Expand Up @@ -50,6 +54,7 @@ public class QpackDecoder implements Dumpable
private final List<EncodedFieldSection> _encodedFieldSections = new ArrayList<>();
private final NBitIntegerParser _integerDecoder = new NBitIntegerParser();
private final InstructionHandler _instructionHandler = new InstructionHandler();
private final Map<Long, AtomicInteger> _blockedStreams = new HashMap<>();
private int _maxHeaderSize;
private int _maxBlockedStreams;

Expand Down Expand Up @@ -100,7 +105,6 @@ public void setMaxHeaderSize(int maxHeaderSize)

public int getMaxBlockedStreams()
{
// TODO: implement logic about blocked streams by calling this method.
return _maxBlockedStreams;
}

Expand Down Expand Up @@ -172,6 +176,10 @@ public boolean decode(long streamId, ByteBuffer buffer, Handler handler) throws
{
if (LOG.isDebugEnabled())
LOG.debug("Deferred Decoding: streamId={}, encodedFieldSection={}", streamId, encodedFieldSection);
AtomicInteger blockedFields = _blockedStreams.computeIfAbsent(streamId, id -> new AtomicInteger(0));
blockedFields.incrementAndGet();
if (_blockedStreams.size() > _maxBlockedStreams)
throw new QpackException.SessionException(QPACK_DECOMPRESSION_FAILED, "exceeded max blocked streams");
_encodedFieldSections.add(encodedFieldSection);
}

Expand Down Expand Up @@ -226,6 +234,7 @@ public void parseInstructions(ByteBuffer buffer) throws QpackException
public void streamCancellation(long streamId)
{
_encodedFieldSections.removeIf(encodedFieldSection -> encodedFieldSection.getStreamId() == streamId);
_blockedStreams.remove(streamId);
_metaDataNotifications.removeIf(notification -> notification._streamId == streamId);
_instructions.add(new StreamCancellationInstruction(streamId));
notifyInstructionHandler();
Expand All @@ -234,12 +243,17 @@ public void streamCancellation(long streamId)
private void checkEncodedFieldSections() throws QpackException
{
int insertCount = _context.getDynamicTable().getInsertCount();
for (EncodedFieldSection encodedFieldSection : _encodedFieldSections)
Iterator<EncodedFieldSection> iterator = _encodedFieldSections.iterator();
while (iterator.hasNext())
{
EncodedFieldSection encodedFieldSection = iterator.next();
if (encodedFieldSection.getRequiredInsertCount() <= insertCount)
{
iterator.remove();
long streamId = encodedFieldSection.getStreamId();
MetaData metaData = encodedFieldSection.decode(_context, _maxHeaderSize);
if (_blockedStreams.get(streamId).decrementAndGet() <= 0)
_blockedStreams.remove(streamId);
if (LOG.isDebugEnabled())
LOG.debug("Decoded: streamId={}, metadata={}", streamId, metaData);

Expand Down

0 comments on commit 39ff60f

Please sign in to comment.