Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --required-plugins and --allowed-plugins #1439

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/flake8/api/legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def get_style_guide(**kwargs):
ignore_config_files=prelim_opts.isolated,
)

application.find_plugins(config_finder)
application.find_plugins(config_finder, prelim_opts)
application.register_plugin_options()
application.parse_configuration_and_cli(
config_finder,
Expand Down
22 changes: 22 additions & 0 deletions src/flake8/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Exception classes for all of Flake8."""
from typing import Dict
from typing import List


class Flake8Exception(Exception):
Expand Down Expand Up @@ -69,3 +70,24 @@ def __str__(self) -> str:
"name": self.plugin["plugin_name"],
"exc": self.original_exception,
}


class PluginMissingError(Flake8Exception):
"""A plugin that was required was not found."""

FORMAT = "User required %(plugins)s but %(missing)s was not found."

def __init__(
self, required_plugins: List[str], missing_plugins: List[str]
) -> None:
"""Store the information passed in to format the exception message."""
self.required_plugins = required_plugins
self.missing_plugins = missing_plugins
super().__init__(required_plugins, missing_plugins)

def __str__(self) -> str:
"""Format our exception message."""
return self.FORMAT % {
"plugins": ", ".join(self.required_plugins),
"missing": ", ".join(self.missing_plugins),
}
22 changes: 19 additions & 3 deletions src/flake8/main/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,16 +141,28 @@ def exit(self) -> None:
(self.result_count > 0) or self.catastrophic_failure
)

def find_plugins(self, config_finder: config.ConfigFileFinder) -> None:
def find_plugins(
self,
config_finder: config.ConfigFileFinder,
prelim_opts: argparse.Namespace,
) -> None:
"""Find and load the plugins for this application.

Set the :attr:`check_plugins` and :attr:`formatting_plugins` attributes
based on the discovered plugins found.

:param config.ConfigFileFinder config_finder:
The finder for finding and reading configuration files.
:param argparse.Namespace prelim_opts:
The options parsed preliminarily from the CLI
"""
local_plugins = config.get_local_plugins(config_finder)
(
allowed_plugins,
required_plugins,
) = config.get_plugin_allowlist_and_requirements(
config_finder, prelim_opts
)

sys.path.extend(local_plugins.paths)

Expand All @@ -160,7 +172,11 @@ def find_plugins(self, config_finder: config.ConfigFileFinder) -> None:
local_plugins.report
)

self.check_plugins.load_plugins()
try:
self.check_plugins.load_plugins(allowed_plugins, required_plugins)
except exceptions.PluginMissingError as e:
print(f"Error: {e!s}")
raise SystemExit(2)
self.formatting_plugins.load_plugins()

def register_plugin_options(self) -> None:
Expand Down Expand Up @@ -340,7 +356,7 @@ def initialize(self, argv: List[str]) -> None:
config_file=prelim_opts.config,
ignore_config_files=prelim_opts.isolated,
)
self.find_plugins(config_finder)
self.find_plugins(config_finder, prelim_opts)
self.register_plugin_options()
self.parse_configuration_and_cli(
config_finder,
Expand Down
17 changes: 17 additions & 0 deletions src/flake8/main/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,23 @@ def register_preliminary_options(parser: argparse.ArgumentParser) -> None:
help="Ignore all configuration files.",
)

add_argument(
"--allowed-plugins",
default=None,
# parse_from_config=True,
# comma_separated_list=True,
help="Which plugins are allowed to run from the environment",
)

add_argument(
"--required-plugins",
default=None,
# parse_from_config=True,
# comma_separated_list=True,
help="Which plugins are required for linting. Exits if not all are "
"present.",
)


class JobsArgument:
"""Type callback for the --jobs argument."""
Expand Down
86 changes: 85 additions & 1 deletion src/flake8/options/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Config handling logic for Flake8."""
import argparse
import collections
import configparser
import logging
Expand Down Expand Up @@ -273,7 +274,7 @@ def get_local_plugins(config_finder):
if config_finder.ignore_config_files:
LOG.debug(
"Refusing to look for local plugins in configuration"
"files due to user-requested isolation"
" files due to user-requested isolation"
)
return local_plugins

Expand Down Expand Up @@ -315,4 +316,87 @@ def get_local_plugins(config_finder):
return local_plugins


def get_plugin_allowlist_and_requirements(
config_finder: ConfigFileFinder, preliminary_opts: argparse.Namespace
) -> Tuple[List[str], List[str]]:
"""Get allowed and required plugin lists from config.

:param config_finder:
The config file finder to use.
:type config_finder:
:class:`~flake8.options.config.ConfigFileFinder`
:param preliminary_opts:
The config file finder to use.
:type preliminary_opts:
:class:`~argparse.Namespace`
:returns:
tuple of the allowed and required plugin lists
"""
read_allowed_plugins_from_config = True
read_required_plugins_from_config = True
allowed_plugins: List[str] = []
required_plugins: List[str] = []
return_tuple: Tuple[List[str], List[str]] = (
allowed_plugins,
required_plugins,
)

if preliminary_opts.allowed_plugins is not None:
allowed_plugins.extend(
utils.parse_comma_separated_list(preliminary_opts.allowed_plugins)
)
read_allowed_plugins_from_config = False

if preliminary_opts.required_plugins is not None:
required_plugins.extend(
utils.parse_comma_separated_list(preliminary_opts.required_plugins)
)
read_required_plugins_from_config = False

if config_finder.ignore_config_files:
LOG.debug(
"Refusing to look for plugin configuration in configuration"
" files due to user-requested isolation"
)
return return_tuple

if (
not read_allowed_plugins_from_config
and not read_required_plugins_from_config
):
LOG.debug("Found --allowed-plugins and --required-plugins")
return return_tuple

if config_finder.config_file:
LOG.debug(
'Reading local plugins only from "%s" specified via '
"--config by the user",
config_finder.config_file,
)
config = config_finder.cli_config(config_finder.config_file)
config_files = [config_finder.config_file]
else:
config, config_files = config_finder.local_configs_with_files()

section = f"{config_finder.program_name}"
if (
config.has_option(section, "allowed-plugins")
and read_allowed_plugins_from_config
):
allowed_plugins_str = config.get(section, "allowed-plugins").strip()
allowed_plugins.extend(
utils.parse_comma_separated_list(allowed_plugins_str)
)
if (
config.has_option(section, "required-plugins")
and read_required_plugins_from_config
):
required_plugins_str = config.get(section, "required-plugins").strip()
required_plugins.extend(
utils.parse_comma_separated_list(required_plugins_str)
)

return return_tuple


LocalPlugins = collections.namedtuple("LocalPlugins", "extension report paths")
30 changes: 28 additions & 2 deletions src/flake8/plugins/manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Plugin loading and management logic and classes."""
import logging
from typing import Any
from typing import cast
from typing import Dict
from typing import List
from typing import Optional
Expand Down Expand Up @@ -410,13 +411,38 @@ def generated_function(plugin):

return generated_function

def load_plugins(self):
"""Load all plugins of this type that are managed by this manager."""
def load_plugins(
self,
allowed_plugins: Optional[List[str]] = None,
required_plugins: Optional[List[str]] = None,
) -> None:
"""Load all plugins of this type that are managed by this manager.

:param allowed_plugins:
This is the list of plugins allowed to be loaded
:param required_plugins:
This is the list of plugins required by the user
:raises PluginMissingError:
If a required plugin is missing
"""
if self.plugins_loaded:
return

requireset = set(required_plugins or [])
allowset = set(allowed_plugins or [])
loaded = set()
for plugin in self.plugins.values():
if allowset and plugin.name not in allowset:
continue
plugin.load_plugin()
loaded.add(plugin.name)

missing = requireset.difference(loaded)
if requireset and missing:
required_plugins = cast(List[str], required_plugins)
raise exceptions.PluginMissingError(
required_plugins, sorted(missing)
)

# Do not set plugins_loaded if we run into an exception
self.plugins_loaded = True
Expand Down