Codebase list python-procrunner / efda472
Change package to src layout Markus Gerstel 2 years ago
4 changed file(s) with 692 addition(s) and 703 deletion(s). Raw diff Collapse all Expand all
+0
-677
procrunner/__init__.py less more
0 import codecs
1 import functools
2 import io
3 import logging
4 import os
5 import select
6 import shutil
7 import subprocess
8 import sys
9 import time
10 import timeit
11 import warnings
12 from multiprocessing import Pipe
13 from threading import Thread
14
15 #
16 # run() - A function to synchronously run an external process, supporting
17 # the following features:
18 #
19 # - runs an external process and waits for it to finish
20 # - does not deadlock, no matter the process stdout/stderr output behaviour
21 # - returns the exit code, stdout, stderr (separately) as a
22 # subprocess.CompletedProcess object
23 # - process can run in a custom environment, either as a modification of
24 # the current environment or in a new environment from scratch
25 # - stdin can be fed to the process
26 # - stdout and stderr is printed by default, can be disabled
27 # - stdout and stderr can be passed to any arbitrary function for
28 # live processing
29 # - optionally enforces a time limit on the process
30 #
31 #
32 # Usage example:
33 #
34 # import procrunner
35 # result = procrunner.run(['/bin/ls', '/some/path/containing spaces'])
36 #
37 # Returns:
38 #
39 # ReturnObject(
40 # args=('/bin/ls', '/some/path/containing spaces'),
41 # returncode=2,
42 # stdout=b'',
43 # stderr=b'/bin/ls: cannot access /some/path/containing spaces: No such file or directory\n'
44 # )
45 #
46 # which also offers (albeit deprecated)
47 #
48 # result.runtime == 0.12990689277648926
49 # result.time_end == '2017-11-12 19:54:49 GMT'
50 # result.time_start == '2017-11-12 19:54:49 GMT'
51 # result.timeout == False
52
53 __author__ = """Markus Gerstel"""
54 __email__ = "scientificsoftware@diamond.ac.uk"
55 __version__ = "2.3.1"
56
57 logger = logging.getLogger("procrunner")
58 logger.addHandler(logging.NullHandler())
59
60
61 class _LineAggregator:
62 """
63 Buffer that can be filled with stream data and will aggregate complete
64 lines. Lines can be printed or passed to an arbitrary callback function.
65 The lines passed to the callback function are UTF-8 decoded and do not
66 contain a trailing newline character.
67 """
68
69 def __init__(self, print_line=False, callback=None):
70 """Create aggregator object."""
71 self._buffer = ""
72 self._print = print_line
73 self._callback = callback
74 self._decoder = codecs.getincrementaldecoder("utf-8")("replace")
75
76 def add(self, data):
77 """
78 Add a single character to buffer. If one or more full lines are found,
79 print them (if desired) and pass to callback function.
80 """
81 data = self._decoder.decode(data)
82 if not data:
83 return
84 self._buffer += data
85 if "\n" in data:
86 to_print, remainder = self._buffer.rsplit("\n")
87 if self._print:
88 try:
89 print(to_print)
90 except UnicodeEncodeError:
91 print(to_print.encode(sys.getdefaultencoding(), errors="replace"))
92 if not hasattr(self, "_warned"):
93 logger.warning("output encoding error, characters replaced")
94 setattr(self, "_warned", True)
95 if self._callback:
96 self._callback(to_print)
97 self._buffer = remainder
98
99 def flush(self):
100 """Print/send any remaining data to callback function."""
101 self._buffer += self._decoder.decode(b"", final=True)
102 if self._buffer:
103 if self._print:
104 print(self._buffer)
105 if self._callback:
106 self._callback(self._buffer)
107 self._buffer = ""
108
109
110 class _NonBlockingStreamReader:
111 """Reads a stream in a thread to avoid blocking/deadlocks"""
112
113 def __init__(self, stream, output=True, debug=False, notify=None, callback=None):
114 """Creates and starts a thread which reads from a stream."""
115 self._buffer = io.BytesIO()
116 self._closed = False
117 self._closing = False
118 self._debug = debug
119 self._stream = stream
120 self._terminated = False
121
122 def _thread_write_stream_to_buffer():
123 la = _LineAggregator(print_line=output, callback=callback)
124 char = True
125 while char:
126 if select.select([self._stream], [], [], 0.1)[0]:
127 char = self._stream.read(1)
128 if char:
129 self._buffer.write(char)
130 la.add(char)
131 else:
132 if self._closing:
133 break
134 self._stream.close()
135 self._terminated = True
136 la.flush()
137 if self._debug:
138 logger.debug("Stream reader terminated")
139 if notify:
140 notify()
141
142 def _thread_write_stream_to_buffer_windows():
143 line = True
144 while line:
145 line = self._stream.readline()
146 if line:
147 self._buffer.write(line)
148 if output or callback:
149 linedecode = line.decode("utf-8", "replace")
150 if output:
151 print(linedecode)
152 if callback:
153 callback(linedecode)
154 self._stream.close()
155 self._terminated = True
156 if self._debug:
157 logger.debug("Stream reader terminated")
158 if notify:
159 notify()
160
161 if os.name == "nt":
162 self._thread = Thread(target=_thread_write_stream_to_buffer_windows)
163 else:
164 self._thread = Thread(target=_thread_write_stream_to_buffer)
165 self._thread.daemon = True
166 self._thread.start()
167
168 def has_finished(self):
169 """
170 Returns whether the thread reading from the stream is still alive.
171 """
172 return self._terminated
173
174 def get_output(self):
175 """
176 Retrieve the stored data in full.
177 This call may block if the reading thread has not yet terminated.
178 """
179 self._closing = True
180 if not self.has_finished():
181 if self._debug:
182 # Main thread overtook stream reading thread.
183 underrun_debug_timer = timeit.default_timer()
184 logger.warning("NBSR underrun")
185 self._thread.join()
186 if not self.has_finished():
187 if self._debug:
188 logger.debug(
189 "NBSR join after %f seconds, underrun not resolved",
190 timeit.default_timer() - underrun_debug_timer,
191 )
192 raise Exception("thread did not terminate")
193 if self._debug:
194 logger.debug(
195 "NBSR underrun resolved after %f seconds",
196 timeit.default_timer() - underrun_debug_timer,
197 )
198 if self._closed:
199 raise Exception("streamreader double-closed")
200 self._closed = True
201 data = self._buffer.getvalue()
202 self._buffer.close()
203 return data
204
205
206 class _NonBlockingStreamWriter:
207 """Writes to a stream in a thread to avoid blocking/deadlocks"""
208
209 def __init__(self, stream, data, debug=False, notify=None):
210 """Creates and starts a thread which writes data to stream."""
211 self._buffer = data
212 self._buffer_len = len(data)
213 self._buffer_pos = 0
214 self._max_block_len = 4096
215 self._stream = stream
216 self._terminated = False
217
218 def _thread_write_buffer_to_stream():
219 while self._buffer_pos < self._buffer_len:
220 if (self._buffer_len - self._buffer_pos) > self._max_block_len:
221 block = self._buffer[
222 self._buffer_pos : (self._buffer_pos + self._max_block_len)
223 ]
224 else:
225 block = self._buffer[self._buffer_pos :]
226 try:
227 self._stream.write(block)
228 except OSError as e:
229 if (
230 e.errno == 32
231 ): # broken pipe, ie. process terminated without reading entire stdin
232 self._stream.close()
233 self._terminated = True
234 if notify:
235 notify()
236 return
237 raise
238 self._buffer_pos += len(block)
239 if debug:
240 logger.debug("wrote %d bytes to stream", len(block))
241 self._stream.close()
242 self._terminated = True
243 if notify:
244 notify()
245
246 self._thread = Thread(target=_thread_write_buffer_to_stream)
247 self._thread.daemon = True
248 self._thread.start()
249
250 def has_finished(self):
251 """Returns whether the thread writing to the stream is still alive."""
252 return self._terminated
253
254 def bytes_sent(self):
255 """Return the number of bytes written so far."""
256 return self._buffer_pos
257
258 def bytes_remaining(self):
259 """Return the number of bytes still to be written."""
260 return self._buffer_len - self._buffer_pos
261
262
263 def _path_resolve(obj):
264 """
265 Resolve file system path (PEP-519) objects to strings.
266
267 :param obj: A file system path object or something else.
268 :return: A string representation of a file system path object or, for
269 anything that was not a file system path object, the original
270 object.
271 """
272 if obj and hasattr(obj, "__fspath__"):
273 return obj.__fspath__()
274 return obj
275
276
277 def _windows_resolve(command, path=None):
278 """
279 Try and find the full path and file extension of the executable to run.
280 This is so that e.g. calls to 'somescript' will point at 'somescript.cmd'
281 without the need to set shell=True in the subprocess.
282
283 :param command: The command array to be run, with the first element being
284 the command with or w/o path, with or w/o extension.
285 :return: Returns the command array with the executable resolved with the
286 correct extension. If the executable cannot be resolved for any
287 reason the original command array is returned.
288 """
289 if not command or not isinstance(command[0], str):
290 return command
291
292 found_executable = shutil.which(command[0], path=path)
293 if found_executable:
294 logger.debug("Resolved %s as %s", command[0], found_executable)
295 return (found_executable, *command[1:])
296
297 if "\\" in command[0]:
298 # Special case. shutil.which may not detect file extensions if a full
299 # path is given, so try to resolve the executable explicitly
300 for extension in os.getenv("PATHEXT").split(os.pathsep):
301 found_executable = shutil.which(command[0] + extension, path=path)
302 if found_executable:
303 return (found_executable, *command[1:])
304
305 logger.warning("Error trying to resolve the executable: %s", command[0])
306 return command
307
308
309 class ReturnObject(subprocess.CompletedProcess):
310 """
311 A subprocess.CompletedProcess-like object containing the executed
312 command, stdout and stderr (both as bytestrings), and the exitcode.
313 The check_returncode() function raises an exception if the process
314 exited with a non-zero exit code.
315 """
316
317 def __init__(self, exitcode=None, command=None, stdout=None, stderr=None, **kw):
318 super().__init__(
319 args=command, returncode=exitcode, stdout=stdout, stderr=stderr
320 )
321 self._extras = {
322 "timeout": kw.get("timeout"),
323 "runtime": kw.get("runtime"),
324 "time_start": kw.get("time_start"),
325 "time_end": kw.get("time_end"),
326 }
327
328 def __getitem__(self, key):
329 warnings.warn(
330 "dictionary access to a procrunner return object is deprecated",
331 DeprecationWarning,
332 stacklevel=2,
333 )
334 if key in self._extras:
335 return self._extras[key]
336 if not hasattr(self, key):
337 raise KeyError(f"Unknown attribute {key}")
338 return getattr(self, key)
339
340 def __eq__(self, other):
341 """Override equality operator to account for added fields"""
342 if type(other) is type(self):
343 return self.__dict__ == other.__dict__
344 return False
345
346 def __hash__(self):
347 """This object is not immutable, so mark it as unhashable"""
348 return None
349
350 @property
351 def cmd(self):
352 warnings.warn(
353 "procrunner return object .cmd is deprecated, use .args",
354 DeprecationWarning,
355 stacklevel=2,
356 )
357 return self.args
358
359 @property
360 def command(self):
361 warnings.warn(
362 "procrunner return object .command is deprecated, use .args",
363 DeprecationWarning,
364 stacklevel=2,
365 )
366 return self.args
367
368 @property
369 def exitcode(self):
370 warnings.warn(
371 "procrunner return object .exitcode is deprecated, use .returncode",
372 DeprecationWarning,
373 stacklevel=2,
374 )
375 return self.returncode
376
377 @property
378 def timeout(self):
379 warnings.warn(
380 "procrunner return object .timeout is deprecated",
381 DeprecationWarning,
382 stacklevel=2,
383 )
384 return self._extras["timeout"]
385
386 @property
387 def runtime(self):
388 warnings.warn(
389 "procrunner return object .runtime is deprecated",
390 DeprecationWarning,
391 stacklevel=2,
392 )
393 return self._extras["runtime"]
394
395 @property
396 def time_start(self):
397 warnings.warn(
398 "procrunner return object .time_start is deprecated",
399 DeprecationWarning,
400 stacklevel=2,
401 )
402 return self._extras["time_start"]
403
404 @property
405 def time_end(self):
406 warnings.warn(
407 "procrunner return object .time_end is deprecated",
408 DeprecationWarning,
409 stacklevel=2,
410 )
411 return self._extras["time_end"]
412
413 def update(self, dictionary):
414 self._extras.update(dictionary)
415
416
417 def _deprecate_argument_calling(f):
418 @functools.wraps(f)
419 def wrapper(*args, **kwargs):
420 if len(args) > 1:
421 warnings.warn(
422 "Calling procrunner.run() with unnamed arguments (apart from "
423 "the command) is deprecated. Use keyword arguments instead.",
424 DeprecationWarning,
425 stacklevel=2,
426 )
427 return f(*args, **kwargs)
428
429 return wrapper
430
431
432 @_deprecate_argument_calling
433 def run(
434 command,
435 timeout=None,
436 debug=None,
437 stdin=None,
438 print_stdout=True,
439 print_stderr=True,
440 callback_stdout=None,
441 callback_stderr=None,
442 environment=None,
443 environment_override=None,
444 win32resolve=True,
445 working_directory=None,
446 raise_timeout_exception=False,
447 ):
448 """
449 Run an external process.
450
451 File system path objects (PEP-519) are accepted in the command, environment,
452 and working directory arguments.
453
454 :param array command: Command line to be run, specified as array.
455 :param timeout: Terminate program execution after this many seconds.
456 :param boolean debug: Enable further debug messages. (deprecated)
457 :param stdin: Optional bytestring that is passed to command stdin.
458 :param boolean print_stdout: Pass stdout through to sys.stdout.
459 :param boolean print_stderr: Pass stderr through to sys.stderr.
460 :param callback_stdout: Optional function which is called for each
461 stdout line.
462 :param callback_stderr: Optional function which is called for each
463 stderr line.
464 :param dict environment: The full execution environment for the command.
465 :param dict environment_override: Change environment variables from the
466 current values for command execution.
467 :param boolean win32resolve: If on Windows, find the appropriate executable
468 first. This allows running of .bat, .cmd, etc.
469 files without explicitly specifying their
470 extension.
471 :param string working_directory: If specified, run the executable from
472 within this working directory.
473 :param boolean raise_timeout_exception: Forward compatibility flag. If set
474 then a subprocess.TimeoutExpired exception is raised
475 instead of returning an object that can be checked
476 for a timeout condition. Defaults to False, will be
477 changed to True in a future release.
478 :return: The exit code, stdout, stderr (separately, as byte strings)
479 as a subprocess.CompletedProcess object.
480 """
481
482 time_start = time.strftime("%Y-%m-%d %H:%M:%S GMT", time.gmtime())
483 logger.debug("Starting external process: %s", command)
484
485 if stdin is None:
486 stdin_pipe = None
487 else:
488 assert sys.platform != "win32", "stdin argument not supported on Windows"
489 stdin_pipe = subprocess.PIPE
490 if debug is not None:
491 warnings.warn(
492 "Use of the debug parameter is deprecated", DeprecationWarning, stacklevel=3
493 )
494
495 start_time = timeit.default_timer()
496 if timeout is not None:
497 max_time = start_time + timeout
498 if not raise_timeout_exception:
499 warnings.warn(
500 "Using procrunner with timeout and without raise_timeout_exception set is deprecated",
501 DeprecationWarning,
502 stacklevel=3,
503 )
504
505 if environment is not None:
506 env = {key: _path_resolve(environment[key]) for key in environment}
507 else:
508 env = {key: value for key, value in os.environ.items()}
509 if environment_override:
510 env.update(
511 {
512 key: str(_path_resolve(environment_override[key]))
513 for key in environment_override
514 }
515 )
516
517 command = tuple(_path_resolve(part) for part in command)
518 if win32resolve and sys.platform == "win32":
519 command = _windows_resolve(command)
520 if working_directory and sys.version_info < (3, 7):
521 working_directory = os.fspath(working_directory)
522
523 p = subprocess.Popen(
524 command,
525 shell=False,
526 cwd=working_directory,
527 env=env,
528 stdin=stdin_pipe,
529 stdout=subprocess.PIPE,
530 stderr=subprocess.PIPE,
531 )
532
533 thread_pipe_pool = []
534 notifyee, notifier = Pipe(False)
535 thread_pipe_pool.append(notifyee)
536 stdout = _NonBlockingStreamReader(
537 p.stdout,
538 output=print_stdout,
539 debug=debug,
540 notify=notifier.close,
541 callback=callback_stdout,
542 )
543 notifyee, notifier = Pipe(False)
544 thread_pipe_pool.append(notifyee)
545 stderr = _NonBlockingStreamReader(
546 p.stderr,
547 output=print_stderr,
548 debug=debug,
549 notify=notifier.close,
550 callback=callback_stderr,
551 )
552 if stdin is not None:
553 notifyee, notifier = Pipe(False)
554 thread_pipe_pool.append(notifyee)
555 stdin = _NonBlockingStreamWriter(
556 p.stdin, data=stdin, debug=debug, notify=notifier.close
557 )
558
559 timeout_encountered = False
560
561 while (p.returncode is None) and (
562 (timeout is None) or (timeit.default_timer() < max_time)
563 ):
564 if debug and timeout is not None:
565 logger.debug("still running (T%.2fs)", timeit.default_timer() - max_time)
566
567 # wait for some time or until a stream is closed
568 try:
569 if thread_pipe_pool:
570 # Wait for up to 0.5 seconds or for a signal on a remaining stream,
571 # which could indicate that the process has terminated.
572 try:
573 event = thread_pipe_pool[0].poll(0.5)
574 except BrokenPipeError as e:
575 # on Windows this raises "BrokenPipeError: [Errno 109] The pipe has been ended"
576 # which is for all intents and purposes equivalent to a True return value.
577 if e.winerror != 109:
578 raise
579 event = True
580 if event:
581 # One-shot, so remove stream and watch remaining streams
582 thread_pipe_pool.pop(0)
583 if debug:
584 logger.debug("Event received from stream thread")
585 else:
586 time.sleep(0.5)
587 except KeyboardInterrupt:
588 p.kill() # if user pressed Ctrl+C we won't be able to produce a proper report anyway
589 # but at least make sure the child process dies with us
590 raise
591
592 # check if process is still running
593 p.poll()
594
595 if p.returncode is None:
596 # timeout condition
597 timeout_encountered = True
598 if debug:
599 logger.debug("timeout (T%.2fs)", timeit.default_timer() - max_time)
600
601 # send terminate signal and wait some time for buffers to be read
602 p.terminate()
603 if thread_pipe_pool:
604 try:
605 thread_pipe_pool[0].poll(0.5)
606 except BrokenPipeError as e:
607 # on Windows this raises "BrokenPipeError: [Errno 109] The pipe has been ended"
608 # which is for all intents and purposes equivalent to a True return value.
609 if e.winerror != 109:
610 raise
611 thread_pipe_pool.pop(0)
612 if not stdout.has_finished() or not stderr.has_finished():
613 time.sleep(2)
614 p.poll()
615
616 if p.returncode is None:
617 # thread still alive
618 # send kill signal and wait some more time for buffers to be read
619 p.kill()
620 if thread_pipe_pool:
621 try:
622 thread_pipe_pool[0].poll(0.5)
623 except BrokenPipeError as e:
624 # on Windows this raises "BrokenPipeError: [Errno 109] The pipe has been ended"
625 # which is for all intents and purposes equivalent to a True return value.
626 if e.winerror != 109:
627 raise
628 thread_pipe_pool.pop(0)
629 if not stdout.has_finished() or not stderr.has_finished():
630 time.sleep(5)
631 p.poll()
632
633 if p.returncode is None:
634 raise RuntimeError("Process won't terminate")
635
636 runtime = timeit.default_timer() - start_time
637 if timeout is not None:
638 logger.debug(
639 "Process ended after %.1f seconds with exit code %d (T%.2fs)",
640 runtime,
641 p.returncode,
642 timeit.default_timer() - max_time,
643 )
644 else:
645 logger.debug(
646 "Process ended after %.1f seconds with exit code %d", runtime, p.returncode
647 )
648
649 stdout = stdout.get_output()
650 stderr = stderr.get_output()
651
652 if timeout_encountered and raise_timeout_exception:
653 raise subprocess.TimeoutExpired(
654 cmd=command, timeout=timeout, output=stdout, stderr=stderr
655 )
656
657 time_end = time.strftime("%Y-%m-%d %H:%M:%S GMT", time.gmtime())
658 result = ReturnObject(
659 exitcode=p.returncode,
660 command=command,
661 stdout=stdout,
662 stderr=stderr,
663 timeout=timeout_encountered,
664 runtime=runtime,
665 time_start=time_start,
666 time_end=time_end,
667 )
668 if stdin is not None:
669 result.update(
670 {
671 "stdin_bytes_sent": stdin.bytes_sent(),
672 "stdin_bytes_remain": stdin.bytes_remaining(),
673 }
674 )
675
676 return result
11 name = procrunner
22 description = Versatile utility function to run external processes
33 version = 2.3.1
4 author = Diamond Light Source - Scientific Software et al.
5 author_email = scientificsoftware@diamond.ac.uk
46 classifiers =
57 Development Status :: 5 - Production/Stable
68 Intended Audience :: Developers
1618 Topic :: Software Development :: Libraries :: Python Modules
1719 license = BSD
1820 license_file = LICENSE
19 project-urls =
21 project_urls =
2022 Download = https://github.com/DiamondLightSource/python-procrunner/tags
2123 Documentation = https://procrunner.readthedocs.io/
2224 GitHub = https://github.com/DiamondLightSource/python-procrunner
2325 Bug-Tracker = https://github.com/DiamondLightSource/python-procrunner/issues
26
27 [options]
28 include_package_data = True
29 packages = procrunner
30 package_dir =
31 =src
32 python_requires = >=3.6
33 zip_safe = False
34
35 [options.packages.find]
36 where = src
2437
2538 [flake8]
2639 # Black disagrees with flake8 on a few points. Ignore those.
3952 # flake8-comprehensions, https://github.com/adamchainz/flake8-comprehensions
4053 C4,
4154
42 [aliases]
43 test = pytest
44
4555 [tool:pytest]
4656 collect_ignore = ['setup.py']
47
0 #!/usr/bin/env python
1 # -*- coding: utf-8 -*-
2
3 from setuptools import find_packages, setup
0 from setuptools import setup
41
52 with open("README.rst") as readme_file:
63 readme = readme_file.read()
85 with open("HISTORY.rst") as history_file:
96 history = history_file.read()
107
11 requirements = []
12
13 setup_requirements = []
14
15 test_requirements = ["pytest"]
16
178 setup(
18 author="Markus Gerstel",
19 author_email="scientificsoftware@diamond.ac.uk",
20 install_requires=requirements,
219 long_description=readme + "\n\n" + history,
22 include_package_data=True,
2310 keywords="procrunner",
24 packages=find_packages(include=["procrunner"]),
25 python_requires=">=3.6",
26 setup_requires=setup_requirements,
27 test_suite="tests",
28 tests_require=test_requirements,
29 url="https://github.com/DiamondLightSource/python-procrunner",
30 zip_safe=False,
3111 )
0 import codecs
1 import functools
2 import io
3 import logging
4 import os
5 import select
6 import shutil
7 import subprocess
8 import sys
9 import time
10 import timeit
11 import warnings
12 from multiprocessing import Pipe
13 from threading import Thread
14
15 #
16 # run() - A function to synchronously run an external process, supporting
17 # the following features:
18 #
19 # - runs an external process and waits for it to finish
20 # - does not deadlock, no matter the process stdout/stderr output behaviour
21 # - returns the exit code, stdout, stderr (separately) as a
22 # subprocess.CompletedProcess object
23 # - process can run in a custom environment, either as a modification of
24 # the current environment or in a new environment from scratch
25 # - stdin can be fed to the process
26 # - stdout and stderr is printed by default, can be disabled
27 # - stdout and stderr can be passed to any arbitrary function for
28 # live processing
29 # - optionally enforces a time limit on the process
30 #
31 #
32 # Usage example:
33 #
34 # import procrunner
35 # result = procrunner.run(['/bin/ls', '/some/path/containing spaces'])
36 #
37 # Returns:
38 #
39 # ReturnObject(
40 # args=('/bin/ls', '/some/path/containing spaces'),
41 # returncode=2,
42 # stdout=b'',
43 # stderr=b'/bin/ls: cannot access /some/path/containing spaces: No such file or directory\n'
44 # )
45 #
46 # which also offers (albeit deprecated)
47 #
48 # result.runtime == 0.12990689277648926
49 # result.time_end == '2017-11-12 19:54:49 GMT'
50 # result.time_start == '2017-11-12 19:54:49 GMT'
51 # result.timeout == False
52
53 __author__ = """Markus Gerstel"""
54 __email__ = "scientificsoftware@diamond.ac.uk"
55 __version__ = "2.3.1"
56
57 logger = logging.getLogger("procrunner")
58 logger.addHandler(logging.NullHandler())
59
60
61 class _LineAggregator:
62 """
63 Buffer that can be filled with stream data and will aggregate complete
64 lines. Lines can be printed or passed to an arbitrary callback function.
65 The lines passed to the callback function are UTF-8 decoded and do not
66 contain a trailing newline character.
67 """
68
69 def __init__(self, print_line=False, callback=None):
70 """Create aggregator object."""
71 self._buffer = ""
72 self._print = print_line
73 self._callback = callback
74 self._decoder = codecs.getincrementaldecoder("utf-8")("replace")
75
76 def add(self, data):
77 """
78 Add a single character to buffer. If one or more full lines are found,
79 print them (if desired) and pass to callback function.
80 """
81 data = self._decoder.decode(data)
82 if not data:
83 return
84 self._buffer += data
85 if "\n" in data:
86 to_print, remainder = self._buffer.rsplit("\n")
87 if self._print:
88 try:
89 print(to_print)
90 except UnicodeEncodeError:
91 print(to_print.encode(sys.getdefaultencoding(), errors="replace"))
92 if not hasattr(self, "_warned"):
93 logger.warning("output encoding error, characters replaced")
94 setattr(self, "_warned", True)
95 if self._callback:
96 self._callback(to_print)
97 self._buffer = remainder
98
99 def flush(self):
100 """Print/send any remaining data to callback function."""
101 self._buffer += self._decoder.decode(b"", final=True)
102 if self._buffer:
103 if self._print:
104 print(self._buffer)
105 if self._callback:
106 self._callback(self._buffer)
107 self._buffer = ""
108
109
110 class _NonBlockingStreamReader:
111 """Reads a stream in a thread to avoid blocking/deadlocks"""
112
113 def __init__(self, stream, output=True, debug=False, notify=None, callback=None):
114 """Creates and starts a thread which reads from a stream."""
115 self._buffer = io.BytesIO()
116 self._closed = False
117 self._closing = False
118 self._debug = debug
119 self._stream = stream
120 self._terminated = False
121
122 def _thread_write_stream_to_buffer():
123 la = _LineAggregator(print_line=output, callback=callback)
124 char = True
125 while char:
126 if select.select([self._stream], [], [], 0.1)[0]:
127 char = self._stream.read(1)
128 if char:
129 self._buffer.write(char)
130 la.add(char)
131 else:
132 if self._closing:
133 break
134 self._stream.close()
135 self._terminated = True
136 la.flush()
137 if self._debug:
138 logger.debug("Stream reader terminated")
139 if notify:
140 notify()
141
142 def _thread_write_stream_to_buffer_windows():
143 line = True
144 while line:
145 line = self._stream.readline()
146 if line:
147 self._buffer.write(line)
148 if output or callback:
149 linedecode = line.decode("utf-8", "replace")
150 if output:
151 print(linedecode)
152 if callback:
153 callback(linedecode)
154 self._stream.close()
155 self._terminated = True
156 if self._debug:
157 logger.debug("Stream reader terminated")
158 if notify:
159 notify()
160
161 if os.name == "nt":
162 self._thread = Thread(target=_thread_write_stream_to_buffer_windows)
163 else:
164 self._thread = Thread(target=_thread_write_stream_to_buffer)
165 self._thread.daemon = True
166 self._thread.start()
167
168 def has_finished(self):
169 """
170 Returns whether the thread reading from the stream is still alive.
171 """
172 return self._terminated
173
174 def get_output(self):
175 """
176 Retrieve the stored data in full.
177 This call may block if the reading thread has not yet terminated.
178 """
179 self._closing = True
180 if not self.has_finished():
181 if self._debug:
182 # Main thread overtook stream reading thread.
183 underrun_debug_timer = timeit.default_timer()
184 logger.warning("NBSR underrun")
185 self._thread.join()
186 if not self.has_finished():
187 if self._debug:
188 logger.debug(
189 "NBSR join after %f seconds, underrun not resolved",
190 timeit.default_timer() - underrun_debug_timer,
191 )
192 raise Exception("thread did not terminate")
193 if self._debug:
194 logger.debug(
195 "NBSR underrun resolved after %f seconds",
196 timeit.default_timer() - underrun_debug_timer,
197 )
198 if self._closed:
199 raise Exception("streamreader double-closed")
200 self._closed = True
201 data = self._buffer.getvalue()
202 self._buffer.close()
203 return data
204
205
206 class _NonBlockingStreamWriter:
207 """Writes to a stream in a thread to avoid blocking/deadlocks"""
208
209 def __init__(self, stream, data, debug=False, notify=None):
210 """Creates and starts a thread which writes data to stream."""
211 self._buffer = data
212 self._buffer_len = len(data)
213 self._buffer_pos = 0
214 self._max_block_len = 4096
215 self._stream = stream
216 self._terminated = False
217
218 def _thread_write_buffer_to_stream():
219 while self._buffer_pos < self._buffer_len:
220 if (self._buffer_len - self._buffer_pos) > self._max_block_len:
221 block = self._buffer[
222 self._buffer_pos : (self._buffer_pos + self._max_block_len)
223 ]
224 else:
225 block = self._buffer[self._buffer_pos :]
226 try:
227 self._stream.write(block)
228 except OSError as e:
229 if (
230 e.errno == 32
231 ): # broken pipe, ie. process terminated without reading entire stdin
232 self._stream.close()
233 self._terminated = True
234 if notify:
235 notify()
236 return
237 raise
238 self._buffer_pos += len(block)
239 if debug:
240 logger.debug("wrote %d bytes to stream", len(block))
241 self._stream.close()
242 self._terminated = True
243 if notify:
244 notify()
245
246 self._thread = Thread(target=_thread_write_buffer_to_stream)
247 self._thread.daemon = True
248 self._thread.start()
249
250 def has_finished(self):
251 """Returns whether the thread writing to the stream is still alive."""
252 return self._terminated
253
254 def bytes_sent(self):
255 """Return the number of bytes written so far."""
256 return self._buffer_pos
257
258 def bytes_remaining(self):
259 """Return the number of bytes still to be written."""
260 return self._buffer_len - self._buffer_pos
261
262
263 def _path_resolve(obj):
264 """
265 Resolve file system path (PEP-519) objects to strings.
266
267 :param obj: A file system path object or something else.
268 :return: A string representation of a file system path object or, for
269 anything that was not a file system path object, the original
270 object.
271 """
272 if obj and hasattr(obj, "__fspath__"):
273 return obj.__fspath__()
274 return obj
275
276
277 def _windows_resolve(command, path=None):
278 """
279 Try and find the full path and file extension of the executable to run.
280 This is so that e.g. calls to 'somescript' will point at 'somescript.cmd'
281 without the need to set shell=True in the subprocess.
282
283 :param command: The command array to be run, with the first element being
284 the command with or w/o path, with or w/o extension.
285 :return: Returns the command array with the executable resolved with the
286 correct extension. If the executable cannot be resolved for any
287 reason the original command array is returned.
288 """
289 if not command or not isinstance(command[0], str):
290 return command
291
292 found_executable = shutil.which(command[0], path=path)
293 if found_executable:
294 logger.debug("Resolved %s as %s", command[0], found_executable)
295 return (found_executable, *command[1:])
296
297 if "\\" in command[0]:
298 # Special case. shutil.which may not detect file extensions if a full
299 # path is given, so try to resolve the executable explicitly
300 for extension in os.getenv("PATHEXT").split(os.pathsep):
301 found_executable = shutil.which(command[0] + extension, path=path)
302 if found_executable:
303 return (found_executable, *command[1:])
304
305 logger.warning("Error trying to resolve the executable: %s", command[0])
306 return command
307
308
309 class ReturnObject(subprocess.CompletedProcess):
310 """
311 A subprocess.CompletedProcess-like object containing the executed
312 command, stdout and stderr (both as bytestrings), and the exitcode.
313 The check_returncode() function raises an exception if the process
314 exited with a non-zero exit code.
315 """
316
317 def __init__(self, exitcode=None, command=None, stdout=None, stderr=None, **kw):
318 super().__init__(
319 args=command, returncode=exitcode, stdout=stdout, stderr=stderr
320 )
321 self._extras = {
322 "timeout": kw.get("timeout"),
323 "runtime": kw.get("runtime"),
324 "time_start": kw.get("time_start"),
325 "time_end": kw.get("time_end"),
326 }
327
328 def __getitem__(self, key):
329 warnings.warn(
330 "dictionary access to a procrunner return object is deprecated",
331 DeprecationWarning,
332 stacklevel=2,
333 )
334 if key in self._extras:
335 return self._extras[key]
336 if not hasattr(self, key):
337 raise KeyError(f"Unknown attribute {key}")
338 return getattr(self, key)
339
340 def __eq__(self, other):
341 """Override equality operator to account for added fields"""
342 if type(other) is type(self):
343 return self.__dict__ == other.__dict__
344 return False
345
346 def __hash__(self):
347 """This object is not immutable, so mark it as unhashable"""
348 return None
349
350 @property
351 def cmd(self):
352 warnings.warn(
353 "procrunner return object .cmd is deprecated, use .args",
354 DeprecationWarning,
355 stacklevel=2,
356 )
357 return self.args
358
359 @property
360 def command(self):
361 warnings.warn(
362 "procrunner return object .command is deprecated, use .args",
363 DeprecationWarning,
364 stacklevel=2,
365 )
366 return self.args
367
368 @property
369 def exitcode(self):
370 warnings.warn(
371 "procrunner return object .exitcode is deprecated, use .returncode",
372 DeprecationWarning,
373 stacklevel=2,
374 )
375 return self.returncode
376
377 @property
378 def timeout(self):
379 warnings.warn(
380 "procrunner return object .timeout is deprecated",
381 DeprecationWarning,
382 stacklevel=2,
383 )
384 return self._extras["timeout"]
385
386 @property
387 def runtime(self):
388 warnings.warn(
389 "procrunner return object .runtime is deprecated",
390 DeprecationWarning,
391 stacklevel=2,
392 )
393 return self._extras["runtime"]
394
395 @property
396 def time_start(self):
397 warnings.warn(
398 "procrunner return object .time_start is deprecated",
399 DeprecationWarning,
400 stacklevel=2,
401 )
402 return self._extras["time_start"]
403
404 @property
405 def time_end(self):
406 warnings.warn(
407 "procrunner return object .time_end is deprecated",
408 DeprecationWarning,
409 stacklevel=2,
410 )
411 return self._extras["time_end"]
412
413 def update(self, dictionary):
414 self._extras.update(dictionary)
415
416
417 def _deprecate_argument_calling(f):
418 @functools.wraps(f)
419 def wrapper(*args, **kwargs):
420 if len(args) > 1:
421 warnings.warn(
422 "Calling procrunner.run() with unnamed arguments (apart from "
423 "the command) is deprecated. Use keyword arguments instead.",
424 DeprecationWarning,
425 stacklevel=2,
426 )
427 return f(*args, **kwargs)
428
429 return wrapper
430
431
432 @_deprecate_argument_calling
433 def run(
434 command,
435 timeout=None,
436 debug=None,
437 stdin=None,
438 print_stdout=True,
439 print_stderr=True,
440 callback_stdout=None,
441 callback_stderr=None,
442 environment=None,
443 environment_override=None,
444 win32resolve=True,
445 working_directory=None,
446 raise_timeout_exception=False,
447 ):
448 """
449 Run an external process.
450
451 File system path objects (PEP-519) are accepted in the command, environment,
452 and working directory arguments.
453
454 :param array command: Command line to be run, specified as array.
455 :param timeout: Terminate program execution after this many seconds.
456 :param boolean debug: Enable further debug messages. (deprecated)
457 :param stdin: Optional bytestring that is passed to command stdin.
458 :param boolean print_stdout: Pass stdout through to sys.stdout.
459 :param boolean print_stderr: Pass stderr through to sys.stderr.
460 :param callback_stdout: Optional function which is called for each
461 stdout line.
462 :param callback_stderr: Optional function which is called for each
463 stderr line.
464 :param dict environment: The full execution environment for the command.
465 :param dict environment_override: Change environment variables from the
466 current values for command execution.
467 :param boolean win32resolve: If on Windows, find the appropriate executable
468 first. This allows running of .bat, .cmd, etc.
469 files without explicitly specifying their
470 extension.
471 :param string working_directory: If specified, run the executable from
472 within this working directory.
473 :param boolean raise_timeout_exception: Forward compatibility flag. If set
474 then a subprocess.TimeoutExpired exception is raised
475 instead of returning an object that can be checked
476 for a timeout condition. Defaults to False, will be
477 changed to True in a future release.
478 :return: The exit code, stdout, stderr (separately, as byte strings)
479 as a subprocess.CompletedProcess object.
480 """
481
482 time_start = time.strftime("%Y-%m-%d %H:%M:%S GMT", time.gmtime())
483 logger.debug("Starting external process: %s", command)
484
485 if stdin is None:
486 stdin_pipe = None
487 else:
488 assert sys.platform != "win32", "stdin argument not supported on Windows"
489 stdin_pipe = subprocess.PIPE
490 if debug is not None:
491 warnings.warn(
492 "Use of the debug parameter is deprecated", DeprecationWarning, stacklevel=3
493 )
494
495 start_time = timeit.default_timer()
496 if timeout is not None:
497 max_time = start_time + timeout
498 if not raise_timeout_exception:
499 warnings.warn(
500 "Using procrunner with timeout and without raise_timeout_exception set is deprecated",
501 DeprecationWarning,
502 stacklevel=3,
503 )
504
505 if environment is not None:
506 env = {key: _path_resolve(environment[key]) for key in environment}
507 else:
508 env = {key: value for key, value in os.environ.items()}
509 if environment_override:
510 env.update(
511 {
512 key: str(_path_resolve(environment_override[key]))
513 for key in environment_override
514 }
515 )
516
517 command = tuple(_path_resolve(part) for part in command)
518 if win32resolve and sys.platform == "win32":
519 command = _windows_resolve(command)
520 if working_directory and sys.version_info < (3, 7):
521 working_directory = os.fspath(working_directory)
522
523 p = subprocess.Popen(
524 command,
525 shell=False,
526 cwd=working_directory,
527 env=env,
528 stdin=stdin_pipe,
529 stdout=subprocess.PIPE,
530 stderr=subprocess.PIPE,
531 )
532
533 thread_pipe_pool = []
534 notifyee, notifier = Pipe(False)
535 thread_pipe_pool.append(notifyee)
536 stdout = _NonBlockingStreamReader(
537 p.stdout,
538 output=print_stdout,
539 debug=debug,
540 notify=notifier.close,
541 callback=callback_stdout,
542 )
543 notifyee, notifier = Pipe(False)
544 thread_pipe_pool.append(notifyee)
545 stderr = _NonBlockingStreamReader(
546 p.stderr,
547 output=print_stderr,
548 debug=debug,
549 notify=notifier.close,
550 callback=callback_stderr,
551 )
552 if stdin is not None:
553 notifyee, notifier = Pipe(False)
554 thread_pipe_pool.append(notifyee)
555 stdin = _NonBlockingStreamWriter(
556 p.stdin, data=stdin, debug=debug, notify=notifier.close
557 )
558
559 timeout_encountered = False
560
561 while (p.returncode is None) and (
562 (timeout is None) or (timeit.default_timer() < max_time)
563 ):
564 if debug and timeout is not None:
565 logger.debug("still running (T%.2fs)", timeit.default_timer() - max_time)
566
567 # wait for some time or until a stream is closed
568 try:
569 if thread_pipe_pool:
570 # Wait for up to 0.5 seconds or for a signal on a remaining stream,
571 # which could indicate that the process has terminated.
572 try:
573 event = thread_pipe_pool[0].poll(0.5)
574 except BrokenPipeError as e:
575 # on Windows this raises "BrokenPipeError: [Errno 109] The pipe has been ended"
576 # which is for all intents and purposes equivalent to a True return value.
577 if e.winerror != 109:
578 raise
579 event = True
580 if event:
581 # One-shot, so remove stream and watch remaining streams
582 thread_pipe_pool.pop(0)
583 if debug:
584 logger.debug("Event received from stream thread")
585 else:
586 time.sleep(0.5)
587 except KeyboardInterrupt:
588 p.kill() # if user pressed Ctrl+C we won't be able to produce a proper report anyway
589 # but at least make sure the child process dies with us
590 raise
591
592 # check if process is still running
593 p.poll()
594
595 if p.returncode is None:
596 # timeout condition
597 timeout_encountered = True
598 if debug:
599 logger.debug("timeout (T%.2fs)", timeit.default_timer() - max_time)
600
601 # send terminate signal and wait some time for buffers to be read
602 p.terminate()
603 if thread_pipe_pool:
604 try:
605 thread_pipe_pool[0].poll(0.5)
606 except BrokenPipeError as e:
607 # on Windows this raises "BrokenPipeError: [Errno 109] The pipe has been ended"
608 # which is for all intents and purposes equivalent to a True return value.
609 if e.winerror != 109:
610 raise
611 thread_pipe_pool.pop(0)
612 if not stdout.has_finished() or not stderr.has_finished():
613 time.sleep(2)
614 p.poll()
615
616 if p.returncode is None:
617 # thread still alive
618 # send kill signal and wait some more time for buffers to be read
619 p.kill()
620 if thread_pipe_pool:
621 try:
622 thread_pipe_pool[0].poll(0.5)
623 except BrokenPipeError as e:
624 # on Windows this raises "BrokenPipeError: [Errno 109] The pipe has been ended"
625 # which is for all intents and purposes equivalent to a True return value.
626 if e.winerror != 109:
627 raise
628 thread_pipe_pool.pop(0)
629 if not stdout.has_finished() or not stderr.has_finished():
630 time.sleep(5)
631 p.poll()
632
633 if p.returncode is None:
634 raise RuntimeError("Process won't terminate")
635
636 runtime = timeit.default_timer() - start_time
637 if timeout is not None:
638 logger.debug(
639 "Process ended after %.1f seconds with exit code %d (T%.2fs)",
640 runtime,
641 p.returncode,
642 timeit.default_timer() - max_time,
643 )
644 else:
645 logger.debug(
646 "Process ended after %.1f seconds with exit code %d", runtime, p.returncode
647 )
648
649 stdout = stdout.get_output()
650 stderr = stderr.get_output()
651
652 if timeout_encountered and raise_timeout_exception:
653 raise subprocess.TimeoutExpired(
654 cmd=command, timeout=timeout, output=stdout, stderr=stderr
655 )
656
657 time_end = time.strftime("%Y-%m-%d %H:%M:%S GMT", time.gmtime())
658 result = ReturnObject(
659 exitcode=p.returncode,
660 command=command,
661 stdout=stdout,
662 stderr=stderr,
663 timeout=timeout_encountered,
664 runtime=runtime,
665 time_start=time_start,
666 time_end=time_end,
667 )
668 if stdin is not None:
669 result.update(
670 {
671 "stdin_bytes_sent": stdin.bytes_sent(),
672 "stdin_bytes_remain": stdin.bytes_remaining(),
673 }
674 )
675
676 return result