-
-
Notifications
You must be signed in to change notification settings - Fork 104
/
_builtin_form.py
233 lines (196 loc) · 10.9 KB
/
_builtin_form.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# Copyright (c) 2019 UAVCAN Consortium
# This software is distributed under the terms of the MIT License.
# Author: Pavel Kirienko <pavel@uavcan.org>
import typing
import logging
import numpy
from numpy.typing import NDArray
import pydsdl
from ._composite_object import CompositeObject, get_model, get_attribute, set_attribute, get_class
from ._composite_object import CompositeObjectTypeVar
def to_builtin(obj: CompositeObject) -> typing.Dict[str, typing.Any]:
    """
    Converts a DSDL object (an instance of a Python class auto-generated from a DSDL definition)
    into a structure built exclusively from native built-in types: dict, list, bool, int, float, str.
    The items of every produced dict follow the field ordering of the source definition.
    Dicts that stand for DSDL objects are keyed by the original unstropped names from the source
    DSDL definition; e.g., ``if``, not ``if_``.

    The result is meant to be fed into JSON, YAML, and other serialization formats.

    :param obj: The DSDL object to convert.
    :return: A new :class:`dict` describing the object.
    :raises: :class:`TypeError` if the object is of a service type (use its Request or Response instead).

    ..  doctest::
        :hide:

        >>> import tests
        >>> _ = tests.dsdl.compile()

    >>> import json
    >>> import uavcan.primitive.array
    >>> json.dumps(to_builtin(uavcan.primitive.array.Integer32_1_0([-123, 456, 0])))
    '{"value": [-123, 456, 0]}'
    >>> import uavcan.register
    >>> request = uavcan.register.Access_1_0.Request(
    ...     uavcan.register.Name_1_0('my.register'),
    ...     uavcan.register.Value_1_0(integer16=uavcan.primitive.array.Integer16_1_0([1, 2, +42, -10_000]))
    ... )
    >>> to_builtin(request)  # doctest: +NORMALIZE_WHITESPACE
    {'name': {'name': 'my.register'},
     'value': {'integer16': {'value': [1, 2, 42, -10000]}}}
    """
    dsdl_model = get_model(obj)
    _raise_if_service_type(dsdl_model)
    converted = _to_builtin_impl(obj, dsdl_model)
    # The top-level model is composite, so the recursive worker is expected to hand back a dict.
    assert isinstance(converted, dict)
    return converted
def _to_builtin_impl(
    obj: typing.Union[CompositeObject, NDArray[typing.Any], str, bool, int, float], model: pydsdl.SerializableType
) -> typing.Union[typing.Dict[str, typing.Any], typing.List[typing.Any], str, bool, int, float]:
    """
    Recursive worker of :func:`to_builtin`; converts ``obj`` according to its DSDL ``model``.

    - Composite types become a dict keyed by field name (padding fields excluded). Fields whose attribute
      value is None are omitted, which hides inactive union variants.
    - Array types become a list of converted elements. String-like arrays are decoded into :class:`str`;
      if decoding fails with :class:`UnicodeError`, a list of ints is returned instead.
    - Integer, float, and boolean primitives are converted using :class:`int`, :class:`float`, :class:`bool`
      to get rid of NumPy scalar types. Any other primitive is expected to already be a :class:`str`.

    :param obj: The value to convert.
    :param model: The DSDL type model describing ``obj``.
    :return: The built-in representation of ``obj``.
    :raises: :class:`AssertionError` if the model is not a composite, array, or primitive type.
    """
    if isinstance(model, pydsdl.CompositeType):
        assert isinstance(obj, CompositeObject)
        out: typing.Dict[str, typing.Any] = {}
        for f in model.fields_except_padding:
            value = get_attribute(obj, f.name)  # Looked up once per field and reused below.
            if value is not None:  # The check is to hide inactive union variants.
                out[f.name] = _to_builtin_impl(value, f.data_type)
        return out

    if isinstance(model, pydsdl.ArrayType):
        assert isinstance(obj, numpy.ndarray)
        if model.string_like:  # TODO: drop this special case when strings are natively supported in DSDL.
            try:
                return bytes(e for e in obj).decode()
            except UnicodeError:
                return list(map(int, obj))
        return [_to_builtin_impl(e, model.element_type) for e in obj]

    if isinstance(model, pydsdl.PrimitiveType):
        # The explicit conversions are needed to get rid of NumPy scalar types.
        if isinstance(model, pydsdl.IntegerType):
            return int(obj)  # type: ignore
        if isinstance(model, pydsdl.FloatType):
            return float(obj)  # type: ignore
        if isinstance(model, pydsdl.BooleanType):
            return bool(obj)
        assert isinstance(obj, str)
        return obj

    # An explicit raise (rather than ``assert False``) so that the check is not stripped under ``python -O``,
    # where falling through would silently return None.
    raise AssertionError("Unexpected inputs")
