Codebase list python-procrunner / 74c2274 tests / test_procrunner.py
74c2274

Tree @74c2274 (Download .tar.gz)

test_procrunner.py @74c2274

51e8a19
74c2274
51e8a19
e838522
51e8a19
e00f039
5c00c1e
e00f039
16c4c94
45248bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
abb67e6
45248bc
 
 
 
 
 
16c4c94
 
 
 
 
 
 
 
 
 
 
 
 
 
d7b9ba6
16c4c94
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
17bf3f1
16c4c94
 
 
 
 
 
 
 
abb67e6
16c4c94
 
e838522
45248bc
16c4c94
 
 
 
e838522
 
 
 
16c4c94
 
 
 
 
d7b9ba6
16c4c94
 
 
 
 
 
d7b9ba6
16c4c94
 
 
 
 
 
 
 
a928b39
31e5d57
 
a928b39
 
 
 
16c4c94
 
 
51e8a19
16c4c94
 
d7b9ba6
16c4c94
51e8a19
 
d7b9ba6
 
 
 
 
 
 
 
 
 
 
16c4c94
51e8a19
16c4c94
 
 
 
45248bc
 
abb67e6
45248bc
 
 
16c4c94
51e8a19
 
16c4c94
51e8a19
16c4c94
 
 
 
 
 
17bf3f1
abb67e6
16c4c94
 
45248bc
16c4c94
 
 
 
 
 
 
51e8a19
16c4c94
 
 
 
45248bc
abb67e6
45248bc
 
16c4c94
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51e8a19
16c4c94
 
 
 
 
17bf3f1
abb67e6
16c4c94
 
 
45248bc
16c4c94
 
 
 
 
51e8a19
 
16c4c94
 
 
 
 
51e8a19
16c4c94
 
1c83a20
16c4c94
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e00f039
 
51e8a19
16c4c94
 
 
 
 
 
 
 
 
 
 
 
 
 
d6213fe
 
 
 
31e5d57
 
 
 
d6213fe
31e5d57
 
d6213fe
31e5d57
 
d6213fe
31e5d57
 
d6213fe
31e5d57
 
d6213fe
 
 
31e5d57
 
d6213fe
31e5d57
 
d6213fe
 
 
 
31e5d57
 
 
 
d6213fe
 
 
 
 
 
31e5d57
 
 
 
d6213fe
 
 
 
 
import copy
from unittest import mock
import os
import pathlib
import procrunner
import pytest
import sys


@mock.patch("procrunner._NonBlockingStreamReader")
@mock.patch("procrunner.time")
@mock.patch("procrunner.subprocess")
@mock.patch("procrunner.Pipe")
def test_run_command_aborts_after_timeout_legacy(
    mock_pipe, mock_subprocess, mock_time, mock_streamreader
):
    mock_pipe.return_value = mock.Mock(), mock.Mock()
    mock_process = mock.Mock()
    mock_process.returncode = None
    mock_subprocess.Popen.return_value = mock_process
    task = ["___"]

    with pytest.raises(RuntimeError):
        with pytest.warns(DeprecationWarning, match="timeout"):
            procrunner.run(task, timeout=-1, debug=False)

    assert mock_subprocess.Popen.called
    assert mock_process.terminate.called
    assert mock_process.kill.called


@mock.patch("procrunner._NonBlockingStreamReader")
@mock.patch("procrunner.time")
@mock.patch("procrunner.subprocess")
@mock.patch("procrunner.Pipe")
def test_run_command_aborts_after_timeout(
    mock_pipe, mock_subprocess, mock_time, mock_streamreader
):
    mock_pipe.return_value = mock.Mock(), mock.Mock()
    mock_process = mock.Mock()
    mock_process.returncode = None
    mock_subprocess.Popen.return_value = mock_process
    task = ["___"]

    with pytest.raises(RuntimeError):
        procrunner.run(task, timeout=-1, raise_timeout_exception=True)

    assert mock_subprocess.Popen.called
    assert mock_process.terminate.called
    assert mock_process.kill.called


