-
Notifications
You must be signed in to change notification settings - Fork 24
/
schema.py
153 lines (122 loc) · 4.83 KB
/
schema.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""
THIS MODULE IS BUNDLED FROM THE bids-specification PACKAGE.
Schema loading- and processing-related functions.
"""
from copy import deepcopy
import logging
import os
from pathlib import Path
from ruamel import yaml
from . import utils
# Module-level logger, obtained from the package-local ``utils`` helper.
lgr = utils.get_logger()
# Basic settings for output, for now just basic.
# The level is read from the BIDS_SCHEMA_LOG_LEVEL environment variable and
# falls back to logging.INFO when that variable is not set.
utils.set_logger_level(lgr, os.environ.get("BIDS_SCHEMA_LOG_LEVEL", logging.INFO))
# NOTE: runs at import time; logging.basicConfig acts on the root logger.
logging.basicConfig(format="%(asctime)-15s [%(levelname)8s] %(message)s")
def _get_entry_name(path):
    """Return the schema entry name for a path.

    Parameters
    ----------
    path : pathlib.Path
        File or directory path.

    Returns
    -------
    str
        The final path component, with a trailing ``.yaml`` removed when the
        path's suffix is exactly ``.yaml``; otherwise the component unchanged.
    """
    entry = path.name
    # Only the ".yaml" suffix is stripped; every other name is returned as-is.
    return entry[: -len(".yaml")] if path.suffix == ".yaml" else entry
def dereference_yaml(schema, struct):
    """Recursively search a dictionary-like object for $ref keys.

    Each $ref key is replaced with the contents of the referenced field in the
    overall dictionary-like object. Keys defined next to the ``$ref`` override
    the keys of the referenced template.

    Parameters
    ----------
    schema : dict
        Mapping in which referenced fields are looked up (``schema[ref]``).
    struct : object
        Structure to dereference. Dicts and lists are traversed recursively;
        any other value is returned unchanged.

    Returns
    -------
    object
        A new dict/list structure with references expanded. Neither ``schema``
        nor ``struct`` is modified.

    Raises
    ------
    KeyError
        If a ``$ref`` names a field that is not present in ``schema``.

    Notes
    -----
    Nested values of a template are dereferenced, but a ``$ref`` key sitting
    directly at the top level of the referenced template is carried over
    rather than followed.
    """
    if isinstance(struct, dict):
        if "$ref" in struct:
            template = schema[struct["$ref"]]
            # Build the overrides without popping from ``struct``: the caller
            # often passes the same object as ``schema``, and mutating it would
            # strip references from templates that are still to be used.
            overrides = {key: val for key, val in struct.items() if key != "$ref"}
            # Result is template object with local overrides
            struct = {**template, **overrides}

        struct = {key: dereference_yaml(schema, val) for key, val in struct.items()}

    elif isinstance(struct, list):
        struct = [dereference_yaml(schema, item) for item in struct]

    return struct
def load_schema(schema_path):
    """Load the schema into a dictionary.

    This function allows the schema, like BIDS itself, to be specified in
    a hierarchy of directories and files.
    File names (minus extensions) and directory names become keys
    in the associative array (dict) of entries composed from content
    of files and entire directories.

    Parameters
    ----------
    schema_path : str
        Folder containing the ``objects`` and ``rules`` subfolders of yaml
        files.

    Returns
    -------
    dict
        Schema in dictionary form, with the top-level keys ``"objects"`` and
        ``"rules"``.

    Raises
    ------
    ValueError
        If ``schema_path`` lacks an ``objects`` or a ``rules`` directory.
    """
    _yaml = yaml.YAML(typ="safe", pure=True)

    def _load_dereferenced(yaml_file):
        # Decode explicitly as UTF-8 instead of the locale's default encoding,
        # so non-ASCII schema content loads the same on every platform.
        dict_ = _yaml.load(yaml_file.read_text(encoding="utf-8"))
        # References are resolved within the file itself.
        return dereference_yaml(dict_, dict_)

    schema_path = Path(schema_path)
    objects_dir = schema_path / "objects/"
    rules_dir = schema_path / "rules/"

    if not objects_dir.is_dir() or not rules_dir.is_dir():
        raise ValueError(
            f"Schema path or paths do not exist:\n\t{str(objects_dir)}\n\t{str(rules_dir)}"
        )

    schema = {"objects": {}, "rules": {}}

    # Load object definitions. All are present in single files.
    for object_group_file in sorted(objects_dir.glob("*.yaml")):
        lgr.debug(f"Loading {object_group_file.stem} objects.")
        schema["objects"][object_group_file.stem] = _load_dereferenced(object_group_file)

    # Grab single-file rule groups
    for rule_group_file in sorted(rules_dir.glob("*.yaml")):
        lgr.debug(f"Loading {rule_group_file.stem} rules.")
        schema["rules"][rule_group_file.stem] = _load_dereferenced(rule_group_file)

    # Load folders of rule subgroups; the folder name becomes the group key.
    for rule_group_file in sorted(rules_dir.glob("*/*.yaml")):
        rule = schema["rules"].setdefault(rule_group_file.parent.name, {})
        lgr.debug(f"Loading {rule_group_file.stem} rules.")
        rule[rule_group_file.stem] = _load_dereferenced(rule_group_file)

    return schema
def _apply_schema_filters(node, filters):
    """Filter ``node`` in place (it must be a private copy) and return it."""
    if isinstance(node, dict):
        # Reduce values in dict to only requested
        for key, allowed in filters.items():
            if key in node:
                selected = node[key]
                if isinstance(selected, dict):
                    node[key] = {k: v for k, v in selected.items() if k in allowed}
                else:
                    node[key] = [item for item in selected if item in allowed]

        # Only values are reassigned, so iterating the dict here is safe.
        for key in node:
            node[key] = _apply_schema_filters(node[key], filters)

    elif isinstance(node, list):
        # Only dict items are descended into; other list items are kept as-is.
        for idx, item in enumerate(node):
            if isinstance(item, dict):
                node[idx] = _apply_schema_filters(item, filters)

    return node


def filter_schema(schema, **kwargs):
    """Filter the schema based on a set of keyword arguments.

    Parameters
    ----------
    schema : dict
        The schema object, which is a dictionary with nested dictionaries and
        lists stored within it.
    kwargs : dict
        Keyword arguments used to filter the schema.
        Example kwargs that may be used include: "suffixes", "datatypes",
        "extensions".
        Wherever a dictionary key equals a keyword name, its value is reduced
        to the entries contained in the keyword's value: dict values keep only
        the matching keys, any other value becomes a list of matching items.

    Returns
    -------
    new_schema : dict
        The filtered version of the schema. It is an independent deep copy;
        the input ``schema`` is never modified.

    Notes
    -----
    The filters are applied recursively, at arbitrary depth. The input is
    deep-copied once up front rather than once per nesting level.

    Warning
    -------
    This function employs a *very* simple filter. It is very limited.
    """
    return _apply_schema_filters(deepcopy(schema), kwargs)