New Upstream Release - pytest-salt

Ready changes


Merged new upstream version: 2020.1.27 (was: 2019.6.13).

Resulting package

Built on 2022-03-16T18:28 (took 1m47s)

The resulting binary packages can be installed (if you have the apt repository enabled) by running one of:

apt install -t fresh-releases python3-pytestsalt

Lintian Result


diff --git a/ b/
index fc8090f..b4db1f1 100644
--- a/
+++ b/
@@ -1,2 +1,3 @@
 include pytestsalt/
+include pytestsalt/salt/coverage/
diff --git a/PKG-INFO b/PKG-INFO
index 73c164e..844f234 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.2
 Name: pytest-salt
-Version: 2019.6.13
+Version: 2020.1.27
 Summary: Pytest Salt Plugin
 Author: Pedro Algarvio
diff --git a/debian/changelog b/debian/changelog
index b63e49b..ca8a084 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,3 +1,9 @@
+pytest-salt (2020.1.27-1) UNRELEASED; urgency=low
+  * New upstream release.
+ -- Debian Janitor <>  Wed, 16 Mar 2022 18:26:33 -0000
 pytest-salt (2019.6.13-1) unstable; urgency=medium
   * New upstream version
diff --git a/pytest_salt.egg-info/PKG-INFO b/pytest_salt.egg-info/PKG-INFO
index 73c164e..844f234 100644
--- a/pytest_salt.egg-info/PKG-INFO
+++ b/pytest_salt.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.2
 Name: pytest-salt
-Version: 2019.6.13
+Version: 2020.1.27
 Summary: Pytest Salt Plugin
 Author: Pedro Algarvio
