Skip to content

Commit

Permalink
[gcloud] Retry on connection/read timeout (#5)
Browse files Browse the repository at this point in the history
* [gcloud] Retry on connection/read timeout

Revert this comment after a version of `google-cloud-storage` with
googleapis/python-storage#727 gets released and
ScanForm is updated to use it.

* [tests/gcloud] Add retry to expected params
  • Loading branch information
mlazowik committed Mar 10, 2022
1 parent 4498889 commit 34f77f4
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 21 deletions.
49 changes: 40 additions & 9 deletions storages/backends/gcloud.py
Expand Up @@ -16,11 +16,15 @@
)

try:
import requests.exceptions
from google.api_core import exceptions as api_exceptions, retry
from google.auth import exceptions as auth_exceptions
from google.cloud.exceptions import NotFound
from google.cloud.storage import Blob, Client
from google.cloud.storage.blob import _quote
from google.cloud.storage.retry import (
DEFAULT_RETRY, DEFAULT_RETRY_IF_GENERATION_SPECIFIED,
_ADDITIONAL_RETRYABLE_STATUS_CODES, _RETRYABLE_TYPES,
ConditionalRetryPolicy, is_generation_specified,
)
except ImportError:
raise ImproperlyConfigured("Could not load Google Cloud Storage bindings.\n"
Expand All @@ -30,14 +34,40 @@
CONTENT_ENCODING = 'content_encoding'
CONTENT_TYPE = 'content_type'

# TODO(m1): revert the commit that introduced this after a version of
# `google-cloud-storage` with https://github.com/googleapis/python-storage/pull/727
# gets released and ScanForm is updated to use it.
CUSTOM_RETRYABLE_TYPES = _RETRYABLE_TYPES + (
requests.exceptions.Timeout,
)


def _should_retry(exc):
"""Predicate for determining when to retry."""
if isinstance(exc, CUSTOM_RETRYABLE_TYPES):
return True
elif isinstance(exc, api_exceptions.GoogleAPICallError):
return exc.code in _ADDITIONAL_RETRYABLE_STATUS_CODES
elif isinstance(exc, auth_exceptions.TransportError):
return _should_retry(exc.args[0])
else:
return False


RETRY = retry.Retry(predicate=_should_retry)

RETRY_IF_GENERATION_SPECIFIED = ConditionalRetryPolicy(
RETRY, is_generation_specified, ["query_params"]
)


class GoogleCloudFile(CompressedFileMixin, File):
def __init__(self, name, mode, storage):
self.name = name
self.mime_type = mimetypes.guess_type(name)[0]
self._mode = mode
self._storage = storage
self.blob = storage.bucket.get_blob(name, timeout=storage.timeout)
self.blob = storage.bucket.get_blob(name, timeout=storage.timeout, retry=RETRY)
if not self.blob and 'w' in mode:
self.blob = Blob(
self.name, storage.bucket,
Expand All @@ -58,7 +88,8 @@ def _get_file(self):
)
if 'r' in self._mode:
self._is_dirty = False
self.blob.download_to_file(self._file, timeout=self._storage.timeout)
self.blob.download_to_file(self._file, timeout=self._storage.timeout,
retry=RETRY)
self._file.seek(0)
if self._storage.gzip and self.blob.content_encoding == 'gzip':
self._file = self._decompress_file(mode=self._mode, file=self._file)
Expand Down Expand Up @@ -140,9 +171,9 @@ def get_default_settings(self):
@property
def retry_policy__if_generation_specified_or_immutable(self):
if self.all_files_immutable:
return DEFAULT_RETRY
return RETRY

return DEFAULT_RETRY_IF_GENERATION_SPECIFIED
return RETRY_IF_GENERATION_SPECIFIED

@property
def client(self):
Expand Down Expand Up @@ -242,13 +273,13 @@ def delete(self, name):
def exists(self, name):
if not name: # root element aka the bucket
try:
self.client.get_bucket(self.bucket, timeout=self.timeout)
self.client.get_bucket(self.bucket, timeout=self.timeout, retry=RETRY)
return True
except NotFound:
return False

name = self._normalize_name(clean_name(name))
return bool(self.bucket.get_blob(name, timeout=self.timeout))
return bool(self.bucket.get_blob(name, timeout=self.timeout, retry=RETRY))

def listdir(self, name):
name = self._normalize_name(clean_name(name))
Expand All @@ -258,7 +289,7 @@ def listdir(self, name):
name += '/'

iterator = self.bucket.list_blobs(prefix=name, delimiter='/',
timeout=self.timeout)
timeout=self.timeout, retry=RETRY)
blobs = list(iterator)
prefixes = iterator.prefixes

