clustershell / ac19068
Import upstream version 1.8.3+git20210302.1.a1875cb (Debian Janitor, 3 years ago)
20 changed files with 165 additions and 48 deletions.
@@ -7,7 +7,7 @@
 [xcat]
 
 # list the nodes in the specified node group
-map: lsdef -s -t node $GROUP | cut -d' ' -f1
+map: lsdef -s -t node "$GROUP" | cut -d' ' -f1
 
 # list all the nodes defined in the xCAT tables
 all: lsdef -s -t node | cut -d' ' -f1
@@ -38,6 +38,11 @@
 # and yes, ranges work for groups too!
 old: '@rack[1,3]'
 new: '@rack[2,4]'
+# YAML lists
+rack5:
+    - 'example[200-205]'    # some comment about example[200-205]
+    - 'example245'
+    - 'example300,example[401-406]'
 
 # Group source cpu:
 # define groups @cpu:ivy, @cpu:hsw and @cpu:all
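For reference, a group defined with the YAML list syntax resolves exactly like the equivalent comma-joined string. A sketch mirroring the new unit test further down (file path and group name are illustrative; YAMLGroupLoader and GroupResolver are assumed to live in ClusterShell.NodeUtils as in the test suite):

    from ClusterShell.NodeUtils import GroupResolver, YAMLGroupLoader

    loader = YAMLGroupLoader('/etc/clustershell/groups.d/racks.yaml')
    sources = list(loader)                # one source per top-level YAML key
    resolver = GroupResolver(sources[0])
    # A group defined as a YAML list resolves to the joined node string:
    print(resolver.group_nodes('rack5'))  # ['example[200-205],example245,...']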
@@ -280,7 +280,7 @@
 limit time for command to run on the node
 .TP
 .BI \-R \ WORKER\fP,\fB \ \-\-worker\fB= WORKER
-worker name to use for connection (\fBexec\fP, \fBssh\fP, \fBrsh\fP, \fBpdsh\fP), default is \fBssh\fP
+worker name to use for connection (\fBexec\fP, \fBssh\fP, \fBrsh\fP, \fBpdsh\fP, or the name of a Python worker module), default is \fBssh\fP
 .TP
 .BI \-\-remote\fB= REMOTE
 whether to enable remote execution: in tree mode, \(aqyes\(aq forces connections to the leaf nodes for execution, \(aqno\(aq establishes connections up to the leaf parent nodes for execution (default is \(aqyes\(aq)
doc/sphinx/clustershell-nautilus-logo200.png (deleted)
Binary diff not shown
@@ -235,6 +235,11 @@
 compute: 'node[0001-0288]'
 gpu: 'node[0001-0008]'
 
+servers:                   # example of yaml list syntax for nodes
+    - 'server001'          # in a group
+    - 'server002,server101'
+    - 'server[003-006]'
+
 cpu_only: '@compute!@gpu'  # example of inline set operation
                            # define group @cpu_only with node[0009-0288]
 
@@ -632,6 +632,8 @@
 installed; doesn't provide write support (e.g. you cannot ``cat file | clush
 --worker pdsh``); it is primarily a 1-to-n worker example.
 
+Worker modules distributed outside of ClusterShell are also supported by
+specifying the case-sensitive full Python module name of a worker module.
 
 .. [#] LLNL parallel remote shell utility
        (https://computing.llnl.gov/linux/pdsh.html)
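A minimal sketch of what such an out-of-tree worker module could look like, based on the WORKER_CLASS convention that _load_workerclass() relies on (see the Defaults.py hunk below) and on the OutOfTree test module added in this changeset; module and class names here are made up for illustration:

    # mypkg/echoworker.py -- hypothetical module, selectable with
    # "--worker mypkg.echoworker" (case-sensitive full module name).
    # A real worker would subclass a ClusterShell worker class; this only
    # shows the module-level attribute the loader looks up.

    class EchoWorker(object):
        """Placeholder worker class."""

    # _load_workerclass() imports the module and returns this attribute.
    WORKER_CLASS = EchoWorker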
@@ -81,15 +81,16 @@
         self.master_worker.write(msg)
 
 class OutputHandler(EventHandler):
-    """Base class for clush output handlers."""
-
-    def __init__(self):
+    """Base class for generic output handlers."""
+
+    def __init__(self, prog=None):
         EventHandler.__init__(self)
         self._runtimer = None
+        self._prog = prog if prog else os.path.basename(sys.argv[0])
 
     def runtimer_init(self, task, ntotal=0):
         """Init timer for live command-completed progressmeter."""
-        thandler = RunTimer(task, ntotal)
+        thandler = RunTimer(task, ntotal, prog=self._prog)
         self._runtimer = task.timer(1.33, thandler, interval=1./3.,
                                     autoclose=True)
 
@@ -133,8 +134,8 @@
 class DirectOutputHandler(OutputHandler):
     """Direct output event handler class."""
 
-    def __init__(self, display):
-        OutputHandler.__init__(self)
+    def __init__(self, display, prog=None):
+        OutputHandler.__init__(self, prog=prog)
         self._display = display
 
     def ev_read(self, worker, node, sname, msg):
@@ -148,14 +149,15 @@
         verb = VERB_QUIET
         if self._display.maxrc:
             verb = VERB_STD
-        self._display.vprint_err(verb, "clush: %s: "
-                                 "exited with exit code %d" % (node, rc))
+        self._display.vprint_err(verb, "%s: %s: exited with exit code %d" %
+                                 (self._prog, node, rc))
 
     def ev_close(self, worker, timedout):
         if timedout:
             nodeset = NodeSet._fromlist1(worker.iter_keys_timeout())
             self._display.vprint_err(VERB_QUIET,
-                                     "clush: %s: command timeout" % nodeset)
+                                     "%s: %s: command timeout" %
+                                     (self._prog, nodeset))
         self.update_prompt(worker)
 
 class DirectProgressOutputHandler(DirectOutputHandler):
@@ -179,8 +181,8 @@
 
 class CopyOutputHandler(DirectProgressOutputHandler):
     """Copy output event handler."""
-    def __init__(self, display, reverse=False):
-        DirectOutputHandler.__init__(self, display)
+    def __init__(self, display, reverse=False, prog=None):
+        DirectOutputHandler.__init__(self, display, prog=prog)
         self.reverse = reverse
 
     def ev_close(self, worker, timedout):
@@ -203,10 +205,10 @@
         DirectOutputHandler.ev_close(self, worker, timedout)
 
 class GatherOutputHandler(OutputHandler):
-    """Gathered output event handler class (clush -b)."""
-
-    def __init__(self, display):
-        OutputHandler.__init__(self)
+    """Gathered output event handler class (e.g. clush -b)."""
+
+    def __init__(self, display, prog=None):
+        OutputHandler.__init__(self, prog=prog)
         self._display = display
 
     def ev_read(self, worker, node, sname, msg):
@@ -255,16 +257,16 @@
             nsdisp = ns = NodeSet._fromlist1(nodelist)
             if self._display.verbosity > VERB_QUIET and len(ns) > 1:
                 nsdisp = "%s (%d)" % (ns, len(ns))
-            msgrc = "clush: %s: exited with exit code %d" % (nsdisp, rc)
+            msgrc = "%s: %s: exited with exit code %d" % (self._prog, nsdisp, rc)
             self._display.vprint_err(verbexit, msgrc)
 
         # Display nodes that didn't answer within command timeout delay
         if worker.num_timeout() > 0:
-            self._display.vprint_err(verbexit, "clush: %s: command timeout" % \
-                NodeSet._fromlist1(worker.iter_keys_timeout()))
+            self._display.vprint_err(verbexit, "%s: %s: command timeout" % \
+                (self._prog, NodeSet._fromlist1(worker.iter_keys_timeout())))
 
 class SortedOutputHandler(GatherOutputHandler):
-    """Sorted by node output event handler class (clush -L)."""
+    """Sorted by node output event handler class (e.g. clush -L)."""
 
     def ev_close(self, worker, timedout):
         # Overrides GatherOutputHandler.ev_close()
@@ -289,9 +291,9 @@
 class LiveGatherOutputHandler(GatherOutputHandler):
     """Live line-gathered output event handler class (-bL)."""
 
-    def __init__(self, display, nodes):
+    def __init__(self, display, nodes, prog=None):
         assert nodes is not None, "cannot gather local command"
-        GatherOutputHandler.__init__(self, display)
+        GatherOutputHandler.__init__(self, display, prog=prog)
         self._nodes = NodeSet(nodes)
         self._nodecnt = dict.fromkeys(self._nodes, 0)
         self._mtreeq = []
@@ -345,7 +347,7 @@
 
 class RunTimer(EventHandler):
     """Running progress timer event handler"""
-    def __init__(self, task, total):
+    def __init__(self, task, total, prog=None):
         EventHandler.__init__(self)
         self.task = task
         self.total = total
@@ -356,6 +358,7 @@
         # updated by worker handler for progress
         self.start_time = 0
         self.bytes_written = 0
+        self._prog = prog if prog else os.path.basename(sys.argv[0])
 
     def ev_timer(self, timer):
         self.update()
@@ -389,9 +392,9 @@
         if self.bytes_written > 0 or cnt != self.cnt_last:
            self.cnt_last = cnt
             # display completed/total clients
-            towrite = 'clush: %*d/%*d%s%s\r' % (self.tslen, self.total - cnt,
-                                                self.tslen, self.total, gwinfo,
-                                                wrbwinfo)
+            towrite = '%s: %*d/%*d%s%s\r' % (self._prog, self.tslen,
+                                             self.total - cnt, self.tslen,
+                                             self.total, gwinfo, wrbwinfo)
             self.wholelen = len(towrite)
             sys.stderr.write(towrite)
             self.started = True
@@ -402,12 +405,13 @@
             return
         self.erase_line()
         # display completed/total clients
-        fmt = 'clush: %*d/%*d'
+        fmt = '%s: %*d/%*d'
         if force_cr:
             fmt += '\n'
         else:
             fmt += '\r'
-        sys.stderr.write(fmt % (self.tslen, self.total, self.tslen, self.total))
+        sys.stderr.write(fmt % (self._prog, self.tslen, self.total, self.tslen,
+                                self.total))
 
 
 def signal_handler(signum, frame):
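The net effect of threading prog= through every handler is that error and progress lines are prefixed with the invoking program's name rather than a hard-coded 'clush'. A runnable sketch of just the fallback rule (resolve_prog and the 'clurun' name are made up for illustration):

    import os
    import sys

    def resolve_prog(prog=None):
        # Same fallback as OutputHandler and RunTimer above.
        return prog if prog else os.path.basename(sys.argv[0])

    print(resolve_prog())           # basename of sys.argv[0], e.g. 'python3'
    print(resolve_prog('clurun'))   # a reusing CLI prints 'clurun: ...' lines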
5454 """
5555 Return the class pointer matching `workername`.
5656
57 This can be the 'short' name (such as `ssh`) or a fully-qualified
58 module path (such as ClusterShell.Worker.Ssh).
59
5760 The module is loaded if not done yet.
5861 """
59 modname = "ClusterShell.Worker.%s" % workername.capitalize()
60
62
63 # First try the worker name as a module under ClusterShell.Worker,
64 # but if that fails, try the worker name directly
65 try:
66 modname = "ClusterShell.Worker.%s" % workername.capitalize()
67 _import_module(modname)
68 except ImportError:
69 modname = workername
70 _import_module(modname)
71
72 # Get the class pointer
73 return sys.modules[modname].WORKER_CLASS
74
75 def _import_module(modname):
76 """Import a python module if not done yet."""
6177 # Iterate over a copy of sys.modules' keys to avoid RuntimeError
6278 if modname.lower() not in [mod.lower() for mod in list(sys.modules)]:
6379 # Import module if not yet loaded
6480 __import__(modname)
65
66 # Get the class pointer
67 return sys.modules[modname].WORKER_CLASS
6881
6982 def _local_workerclass(defaults):
7083 """Return default local worker class."""
@@ -253,6 +253,11 @@
         task = task_self()
         task._info.update(taskinfo)
         task.set_info('print_debug', _gw_print_debug)
+
+        for infokey in taskinfo:
+            if infokey.startswith('tree_default:'):
+                self.logger.debug('Setting default %s to %s', infokey[13:], taskinfo[infokey])
+                task.set_default(infokey[13:], taskinfo[infokey])
 
         if task.info('debug'):
             self.logger.setLevel(logging.DEBUG)
@@ -444,6 +444,8 @@
         result = []
         assert source
         raw = getattr(source, 'resolv_%s' % what)(*args)
+        if isinstance(raw, list):
+            raw = ','.join(raw)
         for line in raw.splitlines():
             [result.append(x) for x in line.strip().split()]
         return result
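This lets a group source's resolv_* methods return either a string or a list of strings (as the YAML list syntax above produces); both normalize to the same flat result. A self-contained sketch of the equivalence, with a stub object standing in for a real source class:

    class StubSource(object):
        """Hypothetical source: resolv_map may return str or list."""
        def resolv_map(self, group):
            return ['node[1-2]', 'node5']   # list form, e.g. from a YAML list

    raw = StubSource().resolv_map('foo')
    if isinstance(raw, list):               # same normalization as the diff
        raw = ','.join(raw)
    assert raw == 'node[1-2],node5'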
@@ -955,8 +955,7 @@
         for rgvec in self._veclist:
             iveclist += product(*rgvec)
         assert(len(iveclist) == len(self))
-        rnd = RangeSetND(iveclist[index],
-                         pads=[rg.padding for rg in self._veclist[0]],
+        rnd = RangeSetND(iveclist[index], pads=self.pads(),
                          autostep=self.autostep)
         return rnd
 
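The old code took padding only from the first vector, so slicing a RangeSetND whose dimensions are padded differently across vectors produced wrong padding; pads() aggregates across all vectors. The regression test added further down exercises exactly this case (GitHub #429):

    from ClusterShell.RangeSet import RangeSetND

    # Two vectors with different padding ("06" is zero-padded, "15-16" not).
    rn = RangeSetND([["110", "15-16"], ["107", "06"]])
    # With pads=self.pads(), the slice keeps the per-dimension padding:
    assert str(rn[0:3:2]) == "107; 06\n110; 15\n"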
@@ -57,7 +57,7 @@
 basestring = str
 
 from ClusterShell.Defaults import config_paths, DEFAULTS
-from ClusterShell.Defaults import _local_workerclass, _distant_workerclass
+from ClusterShell.Defaults import _local_workerclass, _distant_workerclass, _load_workerclass
 from ClusterShell.Engine.Engine import EngineAbortException
 from ClusterShell.Engine.Engine import EngineTimeoutException
 from ClusterShell.Engine.Engine import EngineAlreadyRunningError
@@ -469,6 +469,10 @@
         self._default_lock.acquire()
         try:
             self._default[default_key] = value
+            if default_key == 'local_workername':
+                self._default['local_worker'] = _load_workerclass(value)
+            elif default_key == 'distant_workername':
+                self._default['distant_worker'] = _load_workerclass(value)
         finally:
             self._default_lock.release()
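With this hook, changing a *_workername default immediately swaps the cached worker class, as the test suite exercises. Usage sketch (requires ClusterShell installed):

    from ClusterShell.Task import task_self

    task = task_self()
    # Selecting rsh resolves ClusterShell.Worker.Rsh via _load_workerclass():
    task.set_default('distant_workername', 'rsh')
    # task.default('distant_worker') is now the Rsh worker class.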
509513 - "command_timeout": Time in seconds to wait for a command to
510514 complete before aborting (default: 0, which means
511515 unlimited).
516 - "tree_default:<key>": In tree mode, overrides the key <key>
517 in Defaults (settings normally set in defaults.conf)
512518
513519 Threading considerations
514520 ========================
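The tree_default: prefix is consumed on the gateway side (see the Gateway hunk above, which strips the prefix and calls task.set_default()). A task-level example matching the new tree test at the end of this changeset:

    from ClusterShell.Task import task_self

    task = task_self()
    # In tree mode, make gateways run local commands through the ssh
    # worker instead of the default exec worker:
    task.set_info('tree_default:local_workername', 'ssh')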
@@ -25,6 +25,7 @@
 
 import os
 import shlex
+import re
 
 from ClusterShell.Worker.Exec import ExecClient, CopyClient, ExecWorker
 
@@ -33,6 +34,12 @@
     """
     Rsh EngineClient.
     """
+
+    def __init__(self, node, command, worker, stderr, timeout, autoclose=False,
+                 rank=None):
+        ExecClient.__init__(self, node, command, worker, stderr, timeout,
+                            autoclose, rank)
+        self.rsh_rc = None
 
     def _build_cmd(self):
         """
@@ -58,7 +65,25 @@
         cmd_l.append("%s" % self.key)  # key is the node
         cmd_l.append("%s" % self.command)
 
+        # rsh does not properly return exit status
+        # force the exit status to be printed out
+        cmd_l.append("; echo XXRETCODE: $?")
+
         return (cmd_l, None)
+
+    def _on_nodeset_msgline(self, nodes, msg, sname):
+        """Override _on_nodeset_msgline to parse magic return code"""
+        match = re.search(r"^XXRETCODE: (\d+)$", msg.decode())
+        if match:
+            self.rsh_rc = int(match.group(1))
+        else:
+            ExecClient._on_nodeset_msgline(self, nodes, msg, sname)
+
+    def _on_nodeset_close(self, nodes, rc):
+        """Override _on_nodeset_close to return rsh_rc"""
+        if (rc == 0 or rc == 1) and self.rsh_rc is not None:
+            rc = self.rsh_rc
+        ExecClient._on_nodeset_close(self, nodes, rc)
 
 
 class RcpClient(CopyClient):
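The trick works because classic rsh exits with the status of the local connection, not of the remote command; appending the echo makes the remote shell print its status so the client can recover it. A self-contained sketch of the parsing logic above (stub byte strings, no rsh involved):

    import re

    rsh_rc = None
    for msg in [b'hello', b'XXRETCODE: 3']:      # stub rsh output lines
        match = re.search(r"^XXRETCODE: (\d+)$", msg.decode())
        if match:
            rsh_rc = int(match.group(1))         # magic line: real exit status
        # else: deliver msg as regular output, as ExecClient would

    rc = 0                                       # status reported by rsh itself
    if (rc == 0 or rc == 1) and rsh_rc is not None:
        rc = rsh_rc                              # trust the printed status
    assert rc == 3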
@@ -124,18 +124,18 @@
 
         if self.reverse:
             if user:
-                cmd_l.append("%s@%s:%s" % (user, self.key, self.source))
+                cmd_l.append("%s@[%s]:%s" % (user, self.key, self.source))
             else:
-                cmd_l.append("%s:%s" % (self.key, self.source))
+                cmd_l.append("[%s]:%s" % (self.key, self.source))
 
             cmd_l.append(os.path.join(self.dest, "%s.%s" % \
                 (os.path.basename(self.source), self.key)))
         else:
             cmd_l.append(self.source)
             if user:
-                cmd_l.append("%s@%s:%s" % (user, self.key, self.dest))
+                cmd_l.append("%s@[%s]:%s" % (user, self.key, self.dest))
             else:
-                cmd_l.append("%s:%s" % (self.key, self.dest))
+                cmd_l.append("[%s]:%s" % (self.key, self.dest))
 
         return (cmd_l, None)
 
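Bracketing the node in the host:path argument presumably follows the scp convention for addresses that themselves contain colons (IPv6 literals), so the copy tool cannot confuse the address with the path separator. For illustration only (hypothetical values):

    user, key, dest = 'alice', '2001:db8::42', '/tmp/f'
    # Unbracketed, the IPv6 colons are ambiguous with the path separator:
    #   'alice@2001:db8::42:/tmp/f'
    # Bracketed form produced by the new code:
    assert "%s@[%s]:%s" % (user, key, dest) == 'alice@[2001:db8::42]:/tmp/f'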
@@ -279,11 +279,12 @@
                                  tree=False)
         else:
             assert self.source is None
-            worker = ExecWorker(nodes=targets,
-                                command=self.command,
-                                handler=self.metahandler,
-                                timeout=self.timeout,
-                                stderr=self.stderr)
+            workerclass = self.task.default('local_worker')
+            worker = workerclass(nodes=targets,
+                                 command=self.command,
+                                 handler=self.metahandler,
+                                 timeout=self.timeout,
+                                 stderr=self.stderr)
         self.task.schedule(worker)
 
         self.workers.append(worker)
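This is the consumer side of the 'local_worker' default: instead of hard-coding ExecWorker, the tree worker instantiates whatever class the task's defaults currently name, which is what makes tree_default:local_workername effective. Minimal sketch of the lookup (requires ClusterShell installed):

    from ClusterShell.Task import task_self

    workerclass = task_self().default('local_worker')
    # ExecWorker by default; the Ssh worker class after selecting 'ssh'.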
@@ -27,7 +27,6 @@
 doc/man/man5/clush.conf.5
 doc/man/man5/groups.conf.5
 doc/sphinx/Makefile
-doc/sphinx/clustershell-nautilus-logo200.png
 doc/sphinx/conf.py
 doc/sphinx/config.rst
 doc/sphinx/further.rst
@@ -2,10 +2,14 @@
 
 """Unit test for ClusterShell.Defaults"""
 
+import os
+import sys
+import shutil
+
 from textwrap import dedent
 import unittest
 
-from TLib import make_temp_file
+from TLib import make_temp_file, make_temp_dir
 
 from ClusterShell.Defaults import Defaults, _task_print_debug
 
@@ -97,6 +101,21 @@
         self.assertTrue(task.default("distant_worker") is WorkerSsh)
         task_terminate()
 
+        dname = make_temp_dir()
+        modfile = open(os.path.join(dname, 'OutOfTree.py'), 'w')
+        modfile.write(dedent("""
+            class OutOfTreeWorker(object):
+                pass
+            WORKER_CLASS = OutOfTreeWorker"""))
+        modfile.flush()
+        modfile.close()
+        sys.path.append(dname)
+        self.defaults.distant_workername = 'OutOfTree'
+        task = task_self(self.defaults)
+        self.assertEqual(task.default("distant_worker").__name__, 'OutOfTreeWorker')
+        task_terminate()
+        shutil.rmtree(dname, ignore_errors=True)
+
     def test_005_misc_value_errors(self):
         """test Defaults misc value errors"""
         task_terminate()
@@ -1588,6 +1588,19 @@
         self.assertRaises(GroupResolverConfigError, YAMLGroupLoader, f.name)
 
 
+    def test_list_group(self):
+        f = make_temp_file(dedent("""
+            rednecks:
+                bubba:
+                    - pickup-1
+                    - pickup-2
+                    - tractor-[1-2]""").encode('ascii'))
+        loader = YAMLGroupLoader(f.name)
+        sources = list(loader)
+        resolver = GroupResolver(sources[0])
+        self.assertEqual(resolver.group_nodes('bubba'),
+                         [ 'pickup-1,pickup-2,tractor-[1-2]' ])
+
 class GroupResolverYAMLTest(unittest.TestCase):
 
     def setUp(self):
@@ -431,6 +431,9 @@
         # steps
         self.assertEqual(str(rn1[0:12:2]), "0-3; 1\n10; 10,12\n")
         self.assertEqual(str(rn1[1:12:2]), "0-3; 2\n10; 11,13\n")
+        # GitHub #429
+        rn1 = RangeSetND([["110", "15-16"], ["107", "06"]])
+        self.assertEqual(str(rn1[0:3:2]), "107; 06\n110; 15\n")
 
     def test_contiguous(self):
         rn0 = RangeSetND()
@@ -189,6 +189,22 @@
         self.assertEqual(teh.ev_timedout_cnt, 0)
         self.assertEqual(teh.ev_close_cnt, 1)
         self.assertEqual(teh.last_read, NODE_DISTANT.encode('ascii'))
+
+    def test_tree_run_noremote_alt_localworker(self):
+        """test tree run with remote=False and a non-exec localworker"""
+        teh = TEventHandler()
+        self.task.set_info('tree_default:local_workername', 'ssh')
+        self.task.run('echo %h', nodes=NODE_DISTANT, handler=teh, remote=False)
+        self.assertEqual(teh.ev_start_cnt, 1)
+        self.assertEqual(teh.ev_pickup_cnt, 1)
+        self.assertEqual(teh.ev_read_cnt, 1)
+        self.assertEqual(teh.ev_written_cnt, 0)
+        self.assertEqual(teh.ev_hup_cnt, 1)
+        self.assertEqual(teh.ev_timedout_cnt, 0)
+        self.assertEqual(teh.ev_close_cnt, 1)
+        # The exec worker will expand %h to the host, but ssh will just echo '%h'
+        self.assertEqual(teh.last_read, '%h'.encode('ascii'))
+        del self.task._info['tree_default:local_workername']
 
     def test_tree_run_direct(self):
         """test tree run with direct target, in topology"""