-
Notifications
You must be signed in to change notification settings - Fork 1
/
scrape.py
272 lines (227 loc) · 8.93 KB
/
scrape.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
import argparse
import json
import re
from argparse import ArgumentParser
from pathlib import Path
from typing import TypeGuard
import requests
from bs4 import BeautifulSoup, NavigableString, PageElement, Tag
from tqdm import tqdm
# Number of centers listed per results page on buddhanet.info; also the step
# size for the `offset` query parameter when paging through results.
_ENTRIES_PER_PAGE = 25
def main() -> None:
    """Scrape US Buddhist-center listings from buddhanet.info into a JSON file.

    With ``--reuse``, entries already saved in ``buddhist_centers.json`` are
    loaded first and their pages skipped, so only missing pages are fetched.
    """
    args = create_parser().parse_args()
    output_path = Path("buddhist_centers.json")
    centers = []
    last_saved_page: int | None = None
    if args.reuse and output_path.exists():
        centers.extend(json.loads(output_path.read_text()))
        if centers:
            # Entries are appended in page order, so the last one carries the
            # highest page number already scraped.
            last_saved_page = centers[-1]["page"]
    for page_index in tqdm(range(args.to_page), unit="page"):
        page_number = page_index + 1
        if last_saved_page is not None and page_number <= last_saved_page:
            continue
        offset = page_index * _ENTRIES_PER_PAGE
        url = f"http://www.buddhanet.info/wbd/country.php?country_id=2&offset={offset}"
        centers.extend(scrape_buddhist_centers(url, page_number=page_number))
    output_path.write_text(json.dumps(centers, indent=2))