Expand All @@ -276,7 +307,7 @@ def listdir(self, name):

def _get_blob(self, name):
# Wrap google.cloud.storage's blob to raise if the file doesn't exist
blob = self.bucket.get_blob(name, timeout=self.timeout)
blob = self.bucket.get_blob(name, timeout=self.timeout, retry=RETRY)

if blob is None:
raise NotFound('File does not exist: {}'.format(name))
Expand Down
36 changes: 24 additions & 12 deletions tests/test_gcloud.py
Expand Up @@ -38,7 +38,8 @@ def test_open_read(self):

f = self.storage.open(self.filename)
self.storage._client.bucket.assert_called_with(self.bucket_name)
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60)
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60,
retry=mock.ANY)

f.blob.download_to_file = lambda tmpfile, **kwargs: tmpfile.write(data)
self.assertEqual(f.read(), data)
Expand All @@ -49,7 +50,8 @@ def test_open_read_num_bytes(self):

f = self.storage.open(self.filename)
self.storage._client.bucket.assert_called_with(self.bucket_name)
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60)
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60,
retry=mock.ANY)

f.blob.download_to_file = lambda tmpfile, **kwargs: tmpfile.write(data)
self.assertEqual(f.read(num_bytes), data[0:num_bytes])
Expand All @@ -59,7 +61,8 @@ def test_open_read_nonexistent(self):
self.storage._bucket.get_blob.return_value = None

self.assertRaises(FileNotFoundError, self.storage.open, self.filename)
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60)
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60,
retry=mock.ANY)

def test_open_read_nonexistent_unicode(self):
filename = 'ủⓝï℅ⅆℇ.txt'
Expand Down Expand Up @@ -145,12 +148,14 @@ def test_delete(self):
def test_exists(self):
self.storage._bucket = mock.MagicMock()
self.assertTrue(self.storage.exists(self.filename))
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60)
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60,
retry=mock.ANY)

self.storage._bucket.reset_mock()
self.storage._bucket.get_blob.return_value = None
self.assertFalse(self.storage.exists(self.filename))
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60)
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60,
retry=mock.ANY)

def test_exists_no_bucket(self):
# exists('') should return False if the bucket doesn't exist
Expand Down Expand Up @@ -235,7 +240,8 @@ def test_size(self):
self.storage._bucket.get_blob.return_value = blob

self.assertEqual(self.storage.size(self.filename), size)
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60)
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60,
retry=mock.ANY)

def test_size_no_file(self):
self.storage._bucket = mock.MagicMock()
Expand All @@ -256,7 +262,8 @@ def test_modified_time(self):
mt = self.storage.modified_time(self.filename)
self.assertTrue(timezone.is_naive(mt))
self.assertEqual(mt, naive_date)
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60)
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60,
retry=mock.ANY)

def test_get_modified_time(self):
naive_date = datetime(2017, 1, 2, 3, 4, 5, 678)
Expand All @@ -272,13 +279,15 @@ def test_get_modified_time(self):
self.assertTrue(timezone.is_naive(mt))
naive_date_montreal = timezone.make_naive(aware_date)
self.assertEqual(mt, naive_date_montreal)
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60)
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60,
retry=mock.ANY)

with self.settings(TIME_ZONE='America/Montreal', USE_TZ=True):
mt = self.storage.get_modified_time(self.filename)
self.assertTrue(timezone.is_aware(mt))
self.assertEqual(mt, aware_date)
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60)
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60,
retry=mock.ANY)

def test_get_created_time(self):
naive_date = datetime(2017, 1, 2, 3, 4, 5, 678)
Expand All @@ -294,13 +303,15 @@ def test_get_created_time(self):
self.assertTrue(timezone.is_naive(mt))
naive_date_montreal = timezone.make_naive(aware_date)
self.assertEqual(mt, naive_date_montreal)
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60)
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60,
retry=mock.ANY)

with self.settings(TIME_ZONE='America/Montreal', USE_TZ=True):
mt = self.storage.get_created_time(self.filename)
self.assertTrue(timezone.is_aware(mt))
self.assertEqual(mt, aware_date)
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60)
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60,
retry=mock.ANY)

def test_modified_time_no_file(self):
self.storage._bucket = mock.MagicMock()
Expand Down Expand Up @@ -387,7 +398,8 @@ def test_get_available_name(self):
self.storage.file_overwrite = False
self.assertEqual(self.storage.get_available_name(
self.filename), self.filename)
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60)
self.storage._bucket.get_blob.assert_called_with(self.filename, timeout=60,
retry=mock.ANY)

def test_get_available_name_unicode(self):
filename = 'ủⓝï℅ⅆℇ.txt'
Expand Down

0 comments on commit 34f77f4

Please sign in to comment.