-
Notifications
You must be signed in to change notification settings - Fork 0
/
generator.py
277 lines (226 loc) · 9.63 KB
/
generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
"""Generation of new problem instances"""
import time
# fork of multiprocessing that uses dill for pickling (usage of lambdas)
from queue import Queue
import multiprocess as multiprocessing
import psutil
import numpy as np
from scipy import stats
import networkx as nx
from overlay import OverlayNetwork
from infrastructure import InfrastructureNetwork
from embedding import PartialEmbedding
import baseline_agent
from hyperparameters import GENERATOR_DEFAULTS
def truncnorm(rand, mean=0, sd=1, low=-np.inf, upp=np.inf):
    """Draw a single float from a truncated normal distribution.

    Convenience wrapper around ``scipy.stats.truncnorm``, which expects
    its truncation bounds expressed in standard deviations from the
    mean rather than in data units.

    Args:
        rand: numpy ``RandomState`` used as the randomness source.
        mean: mean of the underlying normal distribution.
        sd: standard deviation of the underlying normal distribution.
        low: lower truncation bound in data units (default: unbounded).
        upp: upper truncation bound in data units (default: unbounded).

    Returns:
        One sample as a plain Python float.
    """
    # NOTE: np.inf replaces the legacy alias np.infty (removed in
    # NumPy 2.0); the value is identical.
    dist = stats.truncnorm(
        (low - mean) / sd, (upp - mean) / sd, loc=mean, scale=sd
    )
    # for some reason this can't be set in the constructor
    dist.random_state = rand
    return float(dist.rvs())
