From 42690ac18f4f8e327eb43f0e7d53e6a21883ede1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Mas=C5=82owski?= Date: Mon, 31 May 2021 13:37:17 +0200 Subject: [PATCH] Add type hints and run mypy --- .github/workflows/linters.yml | 19 ++++++++++ mypy.ini | 25 +++++++++++++ port_for/__init__.py | 15 +++++++- port_for/_download_ranges.py | 20 ++++++---- port_for/api.py | 69 ++++++++++++++++++++++++++--------- port_for/ephemeral.py | 14 ++++--- port_for/py.typed | 0 port_for/store.py | 47 +++++++++++++----------- port_for/utils.py | 9 +++-- requirements-lint.txt | 3 +- scripts/port-for | 32 +++++++++------- setup.py | 2 + tests/test_cases.py | 53 ++++++++++++++++----------- 13 files changed, 214 insertions(+), 94 deletions(-) create mode 100644 mypy.ini create mode 100644 port_for/py.typed diff --git a/.github/workflows/linters.yml b/.github/workflows/linters.yml index 2dc868c..7fc49c6 100644 --- a/.github/workflows/linters.yml +++ b/.github/workflows/linters.yml @@ -59,3 +59,22 @@ jobs: - uses: actions/checkout@v2 - uses: actions/setup-python@v2 - uses: psf/black@main + + mypy: + runs-on: ubuntu-latest + strategy: + fail-fast: false + + steps: + - uses: actions/checkout@v2 + - name: Set up Python 3.9 + uses: actions/setup-python@v2 + with: + python-version: 3.9 + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements-lint.txt + - name: Run mypy + run: | + mypy port_for/ tests/ scripts/port-for diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..cd300cd --- /dev/null +++ b/mypy.ini @@ -0,0 +1,25 @@ +[mypy] +allow_redefinition = False +allow_untyped_globals = False +check_untyped_defs = True +disallow_incomplete_defs = True +disallow_subclassing_any = True +disallow_untyped_calls = True +disallow_untyped_decorators = True +disallow_untyped_defs = True +follow_imports = silent +ignore_missing_imports = False +implicit_reexport = False +no_implicit_optional = True +pretty = True +show_error_codes = True +strict_equality = True +warn_no_return = True +warn_return_any = True +warn_unreachable = True +warn_unused_ignores = True + +# Bundled third-party package. +[mypy-port_for.docopt.*] +check_untyped_defs = False +disallow_untyped_defs = False diff --git a/port_for/__init__.py b/port_for/__init__.py index 889edf3..4ebeb2b 100644 --- a/port_for/__init__.py +++ b/port_for/__init__.py @@ -3,6 +3,7 @@ __version__ = "0.5.0" +from ._ranges import UNASSIGNED_RANGES from .api import ( available_good_ports, available_ports, @@ -11,7 +12,19 @@ port_is_used, select_random, get_port, - UNASSIGNED_RANGES, ) from .store import PortStore from .exceptions import PortForException + +__all__ = ( + "UNASSIGNED_RANGES", + "available_good_ports", + "available_ports", + "is_available", + "good_port_ranges", + "port_is_used", + "select_random", + "get_port", + "PortStore", + "PortForException", +) diff --git a/port_for/_download_ranges.py b/port_for/_download_ranges.py index 01ff8e1..5e6a8fb 100644 --- a/port_for/_download_ranges.py +++ b/port_for/_download_ranges.py @@ -9,6 +9,7 @@ import datetime from urllib.request import Request, urlopen from xml.etree import ElementTree +from typing import Set, Iterator, Iterable, Tuple from port_for.utils import to_ranges, ranges_to_set @@ -25,7 +26,7 @@ WIKIPEDIA_PAGE = "http://en.wikipedia.org/wiki/List_of_TCP_and_UDP_port_numbers" -def _write_unassigned_ranges(out_filename): +def _write_unassigned_ranges(out_filename: str) -> None: """ Downloads ports data from IANA & Wikipedia and converts it to a python module. This function is used to generate _ranges.py. @@ -41,14 +42,14 @@ def _write_unassigned_ranges(out_filename): f.write("]\n") -def _unassigned_ports(): +def _unassigned_ports() -> Set[int]: """Return a set of all unassigned ports (according to IANA and Wikipedia)""" free_ports = ranges_to_set(_parse_ranges(_iana_unassigned_port_ranges())) known_ports = ranges_to_set(_wikipedia_known_port_ranges()) return free_ports.difference(known_ports) -def _wikipedia_known_port_ranges(): +def _wikipedia_known_port_ranges() -> Iterator[Tuple[int, int]]: """ Returns used port ranges according to Wikipedia page. This page contains unofficial well-known ports. @@ -61,7 +62,7 @@ def _wikipedia_known_port_ranges(): return ((int(p[1]), int(p[3] if p[3] else p[1])) for p in ports) -def _iana_unassigned_port_ranges(): +def _iana_unassigned_port_ranges() -> Iterator[str]: """ Returns unassigned port ranges according to IANA. """ @@ -69,13 +70,18 @@ def _iana_unassigned_port_ranges(): xml = ElementTree.fromstring(page) records = xml.findall("{%s}record" % IANA_NS) for record in records: - description = record.find("{%s}description" % IANA_NS).text + description_el = record.find("{%s}description" % IANA_NS) + assert description_el is not None + description = description_el.text if description == "Unassigned": - numbers = record.find("{%s}number" % IANA_NS).text + number_el = record.find("{%s}number" % IANA_NS) + assert number_el is not None + numbers = number_el.text + assert numbers is not None yield numbers -def _parse_ranges(ranges): +def _parse_ranges(ranges: Iterable[str]) -> Iterator[Tuple[int, int]]: """Converts a list of string ranges to a list of [low, high] tuples.""" for txt in ranges: if "-" in txt: diff --git a/port_for/api.py b/port_for/api.py index 63c81d8..cbb7865 100644 --- a/port_for/api.py +++ b/port_for/api.py @@ -1,11 +1,11 @@ # -*- coding: utf-8 -*- -# -*- coding: utf-8 -*- from __future__ import absolute_import, division, with_statement import contextlib import socket import errno import random from itertools import chain +from typing import Optional, Set, List, Tuple, Iterable, TypeVar, Type, Union from port_for import ephemeral, utils from ._ranges import UNASSIGNED_RANGES from .exceptions import PortForException @@ -14,7 +14,10 @@ SYSTEM_PORT_RANGE = (0, 1024) -def select_random(ports=None, exclude_ports=None): +def select_random( + ports: Optional[Set[int]] = None, + exclude_ports: Optional[Iterable[int]] = None, +) -> int: """ Returns random unused port number. """ @@ -32,14 +35,18 @@ def select_random(ports=None, exclude_ports=None): raise PortForException("Can't select a port") -def is_available(port): +def is_available(port: int) -> bool: """ Returns if port is good to choose. """ return port in available_ports() and not port_is_used(port) -def available_ports(low=1024, high=65535, exclude_ranges=None): +def available_ports( + low: int = 1024, + high: int = 65535, + exclude_ranges: Optional[List[Tuple[int, int]]] = None, +) -> Set[int]: """ Returns a set of possible ports (excluding system, ephemeral and well-known ports). @@ -56,7 +63,9 @@ def available_ports(low=1024, high=65535, exclude_ranges=None): return available.difference(exclude) -def good_port_ranges(ports=None, min_range_len=20, border=3): +def good_port_ranges( + ports: Optional[Set[int]] = None, min_range_len: int = 20, border: int = 3 +) -> List[Tuple[int, int]]: """ Returns a list of 'good' port ranges. Such ranges are large and don't contain ephemeral or well-known ports. @@ -76,13 +85,13 @@ def good_port_ranges(ports=None, min_range_len=20, border=3): return without_borders -def available_good_ports(min_range_len=20, border=3): +def available_good_ports(min_range_len: int = 20, border: int = 3) -> Set[int]: return utils.ranges_to_set( good_port_ranges(min_range_len=min_range_len, border=border) ) -def port_is_used(port, host="127.0.0.1"): +def port_is_used(port: int, host: str = "127.0.0.1") -> bool: """ Returns if port is used. Port is considered used if the current process can't bind to it or the port doesn't refuse connections. @@ -91,7 +100,7 @@ def port_is_used(port, host="127.0.0.1"): return not unused -def _can_bind(port, host): +def _can_bind(port: int, host: str) -> bool: sock = socket.socket() with contextlib.closing(sock): try: @@ -101,7 +110,7 @@ def _can_bind(port, host): return True -def _refuses_connection(port, host): +def _refuses_connection(port: int, host: str) -> bool: sock = socket.socket() with contextlib.closing(sock): sock.settimeout(1) @@ -109,12 +118,28 @@ def _refuses_connection(port, host): return err == errno.ECONNREFUSED -def filter_by_type(lst, type_of): +T = TypeVar("T") + + +def filter_by_type(lst: Iterable, type_of: Type[T]) -> List[T]: """Returns a list of elements with given type.""" return [e for e in lst if isinstance(e, type_of)] -def get_port(ports): +def get_port( + ports: Union[ + str, + int, + Tuple[int, int], + Set[int], + List[str], + List[int], + List[Tuple[int, int]], + List[Set[int]], + List[Union[Set[int], Tuple[int, int]]], + List[Union[str, int, Tuple[int, int], Set[int]]], + ] +) -> Optional[int]: """ Retuns a random available port. If there's only one port passed (e.g. 5000 or '5000') function does not check if port is available. @@ -125,7 +150,7 @@ def get_port(ports): randomly selected port (None) - any random available port [(2000,3000)] or (2000,3000) - random available port from a given range [{4002,4003}] or {4002,4003} - random of 4002 or 4003 ports - [(2000,3000), {4002,4003}] -random of given orange and set + [(2000,3000), {4002,4003}] -random of given range and set :returns: a random free port :raises: ValueError """ @@ -135,18 +160,26 @@ def get_port(ports): return select_random(None) try: - return int(ports) + return int(ports) # type: ignore[arg-type] except TypeError: pass - ports_set = set() + ports_set: Set[int] = set() try: if not isinstance(ports, list): ports = [ports] - ranges = utils.ranges_to_set(filter_by_type(ports, tuple)) - nums = set(filter_by_type(ports, int)) - sets = set(chain(*filter_by_type(ports, (set, frozenset)))) + ranges: Set[int] = utils.ranges_to_set( + filter_by_type(ports, tuple) # type: ignore[arg-type] + ) + nums: Set[int] = set(filter_by_type(ports, int)) + sets: Set[int] = set( + chain( + *filter_by_type( + ports, (set, frozenset) # type: ignore[arg-type] + ) + ) + ) ports_set = ports_set.union(ranges, sets, nums) except ValueError: raise PortForException( @@ -155,7 +188,7 @@ def get_port(ports): 'or "(4000,5000)" or a comma-separated ports set' '"[{4000,5000,6000}]" or list of ints "[400,5000,6000,8000]"' 'or all of them "[(20000, 30000), {48889, 50121}, 4000, 4004]"' - % ports + % (ports,) ) return select_random(ports_set) diff --git a/port_for/ephemeral.py b/port_for/ephemeral.py index 29c56dd..b1c1578 100644 --- a/port_for/ephemeral.py +++ b/port_for/ephemeral.py @@ -8,11 +8,12 @@ """ from __future__ import absolute_import, with_statement import subprocess +from typing import List, Tuple, Dict DEFAULT_EPHEMERAL_PORT_RANGE = (32768, 65535) -def port_ranges(): +def port_ranges() -> List[Tuple[int, int]]: """ Returns a list of ephemeral port ranges for current machine. """ @@ -30,22 +31,25 @@ def port_ranges(): return [DEFAULT_EPHEMERAL_PORT_RANGE] -def _linux_ranges(): +def _linux_ranges() -> List[Tuple[int, int]]: with open("/proc/sys/net/ipv4/ip_local_port_range") as f: # use readline() instead of read() for linux + musl low, high = f.readline().split() return [(int(low), int(high))] -def _bsd_ranges(): +def _bsd_ranges() -> List[Tuple[int, int]]: pp = subprocess.Popen( ["sysctl", "net.inet.ip.portrange"], stdout=subprocess.PIPE ) stdout, stderr = pp.communicate() lines = stdout.decode("ascii").split("\n") - out = dict( + out: Dict[str, str] = dict( [ - [x.strip().rsplit(".")[-1] for x in line.split(":")] + [ + x.strip().rsplit(".")[-1] # type: ignore[misc] + for x in line.split(":") + ] for line in lines if line ] diff --git a/port_for/py.typed b/port_for/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/port_for/store.py b/port_for/store.py index cb9b908..4a06bd2 100644 --- a/port_for/store.py +++ b/port_for/store.py @@ -1,11 +1,8 @@ # -*- coding: utf-8 -*- from __future__ import absolute_import, with_statement import os - -try: - from ConfigParser import ConfigParser, DEFAULTSECT -except ImportError: # python3 - from configparser import ConfigParser, DEFAULTSECT +from configparser import ConfigParser, DEFAULTSECT +from typing import Optional, List, Tuple, Union from .api import select_random from .exceptions import PortForException @@ -15,25 +12,28 @@ class PortStore(object): - def __init__(self, config_filename=DEFAULT_CONFIG_PATH): + def __init__(self, config_filename: str = DEFAULT_CONFIG_PATH): self._config = config_filename - def bind_port(self, app, port=None): + def bind_port( + self, app: str, port: Optional[Union[int, str]] = None + ) -> int: if "=" in app or ":" in app: raise Exception('invalid app name: "%s"' % app) + requested_port: Optional[str] = None if port is not None: - port = str(port) + requested_port = str(port) parser = self._get_parser() # this app already use some port; return it if parser.has_option(DEFAULTSECT, app): actual_port = parser.get(DEFAULTSECT, app) - if port is not None and port != actual_port: + if requested_port is not None and requested_port != actual_port: msg = ( "Can't bind to port %s: %s is already associated " - "with port %s" % (port, app, actual_port) + "with port %s" % (requested_port, app, actual_port) ) raise PortForException(msg) return int(actual_port) @@ -42,44 +42,47 @@ def bind_port(self, app, port=None): app_by_port = dict((v, k) for k, v in parser.items(DEFAULTSECT)) bound_port_numbers = map(int, app_by_port.keys()) - if port is None: - port = str(select_random(exclude_ports=bound_port_numbers)) + if requested_port is None: + requested_port = str( + select_random(exclude_ports=bound_port_numbers) + ) - if port in app_by_port: - binding_app = app_by_port[port] + if requested_port in app_by_port: + binding_app = app_by_port[requested_port] if binding_app != app: raise PortForException( - "Port %s is already used by %s!" % (port, binding_app) + "Port %s is already used by %s!" + % (requested_port, binding_app) ) # new app & new port - parser.set(DEFAULTSECT, app, port) + parser.set(DEFAULTSECT, app, requested_port) self._save(parser) - return int(port) + return int(requested_port) - def unbind_port(self, app): + def unbind_port(self, app: str) -> None: parser = self._get_parser() parser.remove_option(DEFAULTSECT, app) self._save(parser) - def bound_ports(self): + def bound_ports(self) -> List[Tuple[str, int]]: return [ (app, int(port)) for app, port in self._get_parser().items(DEFAULTSECT) ] - def _ensure_config_exists(self): + def _ensure_config_exists(self) -> None: if not os.path.exists(self._config): with open(self._config, "wb"): pass - def _get_parser(self): + def _get_parser(self) -> ConfigParser: self._ensure_config_exists() parser = ConfigParser() parser.read(self._config) return parser - def _save(self, parser): + def _save(self, parser: ConfigParser) -> None: with open(self._config, "wt") as f: parser.write(f) diff --git a/port_for/utils.py b/port_for/utils.py index b841d36..c6e1401 100644 --- a/port_for/utils.py +++ b/port_for/utils.py @@ -1,9 +1,10 @@ # -*- coding: utf-8 -*- from __future__ import absolute_import import itertools +from typing import Iterable, Iterator, Tuple, Set -def ranges_to_set(lst): +def ranges_to_set(lst: Iterable[Tuple[int, int]]) -> Set[int]: """ Convert a list of ranges to a set of numbers:: @@ -15,7 +16,7 @@ def ranges_to_set(lst): return set(itertools.chain(*(range(x[0], x[1] + 1) for x in lst))) -def to_ranges(lst): +def to_ranges(lst: Iterable[int]) -> Iterator[Tuple[int, int]]: """ Convert a list of numbers to a list of ranges:: @@ -25,5 +26,5 @@ def to_ranges(lst): """ for a, b in itertools.groupby(enumerate(lst), lambda t: t[1] - t[0]): - b = list(b) - yield b[0][1], b[-1][1] + c = list(b) + yield c[0][1], c[-1][1] diff --git a/requirements-lint.txt b/requirements-lint.txt index 0f60c89..ce0a29c 100644 --- a/requirements-lint.txt +++ b/requirements-lint.txt @@ -3,4 +3,5 @@ pycodestyle==2.7.0 pydocstyle==6.1.1 pygments black==21.5b1 --r requirements-test.txt \ No newline at end of file +mypy==0.812 +-r requirements-test.txt diff --git a/scripts/port-for b/scripts/port-for index 9414a01..f21a764 100644 --- a/scripts/port-for +++ b/scripts/port-for @@ -26,33 +26,37 @@ Options: """ import sys +from typing import Optional import port_for from port_for.docopt import docopt store = port_for.PortStore() -def _list(): +def _list() -> None: for app, port in store.bound_ports(): sys.stdout.write("%s: %s\n" % (app, port)) -def _bind(app, port=None): - port = store.bind_port(app, port) - sys.stdout.write("%s\n" % port) +def _bind(app: str, port: Optional[str] = None) -> None: + bound_port = store.bind_port(app, port) + sys.stdout.write("%s\n" % bound_port) -def _unbind(app): +def _unbind(app: str) -> None: store.unbind_port(app) -if __name__ == '__main__': - args = docopt(__doc__, version='port-for %s' % port_for.__version__) - if args['']: - _bind(args[''], args['--port']) - elif args['--bind']: - _bind(args['--bind'], args['--port']) - elif args['--list']: +if __name__ == "__main__": + args = docopt( + __doc__, + version="port-for %s" % port_for.__version__, + ) # type: ignore[no-untyped-call] + if args[""]: + _bind(args[""], args["--port"]) + elif args["--bind"]: + _bind(args["--bind"], args["--port"]) + elif args["--list"]: _list() - elif args['--unbind']: - _unbind(args['--unbind']) + elif args["--unbind"]: + _unbind(args["--unbind"]) diff --git a/setup.py b/setup.py index c13276b..15865c0 100755 --- a/setup.py +++ b/setup.py @@ -31,4 +31,6 @@ "Topic :: Internet :: WWW/HTTP :: Site Management", ], python_requires=">=3.6", + zip_safe=False, + package_data={"port_for": ["py.typed"]}, ) diff --git a/tests/test_cases.py b/tests/test_cases.py index 81e4d76..9f0f21e 100644 --- a/tests/test_cases.py +++ b/tests/test_cases.py @@ -5,6 +5,7 @@ import mock import socket import os +from typing import Union, List, Set, Tuple import pytest @@ -13,12 +14,12 @@ from port_for.utils import ranges_to_set -def test_common_ports(): +def test_common_ports() -> None: assert not port_for.is_available(80) assert not port_for.is_available(11211) -def test_good_port_ranges(): +def test_good_port_ranges() -> None: ranges = [ (10, 15), # too short (100, 200), # good @@ -32,17 +33,17 @@ def test_good_port_ranges(): assert good_ranges == [(103, 197), (443, 492), (303, 327)], good_ranges -def test_something_works(): +def test_something_works() -> None: assert len(port_for.good_port_ranges()) > 10 assert len(port_for.available_good_ports()) > 1000 -def test_binding(): +def test_binding() -> None: # low ports are not available assert port_for.port_is_used(10) -def test_binding_high(): +def test_binding_high() -> None: s = socket.socket() s.bind(("", 0)) port = s.getsockname()[1] @@ -51,13 +52,13 @@ def test_binding_high(): assert not port_for.port_is_used(port) -def test_get_port_none(): +def test_get_port_none() -> None: """Test special case for get_port to return None.""" assert not get_port(-1) @pytest.mark.parametrize("port", (1234, "1234")) -def test_get_port_specific(port): +def test_get_port_specific(port: Union[str, int]) -> None: """Test special case for get_port to return same value.""" assert get_port(port) == 1234 @@ -69,7 +70,9 @@ def test_get_port_specific(port): (2000, 3000), ), ) -def test_get_port_from_range(port_range): +def test_get_port_from_range( + port_range: Union[List[Tuple[int, int]], Tuple[int, int]] +) -> None: """Test getting random port from given range.""" assert get_port(port_range) in list(range(2000, 3000 + 1)) @@ -81,26 +84,32 @@ def test_get_port_from_range(port_range): {4001, 4002, 4003}, ), ) -def test_get_port_from_set(port_set): +def test_get_port_from_set(port_set: Union[List[Set[int]], Set[int]]) -> None: """Test getting random port from given set.""" assert get_port(port_set) in {4001, 4002, 4003} -def test_port_mix(): +def test_port_mix() -> None: """Test getting random port from given set and range.""" - assert get_port([(2000, 3000), {4001, 4002, 4003}]) in set( - range(2000, 3000 + 1) - ) and {4001, 4002, 4003} + sets_and_ranges: List[Union[Tuple[int, int], Set[int]]] = [ + (2000, 3000), + {4001, 4002, 4003}, + ] + assert get_port(sets_and_ranges) in set(range(2000, 3000 + 1)) and { + 4001, + 4002, + 4003, + } class SelectPortTest(unittest.TestCase): @mock.patch("port_for.api.port_is_used") - def test_all_used(self, port_is_used): + def test_all_used(self, port_is_used: mock.MagicMock) -> None: port_is_used.return_value = True self.assertRaises(port_for.PortForException, port_for.select_random) @mock.patch("port_for.api.port_is_used") - def test_random_port(self, port_is_used): + def test_random_port(self, port_is_used: mock.MagicMock) -> None: ports = set([1, 2, 3]) used = {1: True, 2: False, 3: True} port_is_used.side_effect = lambda port: used[port] @@ -110,14 +119,14 @@ def test_random_port(self, port_is_used): class StoreTest(unittest.TestCase): - def setUp(self): + def setUp(self) -> None: fd, self.fname = tempfile.mkstemp() self.store = port_for.PortStore(self.fname) - def tearDown(self): + def tearDown(self) -> None: os.remove(self.fname) - def test_store(self): + def test_store(self) -> None: assert self.store.bound_ports() == [] port = self.store.bind_port("foo") @@ -134,14 +143,14 @@ def test_store(self): self.store.unbind_port("aar") self.assertEqual(self.store.bound_ports(), [("foo", port)]) - def test_rebind(self): + def test_rebind(self) -> None: # try to rebind an used port for an another app port = self.store.bind_port("foo") self.assertRaises( port_for.PortForException, self.store.bind_port, "baz", port ) - def test_change_port(self): + def test_change_port(self) -> None: # changing app ports is not supported. port = self.store.bind_port("foo") another_port = port_for.select_random() @@ -150,13 +159,13 @@ def test_change_port(self): port_for.PortForException, self.store.bind_port, "foo", another_port ) - def test_bind_unavailable(self): + def test_bind_unavailable(self) -> None: # it is possible to explicitly bind currently unavailable port port = self.store.bind_port("foo", 80) self.assertEqual(port, 80) self.assertEqual(self.store.bound_ports(), [("foo", 80)]) - def test_bind_non_auto(self): + def test_bind_non_auto(self) -> None: # it is possible to pass a port port = port_for.select_random() res_port = self.store.bind_port("foo", port)