From 2625ed9fc074091c531c27ffcba7902771130261 Mon Sep 17 00:00:00 2001 From: Steve Kowalik Date: Tue, 20 Dec 2022 17:05:50 +1100 Subject: [PATCH 1/7] Forbid unsafe protocol URLs in Repo.clone{,_from}() Since the URL is passed directly to git clone, and the remote-ext helper will happily execute shell commands, so by default disallow URLs that contain a "::" unless a new unsafe_protocols kwarg is passed. (CVE-2022-24439) Fixes #1515 --- git/exc.py | 4 ++++ git/repo/base.py | 31 ++++++++++++++++++++++++++++++- test/test_repo.py | 36 ++++++++++++++++++++++++++++++++++++ 3 files changed, 70 insertions(+), 1 deletion(-) diff --git a/git/exc.py b/git/exc.py index 22fcde0d6..b696d792f 100644 --- a/git/exc.py +++ b/git/exc.py @@ -37,6 +37,10 @@ class NoSuchPathError(GitError, OSError): """Thrown if a path could not be access by the system.""" +class UnsafeOptionsUsedError(GitError): + """Thrown if unsafe protocols or options are passed without overridding.""" + + class CommandError(GitError): """Base class for exceptions thrown at every stage of `Popen()` execution. diff --git a/git/repo/base.py b/git/repo/base.py index 49a3d5a16..35ff68b0c 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -21,7 +21,12 @@ ) from git.config import GitConfigParser from git.db import GitCmdObjectDB -from git.exc import InvalidGitRepositoryError, NoSuchPathError, GitCommandError +from git.exc import ( + GitCommandError, + InvalidGitRepositoryError, + NoSuchPathError, + UnsafeOptionsUsedError, +) from git.index import IndexFile from git.objects import Submodule, RootModule, Commit from git.refs import HEAD, Head, Reference, TagReference @@ -128,6 +133,7 @@ class Repo(object): re_envvars = re.compile(r"(\$(\{\s?)?[a-zA-Z_]\w*(\}\s?)?|%\s?[a-zA-Z_]\w*\s?%)") re_author_committer_start = re.compile(r"^(author|committer)") re_tab_full_line = re.compile(r"^\t(.*)$") + re_config_protocol_option = re.compile(r"-[-]?c(|onfig)\s+protocol\.", re.I) # invariants # represents the configuration level of a configuration file @@ -1215,11 +1221,27 @@ def _clone( # END handle remote repo return repo + @classmethod + def unsafe_options( + cls, + url: str, + multi_options: Optional[List[str]] = None, + ) -> bool: + if "ext::" in url: + return True + if multi_options is not None: + if any(["--upload-pack" in m for m in multi_options]): + return True + if any([re.match(cls.re_config_protocol_option, m) for m in multi_options]): + return True + return False + def clone( self, path: PathLike, progress: Optional[Callable] = None, multi_options: Optional[List[str]] = None, + unsafe_protocols: bool = False, **kwargs: Any, ) -> "Repo": """Create a clone from this repository. @@ -1230,12 +1252,15 @@ def clone( option per list item which is passed exactly as specified to clone. For example ['--config core.filemode=false', '--config core.ignorecase', '--recurse-submodule=repo1_path', '--recurse-submodule=repo2_path'] + :param unsafe_protocols: Allow unsafe protocols to be used, like ext :param kwargs: * odbt = ObjectDatabase Type, allowing to determine the object database implementation used by the returned Repo instance * All remaining keyword arguments are given to the git-clone command :return: ``git.Repo`` (the newly cloned repo)""" + if not unsafe_protocols and self.unsafe_options(path, multi_options): + raise UnsafeOptionsUsedError(f"{path} requires unsafe_protocols flag") return self._clone( self.git, self.common_dir, @@ -1254,6 +1279,7 @@ def clone_from( progress: Optional[Callable] = None, env: Optional[Mapping[str, str]] = None, multi_options: Optional[List[str]] = None, + unsafe_protocols: bool = False, **kwargs: Any, ) -> "Repo": """Create a clone from the given URL @@ -1268,11 +1294,14 @@ def clone_from( If you want to unset some variable, consider providing empty string as its value. :param multi_options: See ``clone`` method + :param unsafe_protocols: Allow unsafe protocols to be used, like ext :param kwargs: see the ``clone`` method :return: Repo instance pointing to the cloned directory""" git = cls.GitCommandWrapperType(os.getcwd()) if env is not None: git.update_environment(**env) + if not unsafe_protocols and cls.unsafe_options(url, multi_options): + raise UnsafeOptionsUsedError(f"{url} requires unsafe_protocols flag") return cls._clone(git, url, to_path, GitCmdObjectDB, progress, multi_options, **kwargs) def archive( diff --git a/test/test_repo.py b/test/test_repo.py index 6382db7e4..53cae3cd7 100644 --- a/test/test_repo.py +++ b/test/test_repo.py @@ -13,6 +13,7 @@ import pickle import sys import tempfile +import uuid from unittest import mock, skipIf, SkipTest import pytest @@ -37,6 +38,7 @@ ) from git.exc import ( BadObject, + UnsafeOptionsUsedError, ) from git.repo.fun import touch from test.lib import TestBase, with_rw_repo, fixture @@ -263,6 +265,40 @@ def test_leaking_password_in_clone_logs(self, rw_dir): to_path=rw_dir, ) + def test_unsafe_options(self): + self.assertFalse(Repo.unsafe_options("github.com/deploy/deploy")) + + def test_unsafe_options_ext_url(self): + self.assertTrue(Repo.unsafe_options("ext::ssh")) + + def test_unsafe_options_multi_options_upload_pack(self): + self.assertTrue(Repo.unsafe_options("", ["--upload-pack='touch foo'"])) + + def test_unsafe_options_multi_options_config_user(self): + self.assertFalse(Repo.unsafe_options("", ["--config user"])) + + def test_unsafe_options_multi_options_config_protocol(self): + self.assertTrue(Repo.unsafe_options("", ["--config protocol.foo"])) + + def test_clone_from_forbids_helper_urls_by_default(self): + with self.assertRaises(UnsafeOptionsUsedError): + Repo.clone_from("ext::sh -c touch% /tmp/foo", "tmp") + + @with_rw_repo("HEAD") + def test_clone_from_allow_unsafe(self, repo): + bad_filename = pathlib.Path(f'{tempfile.gettempdir()}/{uuid.uuid4()}') + bad_url = f'ext::sh -c touch% {bad_filename}' + try: + repo.clone_from( + bad_url, 'tmp', + multi_options=["-c protocol.ext.allow=always"], + unsafe_protocols=True + ) + except GitCommandError: + pass + self.assertTrue(bad_filename.is_file()) + bad_filename.unlink() + @with_rw_repo("HEAD") def test_max_chunk_size(self, repo): class TestOutputStream(TestBase): From e6108c7997f5c8f7361b982959518e982b973230 Mon Sep 17 00:00:00 2001 From: Santos Gallegos Date: Fri, 23 Dec 2022 20:19:52 -0500 Subject: [PATCH 2/7] Block unsafe options and protocols by default --- git/cmd.py | 47 +++++++++++++++++++++++++++++- git/exc.py | 8 ++++-- git/remote.py | 73 +++++++++++++++++++++++++++++++++++++++++++---- git/repo/base.py | 63 ++++++++++++++++++++++++---------------- test/test_repo.py | 5 ++-- 5 files changed, 160 insertions(+), 36 deletions(-) diff --git a/git/cmd.py b/git/cmd.py index 3dd5aad33..9d4006b9d 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -4,6 +4,7 @@ # This module is part of GitPython and is released under # the BSD License: http://www.opensource.org/licenses/bsd-license.php from __future__ import annotations +import re from contextlib import contextmanager import io import logging @@ -24,7 +25,7 @@ from git.exc import CommandError from git.util import is_cygwin_git, cygpath, expand_path, remove_password_if_present -from .exc import GitCommandError, GitCommandNotFound +from .exc import GitCommandError, GitCommandNotFound, UnsafeOptionError, UnsafeProtocolError from .util import ( LazyMixin, stream_copy, @@ -262,6 +263,8 @@ class Git(LazyMixin): _excluded_ = ("cat_file_all", "cat_file_header", "_version_info") + re_unsafe_protocol = re.compile("(.+)::.+") + def __getstate__(self) -> Dict[str, Any]: return slots_to_dict(self, exclude=self._excluded_) @@ -454,6 +457,48 @@ def polish_url(cls, url: str, is_cygwin: Union[None, bool] = None) -> PathLike: url = url.replace("\\\\", "\\").replace("\\", "/") return url + @classmethod + def check_unsafe_protocols(cls, url: str) -> None: + """ + Check for unsafe protocols. + + Apart from the usual protocols (http, git, ssh), + Git allows "remote helpers" that have the form `::
`, + one of these helpers (`ext::`) can be used to invoke any arbitrary command. + + See: + + - https://git-scm.com/docs/gitremote-helpers + - https://git-scm.com/docs/git-remote-ext + """ + match = cls.re_unsafe_protocol.match(url) + if match: + protocol = match.group(1) + raise UnsafeProtocolError( + f"The `{protocol}::` protocol looks suspicious, use `allow_unsafe_protocols=True` to allow it." + ) + + @classmethod + def check_unsafe_options(cls, options: List[str], unsafe_options: List[str]) -> None: + """ + Check for unsafe options. + + Some options that are passed to `git ` can be used to execute + arbitrary commands, this are blocked by default. + """ + # Options can be of the form `foo` or `--foo bar` `--foo=bar`, + # so we need to check if they start with "--foo" or if they are equal to "foo". + bare_options = [ + option.lstrip("-") + for option in unsafe_options + ] + for option in options: + for unsafe_option, bare_option in zip(unsafe_options, bare_options): + if option.startswith(unsafe_option) or option == bare_option: + raise UnsafeOptionError( + f"{unsafe_option} is not allowed, use `allow_unsafe_options=True` to allow it." + ) + class AutoInterrupt(object): """Kill/Interrupt the stored process instance once this instance goes out of scope. It is used to prevent processes piling up in case iterators stop reading. diff --git a/git/exc.py b/git/exc.py index b696d792f..9b69a5889 100644 --- a/git/exc.py +++ b/git/exc.py @@ -37,8 +37,12 @@ class NoSuchPathError(GitError, OSError): """Thrown if a path could not be access by the system.""" -class UnsafeOptionsUsedError(GitError): - """Thrown if unsafe protocols or options are passed without overridding.""" +class UnsafeProtocolError(GitError): + """Thrown if unsafe protocols are passed without being explicitly allowed.""" + + +class UnsafeOptionError(GitError): + """Thrown if unsafe options are passed without being explicitly allowed.""" class CommandError(GitError): diff --git a/git/remote.py b/git/remote.py index 483d536ae..1eff00b96 100644 --- a/git/remote.py +++ b/git/remote.py @@ -535,6 +535,23 @@ class Remote(LazyMixin, IterableObj): __slots__ = ("repo", "name", "_config_reader") _id_attribute_ = "name" + unsafe_git_fetch_options = [ + # This option allows users to execute arbitrary commands. + # https://git-scm.com/docs/git-fetch#Documentation/git-fetch.txt---upload-packltupload-packgt + "--upload-pack", + ] + unsafe_git_pull_options = [ + # This option allows users to execute arbitrary commands. + # https://git-scm.com/docs/git-pull#Documentation/git-pull.txt---upload-packltupload-packgt + "--upload-pack" + ] + unsafe_git_push_options = [ + # This option allows users to execute arbitrary commands. + # https://git-scm.com/docs/git-push#Documentation/git-push.txt---execltgit-receive-packgt + "--receive-pack", + "--exec", + ] + def __init__(self, repo: "Repo", name: str) -> None: """Initialize a remote instance @@ -611,7 +628,9 @@ def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Iterator["Remote yield Remote(repo, section[lbound + 1 : rbound]) # END for each configuration section - def set_url(self, new_url: str, old_url: Optional[str] = None, **kwargs: Any) -> "Remote": + def set_url( + self, new_url: str, old_url: Optional[str] = None, allow_unsafe_protocols: bool = False, **kwargs: Any + ) -> "Remote": """Configure URLs on current remote (cf command git remote set_url) This command manages URLs on the remote. @@ -620,15 +639,17 @@ def set_url(self, new_url: str, old_url: Optional[str] = None, **kwargs: Any) -> :param old_url: when set, replaces this URL with new_url for the remote :return: self """ + if not allow_unsafe_protocols: + Git.check_unsafe_protocols(new_url) scmd = "set-url" kwargs["insert_kwargs_after"] = scmd if old_url: - self.repo.git.remote(scmd, self.name, new_url, old_url, **kwargs) + self.repo.git.remote(scmd, "--", self.name, new_url, old_url, **kwargs) else: - self.repo.git.remote(scmd, self.name, new_url, **kwargs) + self.repo.git.remote(scmd, "--", self.name, new_url, **kwargs) return self - def add_url(self, url: str, **kwargs: Any) -> "Remote": + def add_url(self, url: str, allow_unsafe_protocols: bool = False, **kwargs: Any) -> "Remote": """Adds a new url on current remote (special case of git remote set_url) This command adds new URLs to a given remote, making it possible to have @@ -637,6 +658,8 @@ def add_url(self, url: str, **kwargs: Any) -> "Remote": :param url: string being the URL to add as an extra remote URL :return: self """ + if not allow_unsafe_protocols: + Git.check_unsafe_protocols(url) return self.set_url(url, add=True) def delete_url(self, url: str, **kwargs: Any) -> "Remote": @@ -729,7 +752,7 @@ def stale_refs(self) -> IterableList[Reference]: return out_refs @classmethod - def create(cls, repo: "Repo", name: str, url: str, **kwargs: Any) -> "Remote": + def create(cls, repo: "Repo", name: str, url: str, allow_unsafe_protocols: bool = False, **kwargs: Any) -> "Remote": """Create a new remote to the given repository :param repo: Repository instance that is to receive the new remote :param name: Desired name of the remote @@ -739,7 +762,10 @@ def create(cls, repo: "Repo", name: str, url: str, **kwargs: Any) -> "Remote": :raise GitCommandError: in case an origin with that name already exists""" scmd = "add" kwargs["insert_kwargs_after"] = scmd - repo.git.remote(scmd, name, Git.polish_url(url), **kwargs) + url = Git.polish_url(url) + if not allow_unsafe_protocols: + Git.check_unsafe_protocols(url) + repo.git.remote(scmd, "--", name, url, **kwargs) return cls(repo, name) # add is an alias @@ -921,6 +947,8 @@ def fetch( progress: Union[RemoteProgress, None, "UpdateProgress"] = None, verbose: bool = True, kill_after_timeout: Union[None, float] = None, + allow_unsafe_protocols: bool = False, + allow_unsafe_options: bool = False, **kwargs: Any, ) -> IterableList[FetchInfo]: """Fetch the latest changes for this remote @@ -963,6 +991,14 @@ def fetch( else: args = [refspec] + if not allow_unsafe_protocols: + for ref in args: + if ref: + Git.check_unsafe_protocols(ref) + + if not allow_unsafe_options: + Git.check_unsafe_options(options=list(kwargs.keys()), unsafe_options=self.unsafe_git_fetch_options) + proc = self.repo.git.fetch( "--", self, *args, as_process=True, with_stdout=False, universal_newlines=True, v=verbose, **kwargs ) @@ -976,6 +1012,8 @@ def pull( refspec: Union[str, List[str], None] = None, progress: Union[RemoteProgress, "UpdateProgress", None] = None, kill_after_timeout: Union[None, float] = None, + allow_unsafe_protocols: bool = False, + allow_unsafe_options: bool = False, **kwargs: Any, ) -> IterableList[FetchInfo]: """Pull changes from the given branch, being the same as a fetch followed @@ -990,6 +1028,16 @@ def pull( # No argument refspec, then ensure the repo's config has a fetch refspec. self._assert_refspec() kwargs = add_progress(kwargs, self.repo.git, progress) + + if not allow_unsafe_protocols and refspec: + if isinstance(refspec, str): + Git.check_unsafe_protocols(refspec) + else: + for ref in refspec: + Git.check_unsafe_protocols(ref) + if not allow_unsafe_options: + Git.check_unsafe_options(options=list(kwargs.keys()), unsafe_options=self.unsafe_git_pull_options) + proc = self.repo.git.pull( "--", self, refspec, with_stdout=False, as_process=True, universal_newlines=True, v=True, **kwargs ) @@ -1003,6 +1051,8 @@ def push( refspec: Union[str, List[str], None] = None, progress: Union[RemoteProgress, "UpdateProgress", Callable[..., RemoteProgress], None] = None, kill_after_timeout: Union[None, float] = None, + allow_unsafe_protocols: bool = False, + allow_unsafe_options: bool = False, **kwargs: Any, ) -> IterableList[PushInfo]: """Push changes from source branch in refspec to target branch in refspec. @@ -1033,6 +1083,17 @@ def push( If the operation fails completely, the length of the returned IterableList will be 0.""" kwargs = add_progress(kwargs, self.repo.git, progress) + + if not allow_unsafe_protocols and refspec: + if isinstance(refspec, str): + Git.check_unsafe_protocols(refspec) + else: + for ref in refspec: + Git.check_unsafe_protocols(ref) + + if not allow_unsafe_options: + Git.check_unsafe_options(options=list(kwargs.keys()), unsafe_options=self.unsafe_git_push_options) + proc = self.repo.git.push( "--", self, diff --git a/git/repo/base.py b/git/repo/base.py index 35ff68b0c..7473c52ed 100644 --- a/git/repo/base.py +++ b/git/repo/base.py @@ -25,7 +25,6 @@ GitCommandError, InvalidGitRepositoryError, NoSuchPathError, - UnsafeOptionsUsedError, ) from git.index import IndexFile from git.objects import Submodule, RootModule, Commit @@ -133,7 +132,18 @@ class Repo(object): re_envvars = re.compile(r"(\$(\{\s?)?[a-zA-Z_]\w*(\}\s?)?|%\s?[a-zA-Z_]\w*\s?%)") re_author_committer_start = re.compile(r"^(author|committer)") re_tab_full_line = re.compile(r"^\t(.*)$") - re_config_protocol_option = re.compile(r"-[-]?c(|onfig)\s+protocol\.", re.I) + + unsafe_git_clone_options = [ + # This option allows users to execute arbitrary commands. + # https://git-scm.com/docs/git-clone#Documentation/git-clone.txt---upload-packltupload-packgt + "--upload-pack", + "-u", + # Users can override configuration variables + # like `protocol.allow` or `core.gitProxy` to execute arbitrary commands. + # https://git-scm.com/docs/git-clone#Documentation/git-clone.txt---configltkeygtltvaluegt + "--config", + "-c", + ] # invariants # represents the configuration level of a configuration file @@ -961,7 +971,7 @@ def blame( file: str, incremental: bool = False, rev_opts: Optional[List[str]] = None, - **kwargs: Any + **kwargs: Any, ) -> List[List[Commit | List[str | bytes] | None]] | Iterator[BlameEntry] | None: """The blame information for the given file at the given revision. @@ -1152,6 +1162,8 @@ def _clone( odb_default_type: Type[GitCmdObjectDB], progress: Union["RemoteProgress", "UpdateProgress", Callable[..., "RemoteProgress"], None] = None, multi_options: Optional[List[str]] = None, + allow_unsafe_protocols: bool = False, + allow_unsafe_options: bool = False, **kwargs: Any, ) -> "Repo": odbt = kwargs.pop("odbt", odb_default_type) @@ -1173,6 +1185,12 @@ def _clone( multi = None if multi_options: multi = shlex.split(" ".join(multi_options)) + + if not allow_unsafe_protocols: + Git.check_unsafe_protocols(str(url)) + if not allow_unsafe_options and multi_options: + Git.check_unsafe_options(options=multi_options, unsafe_options=cls.unsafe_git_clone_options) + proc = git.clone( multi, "--", @@ -1221,27 +1239,13 @@ def _clone( # END handle remote repo return repo - @classmethod - def unsafe_options( - cls, - url: str, - multi_options: Optional[List[str]] = None, - ) -> bool: - if "ext::" in url: - return True - if multi_options is not None: - if any(["--upload-pack" in m for m in multi_options]): - return True - if any([re.match(cls.re_config_protocol_option, m) for m in multi_options]): - return True - return False - def clone( self, path: PathLike, progress: Optional[Callable] = None, multi_options: Optional[List[str]] = None, - unsafe_protocols: bool = False, + allow_unsafe_protocols: bool = False, + allow_unsafe_options: bool = False, **kwargs: Any, ) -> "Repo": """Create a clone from this repository. @@ -1259,8 +1263,6 @@ def clone( * All remaining keyword arguments are given to the git-clone command :return: ``git.Repo`` (the newly cloned repo)""" - if not unsafe_protocols and self.unsafe_options(path, multi_options): - raise UnsafeOptionsUsedError(f"{path} requires unsafe_protocols flag") return self._clone( self.git, self.common_dir, @@ -1268,6 +1270,8 @@ def clone( type(self.odb), progress, multi_options, + allow_unsafe_protocols=allow_unsafe_protocols, + allow_unsafe_options=allow_unsafe_options, **kwargs, ) @@ -1279,7 +1283,8 @@ def clone_from( progress: Optional[Callable] = None, env: Optional[Mapping[str, str]] = None, multi_options: Optional[List[str]] = None, - unsafe_protocols: bool = False, + allow_unsafe_protocols: bool = False, + allow_unsafe_options: bool = False, **kwargs: Any, ) -> "Repo": """Create a clone from the given URL @@ -1300,9 +1305,17 @@ def clone_from( git = cls.GitCommandWrapperType(os.getcwd()) if env is not None: git.update_environment(**env) - if not unsafe_protocols and cls.unsafe_options(url, multi_options): - raise UnsafeOptionsUsedError(f"{url} requires unsafe_protocols flag") - return cls._clone(git, url, to_path, GitCmdObjectDB, progress, multi_options, **kwargs) + return cls._clone( + git, + url, + to_path, + GitCmdObjectDB, + progress, + multi_options, + allow_unsafe_protocols=allow_unsafe_protocols, + allow_unsafe_options=allow_unsafe_options, + **kwargs, + ) def archive( self, diff --git a/test/test_repo.py b/test/test_repo.py index 53cae3cd7..a937836f9 100644 --- a/test/test_repo.py +++ b/test/test_repo.py @@ -38,7 +38,8 @@ ) from git.exc import ( BadObject, - UnsafeOptionsUsedError, + UnsafeOptionError, + UnsafeProtocolError, ) from git.repo.fun import touch from test.lib import TestBase, with_rw_repo, fixture @@ -281,7 +282,7 @@ def test_unsafe_options_multi_options_config_protocol(self): self.assertTrue(Repo.unsafe_options("", ["--config protocol.foo"])) def test_clone_from_forbids_helper_urls_by_default(self): - with self.assertRaises(UnsafeOptionsUsedError): + with self.assertRaises(UnsafeOptionError): Repo.clone_from("ext::sh -c touch% /tmp/foo", "tmp") @with_rw_repo("HEAD") From fd2c6da5f82009398d241dc07603fbcd490ced29 Mon Sep 17 00:00:00 2001 From: Santos Gallegos Date: Tue, 27 Dec 2022 16:56:43 -0500 Subject: [PATCH 3/7] Updates from review --- git/cmd.py | 10 +++++----- git/remote.py | 21 +++++++++------------ test/test_git.py | 4 ++-- 3 files changed, 16 insertions(+), 19 deletions(-) diff --git a/git/cmd.py b/git/cmd.py index 9d4006b9d..9ef1e3a65 100644 --- a/git/cmd.py +++ b/git/cmd.py @@ -488,12 +488,12 @@ def check_unsafe_options(cls, options: List[str], unsafe_options: List[str]) -> """ # Options can be of the form `foo` or `--foo bar` `--foo=bar`, # so we need to check if they start with "--foo" or if they are equal to "foo". - bare_options = [ + bare_unsafe_options = [ option.lstrip("-") for option in unsafe_options ] for option in options: - for unsafe_option, bare_option in zip(unsafe_options, bare_options): + for unsafe_option, bare_option in zip(unsafe_options, bare_unsafe_options): if option.startswith(unsafe_option) or option == bare_option: raise UnsafeOptionError( f"{unsafe_option} is not allowed, use `allow_unsafe_options=True` to allow it." @@ -1193,12 +1193,12 @@ def transform_kwargs(self, split_single_char_options: bool = True, **kwargs: Any return args @classmethod - def __unpack_args(cls, arg_list: Sequence[str]) -> List[str]: + def _unpack_args(cls, arg_list: Sequence[str]) -> List[str]: outlist = [] if isinstance(arg_list, (list, tuple)): for arg in arg_list: - outlist.extend(cls.__unpack_args(arg)) + outlist.extend(cls._unpack_args(arg)) else: outlist.append(str(arg_list)) @@ -1283,7 +1283,7 @@ def _call_process( # Prepare the argument list opt_args = self.transform_kwargs(**opts_kwargs) - ext_args = self.__unpack_args([a for a in args if a is not None]) + ext_args = self._unpack_args([a for a in args if a is not None]) if insert_after_this_arg is None: args_list = opt_args + ext_args diff --git a/git/remote.py b/git/remote.py index 1eff00b96..520544b66 100644 --- a/git/remote.py +++ b/git/remote.py @@ -1029,12 +1029,11 @@ def pull( self._assert_refspec() kwargs = add_progress(kwargs, self.repo.git, progress) - if not allow_unsafe_protocols and refspec: - if isinstance(refspec, str): - Git.check_unsafe_protocols(refspec) - else: - for ref in refspec: - Git.check_unsafe_protocols(ref) + refspec = Git._unpack_args(refspec or []) + if not allow_unsafe_protocols: + for ref in refspec: + Git.check_unsafe_protocols(ref) + if not allow_unsafe_options: Git.check_unsafe_options(options=list(kwargs.keys()), unsafe_options=self.unsafe_git_pull_options) @@ -1084,12 +1083,10 @@ def push( be 0.""" kwargs = add_progress(kwargs, self.repo.git, progress) - if not allow_unsafe_protocols and refspec: - if isinstance(refspec, str): - Git.check_unsafe_protocols(refspec) - else: - for ref in refspec: - Git.check_unsafe_protocols(ref) + refspec = Git._unpack_args(refspec or []) + if not allow_unsafe_protocols: + for ref in refspec: + Git.check_unsafe_protocols(ref) if not allow_unsafe_options: Git.check_unsafe_options(options=list(kwargs.keys()), unsafe_options=self.unsafe_git_push_options) diff --git a/test/test_git.py b/test/test_git.py index 6ba833b4a..e7d236deb 100644 --- a/test/test_git.py +++ b/test/test_git.py @@ -39,12 +39,12 @@ def test_call_process_calls_execute(self, git): self.assertEqual(git.call_args, ((["git", "version"],), {})) def test_call_unpack_args_unicode(self): - args = Git._Git__unpack_args("Unicode€™") + args = Git._unpack_args("Unicode€™") mangled_value = "Unicode\u20ac\u2122" self.assertEqual(args, [mangled_value]) def test_call_unpack_args(self): - args = Git._Git__unpack_args(["git", "log", "--", "Unicode€™"]) + args = Git._unpack_args(["git", "log", "--", "Unicode€™"]) mangled_value = "Unicode\u20ac\u2122" self.assertEqual(args, ["git", "log", "--", mangled_value]) From b92f01a3a38fc8e171d08575c69de9733811faa6 Mon Sep 17 00:00:00 2001 From: Santos Gallegos Date: Tue, 27 Dec 2022 18:22:58 -0500 Subject: [PATCH 4/7] Update/add tests for Repo.clone* --- test/test_repo.py | 148 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 121 insertions(+), 27 deletions(-) diff --git a/test/test_repo.py b/test/test_repo.py index a937836f9..72320184f 100644 --- a/test/test_repo.py +++ b/test/test_repo.py @@ -13,7 +13,6 @@ import pickle import sys import tempfile -import uuid from unittest import mock, skipIf, SkipTest import pytest @@ -226,6 +225,7 @@ def test_clone_from_pathlib_withConfig(self, rw_dir): "--config submodule.repo.update=checkout", "--config filter.lfs.clean='git-lfs clean -- %f'", ], + allow_unsafe_options=True, ) self.assertEqual(cloned.config_reader().get_value("submodule", "active"), "repo") @@ -266,39 +266,133 @@ def test_leaking_password_in_clone_logs(self, rw_dir): to_path=rw_dir, ) - def test_unsafe_options(self): - self.assertFalse(Repo.unsafe_options("github.com/deploy/deploy")) + @with_rw_repo("HEAD") + def test_clone_unsafe_options(self, rw_repo): + tmp_dir = pathlib.Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + rw_repo.clone(tmp_dir, multi_options=[unsafe_option]) - def test_unsafe_options_ext_url(self): - self.assertTrue(Repo.unsafe_options("ext::ssh")) + @with_rw_repo("HEAD") + def test_clone_unsafe_options_allowed(self, rw_repo): + tmp_dir = pathlib.Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + ] + for i, unsafe_option in enumerate(unsafe_options): + destination = tmp_dir / str(i) + # The options will be allowed, but the command will fail. + with self.assertRaises(GitCommandError): + rw_repo.clone(destination, multi_options=[unsafe_option], allow_unsafe_options=True) + + unsafe_options = [ + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for i, unsafe_option in enumerate(unsafe_options): + destination = tmp_dir / str(i) + assert not destination.exists() + rw_repo.clone(destination, multi_options=[unsafe_option], allow_unsafe_options=True) + assert destination.exists() - def test_unsafe_options_multi_options_upload_pack(self): - self.assertTrue(Repo.unsafe_options("", ["--upload-pack='touch foo'"])) + @with_rw_repo("HEAD") + def test_clone_safe_options(self, rw_repo): + tmp_dir = pathlib.Path(tempfile.mkdtemp()) + options = [ + "--depth=1", + "--single-branch", + "-q", + ] + for option in options: + destination = tmp_dir / option + assert not destination.exists() + rw_repo.clone(destination, multi_options=[option]) + assert destination.exists() - def test_unsafe_options_multi_options_config_user(self): - self.assertFalse(Repo.unsafe_options("", ["--config user"])) + @with_rw_repo("HEAD") + def test_clone_from_unsafe_options(self, rw_repo): + tmp_dir = pathlib.Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + Repo.clone_from(rw_repo.working_dir, tmp_dir, multi_options=[unsafe_option]) - def test_unsafe_options_multi_options_config_protocol(self): - self.assertTrue(Repo.unsafe_options("", ["--config protocol.foo"])) + @with_rw_repo("HEAD") + def test_clone_from_unsafe_options_allowed(self, rw_repo): + tmp_dir = pathlib.Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + ] + for i, unsafe_option in enumerate(unsafe_options): + destination = tmp_dir / str(i) + # The options will be allowed, but the command will fail. + with self.assertRaises(GitCommandError): + Repo.clone_from( + rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True + ) - def test_clone_from_forbids_helper_urls_by_default(self): - with self.assertRaises(UnsafeOptionError): - Repo.clone_from("ext::sh -c touch% /tmp/foo", "tmp") + unsafe_options = [ + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for i, unsafe_option in enumerate(unsafe_options): + destination = tmp_dir / str(i) + assert not destination.exists() + Repo.clone_from(rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True) + assert destination.exists() @with_rw_repo("HEAD") - def test_clone_from_allow_unsafe(self, repo): - bad_filename = pathlib.Path(f'{tempfile.gettempdir()}/{uuid.uuid4()}') - bad_url = f'ext::sh -c touch% {bad_filename}' - try: - repo.clone_from( - bad_url, 'tmp', - multi_options=["-c protocol.ext.allow=always"], - unsafe_protocols=True - ) - except GitCommandError: - pass - self.assertTrue(bad_filename.is_file()) - bad_filename.unlink() + def test_clone_from_safe_options(self, rw_repo): + tmp_dir = pathlib.Path(tempfile.mkdtemp()) + options = [ + "--depth=1", + "--single-branch", + "-q", + ] + for option in options: + destination = tmp_dir / option + assert not destination.exists() + Repo.clone_from(rw_repo.common_dir, destination, multi_options=[option]) + assert destination.exists() + + def test_clone_from_unsafe_procol(self): + tmp_dir = pathlib.Path(tempfile.mkdtemp()) + urls = [ + "ext::sh -c touch% /tmp/pwn", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + Repo.clone_from(url, tmp_dir) + + def test_clone_from_unsafe_procol_allowed(self): + tmp_dir = pathlib.Path(tempfile.mkdtemp()) + urls = [ + "ext::sh -c touch% /tmp/pwn", + "fd::/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + Repo.clone_from(url, tmp_dir, allow_unsafe_protocols=True) @with_rw_repo("HEAD") def test_max_chunk_size(self, repo): From c8ae33b9314a7d3716827b5cb705a3cd0a2e4a46 Mon Sep 17 00:00:00 2001 From: Santos Gallegos Date: Tue, 27 Dec 2022 19:15:40 -0500 Subject: [PATCH 5/7] More tests --- git/objects/submodule/base.py | 36 +++++- git/remote.py | 4 +- test/test_remote.py | 211 ++++++++++++++++++++++++++++++++++ test/test_submodule.py | 3 +- 4 files changed, 247 insertions(+), 7 deletions(-) diff --git a/git/objects/submodule/base.py b/git/objects/submodule/base.py index bdcdf1ec5..9aa9deb27 100644 --- a/git/objects/submodule/base.py +++ b/git/objects/submodule/base.py @@ -272,7 +272,16 @@ def _module_abspath(cls, parent_repo: "Repo", path: PathLike, name: str) -> Path # end @classmethod - def _clone_repo(cls, repo: "Repo", url: str, path: PathLike, name: str, **kwargs: Any) -> "Repo": + def _clone_repo( + cls, + repo: "Repo", + url: str, + path: PathLike, + name: str, + allow_unsafe_options: bool = False, + allow_unsafe_protocols: bool = False, + **kwargs: Any, + ) -> "Repo": """:return: Repo instance of newly cloned repository :param repo: our parent repository :param url: url to clone from @@ -289,7 +298,13 @@ def _clone_repo(cls, repo: "Repo", url: str, path: PathLike, name: str, **kwargs module_checkout_path = osp.join(str(repo.working_tree_dir), path) # end - clone = git.Repo.clone_from(url, module_checkout_path, **kwargs) + clone = git.Repo.clone_from( + url, + module_checkout_path, + allow_unsafe_options=allow_unsafe_options, + allow_unsafe_protocols=allow_unsafe_protocols, + **kwargs, + ) if cls._need_gitfile_submodules(repo.git): cls._write_git_file_and_module_config(module_checkout_path, module_abspath) # end @@ -359,6 +374,8 @@ def add( depth: Union[int, None] = None, env: Union[Mapping[str, str], None] = None, clone_multi_options: Union[Sequence[TBD], None] = None, + allow_unsafe_options: bool = False, + allow_unsafe_protocols: bool = False, ) -> "Submodule": """Add a new submodule to the given repository. This will alter the index as well as the .gitmodules file, but will not create a new commit. @@ -475,7 +492,16 @@ def add( kwargs["multi_options"] = clone_multi_options # _clone_repo(cls, repo, url, path, name, **kwargs): - mrepo = cls._clone_repo(repo, url, path, name, env=env, **kwargs) + mrepo = cls._clone_repo( + repo, + url, + path, + name, + env=env, + allow_unsafe_options=allow_unsafe_options, + allow_unsafe_protocols=allow_unsafe_protocols, + **kwargs, + ) # END verify url ## See #525 for ensuring git urls in config-files valid under Windows. @@ -520,6 +546,8 @@ def update( keep_going: bool = False, env: Union[Mapping[str, str], None] = None, clone_multi_options: Union[Sequence[TBD], None] = None, + allow_unsafe_options: bool = False, + allow_unsafe_protocols: bool = False, ) -> "Submodule": """Update the repository of this submodule to point to the checkout we point at with the binsha of this instance. @@ -643,6 +671,8 @@ def update( n=True, env=env, multi_options=clone_multi_options, + allow_unsafe_options=allow_unsafe_options, + allow_unsafe_protocols=allow_unsafe_protocols, ) # END handle dry-run progress.update( diff --git a/git/remote.py b/git/remote.py index 520544b66..47a0115b0 100644 --- a/git/remote.py +++ b/git/remote.py @@ -658,9 +658,7 @@ def add_url(self, url: str, allow_unsafe_protocols: bool = False, **kwargs: Any) :param url: string being the URL to add as an extra remote URL :return: self """ - if not allow_unsafe_protocols: - Git.check_unsafe_protocols(url) - return self.set_url(url, add=True) + return self.set_url(url, add=True, allow_unsafe_protocols=allow_unsafe_protocols) def delete_url(self, url: str, **kwargs: Any) -> "Remote": """Deletes a new url on current remote (special case of git remote set_url) diff --git a/test/test_remote.py b/test/test_remote.py index 7df64c206..9583724fe 100644 --- a/test/test_remote.py +++ b/test/test_remote.py @@ -23,6 +23,8 @@ GitCommandError, ) from git.cmd import Git +from pathlib import Path +from git.exc import UnsafeOptionError, UnsafeProtocolError from test.lib import ( TestBase, with_rw_repo, @@ -690,6 +692,215 @@ def test_push_error(self, repo): with self.assertRaisesRegex(GitCommandError, "src refspec __BAD_REF__ does not match any"): rem.push("__BAD_REF__") + @with_rw_repo("HEAD") + def test_set_unsafe_url(self, rw_repo): + remote = rw_repo.remote("origin") + urls = [ + "ext::sh -c touch% /tmp/pwn", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.set_url(url) + + @with_rw_repo("HEAD") + def test_set_unsafe_url_allowed(self, rw_repo): + remote = rw_repo.remote("origin") + urls = [ + "ext::sh -c touch% /tmp/pwn", + "fd::17/foo", + ] + for url in urls: + remote.set_url(url, allow_unsafe_protocols=True) + assert list(remote.urls)[-1] == url + + @with_rw_repo("HEAD") + def test_add_unsafe_url(self, rw_repo): + remote = rw_repo.remote("origin") + urls = [ + "ext::sh -c touch% /tmp/pwn", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.add_url(url) + + @with_rw_repo("HEAD") + def test_add_unsafe_url_allowed(self, rw_repo): + remote = rw_repo.remote("origin") + urls = [ + "ext::sh -c touch% /tmp/pwn", + "fd::17/foo", + ] + for url in urls: + remote.add_url(url, allow_unsafe_protocols=True) + assert list(remote.urls)[-1] == url + + @with_rw_repo("HEAD") + def test_create_remote_unsafe_url(self, rw_repo): + urls = [ + "ext::sh -c touch% /tmp/pwn", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + Remote.create(rw_repo, "origin", url) + + @with_rw_repo("HEAD") + def test_create_remote_unsafe_url_allowed(self, rw_repo): + urls = [ + "ext::sh -c touch% /tmp/pwn", + "fd::17/foo", + ] + for i, url in enumerate(urls): + remote = Remote.create(rw_repo, f"origin{i}", url, allow_unsafe_protocols=True) + assert remote.url == url + + @with_rw_repo("HEAD") + def test_fetch_unsafe_url(self, rw_repo): + remote = rw_repo.remote("origin") + urls = [ + "ext::sh -c touch% /tmp/pwn", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.fetch(url) + + @with_rw_repo("HEAD") + def test_fetch_unsafe_url_allowed(self, rw_repo): + remote = rw_repo.remote("origin") + urls = [ + "ext::sh -c touch% /tmp/pwn", + "fd::17/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + remote.fetch(url, allow_unsafe_protocols=True) + + @with_rw_repo("HEAD") + def test_fetch_unsafe_options(self, rw_repo): + remote = rw_repo.remote("origin") + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" + unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + remote.fetch(**unsafe_option) + + @with_rw_repo("HEAD") + def test_fetch_unsafe_options_allowed(self, rw_repo): + remote = rw_repo.remote("origin") + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" + unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + with self.assertRaises(GitCommandError): + remote.fetch(**unsafe_option, allow_unsafe_options=True) + + @with_rw_repo("HEAD") + def test_pull_unsafe_url(self, rw_repo): + remote = rw_repo.remote("origin") + urls = [ + "ext::sh -c touch% /tmp/pwn", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.pull(url) + + @with_rw_repo("HEAD") + def test_pull_unsafe_url_allowed(self, rw_repo): + remote = rw_repo.remote("origin") + urls = [ + "ext::sh -c touch% /tmp/pwn", + "fd::17/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + remote.pull(url, allow_unsafe_protocols=True) + + @with_rw_repo("HEAD") + def test_pull_unsafe_options(self, rw_repo): + remote = rw_repo.remote("origin") + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" + unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + remote.pull(**unsafe_option) + + @with_rw_repo("HEAD") + def test_pull_unsafe_options_allowed(self, rw_repo): + remote = rw_repo.remote("origin") + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" + unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + with self.assertRaises(GitCommandError): + remote.pull(**unsafe_option, allow_unsafe_options=True) + + @with_rw_repo("HEAD") + def test_push_unsafe_url(self, rw_repo): + remote = rw_repo.remote("origin") + urls = [ + "ext::sh -c touch% /tmp/pwn", + "fd::17/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + remote.push(url) + + @with_rw_repo("HEAD") + def test_push_unsafe_url_allowed(self, rw_repo): + remote = rw_repo.remote("origin") + urls = [ + "ext::sh -c touch% /tmp/pwn", + "fd::17/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + remote.push(url, allow_unsafe_protocols=True) + + @with_rw_repo("HEAD") + def test_push_unsafe_options(self, rw_repo): + remote = rw_repo.remote("origin") + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + { + "receive-pack": f"touch {tmp_file}", + "exec": f"touch {tmp_file}", + } + ] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + remote.push(**unsafe_option) + + @with_rw_repo("HEAD") + def test_push_unsafe_options_allowed(self, rw_repo): + remote = rw_repo.remote("origin") + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + { + "receive-pack": f"touch {tmp_file}", + "exec": f"touch {tmp_file}", + } + ] + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + with self.assertRaises(GitCommandError): + remote.push(**unsafe_option, allow_unsafe_options=True) + class TestTimeouts(TestBase): @with_rw_repo("HEAD", bare=False) diff --git a/test/test_submodule.py b/test/test_submodule.py index fef6bda3a..3ac29b9aa 100644 --- a/test/test_submodule.py +++ b/test/test_submodule.py @@ -1026,7 +1026,7 @@ def test_update_clone_multi_options_argument(self, rwdir): ) # Act - sm.update(init=True, clone_multi_options=["--config core.eol=true"]) + sm.update(init=True, clone_multi_options=["--config core.eol=true"], allow_unsafe_options=True) # Assert sm_config = GitConfigParser(file_or_files=osp.join(parent.git_dir, "modules", sm_name, "config")) @@ -1070,6 +1070,7 @@ def test_add_clone_multi_options_argument(self, rwdir): sm_name, url=self._small_repo_url(), clone_multi_options=["--config core.eol=true"], + allow_unsafe_options=True, ) # Assert From 9dc43926207b2205d77511c6ffd40944199f0c2d Mon Sep 17 00:00:00 2001 From: Santos Gallegos Date: Tue, 27 Dec 2022 20:07:18 -0500 Subject: [PATCH 6/7] Submodule tests --- test/test_submodule.py | 116 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 115 insertions(+), 1 deletion(-) diff --git a/test/test_submodule.py b/test/test_submodule.py index 3ac29b9aa..5b1622178 100644 --- a/test/test_submodule.py +++ b/test/test_submodule.py @@ -3,6 +3,8 @@ # the BSD License: http://www.opensource.org/licenses/bsd-license.php import os import shutil +import tempfile +from pathlib import Path import sys from unittest import skipIf @@ -12,7 +14,13 @@ from git.cmd import Git from git.compat import is_win from git.config import GitConfigParser, cp -from git.exc import InvalidGitRepositoryError, RepositoryDirtyError +from git.exc import ( + GitCommandError, + InvalidGitRepositoryError, + RepositoryDirtyError, + UnsafeOptionError, + UnsafeProtocolError, +) from git.objects.submodule.base import Submodule from git.objects.submodule.root import RootModule, RootUpdateProgress from git.repo.fun import find_submodule_git_dir, touch @@ -1090,3 +1098,109 @@ def test_add_no_clone_multi_options_argument(self, rwdir): sm_config = GitConfigParser(file_or_files=osp.join(parent.git_dir, "modules", sm_name, "config")) with self.assertRaises(cp.NoOptionError): sm_config.get_value("core", "eol") + + @with_rw_repo("HEAD") + def test_submodule_add_unsafe_url(self, rw_repo): + urls = [ + "ext::sh -c touch% /tmp/pwn", + "fd::/foo", + ] + for url in urls: + with self.assertRaises(UnsafeProtocolError): + Submodule.add(rw_repo, "new", "new", url) + + @with_rw_repo("HEAD") + def test_submodule_add_unsafe_url_allowed(self, rw_repo): + urls = [ + "ext::sh -c touch% /tmp/pwn", + "fd::/foo", + ] + for url in urls: + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + Submodule.add(rw_repo, "new", "new", url, allow_unsafe_protocols=True) + + @with_rw_repo("HEAD") + def test_submodule_add_unsafe_options(self, rw_repo): + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + Submodule.add(rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option]) + + @with_rw_repo("HEAD") + def test_submodule_add_unsafe_options_allowed(self, rw_repo): + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + for unsafe_option in unsafe_options: + with self.assertRaises(GitCommandError): + Submodule.add( + rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option], allow_unsafe_options=True + ) + + @with_rw_repo("HEAD") + def test_submodule_update_unsafe_url(self, rw_repo): + urls = [ + "ext::sh -c touch% /tmp/pwn", + "fd::/foo", + ] + for url in urls: + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url) + with self.assertRaises(UnsafeProtocolError): + submodule.update() + + @with_rw_repo("HEAD") + def test_submodule_update_unsafe_url_allowed(self, rw_repo): + urls = [ + "ext::sh -c touch% /tmp/pwn", + "fd::/foo", + ] + for url in urls: + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url) + # The URL will be allowed into the command, but the command will + # fail since we don't have that protocol enabled in the Git config file. + with self.assertRaises(GitCommandError): + submodule.update(allow_unsafe_protocols=True) + + @with_rw_repo("HEAD") + def test_submodule_update_unsafe_options(self, rw_repo): + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) + for unsafe_option in unsafe_options: + with self.assertRaises(UnsafeOptionError): + submodule.update(clone_multi_options=[unsafe_option]) + + @with_rw_repo("HEAD") + def test_submodule_update_unsafe_options_allowed(self, rw_repo): + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" + unsafe_options = [ + f"--upload-pack='touch {tmp_file}'", + f"-u 'touch {tmp_file}'", + "--config=protocol.ext.allow=always", + "-c protocol.ext.allow=always", + ] + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) + for unsafe_option in unsafe_options: + with self.assertRaises(GitCommandError): + submodule.update(clone_multi_options=[unsafe_option], allow_unsafe_options=True) From f4f2658d5d308b3fb9162e50cd4c7b346e7a0a47 Mon Sep 17 00:00:00 2001 From: Santos Gallegos Date: Wed, 28 Dec 2022 21:48:29 -0500 Subject: [PATCH 7/7] Updates from review --- AUTHORS | 1 + test/test_remote.py | 71 +++++++++++++++++++++++++++++++++++------- test/test_repo.py | 14 ++++++++- test/test_submodule.py | 41 +++++++++++++++++++++--- 4 files changed, 110 insertions(+), 17 deletions(-) diff --git a/AUTHORS b/AUTHORS index 8f3f2ccfe..8ccc09fc0 100644 --- a/AUTHORS +++ b/AUTHORS @@ -50,4 +50,5 @@ Contributors are: -Patrick Gerard -Luke Twist -Joseph Hale +-Santos Gallegos Portions derived from other open source works and are clearly marked. diff --git a/test/test_remote.py b/test/test_remote.py index 9583724fe..3a47afab5 100644 --- a/test/test_remote.py +++ b/test/test_remote.py @@ -694,84 +694,107 @@ def test_push_error(self, repo): @with_rw_repo("HEAD") def test_set_unsafe_url(self, rw_repo): + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" remote = rw_repo.remote("origin") urls = [ - "ext::sh -c touch% /tmp/pwn", + f"ext::sh -c touch% {tmp_file}", "fd::17/foo", ] for url in urls: with self.assertRaises(UnsafeProtocolError): remote.set_url(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_set_unsafe_url_allowed(self, rw_repo): + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" remote = rw_repo.remote("origin") urls = [ - "ext::sh -c touch% /tmp/pwn", + f"ext::sh -c touch% {tmp_file}", "fd::17/foo", ] for url in urls: remote.set_url(url, allow_unsafe_protocols=True) assert list(remote.urls)[-1] == url + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_add_unsafe_url(self, rw_repo): + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" remote = rw_repo.remote("origin") urls = [ - "ext::sh -c touch% /tmp/pwn", + f"ext::sh -c touch% {tmp_file}", "fd::17/foo", ] for url in urls: with self.assertRaises(UnsafeProtocolError): remote.add_url(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_add_unsafe_url_allowed(self, rw_repo): + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" remote = rw_repo.remote("origin") urls = [ - "ext::sh -c touch% /tmp/pwn", + f"ext::sh -c touch% {tmp_file}", "fd::17/foo", ] for url in urls: remote.add_url(url, allow_unsafe_protocols=True) assert list(remote.urls)[-1] == url + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_create_remote_unsafe_url(self, rw_repo): + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" urls = [ - "ext::sh -c touch% /tmp/pwn", + f"ext::sh -c touch% {tmp_file}", "fd::17/foo", ] for url in urls: with self.assertRaises(UnsafeProtocolError): Remote.create(rw_repo, "origin", url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_create_remote_unsafe_url_allowed(self, rw_repo): + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" urls = [ - "ext::sh -c touch% /tmp/pwn", + f"ext::sh -c touch% {tmp_file}", "fd::17/foo", ] for i, url in enumerate(urls): remote = Remote.create(rw_repo, f"origin{i}", url, allow_unsafe_protocols=True) assert remote.url == url + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_fetch_unsafe_url(self, rw_repo): + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" remote = rw_repo.remote("origin") urls = [ - "ext::sh -c touch% /tmp/pwn", + f"ext::sh -c touch% {tmp_file}", "fd::17/foo", ] for url in urls: with self.assertRaises(UnsafeProtocolError): remote.fetch(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_fetch_unsafe_url_allowed(self, rw_repo): + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" remote = rw_repo.remote("origin") urls = [ - "ext::sh -c touch% /tmp/pwn", + f"ext::sh -c touch% {tmp_file}", "fd::17/foo", ] for url in urls: @@ -779,6 +802,7 @@ def test_fetch_unsafe_url_allowed(self, rw_repo): # fail since we don't have that protocol enabled in the Git config file. with self.assertRaises(GitCommandError): remote.fetch(url, allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_fetch_unsafe_options(self, rw_repo): @@ -789,6 +813,7 @@ def test_fetch_unsafe_options(self, rw_repo): for unsafe_option in unsafe_options: with self.assertRaises(UnsafeOptionError): remote.fetch(**unsafe_option) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_fetch_unsafe_options_allowed(self, rw_repo): @@ -798,25 +823,32 @@ def test_fetch_unsafe_options_allowed(self, rw_repo): unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] for unsafe_option in unsafe_options: # The options will be allowed, but the command will fail. + assert not tmp_file.exists() with self.assertRaises(GitCommandError): remote.fetch(**unsafe_option, allow_unsafe_options=True) + assert tmp_file.exists() @with_rw_repo("HEAD") def test_pull_unsafe_url(self, rw_repo): + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" remote = rw_repo.remote("origin") urls = [ - "ext::sh -c touch% /tmp/pwn", + f"ext::sh -c touch% {tmp_file}", "fd::17/foo", ] for url in urls: with self.assertRaises(UnsafeProtocolError): remote.pull(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_pull_unsafe_url_allowed(self, rw_repo): + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" remote = rw_repo.remote("origin") urls = [ - "ext::sh -c touch% /tmp/pwn", + f"ext::sh -c touch% {tmp_file}", "fd::17/foo", ] for url in urls: @@ -824,6 +856,7 @@ def test_pull_unsafe_url_allowed(self, rw_repo): # fail since we don't have that protocol enabled in the Git config file. with self.assertRaises(GitCommandError): remote.pull(url, allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_pull_unsafe_options(self, rw_repo): @@ -834,6 +867,7 @@ def test_pull_unsafe_options(self, rw_repo): for unsafe_option in unsafe_options: with self.assertRaises(UnsafeOptionError): remote.pull(**unsafe_option) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_pull_unsafe_options_allowed(self, rw_repo): @@ -843,25 +877,32 @@ def test_pull_unsafe_options_allowed(self, rw_repo): unsafe_options = [{"upload-pack": f"touch {tmp_file}"}] for unsafe_option in unsafe_options: # The options will be allowed, but the command will fail. + assert not tmp_file.exists() with self.assertRaises(GitCommandError): remote.pull(**unsafe_option, allow_unsafe_options=True) + assert tmp_file.exists() @with_rw_repo("HEAD") def test_push_unsafe_url(self, rw_repo): + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" remote = rw_repo.remote("origin") urls = [ - "ext::sh -c touch% /tmp/pwn", + f"ext::sh -c touch% {tmp_file}", "fd::17/foo", ] for url in urls: with self.assertRaises(UnsafeProtocolError): remote.push(url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_push_unsafe_url_allowed(self, rw_repo): + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" remote = rw_repo.remote("origin") urls = [ - "ext::sh -c touch% /tmp/pwn", + f"ext::sh -c touch% {tmp_file}", "fd::17/foo", ] for url in urls: @@ -869,6 +910,7 @@ def test_push_unsafe_url_allowed(self, rw_repo): # fail since we don't have that protocol enabled in the Git config file. with self.assertRaises(GitCommandError): remote.push(url, allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_push_unsafe_options(self, rw_repo): @@ -882,8 +924,10 @@ def test_push_unsafe_options(self, rw_repo): } ] for unsafe_option in unsafe_options: + assert not tmp_file.exists() with self.assertRaises(UnsafeOptionError): remote.push(**unsafe_option) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_push_unsafe_options_allowed(self, rw_repo): @@ -898,8 +942,11 @@ def test_push_unsafe_options_allowed(self, rw_repo): ] for unsafe_option in unsafe_options: # The options will be allowed, but the command will fail. + assert not tmp_file.exists() with self.assertRaises(GitCommandError): remote.push(**unsafe_option, allow_unsafe_options=True) + assert tmp_file.exists() + tmp_file.unlink() class TestTimeouts(TestBase): diff --git a/test/test_repo.py b/test/test_repo.py index 72320184f..5874dbe6a 100644 --- a/test/test_repo.py +++ b/test/test_repo.py @@ -279,6 +279,7 @@ def test_clone_unsafe_options(self, rw_repo): for unsafe_option in unsafe_options: with self.assertRaises(UnsafeOptionError): rw_repo.clone(tmp_dir, multi_options=[unsafe_option]) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_clone_unsafe_options_allowed(self, rw_repo): @@ -290,9 +291,12 @@ def test_clone_unsafe_options_allowed(self, rw_repo): ] for i, unsafe_option in enumerate(unsafe_options): destination = tmp_dir / str(i) + assert not tmp_file.exists() # The options will be allowed, but the command will fail. with self.assertRaises(GitCommandError): rw_repo.clone(destination, multi_options=[unsafe_option], allow_unsafe_options=True) + assert tmp_file.exists() + tmp_file.unlink() unsafe_options = [ "--config=protocol.ext.allow=always", @@ -331,6 +335,7 @@ def test_clone_from_unsafe_options(self, rw_repo): for unsafe_option in unsafe_options: with self.assertRaises(UnsafeOptionError): Repo.clone_from(rw_repo.working_dir, tmp_dir, multi_options=[unsafe_option]) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_clone_from_unsafe_options_allowed(self, rw_repo): @@ -342,11 +347,14 @@ def test_clone_from_unsafe_options_allowed(self, rw_repo): ] for i, unsafe_option in enumerate(unsafe_options): destination = tmp_dir / str(i) + assert not tmp_file.exists() # The options will be allowed, but the command will fail. with self.assertRaises(GitCommandError): Repo.clone_from( rw_repo.working_dir, destination, multi_options=[unsafe_option], allow_unsafe_options=True ) + assert tmp_file.exists() + tmp_file.unlink() unsafe_options = [ "--config=protocol.ext.allow=always", @@ -374,16 +382,19 @@ def test_clone_from_safe_options(self, rw_repo): def test_clone_from_unsafe_procol(self): tmp_dir = pathlib.Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" urls = [ - "ext::sh -c touch% /tmp/pwn", + f"ext::sh -c touch% {tmp_file}", "fd::17/foo", ] for url in urls: with self.assertRaises(UnsafeProtocolError): Repo.clone_from(url, tmp_dir) + assert not tmp_file.exists() def test_clone_from_unsafe_procol_allowed(self): tmp_dir = pathlib.Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" urls = [ "ext::sh -c touch% /tmp/pwn", "fd::/foo", @@ -393,6 +404,7 @@ def test_clone_from_unsafe_procol_allowed(self): # fail since we don't have that protocol enabled in the Git config file. with self.assertRaises(GitCommandError): Repo.clone_from(url, tmp_dir, allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_max_chunk_size(self, repo): diff --git a/test/test_submodule.py b/test/test_submodule.py index 5b1622178..13878df28 100644 --- a/test/test_submodule.py +++ b/test/test_submodule.py @@ -1101,18 +1101,23 @@ def test_add_no_clone_multi_options_argument(self, rwdir): @with_rw_repo("HEAD") def test_submodule_add_unsafe_url(self, rw_repo): + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" urls = [ - "ext::sh -c touch% /tmp/pwn", + f"ext::sh -c touch% {tmp_file}", "fd::/foo", ] for url in urls: with self.assertRaises(UnsafeProtocolError): Submodule.add(rw_repo, "new", "new", url) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_add_unsafe_url_allowed(self, rw_repo): + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" urls = [ - "ext::sh -c touch% /tmp/pwn", + f"ext::sh -c touch% {tmp_file}", "fd::/foo", ] for url in urls: @@ -1120,6 +1125,7 @@ def test_submodule_add_unsafe_url_allowed(self, rw_repo): # fail since we don't have that protocol enabled in the Git config file. with self.assertRaises(GitCommandError): Submodule.add(rw_repo, "new", "new", url, allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_add_unsafe_options(self, rw_repo): @@ -1134,6 +1140,7 @@ def test_submodule_add_unsafe_options(self, rw_repo): for unsafe_option in unsafe_options: with self.assertRaises(UnsafeOptionError): Submodule.add(rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option]) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_add_unsafe_options_allowed(self, rw_repo): @@ -1142,6 +1149,16 @@ def test_submodule_add_unsafe_options_allowed(self, rw_repo): unsafe_options = [ f"--upload-pack='touch {tmp_file}'", f"-u 'touch {tmp_file}'", + ] + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + with self.assertRaises(GitCommandError): + Submodule.add( + rw_repo, "new", "new", str(tmp_dir), clone_multi_options=[unsafe_option], allow_unsafe_options=True + ) + assert not tmp_file.exists() + + unsafe_options = [ "--config=protocol.ext.allow=always", "-c protocol.ext.allow=always", ] @@ -1153,19 +1170,24 @@ def test_submodule_add_unsafe_options_allowed(self, rw_repo): @with_rw_repo("HEAD") def test_submodule_update_unsafe_url(self, rw_repo): + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" urls = [ - "ext::sh -c touch% /tmp/pwn", + f"ext::sh -c touch% {tmp_file}", "fd::/foo", ] for url in urls: submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=url) with self.assertRaises(UnsafeProtocolError): submodule.update() + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_update_unsafe_url_allowed(self, rw_repo): + tmp_dir = Path(tempfile.mkdtemp()) + tmp_file = tmp_dir / "pwn" urls = [ - "ext::sh -c touch% /tmp/pwn", + f"ext::sh -c touch% {tmp_file}", "fd::/foo", ] for url in urls: @@ -1174,6 +1196,7 @@ def test_submodule_update_unsafe_url_allowed(self, rw_repo): # fail since we don't have that protocol enabled in the Git config file. with self.assertRaises(GitCommandError): submodule.update(allow_unsafe_protocols=True) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_update_unsafe_options(self, rw_repo): @@ -1189,6 +1212,7 @@ def test_submodule_update_unsafe_options(self, rw_repo): for unsafe_option in unsafe_options: with self.assertRaises(UnsafeOptionError): submodule.update(clone_multi_options=[unsafe_option]) + assert not tmp_file.exists() @with_rw_repo("HEAD") def test_submodule_update_unsafe_options_allowed(self, rw_repo): @@ -1197,6 +1221,15 @@ def test_submodule_update_unsafe_options_allowed(self, rw_repo): unsafe_options = [ f"--upload-pack='touch {tmp_file}'", f"-u 'touch {tmp_file}'", + ] + submodule = Submodule(rw_repo, b"\0" * 20, name="new", path="new", url=str(tmp_dir)) + for unsafe_option in unsafe_options: + # The options will be allowed, but the command will fail. + with self.assertRaises(GitCommandError): + submodule.update(clone_multi_options=[unsafe_option], allow_unsafe_options=True) + assert not tmp_file.exists() + + unsafe_options = [ "--config=protocol.ext.allow=always", "-c protocol.ext.allow=always", ]