/
fileio.py
543 lines (446 loc) · 20 KB
/
fileio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import warnings
from google.api_core.exceptions import RequestRangeNotSatisfiable
from google.cloud.storage._helpers import _NUM_RETRIES_MESSAGE
from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED
from google.cloud.storage.retry import ConditionalRetryPolicy
# Resumable uploads only accept chunk sizes that are whole multiples of
# 256 KiB.
CHUNK_SIZE_MULTIPLE = 256 * 1024  # 256 KiB
# Chunk size used when neither the caller nor the blob specifies one.
DEFAULT_CHUNK_SIZE = 40 * 1024 * 1024  # 40 MiB

# Precondition keywords accepted by both the download and the upload paths.
_PRECONDITION_KWARGS = (
    "if_generation_match",
    "if_generation_not_match",
    "if_metageneration_match",
    "if_metageneration_not_match",
)

# Keyword arguments accepted for download methods (and for blob.reload() when
# it is needed).
# Note: keep the blob.open() docstring in sync with any change made here.
VALID_DOWNLOAD_KWARGS = set(_PRECONDITION_KWARGS) | {
    "timeout",
    "retry",
    "raw_download",
}

# Keyword arguments accepted for upload methods.
# Note: keep the blob.open() docstring in sync with any change made here.
VALID_UPLOAD_KWARGS = set(_PRECONDITION_KWARGS) | {
    "content_type",
    "predefined_acl",
    "num_retries",
    "timeout",
    "checksum",
    "retry",
}
class BlobReader(io.BufferedIOBase):
    """A file-like object that reads from a blob.

    :type blob: 'google.cloud.storage.blob.Blob'
    :param blob:
        The blob to download.

    :type chunk_size: long
    :param chunk_size:
        (Optional) The minimum number of bytes to read at a time. If fewer
        bytes than the chunk_size are requested, the remainder is buffered.
        The default is the chunk_size of the blob, or 40MiB.

    :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
    :param retry:
        (Optional) How to retry the RPC. A None value will disable
        retries. A google.api_core.retry.Retry value will enable retries,
        and the object will define retriable response codes and errors and
        configure backoff and timeout options.

        A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a
        Retry object and activates it only if certain conditions are met.
        This class exists to provide safe defaults for RPC calls that are
        not technically safe to retry normally (due to potential data
        duplication or other side-effects) but become safe to retry if a
        condition such as if_metageneration_match is set.

        See the retry.py source code and docstrings in this package
        (google.cloud.storage.retry) for information on retry types and how
        to configure them.

        Media operations (downloads and uploads) do not support non-default
        predicates in a Retry object. The default will always be used. Other
        configuration changes for Retry objects such as delays and deadlines
        are respected.

    :param download_kwargs:
        Keyword arguments to pass to the underlying API calls.
        The following arguments are supported:

        - ``if_generation_match``
        - ``if_generation_not_match``
        - ``if_metageneration_match``
        - ``if_metageneration_not_match``
        - ``timeout``
        - ``raw_download``
    """

    def __init__(self, blob, chunk_size=None, retry=DEFAULT_RETRY, **download_kwargs):
        """Validate ``download_kwargs`` and start with an empty read buffer.

        ``download_kwargs`` are passed to every download call and, with the
        exception of ``raw_download``, to ``blob.reload()`` when seek() has
        to look up the blob size.

        :raises: :class:`ValueError` if a keyword argument is not listed in
                 ``VALID_DOWNLOAD_KWARGS``.
        """
        for kwarg in download_kwargs:
            if kwarg not in VALID_DOWNLOAD_KWARGS:
                raise ValueError(
                    "BlobReader does not support keyword argument {}.".format(kwarg)
                )

        self._blob = blob
        # Offset within the blob of the first byte held in self._buffer.
        self._pos = 0
        self._buffer = io.BytesIO()
        self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE
        self._retry = retry
        self._download_kwargs = download_kwargs

    def read(self, size=-1):
        """Read up to ``size`` bytes, downloading more data if required.

        Buffered bytes are served first. When they do not satisfy the
        request, a ranged download is issued from the current position and
        any surplus bytes are kept in the buffer for the next call.

        :type size: int
        :param size:
            (Optional) Maximum number of bytes to return. A negative value
            or ``None`` reads through to the end of the blob.

        :rtype: bytes
        :returns:
            The bytes read. This is shorter than ``size`` (possibly empty)
            when the end of the blob is reached.

        :raises: :class:`ValueError` if the reader is closed.
        """
        self._checkClosed()  # Raises ValueError if closed.

        # Follow the io convention that None means "read everything";
        # without this the arithmetic below fails with TypeError.
        if size is None:
            size = -1

        result = self._buffer.read(size)
        # If the read request demands more bytes than are buffered, fetch more.
        remaining_size = size - len(result)
        if remaining_size > 0 or size < 0:
            # The buffer is about to be discarded, so fold its read position
            # into the absolute offset first.
            self._pos += self._buffer.tell()
            read_size = len(result)

            self._buffer.seek(0)
            self._buffer.truncate(0)  # Clear the buffer to make way for new data.
            fetch_start = self._pos
            if size > 0:
                # Fetch the larger of self._chunk_size or the remaining_size.
                # NOTE(review): download_as_bytes documents ``end`` as the
                # last byte of the range, so this may request one byte more
                # than computed; any surplus is buffered below. Confirm.
                fetch_end = fetch_start + max(remaining_size, self._chunk_size)
            else:
                fetch_end = None

            # Download the blob. Checksumming must be disabled as we are using
            # chunked downloads, and the server only knows the checksum of the
            # entire file.
            try:
                result += self._blob.download_as_bytes(
                    start=fetch_start,
                    end=fetch_end,
                    checksum=None,
                    retry=self._retry,
                    **self._download_kwargs
                )
            except RequestRangeNotSatisfiable:
                # We've reached the end of the file. Python file objects should
                # return an empty response in this case, not raise an error.
                pass

            # If more bytes were read than is immediately needed, buffer the
            # remainder and then trim the result.
            if size > 0 and len(result) > size:
                self._buffer.write(result[size:])
                self._buffer.seek(0)
                result = result[:size]
            # Increment relative offset by true amount read.
            self._pos += len(result) - read_size
        return result

    def read1(self, size=-1):
        """Read up to ``size`` bytes; simply delegates to :meth:`read`."""
        return self.read(size)

    def seek(self, pos, whence=0):
        """Seek within the blob.

        This implementation of seek() uses knowledge of the blob size to
        validate that the reported position does not exceed the blob last byte.
        If the blob size is not already known it will call blob.reload().

        The resulting position is clamped to the range ``[0, blob.size]``.

        :type pos: int
        :param pos: Offset, interpreted according to ``whence``.

        :type whence: int
        :param whence:
            (Optional) 0 for an absolute offset (default), 1 for an offset
            relative to the current position, 2 for an offset relative to
            the end of the blob.

        :rtype: int
        :returns: The new absolute position.

        :raises: :class:`ValueError` if the reader is closed or ``whence``
                 is not 0, 1 or 2.
        """
        self._checkClosed()  # Raises ValueError if closed.

        if whence not in {0, 1, 2}:
            raise ValueError("invalid whence value")

        if self._blob.size is None:
            # raw_download only applies to media downloads; reload() is a
            # metadata call, so it must not receive that keyword.
            reload_kwargs = {
                key: value
                for key, value in self._download_kwargs.items()
                if key != "raw_download"
            }
            self._blob.reload(**reload_kwargs)

        initial_offset = self._pos + self._buffer.tell()

        if whence == 0:
            target_pos = pos
        elif whence == 1:
            target_pos = initial_offset + pos
        else:
            target_pos = self._blob.size + pos

        # Keep the position inside the blob. A negative offset would
        # otherwise be stored in self._pos and used as a download start.
        if target_pos > self._blob.size:
            target_pos = self._blob.size
        if target_pos < 0:
            target_pos = 0

        # Seek or invalidate buffer as needed.
        if target_pos < self._pos:
            # Target position < relative offset <= true offset.
            # As data is not in buffer, invalidate buffer.
            self._buffer.seek(0)
            self._buffer.truncate(0)
            new_pos = target_pos
            self._pos = target_pos
        else:
            # relative offset <= target position <= size of file.
            difference = target_pos - initial_offset
            new_pos = self._pos + self._buffer.seek(difference, 1)
        return new_pos

    def close(self):
        """Close the reader by closing its internal buffer."""
        self._buffer.close()

    @property
    def closed(self):
        """Whether the reader is closed.

        close() only closes the internal buffer, so the closed state is
        taken from that buffer.
        """
        return self._buffer.closed

    def _checkClosed(self):
        """Raise :class:`ValueError` if the internal buffer is closed."""
        if self._buffer.closed:
            raise ValueError("I/O operation on closed file.")

    def readable(self):
        """Return True: this object supports reading."""
        return True

    def writable(self):
        """Return False: this object does not support writing."""
        return False

    def seekable(self):
        """Return True: this object supports seek()."""
        return True
