  0 |     | - from collections import Counter
    |   0 | + from collections import defaultdict, Counter
    |   1 | + from contextlib import suppress
  1 |   2 |   from os import environ
  2 |   3 |   from traceback import TracebackException
  3 |   4 |
⋮
 10 |  11 |   from .utils.text import bold, mark_for_translation as _, red
 11 |  12 |
 12 |  13 |
 13 |     | - MAX_METADATA_ITERATIONS = int(environ.get("BW_MAX_METADATA_ITERATIONS", "5000"))
    |  14 | + MAX_METADATA_ITERATIONS = int(environ.get("BW_MAX_METADATA_ITERATIONS", "1000"))
    |  15 | +
    |  16 | +
    |  17 | + class _StartOver(Exception):
    |  18 | +     """
    |  19 | +     Raised when metadata processing needs to start from the top.
    |  20 | +     """
    |  21 | +     pass
 14 |  22 |
 15 |  23 |
 16 |  24 |   class MetadataGenerator:
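The new _StartOver exception is pure control flow rather than an error: the stage methods added below raise it to abort the current pass, and the driver loop in __build_node_metadata catches it and begins the next iteration. A minimal, self-contained sketch of that pattern (hypothetical names, not bundlewrap's API):

    class StartOver(Exception):
        """Signal the driver loop to restart from the top."""

    def run_new_items(todo, done):
        # Hypothetical stage: handle one pending item, then restart the
        # driver so newly discovered work is picked up as early as possible.
        if todo:
            done.add(todo.pop())
            raise StartOver

    todo, done = {"node1", "node2"}, set()
    while True:
        try:
            run_new_items(todo, done)
        except StartOver:
            continue
        break  # no stage raised: nothing left to do

    assert done == {"node1", "node2"}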
⋮
 23 |  31 |           # reactors that raised KeyErrors (and which ones)
 24 |  32 |           self.__keyerrors = {}
 25 |  33 |           # a Metastack for every node
 26 |     | -         self.__metastacks = {}
    |  34 | +         self.__metastacks = defaultdict(Metastack)
 27 |  35 |           # mapping each node to all nodes that depend on it
 28 |     | -         self.__node_deps = {}
    |  36 | +         self.__node_deps = defaultdict(set)
    |  37 | +         # how often __run_reactors was called for a node
    |  38 | +         self.__node_iterations = defaultdict(int)
 29 |  39 |           # A node is 'stable' when all its reactors return unchanged
 30 |  40 |           # metadata, except for those reactors that look at other nodes.
 31 |  41 |           # This dict maps node names to True/False indicating stable status.
⋮
 41 |  51 |           # how often we called reactors
 42 |  52 |           self.__reactors_run = 0
 43 |  53 |           # how often each reactor changed
 44 |     | -         self.__reactor_changes = {}
    |  54 | +         self.__reactor_changes = defaultdict(int)
 45 |  55 |           # tracks which reactors on a node have looked at other nodes
 46 |  56 |           # through partial_metadata
 47 |     | -         self.__reactors_with_deps = {}
    |  57 | +         self.__reactors_with_deps = defaultdict(set)
 48 |  58 |
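Swapping the plain dicts for defaultdict lets later code index straight into these mappings; the setdefault calls deleted further down in this diff become unnecessary because the factory runs on first access. A quick illustration of the equivalence:

    from collections import defaultdict

    plain, auto = {}, defaultdict(set)

    # Plain dict: every first write needs setdefault (or a key check).
    plain.setdefault("node1", set()).add("node2")

    # defaultdict: the set() factory runs automatically on first access.
    auto["node1"].add("node2")

    assert plain == dict(auto)

Note that merely reading a missing key also inserts the default value, which is acceptable here since an empty set of dependents is equivalent to no entry at all.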
 49 |  59 |       def _metadata_for_node(self, node_name, blame=False, stack=False):
 50 |  60 |           """
⋮
 60 |  70 |           """
 61 |  71 |           if self.__in_a_reactor:
 62 |  72 |               if node_name in self._node_metadata_complete:
    |  73 | +                 io.debug(f"is already complete: {node_name}")
 63 |  74 |                   # We already completed metadata for this node, but partial must
 64 |  75 |                   # return a Metastack, so we build a single-layered one just for
 65 |  76 |                   # the interface.
⋮
 72 |  83 |                   return metastack
 73 |  84 |               else:
 74 |  85 |                   self.__partial_metadata_accessed_for.add(node_name)
 75 |     | -                 return self.__metastacks.setdefault(node_name, Metastack())
    |  86 | +                 return self.__metastacks[node_name]
 76 |  87 |
 77 |  88 |           if blame or stack:
 78 |  89 |               # cannot return cached result here, force rebuild
 79 |     | -             try:
    |  90 | +             with suppress(KeyError):
 80 |  91 |                   del self._node_metadata_complete[node_name]
 81 |     | -             except KeyError:
 82 |     | -                 pass
 83 |     | -
 84 |     | -         try:
    |  92 | +
    |  93 | +         with suppress(KeyError):
 85 |  94 |               return self._node_metadata_complete[node_name]
 86 |     | -         except KeyError:
 87 |     | -             pass
 88 |  95 |
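contextlib.suppress (imported at the top of this diff) collapses each four-line try/except KeyError/pass block into a two-line context manager with identical semantics:

    from contextlib import suppress

    cache = {"a": 1}

    # Before: optimistic delete, ignoring a missing key.
    try:
        del cache["b"]
    except KeyError:
        pass

    # After: same semantics, less boilerplate.
    with suppress(KeyError):
        del cache["b"]

    assert cache == {"a": 1}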
 89 |  96 |           # Different worker threads might request metadata at the same time.
 90 |  97 |
 91 |  98 |           with self._node_metadata_lock:
 92 |     | -             try:
    |  99 | +             with suppress(KeyError):
 93 | 100 |                   # maybe our metadata got completed while waiting for the lock
 94 | 101 |                   return self._node_metadata_complete[node_name]
 95 |     | -             except KeyError:
 96 |     | -                 pass
 97 | 102 |
 98 | 103 |               self.__build_node_metadata(node_name)
 99 | 104 |
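The surrounding lines are a double-checked cache lookup: try the cache without the lock, then re-check after acquiring it, because another thread may have built the metadata while we waited. A generic sketch of the same pattern (hypothetical build callable, not bundlewrap's API):

    from contextlib import suppress
    from threading import Lock

    _cache, _lock = {}, Lock()

    def get(key, build):
        # Fast path: no locking if the result already exists.
        with suppress(KeyError):
            return _cache[key]
        with _lock:
            # Re-check: another thread may have filled the cache
            # while we were waiting for the lock.
            with suppress(KeyError):
                return _cache[key]
            _cache[key] = build(key)
            return _cache[key]

    print(get("node1", str.upper))  # NODE1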
⋮
120 | 125 |           else:
121 | 126 |               return self._node_metadata_complete[node_name]
122 | 127 |
    | 128 | +     def __run_new_nodes(self):
    | 129 | +         try:
    | 130 | +             node_name = self.__nodes_that_never_ran.pop()
    | 131 | +         except KeyError:
    | 132 | +             pass
    | 133 | +         else:
    | 134 | +             self.__nodes_that_ran_at_least_once.add(node_name)
    | 135 | +             self.__initial_run_for_node(node_name)
    | 136 | +             raise _StartOver
    | 137 | +
    | 138 | +     def __run_triggered_nodes(self):
    | 139 | +         try:
    | 140 | +             node_name = self.__triggered_nodes.pop()
    | 141 | +         except KeyError:
    | 142 | +             pass
    | 143 | +         else:
    | 144 | +             io.debug(f"triggered metadata run for {node_name}")
    | 145 | +             self.__run_reactors(
    | 146 | +                 self.get_node(node_name),
    | 147 | +                 with_deps=True,
    | 148 | +                 without_deps=False,
    | 149 | +             )
    | 150 | +             raise _StartOver
    | 151 | +
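Both stages above drain a set used as a work queue: set.pop() removes and returns an arbitrary element and raises KeyError once the set is empty, so the try/except doubles as the emptiness check. In miniature:

    pending = {"node1", "node2", "node3"}
    seen = set()

    while True:
        try:
            item = pending.pop()  # arbitrary element; KeyError when empty
        except KeyError:
            break
        seen.add(item)

    assert not pending and len(seen) == 3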
    | 152 | +     def __run_unstable_nodes(self):
    | 153 | +         encountered_unstable_node = False
    | 154 | +         for node, stable in self.__node_stable.items():
    | 155 | +             if stable:
    | 156 | +                 continue
    | 157 | +
    | 158 | +             io.debug(f"begin metadata stabilization test for {node.name}")
    | 159 | +             self.__run_reactors(node, with_deps=False, without_deps=True)
    | 160 | +             if self.__node_stable[node]:
    | 161 | +                 io.debug(f"metadata stabilized for {node.name}")
    | 162 | +             else:
    | 163 | +                 io.debug(f"metadata remains unstable for {node.name}")
    | 164 | +                 encountered_unstable_node = True
    | 165 | +             if self.__nodes_that_never_ran:
    | 166 | +                 # we have found a new dependency, process it immediately
    | 167 | +                 # going wide early should be more efficient
    | 168 | +                 raise _StartOver
    | 169 | +         if encountered_unstable_node:
    | 170 | +             # start over until everything is stable
    | 171 | +             io.debug("found an unstable node (without_deps=True)")
    | 172 | +             raise _StartOver
    | 173 | +
    | 174 | +     def __run_nodes_with_deps(self):
    | 175 | +         encountered_unstable_node = False
    | 176 | +         for node in randomize_order(self.__node_stable.keys()):
    | 177 | +             io.debug(f"begin final stabilization test for {node.name}")
    | 178 | +             self.__run_reactors(node, with_deps=True, without_deps=False)
    | 179 | +             if not self.__node_stable[node]:
    | 180 | +                 io.debug(f"{node.name} still unstable")
    | 181 | +                 encountered_unstable_node = True
    | 182 | +             if self.__nodes_that_never_ran:
    | 183 | +                 # we have found a new dependency, process it immediately
    | 184 | +                 # going wide early should be more efficient
    | 185 | +                 raise _StartOver
    | 186 | +         if encountered_unstable_node:
    | 187 | +             # start over until everything is stable
    | 188 | +             io.debug("found an unstable node (with_deps=True)")
    | 189 | +             raise _StartOver
    | 190 | +
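Together these two stages implement a fixed-point iteration: reactors are re-run until a complete pass leaves every node's metadata unchanged. (__run_nodes_with_deps additionally shuffles the node order, presumably so results do not silently depend on iteration order.) Reduced to its essence, with a hypothetical reactor function rather than bundlewrap's API:

    def stabilize(state, reactors):
        # Keep re-running every reactor until a full pass changes nothing.
        while True:
            changed = False
            for reactor in reactors:
                changed |= reactor(state)
            if not changed:
                return state

    def fill_port(state):
        # Hypothetical reactor: derives one key, reports whether it changed.
        if "port" not in state:
            state["port"] = 8080
            return True
        return False

    print(stabilize({}, [fill_port]))  # {'port': 8080}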
123 | 191 |       def __build_node_metadata(self, initial_node_name):
124 | 192 |           self.__reset()
125 | 193 |           self.__nodes_that_never_ran.add(initial_node_name)
126 | 194 |
127 |     | -         iterations = 0
128 | 195 |           while not QUIT_EVENT.is_set():
129 |     | -             iterations += 1
130 |     | -             if iterations > MAX_METADATA_ITERATIONS:
131 |     | -                 top_changers = Counter(self.__reactor_changes).most_common(25)
132 |     | -                 msg = _(
133 |     | -                     "MAX_METADATA_ITERATIONS({m}) exceeded, "
134 |     | -                     "likely an infinite loop between flip-flopping metadata reactors.\n"
135 |     | -                     "These are the reactors that changed most often:\n\n"
136 |     | -                 ).format(m=MAX_METADATA_ITERATIONS)
137 |     | -                 for reactor, count in top_changers:
138 |     | -                     msg += f" {count}\t{reactor[0]}\t{reactor[1]}\n"
139 |     | -                 raise RuntimeError(msg)
140 |     | -
141 |     | -             io.debug(f"metadata iteration #{iterations}")
142 |     | -
143 |     | -             jobmsg = _("{b} ({i} iterations, {n} nodes, {r} reactors, {e} runs)").format(
    | 196 | +             jobmsg = _("{b} ({n} nodes, {r} reactors, {e} runs)").format(
144 | 197 |                   b=bold(_("running metadata reactors")),
145 |     | -                 i=iterations,
146 | 198 |                   n=len(self.__nodes_that_never_ran) + len(self.__nodes_that_ran_at_least_once),
147 | 199 |                   r=len(self.__reactor_changes),
148 | 200 |                   e=self.__reactors_run,
149 | 201 |               )
150 |     | -             with io.job(jobmsg):
151 |     | -                 try:
152 |     | -                     node_name = self.__nodes_that_never_ran.pop()
153 |     | -                 except KeyError:
154 |     | -                     pass
155 |     | -                 else:
156 |     | -                     self.__nodes_that_ran_at_least_once.add(node_name)
157 |     | -                     self.__initial_run_for_node(node_name)
158 |     | -                     continue
159 |     | -
160 |     | -                 # at this point, we have run all relevant nodes at least once
161 |     | -
162 |     | -                 # if we have any triggered nodes from below, run their reactors
163 |     | -                 # with deps to see if they become unstable
164 |     | -
165 |     | -                 try:
166 |     | -                     node_name = self.__triggered_nodes.pop()
167 |     | -                 except KeyError:
168 |     | -                     pass
169 |     | -                 else:
170 |     | -                     io.debug(f"triggered metadata run for {node_name}")
171 |     | -                     self.__run_reactors(
172 |     | -                         self.get_node(node_name),
173 |     | -                         with_deps=True,
174 |     | -                         without_deps=False,
175 |     | -                     )
176 |     | -                     continue
177 |     | -
178 |     | -                 # now (re)stabilize all nodes
179 |     | -
180 |     | -                 encountered_unstable_node = False
181 |     | -                 for node, stable in self.__node_stable.items():
182 |     | -                     if stable:
183 |     | -                         continue
184 |     | -                     self.__run_reactors(node, with_deps=False, without_deps=True)
185 |     | -                     if self.__node_stable[node]:
186 |     | -                         io.debug(f"metadata stabilized for {node_name}")
187 |     | -                     else:
188 |     | -                         io.debug(f"metadata remains unstable for {node_name}")
189 |     | -                         encountered_unstable_node = True
190 |     | -                 if encountered_unstable_node:
191 |     | -                     # start over until everything is stable
192 |     | -                     continue
193 |     | -
194 |     | -                 # at this point, all nodes should be stable except for their reactors with deps
195 |     | -
196 |     | -                 encountered_unstable_node = False
197 |     | -                 for node in randomize_order(self.__node_stable.keys()):
198 |     | -                     self.__run_reactors(node, with_deps=True, without_deps=False)
199 |     | -                     if not self.__node_stable[node]:
200 |     | -                         encountered_unstable_node = True
201 |     | -                 if encountered_unstable_node:
202 |     | -                     # start over until everything is stable
203 |     | -                     continue
204 |     | -
205 |     | -                 # if we get here, we're done!
206 |     | -                 break
    | 202 | +             try:
    | 203 | +                 with io.job(jobmsg):
    | 204 | +                     # Control flow here is a bit iffy. The functions in this block often raise
    | 205 | +                     # _StartOver in order to aggressively process new nodes first etc.
    | 206 | +                     # Each method represents a distinct stage of metadata processing that checks
    | 207 | +                     # for nodes in certain states as described below.
    | 208 | +
    | 209 | +                     # This checks for newly discovered nodes that haven't seen any processing at
    | 210 | +                     # all so far. It is important that we run them as early as possible, so their
    | 211 | +                     # static metadata becomes available to other nodes and we recursively discover
    | 212 | +                     # additional nodes as quickly as possible.
    | 213 | +                     self.__run_new_nodes()
    | 214 | +                     # At this point, we have run all relevant nodes at least once.
    | 215 | +
    | 216 | +                     # Nodes become "triggered" when they previously looked something up from a
    | 217 | +                     # different node and that second node changed. In this method, we try to figure
    | 218 | +                     # out if the change on the node we depend on actually has any effect on the
    | 219 | +                     # depending node.
    | 220 | +                     self.__run_triggered_nodes()
    | 221 | +
    | 222 | +                     # In this stage, we run all unstable nodes to the point where everything is
    | 223 | +                     # stable again, except for those reactors that depend on other nodes.
    | 224 | +                     self.__run_unstable_nodes()
    | 225 | +
    | 226 | +                     # The final step is to make sure nothing changes when we run reactors with
    | 227 | +                     # dependencies on other nodes. If anything changes, we need to start over so
    | 228 | +                     # local-only reactors on a node can react to changes caused by reactors looking
    | 229 | +                     # at other nodes.
    | 230 | +                     self.__run_nodes_with_deps()
    | 231 | +
    | 232 | +                     # if we get here, we're done!
    | 233 | +                     break
    | 234 | +
    | 235 | +             except _StartOver:
    | 236 | +                 continue
207 | 237 |
208 | 238 |           if self.__keyerrors and not QUIT_EVENT.is_set():
209 | 239 |               msg = _(
⋮
217 | 247 |                       msg += " " + line
218 | 248 |               raise MetadataPersistentKeyError(msg)
219 | 249 |
    | 250 | +         io.debug("metadata generation for selected nodes finished")
    | 251 | +
220 | 252 |       def __initial_run_for_node(self, node_name):
221 | 253 |           io.debug(f"initial metadata run for {node_name}")
222 | 254 |           node = self.get_node(node_name)
⋮
246 | 278 |           )
247 | 279 |           self.__metastacks[node_name]._cache_partition(0)
248 | 280 |
249 |     | -         self.__reactors_with_deps[node_name] = set()
250 | 281 |           # run all reactors once to get started
251 | 282 |           self.__run_reactors(node, with_deps=True, without_deps=True)
252 | 283 |
    | 284 | +     def __check_iteration_count(self, node_name):
    | 285 | +         self.__node_iterations[node_name] += 1
    | 286 | +         if self.__node_iterations[node_name] > MAX_METADATA_ITERATIONS:
    | 287 | +             top_changers = Counter(self.__reactor_changes).most_common(25)
    | 288 | +             msg = _(
    | 289 | +                 "MAX_METADATA_ITERATIONS({m}) exceeded for {node}, "
    | 290 | +                 "likely an infinite loop between flip-flopping metadata reactors.\n"
    | 291 | +                 "These are the reactors that changed most often:\n\n"
    | 292 | +             ).format(m=MAX_METADATA_ITERATIONS, node=node_name)
    | 293 | +             for reactor, count in top_changers:
    | 294 | +                 msg += f" {count}\t{reactor[0]}\t{reactor[1]}\n"
    | 295 | +             raise RuntimeError(msg)
    | 296 | +
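The iteration cap now applies per node instead of to the whole run, and Counter turns the (node, reactor) -> change-count mapping into a ranked report via most_common. A standalone illustration of that reporting (hypothetical node and reactor names):

    from collections import Counter

    changes = {
        ("node1", "metadata_reactor.dns"): 12,
        ("node2", "metadata_reactor.firewall"): 3,
        ("node1", "metadata_reactor.ssl"): 7,
    }

    for (node, reactor), count in Counter(changes).most_common(2):
        print(f"{count}\t{node}\t{reactor}")
    # 12    node1   metadata_reactor.dns
    # 7     node1   metadata_reactor.ssl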
253 | 297 |       def __run_reactors(self, node, with_deps=True, without_deps=True):
    | 298 | +         self.__check_iteration_count(node.name)
254 | 299 |           any_reactor_changed = False
255 | 300 |
256 | 301 |           for depsonly in (True, False):
⋮
284 | 329 |                           self.__nodes_that_never_ran.add(required_node_name)
285 | 330 |                       # this is so we know the current node needs to be run
286 | 331 |                       # again if the required node changes
287 |     | -                     self.__node_deps.setdefault(required_node_name, set())
288 | 332 |                       self.__node_deps[required_node_name].add(node.name)
289 | 333 |
290 | 334 |           if any_reactor_changed:
291 | 335 |               # something changed on this node, mark all dependent nodes as unstable
292 |     | -             for required_node_name in self.__node_deps.get(node.name, set()):
    | 336 | +             for required_node_name in self.__node_deps[node.name]:
293 | 337 |                   io.debug(f"{node.name} triggering metadata rerun on {required_node_name}")
294 | 338 |                   self.__triggered_nodes.add(required_node_name)
295 | 339 |
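__node_deps is a reverse dependency index: it maps a node to the set of nodes whose metadata was derived from it, so that a change can be fanned out to exactly the affected nodes via __triggered_nodes. The core of that bookkeeping, stripped of context (hypothetical names):

    from collections import defaultdict

    node_deps = defaultdict(set)   # provider -> {nodes that read from it}
    triggered = set()

    def record_lookup(reader, provider):
        # The reader consulted the provider's metadata.
        node_deps[provider].add(reader)

    def on_change(provider):
        # Re-run everything that ever read from the provider.
        triggered.update(node_deps[provider])

    record_lookup("node2", "node1")
    on_change("node1")
    assert triggered == {"node2"}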
⋮
303 | 347 |               return False, set()
304 | 348 |           self.__partial_metadata_accessed_for = set()
305 | 349 |           self.__reactors_run += 1
306 |     | -         self.__reactor_changes.setdefault((node_name, reactor_name), 0)
307 | 350 |           # make sure the reactor doesn't react to its own output
308 | 351 |           old_metadata = self.__metastacks[node_name]._pop_layer(1, reactor_name)
309 | 352 |           self.__in_a_reactor = True
⋮
315 | 358 |           except DoNotRunAgain:
316 | 359 |               self.__do_not_run_again.add((node_name, reactor_name))
317 | 360 |               # clear any previously stored exception
318 |     | -             try:
    | 361 | +             with suppress(KeyError):
319 | 362 |                   del self.__keyerrors[(node_name, reactor_name)]
320 |     | -             except KeyError:
321 |     | -                 pass
322 | 363 |               return False, set()
323 | 364 |           except Exception as exc:
324 | 365 |               io.stderr(_(
⋮
334 | 375 |           self.__in_a_reactor = False
335 | 376 |
336 | 377 |           # reactor terminated normally, clear any previously stored exception
337 |     | -         try:
    | 378 | +         with suppress(KeyError):
338 | 379 |               del self.__keyerrors[(node_name, reactor_name)]
339 |     | -         except KeyError:
340 |     | -             pass
341 | 380 |
342 | 381 |           try:
343 | 382 |               self.__metastacks[node_name]._set_layer(