-
-
Notifications
You must be signed in to change notification settings - Fork 104
/
base.j2
412 lines (368 loc) · 17.3 KB
/
base.j2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
{#-
# Copyright (c) 2019 UAVCAN Consortium
# This software is distributed under the terms of the MIT License.
# Author: Pavel Kirienko <pavel@uavcan.org>
-#}
{#- Header comment emitted at the very top of every generated module.
 #- T and now_utc are not defined in this template; they are expected to be supplied by the rendering context. -#}
# AUTOGENERATED, DO NOT EDIT.
#
# Source file:
# {{ T.source_file_path }}
#
# Generated at: {{ now_utc }} UTC
# Is deprecated: {{ 'yes' if T.deprecated else 'no' }}
# Fixed port ID: {{ T.fixed_port_id }}
# Full name: {{ T.full_name }}
# Version: {{ T.version.major }}.{{ T.version.minor }}
#
# pylint: skip-file
# mypy: warn_unused_ignores=False
from __future__ import annotations
import numpy as _np_
from numpy.typing import NDArray as _NDArray_
import pydsdl as _pydsdl_
import pyuavcan.dsdl as _dsdl_
{%- if T.deprecated %}
import warnings as _warnings_
{%- endif -%}
{%- for n in T|imports %}
import {{ n }}
{%- endfor -%}
{#- How many elements in the array trigger summarization rather than full output.
#- Summarization replaces middle elements with an ellipsis.
#- The value is consumed by printable_field_representation(), which passes it to numpy.array2string() as the
#- threshold and also derives the maximum line width from it. -#}
{%- set ARRAY_PRINT_SUMMARIZATION_THRESHOLD = 100 -%}
{%- from 'serialization.j2' import serialize -%}
{%- from 'deserialization.j2' import deserialize -%}
{#-
# FIELD TYPE ANNOTATIONS.
# strict_type_annotation(t) emits the Python annotation of the well-defined internal representation of a value
# of the DSDL type t; it is used for constants, for the private backing attributes and for property getters.
# Mapping: boolean -> bool, integer -> int, float -> float, array -> _NDArray_ of the element's NumPy scalar
# type, composite -> full reference name of the generated class.
# Any other kind of type aborts rendering through the assert tag.
# All tags use whitespace control, so the macro renders to the bare annotation without surrounding whitespace.
-#}
{%- macro strict_type_annotation(t) -%}
{%- if t is BooleanType -%} bool
{%- elif t is IntegerType -%} int
{%- elif t is FloatType -%} float
{%- elif t is ArrayType -%} _NDArray_[{{ t.element_type|numpy_scalar_type }}]
{%- elif t is CompositeType -%} {{ t|full_reference_name }}
{%- else -%}{% assert False %}
{%- endif -%}
{%- endmacro -%}
{#-
# relaxed_type_annotation(t) emits the wider Python annotation accepted for values supplied by the user, i.e.,
# constructor arguments and property setter arguments.
# Scalars additionally accept the matching NumPy scalar type (floats also accept int).
# Arrays of unsigned integers not wider than 8 bits accept list[int], memoryview, bytes and bytearray in addition
# to the NumPy array, plus str when the array type is string-like.
# All other arrays accept either the NumPy array or a list of the strict element type.
# Any other kind of type aborts rendering through the assert tag.
-#}
{%- macro relaxed_type_annotation(t) -%}
{%- if t is BooleanType -%} bool
{%- elif t is IntegerType -%} int | {{ t|numpy_scalar_type }}
{%- elif t is FloatType -%} int | float | {{ t|numpy_scalar_type }}
{%- elif t is CompositeType -%} {{ t|full_reference_name }}
{%- elif t is ArrayType -%}
{%- if (t.element_type is UnsignedIntegerType) and t.element_type.bit_length <= 8 -%}
_NDArray_[{{ t.element_type|numpy_scalar_type }}] | list[int] | memoryview | bytes | bytearray {% if t.string_like -%}| str{%- endif -%}
{%- else -%}
_NDArray_[{{ t.element_type|numpy_scalar_type }}] | list[{{ strict_type_annotation(t.element_type) }}]
{%- endif -%}
{%- else -%}{% assert False %}
{%- endif -%}
{%- endmacro -%}
{#-
# ARRAY ASSIGNMENT BLOCK.
# Validates the type and dimensionality of the input array, and converts it into the proper type as necessary.
# Emits post-assignment invariant checks to ensure correct behavior of the generated code.
#
# Parameters:
#   f   - the array field being assigned; the result is stored in the backing attribute self._<field id>.
#   src - the name of the Python variable (as text) that holds the source value in the generated code.
#
# The length comparison operator is '==' for fixed-length arrays and '<=' for variable-length arrays; any other
# type aborts rendering through the assert tag.
#
# The emitted code tries the following in order:
#   1. For string-like arrays, a str source is first encoded into bytes.
#   2. For arrays of unsigned integers not wider than 8 bits, a bytes/bytearray source of acceptable length is
#      wrapped using numpy.frombuffer(). The literal "el" emitted after this branch is concatenated with the
#      following "if", turning it into "elif".
#   3. A one-dimensional ndarray of the exact dtype and acceptable size is bound directly (shared reference).
#   4. Anything else is converted through numpy.array(...).flatten(); ValueError is raised in the generated code
#      if the resulting length is not acceptable.
# The trailing assertions verify the type, dtype, dimensionality and length of the stored array.
-#}
{%- macro assign_array(f, src) -%}
{%- set t = f.data_type -%}
{%- if t is FixedLengthArrayType -%} {%- set cmp = '==' -%}
{%- elif t is VariableLengthArrayType -%} {%- set cmp = '<=' -%}
{%- else -%}{%- assert False -%}
{%- endif -%}
{%- if t.string_like -%} {#- DSDL uses UTF-8, which is the default in Python. -#}
{{ src }} = {{ src }}.encode() if isinstance({{ src }}, str) else {{ src }} # Implicit string encoding
{% endif -%}
{%- if t.element_type is UnsignedIntegerType and t.element_type.bit_length <= 8 -%}
if isinstance({{ src }}, (bytes, bytearray)) and len({{ src }}) {{ cmp }} {{ t.capacity }}:
# Fast zero-copy initialization from buffer. Necessary when dealing with images, point clouds, etc.
# Mutability will be inherited; e.g., bytes - immutable, bytearray - mutable.
self._{{ f|id }} = _np_.frombuffer({{ src }}, {{ t.element_type|numpy_scalar_type }}) # type: ignore
el {#- Concatenated with the "if" below -#}
{% endif -%}
if isinstance({{ src }}, _np_.ndarray)
{#- #} and {{ src }}.dtype == {{ t.element_type|numpy_scalar_type }}
{#- #} and {{ src }}.ndim == 1
{#- #} and {{ src }}.size {{ cmp }} {{ t.capacity }}: # type: ignore
# Fast binding if the source array has the same type and dimensionality. Beware of the shared reference.
self._{{ f|id }} = {{ src }}
else:
# Last resort, slow construction of a new array. New memory may be allocated.
{{ src }} = _np_.array({{ src }}, {{ t.element_type|numpy_scalar_type }}).flatten()
if not {{ src }}.size {{ cmp }} {{ t.capacity }}: # Length cannot be checked before casting and flattening
raise ValueError(f'{{ f.name }}: invalid array length: not { {{- src }}.size} {{ cmp }} {{ t.capacity }}')
self._{{ f|id }} = {{ src }}
assert isinstance(self._{{ f|id }}, _np_.ndarray)
assert self._{{ f|id }}.dtype == {{ t.element_type|numpy_scalar_type }} # type: ignore
assert self._{{ f|id }}.ndim == 1
assert len(self._{{ f|id }}) {{ cmp }} {{ t.capacity }}
{%- endmacro -%}
{#-
# FIELD TO STRING CONVERSION.
# Emits an expression that constructs a string-printable representation of the field.
# The resulting expression shall be wrapped into str() or fed into '%s'.
#
# Parameter f is the field to print; the emitted expression reads it through the public property self.<field id>.
#   - String-like arrays are printed as the repr() of their bytes with the first character (the "b" prefix of
#     the bytes literal) dropped.
#   - Other arrays are printed through numpy.array2string() with a comma separator, 10 edge items, and the
#     summarization threshold ARRAY_PRINT_SUMMARIZATION_THRESHOLD; the maximum line width is that threshold
#     multiplied by 10000.
#   - Non-array fields are emitted as the bare attribute access.
-#}
{%- macro printable_field_representation(f) -%}
{%- if f.data_type is ArrayType -%}
{%- if f.data_type.string_like -%}
repr(bytes(self.{{ f|id }}))[1:]
{%- else -%}
_np_.array2string(self.{{ f|id }}, separator=',', edgeitems=10, {# -#}
threshold={{ ARRAY_PRINT_SUMMARIZATION_THRESHOLD }}, {# -#}
max_line_width={{ ARRAY_PRINT_SUMMARIZATION_THRESHOLD * 10000 }})
{%- endif -%}
{%- else -%}
self.{{ f|id }}
{%- endif -%}
{%- endmacro -%}
{#-
# MAIN CODE GENERATION MACRO.
# Accepts the name of the generated type and its DSDL type descriptor object of type pydsdl.CompositeType.
#
# Parameters:
#   name              - the name of the Python class to emit.
#   type              - the type descriptor; its constants, fields, version, extent, etc. drive the output.
#   parent_class_name - optional name of the enclosing class; if given, it is prepended (dot-separated) to name
#                       to form the qualified class name used in the (de)serialization method annotations and
#                       passed to the deserialize() macro.
#
# The emitted class contains, in this order: the constants, __init__(), one property getter/setter pair per
# non-padding field, _serialize_() and _deserialize_(), __repr__(), and the type descriptor constants.
# For unions, every backing attribute is None | <type>; assigning one field through its setter resets all the
# other fields to None, and __init__() raises ValueError if more than one field is supplied.
# Note that _FIXED_PORT_ID_ is taken from the outer template variable T rather than from the type argument.
# The deprecation warning is issued with stacklevel=2 so that it is attributed to the code that constructs the
# object rather than to the generated __init__() itself.
-#}
{%- macro data_schema(name, type, parent_class_name=None) -%}
{%- set full_class_name = ((parent_class_name + '.') if parent_class_name else '') + name -%}
# noinspection PyUnresolvedReferences, PyPep8, PyPep8Naming, SpellCheckingInspection, DuplicatedCode
class {{ name }}(_dsdl_.{%- if type.has_fixed_port_id -%}FixedPort{%- endif -%}CompositeObject):
"""
Generated property settings use relaxed type signatures, accepting a large variety of
possible representations of the value, which are automatically converted to a well-defined
internal representation. When accessing a property, this strict well-defined internal
representation is always returned. The implicit strictification enables more precise static
type analysis.
The value returned by the __repr__() method may be invariant to some of the field values,
and its format is not guaranteed to be stable. Therefore, the returned string representation
can be used only for displaying purposes; any kind of automation built on top of that will
be fragile and prone to mismaintenance.
"""
{#-
# CONSTANTS
-#}
{%- for c in type.constants %}
{%- set target -%}
{{ c|id }}: {{ ''.ljust(type.constants|longest_id_length - c|id|length) }}{{ strict_type_annotation(c.data_type) }}
{%- endset %}
{%- if c.data_type is BooleanType %}
{{ target }} = {{ c.value.native_value }}
{%- elif c.data_type is IntegerType %}
{{ target }} = {{ c.value.as_native_integer() }}
{%- elif c.data_type is FloatType %}
{{ target }} = {{ c.value.native_value.numerator }} / {{ c.value.native_value.denominator }}
{%- else -%}{%- assert False -%}
{%- endif %}
{{- '\n' if loop.last else '' -}}
{%- endfor %}
def __init__(self
{%- if type.inner_type is UnionType -%}, *{%- endif -%}
{%- for f in type.fields_except_padding -%}
,
{{ f|id }}: {{ ''.ljust(type.fields|longest_id_length - f|id|length) -}}
None | {{ relaxed_type_annotation(f.data_type) }} = None
{%- endfor -%}
) -> None:
"""
{{ type.full_name }}.{{ type.version.major }}.{{ type.version.minor }}
Raises ValueError if any of the primitive values are outside the permitted range, regardless of the cast mode.
{%- if type.inner_type is UnionType %}
If no parameters are provided, the first field will be default-initialized and selected.
If one parameter is provided, it will be used to initialize and select the field under the same name.
If more than one parameter is provided, a ValueError will be raised.
{%- endif %}
{%- for f in type.fields_except_padding %}
:param {{ f|id }}: {{ ''.ljust(type.fields|longest_id_length - f|id|length) }}{{ f }}
{%- endfor %}
"""
{%- if type.deprecated %}
_warnings_.warn('Data type {{ type }} is deprecated', DeprecationWarning, stacklevel=2)
{% endif -%}
{#-
# FIELD INITIALIZATION
-#}
{%- if type.inner_type is not UnionType -%}
{%- for f in type.fields_except_padding %}
self._{{ f|id }}: {{ ''.ljust(type.fields|longest_id_length - f|id|length) -}}
{{- strict_type_annotation(f.data_type) }}
{{- '\n' if loop.last else '' -}}
{%- endfor %}
{%- for f in type.fields_except_padding %}
{%- if f.data_type is BooleanType %}
self.{{ f|id }} = {{ f|id }} if {{ f|id }} is not None else False
{%- elif f.data_type is IntegerType %}
self.{{ f|id }} = {{ f|id }} if {{ f|id }} is not None else 0 # type: ignore
{%- elif f.data_type is FloatType %}
self.{{ f|id }} = {{ f|id }} if {{ f|id }} is not None else 0.0 # type: ignore
{%- elif f.data_type is FixedLengthArrayType %}
if {{ f|id }} is None:
{%- if f.data_type.element_type is CompositeType %}
self.{{ f|id }} = _np_.array([{{ f.data_type.element_type|full_reference_name }}() {# -#}
for _ in range({{ f.data_type.capacity }})], {# -#}
{{ f.data_type.element_type|numpy_scalar_type }})
{%- else %}
self.{{ f|id }} = _np_.zeros({{ f.data_type.capacity }}, {# -#}
{{ f.data_type.element_type|numpy_scalar_type }})
{%- endif %}
else:
{{ assign_array(f, f|id) | indent(8) }}
{%- elif f.data_type is VariableLengthArrayType %}
if {{ f|id }} is None:
self.{{ f|id }} = _np_.array([], {{ f.data_type.element_type|numpy_scalar_type }})
else:
{{ assign_array(f, f|id) | indent(8) }}
{%- elif f.data_type is CompositeType %}
if {{ f|id }} is None:
self.{{ f|id }} = {{ f.data_type|full_reference_name }}()
elif isinstance({{ f|id }}, {{ f.data_type|full_reference_name }}):
self.{{ f|id }} = {{ f|id }}
else:
raise ValueError(f'{{ f|id }}: expected {{ f.data_type|full_reference_name }} '
f'got {type({{ f|id }}).__name__}')
{%- else -%}{%- assert False -%}
{%- endif %}
{% else %}
pass
{# #}
{%- endfor %}
{%- else %} {#- IS UNION (guaranteed to contain at least 2 fields none of which are padding) #}
{%- for f in type.fields %}
self._{{ f|id }}: {{ ''.ljust(type.fields|longest_id_length - f|id|length) -}}
None | {{ strict_type_annotation(f.data_type) }} = None
{%- endfor %}
_init_cnt_: int = 0
{% for f in type.fields %}
if {{ f|id }} is not None:
_init_cnt_ += 1
self.{{ f|id }} = {{ f|id }} # type: ignore
{% endfor %}
if _init_cnt_ == 0:
{%- set f = type.fields[0] -%}
{%- if f.data_type is BooleanType %}
self.{{ f|id }} = False
{%- elif f.data_type is IntegerType %}
self.{{ f|id }} = 0
{%- elif f.data_type is FloatType %}
self.{{ f|id }} = 0.0
{%- elif f.data_type is FixedLengthArrayType %}
{%- if f.data_type.element_type is CompositeType %}
self.{{ f|id }} = _np_.array([{{ f.data_type.element_type|full_reference_name }}() {# -#}
for _ in range({{ f.data_type.capacity }})], {# -#}
{{ f.data_type.element_type|numpy_scalar_type }})
{%- else %}
self.{{ f|id }} = _np_.zeros({{ f.data_type.capacity }}, {# -#}
{{ f.data_type.element_type|numpy_scalar_type }})
{%- endif %}
{%- elif f.data_type is VariableLengthArrayType %}
self.{{ f|id }} = _np_.array([], {{ f.data_type.element_type|numpy_scalar_type }})
{%- elif f.data_type is CompositeType %}
self.{{ f|id }} = {{ f.data_type|full_reference_name }}()
{%- else -%}{%- assert False -%}
{%- endif %} # Default initialization
elif _init_cnt_ == 1:
pass # A value is already assigned, nothing to do
else:
raise ValueError('Union cannot hold values of more than one field')
{% endif %}
{#-
# FIELD ACCESSORS AND MUTATORS
-#}
{%- for f in type.fields_except_padding %}
@property
def {{ f|id }}(self) -> {{ "None | " * (type.inner_type is UnionType) }}{{ strict_type_annotation(f.data_type) }}:
"""
{{ f }}
{%- if f.data_type is VariableLengthArrayType and f.data_type.string_like %}
DSDL does not support strings natively yet. To interpret this array as a string,
use tobytes() to convert the NumPy array to bytes, and then decode() to convert bytes to string:
.{{ f|id }}.tobytes().decode()
When assigning a string to this property, no manual conversion is necessary (it will happen automatically).
{%- endif %}
The setter raises ValueError if the supplied value exceeds the valid range or otherwise inapplicable.
"""
return self._{{ f|id }}
@{{ f|id }}.setter
def {{ f|id }}(self, x: {{ relaxed_type_annotation(f.data_type) }}) -> None:
{%- if f.data_type is BooleanType %}
self._{{ f|id }} = bool(x) # Cast to bool implements saturation
{%- elif f.data_type is IntegerType %}
"""Raises ValueError if the value is outside of the permitted range, regardless of the cast mode."""
x = int(x)
if {{ f.data_type.inclusive_value_range.min }} <= x <= {{ f.data_type.inclusive_value_range.max }}:
self._{{ f|id }} = x
else:
raise ValueError(f'{{ f|id }}: value {x} is not in [{{ f.data_type.inclusive_value_range.min }}, {# -#}
{{ f.data_type.inclusive_value_range.max }}]')
{%- elif f.data_type is FloatType %}
"""Raises ValueError if the value is finite and outside of the permitted range, regardless of the cast mode."""
{#- We do not emit range check for float64 because its range matches that of the native Python's float. #}
{%- if f.data_type.bit_length < 64 %}
x = float(x)
in_range = {{ f.data_type.inclusive_value_range.min }}.0 <= x <= {{ f.data_type.inclusive_value_range.max }}.0
if in_range or not _np_.isfinite(x):
self._{{ f|id }} = x
else:
raise ValueError(f'{{ f|id }}: value {x} is not in [{{ f.data_type.inclusive_value_range.min }}, {# -#}
{{ f.data_type.inclusive_value_range.max }}]')
{%- else %}
self._{{ f|id }} = float(x) # Range check not required
{%- endif %}
{%- elif f.data_type is ArrayType %}
{{ assign_array(f, 'x') | indent(4) }}
{%- elif f.data_type is CompositeType %}
if isinstance(x, {{ f.data_type|full_reference_name }}):
self._{{ f|id }} = x
else:
raise ValueError(f'{{ f|id }}: expected {{ f.data_type|full_reference_name }} got {type(x).__name__}')
{%- else -%}{%- assert False -%}
{%- endif %}
{%- if type.inner_type is UnionType %}
{%- for z in type.fields if z.name != f.name %}
self._{{ z|id }} = None
{%- endfor %}
{%- endif %}
{% endfor -%}
{#
# SERIALIZATION METHODS
#}
# noinspection PyProtectedMember
def _serialize_(self, _ser_: {{ full_class_name }}._SerializerTypeVar_) -> None:
{{ serialize(type) | remove_blank_lines | indent }}
# noinspection PyProtectedMember
@staticmethod
def _deserialize_(_des_: {{ full_class_name }}._DeserializerTypeVar_) -> {{ full_class_name }}:
{{ deserialize(type, full_class_name) | remove_blank_lines | indent }}
assert isinstance(self, {{ full_class_name }})
return self
{#
# PYTHON DATA MODEL
#}
def __repr__(self) -> str:
{%- if type.inner_type is not UnionType %}
_o_0_ = ', '.join([
{%- for f in type.fields_except_padding %}
'{{ f.name }}=%s' % {{ printable_field_representation(f) }},
{%- endfor %}
])
{%- else %} {#- UNION #}
_o_0_ = '(MALFORMED UNION)'
{%- for f in type.fields %}
if self.{{ f|id }} is not None:
_o_0_ = '{{ f.name }}=%s' % {{ printable_field_representation(f) }}
{%- endfor %}
{%- endif %}
return f'{{ type.full_name }}.{{ type.version.major }}.{{ type.version.minor }}({_o_0_})'
{#
# PYDSDL TYPE DESCRIPTOR
#}
{%- if T.has_fixed_port_id %}
_FIXED_PORT_ID_ = {{ T.fixed_port_id|int }}
{%- endif %}
{%- assert type.extent % 8 == 0 %}
_EXTENT_BYTES_ = {{ type.extent // 8 }}
{% set meta_type = type.__class__.__name__ -%}
_MODEL_: _pydsdl_.{{ meta_type }} = _dsdl_.CompositeObject._restore_constant_(
{{ type | pickle | indent(8) }}
)
assert isinstance(_MODEL_, _pydsdl_.{{ meta_type }})
{%- endmacro -%}
{#-
# The position of this comment defines the number of blank lines between imports and class definition.
# Do not put any definitions below.
# The "contents" block that follows is empty here; presumably the templates extending this one override it to
# invoke the macros defined above (to be confirmed against the derived templates).
#}
{% block contents %}{% endblock %}