Skip to content

Commit

Permalink
Add Part::delete method
Browse files Browse the repository at this point in the history
This commit introduces the Part::delete method, which deletes the part's
underlying storage.

Closes gh-27612
  • Loading branch information
poutsma committed Nov 2, 2021
1 parent 47d3819 commit 694db22
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 18 deletions.
@@ -0,0 +1,129 @@
/*
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.http.codec.multipart;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;

/**
* Part content abstraction used by {@link DefaultParts}.
*
* @author Arjen Poutsma
* @since 5.3.13
*/
/**
 * Part content abstraction used by {@link DefaultParts}.
 *
 * @author Arjen Poutsma
 * @since 5.3.13
 */
abstract class Content {

	protected Content() {
	}

	/**
	 * Return a new {@code Content} backed by the given flux of data buffers.
	 * @param buffers the data buffers that make up the content
	 */
	public static Content fromFlux(Flux<DataBuffer> buffers) {
		return new FluxContent(buffers);
	}

	/**
	 * Return a new {@code Content} backed by the given file.
	 * @param file the path of the file that holds the content
	 * @param scheduler the scheduler used for blocking file operations
	 */
	public static Content fromFile(Path file, Scheduler scheduler) {
		return new FileContent(file, scheduler);
	}

	/**
	 * Return the content as a flux of data buffers.
	 */
	public abstract Flux<DataBuffer> content();

	/**
	 * Delete the storage underlying this content, if any.
	 * <p>This base implementation is a no-op that completes immediately.
	 */
	public Mono<Void> delete() {
		return Mono.empty();
	}


	/**
	 * {@code Content} implementation that simply exposes a pre-existing
	 * flux of data buffers. Has no underlying storage to delete, so it
	 * inherits the no-op {@link #delete()}.
	 */
	private static final class FluxContent extends Content {

		private final Flux<DataBuffer> buffers;

		FluxContent(Flux<DataBuffer> buffers) {
			this.buffers = buffers;
		}

		@Override
		public Flux<DataBuffer> content() {
			return this.buffers;
		}
	}


	/**
	 * {@code Content} implementation backed by a file, performing all
	 * blocking I/O on the configured scheduler.
	 */
	private static final class FileContent extends Content {

		// Read chunk size, in bytes.
		private static final int BUFFER_SIZE = 1024;

		private final Path file;

		private final Scheduler scheduler;

		FileContent(Path file, Scheduler scheduler) {
			this.file = file;
			this.scheduler = scheduler;
		}

		@Override
		public Flux<DataBuffer> content() {
			// Open the channel lazily, per subscription, and keep the
			// blocking reads off the subscriber's thread.
			Flux<DataBuffer> buffers = DataBufferUtils.readByteChannel(
					() -> Files.newByteChannel(this.file, StandardOpenOption.READ),
					DefaultDataBufferFactory.sharedInstance, BUFFER_SIZE);
			return buffers.subscribeOn(this.scheduler);
		}

