Skip to content

Commit

Permalink
Refactor type comments into annotations
Browse files Browse the repository at this point in the history
Also remove flake8<6 restriction.
Fixes #137
  • Loading branch information
maarten-ic committed Feb 1, 2023
1 parent 9655342 commit 2b0493b
Show file tree
Hide file tree
Showing 26 changed files with 97 additions and 103 deletions.
8 changes: 4 additions & 4 deletions docs/source/examples/python/interact_coupling.py
Expand Up @@ -55,10 +55,10 @@ def __init__(self) -> None:
The cache starts out empty.
"""
self.t_cur = None # type: Optional[float]
self.data_cur = None # type: Optional[Any]
self.t_next = None # type: Optional[float]
self.data_next = None # type: Optional[Any]
self.t_cur: Optional[float] = None
self.data_cur: Optional[Any] = None
self.t_next: Optional[float] = None
self.data_next: Optional[Any] = None

def add_data(self, t: float, data: Any) -> None:
"""Add new data to the cache.
Expand Down
14 changes: 7 additions & 7 deletions libmuscle/python/libmuscle/checkpoint_triggers.py
Expand Up @@ -92,7 +92,7 @@ def __init__(self, range: CheckpointRangeRule) -> None:
self._start = range.start
self._stop = range.stop
self._every = range.every
self._last = None # type: Union[int, float, None]
self._last: Union[int, float, None] = None
if self._stop is not None:
start = 0 if self._start is None else self._start
diff = self._stop - start
Expand Down Expand Up @@ -127,8 +127,8 @@ def __init__(self, checkpoint_rules: List[CheckpointRule]) -> None:
Args:
checkpoint_rules: checkpoint rules (from ymmsl)
"""
self._triggers = [] # type: List[CheckpointTrigger]
at_rules = [] # type: List[CheckpointAtRule]
self._triggers: List[CheckpointTrigger] = []
at_rules: List[CheckpointAtRule] = []
for rule in checkpoint_rules:
if isinstance(rule, CheckpointAtRule):
if rule.at:
Expand Down Expand Up @@ -165,7 +165,7 @@ class TriggerManager:

def __init__(self) -> None:
self._has_checkpoints = False
self._last_triggers = [] # type: List[str]
self._last_triggers: List[str] = []
self._cpts_considered_until = float('-inf')

def set_checkpoint_info(
Expand All @@ -184,11 +184,11 @@ def set_checkpoint_info(

self._wall = CombinedCheckpointTriggers(checkpoints.wallclock_time)
self._prevwall = 0.0
self._nextwall = self._wall.next_checkpoint(0.0) # type: Optional[float]
self._nextwall: Optional[float] = self._wall.next_checkpoint(0.0)

self._sim = CombinedCheckpointTriggers(checkpoints.simulation_time)
self._prevsim = None # type: Optional[float]
self._nextsim = None # type: Optional[float]
self._prevsim: Optional[float] = None
self._nextsim: Optional[float] = None

def elapsed_walltime(self) -> float:
"""Returns elapsed wallclock_time in seconds.
Expand Down
12 changes: 6 additions & 6 deletions libmuscle/python/libmuscle/communicator.py
Expand Up @@ -90,16 +90,16 @@ def __init__(self, kernel: Reference, index: List[int],
self._post_office = PostOffice()
self._profiler = profiler

self._servers = list() # type: List[TransportServer]
self._servers: List[TransportServer] = []

# indexed by remote instance id
self._clients = dict() # type: Dict[Reference, MPPClient]
self._clients: Dict[Reference, MPPClient] = {}

for server_type in transport_server_types:
server = server_type(self._post_office)
self._servers.append(server)

self._ports = dict() # type: Dict[str, Port]
self._ports: Dict[str, Port] = {}

def get_locations(self) -> List[str]:
"""Returns a list of locations that we can be reached at.
Expand Down Expand Up @@ -156,7 +156,7 @@ def list_ports(self) -> Dict[Operator, List[str]]:
port names. Operators with no associated ports are not
included.
"""
result = dict() # type: Dict[Operator, List[str]]
result: Dict[Operator, List[str]] = {}
for port_name, port in self._ports.items():
if port.operator not in result:
result[port.operator] = list()
Expand Down Expand Up @@ -197,7 +197,7 @@ def send_message(
"""
if slot is None:
_logger.debug('Sending message on {}'.format(port_name))
slot_list = [] # type: List[int]
slot_list: List[int] = []
else:
_logger.debug('Sending message on {}[{}]'.format(port_name, slot))
slot_list = [slot]
Expand Down Expand Up @@ -273,7 +273,7 @@ def receive_message(self, port_name: str, slot: Optional[int] = None,
"""
if slot is None:
port_and_slot = port_name
slot_list = [] # type: List[int]
slot_list: List[int] = []
else:
port_and_slot = f"{port_name}[{slot}]"
slot_list = [slot]
Expand Down
8 changes: 4 additions & 4 deletions libmuscle/python/libmuscle/endpoint.py
Expand Up @@ -51,10 +51,10 @@ def __init__(self, kernel: Reference, index: List[int], port: Identifier,
port: Name of the port used.
slot: Slot on which to send or receive.
"""
self.kernel = kernel # type: Reference
self.index = index # type: List[int]
self.port = port # type: Identifier
self.slot = slot # type: List[int]
self.kernel = kernel
self.index = index
self.port = port
self.slot = slot

def ref(self) -> Reference:
"""Express as Reference.
Expand Down
8 changes: 4 additions & 4 deletions libmuscle/python/libmuscle/instance.py
Expand Up @@ -154,10 +154,10 @@ def __init__(
self._trigger_manager = TriggerManager()
"""Keeps track of checkpoints and triggers snapshots."""

self._first_run = None # type: Optional[bool]
self._first_run: Optional[bool] = None
"""Whether this is the first iteration of the reuse loop"""

self._do_reuse = None # type: Optional[bool]
self._do_reuse: Optional[bool] = None
"""Whether to enter this iteration of the reuse loop
This is None during the reuse loop, and set between
Expand All @@ -170,7 +170,7 @@ def __init__(
self._do_init = False
"""Whether to do f_init on this iteration of the reuse loop"""

self._f_init_cache = dict() # type: _FInitCacheType
self._f_init_cache: _FInitCacheType = {}
"""Stores pre-received messages for f_init ports"""

self._register()
Expand Down Expand Up @@ -910,7 +910,7 @@ def __make_full_name(self
option and splits it into a component name and an index.
"""
def split_reference(ref: Reference) -> Tuple[Reference, List[int]]:
index = list() # type: List[int]
index: List[int] = []
i = 0
while i < len(ref) and isinstance(ref[i], Identifier):
i += 1
Expand Down
8 changes: 4 additions & 4 deletions libmuscle/python/libmuscle/manager/instance_manager.py
Expand Up @@ -65,10 +65,10 @@ def __init__(
self._configuration = configuration
self._run_dir = run_dir

self._resources_in = Queue() # type: Queue[Resources]
self._requests_out = Queue() # type: Queue[InstantiatorRequest]
self._results_in = Queue() # type: Queue[_ResultType]
self._log_records_in = Queue() # type: Queue[logging.LogRecord]
self._resources_in: Queue[Resources] = Queue()
self._requests_out: Queue[InstantiatorRequest] = Queue()
self._results_in: Queue[_ResultType] = Queue()
self._log_records_in: Queue[logging.LogRecord] = Queue()

self._instantiator = QCGPJInstantiator(
self._resources_in, self._requests_out, self._results_in,
Expand Down
4 changes: 2 additions & 2 deletions libmuscle/python/libmuscle/manager/instance_registry.py
Expand Up @@ -18,8 +18,8 @@ class InstanceRegistry:
def __init__(self) -> None:
"""Construct an empty InstanceRegistry"""
self._deregistered_one = Condition() # doubles as lock
self._locations = dict() # type: Dict[Reference, List[str]]
self._ports = dict() # type: Dict[Reference, List[Port]]
self._locations: Dict[Reference, List[str]] = {}
self._ports: Dict[Reference, List[Port]] = {}
self._startup = True

def add(self, name: Reference, locations: List[str], ports: List[Port]
Expand Down
4 changes: 2 additions & 2 deletions libmuscle/python/libmuscle/manager/instantiator.py
Expand Up @@ -47,8 +47,8 @@ def __init__(self, instance: Reference, resources: Resources) -> None:
self.instance = instance
self.resources = resources
self.status = ProcessStatus.STARTED
self.exit_code = None # type: Optional[int]
self.error_msg = None # type: Optional[str]
self.exit_code: Optional[int] = None
self.error_msg: Optional[str] = None


class InstantiatorRequest:
Expand Down
2 changes: 1 addition & 1 deletion libmuscle/python/libmuscle/manager/manager.py
Expand Up @@ -57,7 +57,7 @@ def __init__(
self._configuration,
self._run_dir.path / 'configuration.ymmsl')

self._instance_manager = None # type: Optional[InstanceManager]
self._instance_manager: Optional[InstanceManager] = None
try:
configuration = self._configuration.as_configuration()
if self._run_dir is not None:
Expand Down
7 changes: 3 additions & 4 deletions libmuscle/python/libmuscle/manager/qcgpj_instantiator.py
Expand Up @@ -46,7 +46,7 @@ class StateTracker:
"""
def __init__(self) -> None:
"""Create a StateTracker."""
self.processes = dict() # type: Dict[Reference, Process]
self.processes: Dict[Reference, Process] = {}

# These are for communicating with QCG-PJ
self.queued_to_execute = 0
Expand Down Expand Up @@ -122,8 +122,7 @@ def run(self) -> None:
self._reconfigure_logging()

# Executor needs to be instantiated before we go async
qcg_config = {
qcg_Config.AUX_DIR: str(qcgpj_dir)} # type: Dict[str, str]
qcg_config: Dict[str, str] = {qcg_Config.AUX_DIR: str(qcgpj_dir)}
self._qcg_resources = qcg_get_resources(qcg_config)
self._state_tracker = StateTracker()
self._executor = qcg_Executor(
Expand All @@ -146,7 +145,7 @@ async def _main(self) -> None:
jobs, stopping them, or shutting down. Results of finished jobs
are returned via the results queue.
"""
qcg_iters = dict() # type: Dict[Reference, qcg_SchedulingIteration]
qcg_iters: Dict[Reference, qcg_SchedulingIteration] = {}

await asyncio.sleep(0.01) # allow requests_in queue to be populated

Expand Down
16 changes: 8 additions & 8 deletions libmuscle/python/libmuscle/manager/snapshot_registry.py
Expand Up @@ -200,10 +200,10 @@ def __init__(
self._snapshot_folder = snapshot_folder
self._topology_store = topology_store

self._queue = Queue() # type: Queue[_QueueItemType]
self._snapshots = {} # type: _SnapshotDictType
self._queue: Queue[_QueueItemType] = Queue()
self._snapshots: _SnapshotDictType = {}

self._instances = set() # type: Set[Reference]
self._instances: Set[Reference] = set()
for component in config.model.components:
self._instances.update(component.instances())

Expand Down Expand Up @@ -294,7 +294,7 @@ def _get_workflow_snapshots(
# to further restrict the sets of snapshots as peer snapshots are
# selected.
# First restriction is that the snapshots have to be locally consistent.
allowed_snapshots = {} # type: Dict[Reference, FrozenSet[SnapshotNode]]
allowed_snapshots: Dict[Reference, FrozenSet[SnapshotNode]] = {}
for instance in instances_to_cover:
allowed_snapshots[instance] = frozenset(
i_snapshot
Expand All @@ -321,7 +321,7 @@ def num_allowed_snapshots(instance: Reference) -> int:
workflow_snapshots = []
selected_snapshots = [snapshot]
# This stack stores history of allowed_snapshots and enables roll back
stack = [] # type: List[Dict[Reference, FrozenSet[SnapshotNode]]]
stack: List[Dict[Reference, FrozenSet[SnapshotNode]]] = []

# Update allowed_snapshots for peers of the selected snapshot
for peer, snapshots in snapshot.consistent_peers.items():
Expand Down Expand Up @@ -430,7 +430,7 @@ def _generate_description(
self, selected_snapshots: List[SnapshotNode], now: datetime) -> str:
"""Generate a human-readable description of the workflow snapshot.
"""
triggers = {} # type: Dict[str, List[str]]
triggers: Dict[str, List[str]] = {}
component_info = []
max_instance_len = len('Instance ')
for node in selected_snapshots:
Expand Down Expand Up @@ -477,7 +477,7 @@ def _cleanup_snapshots(
newest_snapshots[snapshot.instance] = snapshot

# Remove all snapshots that are older than the newest snapshots
removed_snapshots = set() # type: Set[SnapshotNode]
removed_snapshots: Set[SnapshotNode] = set()
for snapshot in newest_snapshots.values():
all_snapshots = self._snapshots[snapshot.instance]
idx = all_snapshots.index(snapshot)
Expand Down Expand Up @@ -538,7 +538,7 @@ def _get_connections(self, instance: Reference, peer: Reference
instance_kernel = instance.without_trailing_ints()
peer_kernel = peer.without_trailing_ints()

connected_ports = [] # type: List[_ConnectionType]
connected_ports: List[_ConnectionType] = []
for conduit in self._model.conduits:
if (conduit.sending_component() == instance_kernel and
conduit.receiving_component() == peer_kernel):
Expand Down
2 changes: 1 addition & 1 deletion libmuscle/python/libmuscle/mcp/tcp_transport_client.py
Expand Up @@ -31,7 +31,7 @@ def __init__(self, location: str) -> None:
"""
addresses = location[4:].split(',')

sock = None # type: Optional[socket.SocketType]
sock: Optional[socket.SocketType] = None
for address in addresses:
try:
sock = self._connect(address)
Expand Down
4 changes: 2 additions & 2 deletions libmuscle/python/libmuscle/mcp/tcp_transport_server.py
Expand Up @@ -87,7 +87,7 @@ def get_location(self) -> str:
"""
host, port = self._server.server_address

locs = list() # type: List[str]
locs: List[str] = []
for address in self._get_if_addresses():
locs.append('{}:{}'.format(address, port))
return 'tcp:{}'.format(','.join(locs))
Expand All @@ -103,7 +103,7 @@ def close(self) -> None:
self._server.server_close()

def _get_if_addresses(self) -> List[str]:
all_addresses = list() # type: List[str]
all_addresses: List[str] = []
ifs = netifaces.interfaces()
for interface in ifs:
addrs = netifaces.ifaddresses(interface)
Expand Down
2 changes: 1 addition & 1 deletion libmuscle/python/libmuscle/mpp_client.py
Expand Up @@ -24,7 +24,7 @@ def __init__(self, locations: List[str]) -> None:
Args:
locations: The peer's location strings
"""
client = None # type: Optional[TransportClient]
client: Optional[TransportClient] = None
for ClientType in transport_client_types:
for location in locations:
if ClientType.can_connect_to(location):
Expand Down
3 changes: 1 addition & 2 deletions libmuscle/python/libmuscle/mpp_message.py
Expand Up @@ -103,8 +103,7 @@ def _decode_grid(code: int, data: bytes) -> Grid:
order = order_map[grid_dict['order']]
shape = tuple(grid_dict['shape'])
dtype = type_map[ExtTypeId(code)]
array = np.ndarray( # type: ignore
shape, dtype, grid_dict['data'], order=order) # type: ignore
array = np.ndarray(shape, dtype, grid_dict['data'], order=order) # type: ignore
indexes = grid_dict['indexes']
if indexes == []:
indexes = None
Expand Down
2 changes: 1 addition & 1 deletion libmuscle/python/libmuscle/outbox.py
Expand Up @@ -10,7 +10,7 @@ class Outbox:
def __init__(self) -> None:
"""Create an empty Outbox.
"""
self.__queue = Queue() # type: Queue[bytes]
self.__queue: Queue[bytes] = Queue()

def is_empty(self) -> bool:
"""Returns True iff the outbox is empty.
Expand Down
2 changes: 1 addition & 1 deletion libmuscle/python/libmuscle/peer_manager.py
Expand Up @@ -34,7 +34,7 @@ def __init__(self, kernel: Reference, index: List[int],
self.__index = index

# peer port ids, indexed by local kernel.port id
self.__peers = dict() # type: Dict[Reference, List[Reference]]
self.__peers: Dict[Reference, List[Reference]] = {}

for conduit in conduits:
if str(conduit.sending_component()) == str(kernel):
Expand Down

0 comments on commit 2b0493b

Please sign in to comment.