forked from spring-projects/spring-framework
-
Notifications
You must be signed in to change notification settings - Fork 0
/
DefaultPartHttpMessageReader.java
233 lines (203 loc) · 8.79 KB
/
DefaultPartHttpMessageReader.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
/*
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.codec.multipart;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.io.buffer.DataBufferLimitException;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.LoggingCodecSupport;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* Default {@code HttpMessageReader} for parsing {@code "multipart/form-data"}
* requests to a stream of {@link Part}s.
*
* <p>In default, non-streaming mode, this message reader stores the
* {@linkplain Part#content() contents} of parts smaller than
* {@link #setMaxInMemorySize(int) maxInMemorySize} in memory, and parts larger
* than that to a temporary file in
* {@link #setFileStorageDirectory(Path) fileStorageDirectory}.
* <p>In {@linkplain #setStreaming(boolean) streaming} mode, the contents of the
* part is streamed directly from the parsed input buffer stream, and not stored
* in memory nor file.
*
* <p>This reader can be provided to {@link MultipartHttpMessageReader} in order
* to aggregate all parts into a Map.
*
* @author Arjen Poutsma
* @since 5.3
*/
public class DefaultPartHttpMessageReader extends LoggingCodecSupport implements HttpMessageReader<Part> {

    // Per-part in-memory limit in bytes; 256K by default.
    private int maxInMemorySize = 256 * 1024;

    // Limit for the headers section of each part in bytes; 10K by default.
    private int maxHeadersSize = 10 * 1024;

    // Disk usage limit for file parts; -1 means there is no maximum.
    private long maxDiskUsagePerPart = -1;

    // Maximum number of parts per request; -1 means there is no maximum.
    private int maxParts = -1;

    private boolean streaming;

    private Scheduler blockingOperationScheduler = Schedulers.boundedElastic();

    // NOTE(review): a reference to the getter is passed instead of the current
    // scheduler instance, presumably so that a scheduler configured later through
    // setBlockingOperationScheduler is the one that gets used - confirm against
    // FileStorage.
    private FileStorage fileStorage = FileStorage.tempDirectory(this::getBlockingOperationScheduler);

    private Charset headersCharset = StandardCharsets.UTF_8;


    /**
     * Configure the maximum amount of memory that is allowed per headers
     * section of each part.
     * <p>By default this is set to 10K.
     * @param byteCount the maximum amount of memory for headers
     */
    public void setMaxHeadersSize(int byteCount) {
        this.maxHeadersSize = byteCount;
    }

    /**
     * Get the {@link #setMaxInMemorySize configured} maximum in-memory size.
     */
    public int getMaxInMemorySize() {
        return this.maxInMemorySize;
    }

    /**
     * Configure the maximum amount of memory allowed per part.
     * When the limit is exceeded:
     * <ul>
     * <li>file parts are written to a temporary file.
     * <li>non-file parts are rejected with {@link DataBufferLimitException}.
     * </ul>
     * <p>By default this is set to 256K.
     * <p>Note that this property is ignored when
     * {@linkplain #setStreaming(boolean) streaming} is enabled.
     * @param maxInMemorySize the in-memory limit in bytes; if set to -1 the entire
     * contents will be stored in memory
     */
    public void setMaxInMemorySize(int maxInMemorySize) {
        this.maxInMemorySize = maxInMemorySize;
    }

    /**
     * Configure the maximum amount of disk space allowed for file parts.
     * <p>By default this is set to -1, meaning that there is no maximum.
     * <p>Note that this property is ignored when
     * {@linkplain #setStreaming(boolean) streaming} is enabled, or when
     * {@link #setMaxInMemorySize(int) maxInMemorySize} is set to -1.
     * @param maxDiskUsagePerPart the disk usage limit per part, or -1 for no maximum
     */
    public void setMaxDiskUsagePerPart(long maxDiskUsagePerPart) {
        this.maxDiskUsagePerPart = maxDiskUsagePerPart;
    }

    /**
     * Specify the maximum number of parts allowed in a given multipart request.
     * <p>By default this is set to -1, meaning that there is no maximum.
     * @param maxParts the maximum number of parts, or -1 for no maximum
     */
    public void setMaxParts(int maxParts) {
        this.maxParts = maxParts;
    }

    /**
     * Set the directory used to store parts larger than
     * {@link #setMaxInMemorySize(int) maxInMemorySize}. By default, a directory
     * named {@code spring-webflux-multipart} is created under the system
     * temporary directory.
     * <p>Note that this property is ignored when
     * {@linkplain #setStreaming(boolean) streaming} is enabled, or when
     * {@link #setMaxInMemorySize(int) maxInMemorySize} is set to -1.
     * @param fileStorageDirectory the directory to store parts in; must not be {@code null}
     * @throws IOException if an I/O error occurs, or the parent directory
     * does not exist
     */
    public void setFileStorageDirectory(Path fileStorageDirectory) throws IOException {
        Assert.notNull(fileStorageDirectory, "FileStorageDirectory must not be null");
        this.fileStorage = FileStorage.fromPath(fileStorageDirectory);
    }

