#!/usr/bin/env python3
import re
from collections import defaultdict
import random
import bs4
import numpy as np
from nltk.stem import lancaster, porter
from scipy.sparse import csr_matrix
from sklearn.metrics import f1_score
from sklearn.naive_bayes import MultinomialNB
import dmitrieva


def read_xml2(infile):
    '''for the soup = ... call, see the bs4 documentation'''
    soup = bs4.BeautifulSoup(infile, "xml")
    root_tag = soup.find("reviews")  # root tag of the file
    texts, categories, fileids = [], [], []
    # does the review tag contain a text tag? ask?
    document_tags = list(root_tag.find_all("review"))
    # if shuffle:
    #     random.shuffle(document_tags)
    for document_tag in document_tags:
        fileid = document_tag['id']
        text_tag = document_tag.find("text")
        cats = [cat for cat in document_tag.find_all('categories')[0].find_all('category')]
        category = {cat['name']: cat['sentiment'] for cat in cats}['Food']
        if category == 'absence':
            continue
        text = text_tag.string.strip()
        texts.append(text)
        categories.append(category)
        fileids.append(int(fileid))
    return texts, categories, fileids
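
# Input expected by read_xml2 (a SentiRuEval-style review markup). The tag and
# attribute names are taken from the parsing calls above; the exact layout of a
# real file is an assumption, so treat this as an illustrative sketch:
#   <reviews>
#     <review id="1">
#       <text> ... review text ... </text>
#       <categories>
#         <category name="Food" sentiment="positive"/>
#       </categories>
#     </review>
#   </reviews>
# Only the 'Food' category is kept, and reviews with sentiment 'absence' are skipped.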


def read_xml(infile):
    '''for the soup = ... call, see the bs4 documentation'''
    soup = bs4.BeautifulSoup(infile, "xml")
    root_tag = soup.find("Texts")  # root tag of the file
    texts, categories, fileids = [], [], []
    # does the document tag contain a text tag? ask?
    document_tags = list(root_tag.find_all("document"))
    # if shuffle:
    #     random.shuffle(document_tags)
    for document_tag in document_tags:
        category, fileid = document_tag['category'], document_tag['fileid']
        text_tag = document_tag.find("text")
        text = text_tag.string.strip()
        texts.append(text)
        categories.append(category)
        fileids.append(int(fileid))
    return texts, categories, fileids
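
# Input expected by read_xml, again inferred from the parsing calls above
# (tag and attribute names come from the code, the surrounding layout is an
# assumption):
#   <Texts>
#     <document category="..." fileid="1">
#       <text> ... document text ... </text>
#     </document>
#   </Texts>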


def clear_text(text):
    # removes the first paragraph if it contains only sender information,
    # i.e. it ends with "writes", "wrote" or "article"
    lines = [line.strip() for line in text.split("\n")]
    has_found = False
    for i, line in enumerate(lines):
        if line == "":
            has_found = True
            break
    # found an empty line, i.e. the end of the first paragraph?
    if has_found and i > 0 and bool(re.search("(article|writes|wrote)$", lines[i - 1])):
        lines = lines[i + 1:]
    return lines
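
# A minimal illustration of clear_text, assuming a newsgroup-style header
# paragraph; the sample text itself is made up for this sketch:
#     clear_text("John Doe <john@example.com> writes\n\nGreat food, slow service.")
#     -> ['Great food, slow service.']
# If the first paragraph does not end with article/writes/wrote, the text is
# only split into stripped lines and nothing is removed.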


class TextClassifier:

    STEMMERS = {'lancaster': lancaster.LancasterStemmer(),
                'porter': porter.PorterStemmer()}

    def __init__(self, min_count=1, max_count=np.inf,
                 max_categories_count=np.inf, stemming=None):
        self.min_count = min_count
        self.max_count = max_count
        # maximum number of categories a word may occur in
        self.max_categories_count = max_categories_count
        self.stemming = stemming
        self.word_codes = dict()
        self.dictionary_size = 0
        self.categories_encoding = dict()
        self.categories = []
        self.initialize()

    def initialize(self):
        if self.stemming is None:
            self.stemmer = None
        elif self.stemming in self.STEMMERS:
            self.stemmer = self.STEMMERS[self.stemming]
        else:
            raise KeyError("Unknown stemming mode {}".format(self.stemming))
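
    # Example construction (the parameter values here are illustrative):
    #     TextClassifier(min_count=2, max_count=1000, stemming='porter')
    # With stemming=None (the default) no stemmer is applied; 'lancaster' and
    # 'porter' select the corresponding nltk stemmers from STEMMERS.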

    def preprocess(self, texts, categories=None):
        # texts is a list of lists of lines
        """
        Recodes the texts into vectors in the same way as was done
        for language identification, using the individual words of
        a text as features.
        """
        # read everything in a single pass,
        # encoding each text as a dictionary {word: number of occurrences of the word in the text}
        word_counts = defaultdict(int)
        word_counts_in_texts = defaultdict(int)
        words_in_texts = []
        # dictionary of the categories each word occurred in
        if categories is not None:
            words_categories = defaultdict(set)
        for i, text in enumerate(texts):
            current_text_words = defaultdict(int)
            for line in text:
                splitted_line = line.strip().lower().split()
                for word in splitted_line:
                    word = word.strip(",.!?:$\"()")
                    if not word.isalpha() or len(word) == 0:
                        continue
                    if self.stemmer is not None:
                        word = self.stemmer.stem(word)
                    word_counts[word] += 1
                    current_text_words[word] += 1
            for word in current_text_words:
                word_counts_in_texts[word] += 1
            words_in_texts.append(current_text_words)
            if categories is not None:
                for word in current_text_words:
                    words_categories[word].add(categories[i])
        if categories is not None:
            # sorted_words_by_counts = sorted(word_counts_in_texts.items(),
            #                                 key=(lambda x: x[1]), reverse=True)
            # for word, count in sorted_words_by_counts[:100]:
            #     print(word, count)
            # sys.exit()
            # for each word, count how many categories it occurred in
            self.categories_counts_by_words =\
                {word: len(elem) for word, elem in words_categories.items()}
            # dictionary counting how many words occurred in exactly k categories
            # counts_of_categories = defaultdict(int)
            # for word, count in self.categories_counts_by_words.items():
            #     counts_of_categories[count] += 1
            # print(" ".join(
            #     "{}:{}".format(key, count) for key, count
            #     in sorted(counts_of_categories.items())))
            # sys.exit()
            # self.word_counts_in_texts = word_counts_in_texts
            words = [word for word, count in word_counts.items()
                     if count >= self.min_count
                     and word_counts_in_texts[word] <= self.max_count
                     and self.categories_counts_by_words[word] <= self.max_categories_count]
            self.word_codes = {word: code for code, word in enumerate(words)}
            self.dictionary_size = len(self.word_codes)
        data, rows, cols = [], [], []
        for i, words_for_text in enumerate(words_in_texts):
            # words_for_text = {w_1: n_1, w_2: n_2, ...} is the dictionary of word counts for this text
            current_vector_counts = defaultdict(int)
            for word, count in words_for_text.items():
                word_code = self.word_codes.get(word)
                if word_code is not None:
                    # weight the raw count, dampening words spread over many categories
                    current_vector_counts[word_code] =\
                        np.log2(1.0 + count / np.log2(1.0 + self.categories_counts_by_words[word]))
                    # current_vector_counts[word_code] = count
            sorted_current_vector_counts = sorted(current_vector_counts.items())
            data.extend(x[1] for x in sorted_current_vector_counts)
            cols.extend(x[0] for x in sorted_current_vector_counts)
            rows.extend([i] * len(sorted_current_vector_counts))
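
        # A worked example of the weighting above (the numbers are illustrative):
        # a word seen count=3 times in a text and occurring in 4 training
        # categories gets log2(1 + 3 / log2(1 + 4)) = log2(1 + 3 / 2.32) ~ 1.20,
        # whereas a word confined to a single category keeps log2(1 + count).
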
        # if categories is not None:
        #     word_counts = defaultdict(int)
        #     for text in texts:
        #         for line in text:
        #             splitted_line = line.strip().split()
        #             for word in splitted_line:
        #                 word = word.strip(",.!?:$\"()")
        #                 if not word.isalpha() or len(word) == 0:
        #                     continue
        #                 if self.stemmer is not None:
        #                     word = self.stemmer.stem(word)
        #                 word_counts[word] += 1
        #                 # if word not in self.word_codes:
        #                 #     self.word_codes[word] = self.dictionary_size
        #                 #     self.dictionary_size += 1
        #     words = [word for word, count in word_counts.items()
        #              if count >= self.min_count]
        #     self.word_codes = {word: code for code, word in enumerate(words)}
        #     self.dictionary_size = len(self.word_codes)
        # data, rows, cols = [], [], []
        # for i, text in enumerate(texts):
        #     current_vector_counts = defaultdict(int)
        #     for line in text:
        #         splitted_line = line.strip().split()
        #         for word in splitted_line:
        #             word = word.strip(",.!?:$\"()")
        #             if not word.isalpha() or len(word) == 0:
        #                 continue
        #             if self.stemmer is not None:
        #                 word = self.stemmer.stem(word)
        #             word_code = self.word_codes.get(word)
        #             if word_code is not None:
        #                 current_vector_counts[word_code] += 1
        #     sorted_current_vector_counts = sorted(current_vector_counts.items())
        #     data.extend(x[1] for x in sorted_current_vector_counts)
        #     cols.extend(x[0] for x in sorted_current_vector_counts)
        #     rows.extend([i] * len(sorted_current_vector_counts))
        answer = csr_matrix((data, (rows, cols)), shape=(len(texts), self.dictionary_size))
        # print(len(data), len(cols), len(rows))
        recoded_categories = []
        if categories is not None:
            for category in categories:
                if category not in self.categories_encoding:
                    self.categories_encoding[category] = len(self.categories_encoding)
                    self.categories.append(category)
                recoded_categories.append(self.categories_encoding[category])
        if categories is not None:
            return answer, recoded_categories
        else:
            return answer

    def fit(self, texts, text_categories):
        X, y = self.preprocess(texts, text_categories)
        print(X.shape)
        # multinomial naive Bayes with additive smoothing alpha=0.1
        self.cls = MultinomialNB(alpha=0.1)
        # self.cls = LogisticRegression()
        self.cls.fit(X, y)
        return self

    def fakepredict(self, texts):
        # baseline: assign categories uniformly at random
        categories = [random.randrange(0, len(self.categories)) for text in texts]
        answer = [self.categories[elem] for elem in categories]
        return answer

    def predict(self, texts):
        X = self.preprocess(texts)
        categories = self.cls.predict(X)
        answer = [self.categories[elem] for elem in categories]
        return answer
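
    # Note: predict reuses the vocabulary built during fit (self.word_codes),
    # so it must be called after fit; like fit, it expects each text as a list
    # of lines, e.g. the output of clear_text.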


def make_contingency_table(pred_labels, test_labels):
    counts = defaultdict(int)
    for elem in zip(test_labels, pred_labels):
        counts[elem] += 1
    return counts
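
# Layout of the contingency table (the numbers are illustrative): for labels
# 'positive'/'negative' the returned dictionary might look like
#     {('positive', 'positive'): 40, ('positive', 'negative'): 7,
#      ('negative', 'positive'): 5, ('negative', 'negative'): 20},
# i.e. keys are (true_label, predicted_label) pairs and values are counts.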


if __name__ == "__main__":
    # args = sys.argv[1:]
    # if len(args) != 2:
    #     sys.exit()
    # infile_train, infile_test = args
    # infile_train = "SentiRuEval_rest_markup_train.xml"
    # infile_test = "SentiRuEval_rest_markup_test.xml"
    infile_train = "train.xml"
    infile_test = "test.xml"
    with open(infile_train, "r", encoding="utf8") as fin:
        contents_train = fin.read()
    with open(infile_test, 'r', encoding='utf8') as fin:
        contents_test = fin.read()
    texts, categories, fileids = read_xml(contents_train)
    test_texts, test_categories, test_fileids = read_xml(contents_test)
    cls = TextClassifier(min_count=1, max_count=10000)
    clear_train = [clear_text(text) for text in texts]
    cls.fit(clear_train, categories)
    clear_test = [clear_text(test_text) for test_text in test_texts]
    answers = cls.predict(clear_test)
    # print("Answers: ", answers)
    contingency_table = make_contingency_table(answers, test_categories)
    # for key, value in sorted(contingency_table.items()):
    #     print("{} {} {}".format(key[0], key[1], value))
    # per-class F1 scores (f1_score is what is computed here)
    f1_scores = f1_score(test_categories, answers, average=None)
    for elem in f1_scores:
        print("{:.2f}".format(100 * elem), end=" ")
    print("")
    print("Average F1 score: {:.2f}".format(100 * np.mean(f1_scores)))