diff --git a/git/exc.py b/git/exc.py index 22fcde0d6..b696d792f 100644 --- a/git/exc.py +++ b/git/exc.py @@ -37,6 +37,10 @@ class NoSuchPathError(GitError, OSError): """Thrown if a path could not be access by the system.""" +class UnsafeOptionsUsedError(GitError): + """Thrown if unsafe protocols or options are passed without overridding.""" + + class CommandError(GitError): """Base class for exceptions thrown at every stage of `Popen()` execution. diff --git a/git/repo/base.py b/git/repo/base.py index 49a3d5a16..35ff68b0c 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -21,7 +21,12 @@ ) from git.config import GitConfigParser from git.db import GitCmdObjectDB -from git.exc import InvalidGitRepositoryError, NoSuchPathError, GitCommandError +from git.exc import ( + GitCommandError, + InvalidGitRepositoryError, + NoSuchPathError, + UnsafeOptionsUsedError, +) from git.index import IndexFile from git.objects import Submodule, RootModule, Commit from git.refs import HEAD, Head, Reference, TagReference @@ -128,6 +133,7 @@ class Repo(object): re_envvars = re.compile(r"(\$(\{\s?)?[a-zA-Z_]\w*(\}\s?)?|%\s?[a-zA-Z_]\w*\s?%)") re_author_committer_start = re.compile(r"^(author|committer)") re_tab_full_line = re.compile(r"^\t(.*)$") + re_config_protocol_option = re.compile(r"-[-]?c(|onfig)\s+protocol\.", re.I) # invariants # represents the configuration level of a configuration file @@ -1215,11 +1221,27 @@ def _clone( # END handle remote repo return repo + @classmethod + def unsafe_options( + cls, + url: str, + multi_options: Optional[List[str]] = None, + ) -> bool: + if "ext::" in url: + return True + if multi_options is not None: + if any(["--upload-pack" in m for m in multi_options]): + return True + if any([re.match(cls.re_config_protocol_option, m) for m in multi_options]): + return True + return False + def clone( self, path: PathLike, progress: Optional[Callable] = None, multi_options: Optional[List[str]] = None, + unsafe_protocols: bool = False, **kwargs: Any, ) -> "Repo": """Create a clone from this repository. @@ -1230,12 +1252,15 @@ def clone( option per list item which is passed exactly as specified to clone. For example ['--config core.filemode=false', '--config core.ignorecase', '--recurse-submodule=repo1_path', '--recurse-submodule=repo2_path'] + :param unsafe_protocols: Allow unsafe protocols to be used, like ext :param kwargs: * odbt = ObjectDatabase Type, allowing to determine the object database implementation used by the returned Repo instance * All remaining keyword arguments are given to the git-clone command :return: ``git.Repo`` (the newly cloned repo)""" + if not unsafe_protocols and self.unsafe_options(path, multi_options): + raise UnsafeOptionsUsedError(f"{path} requires unsafe_protocols flag") return self._clone( self.git, self.common_dir, @@ -1254,6 +1279,7 @@ def clone_from( progress: Optional[Callable] = None, env: Optional[Mapping[str, str]] = None, multi_options: Optional[List[str]] = None, + unsafe_protocols: bool = False, **kwargs: Any, ) -> "Repo": """Create a clone from the given URL @@ -1268,11 +1294,14 @@ def clone_from( If you want to unset some variable, consider providing empty string as its value. :param multi_options: See ``clone`` method + :param unsafe_protocols: Allow unsafe protocols to be used, like ext :param kwargs: see the ``clone`` method :return: Repo instance pointing to the cloned directory""" git = cls.GitCommandWrapperType(os.getcwd()) if env is not None: git.update_environment(**env) + if not unsafe_protocols and cls.unsafe_options(url, multi_options): + raise UnsafeOptionsUsedError(f"{url} requires unsafe_protocols flag") return cls._clone(git, url, to_path, GitCmdObjectDB, progress, multi_options, **kwargs) def archive( diff --git a/test/test_repo.py b/test/test_repo.py index 6382db7e4..53cae3cd7 100644 --- a/test/test_repo.py +++ b/test/test_repo.py @@ -13,6 +13,7 @@ import pickle import sys import tempfile +import uuid from unittest import mock, skipIf, SkipTest import pytest @@ -37,6 +38,7 @@ ) from git.exc import ( BadObject, + UnsafeOptionsUsedError, ) from git.repo.fun import touch from test.lib import TestBase, with_rw_repo, fixture @@ -263,6 +265,40 @@ def test_leaking_password_in_clone_logs(self, rw_dir): to_path=rw_dir, ) + def test_unsafe_options(self): + self.assertFalse(Repo.unsafe_options("github.com/deploy/deploy")) + + def test_unsafe_options_ext_url(self): + self.assertTrue(Repo.unsafe_options("ext::ssh")) + + def test_unsafe_options_multi_options_upload_pack(self): + self.assertTrue(Repo.unsafe_options("", ["--upload-pack='touch foo'"])) + + def test_unsafe_options_multi_options_config_user(self): + self.assertFalse(Repo.unsafe_options("", ["--config user"])) + + def test_unsafe_options_multi_options_config_protocol(self): + self.assertTrue(Repo.unsafe_options("", ["--config protocol.foo"])) + + def test_clone_from_forbids_helper_urls_by_default(self): + with self.assertRaises(UnsafeOptionsUsedError): + Repo.clone_from("ext::sh -c touch% /tmp/foo", "tmp") + + @with_rw_repo("HEAD") + def test_clone_from_allow_unsafe(self, repo): + bad_filename = pathlib.Path(f'{tempfile.gettempdir()}/{uuid.uuid4()}') + bad_url = f'ext::sh -c touch% {bad_filename}' + try: + repo.clone_from( + bad_url, 'tmp', + multi_options=["-c protocol.ext.allow=always"], + unsafe_protocols=True + ) + except GitCommandError: + pass + self.assertTrue(bad_filename.is_file()) + bad_filename.unlink() + @with_rw_repo("HEAD") def test_max_chunk_size(self, repo): class TestOutputStream(TestBase):