    /**
     * Set the Reactor {@link Scheduler} to be used for creating files and
     * directories, and writing to files. By default,
     * {@link Schedulers#boundedElastic()} is used, but this property allows for
     * changing it to an externally managed scheduler.
     * <p>Note that this property is ignored when
     * {@linkplain #setStreaming(boolean) streaming} is enabled, or when
     * {@link #setMaxInMemorySize(int) maxInMemorySize} is set to -1.
     * @param blockingOperationScheduler the scheduler to use; must not be {@code null}
     * @see Schedulers#newBoundedElastic
     */
    public void setBlockingOperationScheduler(Scheduler blockingOperationScheduler) {
        // The message names the actual property so a null argument is easy to trace.
        Assert.notNull(blockingOperationScheduler, "BlockingOperationScheduler must not be null");
        this.blockingOperationScheduler = blockingOperationScheduler;
    }

    /**
     * Return the currently configured scheduler for blocking operations.
     */
    private Scheduler getBlockingOperationScheduler() {
        return this.blockingOperationScheduler;
    }

    /**
     * When set to {@code true}, the {@linkplain Part#content() part content}
     * is streamed directly from the parsed input buffer stream, and not stored
     * in memory nor file.
     * When {@code false}, parts are backed by
     * in-memory and/or file storage. Defaults to {@code false}.
     * <p><strong>NOTE</strong> that with streaming enabled, the
     * {@code Flux<Part>} that is produced by this message reader must be
     * consumed in the original order, i.e. the order of the HTTP message.
     * Additionally, the {@linkplain Part#content() body contents} must either
     * be completely consumed or canceled before moving to the next part.
     * <p>Also note that enabling this property effectively ignores
     * {@link #setMaxInMemorySize(int) maxInMemorySize},
     * {@link #setMaxDiskUsagePerPart(long) maxDiskUsagePerPart},
     * {@link #setFileStorageDirectory(Path) fileStorageDirectory}, and
     * {@link #setBlockingOperationScheduler(Scheduler) blockingOperationScheduler}.
     * @param streaming whether part content should be streamed
     */
    public void setStreaming(boolean streaming) {
        this.streaming = streaming;
    }

    /**
     * Set the character set used to decode headers.
     * Defaults to UTF-8 as per RFC 7578.
     * @param headersCharset the charset to use for decoding headers
     * @since 5.3.6
     * @see <a href="https://tools.ietf.org/html/rfc7578#section-5.1">RFC-7578 Section 5.1</a>
     */
    public void setHeadersCharset(Charset headersCharset) {
        Assert.notNull(headersCharset, "HeadersCharset must not be null");
        this.headersCharset = headersCharset;
    }


    /**
     * Return a single-element list containing {@code multipart/form-data}.
     */
    @Override
    public List<MediaType> getReadableMediaTypes() {
        return Collections.singletonList(MediaType.MULTIPART_FORM_DATA);
    }

    /**
     * Whether the given element type is exactly {@link Part} and the media
     * type is either {@code null} or compatible with {@code multipart/form-data}.
     */
    @Override
    public boolean canRead(ResolvableType elementType, @Nullable MediaType mediaType) {
        return Part.class.equals(elementType.toClass()) &&
                (mediaType == null || MediaType.MULTIPART_FORM_DATA.isCompatibleWith(mediaType));
    }

    /**
     * Not supported: always returns a {@code Mono} that signals an
     * {@link UnsupportedOperationException}, since a multipart body cannot be
     * read into a single {@link Part}.
     */
    @Override
    public Mono<Part> readMono(ResolvableType elementType, ReactiveHttpInputMessage message,
            Map<String, Object> hints) {

        return Mono.error(new UnsupportedOperationException("Cannot read multipart request body into single Part"));
    }

    /**
     * Parse the body of the given message into a stream of {@link Part}s.
     * <p>The returned {@code Flux} signals a {@link DecodingException} if no
     * multipart boundary can be obtained for the message.
     * @param elementType the element type (not used by this implementation)
     * @param message the message to read from
     * @param hints additional hints (not used by this implementation)
     * @return the parts generated from the message body
     */
    @Override
    public Flux<Part> read(ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
        // Deferred so that the boundary lookup and the configuration reads
        // happen on subscription rather than when this method is called.
        return Flux.defer(() -> {
            byte[] boundary = MultipartUtils.boundary(message, this.headersCharset);
            if (boundary == null) {
                return Flux.error(new DecodingException("No multipart boundary found in Content-Type: \"" +
                        message.getHeaders().getContentType() + "\""));
            }
            // First tokenize the raw body, then turn the tokens into parts.
            Flux<MultipartParser.Token> tokens = MultipartParser.parse(message.getBody(), boundary,
                    this.maxHeadersSize, this.headersCharset);

            return PartGenerator.createParts(tokens, this.maxParts, this.maxInMemorySize, this.maxDiskUsagePerPart,
                    this.streaming, this.fileStorage.directory(), this.blockingOperationScheduler);
        });
    }

}