Codebase list python-procrunner / 16c4c94
codestyle:black Markus Gerstel 5 years ago
7 changed file(s) with 804 addition(s) and 644 deletion(s). Raw diff Collapse all Expand all
44
55 .. image:: https://img.shields.io/pypi/v/procrunner.svg
66 :target: https://pypi.python.org/pypi/procrunner
7 :alt: PyPI release
78
89 .. image:: https://travis-ci.org/DiamondLightSource/python-procrunner.svg?branch=master
910 :target: https://travis-ci.org/DiamondLightSource/python-procrunner
11 :alt: Build status
1012
1113 .. image:: https://ci.appveyor.com/api/projects/status/jtq4brwri5q18d0u/branch/master
1214 :target: https://ci.appveyor.com/project/Anthchirp/python-procrunner
15 :alt: Build status
1316
1417 .. image:: https://readthedocs.org/projects/procrunner/badge/?version=latest
1518 :target: https://procrunner.readthedocs.io/en/latest/?badge=latest
1619 :alt: Documentation Status
1720
1821 .. image:: https://pyup.io/repos/github/DiamondLightSource/python-procrunner/shield.svg
19 :target: https://pyup.io/repos/github/DiamondLightSource/python-procrunner/
20 :alt: Updates
22 :target: https://pyup.io/repos/github/DiamondLightSource/python-procrunner/
23 :alt: Updates
2124
2225 .. image:: https://img.shields.io/pypi/pyversions/procrunner.svg
23 :target: https://pypi.python.org/pypi/procrunner
26 :target: https://pypi.python.org/pypi/procrunner
27 :alt: Supported Python versions
28
29 .. image:: https://img.shields.io/badge/code%20style-black-000000.svg
30 :target: https://github.com/ambv/black
31 :alt: Code style: black
2432
2533 Versatile utility function to run external processes
26
2734
2835 * Free software: BSD license
2936 * Documentation: https://procrunner.readthedocs.io.
1919 #
2020 import os
2121 import sys
22 sys.path.insert(0, os.path.abspath('..'))
22
23 sys.path.insert(0, os.path.abspath(".."))
2324
2425 import procrunner
2526
3132
3233 # Add any Sphinx extension module names here, as strings. They can be
3334 # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
34 extensions = ['sphinx.ext.autodoc', 'sphinx.ext.viewcode']
35 extensions = ["sphinx.ext.autodoc", "sphinx.ext.viewcode"]
3536
3637 # Add any paths that contain templates here, relative to this directory.
37 templates_path = ['_templates']
38 templates_path = ["_templates"]
3839
3940 # The suffix(es) of source filenames.
4041 # You can specify multiple suffix as a list of string:
4142 #
4243 # source_suffix = ['.rst', '.md']
43 source_suffix = '.rst'
44 source_suffix = ".rst"
4445
4546 # The master toctree document.
46 master_doc = 'index'
47 master_doc = "index"
4748
4849 # General information about the project.
49 project = u'ProcRunner'
50 project = u"ProcRunner"
5051 copyright = u"2018, Markus Gerstel"
5152 author = u"Markus Gerstel"
5253
6970 # List of patterns, relative to source directory, that match files and
7071 # directories to ignore when looking for source files.
7172 # This patterns also effect to html_static_path and html_extra_path
72 exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
73 exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]
7374
7475 # The name of the Pygments (syntax highlighting) style to use.
75 pygments_style = 'sphinx'
76 pygments_style = "sphinx"
7677
7778 # If true, `todo` and `todoList` produce output, else they produce nothing.
7879 todo_include_todos = False
8384 # The theme to use for HTML and HTML Help pages. See the documentation for
8485 # a list of builtin themes.
8586 #
86 html_theme = 'alabaster'
87 html_theme = "alabaster"
8788
8889 # Theme options are theme-specific and customize the look and feel of a
8990 # theme further. For a list of options available for each theme, see the
9495 # Add any paths that contain custom static files (such as style sheets) here,
9596 # relative to this directory. They are copied after the builtin static files,
9697 # so a file named "default.css" will overwrite the builtin "default.css".
97 html_static_path = ['_static']
98 html_static_path = ["_static"]
9899
99100
100101 # -- Options for HTMLHelp output ---------------------------------------
101102
102103 # Output file base name for HTML help builder.
103 htmlhelp_basename = 'procrunnerdoc'
104 htmlhelp_basename = "procrunnerdoc"
104105
105106
106107 # -- Options for LaTeX output ------------------------------------------
109110 # The paper size ('letterpaper' or 'a4paper').
110111 #
111112 # 'papersize': 'letterpaper',
112
113113 # The font size ('10pt', '11pt' or '12pt').
114114 #
115115 # 'pointsize': '10pt',
116
117116 # Additional stuff for the LaTeX preamble.
118117 #
119118 # 'preamble': '',
120
121119 # Latex figure (float) alignment
122120 #
123121 # 'figure_align': 'htbp',
127125 # (source start file, target name, title, author, documentclass
128126 # [howto, manual, or own class]).
129127 latex_documents = [
130 (master_doc, 'procrunner.tex',
131 u'ProcRunner Documentation',
132 u'Markus Gerstel', 'manual'),
128 (
129 master_doc,
130 "procrunner.tex",
131 u"ProcRunner Documentation",
132 u"Markus Gerstel",
133 "manual",
134 )
133135 ]
134136
135137
137139
138140 # One entry per manual page. List of tuples
139141 # (source start file, name, description, authors, manual section).
140 man_pages = [
141 (master_doc, 'procrunner',
142 u'ProcRunner Documentation',
143 [author], 1)
144 ]
142 man_pages = [(master_doc, "procrunner", u"ProcRunner Documentation", [author], 1)]
145143
146144
147145 # -- Options for Texinfo output ----------------------------------------
150148 # (source start file, target name, title, author,
151149 # dir menu entry, description, category)
152150 texinfo_documents = [
153 (master_doc, 'procrunner',
154 u'ProcRunner Documentation',
155 author,
156 'procrunner',
157 'One line description of project.',
158 'Miscellaneous'),
151 (
152 master_doc,
153 "procrunner",
154 u"ProcRunner Documentation",
155 author,
156 "procrunner",
157 "One line description of project.",
158 "Miscellaneous",
159 )
159160 ]
160
161
162
5151 #
5252
5353 __author__ = """Markus Gerstel"""
54 __email__ = 'scientificsoftware@diamond.ac.uk'
55 __version__ = '0.8.0'
56
57 logger = logging.getLogger('procrunner')
54 __email__ = "scientificsoftware@diamond.ac.uk"
55 __version__ = "0.8.0"
56
57 logger = logging.getLogger("procrunner")
5858 logger.addHandler(logging.NullHandler())
5959
60
6061 class _LineAggregator(object):
61 '''Buffer that can be filled with stream data and will aggregate complete
62 """Buffer that can be filled with stream data and will aggregate complete
6263 lines. Lines can be printed or passed to an arbitrary callback function.
6364 The lines passed to the callback function are UTF-8 decoded and do not
64 contain a trailing newline character.'''
65 def __init__(self, print_line=False, callback=None):
66 '''Create aggregator object.'''
67 self._buffer = ''
68 self._print = print_line
69 self._callback = callback
70 self._decoder = codecs.getincrementaldecoder('utf-8')('replace')
71 def add(self, data):
72 '''Add a single character to buffer. If one or more full lines are found,
73 print them (if desired) and pass to callback function.'''
74 data = self._decoder.decode(data)
75 if not data: return
76 self._buffer += data
77 if '\n' in data:
78 to_print, remainder = self._buffer.rsplit('\n')
79 if self._print:
80 print(to_print)
81 if self._callback:
82 self._callback(to_print)
83 self._buffer = remainder
84 def flush(self):
85 '''Print/send any remaining data to callback function.'''
86 self._buffer += self._decoder.decode(b'', final=True)
87 if self._buffer:
88 if self._print:
89 print(self._buffer)
90 if self._callback:
91 self._callback(self._buffer)
92 self._buffer = ''
65 contain a trailing newline character."""
66
67 def __init__(self, print_line=False, callback=None):
68 """Create aggregator object."""
69 self._buffer = ""
70 self._print = print_line
71 self._callback = callback
72 self._decoder = codecs.getincrementaldecoder("utf-8")("replace")
73
74 def add(self, data):
75 """Add a single character to buffer. If one or more full lines are found,
76 print them (if desired) and pass to callback function."""
77 data = self._decoder.decode(data)
78 if not data:
79 return
80 self._buffer += data
81 if "\n" in data:
82 to_print, remainder = self._buffer.rsplit("\n")
83 if self._print:
84 print(to_print)
85 if self._callback:
86 self._callback(to_print)
87 self._buffer = remainder
88
89 def flush(self):
90 """Print/send any remaining data to callback function."""
91 self._buffer += self._decoder.decode(b"", final=True)
92 if self._buffer:
93 if self._print:
94 print(self._buffer)
95 if self._callback:
96 self._callback(self._buffer)
97 self._buffer = ""
98
9399
94100 class _NonBlockingStreamReader(object):
95 '''Reads a stream in a thread to avoid blocking/deadlocks'''
96 def __init__(self, stream, output=True, debug=False, notify=None, callback=None):
97 '''Creates and starts a thread which reads from a stream.'''
98 self._buffer = six.BytesIO()
99 self._closed = False
100 self._closing = False
101 self._debug = debug
102 self._stream = stream
103 self._terminated = False
104
105 def _thread_write_stream_to_buffer():
106 la = _LineAggregator(print_line=output, callback=callback)
107 char = True
108 while char:
109 if select.select([self._stream], [], [], 0.1)[0]:
110 char = self._stream.read(1)
111 if char:
112 self._buffer.write(char)
113 la.add(char)
101 """Reads a stream in a thread to avoid blocking/deadlocks"""
102
103 def __init__(self, stream, output=True, debug=False, notify=None, callback=None):
104 """Creates and starts a thread which reads from a stream."""
105 self._buffer = six.BytesIO()
106 self._closed = False
107 self._closing = False
108 self._debug = debug
109 self._stream = stream
110 self._terminated = False
111
112 def _thread_write_stream_to_buffer():
113 la = _LineAggregator(print_line=output, callback=callback)
114 char = True
115 while char:
116 if select.select([self._stream], [], [], 0.1)[0]:
117 char = self._stream.read(1)
118 if char:
119 self._buffer.write(char)
120 la.add(char)
121 else:
122 if self._closing:
123 break
124 self._terminated = True
125 la.flush()
126 if self._debug:
127 logger.debug("Stream reader terminated")
128 if notify:
129 notify()
130
131 def _thread_write_stream_to_buffer_windows():
132 line = True
133 while line:
134 line = self._stream.readline()
135 if line:
136 self._buffer.write(line)
137 if output or callback:
138 linedecode = line.decode("utf-8", "replace")
139 if output:
140 print(linedecode)
141 if callback:
142 callback(linedecode)
143 self._terminated = True
144 if self._debug:
145 logger.debug("Stream reader terminated")
146 if notify:
147 notify()
148
149 if os.name == "nt":
150 self._thread = Thread(target=_thread_write_stream_to_buffer_windows)
114151 else:
115 if self._closing: break
116 self._terminated = True
117 la.flush()
118 if self._debug:
119 logger.debug("Stream reader terminated")
120 if notify:
121 notify()
122
123 def _thread_write_stream_to_buffer_windows():
124 line = True
125 while line:
126 line = self._stream.readline()
127 if line:
128 self._buffer.write(line)
129 if output or callback:
130 linedecode = line.decode('utf-8', 'replace')
131 if output:
132 print(linedecode)
133 if callback:
134 callback(linedecode)
135 self._terminated = True
136 if self._debug:
137 logger.debug("Stream reader terminated")
138 if notify:
139 notify()
140
141 if os.name == "nt":
142 self._thread = Thread(target = _thread_write_stream_to_buffer_windows)
143 else:
144 self._thread = Thread(target = _thread_write_stream_to_buffer)
145 self._thread.daemon = True
146 self._thread.start()
147
148 def has_finished(self):
149 '''Returns whether the thread reading from the stream is still alive.'''
150 return self._terminated
151
152 def get_output(self):
153 '''Retrieve the stored data in full.
154 This call may block if the reading thread has not yet terminated.'''
155 self._closing = True
156 if not self.has_finished():
157 if self._debug:
158 # Main thread overtook stream reading thread.
159 underrun_debug_timer = timeit.default_timer()
160 logger.warn("NBSR underrun")
161 self._thread.join()
162 if not self.has_finished():
163 if self._debug:
164 logger.debug("NBSR join after %f seconds, underrun not resolved" % (timeit.default_timer() - underrun_debug_timer))
165 raise Exception('thread did not terminate')
166 if self._debug:
167 logger.debug("NBSR underrun resolved after %f seconds" % (timeit.default_timer() - underrun_debug_timer))
168 if self._closed:
169 raise Exception('streamreader double-closed')
170 self._closed = True
171 data = self._buffer.getvalue()
172 self._buffer.close()
173 return data
152 self._thread = Thread(target=_thread_write_stream_to_buffer)
153 self._thread.daemon = True
154 self._thread.start()
155
156 def has_finished(self):
157 """Returns whether the thread reading from the stream is still alive."""
158 return self._terminated
159
160 def get_output(self):
161 """Retrieve the stored data in full.
162 This call may block if the reading thread has not yet terminated."""
163 self._closing = True
164 if not self.has_finished():
165 if self._debug:
166 # Main thread overtook stream reading thread.
167 underrun_debug_timer = timeit.default_timer()
168 logger.warn("NBSR underrun")
169 self._thread.join()
170 if not self.has_finished():
171 if self._debug:
172 logger.debug(
173 "NBSR join after %f seconds, underrun not resolved"
174 % (timeit.default_timer() - underrun_debug_timer)
175 )
176 raise Exception("thread did not terminate")
177 if self._debug:
178 logger.debug(
179 "NBSR underrun resolved after %f seconds"
180 % (timeit.default_timer() - underrun_debug_timer)
181 )
182 if self._closed:
183 raise Exception("streamreader double-closed")
184 self._closed = True
185 data = self._buffer.getvalue()
186 self._buffer.close()
187 return data
188
174189
175190 class _NonBlockingStreamWriter(object):
176 '''Writes to a stream in a thread to avoid blocking/deadlocks'''
177 def __init__(self, stream, data, debug=False, notify=None):
178 '''Creates and starts a thread which writes data to stream.'''
179 self._buffer = data
180 self._buffer_len = len(data)
181 self._buffer_pos = 0
182 self._debug = debug
183 self._max_block_len = 4096
184 self._stream = stream
185 self._terminated = False
186
187 def _thread_write_buffer_to_stream():
188 while self._buffer_pos < self._buffer_len:
189 if (self._buffer_len - self._buffer_pos) > self._max_block_len:
190 block = self._buffer[self._buffer_pos:(self._buffer_pos + self._max_block_len)]
191 else:
192 block = self._buffer[self._buffer_pos:]
193 try:
194 self._stream.write(block)
195 except IOError as e:
196 if e.errno == 32: # broken pipe, ie. process terminated without reading entire stdin
191 """Writes to a stream in a thread to avoid blocking/deadlocks"""
192
193 def __init__(self, stream, data, debug=False, notify=None):
194 """Creates and starts a thread which writes data to stream."""
195 self._buffer = data
196 self._buffer_len = len(data)
197 self._buffer_pos = 0
198 self._debug = debug
199 self._max_block_len = 4096
200 self._stream = stream
201 self._terminated = False
202
203 def _thread_write_buffer_to_stream():
204 while self._buffer_pos < self._buffer_len:
205 if (self._buffer_len - self._buffer_pos) > self._max_block_len:
206 block = self._buffer[
207 self._buffer_pos : (self._buffer_pos + self._max_block_len)
208 ]
209 else:
210 block = self._buffer[self._buffer_pos :]
211 try:
212 self._stream.write(block)
213 except IOError as e:
214 if (
215 e.errno == 32
216 ): # broken pipe, ie. process terminated without reading entire stdin
217 self._stream.close()
218 self._terminated = True
219 if notify:
220 notify()
221 return
222 raise
223 self._buffer_pos += len(block)
224 if debug:
225 logger.debug("wrote %d bytes to stream" % len(block))
197226 self._stream.close()
198227 self._terminated = True
199228 if notify:
200 notify()
201 return
202 raise
203 self._buffer_pos += len(block)
204 if debug:
205 logger.debug("wrote %d bytes to stream" % len(block))
206 self._stream.close()
207 self._terminated = True
208 if notify:
209 notify()
210
211 self._thread = Thread(target = _thread_write_buffer_to_stream)
212 self._thread.daemon = True
213 self._thread.start()
214
215 def has_finished(self):
216 '''Returns whether the thread writing to the stream is still alive.'''
217 return self._terminated
218
219 def bytes_sent(self):
220 '''Return the number of bytes written so far.'''
221 return self._buffer_pos
222
223 def bytes_remaining(self):
224 '''Return the number of bytes still to be written.'''
225 return self._buffer_len - self._buffer_pos
229 notify()
230
231 self._thread = Thread(target=_thread_write_buffer_to_stream)
232 self._thread.daemon = True
233 self._thread.start()
234
235 def has_finished(self):
236 """Returns whether the thread writing to the stream is still alive."""
237 return self._terminated
238
239 def bytes_sent(self):
240 """Return the number of bytes written so far."""
241 return self._buffer_pos
242
243 def bytes_remaining(self):
244 """Return the number of bytes still to be written."""
245 return self._buffer_len - self._buffer_pos
246
226247
227248 def _windows_resolve(command):
228 '''Try and find the full path and file extension of the executable to run.
249 """Try and find the full path and file extension of the executable to run.
229250 This is so that e.g. calls to 'somescript' will point at 'somescript.cmd'
230251 without the need to set shell=True in the subprocess.
231252 If the executable contains periods it is a special case. Here the
237258 :return: Returns the command array with the executable resolved with the
238259 correct extension. If the executable cannot be resolved for any
239260 reason the original command array is returned.
240 '''
241 try:
242 import win32api
243 except ImportError:
244 if (2, 8) < sys.version_info < (3, 5):
245 logger.info("Resolving executable names only supported on Python 2.7 and 3.5+")
246 else:
247 logger.warn("Could not resolve executable name: package win32api missing")
261 """
262 try:
263 import win32api
264 except ImportError:
265 if (2, 8) < sys.version_info < (3, 5):
266 logger.info(
267 "Resolving executable names only supported on Python 2.7 and 3.5+"
268 )
269 else:
270 logger.warn("Could not resolve executable name: package win32api missing")
271 return command
272
273 try:
274 # Ensure the command parameter is iterable.
275 iter(command)
276 except TypeError:
277 # If it is not iterable it could be a Mock(). Return it untouched.
278 return command
279
280 try:
281 _, found_executable = win32api.FindExecutable(command[0])
282 logger.debug("Resolved %s as %s", command[0], found_executable)
283 return [found_executable] + command[1:]
284 except Exception as e:
285 if not hasattr(e, "winerror"):
286 raise
287 # Keep this error message for later in case we fail to resolve the name
288 logwarning = getattr(e, "strerror", str(e))
289
290 if "." in command[0]:
291 # Special case. The win32api may not properly check file extensions, so
292 # try to resolve the executable explicitly.
293 for extension in os.getenv("PATHEXT").split(os.pathsep):
294 try:
295 _, found_executable = win32api.FindExecutable(command[0] + extension)
296 logger.debug("Resolved %s as %s", command[0], found_executable)
297 return [found_executable] + command[1:]
298 except Exception as e:
299 if not hasattr(e, "winerror"):
300 raise
301
302 logger.warn("Error trying to resolve the executable: %s", logwarning)
248303 return command
249304
250 try:
251 # Ensure the command parameter is iterable.
252 iter(command)
253 except TypeError:
254 # If it is not iterable it could be a Mock(). Return it untouched.
255 return command
256
257 try:
258 _, found_executable = win32api.FindExecutable(command[0])
259 logger.debug("Resolved %s as %s", command[0], found_executable)
260 return [ found_executable ] + command[1:]
261 except Exception as e:
262 if not hasattr(e, 'winerror'): raise
263 # Keep this error message for later in case we fail to resolve the name
264 logwarning = getattr(e, 'strerror', str(e))
265
266 if '.' in command[0]:
267 # Special case. The win32api may not properly check file extensions, so
268 # try to resolve the executable explicitly.
269 for extension in os.getenv('PATHEXT').split(os.pathsep):
270 try:
271 _, found_executable = win32api.FindExecutable(command[0] + extension)
272 logger.debug("Resolved %s as %s", command[0], found_executable)
273 return [ found_executable ] + command[1:]
274 except Exception as e:
275 if not hasattr(e, 'winerror'): raise
276
277 logger.warn("Error trying to resolve the executable: %s", logwarning)
278 return command
279
280 def run(command, timeout=None, debug=False, stdin=None, print_stdout=True,
281 print_stderr=True, callback_stdout=None, callback_stderr=None,
282 environment=None, environment_override=None, win32resolve=True,
283 working_directory=None):
284 '''Run an external process.
305
306 def run(
307 command,
308 timeout=None,
309 debug=False,
310 stdin=None,
311 print_stdout=True,
312 print_stderr=True,
313 callback_stdout=None,
314 callback_stderr=None,
315 environment=None,
316 environment_override=None,
317 win32resolve=True,
318 working_directory=None,
319 ):
320 """Run an external process.
285321
286322 :param array command: Command line to be run, specified as array.
287323 :param timeout: Terminate program execution after this many seconds.
304340 within this working directory.
305341 :return: A dictionary containing stdout, stderr (both as bytestrings),
306342 runtime, exitcode, and more.
307 '''
308
309 time_start = time.strftime("%Y-%m-%d %H:%M:%S GMT", time.gmtime())
310 logger.debug("Starting external process: %s", command)
311
312 if stdin is None:
313 stdin_pipe = None
314 else:
315 assert sys.platform != 'win32', 'stdin argument not supported on Windows'
316 stdin_pipe = subprocess.PIPE
317
318 start_time = timeit.default_timer()
319 if timeout is not None:
320 max_time = start_time + timeout
321
322 if environment is not None:
323 env = environment
324 else:
325 env = os.environ
326 if environment_override:
327 env = copy.copy(env)
328 env.update({key: str(environment_override[key]) for key in environment_override})
329
330 if win32resolve and sys.platform == 'win32':
331 command = _windows_resolve(command)
332
333 p = subprocess.Popen(command, shell=False, cwd=working_directory, env=env,
334 stdin=stdin_pipe, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
335
336 thread_pipe_pool = []
337 notifyee, notifier = Pipe(False)
338 thread_pipe_pool.append(notifyee)
339 stdout = _NonBlockingStreamReader(p.stdout, output=print_stdout, debug=debug, notify=notifier.close, callback=callback_stdout)
340 notifyee, notifier = Pipe(False)
341 thread_pipe_pool.append(notifyee)
342 stderr = _NonBlockingStreamReader(p.stderr, output=print_stderr, debug=debug, notify=notifier.close, callback=callback_stderr)
343 if stdin is not None:
343 """
344
345 time_start = time.strftime("%Y-%m-%d %H:%M:%S GMT", time.gmtime())
346 logger.debug("Starting external process: %s", command)
347
348 if stdin is None:
349 stdin_pipe = None
350 else:
351 assert sys.platform != "win32", "stdin argument not supported on Windows"
352 stdin_pipe = subprocess.PIPE
353
354 start_time = timeit.default_timer()
355 if timeout is not None:
356 max_time = start_time + timeout
357
358 if environment is not None:
359 env = environment
360 else:
361 env = os.environ
362 if environment_override:
363 env = copy.copy(env)
364 env.update(
365 {key: str(environment_override[key]) for key in environment_override}
366 )
367
368 if win32resolve and sys.platform == "win32":
369 command = _windows_resolve(command)
370
371 p = subprocess.Popen(
372 command,
373 shell=False,
374 cwd=working_directory,
375 env=env,
376 stdin=stdin_pipe,
377 stdout=subprocess.PIPE,
378 stderr=subprocess.PIPE,
379 )
380
381 thread_pipe_pool = []
344382 notifyee, notifier = Pipe(False)
345383 thread_pipe_pool.append(notifyee)
346 stdin = _NonBlockingStreamWriter(p.stdin, data=stdin, debug=debug, notify=notifier.close)
347
348 timeout_encountered = False
349
350 while (p.returncode is None) and \
351 ((timeout is None) or (timeit.default_timer() < max_time)):
352 if debug and timeout is not None:
353 logger.debug("still running (T%.2fs)" % (timeit.default_timer() - max_time))
354
355 # wait for some time or until a stream is closed
356 try:
357 if thread_pipe_pool:
358 # Wait for up to 0.5 seconds or for a signal on a remaining stream,
359 # which could indicate that the process has terminated.
384 stdout = _NonBlockingStreamReader(
385 p.stdout,
386 output=print_stdout,
387 debug=debug,
388 notify=notifier.close,
389 callback=callback_stdout,
390 )
391 notifyee, notifier = Pipe(False)
392 thread_pipe_pool.append(notifyee)
393 stderr = _NonBlockingStreamReader(
394 p.stderr,
395 output=print_stderr,
396 debug=debug,
397 notify=notifier.close,
398 callback=callback_stderr,
399 )
400 if stdin is not None:
401 notifyee, notifier = Pipe(False)
402 thread_pipe_pool.append(notifyee)
403 stdin = _NonBlockingStreamWriter(
404 p.stdin, data=stdin, debug=debug, notify=notifier.close
405 )
406
407 timeout_encountered = False
408
409 while (p.returncode is None) and (
410 (timeout is None) or (timeit.default_timer() < max_time)
411 ):
412 if debug and timeout is not None:
413 logger.debug("still running (T%.2fs)" % (timeit.default_timer() - max_time))
414
415 # wait for some time or until a stream is closed
360416 try:
361 event = thread_pipe_pool[0].poll(0.5)
362 except IOError as e:
363 # on Windows this raises "IOError: [Errno 109] The pipe has been ended"
364 # which is for all intents and purposes equivalent to a True return value.
365 if e.errno != 109: raise
366 event = True
367 if event:
368 # One-shot, so remove stream and watch remaining streams
369 thread_pipe_pool.pop(0)
370 if debug:
371 logger.debug("Event received from stream thread")
372 else:
373 time.sleep(0.5)
374 except KeyboardInterrupt:
375 p.kill() # if user pressed Ctrl+C we won't be able to produce a proper report anyway
376 # but at least make sure the child process dies with us
377 raise
378
379 # check if process is still running
380 p.poll()
381
382 if p.returncode is None:
383 # timeout condition
384 timeout_encountered = True
385 if debug:
386 logger.debug("timeout (T%.2fs)" % (timeit.default_timer() - max_time))
387
388 # send terminate signal and wait some time for buffers to be read
389 p.terminate()
390 if thread_pipe_pool:
391 thread_pipe_pool[0].poll(0.5)
392 if not stdout.has_finished() or not stderr.has_finished():
393 time.sleep(2)
394 p.poll()
395
396 if p.returncode is None:
397 # thread still alive
398 # send kill signal and wait some more time for buffers to be read
399 p.kill()
400 if thread_pipe_pool:
401 thread_pipe_pool[0].poll(0.5)
402 if not stdout.has_finished() or not stderr.has_finished():
403 time.sleep(5)
404 p.poll()
405
406 if p.returncode is None:
407 raise RuntimeError("Process won't terminate")
408
409 runtime = timeit.default_timer() - start_time
410 if timeout is not None:
411 logger.debug("Process ended after %.1f seconds with exit code %d (T%.2fs)" % \
412 (runtime, p.returncode, timeit.default_timer() - max_time))
413 else:
414 logger.debug("Process ended after %.1f seconds with exit code %d" % \
415 (runtime, p.returncode))
416
417 stdout = stdout.get_output()
418 stderr = stderr.get_output()
419 time_end = time.strftime("%Y-%m-%d %H:%M:%S GMT", time.gmtime())
420
421 result = { 'exitcode': p.returncode, 'command': command,
422 'stdout': stdout, 'stderr': stderr,
423 'timeout': timeout_encountered, 'runtime': runtime,
424 'time_start': time_start, 'time_end': time_end }
425 if stdin is not None:
426 result.update({ 'stdin_bytes_sent': stdin.bytes_sent(),
427 'stdin_bytes_remain': stdin.bytes_remaining() })
428
429 return result
417 if thread_pipe_pool:
418 # Wait for up to 0.5 seconds or for a signal on a remaining stream,
419 # which could indicate that the process has terminated.
420 try:
421 event = thread_pipe_pool[0].poll(0.5)
422 except IOError as e:
423 # on Windows this raises "IOError: [Errno 109] The pipe has been ended"
424 # which is for all intents and purposes equivalent to a True return value.
425 if e.errno != 109:
426 raise
427 event = True
428 if event:
429 # One-shot, so remove stream and watch remaining streams
430 thread_pipe_pool.pop(0)
431 if debug:
432 logger.debug("Event received from stream thread")
433 else:
434 time.sleep(0.5)
435 except KeyboardInterrupt:
436 p.kill() # if user pressed Ctrl+C we won't be able to produce a proper report anyway
437 # but at least make sure the child process dies with us
438 raise
439
440 # check if process is still running
441 p.poll()
442
443 if p.returncode is None:
444 # timeout condition
445 timeout_encountered = True
446 if debug:
447 logger.debug("timeout (T%.2fs)" % (timeit.default_timer() - max_time))
448
449 # send terminate signal and wait some time for buffers to be read
450 p.terminate()
451 if thread_pipe_pool:
452 thread_pipe_pool[0].poll(0.5)
453 if not stdout.has_finished() or not stderr.has_finished():
454 time.sleep(2)
455 p.poll()
456
457 if p.returncode is None:
458 # thread still alive
459 # send kill signal and wait some more time for buffers to be read
460 p.kill()
461 if thread_pipe_pool:
462 thread_pipe_pool[0].poll(0.5)
463 if not stdout.has_finished() or not stderr.has_finished():
464 time.sleep(5)
465 p.poll()
466
467 if p.returncode is None:
468 raise RuntimeError("Process won't terminate")
469
470 runtime = timeit.default_timer() - start_time
471 if timeout is not None:
472 logger.debug(
473 "Process ended after %.1f seconds with exit code %d (T%.2fs)"
474 % (runtime, p.returncode, timeit.default_timer() - max_time)
475 )
476 else:
477 logger.debug(
478 "Process ended after %.1f seconds with exit code %d"
479 % (runtime, p.returncode)
480 )
481
482 stdout = stdout.get_output()
483 stderr = stderr.get_output()
484 time_end = time.strftime("%Y-%m-%d %H:%M:%S GMT", time.gmtime())
485
486 result = {
487 "exitcode": p.returncode,
488 "command": command,
489 "stdout": stdout,
490 "stderr": stderr,
491 "timeout": timeout_encountered,
492 "runtime": runtime,
493 "time_start": time_start,
494 "time_end": time_end,
495 }
496 if stdin is not None:
497 result.update(
498 {
499 "stdin_bytes_sent": stdin.bytes_sent(),
500 "stdin_bytes_remain": stdin.bytes_remaining(),
501 }
502 )
503
504 return result
430505
431506
432507 def run_process_dummy(command, **kwargs):
433 '''A stand-in function that returns a valid result dictionary indicating a
434 successful execution. The external process is not run.'''
435 warnings.warn("procrunner.run_process_dummy() is deprecated", DeprecationWarning)
436
437 time_start = time.strftime("%Y-%m-%d %H:%M:%S GMT", time.gmtime())
438 logger.info("run_process is disabled. Requested command: %s", command)
439
440 result = { 'exitcode': 0, 'command': command,
441 'stdout': '', 'stderr': '',
442 'timeout': False, 'runtime': 0,
443 'time_start': time_start, 'time_end': time_start }
444 if kwargs.get('stdin') is not None:
445 result.update({ 'stdin_bytes_sent': len(kwargs['stdin']),
446 'stdin_bytes_remain': 0 })
447 return result
508 """A stand-in function that returns a valid result dictionary indicating a
509 successful execution. The external process is not run."""
510 warnings.warn("procrunner.run_process_dummy() is deprecated", DeprecationWarning)
511
512 time_start = time.strftime("%Y-%m-%d %H:%M:%S GMT", time.gmtime())
513 logger.info("run_process is disabled. Requested command: %s", command)
514
515 result = {
516 "exitcode": 0,
517 "command": command,
518 "stdout": "",
519 "stderr": "",
520 "timeout": False,
521 "runtime": 0,
522 "time_start": time_start,
523 "time_end": time_start,
524 }
525 if kwargs.get("stdin") is not None:
526 result.update(
527 {"stdin_bytes_sent": len(kwargs["stdin"]), "stdin_bytes_remain": 0}
528 )
529 return result
530
448531
449532 def run_process(*args, **kwargs):
450 '''API used up to version 0.2.0.'''
451 warnings.warn("procrunner.run_process() is deprecated and has been renamed to run()", DeprecationWarning)
452 return run(*args, **kwargs)
533 """API used up to version 0.2.0."""
534 warnings.warn(
535 "procrunner.run_process() is deprecated and has been renamed to run()",
536 DeprecationWarning,
537 )
538 return run(*args, **kwargs)
33 from setuptools import setup, find_packages
44 import sys
55
6 with open('README.rst') as readme_file:
6 with open("README.rst") as readme_file:
77 readme = readme_file.read()
88
9 with open('HISTORY.rst') as history_file:
9 with open("HISTORY.rst") as history_file:
1010 history = history_file.read()
1111
1212 requirements = [
13 'six',
13 "six",
1414 # PyWin32 is only supported on 2.7 and 3.5+
1515 'pywin32; sys_platform=="win32" and python_version=="2.7"',
1616 'pywin32; sys_platform=="win32" and python_version>="3.5"',
1717 ]
1818
19 setup_requirements = ['pytest-runner', 'six']
19 setup_requirements = ["pytest-runner", "six"]
2020
21 test_requirements = ['mock', 'pytest']
21 test_requirements = ["mock", "pytest"]
2222
2323 setup(
2424 author="Markus Gerstel",
25 author_email='scientificsoftware@diamond.ac.uk',
25 author_email="scientificsoftware@diamond.ac.uk",
2626 classifiers=[
27 'Development Status :: 5 - Production/Stable',
28 'Intended Audience :: Developers',
29 'License :: OSI Approved :: BSD License',
30 'Natural Language :: English',
31 'Operating System :: OS Independent',
27 "Development Status :: 5 - Production/Stable",
28 "Intended Audience :: Developers",
29 "License :: OSI Approved :: BSD License",
30 "Natural Language :: English",
31 "Operating System :: OS Independent",
3232 "Programming Language :: Python :: 2",
33 'Programming Language :: Python :: 2.7',
34 'Programming Language :: Python :: 3',
35 'Programming Language :: Python :: 3.4',
36 'Programming Language :: Python :: 3.5',
37 'Programming Language :: Python :: 3.6',
38 'Programming Language :: Python :: 3.7',
39 'Programming Language :: Python :: Implementation :: PyPy',
40 'Programming Language :: Python :: Implementation :: CPython',
41 'Topic :: Software Development :: Libraries :: Python Modules',
33 "Programming Language :: Python :: 2.7",
34 "Programming Language :: Python :: 3",
35 "Programming Language :: Python :: 3.4",
36 "Programming Language :: Python :: 3.5",
37 "Programming Language :: Python :: 3.6",
38 "Programming Language :: Python :: 3.7",
39 "Programming Language :: Python :: Implementation :: PyPy",
40 "Programming Language :: Python :: Implementation :: CPython",
41 "Topic :: Software Development :: Libraries :: Python Modules",
4242 ],
4343 description="Versatile utility function to run external processes",
4444 install_requires=requirements,
4545 license="BSD license",
46 long_description=readme + '\n\n' + history,
46 long_description=readme + "\n\n" + history,
4747 include_package_data=True,
48 keywords='procrunner',
49 name='procrunner',
50 packages=find_packages(include=['procrunner']),
48 keywords="procrunner",
49 name="procrunner",
50 packages=find_packages(include=["procrunner"]),
5151 setup_requires=setup_requirements,
52 test_suite='tests',
52 test_suite="tests",
5353 tests_require=test_requirements,
54 url='https://github.com/DiamondLightSource/python-procrunner',
55 version='0.8.0',
54 url="https://github.com/DiamondLightSource/python-procrunner",
55 version="0.8.0",
5656 zip_safe=False,
5757 )
66 import pytest
77 import sys
88
9 @mock.patch('procrunner._NonBlockingStreamReader')
10 @mock.patch('procrunner.time')
11 @mock.patch('procrunner.subprocess')
12 @mock.patch('procrunner.Pipe')
13 def test_run_command_aborts_after_timeout(mock_pipe, mock_subprocess, mock_time, mock_streamreader):
14 mock_pipe.return_value = mock.Mock(), mock.Mock()
15 mock_process = mock.Mock()
16 mock_process.returncode = None
17 mock_subprocess.Popen.return_value = mock_process
18 task = ['___']
19
20 with pytest.raises(RuntimeError):
21 procrunner.run(task, -1, False)
22
23 assert mock_subprocess.Popen.called
24 assert mock_process.terminate.called
25 assert mock_process.kill.called
26
27
28 @mock.patch('procrunner._NonBlockingStreamReader')
29 @mock.patch('procrunner.subprocess')
30 def test_run_command_runs_command_and_directs_pipelines(mock_subprocess, mock_streamreader):
31 (mock_stdout, mock_stderr) = (mock.Mock(), mock.Mock())
32 mock_stdout.get_output.return_value = mock.sentinel.proc_stdout
33 mock_stderr.get_output.return_value = mock.sentinel.proc_stderr
34 (stream_stdout, stream_stderr) = (mock.sentinel.stdout, mock.sentinel.stderr)
35 mock_process = mock.Mock()
36 mock_process.stdout = stream_stdout
37 mock_process.stderr = stream_stderr
38 mock_process.returncode = 99
39 command = ['___']
40 def streamreader_processing(*args, **kwargs):
41 return {(stream_stdout,): mock_stdout, (stream_stderr,): mock_stderr}[args]
42 mock_streamreader.side_effect = streamreader_processing
43 mock_subprocess.Popen.return_value = mock_process
44
45 expected = {
46 'stderr': mock.sentinel.proc_stderr,
47 'stdout': mock.sentinel.proc_stdout,
48 'exitcode': mock_process.returncode,
49 'command': command,
50 'runtime': mock.ANY,
51 'timeout': False,
52 'time_start': mock.ANY,
53 'time_end': mock.ANY,
54 }
55
56 actual = procrunner.run(command, 0.5, False,
57 callback_stdout=mock.sentinel.callback_stdout, callback_stderr=mock.sentinel.callback_stderr,
58 working_directory=mock.sentinel.cwd)
59
60 assert mock_subprocess.Popen.called
61 assert mock_subprocess.Popen.call_args[1]['env'] == os.environ
62 assert mock_subprocess.Popen.call_args[1]['cwd'] == mock.sentinel.cwd
63 mock_streamreader.assert_has_calls([mock.call(stream_stdout, output=mock.ANY, debug=mock.ANY, notify=mock.ANY, callback=mock.sentinel.callback_stdout),
64 mock.call(stream_stderr, output=mock.ANY, debug=mock.ANY, notify=mock.ANY, callback=mock.sentinel.callback_stderr)],
65 any_order=True)
66 assert not mock_process.terminate.called
67 assert not mock_process.kill.called
68 assert actual == expected
69
70
71 @mock.patch('procrunner.subprocess')
9
10 @mock.patch("procrunner._NonBlockingStreamReader")
11 @mock.patch("procrunner.time")
12 @mock.patch("procrunner.subprocess")
13 @mock.patch("procrunner.Pipe")
14 def test_run_command_aborts_after_timeout(
15 mock_pipe, mock_subprocess, mock_time, mock_streamreader
16 ):
17 mock_pipe.return_value = mock.Mock(), mock.Mock()
18 mock_process = mock.Mock()
19 mock_process.returncode = None
20 mock_subprocess.Popen.return_value = mock_process
21 task = ["___"]
22
23 with pytest.raises(RuntimeError):
24 procrunner.run(task, -1, False)
25
26 assert mock_subprocess.Popen.called
27 assert mock_process.terminate.called
28 assert mock_process.kill.called
29
30
31 @mock.patch("procrunner._NonBlockingStreamReader")
32 @mock.patch("procrunner.subprocess")
33 def test_run_command_runs_command_and_directs_pipelines(
34 mock_subprocess, mock_streamreader
35 ):
36 (mock_stdout, mock_stderr) = (mock.Mock(), mock.Mock())
37 mock_stdout.get_output.return_value = mock.sentinel.proc_stdout
38 mock_stderr.get_output.return_value = mock.sentinel.proc_stderr
39 (stream_stdout, stream_stderr) = (mock.sentinel.stdout, mock.sentinel.stderr)
40 mock_process = mock.Mock()
41 mock_process.stdout = stream_stdout
42 mock_process.stderr = stream_stderr
43 mock_process.returncode = 99
44 command = ["___"]
45
46 def streamreader_processing(*args, **kwargs):
47 return {(stream_stdout,): mock_stdout, (stream_stderr,): mock_stderr}[args]
48
49 mock_streamreader.side_effect = streamreader_processing
50 mock_subprocess.Popen.return_value = mock_process
51
52 expected = {
53 "stderr": mock.sentinel.proc_stderr,
54 "stdout": mock.sentinel.proc_stdout,
55 "exitcode": mock_process.returncode,
56 "command": command,
57 "runtime": mock.ANY,
58 "timeout": False,
59 "time_start": mock.ANY,
60 "time_end": mock.ANY,
61 }
62
63 actual = procrunner.run(
64 command,
65 0.5,
66 False,
67 callback_stdout=mock.sentinel.callback_stdout,
68 callback_stderr=mock.sentinel.callback_stderr,
69 working_directory=mock.sentinel.cwd,
70 )
71
72 assert mock_subprocess.Popen.called
73 assert mock_subprocess.Popen.call_args[1]["env"] == os.environ
74 assert mock_subprocess.Popen.call_args[1]["cwd"] == mock.sentinel.cwd
75 mock_streamreader.assert_has_calls(
76 [
77 mock.call(
78 stream_stdout,
79 output=mock.ANY,
80 debug=mock.ANY,
81 notify=mock.ANY,
82 callback=mock.sentinel.callback_stdout,
83 ),
84 mock.call(
85 stream_stderr,
86 output=mock.ANY,
87 debug=mock.ANY,
88 notify=mock.ANY,
89 callback=mock.sentinel.callback_stderr,
90 ),
91 ],
92 any_order=True,
93 )
94 assert not mock_process.terminate.called
95 assert not mock_process.kill.called
96 assert actual == expected
97
98
99 @mock.patch("procrunner.subprocess")
72100 def test_default_process_environment_is_parent_environment(mock_subprocess):
73 mock_subprocess.Popen.side_effect = NotImplementedError() # cut calls short
74 with pytest.raises(NotImplementedError):
75 procrunner.run(mock.Mock(), -1, False)
76 assert mock_subprocess.Popen.call_args[1]['env'] == os.environ
77
78
79 @mock.patch('procrunner.subprocess')
101 mock_subprocess.Popen.side_effect = NotImplementedError() # cut calls short
102 with pytest.raises(NotImplementedError):
103 procrunner.run(mock.Mock(), -1, False)
104 assert mock_subprocess.Popen.call_args[1]["env"] == os.environ
105
106
107 @mock.patch("procrunner.subprocess")
80108 def test_pass_custom_environment_to_process(mock_subprocess):
81 mock_subprocess.Popen.side_effect = NotImplementedError() # cut calls short
82 mock_env = { 'key': mock.sentinel.key }
83 # Pass an environment dictionary
84 with pytest.raises(NotImplementedError):
85 procrunner.run(mock.Mock(), -1, False, environment=copy.copy(mock_env))
86 assert mock_subprocess.Popen.call_args[1]['env'] == mock_env
87
88
89 @mock.patch('procrunner.subprocess')
109 mock_subprocess.Popen.side_effect = NotImplementedError() # cut calls short
110 mock_env = {"key": mock.sentinel.key}
111 # Pass an environment dictionary
112 with pytest.raises(NotImplementedError):
113 procrunner.run(mock.Mock(), -1, False, environment=copy.copy(mock_env))
114 assert mock_subprocess.Popen.call_args[1]["env"] == mock_env
115
116
117 @mock.patch("procrunner.subprocess")
90118 def test_pass_custom_environment_to_process_and_add_another_value(mock_subprocess):
91 mock_subprocess.Popen.side_effect = NotImplementedError() # cut calls short
92 mock_env1 = { 'keyA': mock.sentinel.keyA }
93 mock_env2 = { 'keyB': mock.sentinel.keyB, 'number': 5 }
94 # Pass an environment dictionary
95 with pytest.raises(NotImplementedError):
96 procrunner.run(mock.Mock(), -1, False, environment=copy.copy(mock_env1), environment_override=copy.copy(mock_env2))
97 mock_env_sum = copy.copy(mock_env1)
98 mock_env_sum.update({key: str(mock_env2[key]) for key in mock_env2})
99 assert mock_subprocess.Popen.call_args[1]['env'] == mock_env_sum
100
101
102 @mock.patch('procrunner.subprocess')
119 mock_subprocess.Popen.side_effect = NotImplementedError() # cut calls short
120 mock_env1 = {"keyA": mock.sentinel.keyA}
121 mock_env2 = {"keyB": mock.sentinel.keyB, "number": 5}
122 # Pass an environment dictionary
123 with pytest.raises(NotImplementedError):
124 procrunner.run(
125 mock.Mock(),
126 -1,
127 False,
128 environment=copy.copy(mock_env1),
129 environment_override=copy.copy(mock_env2),
130 )
131 mock_env_sum = copy.copy(mock_env1)
132 mock_env_sum.update({key: str(mock_env2[key]) for key in mock_env2})
133 assert mock_subprocess.Popen.call_args[1]["env"] == mock_env_sum
134
135
136 @mock.patch("procrunner.subprocess")
103137 def test_use_default_process_environment_and_add_another_value(mock_subprocess):
104 mock_subprocess.Popen.side_effect = NotImplementedError() # cut calls short
105 mock_env2 = { 'keyB': str(mock.sentinel.keyB) }
106 with pytest.raises(NotImplementedError):
107 procrunner.run(mock.Mock(), -1, False, environment_override=copy.copy(mock_env2))
108 random_environment_variable = list(os.environ)[0]
109 if random_environment_variable == list(mock_env2)[0]:
110 random_environment_variable = list(os.environ)[1]
111 random_environment_value = os.getenv(random_environment_variable)
112 assert random_environment_variable and random_environment_variable != list(mock_env2)[0]
113 assert mock_subprocess.Popen.call_args[1]['env'][list(mock_env2)[0]] == mock_env2[list(mock_env2)[0]]
114 assert mock_subprocess.Popen.call_args[1]['env'][random_environment_variable] == os.getenv(random_environment_variable)
115
116
117 @mock.patch('procrunner.subprocess')
138 mock_subprocess.Popen.side_effect = NotImplementedError() # cut calls short
139 mock_env2 = {"keyB": str(mock.sentinel.keyB)}
140 with pytest.raises(NotImplementedError):
141 procrunner.run(
142 mock.Mock(), -1, False, environment_override=copy.copy(mock_env2)
143 )
144 random_environment_variable = list(os.environ)[0]
145 if random_environment_variable == list(mock_env2)[0]:
146 random_environment_variable = list(os.environ)[1]
147 random_environment_value = os.getenv(random_environment_variable)
148 assert (
149 random_environment_variable
150 and random_environment_variable != list(mock_env2)[0]
151 )
152 assert (
153 mock_subprocess.Popen.call_args[1]["env"][list(mock_env2)[0]]
154 == mock_env2[list(mock_env2)[0]]
155 )
156 assert mock_subprocess.Popen.call_args[1]["env"][
157 random_environment_variable
158 ] == os.getenv(random_environment_variable)
159
160
161 @mock.patch("procrunner.subprocess")
118162 def test_use_default_process_environment_and_override_a_value(mock_subprocess):
119 mock_subprocess.Popen.side_effect = NotImplementedError() # cut calls short
120 random_environment_variable = list(os.environ)[0]
121 random_environment_value = os.getenv(random_environment_variable)
122 with pytest.raises(NotImplementedError):
123 procrunner.run(mock.Mock(), -1, False, environment_override={ random_environment_variable: 'X' + random_environment_value })
124 assert mock_subprocess.Popen.call_args[1]['env'][random_environment_variable] == 'X' + random_environment_value
125
126
127 @mock.patch('procrunner.select')
128 @pytest.mark.skipif(sys.platform == 'win32', reason="test only relevant on platforms supporting select()")
163 mock_subprocess.Popen.side_effect = NotImplementedError() # cut calls short
164 random_environment_variable = list(os.environ)[0]
165 random_environment_value = os.getenv(random_environment_variable)
166 with pytest.raises(NotImplementedError):
167 procrunner.run(
168 mock.Mock(),
169 -1,
170 False,
171 environment_override={
172 random_environment_variable: "X" + random_environment_value
173 },
174 )
175 assert (
176 mock_subprocess.Popen.call_args[1]["env"][random_environment_variable]
177 == "X" + random_environment_value
178 )
179
180
181 @mock.patch("procrunner.select")
182 @pytest.mark.skipif(
183 sys.platform == "win32",
184 reason="test only relevant on platforms supporting select()",
185 )
129186 def test_nonblockingstreamreader_can_read(mock_select):
130 import time
131 class _stream(object):
132 def __init__(self):
133 self.data = b""
134 self.closed = False
135 def write(self, string):
136 self.data = self.data + string
137 def read(self, n):
138 if self.closed:
139 return b""
140 if self.data == b"":
141 time.sleep(0.01)
142 return b""
143 if (len(self.data) < n):
144 data = self.data
145 self.data = b""
146 else:
147 data = self.data[:n]
148 self.data = self.data[n:]
149 return data
150 def close(self):
151 self.closed=True
152 teststream = _stream()
153
154 def select_replacement(rlist, wlist, xlist, timeout):
155 assert teststream in rlist
156 if teststream.closed:
157 return ([teststream], [], [])
158 if teststream.data == b"":
159 return ([], [], [])
160 return ([teststream], [], [])
161 mock_select.select = select_replacement
162
163 streamreader = procrunner._NonBlockingStreamReader(teststream, output=False)
164 assert not streamreader.has_finished()
165 time.sleep(0.1)
166 testdata = b"abc\n" * 1024
167 teststream.write(testdata)
168 time.sleep(0.2)
169 teststream.close()
170 time.sleep(0.1)
171
172 assert streamreader.has_finished()
173 output = streamreader.get_output()
174 assert len(output) == len(testdata)
175 assert output == testdata
187 import time
188
189 class _stream(object):
190 def __init__(self):
191 self.data = b""
192 self.closed = False
193
194 def write(self, string):
195 self.data = self.data + string
196
197 def read(self, n):
198 if self.closed:
199 return b""
200 if self.data == b"":
201 time.sleep(0.01)
202 return b""
203 if len(self.data) < n:
204 data = self.data
205 self.data = b""
206 else:
207 data = self.data[:n]
208 self.data = self.data[n:]
209 return data
210
211 def close(self):
212 self.closed = True
213
214 teststream = _stream()
215
216 def select_replacement(rlist, wlist, xlist, timeout):
217 assert teststream in rlist
218 if teststream.closed:
219 return ([teststream], [], [])
220 if teststream.data == b"":
221 return ([], [], [])
222 return ([teststream], [], [])
223
224 mock_select.select = select_replacement
225
226 streamreader = procrunner._NonBlockingStreamReader(teststream, output=False)
227 assert not streamreader.has_finished()
228 time.sleep(0.1)
229 testdata = b"abc\n" * 1024
230 teststream.write(testdata)
231 time.sleep(0.2)
232 teststream.close()
233 time.sleep(0.1)
234
235 assert streamreader.has_finished()
236 output = streamreader.get_output()
237 assert len(output) == len(testdata)
238 assert output == testdata
176239
177240
178241 def test_lineaggregator_aggregates_data():
179 callback = mock.Mock()
180 aggregator = procrunner._LineAggregator(callback=callback)
181
182 aggregator.add(b'some')
183 aggregator.add(b'string')
184 callback.assert_not_called()
185 aggregator.add(b"\n")
186 callback.assert_called_once_with('somestring')
187 callback.reset_mock()
188 aggregator.add(b'more')
189 aggregator.add(b'stuff')
190 callback.assert_not_called()
191 aggregator.flush()
192 callback.assert_called_once_with('morestuff')
242 callback = mock.Mock()
243 aggregator = procrunner._LineAggregator(callback=callback)
244
245 aggregator.add(b"some")
246 aggregator.add(b"string")
247 callback.assert_not_called()
248 aggregator.add(b"\n")
249 callback.assert_called_once_with("somestring")
250 callback.reset_mock()
251 aggregator.add(b"more")
252 aggregator.add(b"stuff")
253 callback.assert_not_called()
254 aggregator.flush()
255 callback.assert_called_once_with("morestuff")
55 import procrunner
66 import pytest
77
8 @pytest.mark.skipif(sys.platform != 'win32', reason="windows specific test only")
8
9 @pytest.mark.skipif(sys.platform != "win32", reason="windows specific test only")
910 def test_pywin32_import():
10 import win32api
11 import win32api
1112
12 @pytest.mark.skipif(sys.platform != 'win32', reason="windows specific test only")
13
14 @pytest.mark.skipif(sys.platform != "win32", reason="windows specific test only")
1315 def test_name_resolution_for_simple_exe():
14 command = ['cmd.exe', '/c', 'echo', 'hello']
16 command = ["cmd.exe", "/c", "echo", "hello"]
1517
16 resolved = procrunner._windows_resolve(command)
18 resolved = procrunner._windows_resolve(command)
1719
18 # command should be replaced with full path to cmd.exe
19 assert resolved[0].lower().endswith('\\cmd.exe')
20 assert os.path.exists(resolved[0])
20 # command should be replaced with full path to cmd.exe
21 assert resolved[0].lower().endswith("\\cmd.exe")
22 assert os.path.exists(resolved[0])
2123
22 # parameters are unchanged
23 assert resolved[1:] == command[1:]
24 # parameters are unchanged
25 assert resolved[1:] == command[1:]
2426
25 @pytest.mark.skipif(sys.platform != 'win32', reason="windows specific test only")
27
28 @pytest.mark.skipif(sys.platform != "win32", reason="windows specific test only")
2629 def test_name_resolution_for_complex_cases(tmpdir):
27 tmpdir.chdir()
30 tmpdir.chdir()
2831
29 bat = 'simple_bat_extension'
30 cmd = 'simple_cmd_extension'
31 exe = 'simple_exe_extension'
32 dotshort = 'more_complex_filename_with_a.dot'
33 dotlong = 'more_complex_filename.withadot'
32 bat = "simple_bat_extension"
33 cmd = "simple_cmd_extension"
34 exe = "simple_exe_extension"
35 dotshort = "more_complex_filename_with_a.dot"
36 dotlong = "more_complex_filename.withadot"
3437
35 (tmpdir / bat + '.bat').ensure()
36 (tmpdir / cmd + '.cmd').ensure()
37 (tmpdir / exe + '.exe').ensure()
38 (tmpdir / dotshort + '.bat').ensure()
39 (tmpdir / dotlong + '.cmd').ensure()
38 (tmpdir / bat + ".bat").ensure()
39 (tmpdir / cmd + ".cmd").ensure()
40 (tmpdir / exe + ".exe").ensure()
41 (tmpdir / dotshort + ".bat").ensure()
42 (tmpdir / dotlong + ".cmd").ensure()
4043
41 def is_valid(command):
42 assert len(command) == 1
43 assert os.path.exists(command[0])
44 def is_valid(command):
45 assert len(command) == 1
46 assert os.path.exists(command[0])
4447
45 is_valid(procrunner._windows_resolve([bat]))
46 is_valid(procrunner._windows_resolve([cmd]))
47 is_valid(procrunner._windows_resolve([exe]))
48 is_valid(procrunner._windows_resolve([dotshort]))
49 is_valid(procrunner._windows_resolve([dotlong]))
48 is_valid(procrunner._windows_resolve([bat]))
49 is_valid(procrunner._windows_resolve([cmd]))
50 is_valid(procrunner._windows_resolve([exe]))
51 is_valid(procrunner._windows_resolve([dotshort]))
52 is_valid(procrunner._windows_resolve([dotlong]))
55 import procrunner
66 import pytest
77
8
89 def test_simple_command_invocation():
9 if os.name == 'nt':
10 command = ['cmd.exe', '/c', 'echo', 'hello']
11 else:
12 command = ['echo', 'hello']
10 if os.name == "nt":
11 command = ["cmd.exe", "/c", "echo", "hello"]
12 else:
13 command = ["echo", "hello"]
1314
14 result = procrunner.run(command)
15 result = procrunner.run(command)
1516
16 assert result['exitcode'] == 0
17 assert result['stdout'] == b'hello' + os.linesep.encode('utf-8')
18 assert result['stderr'] == b''
17 assert result["exitcode"] == 0
18 assert result["stdout"] == b"hello" + os.linesep.encode("utf-8")
19 assert result["stderr"] == b""
20
1921
2022 def test_decode_invalid_utf8_input(capsys):
21 test_string = b'test\xa0string\n'
22 if os.name == 'nt':
23 pytest.xfail("Test requires stdin feature which does not work on Windows")
24 command = ['cmd.exe', '/c', 'type', 'CON']
25 else:
26 command = ['cat']
27 result = procrunner.run(command, stdin=test_string)
28 assert result['exitcode'] == 0
29 assert not result['stderr']
30 if os.name == 'nt':
31 # Windows modifies line endings
32 assert result['stdout'] == test_string[:-1] + b'\r\n'
33 else:
34 assert result['stdout'] == test_string
35 out, err = capsys.readouterr()
36 assert out == u'test\ufffdstring\n'
37 assert err == u''
23 test_string = b"test\xa0string\n"
24 if os.name == "nt":
25 pytest.xfail("Test requires stdin feature which does not work on Windows")
26 command = ["cmd.exe", "/c", "type", "CON"]
27 else:
28 command = ["cat"]
29 result = procrunner.run(command, stdin=test_string)
30 assert result["exitcode"] == 0
31 assert not result["stderr"]
32 if os.name == "nt":
33 # Windows modifies line endings
34 assert result["stdout"] == test_string[:-1] + b"\r\n"
35 else:
36 assert result["stdout"] == test_string
37 out, err = capsys.readouterr()
38 assert out == u"test\ufffdstring\n"
39 assert err == u""
40
3841
3942 def test_running_wget(tmpdir):
40 tmpdir.chdir()
41 command = ['wget', 'https://www.google.com', '-O', '-']
42 try:
43 result = procrunner.run(command)
44 except OSError as e:
45 if e.errno == 2:
46 pytest.skip('wget not available')
47 raise
48 assert result['exitcode'] == 0
49 assert b'http' in result['stderr']
50 assert b'google' in result['stdout']
43 tmpdir.chdir()
44 command = ["wget", "https://www.google.com", "-O", "-"]
45 try:
46 result = procrunner.run(command)
47 except OSError as e:
48 if e.errno == 2:
49 pytest.skip("wget not available")
50 raise
51 assert result["exitcode"] == 0
52 assert b"http" in result["stderr"]
53 assert b"google" in result["stdout"]