Skip to content

Commit

Permalink
Refactor Contents to DefaultParts
Browse files Browse the repository at this point in the history
This commit moves the Contents abstraction into DefaultParts

See gh-27613
  • Loading branch information
poutsma committed Nov 3, 2021
1 parent 694db22 commit 0c7e000
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 140 deletions.

This file was deleted.

Expand Up @@ -16,10 +16,15 @@

package org.springframework.http.codec.multipart;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Callable;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
Expand Down Expand Up @@ -50,17 +55,40 @@ public static FormFieldPart formFieldPart(HttpHeaders headers, String value) {
}

/**
 * Create a new {@link Part} or {@link FilePart} based on a flux of data
 * buffers. Returns {@link FilePart} if the {@code Content-Disposition} of
 * the given headers contains a filename, or a "normal" {@link Part}
 * otherwise.
 * @param headers the part headers
 * @param dataBuffers the content of the part
 * @return {@link Part} or {@link FilePart}, depending on {@link HttpHeaders#getContentDisposition()}
 */
public static Part part(HttpHeaders headers, Flux<DataBuffer> dataBuffers) {
	Assert.notNull(headers, "Headers must not be null");
	Assert.notNull(dataBuffers, "DataBuffers must not be null");

	return partInternal(headers, new FluxContent(dataBuffers));
}

/**
* Create a new {@link Part} or {@link FilePart} based on the given file.
* Returns {@link FilePart} if the {@code Content-Disposition} of the given
* headers contains a filename, or a "normal" {@link Part} otherwise
* @param headers the part headers
* @param content the content of the part
* @param file the file
* @param scheduler the scheduler used for reading the file
* @return {@link Part} or {@link FilePart}, depending on {@link HttpHeaders#getContentDisposition()}
*/
public static Part part(HttpHeaders headers, Content content) {
public static Part part(HttpHeaders headers, Path file, Scheduler scheduler) {
Assert.notNull(headers, "Headers must not be null");
Assert.notNull(content, "Content must not be null");
Assert.notNull(file, "File must not be null");
Assert.notNull(scheduler, "Scheduler must not be null");

return partInternal(headers, new FileContent(file, scheduler));
}


private static Part partInternal(HttpHeaders headers, Content content) {
String filename = headers.getContentDisposition().getFilename();
if (filename != null) {
return new DefaultFilePart(headers, content);
Expand Down Expand Up @@ -142,7 +170,8 @@ public String toString() {
*/
private static class DefaultPart extends AbstractPart {

private final Content content;
protected final Content content;


public DefaultPart(HttpHeaders headers, Content content) {
super(headers);
Expand Down Expand Up @@ -191,7 +220,7 @@ public String filename() {

@Override
public Mono<Void> transferTo(Path dest) {
return DataBufferUtils.write(content(), dest);
return this.content.transferTo(dest);
}

@Override
Expand All @@ -200,7 +229,7 @@ public String toString() {
String name = contentDisposition.getName();
String filename = contentDisposition.getFilename();
if (name != null) {
return "DefaultFilePart{" + name() + " (" + filename + ")}";
return "DefaultFilePart{" + name + " (" + filename + ")}";
}
else {
return "DefaultFilePart{(" + filename + ")}";
Expand All @@ -209,4 +238,100 @@ public String toString() {

}


/**
 * Abstraction over the content of a {@link Part}, allowing a part to be
 * backed either by a flux of data buffers or by a file on disk
 * (see {@code FluxContent} and {@code FileContent} below).
 */
private interface Content {

	/**
	 * Return the content as a flux of data buffers.
	 */
	Flux<DataBuffer> content();

	/**
	 * Copy this content to the given destination path.
	 * @param dest the path to write the content to
	 */
	Mono<Void> transferTo(Path dest);

	/**
	 * Release any underlying resource held by this content
	 * (a no-op for in-memory content).
	 */
	Mono<Void> delete();

}

/**
 * {@code Content} variant backed by a flux of data buffers. There is no
 * underlying resource to release, so {@link #delete()} completes immediately.
 */
private static final class FluxContent implements Content {

	private final Flux<DataBuffer> buffers;

	public FluxContent(Flux<DataBuffer> buffers) {
		this.buffers = buffers;
	}

	@Override
	public Flux<DataBuffer> content() {
		return this.buffers;
	}

	@Override
	public Mono<Void> transferTo(Path dest) {
		// Stream the buffers straight to the destination file.
		return DataBufferUtils.write(content(), dest);
	}

	@Override
	public Mono<Void> delete() {
		// Nothing to clean up for flux-based content.
		return Mono.empty();
	}

}


/**
 * {@code Content} implementation based on a file. Reads stream the file
 * lazily; copy and delete run as blocking operations on the configured
 * scheduler.
 */
private static final class FileContent implements Content {

	private final Path file;

	// Scheduler used to keep blocking file I/O off non-blocking threads.
	private final Scheduler scheduler;


	public FileContent(Path file, Scheduler scheduler) {
		this.file = file;
		this.scheduler = scheduler;
	}


	@Override
	public Flux<DataBuffer> content() {
		// Channel is opened lazily per subscription; content is read in
		// 1024-byte buffers on the dedicated scheduler.
		return DataBufferUtils.readByteChannel(
				() -> Files.newByteChannel(this.file, StandardOpenOption.READ),
				DefaultDataBufferFactory.sharedInstance, 1024)
				.subscribeOn(this.scheduler);
	}

	@Override
	public Mono<Void> transferTo(Path dest) {
		// Overwrites dest if it already exists.
		return blockingOperation(() -> Files.copy(this.file, dest, StandardCopyOption.REPLACE_EXISTING));
	}

	@Override
	public Mono<Void> delete() {
		return blockingOperation(() -> {
			Files.delete(this.file);
			return null;
		});
	}

	/**
	 * Wrap the given blocking call in a {@code Mono} that completes when the
	 * call returns, or errors with the thrown exception; execution happens on
	 * the configured scheduler.
	 */
	private Mono<Void> blockingOperation(Callable<?> callable) {
		return Mono.<Void>create(sink -> {
			try {
				callable.call();
				sink.success();
			}
			catch (Exception ex) {
				sink.error(ex);
			}
		})
		.subscribeOn(this.scheduler);
	}
}

}
Expand Up @@ -160,7 +160,7 @@ else if (!this.streaming) {
requestToken();
}
});
emitPart(DefaultParts.part(headers, Content.fromFlux(streamingContent)));
emitPart(DefaultParts.part(headers, streamingContent));
}
}

Expand Down Expand Up @@ -518,7 +518,7 @@ private void emitMemoryPart() {
}
this.content.clear();
Flux<DataBuffer> content = Flux.just(DefaultDataBufferFactory.sharedInstance.wrap(bytes));
emitPart(DefaultParts.part(this.headers, Content.fromFlux(content)));
emitPart(DefaultParts.part(this.headers, content));
}

@Override
Expand Down Expand Up @@ -674,8 +674,7 @@ public void body(DataBuffer dataBuffer) {
@Override
public void partComplete(boolean finalPart) {
MultipartUtils.closeChannel(this.channel);
emitPart(DefaultParts.part(this.headers,
Content.fromFile(this.file, PartGenerator.this.blockingOperationScheduler)));
emitPart(DefaultParts.part(this.headers, this.file, PartGenerator.this.blockingOperationScheduler));
if (finalPart) {
emitComplete();
}
Expand Down

0 comments on commit 0c7e000

Please sign in to comment.