From aa9d0fb0ce489c1b20df22e01d4b063ab767f16d Mon Sep 17 00:00:00 2001 From: Paul Petersik Date: Thu, 25 Apr 2024 04:15:30 +0200 Subject: [PATCH] [sftp] Check full path for existence in ._save() and ._mkdir() (#1372) --- storages/backends/sftpstorage.py | 18 ++++++++++++++---- tests/test_sftp.py | 18 +++++++++++++++--- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/storages/backends/sftpstorage.py b/storages/backends/sftpstorage.py index 52d4d59b..478235f6 100644 --- a/storages/backends/sftpstorage.py +++ b/storages/backends/sftpstorage.py @@ -118,7 +118,7 @@ def _mkdir(self, path): """Create directory, recursing up to create parent dirs if necessary.""" parent = posixpath.dirname(path) - if not self.exists(parent): + if not self._path_exists(parent): self._mkdir(parent) self.sftp.mkdir(path) @@ -134,7 +134,7 @@ def _save(self, name, content): content.seek(0, os.SEEK_SET) path = self._remote_path(name) dirname = posixpath.dirname(path) - if not self.exists(dirname): + if not self._path_exists(dirname): self._mkdir(dirname) self.sftp.putfo(content, path) @@ -152,13 +152,23 @@ def delete(self, name): except OSError: pass - def exists(self, name): + def _path_exists(self, path): + """Determines whether a file existis in the sftp storage given its + absolute path.""" try: - self.sftp.stat(self._remote_path(name)) + self.sftp.stat(path) return True except FileNotFoundError: return False + def exists(self, name): + """Determines whether a file exists within the root folder of the SFTP storage + (as set by `SFTP_STORAGE_ROOT`). This method differs from `._path_exists()` + in that the provided `name` is assumed to be the relative path of the file + within the root folder. + """ + return self._path_exists(self._remote_path(name)) + def _isdir_attr(self, item): # Return whether an item in sftp.listdir_attr results is a directory if item.st_mode is not None: diff --git a/tests/test_sftp.py b/tests/test_sftp.py index ba33d1fd..1c95b674 100644 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -16,7 +16,7 @@ class SFTPStorageTest(TestCase): def setUp(self): - self.storage = sftpstorage.SFTPStorage(host="foo") + self.storage = sftpstorage.SFTPStorage(host="foo", root_path="root") def test_init(self): pass @@ -95,13 +95,18 @@ def test_save_non_seekable(self, mock_sftp): ) def test_save_in_subdir(self, mock_sftp): self.storage._save("bar/foo", File(io.BytesIO(b"foo"), "foo")) - self.assertEqual(mock_sftp.mkdir.call_args_list[0][0], ("bar",)) + self.assertEqual(mock_sftp.stat.call_args_list[0][0], ("root/bar",)) + self.assertEqual(mock_sftp.mkdir.call_args_list[0][0], ("root/bar",)) self.assertTrue(mock_sftp.putfo.called) @patch("storages.backends.sftpstorage.SFTPStorage.sftp") def test_delete(self, mock_sftp): self.storage.delete("foo") - self.assertEqual(mock_sftp.remove.call_args_list[0][0], ("foo",)) + self.assertEqual(mock_sftp.remove.call_args_list[0][0], ("root/foo",)) + + @patch("storages.backends.sftpstorage.SFTPStorage.sftp") + def test_path_exists(self, mock_sftp): + self.assertTrue(self.storage._path_exists("root/foo")) @patch("storages.backends.sftpstorage.SFTPStorage.sftp") def test_exists(self, mock_sftp): @@ -114,6 +119,13 @@ def test_exists(self, mock_sftp): def test_not_exists(self, mock_sftp): self.assertFalse(self.storage.exists("foo")) + @patch( + "storages.backends.sftpstorage.SFTPStorage.sftp", + **{"stat.side_effect": FileNotFoundError()}, + ) + def test_not_path_exists(self, mock_sftp): + self.assertFalse(self.storage._path_exists("root/foo")) + @patch( "storages.backends.sftpstorage.SFTPStorage.sftp", **{"stat.side_effect": socket.timeout()},