-
Notifications
You must be signed in to change notification settings - Fork 70
/
base.py
208 lines (173 loc) · 6.59 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
"""Import model."""
from importlib import import_module
from logging import getLogger
from os import path
from packaging import version
from numpy import log
import pastas as ps
from pandas import to_numeric
# Type Hinting
from pastas.typing import Model
logger = getLogger(__name__)
def load(fname: str, **kwargs) -> Model:
    """Method to load a Pastas Model from file.

    Parameters
    ----------
    fname: str
        string with the name of the file to be imported including the file
        extension.
    kwargs:
        extension specific keyword arguments

    Returns
    -------
    ml: pastas.model.Model
        Pastas Model instance.

    Raises
    ------
    FileNotFoundError
        if fname does not point to an existing file.

    Examples
    --------
    >>> import pastas as ps
    >>> ml = ps.io.load("model.pas")
    """
    if not path.exists(fname):
        # Fail fast: previously only an error was logged and execution
        # continued, which produced a confusing failure further down.
        logger.error("File not found: %s", fname)
        raise FileNotFoundError(f"File not found: {fname}")

    # Dynamic import of the io module matching the file extension
    # (e.g. ".pas" -> pastas.io.pas).
    load_mod = import_module(f"pastas.io{path.splitext(fname)[1]}")

    # Get dicts for all data sources
    data = load_mod.load(fname, **kwargs)

    ml = _load_model(data)
    logger.info(
        "Pastas Model from file %s successfully loaded. This file "
        "was created with Pastas %s. Your current version of Pastas "
        "is: %s",
        fname,
        data["file_info"]["pastas_version"],
        ps.__version__,
    )
    return ml