def create_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the scraper."""
    # Use `argparse.ArgumentParser` directly so the construction matches the
    # return annotation.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--to-page",
        type=int,
        # At the time of writing, there were only 2650 centers. So conservatively go up to 3000.
        default=3000 // _ENTRIES_PER_PAGE,
        help="Parse up to and including this page.",
    )
    parser.add_argument(
        "--reuse",
        action="store_true",
        default=False,
        help="If true, reuse results already saved to JSON. Only scrape missing pages.",
    )
    return parser
def scrape_buddhist_centers(
    url: str, *, page_number: int
) -> list[dict[str, str | int]]:
    """Fetch one listing page and extract every center's details.

    Args:
        url: Full URL of the listing page to fetch.
        page_number: 1-based page number, recorded on each returned entry.

    Returns:
        One dict per center found on the page (empty list if the page has none).

    Raises:
        requests.Timeout: if the server does not respond within the timeout.
        ValueError: from ``zip(strict=True)`` when the page has a mismatched
            number of name and detail paragraphs.
    """
    headers = {
        # Necessary to avoid a 403.
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:91.0) Gecko/20100101 Firefox/91.0"
    }
    # FIX: the original call had no timeout, so a stalled server would hang the
    # scraper forever. 30s is generous for a single listing page.
    response = requests.get(url, headers=headers, timeout=30)
    soup = BeautifulSoup(response.text, "html.parser")
    entry_names = soup.find_all("p", class_="entryName")
    entry_details = soup.find_all("p", class_="entryDetail")
    return [
        extract_center_info(name, details, page_number=page_number)
        for name, details in zip(entry_names, entry_details, strict=True)
    ]
# All `<strong>Key:</strong>` labels observed in entry-detail paragraphs. Used
# by `_determine_key_and_value_elements` to decide whether a <strong> element
# starts a new key/value pair or is merely bold text inside another value.
_KNOWN_KEY_NAMES = frozenset(
    [
        "Abbot",
        "Address",
        "Affiliation",
        "Community Dharma Leader",
        "Community Dharma Leaders",
        "Contact",
        "Contacts",
        "Contact person",
        "Contact and Teacher",
        "Deshi",
        "Director",
        "Directors",
        "E-mail",
        "Executive Director",
        "Founder",
        "Founders",
        "Founder Teacher",
        "Founder Teachers",
        "Group Coordinator",
        "Group Coordinators",
        "Guiding Teacher",
        "Guiding Teachers",
        "Lama-in-residence",
        "Notes and Events",
        "Main Contact",
        "Main Contacts",
        "Phone",
        "Practice Leader",
        "Practice Leaders",
        "Resident Teacher",
        "Resident Teachers",
        "Rev.",
        "Roshi",
        "Senior Facilitator",
        "Senior Facilitators",
        "Spiritual Advisor",
        "Spiritual Advisors",
        "Spiritual Director",
        "Spiritual Directors",
        "Spiritual Director and Teacher",
        "Teacher",
        "Teachers",
        "Tradition",
        "Website",
        "Venerable",
    ]
)
def extract_center_info(
    name_tag: Tag, details_tag: Tag, *, page_number: int
) -> dict[str, str | int]:
    """Build one center's record from its name and details paragraphs.

    Each key in the details appears as a `<strong>Key:</strong>` element whose
    value runs until the next `<br>` (or the end of the paragraph), so the
    strong elements are processed one at a time.
    """
    info: dict[str, str | int] = {"name": name_tag.text.strip(), "page": page_number}
    for strong in details_tag.find_all(["strong"]):
        pair = _determine_key_and_value_elements(strong)
        if not pair:
            continue
        key, value_elements = pair
        value = _normalize_value(value_elements, key=key)
        if key in info:
            # Duplicate keys occasionally occur; merge them into one entry.
            info[key] = f"{info[key]}, {value}"
        else:
            info[key] = value
    # A trailing `<p class="entryDesc">` paragraph, when present, supplies the
    # "Notes and Events" body.
    _maybe_add_entry_desc(details_tag, info)
    return info
def _determine_key_and_value_elements(
    strong_element: Tag,
) -> tuple[str, list[Tag | NavigableString]] | None:
    """Given a <strong> element, return its key name and value elements.

    Returns ``None`` when the element is not a known key (e.g. bold text inside
    another key's value) or is the useless "Find on:" label. The value is the
    remainder of the strong element's own text after the colon, plus every
    following sibling up to (but not including) the next ``<br>``.
    """
    # `Find on:` is broken and not useful.
    if strong_element.text == "Find on:":
        return None
    # Strong elements can be included in the value for a key. We can skip those here because
    # they will already be handled by looking at the key's `next_sibling`.
    # FIX: `re.escape` is required — keys such as "Rev." contain regex
    # metacharacters, and the unescaped "." previously matched any character.
    if not any(
        re.match(rf"{re.escape(k)}\s*:", strong_element.text)
        for k in _KNOWN_KEY_NAMES
    ):
        return None
    key, key_value_text = strong_element.text.split(":", maxsplit=1)
    key = key.strip()
    value_elements: list[Tag | NavigableString] = [NavigableString(key_value_text)]

    def is_valid_element(element: PageElement) -> TypeGuard[Tag | NavigableString]:
        return isinstance(element, (Tag, NavigableString))

    current_value = strong_element.next_sibling
    while current_value is not None:
        # FIX: the validity check used to be part of the loop condition as
        # well, so an unexpected sibling silently ended the loop and the
        # raise below was unreachable. Now it actually fires.
        if not is_valid_element(current_value):
            raise AssertionError(
                f"Unexpected element as sibling to the key `{key}`: "
                f"{current_value} (type {type(current_value)})"
            )
        if current_value.name == "br":
            break
        value_elements.append(current_value)
        current_value = current_value.next_sibling
    return key, value_elements
def _normalize_value(value_elements: list[Tag | NavigableString], *, key: str) -> str:
    """Join a key's value elements into one cleaned-up string.

    Anchor tags contribute their ``href`` (minus any ``mailto:`` prefix) so
    e-mail addresses and websites survive as text; all other elements
    contribute their visible text. Addresses get extra normalization.
    """
    text_elements = []
    for v in value_elements:
        if isinstance(v, Tag) and v.name == "a":
            # FIX: the original `"href" not in v` tests for a CHILD ELEMENT
            # named "href" (Tag.__contains__ searches children), not for the
            # attribute — which is why `Hawk Mountain Sangha` appeared to lack
            # an href even though it had one. `has_attr` is the intended check.
            if not v.has_attr("href"):
                txt = v.text
            else:
                href = v["href"]
                assert isinstance(href, str)
                txt = href.removeprefix("mailto:")
        else:
            txt = v.text
        text_elements.append(txt)
    text = " ".join(text_elements).strip()
    if key == "Address":
        text = _normalize_address(text)
    # Remove extra whitespace and `\xa0` characters in the middle of the string.
    text = re.sub(r"\s*\xa0\s*|\s{2,}", " ", text)
    return text
def _normalize_address(value: str) -> str:
# Remove 'Mailing:' and everything after it. `re.DOTALL` is because there are sometimes
# newlines after the `Mailing:`.
value = re.sub(r"\s*Mailing:.*$", "", value, flags=re.DOTALL)
# Replace `\r\n` and `\n` with `, `.
value = re.sub(r"(\r\n|\n)+", ", ", value)
# Remove 'Physical:' if it's at the beginning of the address
value = re.sub(r"^\s*Physical:\s*", "", value)
# Remove trailing whitespace, '\xa0', and 2-letter state code
value = re.sub(r"\s*\xa0\s*[A-Z]{2}$", "", value)
# Replace whitespace, '\xa0', and a little more whitespace followed by text. This
# sometimes separates the street from the city and state. Replace with `, `.
value = re.sub(r"\s*\xa0\s+(?=\S)", ", ", value)
# Some previous rules can result in occurrences like `,,`. Ensure it's only ever one comma.
value = re.sub(r",+", ",", value)
# Finally, replace multiple blank spaces with only one. Note that this happens at the end
# because the other replacements are more precise.
value = re.sub(r"\s+", " ", value)
return value
def _maybe_add_entry_desc(details_tag: Tag, result: dict[str, str | int]) -> None:
    """Fill "Notes and Events" from a trailing `<p class="entryDesc">`, if any.

    Mutates ``result`` in place. Raises ``AssertionError`` when an entryDesc
    exists but the record has no "Notes and Events" key, or that key already
    has text — either would mean the page layout violates this scraper's
    assumptions.
    """
    entry_desc = _find_entry_desc(details_tag)
    if not entry_desc:
        return
    if "Notes and Events" not in result:
        raise AssertionError(
            f"entryDesc found for the center {result['name']}, but there was no "
            "`Notes and Events` key."
        )
    notes = result["Notes and Events"]
    # FIX: the original `if notes := result[...] != ""` bound the *boolean*
    # comparison to `notes` (`:=` has lower precedence than `!=`), so the error
    # message below printed `True` instead of the conflicting text.
    if notes != "":
        raise AssertionError(
            f"entryDesc found for the center {result['name']}, but the 'Notes and Events' key has "
            f"its own text already: {notes}"
        )
    result["Notes and Events"] = "\n\n".join(
        tag.get_text(strip=True).replace("\n", "") for tag in entry_desc
    )
def _find_entry_desc(details_tag: Tag) -> list[Tag]:
    """Collect sibling tags after ``details_tag`` until the next entry begins.

    Stops at an ``<hr>`` or at the next ``class="entryName"`` element; returns
    every non-empty Tag seen before that boundary (strings are skipped).
    """
    collected: list[Tag] = []
    sibling = details_tag.next_sibling
    while sibling is not None:
        if isinstance(sibling, Tag):
            reached_next_entry = (
                sibling.name == "hr" or sibling.get("class") == ["entryName"]
            )
            if reached_next_entry:
                break
            if sibling.get_text(strip=True):
                collected.append(sibling)
        sibling = sibling.next_sibling
    return collected
# Script entry point: run the scraper when executed directly.
if __name__ == "__main__":
    main()