forked from scrapinghub/dateparser
/
__init__.py
238 lines (179 loc) · 6.61 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
import calendar
import logging
import types
import unicodedata
from datetime import datetime
import regex as re
from tzlocal import get_localzone
from pytz import UTC, timezone, UnknownTimeZoneError
from collections import OrderedDict
from dateparser.timezone_parser import _tz_offsets, StaticTzInfo
def strip_braces(date_string):
    """Return ``date_string`` with every bracket-like character removed.

    Each run of ``{``, ``}``, ``(``, ``)``, ``<``, ``>``, ``[`` or ``]`` is
    deleted; all other characters (including whitespace) are kept as-is.
    """
    return re.sub(r'[{}()<>\[\]]+', '', date_string)
def normalize_unicode(string, form='NFKD'):
    """Normalize ``string`` and drop its non-spacing marks.

    The text is first normalized with ``unicodedata.normalize(form, string)``
    and then every character whose Unicode category is ``Mn`` is removed.
    With the default ``NFKD`` form, accented letters are decomposed so that
    only their base letters survive.

    :param string: text to normalize.
    :param form: normalization form passed to ``unicodedata.normalize``.
    :return: the normalized text without ``Mn`` characters.
    """
    return ''.join(
        c for c in unicodedata.normalize(form, string)
        if unicodedata.category(c) != 'Mn'
    )
def combine_dicts(primary_dict, supplementary_dict):
    """Merge two mappings into a new ``OrderedDict``.

    Keys keep the order of ``primary_dict``, followed by the keys that only
    exist in ``supplementary_dict`` (in that mapping's order). For a key
    present in both mappings the result depends on the primary value:

    * ``list``: the supplementary value is appended by concatenation;
    * ``dict``: both values are merged recursively with this function;
    * anything else: the supplementary value replaces the primary one.

    Neither input mapping is modified.
    """
    combined_dict = OrderedDict()

    for key, value in primary_dict.items():
        if key not in supplementary_dict:
            combined_dict[key] = value
            continue

        extra = supplementary_dict[key]
        if isinstance(value, list):
            combined_dict[key] = value + extra
        elif isinstance(value, dict):
            combined_dict[key] = combine_dicts(value, extra)
        else:
            combined_dict[key] = extra

    # Keys known only to the supplementary mapping go last.
    for key, value in supplementary_dict.items():
        if key not in primary_dict:
            combined_dict[key] = value

    return combined_dict
def find_date_separator(format):
    """Return the separator character used between date directives.

    Searches ``format`` for a run of ``%d``, ``%b``, ``%B``, ``%m``, ``%a``
    or ``%A`` directives that are each followed by a non-word character and
    returns the character captured by the last repetition of that run.

    :param format: a ``strftime``-style format string.
    :return: the separator character, or ``None`` when no such directive is
        followed by a non-word character.
    """
    m = re.search(r'(?:(?:%[dbBmaA])(\W))+', format)
    if m:
        return m.group(1)
    return None
def _get_missing_parts(fmt):
    """
    Return a list containing missing parts (day, month, year)
    from a date format checking its directives

    A part counts as present when at least one of its directives occurs as
    a substring of ``fmt``. The result is always ordered day, month, year.
    """
    directive_mapping = {
        'day': ['%d', '%-d', '%j', '%-j'],
        'month': ['%b', '%B', '%m', '%-m'],
        'year': ['%y', '%-y', '%Y']
    }

    return [
        field for field in ('day', 'month', 'year')
        if not any(directive in fmt for directive in directive_mapping[field])
    ]
def get_timezone_from_tz_string(tz_string):
    """Resolve ``tz_string`` to a tzinfo object.

    The string is first looked up with ``pytz.timezone``. If pytz does not
    know it, each ``(name, info)`` entry of ``_tz_offsets`` is tried in
    order: the first entry whose ``info['regex']`` matches the string
    (prefixed with one space) yields ``StaticTzInfo(name, info['offset'])``.

    :raises UnknownTimeZoneError: when neither lookup succeeds; the original
        pytz exception is re-raised.
    """
    try:
        return timezone(tz_string)
    except UnknownTimeZoneError:
        for name, info in _tz_offsets:
            # The leading space matches how the patterns are searched
            # elsewhere in this module.
            if info['regex'].search(' %s' % tz_string):
                return StaticTzInfo(name, info['offset'])
        # No fallback matched: surface the original pytz error.
        raise