def _load_model(data: dict) -> Model:
    """Internal method to create a model from a dictionary.

    Parameters
    ----------
    data: dict
        dictionary produced by one of the pastas.io load modules. Must
        contain the keys "oseries", "stressmodels" and "parameters";
        optional keys are "constant", "metadata", "name", "noisemodel",
        "settings", "file_info", "transform" and "fit".

    Returns
    -------
    ml: pastas.model.Model
        the reconstructed Pastas Model instance.

    Notes
    -----
    The input dictionary (and the nested stressmodel dicts) is mutated
    in place while the model is being built.
    """
    # Create model
    oseries = ps.TimeSeries(**data["oseries"])
    # Optional top-level entries: fall back to sensible defaults when absent.
    if "constant" in data.keys():
        constant = data["constant"]
    else:
        constant = False
    if "metadata" in data.keys():
        metadata = data["metadata"]
    else:
        metadata = None
    if "name" in data.keys():
        name = data["name"]
    else:
        name = None
    # The mere presence of a "noisemodel" entry enables the noise model here;
    # the actual noise model instance is attached further below.
    if "noisemodel" in data.keys():
        noise = True
    else:
        noise = False
    ml = ps.Model(
        oseries, constant=constant, noisemodel=noise, name=name, metadata=metadata
    )
    if "settings" in data.keys():
        ml.settings.update(data["settings"])
    if "file_info" in data.keys():
        ml.file_info.update(data["file_info"])

    # Add stressmodels
    for name, ts in data["stressmodels"].items():
        # Deal with old StressModel2 files for version 0.22.0. Remove in 0.23.0.
        if ts["stressmodel"] == "StressModel2":
            logger.warning(
                "StressModel2 is removed since Pastas 0.22.0 and "
                "is replaced by the RechargeModel using a Linear "
                "recharge model. Make sure to save this file "
                "again using Pastas version 0.22.0 as this file "
                "cannot be loaded in newer Pastas versions. This "
                "will automatically update your model to the newer "
                "RechargeModel stress model."
            )
            # Rewrite the stored dict in place to the RechargeModel
            # equivalent: the two stresses become "prec" and "evap".
            ts["stressmodel"] = "RechargeModel"
            ts["recharge"] = "Linear"
            ts["prec"] = ts["stress"][0]
            ts["evap"] = ts["stress"][1]
            ts.pop("stress")
            ts.pop("up")

        # Deal with old parameter value b in HantushWellModel: b_new = np.log(b_old)
        if ((ts["stressmodel"] == "WellModel") and
                (version.parse(data["file_info"]["pastas_version"]) <
                 version.parse("0.22.0"))):
            logger.warning("The value of parameter 'b' in HantushWellModel"
                           "was modified in 0.22.0: b_new = log(b_old). The value of "
                           "'b' is automatically updated on load.")
            wnam = ts["name"]
            # Only strictly positive values are transformed; the log of a
            # non-positive value is undefined.
            for pcol in ["initial", "optimal", "pmin", "pmax"]:
                if data["parameters"].loc[wnam + "_b", pcol] > 0:
                    data["parameters"].loc[wnam + "_b", pcol] = \
                        log(data["parameters"].loc[wnam + "_b", pcol])

        # Resolve the stressmodel class by name, then strip the key so the
        # remaining dict can be passed as keyword arguments.
        stressmodel = getattr(ps.stressmodels, ts["stressmodel"])
        ts.pop("stressmodel")
        # Instantiate rfunc / recharge objects from their stored class names,
        # passing along any stored keyword arguments.
        if "rfunc" in ts.keys():
            rfunc_kwargs = {}
            if "rfunc_kwargs" in ts:
                rfunc_kwargs = ts.pop("rfunc_kwargs")
            ts["rfunc"] = getattr(ps.rfunc, ts["rfunc"])(**rfunc_kwargs)
        if "recharge" in ts.keys():
            recharge_kwargs = {}
            if "recharge_kwargs" in ts:
                recharge_kwargs = ts.pop("recharge_kwargs")
            ts["recharge"] = getattr(
                ps.recharge, ts["recharge"])(**recharge_kwargs)
        # Revive the stored time series dictionaries into TimeSeries objects.
        if "stress" in ts.keys():
            for i, stress in enumerate(ts["stress"]):
                ts["stress"][i] = ps.TimeSeries(**stress)
        if "prec" in ts.keys():
            ts["prec"] = ps.TimeSeries(**ts["prec"])
        if "evap" in ts.keys():
            ts["evap"] = ps.TimeSeries(**ts["evap"])
        if "temp" in ts.keys() and ts["temp"] is not None:
            ts["temp"] = ps.TimeSeries(**ts["temp"])
        stressmodel = stressmodel(**ts)
        ml.add_stressmodel(stressmodel)

    # Add transform
    if "transform" in data.keys():
        transform = getattr(ps.transform, data["transform"]["transform"])
        data["transform"].pop("transform")
        transform = transform(**data["transform"])
        ml.add_transform(transform)

    # Add noisemodel if present
    if "noisemodel" in data.keys():
        n = getattr(ps.noisemodels, data["noisemodel"]["type"])()
        ml.add_noisemodel(n)

    # Add fit object to the model
    if "fit" in data.keys():
        fit = getattr(ps.solver, data["fit"]["name"])
        data["fit"].pop("name")
        ml.fit = fit(ml=ml, **data["fit"])

    # Add parameters, use update to maintain correct order
    ml.parameters = ml.get_init_parameters(noise=ml.settings["noise"])
    ml.parameters.update(data["parameters"])
    ml.parameters = ml.parameters.apply(to_numeric, errors="ignore")

    # When initial values changed
    for param, value in ml.parameters.loc[:, "initial"].items():
        ml.set_parameter(name=param, initial=value)

    return ml
def dump(fname: str, data: dict, **kwargs):
    """Method to save a pastas-model to a file.

    Parameters
    ----------
    fname: str
        string with the name of the file, including a supported
        file-extension. Currently supported extension are: .pas.
    data: dict
        dictionary with the information to store.
    kwargs:
        extension specific keyword arguments can be provided using kwargs.

    Returns
    -------
    message:
        Message if the file-saving was successful.

    Notes
    -----
    The specific dump-module is automatically chosen based on the provided
    file extension.
    """
    # Pick the writer module that matches the file extension, e.g.
    # ".pas" -> pastas.io.pas, and delegate the actual write to it.
    extension = path.splitext(fname)[1]
    writer = import_module(f"pastas.io{extension}")
    return writer.dump(fname, data, **kwargs)