forked from GaelVaroquaux/scikit-learn
/
hungarian.py
264 lines (225 loc) · 9.05 KB
/
hungarian.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
"""
Solve the unique lowest-cost assignment problem using the
Hungarian algorithm (also known as Munkres algorithm).
References
==========
1. http://www.public.iastate.edu/~ddoty/HungarianAlgorithm.html
2. Harold W. Kuhn. The Hungarian Method for the assignment problem.
*Naval Research Logistics Quarterly*, 2:83-97, 1955.
3. Harold W. Kuhn. Variants of the Hungarian method for assignment
problems. *Naval Research Logistics Quarterly*, 3: 253-258, 1956.
4. Munkres, J. Algorithms for the Assignment and Transportation Problems.
*Journal of the Society of Industrial and Applied Mathematics*,
5(1):32-38, March, 1957.
5. http://en.wikipedia.org/wiki/Hungarian_algorithm
"""
# Based on original code by Brain Clapper, adapted to numpy to scikits
# learn coding standards by G. Varoquaux
# Copyright (c) 2008 Brian M. Clapper <bmc@clapper.org>, G Varoquaux
# Author: Brian M. Clapper, G Varoquaux
# LICENSE: BSD
import numpy as np
################################################################################
# Object-oriented form of the algorithm
class _Hungarian(object):
"""
Calculate the Munkres solution to the classical assignment problem.
See the module documentation for usage.
"""
def compute(self, cost_matrix):
"""
Compute the indexes for the lowest-cost pairings between rows and
columns in the database. Returns a list of (row, column) tuples
that can be used to traverse the matrix.
Parameters
===========
cost_matrix: 2D square matrix
The cost matrix.
Returns
========
indices: 2D array of indices
The pairs of (col, row) indices in the original array giving
the original ordering.
"""
self.C = cost_matrix.copy()
self.n = n = self.C.shape[0]
self.m = m = self.C.shape[1]
self.row_uncovered = np.ones(n, dtype=np.bool)
self.col_uncovered = np.ones(m, dtype=np.bool)
self.Z0_r = 0
self.Z0_c = 0
self.path = np.zeros((n+m, 2), dtype=int)
self.marked = np.zeros((n, m), dtype=int)
done = False
step = 1
steps = { 1 : self._step1,
3 : self._step3,
4 : self._step4,
5 : self._step5,
6 : self._step6 }
while not done:
try:
func = steps[step]
step = func()
except KeyError:
done = True
# Look for the starred columns
results = np.array(np.where(self.marked == 1)).T
return results.tolist()
def _step1(self):
""" Steps 1 and 2 in the wikipedia page.
"""
# Step1: For each row of the matrix, find the smallest element and
# subtract it from every element in its row.
self.C -= self.C.min(axis=1)[:, np.newaxis]
# Step2: Find a zero (Z) in the resulting matrix. If there is no
# starred zero in its row or column, star Z. Repeat for each element
# in the matrix.
for i, j in zip(*np.where(self.C == 0)):
if self.col_uncovered[j] and self.row_uncovered[i]:
self.marked[i, j] = 1
self.col_uncovered[j] = False
self.row_uncovered[i] = False
self._clear_covers()
return 3
def _step3(self):
"""
Cover each column containing a starred zero. If n columns are
covered, the starred zeros describe a complete set of unique
assignments. In this case, Go to DONE, otherwise, Go to Step 4.
"""
marked = (self.marked == 1)
self.col_uncovered[np.any(marked, axis=0)] = False
if marked.sum() >= min(self.m, self.n) :
return 7 # done
else:
return 4
def _step4(self):
"""
Find a noncovered zero and prime it. If there is no starred zero
in the row containing this primed zero, Go to Step 5. Otherwise,
cover this row and uncover the column containing the starred
zero. Continue in this manner until there are no uncovered zeros
left. Save the smallest uncovered value and Go to Step 6.
"""
# We convert to int as numpy operations are faster on int
C = (self.C == 0).astype(np.int)
covered_C = C*self.row_uncovered[:, np.newaxis]
covered_C *= self.col_uncovered.astype(np.int)
n = self.n
m = self.m
while True:
# Find an uncovered zero
row, col = np.unravel_index(np.argmax(covered_C), (n, m))
if covered_C[row, col] == 0:
return 6
else:
self.marked[row, col] = 2
# Find the first starred element in the row
star_col = np.argmax(self.marked[row] == 1)
if not self.marked[row, star_col] == 1:
# Could not find one
self.Z0_r = row
self.Z0_c = col
return 5
else:
col = star_col
self.row_uncovered[row] = False
self.col_uncovered[col] = True
covered_C[:, col] = C[:, col]*(
self.row_uncovered.astype(np.int))
covered_C[row] = 0
def _step5(self):
"""
Construct a series of alternating primed and starred zeros as
follows. Let Z0 represent the uncovered primed zero found in Step 4.
Let Z1 denote the starred zero in the column of Z0 (if any).
Let Z2 denote the primed zero in the row of Z1 (there will always
be one). Continue until the series terminates at a primed zero
that has no starred zero in its column. Unstar each starred zero
of the series, star each primed zero of the series, erase all
primes and uncover every line in the matrix. Return to Step 3
"""
count = 0
path = self.path
path[count, 0] = self.Z0_r
path[count, 1] = self.Z0_c
done = False
while not done:
# Find the first starred element in the col defined by
# the path.
row = np.argmax(self.marked[:, path[count, 1]] == 1)
if not self.marked[row, path[count, 1]] == 1:
# Could not find one
done = True
else:
count += 1
path[count, 0] = row
path[count, 1] = path[count-1, 1]
if not done:
# Find the first prime element in the row defined by the
# first path step
col = np.argmax(self.marked[path[count, 0]] == 2)
if self.marked[row, col] != 2:
col = -1
count += 1
path[count, 0] = path[count-1, 0]
path[count, 1] = col
# Convert paths
for i in range(count+1):
if self.marked[path[i, 0], path[i, 1]] == 1:
self.marked[path[i, 0], path[i, 1]] = 0
else:
self.marked[path[i, 0], path[i, 1]] = 1
self._clear_covers()
# Erase all prime markings
self.marked[self.marked == 2] = 0
return 3
def _step6(self):
"""
Add the value found in Step 4 to every element of each covered
row, and subtract it from every element of each uncovered column.
Return to Step 4 without altering any stars, primes, or covered
lines.
"""
# the smallest uncovered value in the matrix
if np.any(self.row_uncovered) and np.any(self.col_uncovered):
minval = np.min(self.C[self.row_uncovered], axis=0)
minval = np.min(minval[self.col_uncovered])
self.C[np.logical_not(self.row_uncovered)] += minval
self.C[:, self.col_uncovered] -= minval
return 4
def _find_prime_in_row(self, row):
"""
Find the first prime element in the specified row. Returns
the column index, or -1 if no starred element was found.
"""
col = np.argmax(self.marked[row] == 2)
if self.marked[row, col] != 2:
col = -1
return col
def _clear_covers(self):
"""Clear all covered matrix cells"""
self.row_uncovered[:] = True
self.col_uncovered[:] = True
################################################################################
# Functional form for easier use
def hungarian(cost_matrix):
""" Return the indices to permute the columns of the matrix
to minimize its trace.
"""
H = _Hungarian()
indices = H.compute(cost_matrix)
indices.sort()
return np.array(indices).T[1]
def find_permutation(vectors, reference):
""" Returns the permutation indices of the vectors to maximize the
correlation to the reference
"""
# Compute a correlation matrix
reference = reference/(reference**2).sum(axis=-1)[:, np.newaxis]
vectors = vectors/(vectors**2).sum(axis=-1)[:, np.newaxis]
K = np.abs(np.dot(reference, vectors.T))
K -= 1 + K.max()
K *= -1
return hungarian(K)