class Generator:
    """Generates random problem instances from a given distribution"""

    # One sampling callable per degree of freedom of an instance, hence
    # the many attributes.
    # pylint: disable=too-many-instance-attributes
    def __init__(
        # pylint: disable=too-many-arguments
        self,
        interm_nodes_dist,
        pos_dist,
        capacity_dist,
        power_dist,
        interm_blocks_dist,
        pairwise_connection,
        block_weight_dist,
        requirement_dist,
        num_sources_dist,
        connection_choice,
    ):
        """Store the sampling callables used by the random_* methods.

        Each *_dist argument is a callable taking a RandomState and
        returning one sample; pairwise_connection(rand) returns a bool
        and connection_choice(rand, candidates) picks one candidate.
        """
        # number of intermediate infrastructure nodes
        self.interm_nodes_dist = interm_nodes_dist
        # spatial position of an infrastructure node
        self.pos_dist = pos_dist
        # computation capacity of an infrastructure node
        self.capacity_dist = capacity_dist
        # transmit power (dBm) of an infrastructure node
        self.power_dist = power_dist
        # number of intermediate overlay blocks
        self.interm_blocks_dist = interm_blocks_dist
        # decides whether to link a given ordered pair of overlay blocks
        self.pairwise_connection = pairwise_connection
        # per-block "requirement" value (compute weight of a block)
        self.block_weight_dist = block_weight_dist
        # per-block datarate value
        self.requirement_dist = requirement_dist
        # number of sources, shared between infrastructure and overlay
        self.num_sources_dist = num_sources_dist
        # picks which already-connected block a stray block is wired to
        self.connection_choice = connection_choice

    def random_embedding(self, rand):
        """Generate matching random infrastructure + overlay + embedding"""
        # at least one source, has to match between infra and overlay
        num_sources = self.num_sources_dist(rand)
        # rejection-sample instances until sources and sink fit onto
        # their designated nodes
        while True:
            infra = self.random_infrastructure(num_sources, rand)
            overlay = self.random_overlay(num_sources, rand)
            # sources are paired up positionally; both sides were just
            # generated at random, so any pairing is as good as another
            source_mapping = list(
                zip(list(overlay.sources), list(infra.sources))
            )

            # make sure all sources and the sink are actually embeddable
            valid = True  # be optimistic
            for (block, node) in source_mapping + [(overlay.sink, infra.sink)]:
                if overlay.requirement(block) > infra.capacity(node):
                    valid = False
            if valid:
                return PartialEmbedding(infra, overlay, source_mapping)

    def validated_random(self, rand):
        """Returns a random embedding that is guaranteed to be solvable
        together with a baseline solution"""
        # keep generating until the baseline agent solves an instance,
        # which proves the instance is solvable at all
        while True:
            before = time.time()
            emb = self.random_embedding(rand)
            baseline = baseline_agent.play_episode(
                emb, max_restarts=10, rand=rand
            )
            elapsed = round(time.time() - before, 1)
            nodes = len(emb.infra.nodes())
            blocks = len(emb.overlay.blocks())
            links = len(emb.overlay.links())
            if baseline is not None:
                # only log unusually slow generations to keep output sparse
                if elapsed > 60:
                    # pylint: disable=line-too-long
                    print(
                        f"Generated ({elapsed}s, {nodes} nodes, {blocks} blocks, {links} links )"
                    )
                # reset() so the caller gets a fresh, unplayed embedding
                return (emb.reset(), baseline)
            if elapsed > 60:
                # pylint: disable=line-too-long
                print(
                    f"Failed ({elapsed}s, {nodes} nodes, {blocks} blocks, {links} links)"
                )

    def random_infrastructure(self, num_sources: int, rand):
        """Generates a randomized infrastructure"""
        assert num_sources > 0

        infra = InfrastructureNetwork()

        # each node draws a fresh position, transmit power and capacity
        rand_node_args = lambda: {
            "pos": self.pos_dist(rand),
            "transmit_power_dbm": self.power_dist(rand),
            "capacity": self.capacity_dist(rand),
        }

        infra.set_sink(**rand_node_args())
        for _ in range(num_sources):
            infra.add_source(**rand_node_args())
        for _ in range(self.interm_nodes_dist(rand)):
            infra.add_intermediate(**rand_node_args())

        return infra

    def random_overlay(self, num_sources: int, rand):
        """Generates a randomized overlay graph"""
        # This is a complicated function, but it would only get harder to
        # understand when split up into multiple single-use functions.
        # pylint: disable=too-many-branches
        assert num_sources > 0

        overlay = OverlayNetwork()

        # each block draws a fresh requirement and datarate
        rand_block_args = lambda: {
            "requirement": self.block_weight_dist(rand),
            "datarate": self.requirement_dist(rand),
        }

        # always one sink
        overlay.set_sink(**rand_block_args())

        # add sources
        for _ in range(num_sources):
            overlay.add_source(**rand_block_args())

        # add intermediates
        for _ in range(self.interm_blocks_dist(rand)):
            overlay.add_intermediate(**rand_block_args())

        # randomly add links; sorted() fixes the iteration order (and
        # hence the RNG consumption order) for a given seed
        for source in sorted(overlay.graph.nodes()):
            for target in sorted(overlay.graph.nodes()):
                if target != source and self.pairwise_connection(rand):
                    overlay.add_link(source, target)

        # add links necessary to have each block on a path from a source to
        # the sink
        accessible_from_source = set()
        not_accessible_from_source = set()
        has_path_to_sink = set()
        no_path_to_sink = set()
        for node in overlay.graph.nodes():
            # check if the node can already reach the sink
            if nx.has_path(overlay.graph, node, overlay.sink):
                has_path_to_sink.add(node)
            else:
                no_path_to_sink.add(node)

            # check if the node is already reachable from the source
            source_path_found = False
            for source in overlay.sources:
                if nx.has_path(overlay.graph, source, node):
                    source_path_found = True
                    break
            if source_path_found:
                accessible_from_source.add(node)
            else:
                not_accessible_from_source.add(node)

        # make sure all nodes are reachable from a source
        # (permutation of a sorted tuple is deterministic per seed)
        for node in rand.permutation(
            sorted(tuple(not_accessible_from_source))
        ):
            connection = self.connection_choice(
                rand, sorted(accessible_from_source)
            )
            overlay.add_link(connection, node)
            accessible_from_source.add(node)

        # make sure all nodes can reach the sink
        for node in rand.permutation(sorted(tuple(no_path_to_sink))):
            connection = self.connection_choice(rand, sorted(has_path_to_sink))
            overlay.add_link(node, connection)
            has_path_to_sink.add(node)

        return overlay