class BlobWriter(io.BufferedIOBase):
    """A file-like object that writes to a blob.

    :type blob: 'google.cloud.storage.blob.Blob'
    :param blob:
        The blob to which to write.

    :type chunk_size: long
    :param chunk_size:
        (Optional) The maximum number of bytes to buffer before sending data
        to the server, and the size of each request when data is sent.
        Writes are implemented as a "resumable upload", so chunk_size for
        writes must be exactly a multiple of 256KiB as with other resumable
        uploads. The default is the chunk_size of the blob, or 40 MiB.

    :type text_mode: bool
    :param text_mode:
        (Deprecated) A synonym for ignore_flush. For backwards-compatibility,
        if True, sets ignore_flush to True. Use ignore_flush instead. This
        parameter will be removed in a future release.

    :type ignore_flush: bool
    :param ignore_flush:
        Makes flush() do nothing instead of raise an error. flush() without
        closing is not supported by the remote service and therefore calling it
        on this class normally results in io.UnsupportedOperation. However, that
        behavior is incompatible with some consumers and wrappers of file
        objects in Python, such as zipfile.ZipFile or io.TextIOWrapper. Setting
        ignore_flush will cause flush() to successfully do nothing, for
        compatibility with those contexts. The correct way to actually flush
        data to the remote server is to close() (using this object as a context
        manager is recommended).

    :type retry: google.api_core.retry.Retry or google.cloud.storage.retry.ConditionalRetryPolicy
    :param retry:
        (Optional) How to retry the RPC. A None value will disable
        retries. A google.api_core.retry.Retry value will enable retries,
        and the object will define retriable response codes and errors and
        configure backoff and timeout options.

        A google.cloud.storage.retry.ConditionalRetryPolicy value wraps a
        Retry object and activates it only if certain conditions are met.
        This class exists to provide safe defaults for RPC calls that are
        not technically safe to retry normally (due to potential data
        duplication or other side-effects) but become safe to retry if a
        condition such as if_metageneration_match is set.

        See the retry.py source code and docstrings in this package
        (google.cloud.storage.retry) for information on retry types and how
        to configure them.

        Media operations (downloads and uploads) do not support non-default
        predicates in a Retry object. The default will always be used. Other
        configuration changes for Retry objects such as delays and deadlines
        are respected.

    :param upload_kwargs:
        Keyword arguments to pass to the underlying API
        calls. The following arguments are supported:

        - ``if_generation_match``
        - ``if_generation_not_match``
        - ``if_metageneration_match``
        - ``if_metageneration_not_match``
        - ``timeout``
        - ``content_type``
        - ``num_retries``
        - ``predefined_acl``
        - ``checksum``
    """

    def __init__(
        self,
        blob,
        chunk_size=None,
        text_mode=False,
        ignore_flush=False,
        retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED,
        **upload_kwargs
    ):
        """Validate ``upload_kwargs`` and set up an empty write buffer.

        No upload is started here; it is initiated lazily the first time
        chunks are sent.

        :raises: :class:`ValueError` if a keyword argument is not listed in
                 ``VALID_UPLOAD_KWARGS`` or the chunk size is not a multiple
                 of 256 KiB.
        """
        for kwarg in upload_kwargs:
            if kwarg not in VALID_UPLOAD_KWARGS:
                raise ValueError(
                    "BlobWriter does not support keyword argument {}.".format(kwarg)
                )
        self._blob = blob
        self._buffer = SlidingBuffer()
        # Set to an (upload, transport) pair once the upload is initiated.
        self._upload_and_transport = None
        # Resumable uploads require a chunk size of a multiple of 256KiB.
        # self._chunk_size must not be changed after the upload is initiated.
        self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE
        # text_mode is a deprecated synonym for ignore_flush
        self._ignore_flush = ignore_flush or text_mode
        self._retry = retry
        self._upload_kwargs = upload_kwargs

    @property
    def _chunk_size(self):
        """Get the writer's chunk size.

        :rtype: int or ``NoneType``
        :returns: The chunk size used for buffering and for each upload
                  request.
        """
        return self.__chunk_size

    @_chunk_size.setter
    def _chunk_size(self, value):
        """Set the writer's chunk size.

        :type value: int
        :param value: The chunk size to use for this writer.

        :raises: :class:`ValueError` if ``value`` is a positive number that
                 is not a multiple of 256 KiB.
        """
        if value is not None and value > 0 and value % CHUNK_SIZE_MULTIPLE != 0:
            raise ValueError(
                "Chunk size must be a multiple of %d." % CHUNK_SIZE_MULTIPLE
            )
        self.__chunk_size = value

    def write(self, b):
        """Buffer ``b`` and upload every complete chunk that is available.

        :type b: bytes
        :param b: The data to append.

        :returns:
            The value reported by the internal buffer's write().
            NOTE(review): io.BufferedIOBase.write is documented to return
            the number of bytes written; confirm that callers do not rely
            on that exact meaning here.

        :raises: :class:`ValueError` if the writer is closed.
        """
        self._checkClosed()  # Raises ValueError if closed.

        pos = self._buffer.write(b)

        # If there is enough content, upload chunks.
        num_chunks = len(self._buffer) // self._chunk_size
        if num_chunks:
            self._upload_chunks_from_buffer(num_chunks)

        return pos

    def _initiate_upload(self):
        """Start the resumable upload and remember its upload/transport pair.

        ``num_retries`` and ``content_type`` are removed from the stored
        upload kwargs because they are passed to the blob positionally.
        """
        # num_retries is only supported for backwards-compatibility reasons.
        num_retries = self._upload_kwargs.pop("num_retries", None)
        retry = self._retry
        content_type = self._upload_kwargs.pop("content_type", None)

        if num_retries is not None:
            warnings.warn(_NUM_RETRIES_MESSAGE, DeprecationWarning, stacklevel=2)
            # num_retries and retry are mutually exclusive. If num_retries is
            # set and retry is exactly the default, then nullify retry for
            # backwards compatibility.
            if retry is DEFAULT_RETRY_IF_GENERATION_SPECIFIED:
                retry = None

        # Handle ConditionalRetryPolicy.
        if isinstance(retry, ConditionalRetryPolicy):
            # Conditional retries are designed for non-media calls, which change
            # arguments into query_params dictionaries. Media operations work
            # differently, so here we make a "fake" query_params to feed to the
            # ConditionalRetryPolicy.
            query_params = {
                "ifGenerationMatch": self._upload_kwargs.get("if_generation_match"),
                "ifMetagenerationMatch": self._upload_kwargs.get(
                    "if_metageneration_match"
                ),
            }
            retry = retry.get_retry_policy_if_conditions_met(query_params=query_params)

        # NOTE(review): the positional None is presumably the total size,
        # which is unknown for a stream; confirm against
        # Blob._initiate_resumable_upload.
        self._upload_and_transport = self._blob._initiate_resumable_upload(
            self._blob.bucket.client,
            self._buffer,
            content_type,
            None,
            num_retries,
            chunk_size=self._chunk_size,
            retry=retry,
            **self._upload_kwargs
        )

    def _upload_chunks_from_buffer(self, num_chunks):
        """Upload a specified number of chunks."""

        # Initialize the upload if necessary.
        if not self._upload_and_transport:
            self._initiate_upload()

        upload, transport = self._upload_and_transport

        # Upload chunks. The SlidingBuffer class will manage seek position.
        for _ in range(num_chunks):
            upload.transmit_next_chunk(transport)

        # Wipe the buffer of chunks uploaded, preserving any remaining data.
        self._buffer.flush()

    def tell(self):
        """Return the buffer's consumed-byte count plus the bytes it holds."""
        return self._buffer.tell() + len(self._buffer)

    def flush(self):
        """Do nothing if ``ignore_flush`` was set; otherwise raise.

        :raises: :class:`io.UnsupportedOperation` unless ``ignore_flush``
                 (or the deprecated ``text_mode``) was set at construction.
        """
        # flush() is not fully supported by the remote service, so raise an
        # error here, unless self._ignore_flush is set.
        if not self._ignore_flush:
            raise io.UnsupportedOperation(
                "Cannot flush without finalizing upload. Use close() instead, "
                "or set ignore_flush=True when constructing this class (see "
                "docstring)."
            )

    def close(self):
        """Upload any remaining buffered data and close the writer.

        Calling close() again on an already closed writer has no effect, in
        line with the documented behavior of ``io.IOBase.close``.
        """
        if not self._buffer.closed:
            # One more chunk sends whatever is left in the buffer.
            self._upload_chunks_from_buffer(1)
        self._buffer.close()

    @property
    def closed(self):
        """Whether the writer is closed.

        close() only closes the internal buffer, so the closed state is
        taken from that buffer.
        """
        return self._buffer.closed

    def _checkClosed(self):
        """Raise :class:`ValueError` if the internal buffer is closed."""
        if self._buffer.closed:
            raise ValueError("I/O operation on closed file.")

    def readable(self):
        """Return False: this object does not support reading."""
        return False

    def writable(self):
        """Return True: this object supports writing."""
        return True

    def seekable(self):
        """Return False: this object does not support seek()."""
        return False
