-
Notifications
You must be signed in to change notification settings - Fork 3
/
RxOntologyLookup.py
314 lines (246 loc) · 11.5 KB
/
RxOntologyLookup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""RxOntologyLookup.py
Look up the RxCUIs found by JsonBasedReader.
Uses the RxNav and RxClass APIs to search for the ingredients of an RxCUI. If the RxCUI cannot be found (expired?),
searches are run on the original extracted text corresponding to the RxCUI to try to find the ingredients.
ATC levels 1-4 are searched for using the public API. Level 5 (== drug) was not found via the API and is not included.
Disclaimer: This product uses publicly available data from the U.S. National Library of Medicine (NLM),
National Institutes of Health, Department of Health and Human Services; NLM is not responsible for the product
and does not endorse or recommend this or any other product
"""
from re import split
import requests
import xml.etree.ElementTree
import time
from MLDataProcessing import save_to_json, load_dict_json, load_dict_pickle, pickle_something
from collections import defaultdict
import operator
from pathlib import Path
# Ingredient RxCUI -> ingredient name; filled in by get_rxnorm_ingredients().
ingredients_name = {}
# RxCUI -> list of ingredient RxCUIs; lookup cache used by get_rxnorm_ingredients(), reloaded from disk in main().
cache_cui_to_ingredients = {}
# RxCUI -> list of ATC class ids; lookup cache used by get_rxnorm_ATC(), reloaded from disk in main().
cache_cui_to_atc = {}
# Hand-maintained RxCUI -> ingredients mapping, loaded in main() from rxcui_ingred_manual_entries.json.
manual_ingredient_entries = {}
# Search terms for which get_rxnorm_ingredients_using_search() found no ingredients; used to skip repeat searches.
rxnorm_blank_search_results = []
def main(working_dir, find_ingreds = True, find_ATC = True, output_ATC_count=False):
    """Look up ingredients and ATC classes for the RxCUIs stored under ``working_dir/data``.

    Reads ``rxcui_found.json`` (RxCUI -> originally extracted text), resolves every RxCUI to
    its ingredient RxCUIs and to ATC class ids, and writes the results to
    ``rxcui_ingredient.json``, ``rxcui_ingredient_names.json`` and ``rxcui_atc.json``.

    Args:
        working_dir: Directory (str or Path) that contains the ``data`` sub-directory.
        find_ingreds: Run the ingredient lookup. This includes a text-search fallback for
            RxCUIs that yielded nothing and a second pass that expands every ingredient found.
        find_ATC: Run the ATC lookup, first on the RxCUI itself and, if that yields nothing,
            on its ingredients. NOTE: this stage iterates over the ingredient results of this
            run, so with ``find_ingreds=False`` it has nothing to process.
        output_ATC_count: Print how often each ATC prefix (1, 3, 4 and 5 characters) occurs,
            most frequent first.
    """
    # ingredients_name must be declared global too: get_rxnorm_ingredients() records names in
    # the module-level dict, so a function-local dict here would be saved without them.
    global cache_cui_to_ingredients, cache_cui_to_atc, manual_ingredient_entries, ingredients_name

    print("Starting RxNorm code lookup")
    working_dir = Path(working_dir)
    print("Loading Data from: " + str(working_dir))

    data_dir = working_dir / 'data'
    rxnorm_savefile = data_dir / 'rxcui_found.json'
    save_atc_file = data_dir / 'rxcui_atc.json'
    ingredient_dict_file = data_dir / 'rxcui_ingredient.json'
    manual_ingredient_entries_file = data_dir / 'rxcui_ingred_manual_entries.json'
    ingredients_name_file = data_dir / "rxcui_ingredient_names.json"

    # Results of earlier runs double as lookup caches for this run.
    cache_cui_to_ingredients = load_dict_json(ingredient_dict_file, create_local_if_not_found=True)
    cache_cui_to_atc = load_dict_json(save_atc_file, create_local_if_not_found=True)
    manual_ingredient_entries = load_dict_json(manual_ingredient_entries_file, create_local_if_not_found=True)
    if not manual_ingredient_entries:
        print("Manual entries for expired etc. Rxcui can be added at:\n" + str(manual_ingredient_entries_file))
    ingredients_name = load_dict_json(ingredients_name_file, create_local_if_not_found=True)

    # load rxcui to search for
    rxcui_to_lookup = load_dict_json(rxnorm_savefile, create_local_if_not_found=False)
    rxcui_to_atc = {}
    rxcui_to_ingredients = {}
    # NOTE: display names of the RxCUIs themselves can be fetched with query_rxnorm_name().

    if find_ingreds:
        total_rxcui = len(rxcui_to_lookup)
        for count, rxcui in enumerate(rxcui_to_lookup, start=1):
            rxcui_to_ingredients[rxcui] = get_rxnorm_ingredients(rxcui)
            # Checkpoint every 100 entries and once more at the very end.
            if count % 100 == 0 or count == total_rxcui:
                save_to_json(rxcui_to_ingredients, ingredient_dict_file, indent=4)
                save_to_json(ingredients_name, ingredients_name_file, indent=4)
                print("Stage 1/4: Ingredients Lookup: ", count, "/", total_rxcui, " entries processed.")

        # try to look up missing entries because sometimes rxcui get retired
        # (only values of existing keys are replaced, which is safe while iterating)
        for rxcui, ingredients in rxcui_to_ingredients.items():
            if ingredients:
                continue
            if rxcui in rxcui_to_lookup:
                original_text_from_FHIR = str(rxcui_to_lookup[rxcui])
            else:
                print("Can not find rxcui: ", rxcui)
                continue
            search_results = get_rxnorm_ingredients_using_multisearch(original_text_from_FHIR)
            search_results = list(set(search_results))
            if search_results:
                rxcui_to_ingredients[rxcui] = search_results
                print("Rxcui term found: ", rxcui, original_text_from_FHIR, search_results)
            else:
                print("Rxcui term could NOT be found:", original_text_from_FHIR)
        save_to_json(rxcui_to_ingredients, ingredient_dict_file, indent=4)
        save_to_json(ingredients_name, ingredients_name_file, indent=4)

        # try to find ingredients from original ingredients
        for rxcui, ingredients in rxcui_to_ingredients.items():
            new_ingredients = []
            for ingredient in ingredients:
                new_ingredients.extend(get_rxnorm_ingredients(ingredient))
            rxcui_to_ingredients[rxcui] = list(set(ingredients + new_ingredients))

    # find ATC codes
    if find_ATC:
        total_codes = len(rxcui_to_ingredients)
        for index, (code, ingredients) in enumerate(rxcui_to_ingredients.items()):
            atc_codes = get_rxnorm_ATC(code)
            if not atc_codes:
                for ingredient in ingredients:
                    # Build a new list; += would mutate the list held in the ATC cache.
                    atc_codes = atc_codes + get_rxnorm_ATC(ingredient)
            if atc_codes:
                rxcui_to_atc[code] = list(set(atc_codes))
            else:
                print("ATC could not be found for ", code, '(%d/%d)' % (index, total_codes))
            processed = index + 1
            if processed % 100 == 0 or processed == total_codes:
                save_to_json(rxcui_to_atc, save_atc_file, indent=4)
                print("Stage 4/4: ATC: ", processed, "/", total_codes, " entries processed.")

    # save data
    if find_ingreds:
        save_to_json(rxcui_to_ingredients, ingredient_dict_file, indent=4, print_save_loc=True)
        save_to_json(ingredients_name, ingredients_name_file, indent=4, print_save_loc=True)
    if find_ATC:
        save_to_json(rxcui_to_atc, save_atc_file, indent=4, print_save_loc=True)

    if output_ATC_count:
        prefix_counts = defaultdict(int)
        for atc_codes in rxcui_to_atc.values():
            if not atc_codes:
                continue
            for atc_code in atc_codes:
                # A code shorter than 5 characters produces the same prefix several times;
                # dict.fromkeys drops the repeats (keeping order) so each level counts once.
                for prefix in dict.fromkeys(atc_code[:length] for length in (1, 3, 4, 5)):
                    prefix_counts[prefix] += 1
        ATC_struct = sorted(prefix_counts.items(), key=operator.itemgetter(1), reverse=True)
        print("ATC count:")
        print(ATC_struct)
def is_int(string_:str):
    """Return True when ``int(string_)`` succeeds, False when it raises ValueError."""
    try:
        int(string_)
    except ValueError:
        return False
    return True
#attempts various searches to find the rxnorm ingredients using different iterations on the original extracted text
def get_rxnorm_ingredients_using_multisearch(term_to_search:str):
    """Find ingredients for a free-text term by trying progressively looser searches.

    The searches are tried in this order and the first non-empty result is returned:

    1. the whole term;
    2. the text after the last ``:`` (if there is one past the first character);
    3. all words of at least 3 characters that are not integers, joined by spaces;
    4. each of those words on its own, with the results concatenated.

    Args:
        term_to_search: Text to search for.

    Returns:
        A list of ingredient RxCUIs (possibly containing duplicates), or ``[]`` if nothing
        was found.
    """
    # try simple search with whole string
    whole_term_result = get_rxnorm_ingredients_using_search(term_to_search)
    if whole_term_result:
        return whole_term_result

    # try the part after the last colon
    pos = term_to_search.rfind(':')
    if pos > 0:
        after_colon = term_to_search[pos + 1:]
        # A trailing colon leaves nothing to search for; do not send an empty term to the API.
        if after_colon.strip():
            results = get_rxnorm_ingredients_using_search(after_colon)
            if results:
                return results

    # try filtering words (raw string: '\W' is not a valid escape in a normal literal)
    words = split(r'\W+', term_to_search)
    longer_words = [word for word in words if len(word) >= 3 and not is_int(word)]
    combined = ' '.join(longer_words)
    if not combined:  # no valid words of sufficient length => give up
        return []
    results = get_rxnorm_ingredients_using_search(combined)
    if results:
        return results

    # try individual words
    results = []
    for word in longer_words:
        results = results + get_rxnorm_ingredients_using_search(word)
    return results
def get_rxnorm_ingredients_using_search(term_to_search:str):
    """Return the ingredients of every RxCUI that the approximate search finds for a term.

    Terms that produced no ingredients are remembered in ``rxnorm_blank_search_results`` and
    answered with ``[]`` on later calls without searching again.

    Args:
        term_to_search: Text passed to query_rxnorm_ingredients_using_search().

    Returns:
        The concatenated get_rxnorm_ingredients() results of the distinct candidate RxCUIs.
    """
    global rxnorm_blank_search_results
    if term_to_search in rxnorm_blank_search_results:
        return []

    candidates = list(query_rxnorm_ingredients_using_search(term_to_search))
    seen_codes = set()
    found = []
    for candidate in candidates:
        candidate_code = candidate[0]
        # The same RxCUI can show up in several candidates; resolve it only once.
        if candidate_code not in seen_codes:
            seen_codes.add(candidate_code)
            found = found + get_rxnorm_ingredients(str(candidate_code))

    if not found:
        rxnorm_blank_search_results.append(term_to_search)
    return found
def query_rxnorm_ingredients_using_search(term):
    """Query the RxNav ``approximateTerm`` endpoint and yield its candidates.

    This is a generator: the pause and the HTTP request happen when iteration starts.

    Args:
        term: Free text to search for. It is passed through ``params`` so that requests
            URL-encodes it; characters such as ``&``, ``#`` or ``+`` in the text would
            otherwise alter the query string.

    Yields:
        ``(rxcui, score, rank)`` tuples of strings, one per ``candidate`` element of the
        response; a field is ``None`` when the element lacks that tag.
    """
    time.sleep(0.05)  # Limit API requests to max of 20/s
    base_uri = 'https://rxnav.nlm.nih.gov/REST'
    url = '{base_uri}/approximateTerm'.format(base_uri=base_uri)
    response = requests.get(url, params={'term': term, 'maxEntries': 4})
    tree = xml.etree.ElementTree.fromstring(response.text)
    for candidate in tree.findall("./approximateGroup/candidate"):
        yield tuple(candidate.findtext(tag) for tag in ['rxcui', 'score', 'rank'])
def query_rxnorm_name(rxcui):
    """Fetch ``/rxcui/{rxcui}/`` from RxNav and return the text of the first ``idGroup/name``.

    Args:
        rxcui: Identifier inserted into the request path.

    Returns:
        The text of the first ``./idGroup/name`` element, or ``''`` when the response has none.
    """
    time.sleep(0.05)  # Limit API requests to max of 20/s
    url = '{base_uri}/rxcui/{rxcui}/'.format(base_uri='http://rxnav.nlm.nih.gov/REST', rxcui=rxcui)
    root = xml.etree.ElementTree.fromstring(requests.get(url).text)
    first_name = root.find("./idGroup/name")
    if first_name is None:
        return ''
    return first_name.text
def get_rxnorm_ingredients(rxcui):
    """Return the ingredient RxCUIs of ``rxcui``, using manual entries and the cache first.

    Lookup order:

    1. ``manual_ingredient_entries``. These are checked first so that a hand-made entry
       overrides a cached result. The cache is reloaded from earlier, saved runs and may
       hold an empty list for an RxCUI that could not be resolved; checking the cache first
       would hide the manual entry added to fix exactly that case.
    2. ``cache_cui_to_ingredients``.
    3. A live query through query_rxnorm_ingredients(). The result (even an empty one) is
       stored in the cache, and the ingredient names are recorded in ``ingredients_name``.

    Args:
        rxcui: Identifier to resolve.

    Returns:
        A list of ingredient RxCUIs; ``[]`` when the query returns nothing. The list object
        is shared with the cache or the manual entries, so callers must not modify it.
    """
    global cache_cui_to_ingredients, ingredients_name, manual_ingredient_entries
    if rxcui in manual_ingredient_entries:
        return manual_ingredient_entries[rxcui]
    if rxcui in cache_cui_to_ingredients:
        return cache_cui_to_ingredients[rxcui]

    ingredients_list = []
    for ingredient_rxcui, ingredient_name, _umlscui in query_rxnorm_ingredients(rxcui):
        ingredients_list.append(ingredient_rxcui)
        ingredients_name[ingredient_rxcui] = ingredient_name
    cache_cui_to_ingredients[rxcui] = ingredients_list
    return ingredients_list
def query_rxnorm_ingredients(rxcui):
    """Query RxNav ``/rxcui/{rxcui}/related?tty=IN`` and yield the ingredient concepts.

    This is a generator: the pause and the HTTP request happen when iteration starts.
    Concepts are read from ``allRelatedGroup`` first and then from ``relatedGroup``.

    Args:
        rxcui: Identifier inserted into the request path.

    Yields:
        ``(rxcui, name, umlscui)`` tuples, one per ``conceptProperties`` element found.
    """
    time.sleep(0.05)  # Limit API requests to max of 20/s
    url = '{base_uri}/rxcui/{rxcui}/related?tty=IN'.format(
        base_uri='http://rxnav.nlm.nih.gov/REST', rxcui=rxcui)
    root = xml.etree.ElementTree.fromstring(requests.get(url).text)
    concept_paths = (
        "./allRelatedGroup/conceptGroup[tty='IN']/conceptProperties",
        "./relatedGroup/conceptGroup[tty='IN']/conceptProperties",
    )
    wanted_tags = ['rxcui', 'name', 'umlscui']
    for concept_path in concept_paths:
        for concept in root.findall(concept_path):
            assert concept.findtext('tty') == 'IN'
            yield tuple(concept.findtext(tag) for tag in wanted_tags)
def get_rxnorm_ATC(rxcui):
    """Return the ATC class ids of ``rxcui``, consulting ``cache_cui_to_atc`` first.

    On a cache miss the ids are fetched with query_rxnorm_ATC() and stored in the cache,
    including an empty list when nothing is returned.

    Args:
        rxcui: Identifier to resolve.

    Returns:
        The list of class ids held in the cache for ``rxcui``.
    """
    global cache_cui_to_atc
    if rxcui not in cache_cui_to_atc:
        # First element of each yielded tuple is the classId.
        cache_cui_to_atc[rxcui] = [atc_record[0] for atc_record in query_rxnorm_ATC(rxcui)]
    return cache_cui_to_atc[rxcui]
def query_rxnorm_ATC(rxcui):
    """Query RxClass ``class/byRxcui`` with ``relaSource=ATC`` and yield the ATC1-4 classes.

    This is a generator: the pause and the HTTP request happen when iteration starts.

    Args:
        rxcui: Identifier inserted into the query string.

    Yields:
        ``(classId, className, classType)`` tuples, one per ``rxclassMinConceptItem`` whose
        ``classType`` is ``ATC1-4``.
    """
    time.sleep(0.05)  # limit requests under 20 requests/s
    url = '{base_uri}/rxclass/class/byRxcui?rxcui={rxcui}&relaSource=ATC'.format(
        base_uri='https://rxnav.nlm.nih.gov/REST', rxcui=rxcui)
    root = xml.etree.ElementTree.fromstring(requests.get(url).text)
    wanted_tags = ['classId', 'className', 'classType']
    concept_items = root.findall(
        "./rxclassDrugInfoList/rxclassDrugInfo/rxclassMinConceptItem[classType='ATC1-4']")
    for concept_item in concept_items:
        assert concept_item.findtext('classType') == 'ATC1-4'
        yield tuple(concept_item.findtext(tag) for tag in wanted_tags)
if __name__ == '__main__':
    # Script entry point: take the working directory from the first command-line argument,
    # or ask for it interactively when none was given.
    import sys
    cli_arguments = sys.argv[1:]
    if cli_arguments:
        main(cli_arguments[0])
    else:
        main(input("Please enter working directory: "))