		@Override
		public Mono<Void> delete() {
			// Files.delete throws a checked IOException; surface it as the
			// unchecked equivalent so it can propagate through the Runnable.
			Runnable deleteTask = () -> {
				try {
					Files.delete(this.file);
				}
				catch (IOException ex) {
					throw new UncheckedIOException(ex);
				}
			};
			return Mono.<Void>fromRunnable(deleteTask).subscribeOn(this.scheduler);
		}
	}

}
Expand Up @@ -57,7 +57,7 @@ public static FormFieldPart formFieldPart(HttpHeaders headers, String value) {
* @param content the content of the part
* @return {@link Part} or {@link FilePart}, depending on {@link HttpHeaders#getContentDisposition()}
*/
public static Part part(HttpHeaders headers, Flux<DataBuffer> content) {
public static Part part(HttpHeaders headers, Content content) {
Assert.notNull(headers, "Headers must not be null");
Assert.notNull(content, "Content must not be null");

Expand Down Expand Up @@ -142,16 +142,21 @@ public String toString() {
*/
private static class DefaultPart extends AbstractPart {

private final Flux<DataBuffer> content;
private final Content content;

public DefaultPart(HttpHeaders headers, Flux<DataBuffer> content) {
public DefaultPart(HttpHeaders headers, Content content) {
super(headers);
this.content = content;
}

@Override
public Flux<DataBuffer> content() {
return this.content;
return this.content.content();
}

@Override
public Mono<Void> delete() {
return this.content.delete();
}

@Override
Expand All @@ -171,9 +176,9 @@ public String toString() {
/**
* Default implementation of {@link FilePart}.
*/
private static class DefaultFilePart extends DefaultPart implements FilePart {
private static final class DefaultFilePart extends DefaultPart implements FilePart {

public DefaultFilePart(HttpHeaders headers, Flux<DataBuffer> content) {
public DefaultFilePart(HttpHeaders headers, Content content) {
super(headers, content);
}

Expand Down
Expand Up @@ -17,6 +17,7 @@
package org.springframework.http.codec.multipart;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
Expand Down Expand Up @@ -57,4 +58,13 @@ public interface Part {
*/
Flux<DataBuffer> content();

/**
 * Return a mono that, when subscribed to, deletes the underlying storage
 * for this part.
 * <p>The default implementation is a no-op that completes immediately,
 * suitable for parts that hold no storage of their own.
 * @since 5.3.13
 */
default Mono<Void> delete() {
	return Mono.empty();
}

}
Expand Up @@ -160,7 +160,7 @@ else if (!this.streaming) {
requestToken();
}
});
emitPart(DefaultParts.part(headers, streamingContent));
emitPart(DefaultParts.part(headers, Content.fromFlux(streamingContent)));
}
}

Expand Down Expand Up @@ -518,7 +518,7 @@ private void emitMemoryPart() {
}
this.content.clear();
Flux<DataBuffer> content = Flux.just(DefaultDataBufferFactory.sharedInstance.wrap(bytes));
emitPart(DefaultParts.part(this.headers, content));
emitPart(DefaultParts.part(this.headers, Content.fromFlux(content)));
}

@Override
Expand Down Expand Up @@ -674,21 +674,13 @@ public void body(DataBuffer dataBuffer) {
@Override
public void partComplete(boolean finalPart) {
MultipartUtils.closeChannel(this.channel);
Flux<DataBuffer> content = partContent();
emitPart(DefaultParts.part(this.headers, content));
emitPart(DefaultParts.part(this.headers,
Content.fromFile(this.file, PartGenerator.this.blockingOperationScheduler)));
if (finalPart) {
emitComplete();
}
}

private Flux<DataBuffer> partContent() {
return DataBufferUtils
.readByteChannel(
() -> Files.newByteChannel(this.file, StandardOpenOption.READ),
DefaultDataBufferFactory.sharedInstance, 1024)
.subscribeOn(PartGenerator.this.blockingOperationScheduler);
}

@Override
public void dispose() {
if (this.closeOnDispose) {
Expand Down
Expand Up @@ -16,7 +16,9 @@

package org.springframework.http.codec.multipart;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
Expand All @@ -40,6 +42,7 @@
import org.synchronoss.cloud.nio.multipart.NioMultipartParser;
import org.synchronoss.cloud.nio.multipart.NioMultipartParserListener;
import org.synchronoss.cloud.nio.multipart.PartBodyStreamStorageFactory;
import org.synchronoss.cloud.nio.stream.storage.NameAwarePurgableFileInputStream;
import org.synchronoss.cloud.nio.stream.storage.StreamStorage;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -497,6 +500,38 @@ public Flux<DataBuffer> content() {
protected StreamStorage getStorage() {
return this.storage;
}

@Override
public Mono<Void> delete() {
	// Deletion happens lazily, on subscription. It is best-effort by design:
	// the boolean result of File.delete() is deliberately ignored, and parts
	// whose storage is not file-backed (getFile() == null) are a no-op.
	// NOTE(review): the Runnable executes on the subscriber's thread —
	// confirm callers subscribe on a scheduler suited to blocking I/O.
	return Mono.fromRunnable(() -> {
		File file = getFile();
		if (file != null) {
			file.delete();
		}
	});
}

// Return the file backing this part's storage, or null when the storage is
// not file-based. Works by opening the storage's input stream and checking
// whether it is the Synchronoss file-backed stream type; the stream is
// opened only to inspect it and is always closed before returning.
@Nullable
private File getFile() {
	InputStream inputStream = null;
	try {
		inputStream = getStorage().getInputStream();
		if (inputStream instanceof NameAwarePurgableFileInputStream) {
			NameAwarePurgableFileInputStream stream = (NameAwarePurgableFileInputStream) inputStream;
			// Return value is computed before the finally block closes the stream.
			return stream.getFile();
		}
	}
	finally {
		if (inputStream != null) {
			try {
				inputStream.close();
			}
			catch (IOException ignore) {
				// Best-effort close: a failure here must not mask the result.
			}
		}
	}
	return null;
}
}


Expand Down

0 comments on commit 694db22

Please sign in to comment.