def localize_timezone(date_time, tz_string):
    """Attach the timezone named by ``tz_string`` to a naive datetime.

    A datetime that already carries ``tzinfo`` is returned unchanged and
    ``tz_string`` is not even resolved. Otherwise the timezone is obtained
    from ``get_timezone_from_tz_string`` and attached with its ``localize``
    method when it has one, or with ``datetime.replace(tzinfo=...)`` when it
    does not. The clock time itself is not shifted by this function.
    """
    if date_time.tzinfo is not None:
        return date_time

    tz = get_timezone_from_tz_string(tz_string)

    if hasattr(tz, 'localize'):
        return tz.localize(date_time)
    return date_time.replace(tzinfo=tz)
def apply_tzdatabase_timezone(date_time, pytz_string):
    """Convert ``date_time`` to the tz database zone named ``pytz_string``.

    The zone is looked up with ``pytz.timezone``; any lookup error raised
    there propagates to the caller. The datetime is converted with
    ``astimezone`` unless its ``tzinfo`` already equals the target zone.
    """
    usr_timezone = timezone(pytz_string)

    if date_time.tzinfo != usr_timezone:
        date_time = date_time.astimezone(usr_timezone)

    return date_time
def apply_dateparser_timezone(utc_datetime, offset_or_timezone_abb):
    """Convert ``utc_datetime`` using the module's own timezone table.

    Each ``(name, info)`` entry of ``_tz_offsets`` is tried in order; the
    first one whose ``info['regex']`` matches ``offset_or_timezone_abb``
    (prefixed with one space) is turned into a ``StaticTzInfo`` and used as
    the ``astimezone`` target.

    :return: the converted datetime, or ``None`` when no entry matches.
    """
    for name, info in _tz_offsets:
        if info['regex'].search(' %s' % offset_or_timezone_abb):
            tz = StaticTzInfo(name, info['offset'])
            return utc_datetime.astimezone(tz)
    return None
def apply_timezone(date_time, tz_string):
    """Convert ``date_time`` to the timezone described by ``tz_string``.

    A naive ``date_time`` is first tagged as UTC. The conversion is then
    attempted with ``apply_dateparser_timezone``; when that returns ``None``
    the string is handed to ``apply_tzdatabase_timezone`` instead.
    """
    if date_time.tzinfo is None:
        if hasattr(UTC, 'localize'):
            date_time = UTC.localize(date_time)
        else:
            date_time = date_time.replace(tzinfo=UTC)

    new_datetime = apply_dateparser_timezone(date_time, tz_string)

    if new_datetime is None:
        # Not an offset/abbreviation known to the module's table.
        new_datetime = apply_tzdatabase_timezone(date_time, tz_string)

    return new_datetime
def apply_timezone_from_settings(date_obj, settings):
    """Apply the timezone-related settings to ``date_obj``.

    Steps, in order:

    1. ``settings is None``: ``date_obj`` is returned untouched.
    2. If ``settings.TIMEZONE`` contains ``'local'`` (case-insensitive), the
       zone returned by ``get_localzone()`` is attached; otherwise
       ``localize_timezone`` is called with ``settings.TIMEZONE``.
    3. If ``settings.TO_TIMEZONE`` is truthy, the result is converted with
       ``apply_timezone``.
    4. Unless ``settings.RETURN_AS_TIMEZONE_AWARE`` is exactly ``True``, the
       ``tzinfo`` is stripped again before returning.
    """
    if settings is None:
        return date_obj

    if 'local' in settings.TIMEZONE.lower():
        # Query the system zone only on the branch that actually uses it.
        tz = get_localzone()
        if hasattr(tz, 'localize'):
            date_obj = tz.localize(date_obj)
        else:
            date_obj = date_obj.replace(tzinfo=tz)
    else:
        date_obj = localize_timezone(date_obj, settings.TIMEZONE)

    if settings.TO_TIMEZONE:
        date_obj = apply_timezone(date_obj, settings.TO_TIMEZONE)

    if settings.RETURN_AS_TIMEZONE_AWARE is not True:
        date_obj = date_obj.replace(tzinfo=None)

    return date_obj
def get_last_day_of_month(year, month):
    """Return the number of days in ``month`` of ``year``.

    This equals the day number of the last day of that month and is taken
    from the second element of ``calendar.monthrange(year, month)``.
    """
    return calendar.monthrange(year, month)[1]
def get_previous_leap_year(year):
    """Return the closest leap year strictly before ``year``."""
    return _get_leap_year(year, future=False)
