diff --git a/.appveyor.yml b/.appveyor.yml
index 251cefe86..8ab6b7c96 100644
--- a/.appveyor.yml
+++ b/.appveyor.yml
@@ -81,6 +81,8 @@ for:
test_script:
- 'nox --non-interactive --error-on-missing-interpreters --session test pristine --python $PYTHON'
- 'nox --non-interactive --session demo check_style docs'
+ on_finish:
+ - 'ip link show' # Diagnostics aid
- # DEPLOYMENT
matrix:
diff --git a/.idea/dictionaries/pavel.xml b/.idea/dictionaries/pavel.xml
index bda1eef1a..5749471dc 100644
--- a/.idea/dictionaries/pavel.xml
+++ b/.idea/dictionaries/pavel.xml
@@ -8,6 +8,7 @@
abcdefghi
abstractmethods
addopts
+ addr
aega
afdx
allocatee
@@ -249,6 +250,7 @@
sphinxarg
sssss
ssssssss
+ stacklevel
stdint
strictification
strictify
diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 455b970b9..96c2905d8 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -3,6 +3,18 @@
Changelog
=========
+v1.4
+----
+
+- Behavior of the redundant output session changed:
+ :meth:`pyuavcan.transport.redundant.RedundantOutputSession.send` returns as soon as at least one inferior is done
+ transmitting, the slower ones keep transmitting in the background.
+ In other words, the redundant transport now operates at the rate of the fastest inferior (used to be the slowest one).
+
+- Implement the DSDL UX improvement described in `#147 `_.
+
+- Fully adopt PEP 585 in generated code.
+
v1.3
----
diff --git a/docs/static/custom.css b/docs/static/custom.css
index f9b316e4b..60a73ec7d 100644
--- a/docs/static/custom.css
+++ b/docs/static/custom.css
@@ -36,6 +36,10 @@ h2 {
font-weight: bold;
}
+.rst-content dl {
+ display: block !important;
+}
+
.rst-content a {
color: #1700b3;
}
diff --git a/noxfile.py b/noxfile.py
index c23416e0b..7fccedf7a 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -124,7 +124,7 @@ def test(session):
# 2. At least MyPy has to be run separately per Python version we support.
# If the interpreter is not CPython, this may need to be conditionally disabled.
session.install(
- "mypy == 0.910",
+ "mypy == 0.920",
"pylint == 2.12.*",
)
session.run("mypy", "--strict", *map(str, src_dirs), str(compiled_dir))
diff --git a/pyuavcan/VERSION b/pyuavcan/VERSION
index 3a3cd8cc8..88c5fb891 100644
--- a/pyuavcan/VERSION
+++ b/pyuavcan/VERSION
@@ -1 +1 @@
-1.3.1
+1.4.0
diff --git a/pyuavcan/dsdl/_builtin_form.py b/pyuavcan/dsdl/_builtin_form.py
index ef3b242e5..3359a5d1f 100644
--- a/pyuavcan/dsdl/_builtin_form.py
+++ b/pyuavcan/dsdl/_builtin_form.py
@@ -3,11 +3,10 @@
# Author: Pavel Kirienko
import typing
-
+import logging
import numpy
from numpy.typing import NDArray
import pydsdl
-
from ._composite_object import CompositeObject, get_model, get_attribute, set_attribute, get_class
from ._composite_object import CompositeObjectTypeVar
@@ -93,6 +92,8 @@ def update_from_builtin(destination: CompositeObjectTypeVar, source: typing.Any)
Source field names shall match the original unstropped names provided in the DSDL definition;
e.g., `if`, not `if_`. If there is more than one variant specified for a union type, the last
specified variant takes precedence.
+ If the structure of the source does not match the destination object, the correct representation
+ may be deduced automatically as long as it can be done unambiguously.
:param destination: The object to update. The update will be done in-place. If you don't want the source
object modified, clone it beforehand.
@@ -124,23 +125,68 @@ def update_from_builtin(destination: CompositeObjectTypeVar, source: typing.Any)
... }
... }) # doctest: +NORMALIZE_WHITESPACE
uavcan.register.Access.Request...name='my.register'...value=[ 1, 2, 42,-10000]...
- """
- # UX improvement; see https://github.com/UAVCAN/pyuavcan/issues/116
- if not isinstance(source, dict):
- raise TypeError(
- f"Invalid format: cannot update an instance of type {type(destination).__name__!r} "
- f"from value of type {type(source).__name__!r}"
- )
-
- source = dict(source) # Create copy to prevent mutation of the original
+ The following examples showcase positional initialization:
+
+ >>> from uavcan.node import Heartbeat_1
+ >>> update_from_builtin(Heartbeat_1(), [123456, 1, 2]) # doctest: +NORMALIZE_WHITESPACE
+ uavcan.node.Heartbeat.1.0(uptime=123456,
+ health=uavcan.node.Health.1.0(value=1),
+ mode=uavcan.node.Mode.1.0(value=2),
+ vendor_specific_status_code=0)
+ >>> update_from_builtin(Heartbeat_1(), 123456) # doctest: +NORMALIZE_WHITESPACE
+ uavcan.node.Heartbeat.1.0(uptime=123456,
+ health=uavcan.node.Health.1.0(value=0),
+ mode=uavcan.node.Mode.1.0(value=0),
+ vendor_specific_status_code=0)
+ >>> update_from_builtin(Heartbeat_1(), [0, 0, 0, 0, 0]) # doctest: +IGNORE_EXCEPTION_DETAIL
+ Traceback (most recent call last):
+ ...
+ TypeError: ...
+
+ >>> update_from_builtin(uavcan.primitive.array.Real64_1(), 123.456) # doctest: +NORMALIZE_WHITESPACE
+ uavcan.primitive.array.Real64.1.0(value=[123.456])
+ >>> update_from_builtin(uavcan.primitive.array.Real64_1(), [123.456]) # doctest: +NORMALIZE_WHITESPACE
+ uavcan.primitive.array.Real64.1.0(value=[123.456])
+ >>> update_from_builtin(uavcan.primitive.array.Real64_1(), [123.456, -9]) # doctest: +NORMALIZE_WHITESPACE
+ uavcan.primitive.array.Real64.1.0(value=[123.456, -9. ])
+
+ >>> update_from_builtin(uavcan.register.Access_1_0.Request(), ["X", {"integer8": 99}]) # Same as the next one!
+ uavcan.register.Access.Request...name='X'...value=[99]...
+ >>> update_from_builtin(uavcan.register.Access_1_0.Request(), {'name': 'X', 'value': {'integer8': {'value': [99]}}})
+ uavcan.register.Access.Request...name='X'...value=[99]...
+ """
+ _logger.debug("update_from_builtin: destination/source on the next lines:\n%r\n%r", destination, source)
if not isinstance(destination, CompositeObject): # pragma: no cover
raise TypeError(f"Bad destination: expected a CompositeObject, got {type(destination).__name__}")
-
model = get_model(destination)
_raise_if_service_type(model)
+ fields = model.fields_except_padding
+
+ # UX improvement: https://github.com/UAVCAN/pyuavcan/issues/147 -- match the source against the data type.
+ if not isinstance(source, dict):
+ if not isinstance(source, (list, tuple)): # Assume positional initialization.
+ source = (source,)
+ can_propagate = fields and isinstance(fields[0].data_type, (pydsdl.ArrayType, pydsdl.CompositeType))
+ too_many_values = len(source) > (1 if isinstance(model.inner_type, pydsdl.UnionType) else len(fields))
+ if can_propagate and too_many_values:
+ _logger.debug(
+ "update_from_builtin: %d source values cannot be applied to %s but the first field accepts "
+ "positional initialization -- propagating down",
+ len(source),
+ type(destination).__name__,
+ )
+ source = [source]
+ if len(source) > len(fields):
+ raise TypeError(
+ f"Cannot apply {len(source)} values to {len(fields)} fields in {type(destination).__name__}"
+ )
+ source = {f.name: v for f, v in zip(fields, source)}
+ return update_from_builtin(destination, source)
- for f in model.fields_except_padding:
+ source = dict(source) # Create copy to prevent mutation of the original
+
+ for f in fields:
field_type = f.data_type
try:
value = source.pop(f.name)
@@ -182,3 +228,6 @@ def _raise_if_service_type(model: pydsdl.SerializableType) -> None:
f"Built-in form is not defined for service types. "
f"Did you mean to use Request or Response? Input type: {model}"
)
+
+
+_logger = logging.getLogger(__name__)
diff --git a/pyuavcan/dsdl/_templates/base.j2 b/pyuavcan/dsdl/_templates/base.j2
index 6d5540e85..68845ed5e 100644
--- a/pyuavcan/dsdl/_templates/base.j2
+++ b/pyuavcan/dsdl/_templates/base.j2
@@ -20,7 +20,6 @@
from __future__ import annotations
import numpy as _np_
from numpy.typing import NDArray as _NDArray_
-from typing import Optional as _Opt_
import pydsdl as _pydsdl_
import pyuavcan.dsdl as _dsdl_
{%- if T.deprecated %}
@@ -175,7 +174,7 @@ class {{ name }}(_dsdl_.{%- if type.has_fixed_port_id -%}FixedPort{%- endif -%}C
{%- for f in type.fields_except_padding -%}
,
{{ f|id }}: {{ ''.ljust(type.fields|longest_id_length - f|id|length) -}}
- _Opt_[{{ relaxed_type_annotation(f.data_type) }}] = None
+ None | {{ relaxed_type_annotation(f.data_type) }} = None
{%- endfor -%}
) -> None:
"""
@@ -250,7 +249,7 @@ class {{ name }}(_dsdl_.{%- if type.has_fixed_port_id -%}FixedPort{%- endif -%}C
{%- else %} {#- IS UNION (guaranteed to contain at least 2 fields none of which are padding) #}
{%- for f in type.fields %}
self._{{ f|id }}: {{ ''.ljust(type.fields|longest_id_length - f|id|length) -}}
- _Opt_[{{ strict_type_annotation(f.data_type) }}] = None
+ None | {{ strict_type_annotation(f.data_type) }} = None
{%- endfor %}
_init_cnt_: int = 0
{% for f in type.fields %}
@@ -298,14 +297,7 @@ class {{ name }}(_dsdl_.{%- if type.has_fixed_port_id -%}FixedPort{%- endif -%}C
-#}
{%- for f in type.fields_except_padding %}
@property
- def {{ f|id }}(self) -> {# #}
- {%- if type.inner_type is UnionType -%}
- _Opt_[
- {%- endif -%}
- {{ strict_type_annotation(f.data_type) }}
- {%- if type.inner_type is UnionType -%}
- ]
- {%- endif -%}:
+ def {{ f|id }}(self) -> {{ "None | " * (type.inner_type is UnionType) }}{{ strict_type_annotation(f.data_type) }}:
"""
{{ f }}
{%- if f.data_type is VariableLengthArrayType and f.data_type.string_like %}
diff --git a/pyuavcan/transport/can/media/_frame.py b/pyuavcan/transport/can/media/_frame.py
index cd033f39f..fe239bb21 100644
--- a/pyuavcan/transport/can/media/_frame.py
+++ b/pyuavcan/transport/can/media/_frame.py
@@ -75,7 +75,7 @@ class Envelope:
_DLC_TO_LENGTH = [0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 16, 20, 24, 32, 48, 64]
-_LENGTH_TO_DLC: typing.Dict[int, int] = dict(zip(*list(zip(*enumerate(_DLC_TO_LENGTH)))[::-1])) # type: ignore
+_LENGTH_TO_DLC: typing.Dict[int, int] = dict(zip(*list(zip(*enumerate(_DLC_TO_LENGTH)))[::-1]))
assert len(_LENGTH_TO_DLC) == 16 == len(_DLC_TO_LENGTH)
for item in _DLC_TO_LENGTH:
assert _DLC_TO_LENGTH[_LENGTH_TO_DLC[item]] == item, "Invalid DLC tables"
diff --git a/pyuavcan/transport/can/media/socketcan/_socketcan.py b/pyuavcan/transport/can/media/socketcan/_socketcan.py
index aa0f03fdd..1d693bdbe 100644
--- a/pyuavcan/transport/can/media/socketcan/_socketcan.py
+++ b/pyuavcan/transport/can/media/socketcan/_socketcan.py
@@ -195,7 +195,7 @@ def handler_wrapper(frs: typing.Sequence[typing.Tuple[Timestamp, Envelope]]) ->
def _read_frame(self, ts_mono_ns: int) -> typing.Tuple[Timestamp, Envelope]:
while True:
- data, ancdata, msg_flags, _addr = self._sock.recvmsg(
+ data, ancdata, msg_flags, _addr = self._sock.recvmsg( # type: ignore
self._native_frame_size, self._ancillary_data_buffer_size
)
assert msg_flags & socket.MSG_TRUNC == 0, "The data buffer is not large enough"
@@ -295,7 +295,7 @@ class _NativeFrameDataCapacity(enum.IntEnum):
_CAN_EFF_MASK = 0x1FFFFFFF
-def _make_socket(iface_name: str, can_fd: bool) -> socket.SocketType:
+def _make_socket(iface_name: str, can_fd: bool) -> socket.socket:
s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) # type: ignore
try:
s.bind((iface_name,))
diff --git a/pyuavcan/transport/redundant/__init__.py b/pyuavcan/transport/redundant/__init__.py
index 6af3e6d5e..3b0f48c1d 100644
--- a/pyuavcan/transport/redundant/__init__.py
+++ b/pyuavcan/transport/redundant/__init__.py
@@ -365,6 +365,11 @@
>>> tr.protocol_parameters
ProtocolParameters(transfer_id_modulo=0, max_nodes=0, mtu=0)
+.. doctest::
+ :hide:
+
+ >>> doctest_await(asyncio.sleep(1.0)) # Let pending tasks terminate before the loop is closed.
+
A redundant transport can be used with just one inferior to implement ad-hoc PnP allocation as follows:
the transport is set up with an anonymous inferior which is disposed of upon completing the allocation procedure;
the new inferior is then installed in the place of the old one configured to use the newly allocated node-ID value.
diff --git a/pyuavcan/transport/redundant/_redundant_transport.py b/pyuavcan/transport/redundant/_redundant_transport.py
index 1aabc520d..e1c3f64ea 100644
--- a/pyuavcan/transport/redundant/_redundant_transport.py
+++ b/pyuavcan/transport/redundant/_redundant_transport.py
@@ -330,7 +330,7 @@ def retire() -> None:
def _construct_inferior_session(transport: pyuavcan.transport.Transport, owner: RedundantSession) -> None:
assert isinstance(transport, pyuavcan.transport.Transport)
if isinstance(owner, pyuavcan.transport.InputSession):
- inferior = transport.get_input_session(owner.specifier, owner.payload_metadata)
+ inferior: pyuavcan.transport.Session = transport.get_input_session(owner.specifier, owner.payload_metadata)
elif isinstance(owner, pyuavcan.transport.OutputSession):
inferior = transport.get_output_session(owner.specifier, owner.payload_metadata)
else:
diff --git a/pyuavcan/transport/redundant/_session/_output.py b/pyuavcan/transport/redundant/_session/_output.py
index 0bdcdf46f..f4faedc43 100644
--- a/pyuavcan/transport/redundant/_session/_output.py
+++ b/pyuavcan/transport/redundant/_session/_output.py
@@ -2,9 +2,11 @@
# This software is distributed under the terms of the MIT License.
# Author: Pavel Kirienko
-import typing
+from __future__ import annotations
+from typing import Callable, Optional, Sequence
import logging
import asyncio
+import dataclasses
import pyuavcan.transport
from ._base import RedundantSession, RedundantSessionStatistics
@@ -54,6 +56,44 @@ def inferior_session(self) -> pyuavcan.transport.OutputSession:
return self._inferior_session
+@dataclasses.dataclass(frozen=True)
+class _WorkItem:
+ """
+ Send the transfer before the deadline, then notify the future unless it is already canceled.
+ """
+
+ transfer: pyuavcan.transport.Transfer
+ monotonic_deadline: float
+ future: asyncio.Future[bool]
+
+
+@dataclasses.dataclass(frozen=True)
+class _Inferior:
+ """
+ Each inferior runs a dedicated worker task.
+ The worker takes work items from the queue one by one and attempts to transmit them.
+ Upon completion (timeout/exception/success) the future is materialized unless cancelled.
+ """
+
+ session: pyuavcan.transport.OutputSession
+ worker: asyncio.Task[None]
+ queue: asyncio.Queue[_WorkItem]
+
+ def close(self) -> None:
+ try:
+ self.session.close()
+ finally:
+ if self.worker.done():
+ self.worker.result()
+ else:
+ self.worker.cancel()
+ while True:
+ try:
+ self.queue.get_nowait().future.cancel()
+ except asyncio.QueueEmpty:
+ break
+
+
class RedundantOutputSession(RedundantSession, pyuavcan.transport.OutputSession):
"""
This is a composite of a group of :class:`pyuavcan.transport.OutputSession`.
@@ -65,21 +105,21 @@ def __init__(
self,
specifier: pyuavcan.transport.OutputSessionSpecifier,
payload_metadata: pyuavcan.transport.PayloadMetadata,
- finalizer: typing.Callable[[], None],
+ finalizer: Callable[[], None],
):
"""
Do not call this directly! Use the factory method instead.
"""
self._specifier = specifier
self._payload_metadata = payload_metadata
- self._finalizer: typing.Optional[typing.Callable[[], None]] = finalizer
+ self._finalizer: Optional[Callable[[], None]] = finalizer
assert isinstance(self._specifier, pyuavcan.transport.OutputSessionSpecifier)
assert isinstance(self._payload_metadata, pyuavcan.transport.PayloadMetadata)
assert callable(self._finalizer)
- self._inferiors: typing.List[pyuavcan.transport.OutputSession] = []
- self._feedback_handler: typing.Optional[typing.Callable[[RedundantFeedback], None]] = None
- self._idle_send_future: typing.Optional[asyncio.Future[None]] = None
+ self._inferiors: list[_Inferior] = []
+ self._feedback_handler: Optional[Callable[[RedundantFeedback], None]] = None
+ self._idle_send_future: Optional[asyncio.Future[None]] = None
self._lock = asyncio.Lock()
self._stat_transfers = 0
@@ -91,17 +131,20 @@ def _add_inferior(self, session: pyuavcan.transport.Session) -> None:
assert isinstance(session, pyuavcan.transport.OutputSession)
assert self._finalizer is not None, "The session was supposed to be unregistered"
assert session.specifier == self.specifier and session.payload_metadata == self.payload_metadata
- if session not in self._inferiors:
- # Synchronize the feedback state.
- if self._feedback_handler is not None:
- self._enable_feedback_on_inferior(session)
- else:
- session.disable_feedback()
- # If and only if all went well, add the new inferior to the set.
- self._inferiors.append(session)
- # Unlock the pending transmission because now we have an inferior to work with.
- if self._idle_send_future is not None:
- self._idle_send_future.set_result(None)
+ if session in self.inferiors:
+ return
+ # Synchronize the feedback state.
+ if self._feedback_handler is not None:
+ self._enable_feedback_on_inferior(session)
+ else:
+ session.disable_feedback()
+ # If all went well, add the new inferior to the set.
+ que: asyncio.Queue[_WorkItem] = asyncio.Queue()
+ tsk = asyncio.get_event_loop().create_task(self._inferior_worker_task(session, que))
+ self._inferiors.append(_Inferior(session, tsk, que))
+ # Unlock the pending transmission because now we have an inferior to work with.
+ if self._idle_send_future is not None:
+ self._idle_send_future.set_result(None)
def _close_inferior(self, session_index: int) -> None:
assert session_index >= 0, "Negative indexes may lead to unexpected side effects"
@@ -114,10 +157,10 @@ def _close_inferior(self, session_index: int) -> None:
session.close() # May raise.
@property
- def inferiors(self) -> typing.Sequence[pyuavcan.transport.OutputSession]:
- return self._inferiors[:]
+ def inferiors(self) -> Sequence[pyuavcan.transport.OutputSession]:
+ return [x.session for x in self._inferiors]
- def enable_feedback(self, handler: typing.Callable[[RedundantFeedback], None]) -> None:
+ def enable_feedback(self, handler: Callable[[RedundantFeedback], None]) -> None:
"""
The operation is atomic on all inferiors.
If at least one inferior fails to enable feedback, all inferiors are rolled back into the disabled state.
@@ -126,7 +169,7 @@ def enable_feedback(self, handler: typing.Callable[[RedundantFeedback], None]) -
try:
self._feedback_handler = handler
for ses in self._inferiors:
- self._enable_feedback_on_inferior(ses)
+ self._enable_feedback_on_inferior(ses.session)
except Exception as ex:
_logger.info("%s could not enable feedback, rolling back into the disabled state: %r", self, ex)
self.disable_feedback()
@@ -139,14 +182,16 @@ def disable_feedback(self) -> None:
self._feedback_handler = None
for ses in self._inferiors:
try:
- ses.disable_feedback()
+ ses.session.disable_feedback()
except Exception as ex:
_logger.exception("%s could not disable feedback on %r: %s", self, ses, ex)
async def send(self, transfer: pyuavcan.transport.Transfer, monotonic_deadline: float) -> bool:
"""
Sends the transfer via all of the inferior sessions concurrently.
- Returns when all of the inferior calls return and/or raise exceptions.
+ Returns when the first of the inferior calls succeeds; the remaining will keep sending in the background;
+ that is, the redundant transport operates at the rate of the fastest inferior, delegating the slower ones
+ to background tasks.
Edge cases:
- If there are no inferiors, the method will await until either the deadline is expired
@@ -157,8 +202,7 @@ async def send(self, transfer: pyuavcan.transport.Transfer, monotonic_deadline:
- If at least one inferior succeeds, True is returned (logical OR).
If the other inferiors raise exceptions, they are logged as errors and suppressed.
- - If all inferiors raise exceptions, the exception from the first one is propagated,
- the rest are logged as errors and suppressed.
+ - If all inferiors raise exceptions, one of them is propagated, the rest are logged as errors and suppressed.
- If all inferiors time out, False is returned (logical OR).
@@ -199,31 +243,36 @@ async def send(self, transfer: pyuavcan.transport.Transfer, monotonic_deadline:
finally:
self._idle_send_future = None
assert not self._idle_send_future
-
if not inferiors:
self._stat_drops += 1
return False # Still nothing.
- results = await asyncio.gather(
- *[ses.send(transfer, monotonic_deadline) for ses in inferiors], return_exceptions=True
- )
- assert results and len(results) == len(inferiors)
- _logger.debug("%s send results: %s", self, results)
-
- exceptions = [ex for ex in results if isinstance(ex, Exception)]
-
- # Result consolidation logic as described in the doc.
- if exceptions:
- # Taking great efforts to make the error message very understandable to the user.
- _logger.error( # pylint: disable=logging-not-lazy
- f"{self}: {len(exceptions)} of {len(results)} inferiors have failed: "
- + ", ".join(f"{i}:{self._describe_send_result(r)}" for i, r in enumerate(results))
- )
- if len(exceptions) >= len(results):
- self._stat_errors += 1
- raise exceptions[0]
-
- if any(x is True for x in results):
+ # We have at least one inferior so we can handle this transaction. Create the work items.
+ futures: list[asyncio.Future[bool]] = []
+ for inf in self._inferiors:
+ fut: asyncio.Future[bool] = asyncio.Future()
+ inf.queue.put_nowait(_WorkItem(transfer, monotonic_deadline, fut))
+ futures.append(fut)
+
+ # Execute the work items concurrently and unblock as soon as the first inferior is done transmitting.
+ # Those that are still pending are cancelled because we're not going to wait around for the slow ones.
+ done, pending = await asyncio.wait(futures, return_when=asyncio.FIRST_COMPLETED)
+ assert len(done) + len(pending) == len(inferiors) == len(futures)
+ _logger.debug("%s send results: done=%s, pending=%s", self, done, pending)
+ for p in pending:
+ p.cancel() # We will no longer need this.
+
+ # Extract the results to determine the final outcome of the transaction.
+ results = [x.result() for x in done if x.exception() is None]
+ exceptions = [x.exception() for x in done if x.exception() is not None]
+ assert 0 < (len(results) + len(exceptions)) <= len(inferiors) # Some tasks may be not yet done.
+ assert not results or all(isinstance(x, bool) for x in results)
+ if exceptions and not results:
+ self._stat_errors += 1
+ exc = exceptions[0]
+ assert isinstance(exc, BaseException)
+ raise exc
+ if results and any(results):
self._stat_transfers += 1
self._stat_payload_bytes += sum(map(len, transfer.fragmented_payload))
return True
@@ -247,7 +296,7 @@ def sample_statistics(self) -> RedundantSessionStatistics:
- ``frames`` - the total number of frames summed from all inferiors (i.e., replicated frame count).
This value is invalidated when the set of inferiors is changed. The semantics may change later.
"""
- inferiors = [s.sample_statistics() for s in self._inferiors]
+ inferiors = [s.session.sample_statistics() for s in self._inferiors]
return RedundantSessionStatistics(
transfers=self._stat_transfers,
frames=sum(s.frames for s in inferiors),
@@ -269,6 +318,37 @@ def close(self) -> None:
if fin is not None:
fin()
+ async def _inferior_worker_task(self, ses: pyuavcan.transport.OutputSession, que: asyncio.Queue[_WorkItem]) -> None:
+ try:
+ _logger.debug("%s: Task for inferior %r is starting", self, ses)
+ while self._finalizer:
+ wrk = await que.get()
+ try:
+ result = await ses.send(wrk.transfer, wrk.monotonic_deadline)
+ except (asyncio.CancelledError, pyuavcan.transport.ResourceClosedError):
+ break # Do not cancel the future because we don't want to unblock the master task.
+ except Exception as ex:
+ _logger.error("%s: Inferior %r failed: %s: %s", self, ses, type(ex).__name__, ex)
+ _logger.debug("%s: Stack trace for the above inferior failure:", self, exc_info=True)
+ if not wrk.future.done():
+ wrk.future.set_exception(ex)
+ else:
+ _logger.debug(
+ "%s: Inferior %r send result: %s; future %s",
+ self,
+ ses,
+ "success" if result else "timeout",
+ wrk.future,
+ )
+ if not wrk.future.done():
+ wrk.future.set_result(result)
+ except (asyncio.CancelledError, pyuavcan.transport.ResourceClosedError):
+ pass
+ except Exception as ex:
+ _logger.exception("%s: Task for %r has encountered an unhandled exception: %s", self, ses, ex)
+ finally:
+ _logger.debug("%s: Task for %r is stopping", self, ses)
+
def _enable_feedback_on_inferior(self, inferior_session: pyuavcan.transport.OutputSession) -> None:
def proxy(fb: pyuavcan.transport.Feedback) -> None:
"""
@@ -276,7 +356,7 @@ def proxy(fb: pyuavcan.transport.Feedback) -> None:
constructs a higher-level redundant feedback instance from it,
and then passes it along to the higher-level handler.
"""
- if inferior_session not in self._inferiors:
+ if inferior_session not in self.inferiors:
_logger.warning(
"%s got unexpected feedback %s from %s which is not a registered inferior. "
"The transport or its underlying software or hardware are probably misbehaving, "
@@ -298,11 +378,3 @@ def proxy(fb: pyuavcan.transport.Feedback) -> None:
_logger.debug("%s ignoring unattended feedback %r from %r", self, fb, inferior_session)
inferior_session.enable_feedback(proxy)
-
- @staticmethod
- def _describe_send_result(result: typing.Union[bool, Exception]) -> str:
- if isinstance(result, Exception):
- return repr(result)
- if isinstance(result, bool):
- return "success" if result else "timeout"
- assert False
diff --git a/pyuavcan/transport/udp/_ip/_link_layer.py b/pyuavcan/transport/udp/_ip/_link_layer.py
index 54b5df074..df83e5f0c 100644
--- a/pyuavcan/transport/udp/_ip/_link_layer.py
+++ b/pyuavcan/transport/udp/_ip/_link_layer.py
@@ -5,7 +5,7 @@
from __future__ import annotations
import sys
import time
-import typing
+from typing import Callable, Any, Optional, cast, Sequence
import ctypes
import socket
import logging
@@ -100,7 +100,7 @@ class LinkLayerSniffer:
- https://github.com/karpierz/libpcap/blob/master/tests/capturetest.py
"""
- def __init__(self, filter_expression: str, callback: typing.Callable[[LinkLayerCapture], None]) -> None:
+ def __init__(self, filter_expression: str, callback: Callable[[LinkLayerCapture], None]) -> None:
"""
:param filter_expression: The standard pcap filter expression;
see https://www.tcpdump.org/manpages/pcap-filter.7.html.
@@ -117,7 +117,7 @@ def __init__(self, filter_expression: str, callback: typing.Callable[[LinkLayerC
self._filter_expr = str(filter_expression)
self._callback = callback
self._keep_going = True
- self._workers: typing.List[threading.Thread] = []
+ self._workers: list[threading.Thread] = []
try:
dev_names = _find_devices()
_logger.debug("Capturable network devices: %s", dev_names)
@@ -175,7 +175,7 @@ def _thread_worker(self, name: str, pd: object, decoder: PacketDecoder) -> None:
# noinspection PyTypeChecker
@pcap.pcap_handler # type: ignore
- def proxy(_: object, header: ctypes.Structure, packet: typing.Any) -> None:
+ def proxy(_: object, header: ctypes.Structure, packet: Any) -> None:
# Parse the header, extract the timestamp and the packet length.
header = header.contents
ts_ns = (header.ts.tv_sec * 1_000_000 + header.ts.tv_usec) * 1000
@@ -247,11 +247,11 @@ def __repr__(self) -> str:
)
-PacketEncoder = typing.Callable[["LinkLayerPacket"], typing.Optional[memoryview]]
-PacketDecoder = typing.Callable[[memoryview], typing.Optional["LinkLayerPacket"]]
+PacketEncoder = Callable[["LinkLayerPacket"], Optional[memoryview]]
+PacketDecoder = Callable[[memoryview], Optional["LinkLayerPacket"]]
-def _get_codecs() -> typing.Dict[int, typing.Tuple[PacketEncoder, PacketDecoder]]:
+def _get_codecs() -> dict[int, tuple[PacketEncoder, PacketDecoder]]:
"""
A factory of paired encode/decode functions that are used for building and parsing link-layer packets.
The pairs are organized into a dict where the key is the data link type code from libpcap;
@@ -265,7 +265,7 @@ def _get_codecs() -> typing.Dict[int, typing.Tuple[PacketEncoder, PacketDecoder]
import libpcap as pcap
from socket import AddressFamily
- def get_ethernet() -> typing.Tuple[PacketEncoder, PacketDecoder]:
+ def get_ethernet() -> tuple[PacketEncoder, PacketDecoder]:
# https://en.wikipedia.org/wiki/EtherType
af_to_ethertype = {
AddressFamily.AF_INET: 0x0800,
@@ -273,7 +273,7 @@ def get_ethernet() -> typing.Tuple[PacketEncoder, PacketDecoder]:
}
ethertype_to_af = {v: k for k, v in af_to_ethertype.items()}
- def enc(p: LinkLayerPacket) -> typing.Optional[memoryview]:
+ def enc(p: LinkLayerPacket) -> Optional[memoryview]:
try:
return memoryview(
b"".join(
@@ -288,7 +288,7 @@ def enc(p: LinkLayerPacket) -> typing.Optional[memoryview]:
except LookupError:
return None
- def dec(p: memoryview) -> typing.Optional[LinkLayerPacket]:
+ def dec(p: memoryview) -> Optional[LinkLayerPacket]:
if len(p) < 14:
return None
src = p[0:6]
@@ -302,17 +302,17 @@ def dec(p: memoryview) -> typing.Optional[LinkLayerPacket]:
return enc, dec
- def get_loopback(byte_order: str) -> typing.Tuple[PacketEncoder, PacketDecoder]:
+ def get_loopback(byte_order: str) -> tuple[PacketEncoder, PacketDecoder]:
# DLT_NULL is used by the Windows loopback interface. Info: https://wiki.wireshark.org/NullLoopback
# The source and destination addresses are not representable in this data link layer.
- def enc(p: LinkLayerPacket) -> typing.Optional[memoryview]:
- return memoryview(b"".join((p.protocol.to_bytes(4, byte_order), p.payload)))
+ def enc(p: LinkLayerPacket) -> Optional[memoryview]:
+ return memoryview(b"".join((p.protocol.to_bytes(4, byte_order), p.payload))) # type: ignore
- def dec(p: memoryview) -> typing.Optional[LinkLayerPacket]:
+ def dec(p: memoryview) -> Optional[LinkLayerPacket]:
if len(p) < 4:
return None
try:
- protocol = AddressFamily(int.from_bytes(p[0:4], byte_order))
+ protocol = AddressFamily(int.from_bytes(p[0:4], byte_order)) # type: ignore
except ValueError:
return None
empty = memoryview(b"")
@@ -328,7 +328,7 @@ def dec(p: memoryview) -> typing.Optional[LinkLayerPacket]:
}
-def _find_devices() -> typing.List[str]:
+def _find_devices() -> list[str]:
"""
Returns a list of local network devices that can be captured from.
Raises a PermissionError if the user is suspected to lack the privileges necessary for capture.
@@ -351,8 +351,8 @@ def _find_devices() -> typing.List[str]:
# because, for example, that process does not have sufficient privileges to open them for capturing;
# if so, those devices will not appear on the list.
raise PermissionError("No capturable devices have been found. Do you have the required privileges?")
- dev_names: typing.List[str] = []
- d = typing.cast(ctypes.Structure, devices)
+ dev_names: list[str] = []
+ d = cast(ctypes.Structure, devices)
while d:
d = d.contents
name = d.name.decode()
@@ -365,9 +365,7 @@ def _find_devices() -> typing.List[str]:
return dev_names
-def _capture_all(
- device_names: typing.List[str], filter_expression: str
-) -> typing.List[typing.Tuple[str, object, PacketDecoder]]:
+def _capture_all(device_names: list[str], filter_expression: str) -> list[tuple[str, object, PacketDecoder]]:
"""
Begin capture on all devices in promiscuous mode.
We can't use "any" because libpcap does not support promiscuous mode with it, as stated in the docs and here:
@@ -378,7 +376,7 @@ def _capture_all(
import libpcap as pcap
codecs = _get_codecs()
- caps: typing.List[typing.Tuple[str, object, PacketDecoder]] = []
+ caps: list[tuple[str, object, PacketDecoder]] = []
try:
for name in device_names:
pd = _capture_single_device(name, filter_expression, list(codecs.keys()))
@@ -410,9 +408,7 @@ def _capture_all(
return caps
-def _capture_single_device(
- device: str, filter_expression: str, data_link_hints: typing.Sequence[int]
-) -> typing.Optional[object]:
+def _capture_single_device(device: str, filter_expression: str, data_link_hints: Sequence[int]) -> Optional[object]:
"""
Returns None if the interface managed by this device is not up or if it cannot be captured from for other reasons.
On GNU/Linux, some virtual devices (like netfilter devices) can only be accessed by a superuser.
@@ -498,7 +494,7 @@ def status_to_str(error_code: int) -> str:
except Exception:
pcap.close(pd)
raise
- return typing.cast(object, pd)
+ return cast(object, pd)
_SNAPSHOT_LENGTH = 65535
diff --git a/tests/dsdl/_builtin_form.py b/tests/dsdl/_builtin_form.py
index 38d85522c..5274fe1ac 100644
--- a/tests/dsdl/_builtin_form.py
+++ b/tests/dsdl/_builtin_form.py
@@ -126,13 +126,9 @@ def _unittest_slow_builtin_form_automatic(compiled: typing.List[pyuavcan.dsdl.Ge
# noinspection PyUnusedLocal
-def _unittest_issue_116(compiled: typing.List[pyuavcan.dsdl.GeneratedPackageInfo]) -> None:
+def _unittest_issue_147(compiled: typing.List[pyuavcan.dsdl.GeneratedPackageInfo]) -> None:
from uavcan.register import Access_1_0
- valid = pyuavcan.dsdl.update_from_builtin(Access_1_0.Request(), {"name": {"name": "uavcan.pub.measurement"}})
+ # Automatic promotion https://github.com/UAVCAN/pyuavcan/issues/147
+ valid = pyuavcan.dsdl.update_from_builtin(Access_1_0.Request(), "uavcan.pub.measurement")
assert valid.name.name.tobytes().decode() == "uavcan.pub.measurement"
- with pytest.raises(TypeError) as ex:
- pyuavcan.dsdl.update_from_builtin(Access_1_0.Request(), {"name": "uavcan.pub.measurement"})
- print("Exception message:", ex)
- assert "str" in str(ex)
- assert "Name_1_" in str(ex)
diff --git a/tests/transport/can/media/_socketcan.py b/tests/transport/can/media/_socketcan.py
index ad9aba915..c84e58aa2 100644
--- a/tests/transport/can/media/_socketcan.py
+++ b/tests/transport/can/media/_socketcan.py
@@ -67,6 +67,11 @@ def on_rx_b(frames: typing.Iterable[typing.Tuple[Timestamp, Envelope]]) -> None:
[
Envelope(DataFrame(FrameFormat.BASE, 0x123, bytearray(range(6))), loopback=True),
Envelope(DataFrame(FrameFormat.EXTENDED, 0x1BADC0FE, bytearray(range(8))), loopback=True),
+ ],
+ asyncio.get_event_loop().time() + 1.0,
+ )
+ await media_a.send(
+ [
Envelope(DataFrame(FrameFormat.EXTENDED, 0x1FF45678, bytearray(range(0))), loopback=False),
],
asyncio.get_event_loop().time() + 1.0,
diff --git a/tests/transport/redundant/_redundant.py b/tests/transport/redundant/_redundant.py
index d10614796..c23890f40 100644
--- a/tests/transport/redundant/_redundant.py
+++ b/tests/transport/redundant/_redundant.py
@@ -452,7 +452,8 @@ async def wait(how_many: int) -> None:
await asyncio.sleep(1.0)
-def _unittest_redundant_transport_reconfiguration() -> None:
+@pytest.mark.asyncio
+async def _unittest_redundant_transport_reconfiguration() -> None:
from pyuavcan.transport import OutputSessionSpecifier, MessageDataSpecifier, PayloadMetadata
tr = RedundantTransport()
@@ -466,3 +467,6 @@ def _unittest_redundant_transport_reconfiguration() -> None:
with pytest.raises(pyuavcan.transport.OperationNotDefinedForAnonymousNodeError):
tr.attach_inferior(LoopbackTransport(None, allow_anonymous_transfers=False))
assert len(tr.inferiors) == 1
+
+ tr.close()
+ await asyncio.sleep(2.0)
diff --git a/tests/transport/redundant/_session_output.py b/tests/transport/redundant/_session_output.py
index e875933b3..087749675 100644
--- a/tests/transport/redundant/_session_output.py
+++ b/tests/transport/redundant/_session_output.py
@@ -194,6 +194,7 @@ async def add_inferior(inferior: pyuavcan.transport.OutputSession) -> None:
],
)
assert len(feedback) == 2
+ feedback.sort(key=lambda x: x.inferior_session is not inf_a) # Ensure consistent ordering
assert feedback[0].inferior_session is inf_a
assert feedback[0].original_transfer_timestamp == ts
assert ts.system <= feedback[0].first_frame_transmission_timestamp.system <= time.time()
@@ -324,6 +325,8 @@ async def add_inferior(inferior: pyuavcan.transport.OutputSession) -> None:
assert None is await (rx_a.receive(loop.time() + 1))
assert None is await (rx_b.receive(loop.time() + 1))
+ await asyncio.sleep(2.0)
+
async def _unittest_redundant_output_exceptions(caplog: typing.Any) -> None:
loop = asyncio.get_event_loop()
@@ -450,3 +453,5 @@ def retire() -> None:
is_retired = False
ses.close()
assert not is_retired
+
+ await asyncio.sleep(2.0)