From 28cdd221cc180ae89bf20eb5b28a55cddfcf3c26 Mon Sep 17 00:00:00 2001 From: Pavel Kirienko Date: Tue, 21 Dec 2021 22:52:09 +0200 Subject: [PATCH] v1.4 (#197) * Refactor the redundant output session to return optimistically, as soon as the first inferior is done transmitting. The test suite currently generates warnings due to some of the async tasks not being finalized properly. * Fix #147 * Fix doc styling; same problem as in https://github.com/UAVCAN/pydsdl/pull/74\#discussion_r756429790 * Add diagnostics for GNU/Linux test on AppVeyor * Split transmission in SocketCAN loopback test --- .appveyor.yml | 2 + .idea/dictionaries/pavel.xml | 2 + CHANGELOG.rst | 12 ++ docs/static/custom.css | 4 + noxfile.py | 2 +- pyuavcan/VERSION | 2 +- pyuavcan/dsdl/_builtin_form.py | 75 +++++-- pyuavcan/dsdl/_templates/base.j2 | 14 +- pyuavcan/transport/can/media/_frame.py | 2 +- .../can/media/socketcan/_socketcan.py | 4 +- pyuavcan/transport/redundant/__init__.py | 5 + .../redundant/_redundant_transport.py | 2 +- .../transport/redundant/_session/_output.py | 184 ++++++++++++------ pyuavcan/transport/udp/_ip/_link_layer.py | 48 +++-- tests/dsdl/_builtin_form.py | 10 +- tests/transport/can/media/_socketcan.py | 5 + tests/transport/redundant/_redundant.py | 6 +- tests/transport/redundant/_session_output.py | 5 + 18 files changed, 264 insertions(+), 120 deletions(-) diff --git a/.appveyor.yml b/.appveyor.yml index 251cefe86..8ab6b7c96 100644 --- a/.appveyor.yml +++ b/.appveyor.yml @@ -81,6 +81,8 @@ for: test_script: - 'nox --non-interactive --error-on-missing-interpreters --session test pristine --python $PYTHON' - 'nox --non-interactive --session demo check_style docs' + on_finish: + - 'ip link show' # Diagnostics aid - # DEPLOYMENT matrix: diff --git a/.idea/dictionaries/pavel.xml b/.idea/dictionaries/pavel.xml index bda1eef1a..5749471dc 100644 --- a/.idea/dictionaries/pavel.xml +++ b/.idea/dictionaries/pavel.xml @@ -8,6 +8,7 @@ abcdefghi abstractmethods addopts + addr aega afdx allocatee @@ -249,6 +250,7 @@ sphinxarg sssss ssssssss + stacklevel stdint strictification strictify diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 455b970b9..96c2905d8 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -3,6 +3,18 @@ Changelog ========= +v1.4 +---- + +- Behavior of the redundant output session changed: + :meth:`pyuavcan.transport.redundant.RedundantOutputSession.send` returns as soon as at least one inferior is done + transmitting, the slower ones keep transmitting in the background. + In other words, the redundant transport now operates at the rate of the fastest inferior (used to be the slowest one). + +- Implement the DSDL UX improvement described in `#147 `_. + +- Fully adopt PEP 585 in generated code. + v1.3 ---- diff --git a/docs/static/custom.css b/docs/static/custom.css index f9b316e4b..60a73ec7d 100644 --- a/docs/static/custom.css +++ b/docs/static/custom.css @@ -36,6 +36,10 @@ h2 { font-weight: bold; } +.rst-content dl { + display: block !important; +} + .rst-content a { color: #1700b3; } diff --git a/noxfile.py b/noxfile.py index c23416e0b..7fccedf7a 100644 --- a/noxfile.py +++ b/noxfile.py @@ -124,7 +124,7 @@ def test(session): # 2. At least MyPy has to be run separately per Python version we support. # If the interpreter is not CPython, this may need to be conditionally disabled. session.install( - "mypy == 0.910", + "mypy == 0.920", "pylint == 2.12.*", ) session.run("mypy", "--strict", *map(str, src_dirs), str(compiled_dir)) diff --git a/pyuavcan/VERSION b/pyuavcan/VERSION index 3a3cd8cc8..88c5fb891 100644 --- a/pyuavcan/VERSION +++ b/pyuavcan/VERSION @@ -1 +1 @@ -1.3.1 +1.4.0 diff --git a/pyuavcan/dsdl/_builtin_form.py b/pyuavcan/dsdl/_builtin_form.py index ef3b242e5..3359a5d1f 100644 --- a/pyuavcan/dsdl/_builtin_form.py +++ b/pyuavcan/dsdl/_builtin_form.py @@ -3,11 +3,10 @@ # Author: Pavel Kirienko import typing - +import logging import numpy from numpy.typing import NDArray import pydsdl - from ._composite_object import CompositeObject, get_model, get_attribute, set_attribute, get_class from ._composite_object import CompositeObjectTypeVar @@ -93,6 +92,8 @@ def update_from_builtin(destination: CompositeObjectTypeVar, source: typing.Any) Source field names shall match the original unstropped names provided in the DSDL definition; e.g., `if`, not `if_`. If there is more than one variant specified for a union type, the last specified variant takes precedence. + If the structure of the source does not match the destination object, the correct representation + may be deduced automatically as long as it can be done unambiguously. :param destination: The object to update. The update will be done in-place. If you don't want the source object modified, clone it beforehand. @@ -124,23 +125,68 @@ def update_from_builtin(destination: CompositeObjectTypeVar, source: typing.Any) ... } ... }) # doctest: +NORMALIZE_WHITESPACE uavcan.register.Access.Request...name='my.register'...value=[ 1, 2, 42,-10000]... - """ - # UX improvement; see https://github.com/UAVCAN/pyuavcan/issues/116 - if not isinstance(source, dict): - raise TypeError( - f"Invalid format: cannot update an instance of type {type(destination).__name__!r} " - f"from value of type {type(source).__name__!r}" - ) - - source = dict(source) # Create copy to prevent mutation of the original + The following examples showcase positional initialization: + + >>> from uavcan.node import Heartbeat_1 + >>> update_from_builtin(Heartbeat_1(), [123456, 1, 2]) # doctest: +NORMALIZE_WHITESPACE + uavcan.node.Heartbeat.1.0(uptime=123456, + health=uavcan.node.Health.1.0(value=1), + mode=uavcan.node.Mode.1.0(value=2), + vendor_specific_status_code=0) + >>> update_from_builtin(Heartbeat_1(), 123456) # doctest: +NORMALIZE_WHITESPACE + uavcan.node.Heartbeat.1.0(uptime=123456, + health=uavcan.node.Health.1.0(value=0), + mode=uavcan.node.Mode.1.0(value=0), + vendor_specific_status_code=0) + >>> update_from_builtin(Heartbeat_1(), [0, 0, 0, 0, 0]) # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ... + TypeError: ... + + >>> update_from_builtin(uavcan.primitive.array.Real64_1(), 123.456) # doctest: +NORMALIZE_WHITESPACE + uavcan.primitive.array.Real64.1.0(value=[123.456]) + >>> update_from_builtin(uavcan.primitive.array.Real64_1(), [123.456]) # doctest: +NORMALIZE_WHITESPACE + uavcan.primitive.array.Real64.1.0(value=[123.456]) + >>> update_from_builtin(uavcan.primitive.array.Real64_1(), [123.456, -9]) # doctest: +NORMALIZE_WHITESPACE + uavcan.primitive.array.Real64.1.0(value=[123.456, -9. ]) + + >>> update_from_builtin(uavcan.register.Access_1_0.Request(), ["X", {"integer8": 99}]) # Same as the next one! + uavcan.register.Access.Request...name='X'...value=[99]... + >>> update_from_builtin(uavcan.register.Access_1_0.Request(), {'name': 'X', 'value': {'integer8': {'value': [99]}}}) + uavcan.register.Access.Request...name='X'...value=[99]... + """ + _logger.debug("update_from_builtin: destination/source on the next lines:\n%r\n%r", destination, source) if not isinstance(destination, CompositeObject): # pragma: no cover raise TypeError(f"Bad destination: expected a CompositeObject, got {type(destination).__name__}") - model = get_model(destination) _raise_if_service_type(model) + fields = model.fields_except_padding + + # UX improvement: https://github.com/UAVCAN/pyuavcan/issues/147 -- match the source against the data type. + if not isinstance(source, dict): + if not isinstance(source, (list, tuple)): # Assume positional initialization. + source = (source,) + can_propagate = fields and isinstance(fields[0].data_type, (pydsdl.ArrayType, pydsdl.CompositeType)) + too_many_values = len(source) > (1 if isinstance(model.inner_type, pydsdl.UnionType) else len(fields)) + if can_propagate and too_many_values: + _logger.debug( + "update_from_builtin: %d source values cannot be applied to %s but the first field accepts " + "positional initialization -- propagating down", + len(source), + type(destination).__name__, + ) + source = [source] + if len(source) > len(fields): + raise TypeError( + f"Cannot apply {len(source)} values to {len(fields)} fields in {type(destination).__name__}" + ) + source = {f.name: v for f, v in zip(fields, source)} + return update_from_builtin(destination, source) - for f in model.fields_except_padding: + source = dict(source) # Create copy to prevent mutation of the original + + for f in fields: field_type = f.data_type try: value = source.pop(f.name) @@ -182,3 +228,6 @@ def _raise_if_service_type(model: pydsdl.SerializableType) -> None: f"Built-in form is not defined for service types. " f"Did you mean to use Request or Response? Input type: {model}" ) + + +_logger = logging.getLogger(__name__) diff --git a/pyuavcan/dsdl/_templates/base.j2 b/pyuavcan/dsdl/_templates/base.j2 index 6d5540e85..68845ed5e 100644 --- a/pyuavcan/dsdl/_templates/base.j2 +++ b/pyuavcan/dsdl/_templates/base.j2 @@ -20,7 +20,6 @@ from __future__ import annotations import numpy as _np_ from numpy.typing import NDArray as _NDArray_ -from typing import Optional as _Opt_ import pydsdl as _pydsdl_ import pyuavcan.dsdl as _dsdl_ {%- if T.deprecated %} @@ -175,7 +174,7 @@ class {{ name }}(_dsdl_.{%- if type.has_fixed_port_id -%}FixedPort{%- endif -%}C {%- for f in type.fields_except_padding -%} , {{ f|id }}: {{ ''.ljust(type.fields|longest_id_length - f|id|length) -}} - _Opt_[{{ relaxed_type_annotation(f.data_type) }}] = None + None | {{ relaxed_type_annotation(f.data_type) }} = None {%- endfor -%} ) -> None: """ @@ -250,7 +249,7 @@ class {{ name }}(_dsdl_.{%- if type.has_fixed_port_id -%}FixedPort{%- endif -%}C {%- else %} {#- IS UNION (guaranteed to contain at least 2 fields none of which are padding) #} {%- for f in type.fields %} self._{{ f|id }}: {{ ''.ljust(type.fields|longest_id_length - f|id|length) -}} - _Opt_[{{ strict_type_annotation(f.data_type) }}] = None + None | {{ strict_type_annotation(f.data_type) }} = None {%- endfor %} _init_cnt_: int = 0 {% for f in type.fields %} @@ -298,14 +297,7 @@ class {{ name }}(_dsdl_.{%- if type.has_fixed_port_id -%}FixedPort{%- endif -%}C -#} {%- for f in type.fields_except_padding %} @property - def {{ f|id }}(self) -> {# #} - {%- if type.inner_type is UnionType -%} - _Opt_[ - {%- endif -%} - {{ strict_type_annotation(f.data_type) }} - {%- if type.inner_type is UnionType -%} - ] - {%- endif -%}: + def {{ f|id }}(self) -> {{ "None | " * (type.inner_type is UnionType) }}{{ strict_type_annotation(f.data_type) }}: """ {{ f }} {%- if f.data_type is VariableLengthArrayType and f.data_type.string_like %} diff --git a/pyuavcan/transport/can/media/_frame.py b/pyuavcan/transport/can/media/_frame.py index cd033f39f..fe239bb21 100644 --- a/pyuavcan/transport/can/media/_frame.py +++ b/pyuavcan/transport/can/media/_frame.py @@ -75,7 +75,7 @@ class Envelope: _DLC_TO_LENGTH = [0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 16, 20, 24, 32, 48, 64] -_LENGTH_TO_DLC: typing.Dict[int, int] = dict(zip(*list(zip(*enumerate(_DLC_TO_LENGTH)))[::-1])) # type: ignore +_LENGTH_TO_DLC: typing.Dict[int, int] = dict(zip(*list(zip(*enumerate(_DLC_TO_LENGTH)))[::-1])) assert len(_LENGTH_TO_DLC) == 16 == len(_DLC_TO_LENGTH) for item in _DLC_TO_LENGTH: assert _DLC_TO_LENGTH[_LENGTH_TO_DLC[item]] == item, "Invalid DLC tables" diff --git a/pyuavcan/transport/can/media/socketcan/_socketcan.py b/pyuavcan/transport/can/media/socketcan/_socketcan.py index aa0f03fdd..1d693bdbe 100644 --- a/pyuavcan/transport/can/media/socketcan/_socketcan.py +++ b/pyuavcan/transport/can/media/socketcan/_socketcan.py @@ -195,7 +195,7 @@ def handler_wrapper(frs: typing.Sequence[typing.Tuple[Timestamp, Envelope]]) -> def _read_frame(self, ts_mono_ns: int) -> typing.Tuple[Timestamp, Envelope]: while True: - data, ancdata, msg_flags, _addr = self._sock.recvmsg( + data, ancdata, msg_flags, _addr = self._sock.recvmsg( # type: ignore self._native_frame_size, self._ancillary_data_buffer_size ) assert msg_flags & socket.MSG_TRUNC == 0, "The data buffer is not large enough" @@ -295,7 +295,7 @@ class _NativeFrameDataCapacity(enum.IntEnum): _CAN_EFF_MASK = 0x1FFFFFFF -def _make_socket(iface_name: str, can_fd: bool) -> socket.SocketType: +def _make_socket(iface_name: str, can_fd: bool) -> socket.socket: s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) # type: ignore try: s.bind((iface_name,)) diff --git a/pyuavcan/transport/redundant/__init__.py b/pyuavcan/transport/redundant/__init__.py index 6af3e6d5e..3b0f48c1d 100644 --- a/pyuavcan/transport/redundant/__init__.py +++ b/pyuavcan/transport/redundant/__init__.py @@ -365,6 +365,11 @@ >>> tr.protocol_parameters ProtocolParameters(transfer_id_modulo=0, max_nodes=0, mtu=0) +.. doctest:: + :hide: + + >>> doctest_await(asyncio.sleep(1.0)) # Let pending tasks terminate before the loop is closed. + A redundant transport can be used with just one inferior to implement ad-hoc PnP allocation as follows: the transport is set up with an anonymous inferior which is disposed of upon completing the allocation procedure; the new inferior is then installed in the place of the old one configured to use the newly allocated node-ID value. diff --git a/pyuavcan/transport/redundant/_redundant_transport.py b/pyuavcan/transport/redundant/_redundant_transport.py index 1aabc520d..e1c3f64ea 100644 --- a/pyuavcan/transport/redundant/_redundant_transport.py +++ b/pyuavcan/transport/redundant/_redundant_transport.py @@ -330,7 +330,7 @@ def retire() -> None: def _construct_inferior_session(transport: pyuavcan.transport.Transport, owner: RedundantSession) -> None: assert isinstance(transport, pyuavcan.transport.Transport) if isinstance(owner, pyuavcan.transport.InputSession): - inferior = transport.get_input_session(owner.specifier, owner.payload_metadata) + inferior: pyuavcan.transport.Session = transport.get_input_session(owner.specifier, owner.payload_metadata) elif isinstance(owner, pyuavcan.transport.OutputSession): inferior = transport.get_output_session(owner.specifier, owner.payload_metadata) else: diff --git a/pyuavcan/transport/redundant/_session/_output.py b/pyuavcan/transport/redundant/_session/_output.py index 0bdcdf46f..f4faedc43 100644 --- a/pyuavcan/transport/redundant/_session/_output.py +++ b/pyuavcan/transport/redundant/_session/_output.py @@ -2,9 +2,11 @@ # This software is distributed under the terms of the MIT License. # Author: Pavel Kirienko -import typing +from __future__ import annotations +from typing import Callable, Optional, Sequence import logging import asyncio +import dataclasses import pyuavcan.transport from ._base import RedundantSession, RedundantSessionStatistics @@ -54,6 +56,44 @@ def inferior_session(self) -> pyuavcan.transport.OutputSession: return self._inferior_session +@dataclasses.dataclass(frozen=True) +class _WorkItem: + """ + Send the transfer before the deadline, then notify the future unless it is already canceled. + """ + + transfer: pyuavcan.transport.Transfer + monotonic_deadline: float + future: asyncio.Future[bool] + + +@dataclasses.dataclass(frozen=True) +class _Inferior: + """ + Each inferior runs a dedicated worker task. + The worker takes work items from the queue one by one and attempts to transmit them. + Upon completion (timeout/exception/success) the future is materialized unless cancelled. + """ + + session: pyuavcan.transport.OutputSession + worker: asyncio.Task[None] + queue: asyncio.Queue[_WorkItem] + + def close(self) -> None: + try: + self.session.close() + finally: + if self.worker.done(): + self.worker.result() + else: + self.worker.cancel() + while True: + try: + self.queue.get_nowait().future.cancel() + except asyncio.QueueEmpty: + break + + class RedundantOutputSession(RedundantSession, pyuavcan.transport.OutputSession): """ This is a composite of a group of :class:`pyuavcan.transport.OutputSession`. @@ -65,21 +105,21 @@ def __init__( self, specifier: pyuavcan.transport.OutputSessionSpecifier, payload_metadata: pyuavcan.transport.PayloadMetadata, - finalizer: typing.Callable[[], None], + finalizer: Callable[[], None], ): """ Do not call this directly! Use the factory method instead. """ self._specifier = specifier self._payload_metadata = payload_metadata - self._finalizer: typing.Optional[typing.Callable[[], None]] = finalizer + self._finalizer: Optional[Callable[[], None]] = finalizer assert isinstance(self._specifier, pyuavcan.transport.OutputSessionSpecifier) assert isinstance(self._payload_metadata, pyuavcan.transport.PayloadMetadata) assert callable(self._finalizer) - self._inferiors: typing.List[pyuavcan.transport.OutputSession] = [] - self._feedback_handler: typing.Optional[typing.Callable[[RedundantFeedback], None]] = None - self._idle_send_future: typing.Optional[asyncio.Future[None]] = None + self._inferiors: list[_Inferior] = [] + self._feedback_handler: Optional[Callable[[RedundantFeedback], None]] = None + self._idle_send_future: Optional[asyncio.Future[None]] = None self._lock = asyncio.Lock() self._stat_transfers = 0 @@ -91,17 +131,20 @@ def _add_inferior(self, session: pyuavcan.transport.Session) -> None: assert isinstance(session, pyuavcan.transport.OutputSession) assert self._finalizer is not None, "The session was supposed to be unregistered" assert session.specifier == self.specifier and session.payload_metadata == self.payload_metadata - if session not in self._inferiors: - # Synchronize the feedback state. - if self._feedback_handler is not None: - self._enable_feedback_on_inferior(session) - else: - session.disable_feedback() - # If and only if all went well, add the new inferior to the set. - self._inferiors.append(session) - # Unlock the pending transmission because now we have an inferior to work with. - if self._idle_send_future is not None: - self._idle_send_future.set_result(None) + if session in self.inferiors: + return + # Synchronize the feedback state. + if self._feedback_handler is not None: + self._enable_feedback_on_inferior(session) + else: + session.disable_feedback() + # If all went well, add the new inferior to the set. + que: asyncio.Queue[_WorkItem] = asyncio.Queue() + tsk = asyncio.get_event_loop().create_task(self._inferior_worker_task(session, que)) + self._inferiors.append(_Inferior(session, tsk, que)) + # Unlock the pending transmission because now we have an inferior to work with. + if self._idle_send_future is not None: + self._idle_send_future.set_result(None) def _close_inferior(self, session_index: int) -> None: assert session_index >= 0, "Negative indexes may lead to unexpected side effects" @@ -114,10 +157,10 @@ def _close_inferior(self, session_index: int) -> None: session.close() # May raise. @property - def inferiors(self) -> typing.Sequence[pyuavcan.transport.OutputSession]: - return self._inferiors[:] + def inferiors(self) -> Sequence[pyuavcan.transport.OutputSession]: + return [x.session for x in self._inferiors] - def enable_feedback(self, handler: typing.Callable[[RedundantFeedback], None]) -> None: + def enable_feedback(self, handler: Callable[[RedundantFeedback], None]) -> None: """ The operation is atomic on all inferiors. If at least one inferior fails to enable feedback, all inferiors are rolled back into the disabled state. @@ -126,7 +169,7 @@ def enable_feedback(self, handler: typing.Callable[[RedundantFeedback], None]) - try: self._feedback_handler = handler for ses in self._inferiors: - self._enable_feedback_on_inferior(ses) + self._enable_feedback_on_inferior(ses.session) except Exception as ex: _logger.info("%s could not enable feedback, rolling back into the disabled state: %r", self, ex) self.disable_feedback() @@ -139,14 +182,16 @@ def disable_feedback(self) -> None: self._feedback_handler = None for ses in self._inferiors: try: - ses.disable_feedback() + ses.session.disable_feedback() except Exception as ex: _logger.exception("%s could not disable feedback on %r: %s", self, ses, ex) async def send(self, transfer: pyuavcan.transport.Transfer, monotonic_deadline: float) -> bool: """ Sends the transfer via all of the inferior sessions concurrently. - Returns when all of the inferior calls return and/or raise exceptions. + Returns when the first of the inferior calls succeeds; the remaining will keep sending in the background; + that is, the redundant transport operates at the rate of the fastest inferior, delegating the slower ones + to background tasks. Edge cases: - If there are no inferiors, the method will await until either the deadline is expired @@ -157,8 +202,7 @@ async def send(self, transfer: pyuavcan.transport.Transfer, monotonic_deadline: - If at least one inferior succeeds, True is returned (logical OR). If the other inferiors raise exceptions, they are logged as errors and suppressed. - - If all inferiors raise exceptions, the exception from the first one is propagated, - the rest are logged as errors and suppressed. + - If all inferiors raise exceptions, one of them is propagated, the rest are logged as errors and suppressed. - If all inferiors time out, False is returned (logical OR). @@ -199,31 +243,36 @@ async def send(self, transfer: pyuavcan.transport.Transfer, monotonic_deadline: finally: self._idle_send_future = None assert not self._idle_send_future - if not inferiors: self._stat_drops += 1 return False # Still nothing. - results = await asyncio.gather( - *[ses.send(transfer, monotonic_deadline) for ses in inferiors], return_exceptions=True - ) - assert results and len(results) == len(inferiors) - _logger.debug("%s send results: %s", self, results) - - exceptions = [ex for ex in results if isinstance(ex, Exception)] - - # Result consolidation logic as described in the doc. - if exceptions: - # Taking great efforts to make the error message very understandable to the user. - _logger.error( # pylint: disable=logging-not-lazy - f"{self}: {len(exceptions)} of {len(results)} inferiors have failed: " - + ", ".join(f"{i}:{self._describe_send_result(r)}" for i, r in enumerate(results)) - ) - if len(exceptions) >= len(results): - self._stat_errors += 1 - raise exceptions[0] - - if any(x is True for x in results): + # We have at least one inferior so we can handle this transaction. Create the work items. + futures: list[asyncio.Future[bool]] = [] + for inf in self._inferiors: + fut: asyncio.Future[bool] = asyncio.Future() + inf.queue.put_nowait(_WorkItem(transfer, monotonic_deadline, fut)) + futures.append(fut) + + # Execute the work items concurrently and unblock as soon as the first inferior is done transmitting. + # Those that are still pending are cancelled because we're not going to wait around for the slow ones. + done, pending = await asyncio.wait(futures, return_when=asyncio.FIRST_COMPLETED) + assert len(done) + len(pending) == len(inferiors) == len(futures) + _logger.debug("%s send results: done=%s, pending=%s", self, done, pending) + for p in pending: + p.cancel() # We will no longer need this. + + # Extract the results to determine the final outcome of the transaction. + results = [x.result() for x in done if x.exception() is None] + exceptions = [x.exception() for x in done if x.exception() is not None] + assert 0 < (len(results) + len(exceptions)) <= len(inferiors) # Some tasks may be not yet done. + assert not results or all(isinstance(x, bool) for x in results) + if exceptions and not results: + self._stat_errors += 1 + exc = exceptions[0] + assert isinstance(exc, BaseException) + raise exc + if results and any(results): self._stat_transfers += 1 self._stat_payload_bytes += sum(map(len, transfer.fragmented_payload)) return True @@ -247,7 +296,7 @@ def sample_statistics(self) -> RedundantSessionStatistics: - ``frames`` - the total number of frames summed from all inferiors (i.e., replicated frame count). This value is invalidated when the set of inferiors is changed. The semantics may change later. """ - inferiors = [s.sample_statistics() for s in self._inferiors] + inferiors = [s.session.sample_statistics() for s in self._inferiors] return RedundantSessionStatistics( transfers=self._stat_transfers, frames=sum(s.frames for s in inferiors), @@ -269,6 +318,37 @@ def close(self) -> None: if fin is not None: fin() + async def _inferior_worker_task(self, ses: pyuavcan.transport.OutputSession, que: asyncio.Queue[_WorkItem]) -> None: + try: + _logger.debug("%s: Task for inferior %r is starting", self, ses) + while self._finalizer: + wrk = await que.get() + try: + result = await ses.send(wrk.transfer, wrk.monotonic_deadline) + except (asyncio.CancelledError, pyuavcan.transport.ResourceClosedError): + break # Do not cancel the future because we don't want to unblock the master task. + except Exception as ex: + _logger.error("%s: Inferior %r failed: %s: %s", self, ses, type(ex).__name__, ex) + _logger.debug("%s: Stack trace for the above inferior failure:", self, exc_info=True) + if not wrk.future.done(): + wrk.future.set_exception(ex) + else: + _logger.debug( + "%s: Inferior %r send result: %s; future %s", + self, + ses, + "success" if result else "timeout", + wrk.future, + ) + if not wrk.future.done(): + wrk.future.set_result(result) + except (asyncio.CancelledError, pyuavcan.transport.ResourceClosedError): + pass + except Exception as ex: + _logger.exception("%s: Task for %r has encountered an unhandled exception: %s", self, ses, ex) + finally: + _logger.debug("%s: Task for %r is stopping", self, ses) + def _enable_feedback_on_inferior(self, inferior_session: pyuavcan.transport.OutputSession) -> None: def proxy(fb: pyuavcan.transport.Feedback) -> None: """ @@ -276,7 +356,7 @@ def proxy(fb: pyuavcan.transport.Feedback) -> None: constructs a higher-level redundant feedback instance from it, and then passes it along to the higher-level handler. """ - if inferior_session not in self._inferiors: + if inferior_session not in self.inferiors: _logger.warning( "%s got unexpected feedback %s from %s which is not a registered inferior. " "The transport or its underlying software or hardware are probably misbehaving, " @@ -298,11 +378,3 @@ def proxy(fb: pyuavcan.transport.Feedback) -> None: _logger.debug("%s ignoring unattended feedback %r from %r", self, fb, inferior_session) inferior_session.enable_feedback(proxy) - - @staticmethod - def _describe_send_result(result: typing.Union[bool, Exception]) -> str: - if isinstance(result, Exception): - return repr(result) - if isinstance(result, bool): - return "success" if result else "timeout" - assert False diff --git a/pyuavcan/transport/udp/_ip/_link_layer.py b/pyuavcan/transport/udp/_ip/_link_layer.py index 54b5df074..df83e5f0c 100644 --- a/pyuavcan/transport/udp/_ip/_link_layer.py +++ b/pyuavcan/transport/udp/_ip/_link_layer.py @@ -5,7 +5,7 @@ from __future__ import annotations import sys import time -import typing +from typing import Callable, Any, Optional, cast, Sequence import ctypes import socket import logging @@ -100,7 +100,7 @@ class LinkLayerSniffer: - https://github.com/karpierz/libpcap/blob/master/tests/capturetest.py """ - def __init__(self, filter_expression: str, callback: typing.Callable[[LinkLayerCapture], None]) -> None: + def __init__(self, filter_expression: str, callback: Callable[[LinkLayerCapture], None]) -> None: """ :param filter_expression: The standard pcap filter expression; see https://www.tcpdump.org/manpages/pcap-filter.7.html. @@ -117,7 +117,7 @@ def __init__(self, filter_expression: str, callback: typing.Callable[[LinkLayerC self._filter_expr = str(filter_expression) self._callback = callback self._keep_going = True - self._workers: typing.List[threading.Thread] = [] + self._workers: list[threading.Thread] = [] try: dev_names = _find_devices() _logger.debug("Capturable network devices: %s", dev_names) @@ -175,7 +175,7 @@ def _thread_worker(self, name: str, pd: object, decoder: PacketDecoder) -> None: # noinspection PyTypeChecker @pcap.pcap_handler # type: ignore - def proxy(_: object, header: ctypes.Structure, packet: typing.Any) -> None: + def proxy(_: object, header: ctypes.Structure, packet: Any) -> None: # Parse the header, extract the timestamp and the packet length. header = header.contents ts_ns = (header.ts.tv_sec * 1_000_000 + header.ts.tv_usec) * 1000 @@ -247,11 +247,11 @@ def __repr__(self) -> str: ) -PacketEncoder = typing.Callable[["LinkLayerPacket"], typing.Optional[memoryview]] -PacketDecoder = typing.Callable[[memoryview], typing.Optional["LinkLayerPacket"]] +PacketEncoder = Callable[["LinkLayerPacket"], Optional[memoryview]] +PacketDecoder = Callable[[memoryview], Optional["LinkLayerPacket"]] -def _get_codecs() -> typing.Dict[int, typing.Tuple[PacketEncoder, PacketDecoder]]: +def _get_codecs() -> dict[int, tuple[PacketEncoder, PacketDecoder]]: """ A factory of paired encode/decode functions that are used for building and parsing link-layer packets. The pairs are organized into a dict where the key is the data link type code from libpcap; @@ -265,7 +265,7 @@ def _get_codecs() -> typing.Dict[int, typing.Tuple[PacketEncoder, PacketDecoder] import libpcap as pcap from socket import AddressFamily - def get_ethernet() -> typing.Tuple[PacketEncoder, PacketDecoder]: + def get_ethernet() -> tuple[PacketEncoder, PacketDecoder]: # https://en.wikipedia.org/wiki/EtherType af_to_ethertype = { AddressFamily.AF_INET: 0x0800, @@ -273,7 +273,7 @@ def get_ethernet() -> typing.Tuple[PacketEncoder, PacketDecoder]: } ethertype_to_af = {v: k for k, v in af_to_ethertype.items()} - def enc(p: LinkLayerPacket) -> typing.Optional[memoryview]: + def enc(p: LinkLayerPacket) -> Optional[memoryview]: try: return memoryview( b"".join( @@ -288,7 +288,7 @@ def enc(p: LinkLayerPacket) -> typing.Optional[memoryview]: except LookupError: return None - def dec(p: memoryview) -> typing.Optional[LinkLayerPacket]: + def dec(p: memoryview) -> Optional[LinkLayerPacket]: if len(p) < 14: return None src = p[0:6] @@ -302,17 +302,17 @@ def dec(p: memoryview) -> typing.Optional[LinkLayerPacket]: return enc, dec - def get_loopback(byte_order: str) -> typing.Tuple[PacketEncoder, PacketDecoder]: + def get_loopback(byte_order: str) -> tuple[PacketEncoder, PacketDecoder]: # DLT_NULL is used by the Windows loopback interface. Info: https://wiki.wireshark.org/NullLoopback # The source and destination addresses are not representable in this data link layer. - def enc(p: LinkLayerPacket) -> typing.Optional[memoryview]: - return memoryview(b"".join((p.protocol.to_bytes(4, byte_order), p.payload))) + def enc(p: LinkLayerPacket) -> Optional[memoryview]: + return memoryview(b"".join((p.protocol.to_bytes(4, byte_order), p.payload))) # type: ignore - def dec(p: memoryview) -> typing.Optional[LinkLayerPacket]: + def dec(p: memoryview) -> Optional[LinkLayerPacket]: if len(p) < 4: return None try: - protocol = AddressFamily(int.from_bytes(p[0:4], byte_order)) + protocol = AddressFamily(int.from_bytes(p[0:4], byte_order)) # type: ignore except ValueError: return None empty = memoryview(b"") @@ -328,7 +328,7 @@ def dec(p: memoryview) -> typing.Optional[LinkLayerPacket]: } -def _find_devices() -> typing.List[str]: +def _find_devices() -> list[str]: """ Returns a list of local network devices that can be captured from. Raises a PermissionError if the user is suspected to lack the privileges necessary for capture. @@ -351,8 +351,8 @@ def _find_devices() -> typing.List[str]: # because, for example, that process does not have sufficient privileges to open them for capturing; # if so, those devices will not appear on the list. raise PermissionError("No capturable devices have been found. Do you have the required privileges?") - dev_names: typing.List[str] = [] - d = typing.cast(ctypes.Structure, devices) + dev_names: list[str] = [] + d = cast(ctypes.Structure, devices) while d: d = d.contents name = d.name.decode() @@ -365,9 +365,7 @@ def _find_devices() -> typing.List[str]: return dev_names -def _capture_all( - device_names: typing.List[str], filter_expression: str -) -> typing.List[typing.Tuple[str, object, PacketDecoder]]: +def _capture_all(device_names: list[str], filter_expression: str) -> list[tuple[str, object, PacketDecoder]]: """ Begin capture on all devices in promiscuous mode. We can't use "any" because libpcap does not support promiscuous mode with it, as stated in the docs and here: @@ -378,7 +376,7 @@ def _capture_all( import libpcap as pcap codecs = _get_codecs() - caps: typing.List[typing.Tuple[str, object, PacketDecoder]] = [] + caps: list[tuple[str, object, PacketDecoder]] = [] try: for name in device_names: pd = _capture_single_device(name, filter_expression, list(codecs.keys())) @@ -410,9 +408,7 @@ def _capture_all( return caps -def _capture_single_device( - device: str, filter_expression: str, data_link_hints: typing.Sequence[int] -) -> typing.Optional[object]: +def _capture_single_device(device: str, filter_expression: str, data_link_hints: Sequence[int]) -> Optional[object]: """ Returns None if the interface managed by this device is not up or if it cannot be captured from for other reasons. On GNU/Linux, some virtual devices (like netfilter devices) can only be accessed by a superuser. @@ -498,7 +494,7 @@ def status_to_str(error_code: int) -> str: except Exception: pcap.close(pd) raise - return typing.cast(object, pd) + return cast(object, pd) _SNAPSHOT_LENGTH = 65535 diff --git a/tests/dsdl/_builtin_form.py b/tests/dsdl/_builtin_form.py index 38d85522c..5274fe1ac 100644 --- a/tests/dsdl/_builtin_form.py +++ b/tests/dsdl/_builtin_form.py @@ -126,13 +126,9 @@ def _unittest_slow_builtin_form_automatic(compiled: typing.List[pyuavcan.dsdl.Ge # noinspection PyUnusedLocal -def _unittest_issue_116(compiled: typing.List[pyuavcan.dsdl.GeneratedPackageInfo]) -> None: +def _unittest_issue_147(compiled: typing.List[pyuavcan.dsdl.GeneratedPackageInfo]) -> None: from uavcan.register import Access_1_0 - valid = pyuavcan.dsdl.update_from_builtin(Access_1_0.Request(), {"name": {"name": "uavcan.pub.measurement"}}) + # Automatic promotion https://github.com/UAVCAN/pyuavcan/issues/147 + valid = pyuavcan.dsdl.update_from_builtin(Access_1_0.Request(), "uavcan.pub.measurement") assert valid.name.name.tobytes().decode() == "uavcan.pub.measurement" - with pytest.raises(TypeError) as ex: - pyuavcan.dsdl.update_from_builtin(Access_1_0.Request(), {"name": "uavcan.pub.measurement"}) - print("Exception message:", ex) - assert "str" in str(ex) - assert "Name_1_" in str(ex) diff --git a/tests/transport/can/media/_socketcan.py b/tests/transport/can/media/_socketcan.py index ad9aba915..c84e58aa2 100644 --- a/tests/transport/can/media/_socketcan.py +++ b/tests/transport/can/media/_socketcan.py @@ -67,6 +67,11 @@ def on_rx_b(frames: typing.Iterable[typing.Tuple[Timestamp, Envelope]]) -> None: [ Envelope(DataFrame(FrameFormat.BASE, 0x123, bytearray(range(6))), loopback=True), Envelope(DataFrame(FrameFormat.EXTENDED, 0x1BADC0FE, bytearray(range(8))), loopback=True), + ], + asyncio.get_event_loop().time() + 1.0, + ) + await media_a.send( + [ Envelope(DataFrame(FrameFormat.EXTENDED, 0x1FF45678, bytearray(range(0))), loopback=False), ], asyncio.get_event_loop().time() + 1.0, diff --git a/tests/transport/redundant/_redundant.py b/tests/transport/redundant/_redundant.py index d10614796..c23890f40 100644 --- a/tests/transport/redundant/_redundant.py +++ b/tests/transport/redundant/_redundant.py @@ -452,7 +452,8 @@ async def wait(how_many: int) -> None: await asyncio.sleep(1.0) -def _unittest_redundant_transport_reconfiguration() -> None: +@pytest.mark.asyncio +async def _unittest_redundant_transport_reconfiguration() -> None: from pyuavcan.transport import OutputSessionSpecifier, MessageDataSpecifier, PayloadMetadata tr = RedundantTransport() @@ -466,3 +467,6 @@ def _unittest_redundant_transport_reconfiguration() -> None: with pytest.raises(pyuavcan.transport.OperationNotDefinedForAnonymousNodeError): tr.attach_inferior(LoopbackTransport(None, allow_anonymous_transfers=False)) assert len(tr.inferiors) == 1 + + tr.close() + await asyncio.sleep(2.0) diff --git a/tests/transport/redundant/_session_output.py b/tests/transport/redundant/_session_output.py index e875933b3..087749675 100644 --- a/tests/transport/redundant/_session_output.py +++ b/tests/transport/redundant/_session_output.py @@ -194,6 +194,7 @@ async def add_inferior(inferior: pyuavcan.transport.OutputSession) -> None: ], ) assert len(feedback) == 2 + feedback.sort(key=lambda x: x.inferior_session is not inf_a) # Ensure consistent ordering assert feedback[0].inferior_session is inf_a assert feedback[0].original_transfer_timestamp == ts assert ts.system <= feedback[0].first_frame_transmission_timestamp.system <= time.time() @@ -324,6 +325,8 @@ async def add_inferior(inferior: pyuavcan.transport.OutputSession) -> None: assert None is await (rx_a.receive(loop.time() + 1)) assert None is await (rx_b.receive(loop.time() + 1)) + await asyncio.sleep(2.0) + async def _unittest_redundant_output_exceptions(caplog: typing.Any) -> None: loop = asyncio.get_event_loop() @@ -450,3 +453,5 @@ def retire() -> None: is_retired = False ses.close() assert not is_retired + + await asyncio.sleep(2.0)