@mock.patch("procrunner._NonBlockingStreamReader")
@mock.patch("procrunner.subprocess")
def test_run_command_runs_command_and_directs_pipelines(
    mock_subprocess, mock_streamreader
):
    (mock_stdout, mock_stderr) = (mock.Mock(), mock.Mock())
    mock_stdout.get_output.return_value = mock.sentinel.proc_stdout
    mock_stderr.get_output.return_value = mock.sentinel.proc_stderr
    (stream_stdout, stream_stderr) = (mock.sentinel.stdout, mock.sentinel.stderr)
    mock_process = mock.Mock()
    mock_process.stdout = stream_stdout
    mock_process.stderr = stream_stderr
    mock_process.returncode = 99
    command = ["___"]

    def streamreader_processing(*args, **kwargs):
        return {(stream_stdout,): mock_stdout, (stream_stderr,): mock_stderr}[args]

    mock_streamreader.side_effect = streamreader_processing
    mock_subprocess.Popen.return_value = mock_process

    expected = {
        "stderr": mock.sentinel.proc_stderr,
        "stdout": mock.sentinel.proc_stdout,
        "exitcode": mock_process.returncode,
        "command": tuple(command),
        "runtime": mock.ANY,
        "timeout": False,
        "time_start": mock.ANY,
        "time_end": mock.ANY,
    }

    actual = procrunner.run(
        command,
        timeout=0.5,
        callback_stdout=mock.sentinel.callback_stdout,
        callback_stderr=mock.sentinel.callback_stderr,
        working_directory=pathlib.Path("somecwd"),
        raise_timeout_exception=True,
    )

    assert mock_subprocess.Popen.called
    assert mock_subprocess.Popen.call_args[1]["env"] == os.environ
    assert mock_subprocess.Popen.call_args[1]["cwd"] in (
        pathlib.Path("somecwd"),
        "somecwd",
    )
    mock_streamreader.assert_has_calls(
        [
            mock.call(
                stream_stdout,
                output=mock.ANY,
                debug=None,
                notify=mock.ANY,
                callback=mock.sentinel.callback_stdout,
            ),
            mock.call(
                stream_stderr,
                output=mock.ANY,
                debug=None,
                notify=mock.ANY,
                callback=mock.sentinel.callback_stderr,
            ),
        ],
        any_order=True,
    )
    assert not mock_process.terminate.called
    assert not mock_process.kill.called
    for key in expected:
        with pytest.warns(DeprecationWarning):
            assert actual[key] == expected[key]
    assert actual.args == tuple(command)
    assert actual.returncode == mock_process.returncode
    assert actual.stdout == mock.sentinel.proc_stdout
    assert actual.stderr == mock.sentinel.proc_stderr


@mock.patch("procrunner.subprocess")
def test_default_process_environment_is_parent_environment(mock_subprocess):
    mock_subprocess.Popen.side_effect = NotImplementedError()  # cut calls short
    with pytest.raises(NotImplementedError):
        procrunner.run([mock.Mock()], timeout=-1, raise_timeout_exception=True)
    assert mock_subprocess.Popen.call_args[1]["env"] == os.environ


@mock.patch("procrunner.subprocess")
def test_using_debug_parameter_raises_warning(mock_subprocess):
    mock_subprocess.Popen.side_effect = NotImplementedError()  # cut calls short
    with pytest.warns(DeprecationWarning, match="debug"):
        with pytest.raises(NotImplementedError):
            procrunner.run([mock.Mock()], debug=True)
    with pytest.warns(DeprecationWarning, match="debug"):
        with pytest.raises(NotImplementedError):
            procrunner.run([mock.Mock()], debug=False)


@mock.patch("procrunner.subprocess")
def test_pass_custom_environment_to_process(mock_subprocess):
    mock_subprocess.Popen.side_effect = NotImplementedError()  # cut calls short
    mock_env = {"key": mock.sentinel.key}
    # Pass an environment dictionary
    with pytest.raises(NotImplementedError):
        procrunner.run(
            [mock.Mock()],
            timeout=-1,
            environment=copy.copy(mock_env),
            raise_timeout_exception=True,
        )
    assert mock_subprocess.Popen.call_args[1]["env"] == mock_env


@mock.patch("procrunner.subprocess")
def test_pass_custom_environment_to_process_and_add_another_value(mock_subprocess):
    mock_subprocess.Popen.side_effect = NotImplementedError()  # cut calls short
    mock_env1 = {"keyA": mock.sentinel.keyA}
    mock_env2 = {"keyB": mock.sentinel.keyB, "number": 5}
    # Pass an environment dictionary
    with pytest.raises(NotImplementedError):
        procrunner.run(
            [mock.Mock()],
            timeout=-1,
            environment=copy.copy(mock_env1),
            environment_override=copy.copy(mock_env2),
            raise_timeout_exception=True,
        )
    mock_env_sum = copy.copy(mock_env1)
    mock_env_sum.update({key: str(mock_env2[key]) for key in mock_env2})
    assert mock_subprocess.Popen.call_args[1]["env"] == mock_env_sum


@mock.patch("procrunner.subprocess")
def test_use_default_process_environment_and_add_another_value(mock_subprocess):
    mock_subprocess.Popen.side_effect = NotImplementedError()  # cut calls short
    mock_env2 = {"keyB": str(mock.sentinel.keyB)}
    with pytest.raises(NotImplementedError):
        procrunner.run(
            [mock.Mock()],
            timeout=-1,
            environment_override=copy.copy(mock_env2),
            raise_timeout_exception=True,
        )
    random_environment_variable = list(os.environ)[0]
    if random_environment_variable == list(mock_env2)[0]:
        random_environment_variable = list(os.environ)[1]
    assert (
        random_environment_variable
        and random_environment_variable != list(mock_env2)[0]
    )
    assert (
        mock_subprocess.Popen.call_args[1]["env"][list(mock_env2)[0]]
        == mock_env2[list(mock_env2)[0]]
    )
    assert mock_subprocess.Popen.call_args[1]["env"][
        random_environment_variable
    ] == os.getenv(random_environment_variable)


