-
-
Notifications
You must be signed in to change notification settings - Fork 6.1k
/
encoders.py
169 lines (158 loc) · 5.86 KB
/
encoders.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
from enum import Enum
from pathlib import PurePath
from types import GeneratorType
from typing import Any, Callable, Dict, List, Set, Tuple, Union
from fastapi.logger import logger
from fastapi.utils import PYDANTIC_1
from pydantic import BaseModel
from pydantic.json import ENCODERS_BY_TYPE
SetIntStr = Set[Union[int, str]]
DictIntStrAny = Dict[Union[int, str], Any]
def generate_encoders_by_class_tuples(
type_encoder_map: Dict[Any, Callable]
) -> Dict[Callable, Tuple]:
encoders_by_classes: Dict[Callable, List] = {}
for type_, encoder in type_encoder_map.items():
encoders_by_classes.setdefault(encoder, []).append(type_)
encoders_by_class_tuples: Dict[Callable, Tuple] = {}
for encoder, classes in encoders_by_classes.items():
encoders_by_class_tuples[encoder] = tuple(classes)
return encoders_by_class_tuples
encoders_by_class_tuples = generate_encoders_by_class_tuples(ENCODERS_BY_TYPE)
def jsonable_encoder(
obj: Any,
include: Union[SetIntStr, DictIntStrAny] = None,
exclude: Union[SetIntStr, DictIntStrAny] = set(),
by_alias: bool = True,
skip_defaults: bool = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
custom_encoder: dict = {},
sqlalchemy_safe: bool = True,
) -> Any:
if skip_defaults is not None:
logger.warning( # pragma: nocover
"skip_defaults in jsonable_encoder has been deprecated in favor of "
"exclude_unset to keep in line with Pydantic v1, support for it will be "
"removed soon."
)
if include is not None and not isinstance(include, set):
include = set(include)
if exclude is not None and not isinstance(exclude, set):
exclude = set(exclude)
if isinstance(obj, BaseModel):
encoder = getattr(obj.Config, "json_encoders", {})
if custom_encoder:
encoder.update(custom_encoder)
if PYDANTIC_1:
obj_dict = obj.dict(
include=include,
exclude=exclude,
by_alias=by_alias,
exclude_unset=bool(exclude_unset or skip_defaults),
exclude_none=exclude_none,
exclude_defaults=exclude_defaults,
)
else: # pragma: nocover
if exclude_defaults:
raise ValueError("Cannot use exclude_defaults")
obj_dict = obj.dict(
include=include,
exclude=exclude,
by_alias=by_alias,
skip_defaults=bool(exclude_unset or skip_defaults),
)
if "__root__" in obj_dict:
obj_dict = obj_dict["__root__"]
return jsonable_encoder(
obj_dict,
exclude_none=exclude_none,
exclude_defaults=exclude_defaults,
custom_encoder=encoder,
sqlalchemy_safe=sqlalchemy_safe,
)
if isinstance(obj, Enum):
return obj.value
if isinstance(obj, PurePath):
return str(obj)
if isinstance(obj, (str, int, float, type(None))):
return obj
if isinstance(obj, dict):
encoded_dict = {}
for key, value in obj.items():
if (
(
not sqlalchemy_safe
or (not isinstance(key, str))
or (not key.startswith("_sa"))
)
and (value is not None or not exclude_none)
and ((include and key in include) or key not in exclude)
):
encoded_key = jsonable_encoder(
key,
by_alias=by_alias,
exclude_unset=exclude_unset,
exclude_none=exclude_none,
custom_encoder=custom_encoder,
sqlalchemy_safe=sqlalchemy_safe,
)
encoded_value = jsonable_encoder(
value,
by_alias=by_alias,
exclude_unset=exclude_unset,
exclude_none=exclude_none,
custom_encoder=custom_encoder,
sqlalchemy_safe=sqlalchemy_safe,
)
encoded_dict[encoded_key] = encoded_value
return encoded_dict
if isinstance(obj, (list, set, frozenset, GeneratorType, tuple)):
encoded_list = []
for item in obj:
encoded_list.append(
jsonable_encoder(
item,
include=include,
exclude=exclude,
by_alias=by_alias,
exclude_unset=exclude_unset,
exclude_defaults=exclude_defaults,
exclude_none=exclude_none,
custom_encoder=custom_encoder,
sqlalchemy_safe=sqlalchemy_safe,
)
)
return encoded_list
if custom_encoder:
if type(obj) in custom_encoder:
return custom_encoder[type(obj)](obj)
else:
for encoder_type, encoder in custom_encoder.items():
if isinstance(obj, encoder_type):
return encoder(obj)
if type(obj) in ENCODERS_BY_TYPE:
return ENCODERS_BY_TYPE[type(obj)](obj)
for encoder, classes_tuple in encoders_by_class_tuples.items():
if isinstance(obj, classes_tuple):
return encoder(obj)
errors: List[Exception] = []
try:
data = dict(obj)
except Exception as e:
errors.append(e)
try:
data = vars(obj)
except Exception as e:
errors.append(e)
raise ValueError(errors)
return jsonable_encoder(
data,
by_alias=by_alias,
exclude_unset=exclude_unset,
exclude_defaults=exclude_defaults,
exclude_none=exclude_none,
custom_encoder=custom_encoder,
sqlalchemy_safe=sqlalchemy_safe,
)