-
Notifications
You must be signed in to change notification settings - Fork 0
/
naive_bayes.py
116 lines (86 loc) · 4.2 KB
/
naive_bayes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import logging
import math
logger = logging.getLogger(__name__)
class MNStrategy:
    """Multinomial counting strategy.

    Every occurrence of an attribute value counts: an attribute seen three
    times in one history contributes three to the accumulated statistics and
    weighs three times as much when scoring. Instances carry no state.
    """

    def __init__(self):
        # Previously misspelled ``__init___`` (three trailing underscores),
        # which made it an ordinary, never-called method.
        pass

    def __repr__(self):
        return "MNStrategy()"

    def occurences(self, count):
        """Return how much an attribute seen ``count`` times adds to the stats.

        The (misspelled) method name is part of the strategy interface and is
        kept as-is for callers.
        """
        return count

    def log_likelihood(self, count, p_x_candidate, p_x_not_candidate):
        """Return this attribute's contribution to the log-odds score.

        Args:
            count: number of times the attribute occurs in the scored history.
            p_x_candidate: probability of the attribute given the candidate
                class.
            p_x_not_candidate: probability of the attribute given any other
                class.

        Returns:
            ``count * log(p_x_candidate / p_x_not_candidate)``; ``0.0`` when
            the attribute is absent (``count == 0``).

        Raises:
            ZeroDivisionError: ``count`` is non-zero and
                ``p_x_not_candidate`` is 0.
            ValueError: ``count`` is non-zero and the probability ratio is
                not positive.
        """
        if count == 0:
            # Absent attributes contribute nothing; skip the log entirely.
            return 0.0
        return count * math.log(p_x_candidate / p_x_not_candidate)
class BNStrategy:
    """Bernoulli counting strategy.

    Only the presence or absence of an attribute value matters: any positive
    count is treated as a single occurrence, and an absent attribute
    contributes evidence of its own when scoring. Instances carry no state.
    """

    def __init__(self):
        # Previously misspelled ``__init___`` (three trailing underscores),
        # which made it an ordinary, never-called method.
        pass

    def __repr__(self):
        return "BNStrategy()"

    def occurences(self, count):
        """Return 1 if the attribute was seen at all (``count > 0``), else 0.

        The (misspelled) method name is part of the strategy interface and is
        kept as-is for callers.
        """
        return 1 if count > 0 else 0

    def log_likelihood(self, count, p_x_candidate, p_x_not_candidate):
        """Return this attribute's contribution to the log-odds score.

        Args:
            count: number of times the attribute occurs in the scored history;
                only whether it is positive matters.
            p_x_candidate: probability of the attribute being present given
                the candidate class.
            p_x_not_candidate: probability of the attribute being present
                given any other class.

        Returns:
            ``log(p_x_candidate / p_x_not_candidate)`` when the attribute is
            present, otherwise
            ``log((1 - p_x_candidate) / (1 - p_x_not_candidate))``.

        Raises:
            ZeroDivisionError: the denominator of the selected ratio is 0.
            ValueError: the selected ratio is not positive (for example
                ``p_x_candidate == 1.0`` for an absent attribute).
        """
        if count > 0:
            return math.log(p_x_candidate / p_x_not_candidate)
        # Absent attribute: compare the complementary probabilities.
        return math.log((1.0 - p_x_candidate) / (1.0 - p_x_not_candidate))
