-
Notifications
You must be signed in to change notification settings - Fork 1
/
pv_mcts.py
executable file
·157 lines (128 loc) · 5 KB
/
pv_mcts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# ====================
# モンテカルロ木探索の作成
# ====================
# パッケージのインポート
from game import State
from dual_network import DN_INPUT_SHAPE
from math import sqrt
from tensorflow.keras.models import load_model
from pathlib import Path
import numpy as np
# パラメータの準備
PV_EVALUATE_COUNT = 50 # 1推論あたりのシミュレーション回数(本家は1600)
# 推論
def predict(model, state):
# 推論のための入力データのシェイプの変換
a, b, c = DN_INPUT_SHAPE
x = np.array([state.pieces, state.enemy_pieces])
x = x.reshape(c, a, b).transpose(1, 2, 0).reshape(1, a, b, c)
# 推論
y = model.predict(x, batch_size=1)
# 方策の取得
policies = y[0][0][list(state.legal_actions())] # 合法手のみ
policies /= sum(policies) if sum(policies) else 1 # 合計1の確率分布に変換
# 価値の取得
value = y[1][0][0]
return policies, value
# ノードのリストを試行回数のリストに変換
def nodes_to_scores(nodes):
scores = []
for c in nodes:
scores.append(c.n)
return scores
# モンテカルロ木探索のスコアの取得
def pv_mcts_scores(model, state, temperature):
# モンテカルロ木探索のノードの定義
class Node:
# ノードの初期化
def __init__(self, state, p):
self.state = state # 状態
self.p = p # 方策
self.w = 0 # 累計価値
self.n = 0 # 試行回数
self.child_nodes = None # 子ノード群
# 局面の価値の計算
def evaluate(self):
# ゲーム終了時
if self.state.is_done():
# 勝敗結果で価値を取得
value = -1 if self.state.is_lose() else 0
# 累計価値と試行回数の更新
self.w += value
self.n += 1
return value
# 子ノードが存在しない時
if not self.child_nodes:
# ニューラルネットワークの推論で方策と価値を取得
policies, value = predict(model, self.state)
# 累計価値と試行回数の更新
self.w += value
self.n += 1
# 子ノードの展開
self.child_nodes = []
for action, policy in zip(self.state.legal_actions(), policies):
self.child_nodes.append(Node(self.state.next(action), policy))
return value
# 子ノードが存在する時
else:
# アーク評価値が最大の子ノードの評価で価値を取得
value = -self.next_child_node().evaluate()
# 累計価値と試行回数の更新
self.w += value
self.n += 1
return value
# アーク評価値が最大の子ノードを取得
def next_child_node(self):
# アーク評価値の計算
C_PUCT = 1.0
t = sum(nodes_to_scores(self.child_nodes))
pucb_values = []
for child_node in self.child_nodes:
pucb_values.append((-child_node.w / child_node.n if child_node.n else 0.0) +
C_PUCT * child_node.p * sqrt(t) / (1 + child_node.n))
# アーク評価値が最大の子ノードを返す
return self.child_nodes[np.argmax(pucb_values)]
# 現在の局面のノードの作成
root_node = Node(state, 0)
# 複数回の評価の実行
for _ in range(PV_EVALUATE_COUNT):
root_node.evaluate()
# 合法手の確率分布
scores = nodes_to_scores(root_node.child_nodes)
if temperature == 0: # 最大値のみ1
action = np.argmax(scores)
scores = np.zeros(len(scores))
scores[action] = 1
else: # ボルツマン分布でバラつき付加
scores = boltzman(scores, temperature)
return scores
# モンテカルロ木探索で行動選択
def pv_mcts_action(model, temperature=0):
def pv_mcts_action(state):
scores = pv_mcts_scores(model, state, temperature)
return np.random.choice(state.legal_actions(), p=scores)
return pv_mcts_action
# ボルツマン分布
def boltzman(xs, temperature):
xs = [x ** (1 / temperature) for x in xs]
return [x / sum(xs) for x in xs]
# 動作確認
if __name__ == '__main__':
# モデルの読み込み
path = sorted(Path('./model').glob('*.h5'))[-1]
model = load_model(str(path))
# 状態の生成
state = State()
# モンテカルロ木探索で行動取得を行う関数の生成
next_action = pv_mcts_action(model, 1.0)
# ゲーム終了までループ
while True:
# ゲーム終了時
if state.is_done():
break
# 行動の取得
action = next_action(state)
# 次の状態の取得
state = state.next(action)
# 文字列表示
print(state)