Skip to content

Commit

Permalink
Fix file uploads beyond MMAP_SEGMENT_SIZE (#10793)
Browse files Browse the repository at this point in the history
* Fix file uploads beyond MMAP_SEGMENT_SIZE
The code path was not covered by tests and was broken.

I also replaced the ReentrantLock by a NonReentrantLock that is based on a Semaphore. This is because there are some semantic checks in Chunk that might not work with a reentrant lock (e.g. repeated deallocate calls could lead to ref count errors). But this is unrelated to the bug.

Fixes #10666

* Update http-server-netty/src/main/java/io/micronaut/http/server/netty/NonReentrantLock.java

Co-authored-by: Sergio del Amo <sergio.delamo@softamo.com>

* make badssl more resilient to timeouts

* make badssl more resilient to timeouts

---------

Co-authored-by: Sergio del Amo <sergio.delamo@softamo.com>
  • Loading branch information
yawkat and sdelamo committed May 6, 2024
1 parent c547328 commit aeaec05
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 6 deletions.
Expand Up @@ -4,6 +4,7 @@ import io.micronaut.http.client.DefaultHttpClientConfiguration
import io.micronaut.http.client.HttpClient
import io.micronaut.http.ssl.ClientSslConfiguration
import spock.lang.PendingFeature
import spock.lang.Retry
import spock.lang.Specification

import javax.net.ssl.SSLHandshakeException
Expand All @@ -13,6 +14,7 @@ import java.time.Duration
// See http-client/src/test/groovy/io/micronaut/http/client/SslSpec.groovy
class SslSpec extends Specification {

@Retry(count = 5) // sometimes badssl.com times out
void 'bad server ssl cert'() {
given:
def cfg = new DefaultHttpClientConfiguration()
Expand Down
Expand Up @@ -32,6 +32,7 @@ import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.ssl.SslHandshakeTimeoutException
import reactor.core.publisher.Flux
import spock.lang.Ignore
import spock.lang.Retry
import spock.lang.Specification

import javax.net.ssl.SSLHandshakeException
Expand Down Expand Up @@ -111,6 +112,7 @@ class SslSpec extends Specification {
}
}

@Retry(count = 5) // sometimes badssl.com times out
void 'bad server ssl cert'() {
given:
def cfg = new DefaultHttpClientConfiguration()
Expand Down
Expand Up @@ -52,7 +52,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -576,7 +575,7 @@ public final class Chunk extends AbstractReferenceCounted {
// one reference is kept by the MicronautHttpData.chunks list, and is released on MicronautHttpData.deallocate.
// The other reference is created by the user on pollChunk, and released when she calls claim()

private final Lock lock = new ReentrantLock();
private final Lock lock = new NonReentrantLock();
private final long offset;
@Nullable
private ByteBuf buf; // always has refCnt = 1
Expand All @@ -595,11 +594,11 @@ private void loadFromDisk(int length) throws IOException {
buf = mmapSegment(firstSegmentIndex).retainedSlice(offsetInSegment, Math.toIntExact(length));
} else {
CompositeByteBuf composite = Unpooled.compositeBuffer(lastSegmentIndex - firstSegmentIndex + 1);
composite.addComponent(mmapSegment(firstSegmentIndex).retainedSlice(offsetInSegment, MMAP_SEGMENT_SIZE - offsetInSegment));
composite.addComponent(true, mmapSegment(firstSegmentIndex).retainedSlice(offsetInSegment, MMAP_SEGMENT_SIZE - offsetInSegment));
for (int i = firstSegmentIndex + 1; i < lastSegmentIndex; i++) {
composite.addComponent(mmapSegment(i).retain());
composite.addComponent(true, mmapSegment(i).retain());
}
composite.addComponent(mmapSegment(lastSegmentIndex).retainedSlice(0, Math.toIntExact((offset + length) % MMAP_SEGMENT_SIZE)));
composite.addComponent(true, mmapSegment(lastSegmentIndex).retainedSlice(0, Math.toIntExact((offset + length - 1) % MMAP_SEGMENT_SIZE + 1)));
buf = composite;
}
if (oldBuf != null) {
Expand Down
@@ -0,0 +1,65 @@
/*
* Copyright 2017-2024 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.http.server.netty;

import io.micronaut.core.annotation.NonNull;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
* Non-reentrant {@link Lock} implementation based on a semaphore.
*
* @author Jonas Konrad
*/
final class NonReentrantLock extends Semaphore implements Lock {
public NonReentrantLock() {
super(1);
}

@Override
public void lock() {
acquireUninterruptibly();
}

@Override
public void lockInterruptibly() throws InterruptedException {
acquire();
}

@Override
public boolean tryLock() {
return tryAcquire();
}

@Override
public boolean tryLock(long time, @NonNull TimeUnit unit) throws InterruptedException {
return tryAcquire(time, unit);
}

@Override
public void unlock() {
release();
}

@NonNull
@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
}
}
Expand Up @@ -25,7 +25,9 @@ import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import spock.lang.Retry

import java.nio.file.Files
import java.security.MessageDigest
import java.util.concurrent.ThreadLocalRandom

/**
* Any changes or additions to this test should also be done
Expand Down Expand Up @@ -419,9 +421,51 @@ class DiskUploadSpec extends AbstractMicronautSpec {
result == 'data.json: 16'
}

void "test very big file upload"() {
given:
def tmp = Files.createTempFile("DiskUploadSpec-data", ".bin")

def length = 1500 * 1024 * 1024 // 1500MiB
try (OutputStream os = Files.newOutputStream(tmp)) {
int remaining = length
byte[] arr = new byte[4096];
while (remaining > 0) {
ThreadLocalRandom.current().nextBytes(arr)
os.write(arr, 0, Math.min(arr.length, remaining))
remaining -= arr.length
}
}
MultipartBody requestBody = MultipartBody.builder()
.addPart("data", "data.bin", MediaType.APPLICATION_OCTET_STREAM_TYPE, tmp.toFile())
.build()

when:
Flux<HttpResponse<String>> flowable = Flux.from(client.exchange(
HttpRequest.POST("/upload/receive-completed-file-upload-huge", requestBody)
.contentType(MediaType.MULTIPART_FORM_DATA)
.accept(MediaType.TEXT_PLAIN_TYPE),
String
))

HttpResponse<String> response = flowable.blockFirst()
def result = response.getBody().get()

then:
response.code() == HttpStatus.OK.code
result == "data.bin: " + length

cleanup:
Files.deleteIfExists(tmp)
}

@Override
Map<String, Object> getConfiguration() {
super.getConfiguration() << ['micronaut.http.client.read-timeout': 300, 'micronaut.server.multipart.disk': true]
super.getConfiguration() << [
'micronaut.http.client.read-timeout': 300,
'micronaut.server.multipart.disk': true,
'micronaut.server.max-request-size': '2GB',
'micronaut.server.multipart.max-file-size': '2GB'
]
}

private byte[] calculateMd5(byte[] bytes) {
Expand Down
Expand Up @@ -105,6 +105,22 @@ public String receiveCompletedFileUpload(CompletedFileUpload data) {
}
}

@Post(value = "/receive-completed-file-upload-huge", consumes = MediaType.MULTIPART_FORM_DATA, produces = MediaType.TEXT_PLAIN)
public String receiveCompletedFileUploadHuge(CompletedFileUpload data) {
try (InputStream is = data.getInputStream()) {
long n = 0;
byte[] arr = new byte[4096];
while (true) {
int o = is.read(arr);
if (o < 0) break;
n += o;
}
return data.getFilename() + ": " + n;
} catch (IOException e) {
return e.getMessage();
}
}

@Post(value = "/receive-completed-file-upload-stream", consumes = MediaType.MULTIPART_FORM_DATA, produces = MediaType.TEXT_PLAIN)
public String receiveCompletedFileUploadStream(CompletedFileUpload data) {
try {
Expand Down

0 comments on commit aeaec05

Please sign in to comment.