diff --git a/pytest_salt.egg-info/SOURCES.txt b/pytest_salt.egg-info/SOURCES.txt
index 1dbf06b..15cc651 100644
--- a/pytest_salt.egg-info/SOURCES.txt
+++ b/pytest_salt.egg-info/SOURCES.txt
@@ -1,12 +1,7 @@
@@ -25,6 +20,7 @@ pytestsalt/fixtures/
@@ -33,8 +29,4 @@ pytestsalt/utils/
\ No newline at end of file
\ No newline at end of file
diff --git a/pytestsalt/ b/pytestsalt/
index 762b06f..f34e548 100644
--- a/pytestsalt/
+++ b/pytestsalt/
@@ -9,11 +9,11 @@ import json
 version_json = '''
- "date": "2019-06-12T13:32:25+0100",
+ "date": "2020-01-27T20:17:50+0000",
  "dirty": false,
  "error": null,
- "full-revisionid": "5d8883b10459d2ec7fe65566a1d773b936bec649",
- "version": "2019.6.13"
+ "full-revisionid": "58f5e95c05fa41df9f471772664839226610d5af",
+ "version": "2020.1.27"
diff --git a/pytestsalt/fixtures/ b/pytestsalt/fixtures/
index a211532..88dc0c4 100644
--- a/pytestsalt/fixtures/
+++ b/pytestsalt/fixtures/
@@ -783,12 +783,16 @@ def apply_master_config(default_options,
     if 'engines_dirs' not in default_options:
         default_options['engines_dirs'] = []
-    default_options['engines_dirs'].insert(0, engines_dir)
+    if engines_dir not in default_options['engines']:
+        default_options['engines_dirs'].insert(0, engines_dir)
     default_options['pytest_engine_port'] = engine_port
     if 'log_handlers_dirs' not in default_options:
         default_options['log_handlers_dirs'] = []
-    default_options['log_handlers_dirs'].insert(0, log_handlers_dir)
+    if log_handlers_dir not in default_options['log_handlers_dirs']:
+        default_options['log_handlers_dirs'].insert(0, log_handlers_dir)
     default_options['pytest_log_host'] = 'localhost'
     default_options['pytest_log_port'] = log_server_port
@@ -800,9 +804,9 @@ def apply_master_config(default_options,
         # Apply it!
         dictupdate.update(default_options, direct_overrides, merge_lists=True)
-'Writing to configuration file %s. Configuration:\n%s',
-             config_file,
-             pprint.pformat(default_options))
+    log.debug('Writing to configuration file %s. Configuration:\n%s',
+              config_file,
+              pprint.pformat(default_options))
     # Write down the computed configuration into the config file
     with compat.fopen(config_file, 'w') as wfh:
@@ -1146,7 +1150,9 @@ def apply_minion_config(default_options,
     if 'log_handlers_dirs' not in default_options:
         default_options['log_handlers_dirs'] = []
-    default_options['log_handlers_dirs'].insert(0, log_handlers_dir)
+    if log_handlers_dir not in default_options['log_handlers_dirs']:
+        default_options['log_handlers_dirs'].insert(0, log_handlers_dir)
     default_options['pytest_log_host'] = 'localhost'
     default_options['pytest_log_port'] = log_server_port
@@ -1158,9 +1164,9 @@ def apply_minion_config(default_options,
         # Apply it!
         dictupdate.update(default_options, direct_overrides, merge_lists=True)
-'Writing to configuration file %s. Configuration:\n%s',
-             config_file,
-             pprint.pformat(default_options))
+    log.debug('Writing to configuration file %s. Configuration:\n%s',
+              config_file,
+              pprint.pformat(default_options))
     # Write down the computed configuration into the config file
     with compat.fopen(config_file, 'w') as wfh:
@@ -1261,7 +1267,9 @@ def apply_proxy_config(default_options,
     if 'log_handlers_dirs' not in default_options:
         default_options['log_handlers_dirs'] = []
-    default_options['log_handlers_dirs'].insert(0, log_handlers_dir)
+    if log_handlers_dir not in default_options['log_handlers_dirs']:
+        default_options['log_handlers_dirs'].insert(0, log_handlers_dir)
     default_options['pytest_log_host'] = 'localhost'
     default_options['pytest_log_port'] = log_server_port
@@ -1273,9 +1281,9 @@ def apply_proxy_config(default_options,
         # Apply it!
         dictupdate.update(default_options, direct_overrides, merge_lists=True)
-'Writing to configuration file %s. Configuration:\n%s',
-             config_file,
-             pprint.pformat(default_options))
+    log.debug('Writing to configuration file %s. Configuration:\n%s',
+              config_file,
+              pprint.pformat(default_options))
     # Write down the computed configuration into the config file
     with compat.fopen(config_file, 'w') as wfh:
@@ -1499,9 +1507,9 @@ def apply_syndic_config(syndic_default_options,
     syndic_master_config_file = syndic_conf_dir.join('master').realpath().strpath
     # Write down the master computed configuration into the config file
-'Writing to configuration file %s. Configuration:\n%s',
-             syndic_master_config_file,
-             pprint.pformat(master_config))
+    log.debug('Writing to configuration file %s. Configuration:\n%s',
+              syndic_master_config_file,
+              pprint.pformat(master_config))
     with compat.fopen(syndic_master_config_file, 'w') as wfh:
         yamlserialize.safe_dump(master_config, wfh, default_flow_style=False)
diff --git a/pytestsalt/fixtures/ b/pytestsalt/fixtures/
index 3c9cc7c..71e6173 100644
--- a/pytestsalt/fixtures/
+++ b/pytestsalt/fixtures/
@@ -1580,14 +1580,9 @@ class SaltMinion(SaltDaemonScriptBase):
         return script_args
     def get_check_events(self):
-        if sys.platform.startswith('win'):
-            return super(SaltMinion, self).get_check_events()
         return set(['salt/{}/{}/start'.format(self.config['__role'], self.config['id'])])
     def get_check_ports(self):
-        if sys.platform.startswith('win'):
-            return set([self.config['tcp_pub_port'],
-                        self.config['tcp_pull_port']])
         return super(SaltMinion, self).get_check_ports()
@@ -1605,14 +1600,9 @@ class SaltProxy(SaltDaemonScriptBase):
         return script_args
     def get_check_events(self):
-        if sys.platform.startswith('win'):
-            return super(SaltProxy, self).get_check_events()
         return set(['salt/{}/{}/start'.format(self.config['__role'], self.config['id'])])
     def get_check_ports(self):
-        if sys.platform.startswith('win'):
-            return set([self.config['tcp_pub_port'],
-                        self.config['tcp_pull_port']])
         return super(SaltProxy, self).get_check_ports()
@@ -1625,14 +1615,9 @@ class SaltMaster(SaltDaemonScriptBase):
         return ['-l', 'quiet']
     def get_check_events(self):
-        if sys.platform.startswith('win'):
-            return super(SaltMaster, self).get_check_events()
         return set(['salt/{}/{}/start'.format(self.config['__role'], self.config['id'])])
     def get_check_ports(self):
-        if sys.platform.startswith('win'):
-            return set([self.config['ret_port'],
-                        self.config['publish_port']])
         return super(SaltMaster, self).get_check_ports()
diff --git a/pytestsalt/fixtures/ b/pytestsalt/fixtures/
index 821846d..cedb149 100644
--- a/pytestsalt/fixtures/
+++ b/pytestsalt/fixtures/
@@ -25,6 +25,13 @@ IS_WINDOWS = sys.platform.startswith('win')
 class SaltTerminalReporter(TerminalReporter):
     def __init__(self, config):
         TerminalReporter.__init__(self, config)
+        self._session = None
+        self._show_sys_stats = config.getoption('--sys-stats') is True
+        self._sys_stats_no_children = config.getoption('--sys-stats-no-children') is True
+        if config.getoption('--sys-stats-uss-mem') is True:
+            self._sys_stats_mem_type = 'uss'
+        else:
+            self._sys_stats_mem_type = 'rss'
     def pytest_sessionstart(self, session):
@@ -35,39 +42,76 @@ class SaltTerminalReporter(TerminalReporter):
         TerminalReporter.pytest_runtest_logreport(self, report)
         if self.verbosity <= 0:
         if report.when != 'call':
-        if self.config.getoption('--sys-stats') is False:
+        if self._show_sys_stats is False:
         if self.verbosity > 1:
             self.section('Processes Statistics', sep='-', bold=True)
             left_padding = len(max(['System'] + list(self._session.stats_processes), key=len))
-            template = '  ...{}  {}  -  CPU: {:6.2f} %   MEM: {:6.2f} %'
-            if not IS_WINDOWS:
-                template += '   SWAP: {:6.2f} %'
+            template = '  ...{dots}  {name}  -  CPU: {cpu:6.2f} %   MEM: {mem:6.2f} % (Virtual Memory)'
+            stats = {
+                'name': 'System',
+                'dots': '.' * (left_padding - len('System')),
+                'cpu': psutil.cpu_percent(),
+                'mem': psutil.virtual_memory().percent
+            }
+            swap = psutil.swap_memory().percent
+            if swap > 0:
+                template += '  SWAP: {swap:6.2f} %'
+                stats['swap'] = swap
             template += '\n'
-            self.write(
-                template.format(
-                    '.' * (left_padding - len('System')),
-                    'System',
-                    psutil.cpu_percent(),
-                    psutil.virtual_memory().percent,
-                    psutil.swap_memory().percent
-                )
-            )
+            self.write(template.format(**stats))
+            template = '  ...{dots}  {name}  -  CPU: {cpu:6.2f} %   MEM: {mem:6.2f} % ({m_type})'
+            children_template = template + '   MEM SUM: {c_mem} % ({m_type})   CHILD PROCS: {c_count}\n'
+            no_children_template = template + '\n'
             for name, psproc in self._session.stats_processes.items():
-                with psproc.oneshot():
-                    cpu = psproc.cpu_percent()
-                    mem = psproc.memory_percent('vms')
-                    dots = '.' * (left_padding - len(name))
-                    if not IS_WINDOWS:
-                        swap = psproc.memory_percent('swap')
-                        formatted = template.format(dots, name, cpu, mem, swap)
-                    else:
-                        formatted = template.format(dots, name, cpu, mem)
-                    self.write(formatted)
+                template = no_children_template
+                dots = '.' * (left_padding - len(name))
+                pids = []
+                try:
+                    with psproc.oneshot():
+                        stats = {
+                            'name': name,
+                            'dots': dots,
+                            'cpu': psproc.cpu_percent(),
+                            'mem': psproc.memory_percent(self._sys_stats_mem_type),
+                            'm_type': self._sys_stats_mem_type.upper()
+                        }
+                        if self._sys_stats_no_children is False:
+                            pids.append(
+                            children = psproc.children(recursive=True)
+                            if children:
+                                template = children_template
+                                stats['c_count'] = 0
+                                c_mem = stats['mem']
+                                for child in children:
+                                    if in pids:
+                                        continue
+                                    pids.append(
+                                    if not psutil.pid_exists(
+                                        continue
+                                    try:
+                                        c_mem += child.memory_percent(self._sys_stats_mem_type)
+                                        stats['c_count'] += 1
+                                    except (psutil.AccessDenied, psutil.NoSuchProcess):
+                                        continue
+                                if stats['c_count']:
+                                    stats['c_mem'] = '{:6.2f}'.format(c_mem)
+                                else:
+                                    template = no_children_template
+                        self.write(template.format(**stats))
+                except psutil.NoSuchProcess:
+                    continue
     def _get_progress_information_message(self):
         msg = TerminalReporter._get_progress_information_message(self)
@@ -99,6 +143,19 @@ def pytest_addoption(parser):
         help='Print System CPU and MEM statistics after each test execution.'
+    output_options_group.addoption(
+        '--sys-stats-no-children',
+        default=False,
+        action='store_true',
+        help='Don\'t include child processes memory statistics.'
+    )
+    output_options_group.addoption(
+        '--sys-stats-uss-mem',
+        default=False,
+        action='store_true',
+        help='Use the USS("Unique Set Size", memory unique to a process which would be freed if the process was '
+             'terminated) memory instead which is more expensive to calculate.'
+    )
diff --git a/pytestsalt/salt/coverage/ b/pytestsalt/salt/coverage/
new file mode 100644
index 0000000..e8500dd
--- /dev/null
+++ b/pytestsalt/salt/coverage/
@@ -0,0 +1,6 @@
+# -*- coding: utf-8 -*-
+    import coverage
+    coverage.process_startup()
+except ImportError:
+    pass
diff --git a/pytestsalt/salt/engines/ b/pytestsalt/salt/engines/
index 7ee9b05..f17b7cc 100644
--- a/pytestsalt/salt/engines/
+++ b/pytestsalt/salt/engines/
@@ -22,6 +22,10 @@ try:
     HAS_SALT_ASYNC = True
 except ImportError:
     HAS_SALT_ASYNC = False
+    import salt.ext.six as six
+except ImportError:
+    import six
 # Import 3rd-party libs
 from tornado import gen
@@ -104,13 +108,40 @@ class PyTestEngine(object):
         # 30 seconds should be more than enough to fire these events every second in order
         # for pytest-salt to pickup that the master is running
         timeout = 30
+        event_bus = None
+        call_destroy = False
+        try:
+            with salt.utils.event.get_master_event(self.opts,
+                                                   self.sock_dir,
+                                                   listen=False) as event_bus:
+                yield self._fire_master_started_event(event_bus,
+                                                      load,
+                                                      master_start_event_tag,
+                                                      timeout)
+        except AttributeError as exc:
+            if '__enter__' not in str(exc):
+                six.reraise(*sys.exc_info())
+            call_destroy = True
+            event_bus = salt.utils.event.get_master_event(self.opts,
+                                                          self.sock_dir,
+                                                          listen=False)
+            yield self._fire_master_started_event(event_bus,
+                                                  load,
+                                                  master_start_event_tag,
+                                                  timeout)
+        finally:
+            if call_destroy and event_bus is not None:
+                event_bus.destroy()
+    @gen.coroutine
+    def _fire_master_started_event(self, event_bus, load, tag, timeout):
         while True:
             if self.stop_sending_events_file and not os.path.exists(self.stop_sending_events_file):
       'The stop sending events file "marker" is done. Stop sending events...')
             timeout -= 1
-                event_bus.fire_event(load, master_start_event_tag, timeout=500)
+                event_bus.fire_event(load, tag, timeout=500)
                 if timeout <= 0:
                 yield gen.sleep(1)
diff --git a/pytestsalt/salt/log_handlers/ b/pytestsalt/salt/log_handlers/
index a7788a8..80a745f 100644
--- a/pytestsalt/salt/log_handlers/
+++ b/pytestsalt/salt/log_handlers/
@@ -18,7 +18,6 @@ from multiprocessing import Queue
 import msgpack
 # Import Salt libs
-import salt.log.setup
 # pylint: disable=no-member,invalid-name
     import salt.utils.stringutils
@@ -40,6 +39,9 @@ log = logging.getLogger(__name__)
 def __virtual__():
+    if 'log_forwarding_consumer' in __opts__:
+        # New Salt Logging in place. This handler is not needed
+        return False, "New Salt Logging in place. Not loading."
     if 'pytest_log_port' not in __opts__:
         return False, "'pytest_log_port' not in options"
     return True
@@ -83,9 +85,12 @@ def setup_handlers():
         # start dropping. This will contain a memory leak in case `process_queue`
         # can't process fast enough of in case it can't deliver the log records at all.
         queue_size = 10000000
+    # Late Imports Because of Salt's logging refactoring
+    from salt.log.setup import SaltLogQueueHandler, LOG_LEVELS
     queue = Queue(queue_size)
-    handler = salt.log.setup.SaltLogQueueHandler(queue)
-    level = salt.log.setup.LOG_LEVELS[(__opts__.get('pytest_log_level') or 'error').lower()]
+    handler = SaltLogQueueHandler(queue)
+    level = LOG_LEVELS[(__opts__.get('pytest_log_level') or 'error').lower()]
     pytest_log_prefix = os.environ.get('PYTEST_LOG_PREFIX') or __opts__['pytest_log_prefix']
     process_queue_thread = threading.Thread(target=process_queue,
@@ -106,7 +111,7 @@ def process_queue(host, port, prefix, queue):
-    log.warning('Sending log records to Remote log server')
+    log.debug('Sending log records to Remote log server')
     while True:
             record = queue.get()
diff --git a/pytestsalt/utils/ b/pytestsalt/utils/
index 1a3680a..6b63a58 100644
--- a/pytestsalt/utils/
+++ b/pytestsalt/utils/
@@ -15,11 +15,13 @@ import json
 import time
 import errno
 import atexit
+import pprint
 import signal
 import socket
 import logging
 import subprocess
 import threading
+import weakref
 from operator import itemgetter
 from collections import namedtuple
@@ -34,12 +36,6 @@ except ImportError:
 log = logging.getLogger(__name__)
-if sys.platform.startswith('win'):
-    SIGINT = SIGTERM = signal.CTRL_BREAK_EVENT  # pylint: disable=no-member
-    SIGINT = signal.SIGINT
-    SIGTERM = signal.SIGTERM
 def set_proc_title(title):
     if HAS_SETPROCTITLE is False:
@@ -71,7 +67,53 @@ def collect_child_processes(pid):
     return children
+def _get_cmdline(proc):
+    # pylint: disable=protected-access
+    try:
+        return proc._cmdline
+    except AttributeError:
+        # Cache the cmdline since that will be inaccessible once the process is terminated
+        # and we use it in log calls
+        try:
+            cmdline = proc.cmdline()
+        except (psutil.NoSuchProcess, psutil.AccessDenied):
+            # OSX is more restrictive about the above information
+            cmdline = None
+        except OSError:
+            # On Windows we've seen something like:
+            #   File " c: ... \lib\site-packages\pytestsalt\utils\", line 182, in terminate_process
+            #     terminate_process_list(process_list, kill=slow_stop is False, slow_stop=slow_stop)
+            #   File " c: ... \lib\site-packages\pytestsalt\utils\", line 130, in terminate_process_list
+            #     _terminate_process_list(process_list, kill=kill, slow_stop=slow_stop)
+            #   File " c: ... \lib\site-packages\pytestsalt\utils\", line 78, in _terminate_process_list
+            #     cmdline = process.cmdline()
+            #   File " c: ... \lib\site-packages\psutil\", line 786, in cmdline
+            #     return self._proc.cmdline()
+            #   File " c: ... \lib\site-packages\psutil\", line 667, in wrapper
+            #     return fun(self, *args, **kwargs)
+            #   File " c: ... \lib\site-packages\psutil\", line 745, in cmdline
+            #     ret = cext.proc_cmdline(, use_peb=True)
+            #   OSError: [WinError 299] Only part of a ReadProcessMemory or WriteProcessMemory request was completed: 'originated from ReadProcessMemory(ProcessParameters)
+            # Late import
+            cmdline = None
+        if not cmdline:
+            try:
+                cmdline = proc.as_dict()
+            except psutil.NoSuchProcess:
+                cmdline = '<could not be retrived; dead process: {}>'.format(proc)
+            except (psutil.AccessDenied, OSError):
+                cmdline = weakref.proxy(proc)
+        proc._cmdline = cmdline
+    return proc._cmdline
+    # pylint: enable=protected-access
 def _terminate_process_list(process_list, kill=False, slow_stop=False):
+        'Terminating process list:\n%s',
+        pprint.pformat([_get_cmdline(proc) for proc in process_list])
+    )
     for process in process_list[:]:  # Iterate over copy of the list
         if not psutil.pid_exists(
@@ -80,22 +122,15 @@ def _terminate_process_list(process_list, kill=False, slow_stop=False):
             if not kill and process.status() == psutil.STATUS_ZOMBIE:
                 # Zombie processes will exit once child processes also exit
-            try:
-                cmdline = process.cmdline()
-            except psutil.AccessDenied:
-                # OSX is more restrictive about the above information
-                cmdline = None
-            if not cmdline:
-                cmdline = process.as_dict()
             if kill:
-      'Killing process(%s): %s',, cmdline)
+      'Killing process(%s): %s',, _get_cmdline(process))
-      'Terminating process(%s): %s',, cmdline)
+      'Terminating process(%s): %s',, _get_cmdline(process))
                     if slow_stop:
                         # Allow coverage data to be written down to disk
-                        process.send_signal(SIGTERM)
+                        process.send_signal(signal.SIGTERM)
                         except psutil.TimeoutExpired:
@@ -120,19 +155,17 @@ def terminate_process_list(process_list, kill=False, slow_stop=False):
     # Try to terminate processes with the provided kill and slow_stop parameters'Terminating process list. 1st step. kill: %s, slow stop: %s', kill, slow_stop)
-    # Cache the cmdline since that will be inaccessible once the process is terminated
-    for proc in process_list:
-        try:
-            cmdline = proc.cmdline()
-        except (psutil.NoSuchProcess, psutil.AccessDenied):
-            # OSX is more restrictive about the above information
-            cmdline = None
-        if not cmdline:
-            try:
-                cmdline = proc
-            except (psutil.NoSuchProcess, psutil.AccessDenied):
-                cmdline = '<could not be retrived; dead process: {}>'.format(proc)
-        proc._cmdline = cmdline
+    # Remove duplicates from the process list
+    seen_pids = []
+    start_count = len(process_list)
+    for proc in process_list[:]:
+        if in seen_pids:
+            process_list.remove(proc)
+        seen_pids.append(
+    end_count = len(process_list)
+    if end_count < start_count:
+        log.debug('Removed %d duplicates from the initial process list', start_count - end_count)
     _terminate_process_list(process_list, kill=kill, slow_stop=slow_stop)
     psutil.wait_procs(process_list, timeout=15, callback=on_process_terminated)
@@ -153,14 +186,16 @@ def terminate_process_list(process_list, kill=False, slow_stop=False):
         log.warning('Some processes failed to properly terminate: %s', process_list)
-def terminate_process(pid=None, process=None, children=None, kill_children=False, slow_stop=False):
+def terminate_process(pid=None, process=None, children=None, kill_children=None, slow_stop=False):
     Try to terminate/kill the started processe
     children = children or []
     process_list = []
-    # Always kill children if kill the parent process.
-    kill_children = True if slow_stop is False else kill_children
+    if kill_children is None:
+        # Always kill children if kill the parent process and kill_children was not set
+        kill_children = True if slow_stop is False else kill_children
     if pid and not process:
@@ -172,11 +207,7 @@ def terminate_process(pid=None, process=None, children=None, kill_children=False
     if kill_children:
         if process:
-            if not children:
-                children = collect_child_processes(
-            else:
-                # Let's collect children again since there might be new ones
-                children.extend(collect_child_processes(pid))
+            children.extend(collect_child_processes(pid))
         if children:
@@ -260,12 +291,11 @@ def start_daemon(request,
       '[%s] pytest %s(%s) stopped', daemon_log_prefix, daemon_name, daemon_id)
-            return process
+            break
             terminate_process(, kill_children=True, slow_stop=slow_stop)
-    else:   # pylint: disable=useless-else-on-loop
-            # Wrong, we have a return, its not useless
+    else:
         if process is not None:
             terminate_process(, kill_children=True, slow_stop=slow_stop)
@@ -275,6 +305,7 @@ def start_daemon(request,
+    return process
 class SaltScriptBase(object):
@@ -306,7 +337,7 @@ class SaltScriptBase(object):
         self.cli_script_name = cli_script_name
         if self.cli_display_name is None:
             self.cli_display_name = '{}({})'.format(self.__class__.__name__,
-                                                      self.cli_script_name)
+                                                    self.cli_script_name)
         self.slow_stop = slow_stop
         self.environ = environ or os.environ.copy()
         self.cwd = cwd or os.getcwd()
@@ -476,15 +507,13 @@ class SaltDaemonScriptBase(SaltScriptBase):
         Blocking call to wait for the daemon to start listening
-        # Late import
-        import salt.ext.six as six
         if self._connectable.is_set():
             return True
         expire = time.time() + timeout
         check_ports = self.get_check_ports()
         if check_ports:
-            log.debug(
                 '[%s][%s] Checking the following ports to assure running status: %s',
@@ -492,80 +521,66 @@ class SaltDaemonScriptBase(SaltScriptBase):
         check_events = self.get_check_events()
         if check_events:
-            log.debug(
                 '[%s][%s] Checking the following event tags to assure running status: %s',
         log.debug('Wait until running expire: %s  Timeout: %s  Current Time: %s', expire, timeout, time.time())
-        event_listener = EventListener(
-            self.event_listener_config_dir or self.config_dir,
-            self.log_prefix
-        )
-        try:
-            while True:
-                if self._running.is_set() is False:
-                    # No longer running, break
-                    log.warning('No longer running!')
-                    break
-                if time.time() > expire:
-                    # Timeout, break
-                    log.debug('Expired at %s(was set to %s)', time.time(), expire)
-                    break
-                if not check_ports and not check_events:
-                    self._connectable.set()
-                    break
-                if check_events:
-                    for tag in event_listener.wait_for_events(check_events, timeout=timeout - 0.5):
-                        check_events.remove(tag)
-                if not check_events:
-                    stop_sending_events_file = self.config.get('pytest_stop_sending_events_file')
-                    if stop_sending_events_file and os.path.exists(stop_sending_events_file):
-                        log.warning('Removing pytest_stop_sending_events_file: %s', stop_sending_events_file)
-                        os.unlink(stop_sending_events_file)
-                for port in set(check_ports):
-                    if isinstance(port, int):
-                        log.debug('[%s][%s] Checking connectable status on port: %s',
-                                  self.log_prefix,
-                                  self.cli_display_name,
-                                  port)
-                        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-                        conn = sock.connect_ex(('localhost', port))
-                        try:
-                            if conn == 0:
-                                log.debug('[%s][%s] Port %s is connectable!',
-                                          self.log_prefix,
-                                          self.cli_display_name,
-                                          port)
-                                check_ports.remove(port)
-                                sock.shutdown(socket.SHUT_RDWR)
-                        except socket.error:
-                            continue
-                        finally:
-                            sock.close()
-                            del sock
-                    elif isinstance(port, six.string_types):
-                        salt_run = self.get_salt_run_fixture()
-                        minions_joined ='manage.joined')
-                        if minions_joined.exitcode == 0:
-                            if minions_joined.json and port in minions_joined.json:
-                                check_ports.remove(port)
-                                log.warning('Removed ID %r  Still left: %r', port, check_ports)
-                            elif minions_joined.json is None:
-                                log.debug('salt-run manage.join did not return any valid JSON: %s', minions_joined)
-                time.sleep(0.5)
-        except KeyboardInterrupt:
-            return self._connectable.is_set()
-        finally:
-            event_listener.terminate()
+        with EventListener(self.event_listener_config_dir or self.config_dir, self.log_prefix) as event_listener:
+            try:
+                while True:
+                    if self._running.is_set() is False:
+                        # No longer running, break
+                        log.warning('No longer running!')
+                        break
+                    if time.time() > expire:
+                        # Timeout, break
+                        log.warning('Wait until running expired at %s(was set to %s)', time.time(), expire)
+                        break
+                    if not check_ports and not check_events:
+                        self._connectable.set()
+                        break
+                    if check_events:
+                        for tag in event_listener.wait_for_events(check_events, timeout=timeout - 0.5):
+                            check_events.remove(tag)
+                    if not check_events:
+                        stop_sending_events_file = self.config.get('pytest_stop_sending_events_file')
+                        if stop_sending_events_file and os.path.exists(stop_sending_events_file):
+                  'Removing pytest_stop_sending_events_file: %s', stop_sending_events_file)
+                            os.unlink(stop_sending_events_file)
+                    for port in set(check_ports):
+                        if isinstance(port, int):
+                            log.debug('[%s][%s] Checking connectable status on port: %s',
+                                      self.log_prefix,
+                                      self.cli_display_name,
+                                      port)
+                            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                            conn = sock.connect_ex(('localhost', port))
+                            try:
+                                if conn == 0:
+                                    log.debug('[%s][%s] Port %s is connectable!',
+                                              self.log_prefix,
+                                              self.cli_display_name,
+                                              port)
+                                    check_ports.remove(port)
+                                    sock.shutdown(socket.SHUT_RDWR)
+                            except socket.error:
+                                continue
+                            finally:
+                                sock.close()
+                                del sock
+                    time.sleep(0.5)
+            except KeyboardInterrupt:
+                pass
         if self._connectable.is_set():
-            log.debug('[%s][%s] All ports checked. Running!', self.log_prefix, self.cli_display_name)
+  '[%s][%s] All ports checked. Running!', self.log_prefix, self.cli_display_name)
         return self._connectable.is_set()
@@ -579,8 +594,8 @@ class ShellResult(namedtuple('Result', ('exitcode', 'stdout', 'stderr', 'json'))
     __slots__ = ()
-    def __new__(cls, exitcode, stdout, stderr, json):
-        return super(ShellResult, cls).__new__(cls, exitcode, stdout, stderr, json)
+    def __new__(cls, exitcode, stdout, stderr, _json):
+        return super(ShellResult, cls).__new__(cls, exitcode, stdout, stderr, _json)
     # These are copied from the namedtuple verbose output in order to quiet down PyLint
     exitcode = property(itemgetter(0), doc='Alias for field number 0')
@@ -695,8 +710,8 @@ class SaltCliScriptBase(SaltScriptBase):
                             '[{}][{}] Timed out after {} seconds!'.format(self.log_prefix,
-                                                                             self.cli_display_name,
-                                                                             timeout)
+                                                                          self.cli_display_name,
+                                                                          timeout)
         except (SystemExit, KeyboardInterrupt):
@@ -729,145 +744,7 @@ class SaltCliScriptBase(SaltScriptBase):
         return stdout, stderr, json_out
-class SaltRunEventListener(SaltCliScriptBase):
-    '''
-    Class which runs 'salt-run state.event *' to match agaist a provided set of event tags
-    '''
-    EVENT_MATCH_RE = re.compile(r'^(?P<tag>[\w/-]+)(?:[\s]+)(?P<data>[\S\W]+)$')
-    def get_base_script_args(self):
-        return SaltScriptBase.get_base_script_args(self)
-    def get_script_args(self):  # pylint: disable=no-self-use
-        '''
-        Returns any additional arguments to pass to the CLI script
-        '''
-        return ['state.event']
-    def run(self, tags=(), timeout=10):  # pylint: disable=arguments-differ
-        '''
-        Run the given command synchronously
-        '''
-'%s checking for tags: %s', self.__class__.__name__, tags)
-        # Late import
-        import salt.ext.six as six
-        exitcode = 0
-        timeout_expire = time.time() + timeout
-        environ = self.environ.copy()
-        environ['PYTEST_LOG_PREFIX'] = '{}[EventListen]'.format(self.log_prefix)
-        environ['PYTHONUNBUFFERED'] = '1'
-        proc_args = [
-            self.get_script_path(self.cli_script_name)
-        ] + self.get_base_script_args() + self.get_script_args()
-        if sys.platform.startswith('win'):
-            # Windows needs the python executable to come first
-            proc_args.insert(0, sys.executable)
-'[%s][%s] Running \'%s\' in CWD: %s...',
-                 self.log_prefix, self.cli_display_name, ' '.join(proc_args), self.cwd)
-        to_match_events = set(tags)
-        matched_events = {}
-        terminal = self.init_terminal(proc_args,
-                                      cwd=self.cwd,
-                                      env=environ,
-                                      stdout=subprocess.PIPE,
-                                      stderr=subprocess.PIPE)
-        # Consume the output
-        stdout = six.b('')
-        stderr = six.b('')
-        process_output = six.b('')
-        events_processed = 0
-        try:
-            while True:
-                time.sleep(0.5)
-                if terminal.stdout is not None:
-                    try:
-                        out = terminal.recv(4096)
-                    except IOError:
-                        out = six.b('')
-                    if out:
-                        stdout += out
-                        process_output += out
-                if terminal.stderr is not None:
-                    try:
-                        err = terminal.recv_err(4096)
-                    except IOError:
-                        err = ''
-                    if err:
-                        stderr += err
-                if out is None and err is None:
-                    if to_match_events:
-                        exitcode = 1
-                    log.warning('[%s][%s] Premature exit?! Failed to find all of the required event tags. '
-                                'Total events processed: %s',
-                                self.log_prefix,
-                                self.cli_display_name,
-                                events_processed)
-                    break
-                if process_output:
-                    lines = process_output.split(b'}\n')
-                    if lines[-1] != b'':
-                        process_output = lines.pop()
-                    else:
-                        process_output = six.b('')
-                        lines.pop()
-                    for line in lines:
-                        match = self.EVENT_MATCH_RE.match(line.decode(__salt_system_encoding__))  # pylint: disable=undefined-variable
-                        if match:
-                            events_processed += 1
-                            tag, data = match.groups()
-                            if tag in to_match_events:
-                                matched_events[tag] = json.loads(data + '}')
-                                to_match_events.remove(tag)
-          '[%s][%s] Events processed so far: %d',
-                             self.log_prefix,
-                             self.cli_display_name,
-                             events_processed)
-                if not to_match_events:
-                    log.debug('[%s][%s] ALL EVENT TAGS FOUND!!!', self.log_prefix, self.cli_display_name)
-                    break
-                if timeout_expire < time.time():
-                    log.warning('[%s][%s] Failed to find all of the required event tags. Total events processed: %s',
-                                self.log_prefix,
-                                self.cli_display_name,
-                                events_processed)
-                    exitcode = 1
-                    break
-        except (SystemExit, KeyboardInterrupt):
-            pass
-        finally:
-            self.terminate()
-        if six.PY3:
-            # pylint: disable=undefined-variable
-            stdout = stdout.decode(__salt_system_encoding__)
-            stderr = stderr.decode(__salt_system_encoding__)
-            # pylint: enable=undefined-variable
-        if to_match_events:
-            stop_sending_events_file = self.config.get('pytest_stop_sending_events_file')
-            if stop_sending_events_file and os.path.exists(stop_sending_events_file):
-                log.warning('Removing pytest_stop_sending_events_file: %s', stop_sending_events_file)
-                os.unlink(stop_sending_events_file)
-        json_out = {
-            'matched': matched_events,
-            'unmatched': to_match_events
-        }
-        return ShellResult(exitcode, stdout, stderr, json_out)
-class EventListener:
+class EventListener(object):
@@ -888,6 +765,8 @@ class EventListener:
         events_to_match = set(check_events)
         events_processed = 0
         max_timeout = time.time() + timeout
+        last_log = 0
+        log_freq = 10
         while True:
             if not events_to_match:
       '%s ALL EVENT TAGS FOUND!!!', self.log_prefix)
@@ -895,42 +774,50 @@ class EventListener:
             if time.time() > max_timeout:
-                    '%s Failed to find all of the required event tags. '
-                    'Total events processed: %s',
+                    '%s Failed to find all of the required event tags(%s). '
+                    'Total events processed: %s. Total events found: %s.',
-                    events_processed
+                    check_events,
+                    events_processed,
+                    len(matched_events)
                 return matched_events
-            event = self.listener.get_event(full=True, auto_reconnect=True)
+            event = self._listener.get_event(full=True, auto_reconnect=True)
             if event is None:
             tag = event['tag']
-            log.warning('Got event: %s', event)
+  'Got event: %s', event)
             if tag in events_to_match:
             events_processed += 1
-  '%s Events processed so far: %d',
-                     self.log_prefix,
-                     events_processed)
+            if time.time() - last_log > log_freq:
+                log.debug('%s Events processed so far: %d',
+                          self.log_prefix,
+                          events_processed)
+                last_log = time.time()
     def terminate(self):
-        listener = self.listener
-        self._listener = None
-        listener.destroy()
+        if self._listener is not None:
+            listener = self._listener
+            self._listener = None
+            listener.destroy()
-    @property
-    def listener(self):
+    def __enter__(self):
         if self._listener is None:
             # Late import
             import salt.config
             import salt.utils.event
             opts = salt.config.master_config(os.path.join(self.config_dir, 'master'))
             self._listener = salt.utils.event.get_event('master', opts=opts, listen=True)
-        return self._listener
+            atexit.register(self.terminate)
+        return self
+    def __exit__(self, *args):
+        self.terminate()
diff --git a/pytestsalt/utils/ b/pytestsalt/utils/
index 7346a39..adc16f1 100644
--- a/pytestsalt/utils/
+++ b/pytestsalt/utils/
@@ -13,10 +13,17 @@ import threading
 # Import 3rd-party libs
 import msgpack
-from tornado import gen
-from tornado.ioloop import IOLoop
-from tornado.tcpserver import TCPServer
-from tornado.iostream import StreamClosedError
+    from salt.ext.tornado import gen
+    from salt.ext.tornado.ioloop import IOLoop
+    from salt.ext.tornado.tcpserver import TCPServer
+    from salt.ext.tornado.iostream import StreamClosedError
+except ImportError:
+    from tornado import gen
+    from tornado.ioloop import IOLoop
+    from tornado.tcpserver import TCPServer
+    from tornado.iostream import StreamClosedError
 log = logging.getLogger(__name__)
diff --git a/tests/ b/tests/
[The following lists of changes regard files as different if they have different names, permissions or owners.]

Files in second set of .debs but not in first

-rw-r--r--  root/root   /usr/lib/python3/dist-packages/pytest_salt-2020.1.27.egg-info/PKG-INFO
-rw-r--r--  root/root   /usr/lib/python3/dist-packages/pytest_salt-2020.1.27.egg-info/dependency_links.txt
-rw-r--r--  root/root   /usr/lib/python3/dist-packages/pytest_salt-2020.1.27.egg-info/entry_points.txt
-rw-r--r--  root/root   /usr/lib/python3/dist-packages/pytest_salt-2020.1.27.egg-info/requires.txt
-rw-r--r--  root/root   /usr/lib/python3/dist-packages/pytest_salt-2020.1.27.egg-info/top_level.txt

Files in first set of .debs but not in second

-rw-r--r--  root/root   /usr/lib/python3/dist-packages/pytest_salt-2019.6.13.egg-info/PKG-INFO
-rw-r--r--  root/root   /usr/lib/python3/dist-packages/pytest_salt-2019.6.13.egg-info/dependency_links.txt
-rw-r--r--  root/root   /usr/lib/python3/dist-packages/pytest_salt-2019.6.13.egg-info/entry_points.txt
-rw-r--r--  root/root   /usr/lib/python3/dist-packages/pytest_salt-2019.6.13.egg-info/requires.txt
-rw-r--r--  root/root   /usr/lib/python3/dist-packages/pytest_salt-2019.6.13.egg-info/top_level.txt

No differences were encountered in the control files

More details

Full run details