def update_from_builtin(destination: CompositeObjectTypeVar, source: typing.Any) -> CompositeObjectTypeVar:
    """
    Updates the provided DSDL object (an instance of a Python class auto-generated from a DSDL definition)
    with the values from a native representation, where DSDL objects are represented as dicts, arrays
    are lists, and primitives are represented as int/float/bool. This is the reverse of :func:`to_builtin`.
    Values that are not specified in the source are not updated (left at their original values),
    so an empty source will leave the input object unchanged.

    Source field names shall match the original unstropped names provided in the DSDL definition;
    e.g., ``if``, not ``if_``. If there is more than one variant specified for a union type, the last
    specified variant takes precedence.

    If the structure of the source does not match the destination object, the correct representation
    may be deduced automatically as long as it can be done unambiguously.

    :param destination: The object to update. The update will be done in-place. If you don't want the
        destination object modified, clone it beforehand.

    :param source: The :class:`dict` instance containing the values to update the destination object with.
        A list, a tuple, or a single value is treated as positional initialization of the fields.
        The source itself is not mutated; a shallow copy is consumed instead.

    :return: A reference to destination (not a copy).

    :raises: :class:`ValueError` if the provided source values cannot be applied to the destination object,
        or if the source contains fields that are not present in the destination object.
        :class:`TypeError` if an entity of the source cannot be converted into the type expected by the destination.

    >>> import tests; tests.dsdl.compile()  # DSDL package generation not shown in this example.
    [...]
    >>> import json
    >>> import uavcan.primitive.array
    >>> import uavcan.register
    >>> request = uavcan.register.Access_1_0.Request(
    ...     uavcan.register.Name_1_0('my.register'),
    ...     uavcan.register.Value_1_0(string=uavcan.primitive.String_1_0('Hello world!'))
    ... )
    >>> request
    uavcan.register.Access.Request...name='my.register'...value='Hello world!'...
    >>> update_from_builtin(request, {  # Switch the Value union from string to int16; keep the name unchanged.
    ...     'value': {
    ...         'integer16': {
    ...             'value': [1, 2, 42, -10000]
    ...         }
    ...     }
    ... })  # doctest: +NORMALIZE_WHITESPACE
    uavcan.register.Access.Request...name='my.register'...value=[ 1, 2, 42,-10000]...

    The following examples showcase positional initialization:

    >>> from uavcan.node import Heartbeat_1
    >>> update_from_builtin(Heartbeat_1(), [123456, 1, 2])  # doctest: +NORMALIZE_WHITESPACE
    uavcan.node.Heartbeat.1.0(uptime=123456,
                              health=uavcan.node.Health.1.0(value=1),
                              mode=uavcan.node.Mode.1.0(value=2),
                              vendor_specific_status_code=0)
    >>> update_from_builtin(Heartbeat_1(), 123456)  # doctest: +NORMALIZE_WHITESPACE
    uavcan.node.Heartbeat.1.0(uptime=123456,
                              health=uavcan.node.Health.1.0(value=0),
                              mode=uavcan.node.Mode.1.0(value=0),
                              vendor_specific_status_code=0)
    >>> update_from_builtin(Heartbeat_1(), [0, 0, 0, 0, 0])  # doctest: +IGNORE_EXCEPTION_DETAIL
    Traceback (most recent call last):
    ...
    TypeError: ...

    >>> update_from_builtin(uavcan.primitive.array.Real64_1(), 123.456)  # doctest: +NORMALIZE_WHITESPACE
    uavcan.primitive.array.Real64.1.0(value=[123.456])
    >>> update_from_builtin(uavcan.primitive.array.Real64_1(), [123.456])  # doctest: +NORMALIZE_WHITESPACE
    uavcan.primitive.array.Real64.1.0(value=[123.456])
    >>> update_from_builtin(uavcan.primitive.array.Real64_1(), [123.456, -9])  # doctest: +NORMALIZE_WHITESPACE
    uavcan.primitive.array.Real64.1.0(value=[123.456, -9. ])

    >>> update_from_builtin(uavcan.register.Access_1_0.Request(), ["X", {"integer8": 99}])  # Same as the next one!
    uavcan.register.Access.Request...name='X'...value=[99]...
    >>> update_from_builtin(uavcan.register.Access_1_0.Request(), {'name': 'X', 'value': {'integer8': {'value': [99]}}})
    uavcan.register.Access.Request...name='X'...value=[99]...
    """
    _logger.debug("update_from_builtin: destination/source on the next lines:\n%r\n%r", destination, source)
    if not isinstance(destination, CompositeObject):  # pragma: no cover
        raise TypeError(f"Bad destination: expected a CompositeObject, got {type(destination).__name__}")
    model = get_model(destination)
    _raise_if_service_type(model)
    fields = model.fields_except_padding

    # UX improvement: https://github.com/UAVCAN/pyuavcan/issues/147 -- match the source against the data type.
    if not isinstance(source, dict):
        if not isinstance(source, (list, tuple)):  # Assume positional initialization.
            source = (source,)
        # If there are more values than this type can take but the first field is itself a container,
        # hand the entire sequence over to that first field.
        can_propagate = fields and isinstance(fields[0].data_type, (pydsdl.ArrayType, pydsdl.CompositeType))
        too_many_values = len(source) > (1 if isinstance(model.inner_type, pydsdl.UnionType) else len(fields))
        if can_propagate and too_many_values:
            _logger.debug(
                "update_from_builtin: %d source values cannot be applied to %s but the first field accepts "
                "positional initialization -- propagating down",
                len(source),
                type(destination).__name__,
            )
            source = [source]
        if len(source) > len(fields):
            raise TypeError(
                f"Cannot apply {len(source)} values to {len(fields)} fields in {type(destination).__name__}"
            )
        # Pair the positional values with the leading fields and start over with the dict form.
        source = {f.name: v for f, v in zip(fields, source)}
        return update_from_builtin(destination, source)

    source = dict(source)  # Create copy to prevent mutation of the original

    for f in fields:
        field_type = f.data_type
        try:
            value = source.pop(f.name)
        except LookupError:
            continue  # No value specified, keep original value

        if isinstance(field_type, pydsdl.CompositeType):
            field_obj = get_attribute(destination, f.name)
            if field_obj is None:  # Oh, this is a union
                field_obj = get_class(field_type)()  # The variant was not selected, construct a default
                set_attribute(destination, f.name, field_obj)  # Switch the union to the new variant
            update_from_builtin(field_obj, value)

        elif isinstance(field_type, pydsdl.ArrayType):
            element_type = field_type.element_type
            if isinstance(element_type, pydsdl.PrimitiveType):
                set_attribute(destination, f.name, value)
            elif isinstance(element_type, pydsdl.CompositeType):
                dtype = get_class(element_type)
                set_attribute(destination, f.name, [update_from_builtin(dtype(), s) for s in value])
            else:
                # Explicit raise instead of ``assert False``: under ``python -O`` the assert would be stripped
                # and the value popped above would be dropped silently.
                raise AssertionError(f"Unexpected array element type: {element_type!r}")

        elif isinstance(field_type, pydsdl.PrimitiveType):
            set_attribute(destination, f.name, value)

        else:
            # Same reasoning as above: never discard a supplied value without reporting it.
            raise AssertionError(f"Unexpected field type: {field_type!r}")

    # Whatever is left in the copy did not match any field of the destination type.
    if source:
        raise ValueError(f"No such fields in {model}: {list(source)}")

    return destination
def _raise_if_service_type(model: pydsdl.SerializableType) -> None:
    """
    Guard that rejects service types, for which the built-in form is not defined.

    :param model: The DSDL type model to check.
    :raises: :class:`TypeError` if the model is a :class:`pydsdl.ServiceType`.
    """
    if isinstance(model, pydsdl.ServiceType):  # pragma: no cover
        message = (
            "Built-in form is not defined for service types. "
            f"Did you mean to use Request or Response? Input type: {model}"
        )
        raise TypeError(message)
# Module-level logger named after this module; update_from_builtin emits its debug traces through it.
_logger = logging.getLogger(__name__)