class DefaultGenerator(Generator):
    """Generator preconfigured with GENERATOR_DEFAULTS, for quick examples."""

    def __init__(self):
        # Python 3 zero-argument super() replaces the redundant
        # super(DefaultGenerator, self) spelling (the file is already
        # Python-3-only: it uses f-strings).
        super().__init__(**GENERATOR_DEFAULTS)
class ParallelGenerator:
    """Generator that uses multiprocessing to amortize generation"""

    def __init__(self, generator, seedgen):
        """Wrap a Generator and pre-generate instances in worker processes.

        generator: the underlying (picklable) Generator
        seedgen: callable returning a fresh integer seed on each call
        """
        # reserve one cpu for the actual training
        cpus = max(1, multiprocessing.cpu_count() - 1)
        # cap the pool size at 8 workers
        cpus = min(8, cpus)
        self._pool = multiprocessing.Pool(cpus)
        # FIFO of in-flight AsyncResult jobs, oldest first
        self._instance_queue = Queue()
        self.generator = generator
        self.seedgen = seedgen

    def _spawn_new_job(self):
        # seed each job up-front so seeds are consumed in a
        # deterministic order regardless of job completion order
        rand = np.random.RandomState(self.seedgen())
        # map_async over a single-element list = one asynchronous task
        job = self._pool.map_async(self.generator.validated_random, [rand])
        self._instance_queue.put_nowait(job)

    def _grow_queue(self):
        """Grow the queue if sufficient resources are available"""
        # "idle core" = the least-busy core is below 60% utilization
        has_idle_core = min(psutil.cpu_percent(interval=0.1, percpu=True)) < 60
        has_enough_ram = psutil.virtual_memory().percent < 80
        if has_idle_core and has_enough_ram:
            self._spawn_new_job()

    def new_instance(self):
        """Transparently uses multiprocessing

        Acts similar to a lazy infinite imap; preserves the order of the
        generated elements to prevent under-representation of long
        running ones and uses seeds in a deterministic order.
        """
        # first spawn a new job to replace the result we're about to use
        self._spawn_new_job()
        next_job = self._instance_queue.get()
        if not next_job.ready():
            # If we're blocked, grow the queue. This way the queue
            # dynamically grows until at some point we aren't blocked
            # anymore (as long as the processor can keep up).
            self._grow_queue()
            print(f"Blocked on queue (size {self._instance_queue.qsize()})")
        # .get() blocks until the job finishes; [0] unwraps the
        # single-element result list produced by map_async
        return next_job.get()[0]

    def __getstate__(self):
        state = self.__dict__.copy()
        # don't pickle pool or queue (this means that the generator will
        # become useless after pickling, but that is fine since it is
        # only pickled when an agent is saved and not used afterwards)
        del state["_pool"]
        del state["_instance_queue"]
        return state

    def __setstate__(self, state):
        # restore without _pool/_instance_queue; see __getstate__
        self.__dict__.update(state)
def get_random_action(embedding: PartialEmbedding, rand):
    """Pick a uniformly random action from the embedding's possibilities.

    Returns None when the embedding offers no possible action.
    """
    options = embedding.possibilities()
    if not options:
        return None
    # RandomState.randint excludes the upper bound, so this indexes safely
    return options[rand.randint(0, len(options))]
if __name__ == "__main__":
    # Smoke test: build a default generator and print one solvable
    # instance together with its baseline solution.
    generator = DefaultGenerator()
    print(generator.validated_random(rand=np.random))