class Estimator:
    """Naive Bayes accumulator and one-vs-rest scorer.

    Training rows are fed in through ``add``; ``score`` then returns the
    log-odds of one candidate class against all other classes for a given
    history. How raw attribute counts are interpreted is delegated to the
    ``strategy`` object.
    """

    def __init__(self, strategy):
        """Create an empty estimator.

        Args:
            strategy: object providing ``occurences(count)`` and
                ``log_likelihood(count, p_x_candidate, p_x_not_candidate)``,
                e.g. ``MNStrategy`` or ``BNStrategy``.
        """
        self.strategy = strategy
        # Every attribute value seen in any training row.
        self.known_attribute_values = set()
        # class value -> number of training rows with that class.
        self.class_counts = {}
        # (class value, attribute value) -> accumulated occurrences.
        self.count_by_class_and_attribute = {}
        # Total number of training rows.
        self.class_count = 0
        # attribute value -> occurrences summed over all classes.
        self.column_count = {}
        # class value -> occurrences summed over all attribute values.
        self.row_count = {}
        # Occurrences summed over everything.
        self.total_count = 0

    def __repr__(self):
        # Previously "Accumulator({0}": wrong class name, unbalanced paren.
        return "Estimator({0!r})".format(self.strategy)

    def add(self, class_value, history_set):
        """Accumulate one training row.

        Args:
            class_value: hashable label of the row.
            history_set: mapping of attribute value -> raw count for the row.
                Each raw count is passed through ``strategy.occurences``
                before being added to the statistics.
        """
        self.class_counts[class_value] = self.class_counts.get(class_value, 0) + 1
        self.class_count += 1
        for attribute_value, raw_count in history_set.items():
            self.known_attribute_values.add(attribute_value)
            count = self.strategy.occurences(raw_count)
            key = (class_value, attribute_value)
            self.count_by_class_and_attribute[key] = (
                self.count_by_class_and_attribute.get(key, 0) + count)
            self.column_count[attribute_value] = (
                self.column_count.get(attribute_value, 0) + count)
            self.row_count[class_value] = self.row_count.get(class_value, 0) + count
            self.total_count += count

    @staticmethod
    def _format_cells(values):
        """Right-align each value in a 10-character column, space separated."""
        return ' '.join(map('{0:>10}'.format, values))

    def dump(self, limit=None):
        """Log the contingency table at DEBUG level.

        Args:
            limit: if given, show at most this many attribute values and
                at most this many classes (both taken in sorted order).
        """
        # Sorting and formatting the table is wasted work when nobody listens.
        if not logger.isEnabledFor(logging.DEBUG):
            return
        if limit:
            logger.debug('Limited to {0} category values'.format(limit))
        field_values = sorted(self.known_attribute_values)[0:limit]
        class_values = sorted(self.class_counts)[0:limit]

        logger.debug('value')
        logger.debug('\t\tall {0}'.format(self._format_cells(field_values)))
        for cls in class_values:
            logger.debug('{0:>10}:'.format(cls))
            logger.debug('\t{0:>10} {1}'.format(
                self.row_count.get(cls, 0),
                self._format_cells(
                    self.count_by_class_and_attribute.get((cls, pv), 0)
                    for pv in field_values)
            ))
        logger.debug('column sums:')
        logger.debug('\t{0:>10} {1}'.format(
            self.total_count,
            self._format_cells(
                self.column_count.get(column, 0) for column in field_values)
        ))
        logger.debug('class counts:')
        logger.debug('\t{0:>10} {1}'.format(
            self.class_count,
            self._format_cells(
                self.class_counts.get(cls, 0) for cls in class_values)
        ))

    def score(self, candidate_class_value, history_set):
        """Return the log-odds that ``history_set`` belongs to the candidate.

        The result is ``log(P(class) / P(not class))`` plus one
        ``strategy.log_likelihood`` term per known attribute value. All
        probabilities use add-one smoothing. Attribute values in
        ``history_set`` that never appeared in training are ignored.

        Args:
            candidate_class_value: class label to score (may be unseen).
            history_set: mapping of attribute value -> count for the row
                being scored.

        Returns:
            float: higher means the candidate class is more likely than the
            alternative.
        """
        candidate_class_count = self.class_counts.get(candidate_class_value, 0)
        not_candidate_class_count = self.class_count - candidate_class_count
        # Prior odds with add-one smoothing over the two outcomes
        # (class / not class). The shared denominator (2 + class_count)
        # cancels, and both counts are >= 0, so the ratio is always finite
        # and positive -- even for an empty or single-class estimator.
        # The original smoothed with the number of attribute values, which
        # could produce probabilities >= 1 and crash in math.log.
        log_likelihood = math.log(
            (1.0 + candidate_class_count) / (1.0 + not_candidate_class_count))

        N = len(self.known_attribute_values)
        candidate_row_count = self.row_count.get(candidate_class_value, 0)
        # Loop-invariant smoothed denominators (N pseudo-counts, one per
        # known attribute value).
        candidate_total = float(N + candidate_row_count)
        other_total = float(N + self.total_count - candidate_row_count)
        # NOTE(review): these are multinomial-style estimates; with a
        # presence/absence strategy and N == 1 a probability can reach 1.0,
        # which such a strategy may not be able to take the log of.
        for fv in self.known_attribute_values:
            field_value_count = self.count_by_class_and_attribute.get(
                (candidate_class_value, fv), 0)
            p_x_candidate = (1.0 + field_value_count) / candidate_total
            p_x_not_candidate = (
                1.0 + self.column_count.get(fv, 0) - field_value_count
            ) / other_total
            count = history_set.get(fv, 0)
            log_likelihood += self.strategy.log_likelihood(
                count, p_x_candidate, p_x_not_candidate)
        return log_likelihood