def get_next_leap_year(year):
    """Return the closest leap year strictly after ``year``."""
    return _get_leap_year(year, future=True)
def _get_leap_year(year, future):
    """
    Iterate through previous or next years until it gets a valid leap year
    This is performed to avoid missing or including centurial leap years

    :param year: starting year; it is never returned itself, even if leap.
    :param future: search forwards when true, backwards otherwise.
    :return: the first leap year found in the chosen direction.
    """
    step = 1 if future else -1
    leap_year = year + step
    # Checking year by year (instead of jumping by 4) keeps the answer
    # right around centuries such as 1900 or 2100, which are not leap years.
    while not calendar.isleap(leap_year):
        leap_year += step
    return leap_year
def set_correct_day_from_settings(date_obj, settings, current_day=None):
    """Set correct day attending the `PREFER_DAY_OF_MONTH` setting.

    ``settings.PREFER_DAY_OF_MONTH`` selects the day to use:

    * ``'first'``: day 1;
    * ``'last'``: the last day of ``date_obj``'s month;
    * ``'current'``: ``current_day`` when it is truthy, otherwise today's
      day of the month.

    If the selected day does not exist in that month (for example day 31 in
    a 30-day month), the last day of the month is used instead.

    :raises KeyError: when the setting is none of the values listed above.
    """
    options = {
        'first': 1,
        'last': get_last_day_of_month(date_obj.year, date_obj.month),
        'current': current_day or datetime.now().day
    }

    try:
        return date_obj.replace(day=options[settings.PREFER_DAY_OF_MONTH])
    except ValueError:
        # The requested day is out of range for this month.
        return date_obj.replace(day=options['last'])
def registry(cls):
    """Class decorator that hands out one shared instance per key.

    The decorated class must define ``get_key`` as a class method bound to
    that very class. Its ``__new__`` is replaced so that each construction:

    1. computes ``key = cls.get_key(*args, **kwargs)``;
    2. looks the key up in a dict stored on the class under the attribute
       name ``"__registry_dict"`` (created on first use);
    3. on a miss, builds the instance with the original ``__new__`` (only
       the positional arguments are forwarded) and stores the key on it as
       ``registry_key``;
    4. returns the stored instance.

    :raises NotImplementedError: when ``get_key`` is missing or is not a
        class method of ``cls``.
    """
    def choose(creator):
        def constructor(klass, *args, **kwargs):
            key = klass.get_key(*args, **kwargs)

            if not hasattr(klass, "__registry_dict"):
                setattr(klass, "__registry_dict", {})
            registry_dict = getattr(klass, "__registry_dict")

            if key not in registry_dict:
                registry_dict[key] = creator(klass, *args)
                setattr(registry_dict[key], 'registry_key', key)
            return registry_dict[key]

        # __new__ receives the class explicitly, hence a static method.
        return staticmethod(constructor)

    if not (hasattr(cls, "get_key")
            and isinstance(cls.get_key, types.MethodType)
            and cls.get_key.__self__ is cls):
        raise NotImplementedError("Registry classes require to implement class method get_key")

    setattr(cls, '__new__', choose(cls.__new__))
    return cls
def get_logger():
    """Return the ``'dateparser'`` logger.

    ``setup_logging()`` is called first, so the default configuration gets
    installed when the root logger has no handlers yet.
    """
    setup_logging()
    return logging.getLogger('dateparser')
def setup_logging():
    """Install a default console logging configuration if none exists.

    Does nothing when the root logger already has at least one handler.
    Otherwise the root logger is configured at ``DEBUG`` level with a single
    ``StreamHandler`` writing to ``sys.stdout``.

    Note that the configuration sets ``disable_existing_loggers`` to
    ``True``.
    """
    if logging.root.handlers:
        return

    # ``import logging`` alone does not load the ``logging.config``
    # submodule, so it is imported explicitly here.
    from logging.config import dictConfig

    config = {
        'version': 1,
        'disable_existing_loggers': True,
        'formatters': {
            'console': {
                'format': "%(asctime)s %(levelname)s: [%(name)s] %(message)s",
            },
        },
        'handlers': {
            'console': {
                'level': logging.DEBUG,
                'class': "logging.StreamHandler",
                'formatter': "console",
                'stream': "ext://sys.stdout",
            },
        },
        'root': {
            'level': logging.DEBUG,
            'handlers': ["console"],
        },
    }
    dictConfig(config)