class SlidingBuffer(object):
    """A forward-moving byte buffer that can drop data already consumed.

    `google-resumable-media-python` expects `tell()` to be relative to the
    start of the file rather than to a position inside an intermediate
    buffer. This class keeps a running count of consumed bytes so that
    `tell()` and `seek()` stay consistent without retaining bytes that were
    already sent.

    It does not behave like a plain BytesIO: `write()` always appends at the
    end and leaves the read position alone, `flush()` discards everything to
    the left of the read position, and `tell()` reports the read position
    counting discarded data as well. `__len__()` reports the size of the
    underlying buffer.

    Only part of the Python I/O interface is implemented.
    """

    def __init__(self):
        # Total number of bytes consumed so far, including discarded ones.
        self._cursor = 0
        self._buffer = io.BytesIO()

    def write(self, b):
        """Append ``b`` at the end, leaving the read position untouched."""
        self._checkClosed()  # Raises ValueError if closed.

        resume_at = self._buffer.tell()
        self._buffer.seek(0, io.SEEK_END)
        written = self._buffer.write(b)
        self._buffer.seek(resume_at)
        return written + self._cursor

    def read(self, size=-1):
        """Read up to ``size`` bytes and advance the cursor accordingly."""
        self._checkClosed()  # Raises ValueError if closed.

        chunk = self._buffer.read(size)
        self._cursor += len(chunk)
        return chunk

    def flush(self):
        """Drop consumed data (everything left of the read position)."""
        self._checkClosed()  # Raises ValueError if closed.

        # BytesIO cannot be trimmed from the left: capture the unread tail,
        # empty the buffer, then put the tail back and rewind to its start.
        unread = self._buffer.read()
        self._buffer.seek(0)
        self._buffer.truncate()
        self._buffer.write(unread)
        self._buffer.seek(0)

    def tell(self):
        """Return the total number of bytes read from the buffer so far."""
        return self._cursor

    def seek(self, pos):
        """Move backwards to ``pos`` if that byte is still in the buffer.

        Raises ValueError when the destination lies ahead of the cursor or
        has already been purged from the internal buffer.

        The "whence" argument is not supported in this implementation.
        """
        self._checkClosed()  # Raises ValueError if closed.

        start = self._buffer.tell()
        offset = pos - self._cursor
        landed = self._buffer.seek(offset, io.SEEK_CUR)

        moved_as_requested = (landed - start) == offset
        if pos > self._cursor or not moved_as_requested:
            # The internal buffer does not hold the requested byte (or no
            # longer holds it): restore the old position and refuse.
            self._buffer.seek(start)
            raise ValueError("Cannot seek() to that value.")

        self._cursor = pos
        return self._cursor

    def __len__(self):
        """Return the size of the underlying buffer, found by seeking."""
        resume_at = self._buffer.tell()
        size = self._buffer.seek(0, io.SEEK_END)
        self._buffer.seek(resume_at)
        return size

    def close(self):
        """Close the underlying buffer."""
        return self._buffer.close()

    def _checkClosed(self):
        """Delegate the closed check to the underlying buffer."""
        return self._buffer._checkClosed()

    @property
    def closed(self):
        """Whether the underlying buffer is closed."""
        return self._buffer.closed