forked from pydantic/pydantic
-
Notifications
You must be signed in to change notification settings - Fork 0
/
json_schema.py
2489 lines (2026 loc) · 103 KB
/
json_schema.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Usage docs: https://docs.pydantic.dev/2.5/concepts/json_schema/
The `json_schema` module contains classes and functions to allow the way [JSON Schema](https://json-schema.org/)
is generated to be customized.
In general you shouldn't need to use this module directly; instead, you can use
[`BaseModel.model_json_schema`][pydantic.BaseModel.model_json_schema] and
[`TypeAdapter.json_schema`][pydantic.TypeAdapter.json_schema].
"""
from __future__ import annotations as _annotations
import dataclasses
import inspect
import math
import re
import warnings
from collections import defaultdict
from copy import deepcopy
from dataclasses import is_dataclass
from enum import Enum
from typing import (
TYPE_CHECKING,
Any,
Callable,
Counter,
Dict,
Hashable,
Iterable,
NewType,
Sequence,
Tuple,
TypeVar,
Union,
cast,
)
import pydantic_core
from pydantic_core import CoreSchema, PydanticOmit, core_schema, to_jsonable_python
from pydantic_core.core_schema import ComputedField
from typing_extensions import Annotated, Literal, TypeAlias, assert_never, deprecated, final
from pydantic.warnings import PydanticDeprecatedSince26
from ._internal import (
_config,
_core_metadata,
_core_utils,
_decorators,
_internal_dataclass,
_mock_val_ser,
_schema_generation_shared,
_typing_extra,
)
from .annotated_handlers import GetJsonSchemaHandler
from .config import JsonDict, JsonSchemaExtraCallable, JsonValue
from .errors import PydanticInvalidForJsonSchema, PydanticSchemaGenerationError, PydanticUserError
if TYPE_CHECKING:
from . import ConfigDict
from ._internal._core_utils import CoreSchemaField, CoreSchemaOrField
from ._internal._dataclasses import PydanticDataclass
from ._internal._schema_generation_shared import GetJsonSchemaFunction
from .main import BaseModel
CoreSchemaOrFieldType = Literal[core_schema.CoreSchemaType, core_schema.CoreSchemaFieldType]
"""
A type alias for defined schema types that represents a union of
`core_schema.CoreSchemaType` and
`core_schema.CoreSchemaFieldType`.
"""

JsonSchemaValue = Dict[str, Any]
"""
A type alias for a JSON schema value. This is a dictionary of string keys to arbitrary JSON values.
"""

JsonSchemaMode = Literal['validation', 'serialization']
"""
A type alias that represents the mode of a JSON schema; either 'validation' or 'serialization'.
For some types, the inputs to validation differ from the outputs of serialization. For example,
computed fields will only be present when serializing, and should not be provided when
validating. This flag provides a way to indicate whether you want the JSON schema required
for validation inputs, or that will be matched by serialization outputs.
"""

# Human-readable label for each JSON schema mode: 'Input' for validation, 'Output' for serialization.
# NOTE(review): not referenced in the visible part of this file; presumably used when titling
# mode-specific definitions -- confirm against the rest of the module.
_MODE_TITLE_MAPPING: dict[JsonSchemaMode, str] = {'validation': 'Input', 'serialization': 'Output'}
def update_json_schema(schema: JsonSchemaValue, updates: dict[str, Any]) -> JsonSchemaValue:
    """Apply a set of key-value updates to a JSON schema, mutating it in place.

    Every key in `updates` is written into `schema`, overwriting any existing entry with the
    same key. The very same `schema` object is returned so calls can be chained.

    Args:
        schema: The JSON schema to update.
        updates: A dictionary of key-value pairs to set in the schema.

    Returns:
        The updated JSON schema (the same object that was passed in).
    """
    for name, new_value in updates.items():
        schema[name] = new_value
    return schema
# The closed set of warning identifiers; `GenerateJsonSchema.ignored_warning_kinds` is typed as a
# set of these values.
JsonSchemaWarningKind = Literal['skipped-choice', 'non-serializable-default']
"""
A type alias representing the kinds of warnings that can be emitted during JSON schema generation.
See [`GenerateJsonSchema.render_warning_message`][pydantic.json_schema.GenerateJsonSchema.render_warning_message]
for more details.
"""
class PydanticJsonSchemaWarning(UserWarning):
    """Warning category for problems noticed while generating a JSON schema.

    The warnings are produced through
    [`GenerateJsonSchema.emit_warning`][pydantic.json_schema.GenerateJsonSchema.emit_warning] and
    [`GenerateJsonSchema.render_warning_message`][pydantic.json_schema.GenerateJsonSchema.render_warning_message];
    override those methods to control warning behavior.
    """
# ##### JSON Schema Generation #####
DEFAULT_REF_TEMPLATE = '#/$defs/{model}'
"""The default format string used to generate reference names."""

# There are three types of references relevant to building JSON schemas:
#   1. core_schema "ref" values; these are not exposed as part of the JSON schema
#       * these might look like the fully qualified path of a model, its id, or something similar
CoreRef = NewType('CoreRef', str)
#   2. keys of the "definitions" object that will eventually go into the JSON schema
#       * by default, these look like "MyModel", though may change in the presence of collisions
#       * eventually, we may want to make it easier to modify the way these names are generated
DefsRef = NewType('DefsRef', str)
#   3. the values corresponding to the "$ref" key in the schema
#       * By default, these look like "#/$defs/MyModel", as in {"$ref": "#/$defs/MyModel"}
JsonRef = NewType('JsonRef', str)

# A core ref paired with the schema mode; this is the key type of the generator's
# `core_to_json_refs` and `core_to_defs_refs` mappings.
CoreModeRef = Tuple[CoreRef, JsonSchemaMode]
# Type variable for the caller-supplied keys passed to `GenerateJsonSchema.generate_definitions`;
# bound to Hashable because the keys end up inside dictionary keys.
JsonSchemaKeyT = TypeVar('JsonSchemaKeyT', bound=Hashable)
@dataclasses.dataclass(**_internal_dataclass.slots_true)
class _DefinitionsRemapping:
    """A pair of lookup tables used to rename definitions and the `$ref` strings that point at them."""

    # Maps an original definitions key to the key it should be renamed to.
    defs_remapping: dict[DefsRef, DefsRef]
    # Maps an original "$ref" string to the "$ref" string that should replace it.
    json_remapping: dict[JsonRef, JsonRef]

    @staticmethod
    def from_prioritized_choices(
        prioritized_choices: dict[DefsRef, list[DefsRef]],
        defs_to_json: dict[DefsRef, JsonRef],
        definitions: dict[DefsRef, JsonSchemaValue],
    ) -> _DefinitionsRemapping:
        """
        This function should produce a remapping that replaces complex DefsRef with the simpler ones from the
        prioritized_choices such that applying the name remapping would result in an equivalent JSON schema.

        Args:
            prioritized_choices: For each definitions key, the candidate replacement keys in order of preference.
            defs_to_json: The JSON ref string corresponding to each definitions key.
            definitions: The definitions to simplify; a deep copy is worked on, so this argument is not mutated.

        Returns:
            The remapping obtained once applying it no longer changes the definitions.

        Raises:
            PydanticInvalidForJsonSchema: If no fixed point is reached within 100 iterations.
        """
        # We need to iteratively simplify the definitions until we reach a fixed point.
        # The reason for this is that outer definitions may reference inner definitions that get simplified
        # into an equivalent reference, and the outer definitions won't be equivalent until we've simplified
        # the inner definitions.
        copied_definitions = deepcopy(definitions)
        definitions_schema = {'$defs': copied_definitions}
        for _iter in range(100):  # prevent an infinite loop in the case of a bug, 100 iterations should be enough
            # For every possible remapped DefsRef, collect all schemas that that DefsRef might be used for:
            schemas_for_alternatives: dict[DefsRef, list[JsonSchemaValue]] = defaultdict(list)
            for defs_ref in copied_definitions:
                alternatives = prioritized_choices[defs_ref]
                for alternative in alternatives:
                    schemas_for_alternatives[alternative].append(copied_definitions[defs_ref])

            # Deduplicate the schemas for each alternative; the idea is that we only want to remap to a new DefsRef
            # if it introduces no ambiguity, i.e., there is only one distinct schema for that DefsRef.
            for defs_ref, schemas in schemas_for_alternatives.items():
                schemas_for_alternatives[defs_ref] = _deduplicate_schemas(schemas_for_alternatives[defs_ref])

            # Build the remapping
            defs_remapping: dict[DefsRef, DefsRef] = {}
            json_remapping: dict[JsonRef, JsonRef] = {}
            for original_defs_ref in definitions:
                alternatives = prioritized_choices[original_defs_ref]
                # Pick the first alternative that has only one schema, since that means there is no collision
                # NOTE(review): next() has no default here, so StopIteration propagates if no alternative
                # is unambiguous -- presumably the last choice is always unique; confirm against the caller.
                remapped_defs_ref = next(x for x in alternatives if len(schemas_for_alternatives[x]) == 1)
                defs_remapping[original_defs_ref] = remapped_defs_ref
                json_remapping[defs_to_json[original_defs_ref]] = defs_to_json[remapped_defs_ref]
            remapping = _DefinitionsRemapping(defs_remapping, json_remapping)
            # remap_json_schema rewrites nested dicts in place, so the values held by `copied_definitions`
            # are updated by this call even though a fresh outer dict is passed in.
            new_definitions_schema = remapping.remap_json_schema({'$defs': copied_definitions})
            if definitions_schema == new_definitions_schema:
                # We've reached the fixed point
                return remapping
            definitions_schema = new_definitions_schema

        raise PydanticInvalidForJsonSchema('Failed to simplify the JSON schema definitions')

    def remap_defs_ref(self, ref: DefsRef) -> DefsRef:
        """Return the replacement for a definitions key, or the key itself if it is not remapped."""
        return self.defs_remapping.get(ref, ref)

    def remap_json_ref(self, ref: JsonRef) -> JsonRef:
        """Return the replacement for a JSON ref string, or the string itself if it is not remapped."""
        return self.json_remapping.get(ref, ref)

    def remap_json_schema(self, schema: Any) -> Any:
        """
        Recursively update the JSON schema replacing all $refs

        Strings are looked up in the JSON ref remapping, lists are rebuilt element by element, and
        dictionaries are updated in place (the same dict object is returned). Any other value is
        returned unchanged.
        """
        if isinstance(schema, str):
            # Note: this may not really be a JsonRef; we rely on having no collisions between JsonRefs and other strings
            return self.remap_json_ref(JsonRef(schema))
        elif isinstance(schema, list):
            return [self.remap_json_schema(item) for item in schema]
        elif isinstance(schema, dict):
            # Only existing keys are reassigned inside this loop, so the dict does not change size
            # while it is being iterated.
            for key, value in schema.items():
                if key == '$ref' and isinstance(value, str):
                    schema['$ref'] = self.remap_json_ref(JsonRef(value))
                elif key == '$defs':
                    # Definitions get their keys renamed as well as their contents remapped.
                    schema['$defs'] = {
                        self.remap_defs_ref(DefsRef(key)): self.remap_json_schema(value)
                        for key, value in schema['$defs'].items()
                    }
                else:
                    schema[key] = self.remap_json_schema(value)
        return schema
class GenerateJsonSchema:
"""Usage docs: https://docs.pydantic.dev/2.7/concepts/json_schema/#customizing-the-json-schema-generation-process
A class for generating JSON schemas.
This class generates JSON schemas based on configured parameters. The default schema dialect
is [https://json-schema.org/draft/2020-12/schema](https://json-schema.org/draft/2020-12/schema).
The class uses `by_alias` to configure how fields with
multiple names are handled and `ref_template` to format reference names.
Attributes:
schema_dialect: The JSON schema dialect used to generate the schema. See
[Declaring a Dialect](https://json-schema.org/understanding-json-schema/reference/schema.html#id4)
in the JSON Schema documentation for more information about dialects.
ignored_warning_kinds: Warnings to ignore when generating the schema. `self.render_warning_message` will
do nothing if its argument `kind` is in `ignored_warning_kinds`;
this value can be modified on subclasses to easily control which warnings are emitted.
by_alias: Whether to use field aliases when generating the schema.
ref_template: The format string used when generating reference names.
core_to_json_refs: A mapping of core refs to JSON refs.
core_to_defs_refs: A mapping of core refs to definition refs.
defs_to_core_refs: A mapping of definition refs to core refs.
json_to_defs_refs: A mapping of JSON refs to definition refs.
definitions: Definitions in the schema.
Args:
by_alias: Whether to use field aliases in the generated schemas.
ref_template: The format string to use when generating reference names.
Raises:
PydanticUserError: If the instance of the class is inadvertently re-used after generating a schema.
"""
schema_dialect = 'https://json-schema.org/draft/2020-12/schema'
# `self.render_warning_message` will do nothing if its argument `kind` is in `ignored_warning_kinds`;
# this value can be modified on subclasses to easily control which warnings are emitted
ignored_warning_kinds: set[JsonSchemaWarningKind] = {'skipped-choice'}
def __init__(self, by_alias: bool = True, ref_template: str = DEFAULT_REF_TEMPLATE):
    """Set up the reference mappings and bookkeeping state for a single schema generation run.

    Args:
        by_alias: Whether to use field aliases in the generated schemas.
        ref_template: The format string to use when generating reference names.
    """
    self.by_alias = by_alias
    self.ref_template = ref_template

    # Lookup tables between the three kinds of references (core refs, definitions keys, JSON refs)
    self.core_to_json_refs: dict[CoreModeRef, JsonRef] = {}
    self.core_to_defs_refs: dict[CoreModeRef, DefsRef] = {}
    self.defs_to_core_refs: dict[DefsRef, CoreModeRef] = {}
    self.json_to_defs_refs: dict[JsonRef, DefsRef] = {}

    self.definitions: dict[DefsRef, JsonSchemaValue] = {}
    self._config_wrapper_stack = _config.ConfigWrapperStack(_config.ConfigWrapper({}))

    # Default mode; `generate` and `generate_definitions` overwrite this per call
    self._mode: JsonSchemaMode = 'validation'

    # The following includes a mapping of a fully-unique defs ref choice to a list of preferred
    # alternatives, which are generally simpler, such as only including the class name.
    # At the end of schema generation, we use these to produce a JSON schema with more human-readable
    # definitions, which would also work better in a generated OpenAPI client, etc.
    self._prioritized_defsref_choices: dict[DefsRef, list[DefsRef]] = {}
    self._collision_counter: dict[str, int] = defaultdict(int)
    self._collision_index: dict[str, int] = {}

    # Built once per instance; `build_schema_type_to_method` is an overridable method, so it is
    # called at this point in the constructor rather than being reordered.
    self._schema_type_to_method = self.build_schema_type_to_method()

    # When we encounter definitions, we need to try to build them immediately
    # so that they are available to the schemas that reference them.
    # However, it's possible that the CoreSchema was never going to be used
    # (e.g. because the CoreSchema that references it short-circuits JSON schema generation without
    # needing the reference), so instead of failing altogether if we can't build a definition, we
    # store the error raised and re-throw it if we end up needing that def.
    self._core_defs_invalid_for_json_schema: dict[DefsRef, PydanticInvalidForJsonSchema] = {}

    # This changes to True after generating a schema, to prevent issues caused by accidental re-use
    # of a single instance of a schema generator
    self._used = False
@property
def _config(self) -> _config.ConfigWrapper:
return self._config_wrapper_stack.tail
@property
def mode(self) -> JsonSchemaMode:
if self._config.json_schema_mode_override is not None:
return self._config.json_schema_mode_override
else:
return self._mode
def build_schema_type_to_method(
    self,
) -> dict[CoreSchemaOrFieldType, Callable[[CoreSchemaOrField], JsonSchemaValue]]:
    """Collects, for every core schema (or field) type, the bound method that generates its JSON schema.

    The method for a type is looked up by name: dashes in the type are replaced with underscores
    and `_schema` is appended (e.g. `is-instance` -> `is_instance_schema`).

    Returns:
        A dictionary containing the mapping of `CoreSchemaOrFieldType` to a handler method.

    Raises:
        TypeError: If no method has been defined for generating a JSON schema for a given pydantic core schema type.
    """
    handlers: dict[CoreSchemaOrFieldType, Callable[[CoreSchemaOrField], JsonSchemaValue]] = {}
    schema_type_names: list[CoreSchemaOrFieldType] = _typing_extra.all_literal_values(
        CoreSchemaOrFieldType  # type: ignore
    )
    for schema_type in schema_type_names:
        handler_name = schema_type.replace('-', '_') + '_schema'
        try:
            handler = getattr(self, handler_name)
        except AttributeError as exc:  # pragma: no cover
            raise TypeError(
                f'No method for generating JsonSchema for core_schema.type={schema_type!r} '
                f'(expected: {type(self).__name__}.{handler_name})'
            ) from exc
        handlers[schema_type] = handler
    return handlers
def generate_definitions(
    self, inputs: Sequence[tuple[JsonSchemaKeyT, JsonSchemaMode, core_schema.CoreSchema]]
) -> tuple[dict[tuple[JsonSchemaKeyT, JsonSchemaMode], JsonSchemaValue], dict[DefsRef, JsonSchemaValue]]:
    """Generates JSON schema definitions from a list of core schemas, pairing the generated definitions with a
    mapping that links the input keys to the definition references.

    Args:
        inputs: A sequence of tuples, where:

            - The first element is a JSON schema key type.
            - The second element is the JSON mode: either 'validation' or 'serialization'.
            - The third element is a core schema.

    Returns:
        A tuple where:

            - The first element is a dictionary whose keys are tuples of JSON schema key type and JSON mode, and
                whose values are the JSON schema corresponding to that pair of inputs. (These schemas may have
                JsonRef references to definitions that are defined in the second returned element.)
            - The second element is a dictionary whose keys are definition references for the JSON schemas
                from the first returned element, and whose values are the actual JSON schema definitions.

    Raises:
        PydanticUserError: Raised if the JSON schema generator has already been used to generate a JSON schema.
    """
    if self._used:
        raise PydanticUserError(
            'This JSON schema generator has already been used to generate a JSON schema. '
            f'You must create a new instance of {type(self).__name__} to generate a new JSON schema.',
            code='json-schema-already-used',
        )

    # First pass: run generation only for its side effect of populating the definitions, so that
    # the remapping below is computed from the complete set.
    for _key, mode, schema in inputs:
        self._mode = mode
        self.generate_inner(schema)

    definitions_remapping = self._build_definitions_remapping()

    # Second pass: generate the schema for each input and rewrite its refs with the remapping.
    # The values stored here are JSON schemas, matching the declared return type.
    json_schemas_map: dict[tuple[JsonSchemaKeyT, JsonSchemaMode], JsonSchemaValue] = {}
    for key, mode, schema in inputs:
        self._mode = mode
        json_schema = self.generate_inner(schema)
        json_schemas_map[(key, mode)] = definitions_remapping.remap_json_schema(json_schema)

    # Wrap the definitions under '$defs' so the remapping renames their keys as well as their refs
    json_schema = {'$defs': self.definitions}
    json_schema = definitions_remapping.remap_json_schema(json_schema)
    self._used = True
    return json_schemas_map, _sort_json_schema(json_schema['$defs'])  # type: ignore
def generate(self, schema: CoreSchema, mode: JsonSchemaMode = 'validation') -> JsonSchemaValue:
    """Generates a JSON schema for a specified core schema in a specified mode.

    Args:
        schema: The core schema to generate a JSON schema for.
        mode: The mode in which to generate the schema. Defaults to 'validation'.

    Returns:
        A JSON schema representing the specified schema.

    Raises:
        PydanticUserError: If the JSON schema generator has already been used to generate a JSON schema.
    """
    self._mode = mode
    if self._used:
        raise PydanticUserError(
            'This JSON schema generator has already been used to generate a JSON schema. '
            f'You must create a new instance of {type(self).__name__} to generate a new JSON schema.',
            code='json-schema-already-used',
        )

    result: JsonSchemaValue = self.generate_inner(schema)
    ref_counts = self.get_json_ref_counts(result)

    # Strip a top-level $ref, repeating for as long as the replacement is itself a bare $ref
    while True:
        top_ref = cast(JsonRef, result.get('$ref'))
        if top_ref is None:
            break
        target = self.get_schema_from_definitions(top_ref)
        if ref_counts[top_ref] <= 1 and target is not None:
            # This is the only reference, so inline the definition;
            # a copy is taken to prevent a recursive dict reference
            result = target.copy()
            ref_counts[top_ref] -= 1
        else:
            # The definition is referenced elsewhere (or cannot be found): keep the ref,
            # wrapped in an allOf so that it is no longer at the top level
            result = {'allOf': [{'$ref': top_ref}]}

    self._garbage_collect_definitions(result)
    remapping = self._build_definitions_remapping()

    if self.definitions:
        result['$defs'] = self.definitions

    result = remapping.remap_json_schema(result)

    # The $schema key is deliberately not set here. To add it, override this method and set
    # json_schema['$schema'] = self.schema_dialect on the value returned by super().generate(schema)
    self._used = True
    return _sort_json_schema(result)
def generate_inner(self, schema: CoreSchemaOrField) -> JsonSchemaValue:  # noqa: C901
    """Generates a JSON schema for a given core schema.

    The base handler dispatches on the schema's type; each function listed in the schema's
    `pydantic_js_functions` and `pydantic_js_annotation_functions` metadata is then wrapped around
    the handler built so far, and the outermost handler is invoked.

    Args:
        schema: The given core schema.

    Returns:
        The generated JSON schema.
    """
    # If a schema with the same CoreRef has been handled, just return a reference to it
    # Note that this assumes that it will _never_ be the case that the same CoreRef is used
    # on types that should have different JSON schemas
    if 'ref' in schema:
        core_ref = CoreRef(schema['ref'])  # type: ignore[typeddict-item]
        core_mode_ref = (core_ref, self.mode)
        if core_mode_ref in self.core_to_defs_refs and self.core_to_defs_refs[core_mode_ref] in self.definitions:
            return {'$ref': self.core_to_json_refs[core_mode_ref]}

    # Generate the JSON schema, accounting for the json_schema_override and core_schema_override
    metadata_handler = _core_metadata.CoreMetadataHandler(schema)

    def populate_defs(core_schema: CoreSchema, json_schema: JsonSchemaValue) -> JsonSchemaValue:
        # For a core schema carrying a 'ref', store the generated schema under its definitions key
        # and hand back the corresponding {'$ref': ...} schema instead.
        if 'ref' in core_schema:
            core_ref = CoreRef(core_schema['ref'])  # type: ignore[typeddict-item]
            defs_ref, ref_json_schema = self.get_cache_defs_ref_schema(core_ref)
            json_ref = JsonRef(ref_json_schema['$ref'])
            self.json_to_defs_refs[json_ref] = defs_ref
            # Replace the schema if it's not a reference to itself
            # What we want to avoid is having the def be just a ref to itself
            # which is what would happen if we blindly assigned any
            if json_schema.get('$ref', None) != json_ref:
                self.definitions[defs_ref] = json_schema
                # The definition was built successfully, so drop any error recorded for it earlier
                self._core_defs_invalid_for_json_schema.pop(defs_ref, None)
            json_schema = ref_json_schema
        return json_schema

    def convert_to_all_of(json_schema: JsonSchemaValue) -> JsonSchemaValue:
        # Move a '$ref' that has sibling keys into a single-element 'allOf'; works on a copy so
        # the caller's dict is not modified.
        if '$ref' in json_schema and len(json_schema.keys()) > 1:
            # technically you can't have any other keys next to a "$ref"
            # but it's an easy mistake to make and not hard to correct automatically here
            json_schema = json_schema.copy()
            ref = json_schema.pop('$ref')
            json_schema = {'allOf': [{'$ref': ref}], **json_schema}
        return json_schema

    def handler_func(schema_or_field: CoreSchemaOrField) -> JsonSchemaValue:
        """Generate a JSON schema based on the input schema.

        Args:
            schema_or_field: The core schema to generate a JSON schema from.

        Returns:
            The generated JSON schema.

        Raises:
            TypeError: If an unexpected schema type is encountered.
        """
        # Generate the core-schema-type-specific bits of the schema generation:
        json_schema: JsonSchemaValue | None = None
        # In serialization mode a custom serialization schema takes precedence; if it yields
        # None, fall through to the regular type-based generation below
        if self.mode == 'serialization' and 'serialization' in schema_or_field:
            ser_schema = schema_or_field['serialization']  # type: ignore
            json_schema = self.ser_schema(ser_schema)
        if json_schema is None:
            if _core_utils.is_core_schema(schema_or_field) or _core_utils.is_core_schema_field(schema_or_field):
                generate_for_schema_type = self._schema_type_to_method[schema_or_field['type']]
                json_schema = generate_for_schema_type(schema_or_field)
            else:
                raise TypeError(f'Unexpected schema type: schema={schema_or_field}')
        if _core_utils.is_core_schema(schema_or_field):
            json_schema = populate_defs(schema_or_field, json_schema)
            json_schema = convert_to_all_of(json_schema)
        return json_schema

    current_handler = _schema_generation_shared.GenerateJsonSchemaHandler(self, handler_func)

    # Wrap the handler once per modify function. The default arguments bind the handler and the
    # function that are current on this iteration, so each wrapper calls the one built before it.
    for js_modify_function in metadata_handler.metadata.get('pydantic_js_functions', ()):

        def new_handler_func(
            schema_or_field: CoreSchemaOrField,
            current_handler: GetJsonSchemaHandler = current_handler,
            js_modify_function: GetJsonSchemaFunction = js_modify_function,
        ) -> JsonSchemaValue:
            json_schema = js_modify_function(schema_or_field, current_handler)
            if _core_utils.is_core_schema(schema_or_field):
                json_schema = populate_defs(schema_or_field, json_schema)
            # If the result is a '$ref' with extra keys, merge those keys into the schema that the
            # handler resolves the ref to, and return that resolved schema
            original_schema = current_handler.resolve_ref_schema(json_schema)
            ref = json_schema.pop('$ref', None)
            if ref and json_schema:
                original_schema.update(json_schema)
            return original_schema

        current_handler = _schema_generation_shared.GenerateJsonSchemaHandler(self, new_handler_func)

    # Same wrapping for annotation-level functions; here a '$ref' with sibling keys is converted
    # to an allOf rather than merged into the referenced schema.
    for js_modify_function in metadata_handler.metadata.get('pydantic_js_annotation_functions', ()):

        def new_handler_func(
            schema_or_field: CoreSchemaOrField,
            current_handler: GetJsonSchemaHandler = current_handler,
            js_modify_function: GetJsonSchemaFunction = js_modify_function,
        ) -> JsonSchemaValue:
            json_schema = js_modify_function(schema_or_field, current_handler)
            if _core_utils.is_core_schema(schema_or_field):
                json_schema = populate_defs(schema_or_field, json_schema)
                json_schema = convert_to_all_of(json_schema)
            return json_schema

        current_handler = _schema_generation_shared.GenerateJsonSchemaHandler(self, new_handler_func)

    # Invoke the outermost handler, then apply the definitions/allOf post-processing once more
    json_schema = current_handler(schema)
    if _core_utils.is_core_schema(schema):
        json_schema = populate_defs(schema, json_schema)
        json_schema = convert_to_all_of(json_schema)
    return json_schema
# ### Schema generation methods
def any_schema(self, schema: core_schema.AnySchema) -> JsonSchemaValue:
    """Produces the empty JSON schema, which accepts any value.

    Args:
        schema: The core schema.

    Returns:
        The generated JSON schema.
    """
    unconstrained: JsonSchemaValue = {}
    return unconstrained
def none_schema(self, schema: core_schema.NoneSchema) -> JsonSchemaValue:
    """Produces the JSON schema for `None`, i.e. the JSON `null` type.

    Args:
        schema: The core schema.

    Returns:
        The generated JSON schema.
    """
    null_schema: JsonSchemaValue = {'type': 'null'}
    return null_schema
def bool_schema(self, schema: core_schema.BoolSchema) -> JsonSchemaValue:
    """Produces the JSON schema for a bool value, i.e. the JSON `boolean` type.

    Args:
        schema: The core schema.

    Returns:
        The generated JSON schema.
    """
    boolean_schema: JsonSchemaValue = {'type': 'boolean'}
    return boolean_schema
def int_schema(self, schema: core_schema.IntSchema) -> JsonSchemaValue:
    """Produces the JSON schema for an int value, including its numeric constraints.

    Any entry whose value is positive or negative infinity is left out of the result.

    Args:
        schema: The core schema.

    Returns:
        The generated JSON schema.
    """
    with_validations: dict[str, Any] = {'type': 'integer'}
    self.update_with_validations(with_validations, schema, self.ValidationsMapping.numeric)
    infinities = {math.inf, -math.inf}
    return {name: value for name, value in with_validations.items() if value not in infinities}
def float_schema(self, schema: core_schema.FloatSchema) -> JsonSchemaValue:
    """Produces the JSON schema for a float value, including its numeric constraints.

    Any entry whose value is positive or negative infinity is left out of the result.

    Args:
        schema: The core schema.

    Returns:
        The generated JSON schema.
    """
    with_validations: dict[str, Any] = {'type': 'number'}
    self.update_with_validations(with_validations, schema, self.ValidationsMapping.numeric)
    infinities = {math.inf, -math.inf}
    return {name: value for name, value in with_validations.items() if value not in infinities}
def decimal_schema(self, schema: core_schema.DecimalSchema) -> JsonSchemaValue:
    """Produces the JSON schema for a decimal value.

    Outside validation mode the result is a plain string schema. In validation mode it is an
    `anyOf` of a number schema, carrying the decimal's constraints converted to floats, and
    that string schema.

    Args:
        schema: The core schema.

    Returns:
        The generated JSON schema.
    """
    string_schema = self.str_schema(core_schema.str_schema())
    if self.mode != 'validation':
        return string_schema

    def as_optional_float(constraint: str) -> float | None:
        # Constraints that are absent stay None; present ones are converted to float
        bound = schema.get(constraint)
        return None if bound is None else float(bound)

    float_core_schema = core_schema.float_schema(
        allow_inf_nan=schema.get('allow_inf_nan'),
        multiple_of=as_optional_float('multiple_of'),
        le=as_optional_float('le'),
        ge=as_optional_float('ge'),
        lt=as_optional_float('lt'),
        gt=as_optional_float('gt'),
    )
    return {'anyOf': [self.float_schema(float_core_schema), string_schema]}
def str_schema(self, schema: core_schema.StringSchema) -> JsonSchemaValue:
    """Produces the JSON schema for a string value, including its string constraints.

    Args:
        schema: The core schema.

    Returns:
        The generated JSON schema.
    """
    string_schema = {'type': 'string'}
    validations = self.ValidationsMapping.string
    self.update_with_validations(string_schema, schema, validations)
    return string_schema
def bytes_schema(self, schema: core_schema.BytesSchema) -> JsonSchemaValue:
    """Produces the JSON schema for a bytes value, including its bytes constraints.

    The `format` is 'base64url' when the config's `ser_json_bytes` is 'base64', and 'binary' otherwise.

    Args:
        schema: The core schema.

    Returns:
        The generated JSON schema.
    """
    if self._config.ser_json_bytes == 'base64':
        bytes_format = 'base64url'
    else:
        bytes_format = 'binary'
    bytes_json_schema = {'type': 'string', 'format': bytes_format}
    self.update_with_validations(bytes_json_schema, schema, self.ValidationsMapping.bytes)
    return bytes_json_schema
def date_schema(self, schema: core_schema.DateSchema) -> JsonSchemaValue:
    """Produces the JSON schema for a date value: a string with the 'date' format, plus date constraints.

    Args:
        schema: The core schema.

    Returns:
        The generated JSON schema.
    """
    date_json_schema = {'type': 'string', 'format': 'date'}
    validations = self.ValidationsMapping.date
    self.update_with_validations(date_json_schema, schema, validations)
    return date_json_schema
def time_schema(self, schema: core_schema.TimeSchema) -> JsonSchemaValue:
    """Produces the JSON schema for a time value: a string with the 'time' format.

    Args:
        schema: The core schema.

    Returns:
        The generated JSON schema.
    """
    time_json_schema: JsonSchemaValue = {'type': 'string', 'format': 'time'}
    return time_json_schema
def datetime_schema(self, schema: core_schema.DatetimeSchema) -> JsonSchemaValue:
    """Produces the JSON schema for a datetime value: a string with the 'date-time' format.

    Args:
        schema: The core schema.

    Returns:
        The generated JSON schema.
    """
    datetime_json_schema: JsonSchemaValue = {'type': 'string', 'format': 'date-time'}
    return datetime_json_schema
def timedelta_schema(self, schema: core_schema.TimedeltaSchema) -> JsonSchemaValue:
"""Generates a JSON schema that matches a timedelta value.
Args:
schema: The core schema.
Returns:
The generated JSON schema.
"""
if self._config.ser_json_timedelta == 'float':
return {'type': 'number'}
return {'type': 'string', 'format': 'duration'}
def literal_schema(self, schema: core_schema.LiteralSchema) -> JsonSchemaValue:
    """Generates a JSON schema that matches a literal value.

    The result always has an `enum` of the JSON-able expected values, gains a `const` when there
    is exactly one of them, and gains a `type` when all of them share a single Python type that
    has a JSON Schema counterpart.

    Args:
        schema: The core schema.

    Returns:
        The generated JSON schema.
    """
    # Enum members are represented by their underlying value
    expected = [v.value if isinstance(v, Enum) else v for v in schema['expected']]
    # jsonify the expected values
    expected = [to_jsonable_python(v) for v in expected]

    result: dict[str, Any] = {'enum': expected}
    if len(expected) == 1:
        result['const'] = expected[0]

    # Only a homogeneous set of values gets a 'type'; exact types are compared, so a mix of
    # bool and int (for example) is left untyped
    types = {type(e) for e in expected}
    if types == {str}:
        result['type'] = 'string'
    elif types == {int}:
        result['type'] = 'integer'
    elif types == {float}:
        # JSON Schema names its floating-point type 'number'; 'numeric' is not a valid type name
        result['type'] = 'number'
    elif types == {bool}:
        result['type'] = 'boolean'
    elif types == {list}:
        result['type'] = 'array'
    return result
def enum_schema(self, schema: core_schema.EnumSchema) -> JsonSchemaValue:
    """Generates a JSON schema that matches an Enum value.

    The enum class name becomes the `title` and its cleaned docstring (if any, and if it is not
    the default enum docstring) becomes the `description`. Member values are converted with
    `to_jsonable_python` and listed under `enum`; a single member is additionally emitted as
    `const`. If all converted values share one of the recognised Python types, the matching
    JSON Schema `type` keyword is added as well.

    Args:
        schema: The core schema.

    Returns:
        The generated JSON schema.
    """
    enum_type = schema['cls']
    description = None if not enum_type.__doc__ else inspect.cleandoc(enum_type.__doc__)
    if (
        description == 'An enumeration.'
    ):  # This is the default value provided by enum.EnumMeta.__new__; don't use it
        description = None
    result: dict[str, Any] = {'title': enum_type.__name__, 'description': description}
    # Drop keys that have no value (e.g. a missing description)
    result = {k: v for k, v in result.items() if v is not None}

    expected = [to_jsonable_python(v.value) for v in schema['members']]

    result['enum'] = expected
    if len(expected) == 1:
        result['const'] = expected[0]

    # NOTE(review): `enum_type` is read from `schema['cls']` and used as a class above, so the
    # `isinstance(enum_type, ...)` checks below look like they can never be true; `issubclass`
    # may have been intended. Left unchanged because switching would alter the output for enums
    # whose member values do not match their mixin type -- confirm before changing.
    types = {type(e) for e in expected}
    if isinstance(enum_type, str) or types == {str}:
        result['type'] = 'string'
    elif isinstance(enum_type, int) or types == {int}:
        result['type'] = 'integer'
    elif isinstance(enum_type, float) or types == {float}:
        # 'number' is the JSON Schema primitive type for floats; 'numeric' is not a valid type name.
        result['type'] = 'number'
    elif types == {bool}:
        result['type'] = 'boolean'
    elif types == {list}:
        result['type'] = 'array'
    return result
def is_instance_schema(self, schema: core_schema.IsInstanceSchema) -> JsonSchemaValue:
    """Handle JSON schema generation for a core schema that checks whether a value is an instance of a class.

    The work is delegated to `handle_invalid_for_json_schema`, passing a description that names
    the checked class. Unless overridden in a subclass, this raises an error.

    Args:
        schema: The core schema.

    Returns:
        The generated JSON schema.
    """
    checked_cls = schema['cls']
    error_info = f'core_schema.IsInstanceSchema ({checked_cls})'
    return self.handle_invalid_for_json_schema(schema, error_info)
def is_subclass_schema(self, schema: core_schema.IsSubclassSchema) -> JsonSchemaValue:
    """Handle JSON schema generation for a core schema that checks whether a value is a subclass of a class.

    For backwards compatibility with v1, this does not raise an error and instead returns an
    empty schema; override it to change this.

    Args:
        schema: The core schema.

    Returns:
        The generated JSON schema.
    """
    # Note: This is for compatibility with V1; you can override if you want different behavior.
    unconstrained_schema: JsonSchemaValue = {}
    return unconstrained_schema
def callable_schema(self, schema: core_schema.CallableSchema) -> JsonSchemaValue:
    """Handle JSON schema generation for a core schema that matches a callable value.

    The work is delegated to `handle_invalid_for_json_schema`. Unless overridden in a
    subclass, this raises an error.

    Args:
        schema: The core schema.

    Returns:
        The generated JSON schema.
    """
    error_info = 'core_schema.CallableSchema'
    return self.handle_invalid_for_json_schema(schema, error_info)
def list_schema(self, schema: core_schema.ListSchema) -> JsonSchemaValue:
    """Build the JSON schema for a list core schema.

    The result is an array schema whose `items` is generated from the core schema's
    `items_schema` (or an empty schema when none is given). `update_with_validations` is then
    invoked with the `array` validations mapping.

    Args:
        schema: The core schema.

    Returns:
        The generated JSON schema.
    """
    if 'items_schema' in schema:
        element_schema = self.generate_inner(schema['items_schema'])
    else:
        # No item schema declared: any element is allowed
        element_schema = {}

    array_schema = {'type': 'array', 'items': element_schema}
    self.update_with_validations(array_schema, schema, self.ValidationsMapping.array)
    return array_schema
@deprecated('`tuple_positional_schema` is deprecated. Use `tuple_schema` instead.', category=None)
@final
def tuple_positional_schema(self, schema: core_schema.TupleSchema) -> JsonSchemaValue:
    """Deprecated entry point, replaced by `tuple_schema`.

    Issues a `PydanticDeprecatedSince26` warning and then delegates to `tuple_schema`.

    Args:
        schema: The core schema.

    Returns:
        The JSON schema produced by `tuple_schema`.
    """
    deprecation_message = '`tuple_positional_schema` is deprecated. Use `tuple_schema` instead.'
    # stacklevel=2 attributes the warning to the caller rather than to this method
    warnings.warn(deprecation_message, PydanticDeprecatedSince26, stacklevel=2)
    return self.tuple_schema(schema)
@deprecated('`tuple_variable_schema` is deprecated. Use `tuple_schema` instead.', category=None)
@final
def tuple_variable_schema(self, schema: core_schema.TupleSchema) -> JsonSchemaValue:
    """Deprecated entry point, replaced by `tuple_schema`.

    Issues a `PydanticDeprecatedSince26` warning and then delegates to `tuple_schema`.

    Args:
        schema: The core schema.

    Returns:
        The JSON schema produced by `tuple_schema`.
    """
    deprecation_message = '`tuple_variable_schema` is deprecated. Use `tuple_schema` instead.'
    # stacklevel=2 attributes the warning to the caller rather than to this method
    warnings.warn(deprecation_message, PydanticDeprecatedSince26, stacklevel=2)
    return self.tuple_schema(schema)
def tuple_schema(self, schema: core_schema.TupleSchema) -> JsonSchemaValue:
    """Build the JSON schema for a tuple core schema, e.g. `Tuple[int, str, bool]` or `Tuple[int, ...]`.

    Without a `variadic_item_index`, every item becomes a `prefixItems` entry and the array
    length is pinned with `minItems`/`maxItems`. With one, the items before the variadic
    position become `prefixItems` (with a matching `minItems`), and `items` describes the
    variadic item only when it is the last item; otherwise `items` is `True`.

    Args:
        schema: The core schema.

    Returns:
        The generated JSON schema.
    """
    item_schemas = schema['items_schema']
    result: JsonSchemaValue = {'type': 'array'}

    if 'variadic_item_index' not in schema:
        # Fixed-length tuple: every position is described and the length is exact
        positional = [self.generate_inner(item) for item in item_schemas]
        if positional:
            result['prefixItems'] = positional
        result['minItems'] = len(positional)
        result['maxItems'] = len(positional)
    else:
        variadic_index = schema['variadic_item_index']
        if variadic_index > 0:
            result['minItems'] = variadic_index
            result['prefixItems'] = [self.generate_inner(item) for item in item_schemas[:variadic_index]]

        variadic_is_last = variadic_index + 1 == len(item_schemas)
        if variadic_is_last:
            # if the variadic item is the last item, then represent it faithfully
            result['items'] = self.generate_inner(item_schemas[variadic_index])
        else:
            # otherwise, 'items' represents the schema for the variadic
            # item plus the suffix, so just allow anything for simplicity
            # for now
            result['items'] = True

    self.update_with_validations(result, schema, self.ValidationsMapping.array)
    return result
def set_schema(self, schema: core_schema.SetSchema) -> JsonSchemaValue:
    """Build the JSON schema for a set core schema.

    Delegates to `_common_set_schema`, which is shared with `frozenset_schema`.

    Args:
        schema: The core schema.

    Returns:
        The generated JSON schema.
    """
    set_json_schema = self._common_set_schema(schema)
    return set_json_schema
def frozenset_schema(self, schema: core_schema.FrozenSetSchema) -> JsonSchemaValue:
    """Build the JSON schema for a frozenset core schema.

    Delegates to `_common_set_schema`, which is shared with `set_schema`.

    Args:
        schema: The core schema.

    Returns:
        The generated JSON schema.
    """
    frozenset_json_schema = self._common_set_schema(schema)
    return frozenset_json_schema
def _common_set_schema(self, schema: core_schema.SetSchema | core_schema.FrozenSetSchema) -> JsonSchemaValue:
items_schema = {} if 'items_schema' not in schema else self.generate_inner(schema['items_schema'])
json_schema = {'type': 'array', 'uniqueItems': True, 'items': items_schema}
self.update_with_validations(json_schema, schema, self.ValidationsMapping.array)
return json_schema
def generator_schema(self, schema: core_schema.GeneratorSchema) -> JsonSchemaValue:
    """Build the JSON schema representing the provided GeneratorSchema.

    The result is an array schema whose `items` is generated from the core schema's
    `items_schema` (or an empty schema when none is given). `update_with_validations` is then
    invoked with the `array` validations mapping.

    Args:
        schema: The schema.

    Returns:
        The generated JSON schema.
    """
    if 'items_schema' in schema:
        yielded_schema = self.generate_inner(schema['items_schema'])
    else:
        # No item schema declared: any element is allowed
        yielded_schema = {}

    array_schema = {'type': 'array', 'items': yielded_schema}
    self.update_with_validations(array_schema, schema, self.ValidationsMapping.array)
    return array_schema
def dict_schema(self, schema: core_schema.DictSchema) -> JsonSchemaValue:
    """Build the JSON schema for a dict core schema.

    The result is an object schema. If the generated keys schema carries a `pattern`, the values
    schema is attached under `patternProperties` keyed by that pattern; otherwise a non-empty
    values schema is attached as `additionalProperties`. Any `title` on the values schema is
    dropped first. `update_with_validations` is then invoked with the `object` validations mapping.

    Args:
        schema: The core schema.

    Returns:
        The generated JSON schema.
    """
    object_schema: JsonSchemaValue = {'type': 'object'}

    # Work on copies: keys are popped below and the generated schemas must not be mutated
    if 'keys_schema' in schema:
        key_schema = self.generate_inner(schema['keys_schema']).copy()
    else:
        key_schema = {}
    key_pattern = key_schema.pop('pattern', None)

    if 'values_schema' in schema:
        value_schema = self.generate_inner(schema['values_schema']).copy()
    else:
        value_schema = {}
    value_schema.pop('title', None)  # don't give a title to the additionalProperties

    if key_pattern is not None:
        object_schema['patternProperties'] = {key_pattern: value_schema}
    elif value_schema:  # don't add additionalProperties if it's empty
        object_schema['additionalProperties'] = value_schema

    self.update_with_validations(object_schema, schema, self.ValidationsMapping.object)
    return object_schema
def _function_schema(self, schema: _core_utils.AnyFunctionSchema) -> JsonSchemaValue:
    """Build the JSON schema for a validator-function core schema.

    Function schemas that wrap an inner schema produce the JSON schema of that inner schema;
    plain function schemas are passed to `handle_invalid_for_json_schema`.

    Args:
        schema: The core schema.

    Returns:
        The generated JSON schema.
    """
    if not _core_utils.is_function_with_inner_schema(schema):
        # function-plain
        error_info = f'core_schema.PlainValidatorFunctionSchema ({schema["function"]})'
        return self.handle_invalid_for_json_schema(schema, error_info)

    # This could be wrong if the function's mode is 'before', but in practice will often be right, and when it
    # isn't, I think it would be hard to automatically infer what the desired schema should be.
    return self.generate_inner(schema['schema'])
def function_before_schema(self, schema: core_schema.BeforeValidatorFunctionSchema) -> JsonSchemaValue:
    """Build the JSON schema for a function-before core schema.

    Delegates to `_function_schema`.

    Args:
        schema: The core schema.

    Returns:
        The generated JSON schema.
    """
    before_json_schema = self._function_schema(schema)
    return before_json_schema
def function_after_schema(self, schema: core_schema.AfterValidatorFunctionSchema) -> JsonSchemaValue:
"""Generates a JSON schema that matches a function-after schema.
Args:
schema: The core schema.
Returns:
The generated JSON schema.