@mock.patch("procrunner.subprocess")
def test_use_default_process_environment_and_override_a_value(mock_subprocess):
    mock_subprocess.Popen.side_effect = NotImplementedError()  # cut calls short
    random_environment_variable = list(os.environ)[0]
    random_environment_value = os.getenv(random_environment_variable)
    with pytest.raises(NotImplementedError):
        procrunner.run(
            [mock.Mock()],
            timeout=-1,
            environment_override={
                random_environment_variable: "X" + random_environment_value
            },
            raise_timeout_exception=True,
        )
    assert (
        mock_subprocess.Popen.call_args[1]["env"][random_environment_variable]
        == "X" + random_environment_value
    )


@mock.patch("procrunner.select")
@pytest.mark.skipif(
    sys.platform == "win32",
    reason="test only relevant on platforms supporting select()",
)
def test_nonblockingstreamreader_can_read(mock_select):
    import time

    class _stream:
        def __init__(self):
            self.data = b""
            self.closed = False

        def write(self, string):
            self.data = self.data + string

        def read(self, n):
            if self.closed:
                return b""
            if self.data == b"":
                time.sleep(0.01)
                return b""
            if len(self.data) < n:
                data = self.data
                self.data = b""
            else:
                data = self.data[:n]
                self.data = self.data[n:]
            return data

        def close(self):
            self.closed = True

    teststream = _stream()

    def select_replacement(rlist, wlist, xlist, timeout):
        assert teststream in rlist
        if teststream.closed:
            return ([teststream], [], [])
        if teststream.data == b"":
            return ([], [], [])
        return ([teststream], [], [])

    mock_select.select = select_replacement

    streamreader = procrunner._NonBlockingStreamReader(teststream, output=False)
    assert not streamreader.has_finished()
    time.sleep(0.1)
    testdata = b"abc\n" * 1024
    teststream.write(testdata)
    time.sleep(0.2)
    teststream.close()
    time.sleep(0.1)

    assert streamreader.has_finished()
    output = streamreader.get_output()
    assert len(output) == len(testdata)
    assert output == testdata


def test_lineaggregator_aggregates_data():
    callback = mock.Mock()
    aggregator = procrunner._LineAggregator(callback=callback)

    aggregator.add(b"some")
    aggregator.add(b"string")
    callback.assert_not_called()
    aggregator.add(b"\n")
    callback.assert_called_once_with("somestring")
    callback.reset_mock()
    aggregator.add(b"more")
    aggregator.add(b"stuff")
    callback.assert_not_called()
    aggregator.flush()
    callback.assert_called_once_with("morestuff")


def test_return_object_semantics():
    ro = procrunner.ReturnObject(
        command=mock.sentinel.command,
        exitcode=0,
        stdout=mock.sentinel.stdout,
        stderr=mock.sentinel.stderr,
    )
    with pytest.warns(DeprecationWarning):
        assert ro["command"] == mock.sentinel.command
    assert ro.args == mock.sentinel.command
    with pytest.warns(DeprecationWarning):
        assert ro["exitcode"] == 0
    assert ro.returncode == 0
    with pytest.warns(DeprecationWarning):
        assert ro["stdout"] == mock.sentinel.stdout
    assert ro.stdout == mock.sentinel.stdout
    with pytest.warns(DeprecationWarning):
        assert ro["stderr"] == mock.sentinel.stderr
    assert ro.stderr == mock.sentinel.stderr

    with pytest.raises(KeyError):
        with pytest.warns(DeprecationWarning):
            ro["unknownkey"]
    ro.update({"unknownkey": mock.sentinel.key})
    with pytest.warns(DeprecationWarning):
        assert ro["unknownkey"] == mock.sentinel.key


def test_return_object_check_function_passes_on_success():
    ro = procrunner.ReturnObject(
        command=mock.sentinel.command,
        exitcode=0,
        stdout=mock.sentinel.stdout,
        stderr=mock.sentinel.stderr,
    )
    ro.check_returncode()


def test_return_object_check_function_raises_on_error():
    ro = procrunner.ReturnObject(
        command=mock.sentinel.command,
        exitcode=1,
        stdout=mock.sentinel.stdout,
        stderr=mock.sentinel.stderr,
    )
    with pytest.raises(Exception) as e:
        ro.check_returncode()
    assert repr(mock.sentinel.command) in str(e.value)
    assert "1" in str(e.value)