from __future__ import annotations
import copy
import os
import pathlib
import sys
from unittest import mock
import pytest
import procrunner
@mock.patch("procrunner._NonBlockingStreamReader")
@mock.patch("procrunner.time")
@mock.patch("procrunner.subprocess")
@mock.patch("procrunner.Pipe")
def test_run_command_aborts_after_timeout_legacy(
mock_pipe, mock_subprocess, mock_time, mock_streamreader
):
mock_pipe.return_value = mock.Mock(), mock.Mock()
mock_process = mock.Mock()
mock_process.returncode = None
mock_subprocess.Popen.return_value = mock_process
task = ["___"]
with pytest.raises(RuntimeError):
with pytest.warns(DeprecationWarning, match="timeout"):
procrunner.run(task, timeout=-1)
assert mock_subprocess.Popen.called
assert mock_process.terminate.called
assert mock_process.kill.called
@mock.patch("procrunner._NonBlockingStreamReader")
@mock.patch("procrunner.time")
@mock.patch("procrunner.subprocess")
@mock.patch("procrunner.Pipe")
def test_run_command_aborts_after_timeout(
mock_pipe, mock_subprocess, mock_time, mock_streamreader
):
mock_pipe.return_value = mock.Mock(), mock.Mock()
mock_process = mock.Mock()
mock_process.returncode = None
mock_subprocess.Popen.return_value = mock_process
task = ["___"]
with pytest.raises(RuntimeError):
procrunner.run(task, timeout=-1)
assert mock_subprocess.Popen.called
assert mock_process.terminate.called
assert mock_process.kill.called
@mock.patch("procrunner._NonBlockingStreamReader")
@mock.patch("procrunner.subprocess")
def test_run_command_runs_command_and_directs_pipelines(
mock_subprocess, mock_streamreader
):
(mock_stdout, mock_stderr) = (mock.Mock(), mock.Mock())
mock_stdout.get_output.return_value = mock.sentinel.proc_stdout
mock_stderr.get_output.return_value = mock.sentinel.proc_stderr
(stream_stdout, stream_stderr) = (mock.sentinel.stdout, mock.sentinel.stderr)
mock_process = mock.Mock()
mock_process.stdout = stream_stdout
mock_process.stderr = stream_stderr
mock_process.returncode = 99
command = ["___"]
def streamreader_processing(*args, **kwargs):
return {(stream_stdout,): mock_stdout, (stream_stderr,): mock_stderr}[args]
mock_streamreader.side_effect = streamreader_processing
mock_subprocess.Popen.return_value = mock_process
actual = procrunner.run(
command,
timeout=0.5,
callback_stdout=mock.sentinel.callback_stdout,
callback_stderr=mock.sentinel.callback_stderr,
working_directory=pathlib.Path("somecwd"),
)
assert mock_subprocess.Popen.called
assert mock_subprocess.Popen.call_args[1]["env"] == os.environ
assert mock_subprocess.Popen.call_args[1]["cwd"] in (
pathlib.Path("somecwd"),
"somecwd",
)
mock_streamreader.assert_has_calls(
[
mock.call(
stream_stdout,
output=mock.ANY,
notify=mock.ANY,
callback=mock.sentinel.callback_stdout,
),
mock.call(
stream_stderr,
output=mock.ANY,
notify=mock.ANY,
callback=mock.sentinel.callback_stderr,
),
],
any_order=True,
)
assert not mock_process.terminate.called
assert not mock_process.kill.called
assert actual == mock_subprocess.CompletedProcess.return_value
mock_subprocess.CompletedProcess.assert_called_once_with(
args=tuple(command),
returncode=mock_process.returncode,
stdout=mock.sentinel.proc_stdout,
stderr=mock.sentinel.proc_stderr,
)
@mock.patch("procrunner.subprocess")
def test_default_process_environment_is_parent_environment(mock_subprocess):
mock_subprocess.Popen.side_effect = NotImplementedError() # cut calls short
with pytest.raises(NotImplementedError):
procrunner.run([mock.Mock()], timeout=-1)
assert mock_subprocess.Popen.call_args[1]["env"] == os.environ
@mock.patch("procrunner.subprocess")
def test_pass_custom_environment_to_process(mock_subprocess):
mock_subprocess.Popen.side_effect = NotImplementedError() # cut calls short
mock_env = {"key": mock.sentinel.key}
# Pass an environment dictionary
with pytest.raises(NotImplementedError):
procrunner.run(
[mock.Mock()],
timeout=-1,
environment=copy.copy(mock_env),
)
assert mock_subprocess.Popen.call_args[1]["env"] == mock_env
@mock.patch("procrunner.subprocess")
def test_pass_custom_environment_to_process_and_add_another_value(mock_subprocess):
mock_subprocess.Popen.side_effect = NotImplementedError() # cut calls short
mock_env1 = {"keyA": mock.sentinel.keyA}
mock_env2 = {"keyB": mock.sentinel.keyB, "number": 5}
# Pass an environment dictionary
with pytest.raises(NotImplementedError):
procrunner.run(
[mock.Mock()],
timeout=-1,
environment=copy.copy(mock_env1),
environment_override=copy.copy(mock_env2),
)
mock_env_sum = copy.copy(mock_env1)
mock_env_sum.update({key: str(mock_env2[key]) for key in mock_env2})
assert mock_subprocess.Popen.call_args[1]["env"] == mock_env_sum
@mock.patch("procrunner.subprocess")
def test_use_default_process_environment_and_add_another_value(mock_subprocess):
mock_subprocess.Popen.side_effect = NotImplementedError() # cut calls short
mock_env2 = {"keyB": str(mock.sentinel.keyB)}
with pytest.raises(NotImplementedError):
procrunner.run(
[mock.Mock()],
timeout=-1,
environment_override=copy.copy(mock_env2),
)
random_environment_variable = list(os.environ)[0]
if random_environment_variable == list(mock_env2)[0]:
random_environment_variable = list(os.environ)[1]
assert (
random_environment_variable
and random_environment_variable != list(mock_env2)[0]
)
assert (
mock_subprocess.Popen.call_args[1]["env"][list(mock_env2)[0]]
== mock_env2[list(mock_env2)[0]]
)
assert mock_subprocess.Popen.call_args[1]["env"][
random_environment_variable
] == os.getenv(random_environment_variable)
@mock.patch("procrunner.subprocess")
def test_use_default_process_environment_and_override_a_value(mock_subprocess):
mock_subprocess.Popen.side_effect = NotImplementedError() # cut calls short
random_environment_variable = list(os.environ)[0]
random_environment_value = os.getenv(random_environment_variable)
with pytest.raises(NotImplementedError):
procrunner.run(
[mock.Mock()],
timeout=-1,
environment_override={
random_environment_variable: "X" + random_environment_value
},
)
assert (
mock_subprocess.Popen.call_args[1]["env"][random_environment_variable]
== "X" + random_environment_value
)
@mock.patch("procrunner.select")
@pytest.mark.skipif(
sys.platform == "win32",
reason="test only relevant on platforms supporting select()",
)
def test_nonblockingstreamreader_can_read(mock_select):
import time
class _stream:
def __init__(self):
self.data = b""
self.closed = False
def write(self, string):
self.data = self.data + string
def read(self, n):
if self.closed:
return b""
if self.data == b"":
time.sleep(0.01)
return b""
if len(self.data) < n:
data = self.data
self.data = b""
else:
data = self.data[:n]
self.data = self.data[n:]
return data
def close(self):
self.closed = True
teststream = _stream()
def select_replacement(rlist, wlist, xlist, timeout):
assert teststream in rlist
if teststream.closed:
return ([teststream], [], [])
if teststream.data == b"":
return ([], [], [])
return ([teststream], [], [])
mock_select.select = select_replacement
streamreader = procrunner._NonBlockingStreamReader(teststream, output=False)
assert not streamreader.has_finished()
time.sleep(0.1)
testdata = b"abc\n" * 1024
teststream.write(testdata)
time.sleep(0.2)
teststream.close()
time.sleep(0.1)
assert streamreader.has_finished()
output = streamreader.get_output()
assert len(output) == len(testdata)
assert output == testdata
def test_lineaggregator_aggregates_data():
callback = mock.Mock()
aggregator = procrunner._LineAggregator(callback=callback)
aggregator.add(b"some")
aggregator.add(b"string")
callback.assert_not_called()
aggregator.add(b"\n")
callback.assert_called_once_with("somestring")
callback.reset_mock()
aggregator.add(b"more")
aggregator.add(b"stuff")
callback.assert_not_called()
aggregator.flush()
callback.assert_called_once_with("morestuff")