diff --git a/pbcommand/models/common.py b/pbcommand/models/common.py
index 7688194..163dc95 100644
--- a/pbcommand/models/common.py
+++ b/pbcommand/models/common.py
@@ -24,7 +24,7 @@ log = logging.getLogger(__name__)
 REGISTERED_FILE_TYPES = {}
 
 # Light weight Dataset metatadata. Use pbcore for full dataset functionality
-DataSetMetaData = namedtuple("DataSetMetaData", 'uuid metatype')
+DataSetMetaData = namedtuple("DataSetMetaData", 'uuid metatype name')
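+# e.g. DataSetMetaData(uuid="5d562c74-...", metatype="PacBio.DataSet.SubreadSet",
+#                      name="My SubreadSet")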
 
 
 class PacBioNamespaces:
diff --git a/pbcommand/services/_service_access_layer.py b/pbcommand/services/_service_access_layer.py
index 7100246..77e5de1 100644
--- a/pbcommand/services/_service_access_layer.py
+++ b/pbcommand/services/_service_access_layer.py
@@ -2,6 +2,17 @@
 Utils for Updating state/progress and results to WebServices
 """
 
+from .utils import to_sal_summary
+from pbcommand.pb_io import load_report_from
+from .models import (SMRTServiceBaseError,
+                     JobResult, JobStates, JobExeError, JobTypes,
+                     ServiceResourceTypes, ServiceJob, JobEntryPoint,
+                     JobTask)
+from pbcommand.utils import get_dataset_metadata
+from pbcommand.models import (FileTypes,
+                              DataSetFileType,
+                              DataStore,
+                              DataStoreFile)
 import base64
 import datetime
 import json
@@ -14,21 +25,11 @@ import warnings
 import pytz
 import requests
 from requests import RequestException
+from requests.exceptions import HTTPError, ConnectionError
+from urllib3.exceptions import ProtocolError, InsecureRequestWarning  # pylint: disable=import-error
 # To disable the ssl cert check warning
-from requests.packages.urllib3.exceptions import InsecureRequestWarning  # pylint: disable=import-error
-requests.packages.urllib3.disable_warnings(InsecureRequestWarning)  # pylint: disable=no-member
-
-from pbcommand.models import (FileTypes,
-                              DataSetFileType,
-                              DataStore,
-                              DataStoreFile)
-from pbcommand.utils import get_dataset_metadata
-from .models import (SMRTServiceBaseError,
-                     JobResult, JobStates, JobExeError, JobTypes,
-                     ServiceResourceTypes, ServiceJob, JobEntryPoint,
-                     JobTask)
-from pbcommand.pb_io import load_report_from
-from .utils import to_sal_summary
+import urllib3
+urllib3.disable_warnings(InsecureRequestWarning)  # pylint: disable=no-member
 
 
 log = logging.getLogger(__name__)
@@ -45,7 +46,10 @@ __all__ = [
 
 
 class Constants:
-    HEADERS = {'Content-type': 'application/json'}
+    HEADERS = {
+        'Content-type': 'application/json',
+        'Accept-Encoding': 'gzip, deflate'
+    }
 
 
 def __jsonable_request(request_method, headers):
@@ -100,7 +104,7 @@ def _process_rget(total_url, ignore_errors=False, headers=None):
     r = _get_requests(__get_headers(headers))(total_url)
     _parse_base_service_error(r)
     if not r.ok and not ignore_errors:
-        log.error(
+        log.warning(
             "Failed ({s}) GET to {x}".format(
                 x=total_url,
                 s=r.status_code))
@@ -109,6 +113,27 @@ def _process_rget(total_url, ignore_errors=False, headers=None):
     return j
 
 
+def _process_rget_or_empty(total_url, ignore_errors=False, headers=None):
+    """
+    Process a GET request and return the JSON response if the body is
+    non-empty, otherwise None.  Raises for unsuccessful status codes.
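+
+    Typical call site (variables are illustrative):
+
+        d = _process_rget_or_empty(total_url, headers=headers)
+        if d is None:
+            ...  # empty response body: the record does not exist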
+    """
+    r = _get_requests(__get_headers(headers))(total_url)
+    if len(r.content) > 0:
+        _parse_base_service_error(r)
+    if not r.ok and not ignore_errors:
+        log.warning(
+            "Failed ({s}) GET to {x}".format(
+                x=total_url,
+                s=r.status_code))
+    r.raise_for_status()
+    if len(r.content) > 0:
+        object_d = r.json()
+        if len(object_d) > 0:
+            return object_d
+    return None
+
+
 def _process_rget_with_transform(func, ignore_errors=False):
     """Post process the JSON result (if successful) with F(json_d) -> T"""
     def wrapper(total_url, headers=None):
@@ -162,12 +187,12 @@ def __process_creatable_to_json(f):
         _parse_base_service_error(r)
         # FIXME This should be strict to only return a 201
         if r.status_code not in (200, 201, 202, 204):
-            log.error(
+            log.warning(
                 "Failed ({s} to call {u}".format(
                     u=total_url,
                     s=r.status_code))
-            log.error("payload")
-            log.error("\n" + pprint.pformat(payload_d))
+            log.warning("payload")
+            log.warning("\n" + pprint.pformat(payload_d))
         r.raise_for_status()
         j = r.json()
         return j
@@ -218,6 +243,7 @@ def _get_job_by_id_or_raise(sal, job_id, error_klass,
     return job
 
 
+# FIXME this overlaps with job_poller.py, which is more fault-tolerant
 def _block_for_job_to_complete(sal, job_id, time_out=1200, sleep_time=2,
                                abort_on_interrupt=True,
                                retry_on_failure=False):
@@ -271,8 +297,8 @@ def _block_for_job_to_complete(sal, job_id, time_out=1200, sleep_time=2,
                     sal, job_id, JobExeError, error_messge_extras=msg)
             except JobExeError as e:
                 if retry_on_failure:
-                    log.error(e)
-                    log.warn("Polling job {i} failed".format(i=job_id))
+                    log.warning(e)
+                    log.warning("Polling job {i} failed".format(i=job_id))
                     continue
                 else:
                     raise
@@ -381,13 +407,6 @@ def _to_relative_tasks_url(job_type):
     return wrapper
 
 
-def _show_deprecation_warning(msg):
-    if "PB_TEST_MODE" not in os.environ:
-        warnings.simplefilter('once', DeprecationWarning)
-        warnings.warn(msg, DeprecationWarning)
-        warnings.simplefilter('default', DeprecationWarning)  # reset filte
-
-
 class ServiceAccessLayer:  # pragma: no cover
     """
     General Client Access Layer for interfacing with the job types on
@@ -424,10 +443,6 @@ class ServiceAccessLayer:  # pragma: no cover
         self.debug = debug
         self._sleep_time = sleep_time
 
-        if self.__class__.__name__ == "ServiceAccessLayer":
-            _show_deprecation_warning(
-                "Please use the SmrtLinkAuthClient', direct localhost access is not publicly supported")
-
     def _get_headers(self):
         return Constants.HEADERS
 
@@ -493,7 +508,7 @@ class ServiceAccessLayer:  # pragma: no cover
             i=job_id,
             r=resource_type_id,
             p=ServiceAccessLayer.ROOT_JOBS)
-        return _process_rget_or_none(transform_func)(
+        return _process_rget_with_transform(transform_func)(
             _to_url(self.uri, "{p}/{t}/{i}/{r}".format(**_d)), headers=self._get_headers())
 
     def _get_jobs_by_job_type(self, job_type, query=None):
@@ -528,26 +543,21 @@ class ServiceAccessLayer:  # pragma: no cover
     def get_analysis_jobs(self, query=None):
         return self._get_jobs_by_job_type(JobTypes.ANALYSIS, query=query)
 
-    def get_pbsmrtpipe_jobs(self, query=None):
-        """:rtype: list[ServiceJob]"""
-        _show_deprecation_warning("Please use get_analysis_jobs() instead")
-        return self.get_analysis_jobs(query=query)
-
-    def get_cromwell_jobs(self):
+    def get_cromwell_jobs(self, query=None):
         """:rtype: list[ServiceJob]"""
-        return self._get_jobs_by_job_type(JobTypes.CROMWELL)
+        return self._get_jobs_by_job_type(JobTypes.CROMWELL, query=query)
 
-    def get_import_dataset_jobs(self):
+    def get_import_dataset_jobs(self, query=None):
         """:rtype: list[ServiceJob]"""
-        return self._get_jobs_by_job_type(JobTypes.IMPORT_DS)
+        return self._get_jobs_by_job_type(JobTypes.IMPORT_DS, query=query)
 
-    def get_merge_dataset_jobs(self):
+    def get_merge_dataset_jobs(self, query=None):
         """:rtype: list[ServiceJob]"""
-        return self._get_jobs_by_job_type(JobTypes.MERGE_DS)
+        return self._get_jobs_by_job_type(JobTypes.MERGE_DS, query=query)
 
-    def get_fasta_convert_jobs(self):
+    def get_fasta_convert_jobs(self, query=None):
         """:rtype: list[ServiceJob]"""
-        self._get_jobs_by_job_type(JobTypes.CONVERT_FASTA)
+        return self._get_jobs_by_job_type(JobTypes.CONVERT_FASTA, query=query)
 
     def get_analysis_job_by_id(self, job_id):
         """Get an Analysis job by id or UUID or return None
@@ -560,13 +570,13 @@ class ServiceAccessLayer:  # pragma: no cover
         return self.get_job_by_type_and_id(JobTypes.IMPORT_DS, job_id)
 
     def get_analysis_job_datastore(self, job_id):
-        """Get DataStore output from (pbsmrtpipe) analysis job"""
+        """Get DataStore output from analysis job"""
         # this doesn't work the list is sli
         return self._get_job_resource_type_with_transform(
-            "pbsmrtpipe", job_id, ServiceResourceTypes.DATASTORE, _to_datastore)
+            "analysis", job_id, ServiceResourceTypes.DATASTORE, _to_datastore)
 
     def _to_dsf_id_url(self, job_id, dsf_uuid):
-        u = "/".join([ServiceAccessLayer.ROOT_JOBS, "pbsmrtpipe",
+        u = "/".join([ServiceAccessLayer.ROOT_JOBS, "analysis",
                       str(job_id), ServiceResourceTypes.DATASTORE, dsf_uuid])
         return _to_url(self.uri, u)
 
@@ -621,7 +631,7 @@ class ServiceAccessLayer:  # pragma: no cover
                     dsf_uuid, job_id))
 
     def get_analysis_job_reports(self, job_id):
-        """Get list of DataStore ReportFile types output from (pbsmrtpipe) analysis job"""
+        """Get list of DataStore ReportFile types output from analysis job"""
         return self._get_job_resource_type_with_transform(
             JobTypes.ANALYSIS, job_id, ServiceResourceTypes.REPORTS, _to_job_report_files)
 
@@ -769,27 +779,25 @@ class ServiceAccessLayer:  # pragma: no cover
 
         :rtype: JobResult
         """
-        dataset_meta_type = get_dataset_metadata(path)
+        dsmd = get_dataset_metadata(path)
+        result = self.search_dataset_by_uuid(dsmd.uuid)
 
-        result = self.get_dataset_by_uuid(dataset_meta_type.uuid,
-                                          ignore_errors=True)
         if result is None:
             log.info("Importing dataset {p}".format(p=path))
             job_result = self.run_import_dataset_by_type(
-                dataset_meta_type.metatype, path, avoid_duplicate_import=avoid_duplicate_import)
+                dsmd.metatype,
+                path,
+                avoid_duplicate_import=avoid_duplicate_import)
             log.info("Confirming database update")
             # validation 1: attempt to retrieve dataset info
-            result_new = self.get_dataset_by_uuid(dataset_meta_type.uuid)
+            result_new = self.get_dataset_by_uuid(dsmd.uuid)
             if result_new is None:
                 raise JobExeError(("Dataset {u} was imported but could " +
                                    "not be retrieved; this may indicate " +
-                                   "XML schema errors.").format(
-                    u=dataset_meta_type.uuid))
+                                   "XML schema errors.").format(u=dsmd.uuid))
             return job_result
         else:
-            log.info(
-                "{f} already imported. Skipping importing. {r}".format(
-                    r=result, f=dataset_meta_type.metatype))
+            log.info(f"Already imported: {result}")
             # need to clean this up
             return JobResult(self.get_job_by_id(result['jobId']), 0, "")
 
@@ -826,6 +834,14 @@ class ServiceAccessLayer:  # pragma: no cover
                                                p=ServiceAccessLayer.ROOT_DS)),
             headers=self._get_headers())
 
+    def search_dataset_by_uuid(self, uuid):
+        """
+        Preferred alternative to get_dataset_by_uuid that does not trigger
+        a 404 when the dataset is absent.
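+
+        Example (UUID shown is illustrative):
+
+            ds = sal.search_dataset_by_uuid("5d562c74-e452-11e6-8b96-3c15c2cc8f88")
+            if ds is None:
+                log.info("No dataset found")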
+        """
+        return _process_rget_or_empty(
+            _to_url(self.uri, f"{ServiceAccessLayer.ROOT_DS}/search/{uuid}"),
+            headers=self._get_headers())
+
     def get_dataset_by_id(self, dataset_type, int_or_uuid):
         """Get a Dataset using the DataSetMetaType and (int|uuid) of the dataset"""
         ds_endpoint = _get_endpoint_or_raise(dataset_type)
@@ -947,7 +963,7 @@ class ServiceAccessLayer:  # pragma: no cover
                                        task_options=(),
                                        workflow_options=(),
                                        tags=()):
-        """Creates and runs a pbsmrtpipe pipeline by pipeline template id
+        """Creates and runs an analysis workflow by pipeline template id
 
 
         :param tags: Tags should be a set of strings
@@ -1049,7 +1065,7 @@ class ServiceAccessLayer:  # pragma: no cover
     def terminate_job(self, job):
         """
         POST a terminate request appropriate to the job type.  Currently only
-        supported for pbsmrtpipe, cromwell, and analysis job types.
+        supported for the cromwell and analysis job types.
         """
         log.warn("Terminating job {i} ({u})".format(i=job.id, u=job.uuid))
         if job.external_job_id is not None:
@@ -1144,279 +1160,14 @@ class ServiceAccessLayer:  # pragma: no cover
         return _process_rpost_with_transform(ServiceJob.from_d)(
             u, job_options, headers=self._get_headers())
 
-
-def __run_and_ignore_errors(f, warn_message):
-    """
-    Black hole ignoring exceptions from a func with no-args and
-    logging the error has a warning.
-    """
-    try:
-        return f()
-    except Exception as e:
-        log.warn(warn_message + " {e}".format(e=e))
-
-
-def _run_func(f, warn_message, ignore_errors=True):
-    if ignore_errors:
-        return __run_and_ignore_errors(f, warn_message)
-    else:
-        return f()
-
-
-def log_pbsmrtpipe_progress(total_url, message, level, source_id, ignore_errors=True, headers=None):  # pragma: no cover
-    """Log the status of a pbsmrtpipe to SMRT Server"""
-    # Keeping this as public to avoid breaking pbsmrtpipe. The
-    # new public interface should be the JobServiceClient
-
-    # Need to clarify the model here. Trying to pass the most minimal
-    # data necessary to pbsmrtpipe.
-    _d = dict(message=message, level=level, sourceId=source_id)
-    warn_message = "Failed Request to {u} data: {d}".format(u=total_url, d=_d)
-
-    def f():
-        return _process_rpost(total_url, _d, headers=headers)
-
-    return _run_func(f, warn_message, ignore_errors=ignore_errors)
-
-
-def add_datastore_file(total_url, datastore_file, ignore_errors=True, headers=None):  # pragma: no cover
-    """Add datastore to SMRT Server
-
-    :type datastore_file: DataStoreFile
-    """
-    # Keeping this as public to avoid breaking pbsmrtpipe. The
-    # new public interface should be the JobServiceClient
-    _d = datastore_file.to_dict()
-    warn_message = "Failed Request to {u} data: {d}.".format(u=total_url, d=_d)
-
-    def f():
-        return _process_rpost(total_url, _d, headers=headers)
-
-    return _run_func(f, warn_message, ignore_errors=ignore_errors)
-
-
-def _create_job_task(job_tasks_url, create_job_task_record, ignore_errors=True, headers=None):  # pragma: no cover
-    """
-    :type create_job_task_record: CreateJobTaskRecord
-    :rtype: JobTask
-    """
-    warn_message = "Unable to create Task {c}".format(
-        c=repr(create_job_task_record))
-
-    def f():
-        return _process_rpost_with_transform(JobTask.from_d)(
-            job_tasks_url, create_job_task_record.to_dict(), headers=headers)
-
-    return _run_func(f, warn_message, ignore_errors)
-
-
-def _update_job_task_state(task_url, update_job_task_record, ignore_errors=True, headers=None):  # pragma: no cover
-    """
-    :type update_job_task_record: UpdateJobTaskRecord
-    :rtype: JobTask
-    """
-    warn_message = "Unable to update Task {c}".format(
-        c=repr(update_job_task_record))
-
-    def f():
-        return _process_rput_with_transform(JobTask.from_d)(
-            task_url, update_job_task_record.to_dict(), headers=headers)
-
-    return _run_func(f, warn_message, ignore_errors)
-
-
-def _update_datastore_file(datastore_url, uuid, path, file_size, set_is_active,
-                           ignore_errors=True, headers=None):  # pragma: no cover
-    warn_message = "Unable to update datastore file {u}".format(u=uuid)
-    total_url = "{b}/{u}".format(b=datastore_url, u=uuid)
-    d = {"fileSize": file_size, "path": path, "isActive": set_is_active}
-
-    def f():
-        return _process_rput(total_url, d, headers=headers)
-
-    return _run_func(f, warn_message, ignore_errors)
-
-
-class CreateJobTaskRecord:
-
-    def __init__(self, task_uuid, task_id, task_type_id,
-                 name, state, created_at=None):
-        self.task_uuid = task_uuid
-        self.task_id = task_id
-        self.task_type_id = task_type_id
-        self.name = name
-        # this must be consistent with the EngineJob states in the scala code
-        self.state = state
-        # Note, the created_at timestamp must have the form
-        # 2016-02-18T23:24:46.569Z
-        # or
-        # 2016-02-18T15:24:46.569-08:00
-        self.created_at = datetime.datetime.now(
-            pytz.utc) if created_at is None else created_at
-
-    def __repr__(self):
-        _d = dict(k=self.__class__.__name__,
-                  u=self.task_uuid,
-                  i=self.task_id,
-                  n=self.name,
-                  s=self.state)
-        return "<{k} uuid:{u} ix:{i} state:{s} name:{n} >".format(**_d)
-
-    def to_dict(self):
-        return dict(uuid=self.task_uuid,
-                    taskId=self.task_id,
-                    taskTypeId=self.task_type_id,
-                    name=self.name,
-                    state=self.state,
-                    createdAt=self.created_at.isoformat())
-
-
-class UpdateJobTaskRecord:
-
-    def __init__(self, task_uuid, state, message, error_message=None):
-        """:type error_message: str | None"""
-        self.task_uuid = task_uuid
-        self.state = state
-        self.message = message
-        # detailed error message (e.g., terse stack trace)
-        self.error_message = error_message
-
-    @staticmethod
-    def from_error(task_uuid, state, message, error_message):
-        # require an detailed error message
-        return UpdateJobTaskRecord(task_uuid, state,
-                                   message,
-                                   error_message=error_message)
-
-    def __repr__(self):
-        _d = dict(k=self.__class__.__name__,
-                  i=self.task_uuid,
-                  s=self.state)
-        return "<{k} i:{i} state:{s} >".format(**_d)
-
-    def to_dict(self):
-        _d = dict(uuid=self.task_uuid,
-                  state=self.state,
-                  message=self.message)
-
-        # spray API is a little odd here. it will complain about
-        # Expected String as JsString, but got null
-        # even though the model is Option[String]
-        if self.error_message is not None:
-            _d['errorMessage'] = self.error_message
-
-        return _d
-
-
-class JobServiceClient:  # pragma: no cover
-    # Keeping this class private. It should only be used from pbsmrtpipe
-
-    def __init__(self, job_root_url, ignore_errors=False):
-        """
-
-        :param job_root_url: Full Root URL to the job
-        :type job_root_url: str
-
-        :param ignore_errors: Only log errors, don't not raise if a request fails. This is intended to be used in a fire-and-forget usecase
-        :type ignore_errors: bool
-
-        This hides the root location of the URL and hides
-        the job id (as an int or uuid)
-
-        The Url has the form:
-
-        http://localhost:8888/smrt-link/job-manager/jobs/pbsmrtpipe/1234
-
-        or
-
-        http://localhost:8888/smrt-link/job-manager/jobs/pbsmrtpipe/5d562c74-e452-11e6-8b96-3c15c2cc8f88
-        """
-        self.job_root_url = job_root_url
-        self.ignore_errors = ignore_errors
-
-    def __repr__(self):
-        _d = dict(k=self.__class__.__name__, u=self.job_root_url)
-        return "<{k} Job URL:{u} >".format(**_d)
-
-    def _get_headers(self):
-        return Constants.HEADERS
-
-    def to_url(self, segment):
-        return "{i}/{s}".format(i=self.job_root_url, s=segment)
-
-    @property
-    def log_url(self):
-        return self.to_url("log")
-
-    @property
-    def datastore_url(self):
-        return self.to_url("datastore")
-
-    @property
-    def tasks_url(self):
-        return self.to_url("tasks")
-
-    def get_task_url(self, task_uuid):
-        """
-        :param task_uuid: Task UUID
-        :return:
-        """
-        return self.to_url("tasks/{t}".format(t=task_uuid))
-
-    def log_workflow_progress(self, message, level,
-                              source_id, ignore_errors=True):
-        return log_pbsmrtpipe_progress(
-            self.log_url, message, level, source_id, ignore_errors=ignore_errors)
-
-    def add_datastore_file(self, datastore_file, ignore_errors=True):
-        return add_datastore_file(
-            self.datastore_url, datastore_file, ignore_errors=ignore_errors)
-
-    def update_datastore_file(
-            self, uuid, file_size=None, path=None, set_is_active=True, ignore_errors=True):
-        return _update_datastore_file(
-            self.datastore_url, uuid, path, file_size, set_is_active, ignore_errors)
-
-    def create_task(self, task_uuid, task_id,
-                    task_type_id, name, created_at=None):
-        """
-
-        :param task_uuid: Globally unique task id
-        :param task_id: Unique within respect to the job
-        :param task_type_id: ToolContract or task id (e.g., pbcommand.tasks.alpha)
-        :param name: Display name of task
-        :param created_at: time task was created at (will be set if current time if None)
-        """
-        r = CreateJobTaskRecord(task_uuid, task_id, task_type_id,
-                                name, JobStates.CREATED, created_at=created_at)
-
-        return _create_job_task(self.tasks_url, r)
-
-    def update_task_status(self, task_uuid, state,
-                           message, error_message=None):
-
-        task_url = self.get_task_url(task_uuid)
-
-        u = UpdateJobTaskRecord(
-            task_uuid,
-            state,
-            message,
-            error_message=error_message)
-
-        return _update_job_task_state(
-            task_url, u, ignore_errors=self.ignore_errors)
-
-    def update_task_to_failed(self, task_uuid, message,
-                              detailed_error_message):
-        task_url = self.get_task_url(task_uuid)
-        state = JobStates.FAILED
-
-        u = UpdateJobTaskRecord(task_uuid,
-                                state,
-                                message,
-                                error_message=detailed_error_message)
-
-        return _update_job_task_state(task_url, u)
+    def get_sl_api(self, path):
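+        """
+        GET an arbitrary SMRT Link API path and return the decoded JSON.
+        Illustrative example (the path shown is hypothetical):
+
+            d = client.get_sl_api("/smrt-link/job-manager/jobs/analysis")
+        """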
+        service_url = f"{self.uri}{path}"
+        t1 = time.time()
+        r = requests.get(service_url, headers=self._get_headers(), verify=False)
+        t2 = time.time()
+        log.info("Response time: {:.1f}s".format(t2 - t1))
+        r.raise_for_status()
+        return r.json()
 
 
 # -----------------------------------------------------------------------
@@ -1546,3 +1297,54 @@ def get_smrtlink_client_from_args(args):
         port=args.port,
         user=args.user,
         password=args.password)
+
+
+def run_client_with_retry(fx, host, port, user, password,
+                          sleep_time=60,
+                          retry_on=(500, 503)):
+    """
+    Obtain a services client and run the specified function on it (this can
+    be essentially any services call), returning the result.  If a call
+    fails with 401 Unauthorized, it will be retried with a new token.
+    HTTP errors listed in retry_on (500 and 503 by default), as well as
+    connection errors, will be retried without re-authenticating.
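+
+    Minimal usage sketch (the services call shown is illustrative):
+
+        def fx(client):
+            return client.get_job_by_id(1234)
+
+        job = run_client_with_retry(fx, host, port, user, password)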
+    """
+    def _get_client():
+        return get_smrtlink_client(host, port, user, password)
+    auth_errors = 0
+    started_at = time.time()
+    retry_time = sleep_time
+    client = _get_client()
+    while True:
+        try:
+            result = fx(client)
+        except HTTPError as e:
+            status = e.response.status_code
+            log.info("Got error {e} (code = {c})".format(e=str(e), c=status))
+            if status == 401:
+                auth_errors += 1
+                if auth_errors > 10:
+                    raise RuntimeError(
+                        "10 successive HTTP 401 errors, exiting")
+                log.warning("Authentication error, will retry with new token")
+                client = _get_client()
+                continue
+            elif status in retry_on:
+                log.warning("Got HTTP {c}, will retry in {d}s".format(
+                    c=status, d=retry_time))
+                time.sleep(retry_time)
+                # if a retryable error occurs, we increment the retry time
+                # up to a max of 30 minutes
+                retry_time = min(1800, retry_time + sleep_time)
+                continue
+            else:
+                raise
+        except (ConnectionError, ProtocolError) as e:
+            log.warning("Connection error: {e}".format(e=str(e)))
+            log.info("Will retry in {d}s".format(d=retry_time))
+            time.sleep(retry_time)
+            # if a retryable error occurs, we increment the retry time
+            # up to a max of 30 minutes
+            retry_time = min(1800, retry_time + sleep_time)
+            continue
+        else:
+            return result
diff --git a/pbcommand/services/job_poller.py b/pbcommand/services/job_poller.py
index b738c9b..c3db2af 100644
--- a/pbcommand/services/job_poller.py
+++ b/pbcommand/services/job_poller.py
@@ -24,6 +24,8 @@ __version__ = "0.1"
 log = logging.getLogger(__name__)
 
 
+# FIXME this overlaps with run_client_with_retry, can we consolidate the
+# general error handling?
 def poll_for_job_completion(job_id,
                             host,
                             port,
@@ -40,6 +42,7 @@ def poll_for_job_completion(job_id,
     auth_errors = 0
     external_job_id = None
     LOG_INTERVAL = 600
+    SLEEP_TIME_MAX = 600
     i = 0
     try:
         client = _get_client()
@@ -51,12 +54,15 @@ def poll_for_job_completion(job_id,
                 job_json = _process_rget(url, headers=client._get_headers())
             except HTTPError as e:
                 status = e.response.status_code
-                log.info("Got error {e} (code = {c})".format(e=str(e), c=status))
+                log.info("Got error {e} (code = {c})".format(
+                    e=str(e), c=status))
                 if status == 401:
                     auth_errors += 1
                     if auth_errors > 10:
-                        raise RuntimeError("10 successive HTTP 401 errors, exiting")
-                    log.warning("Authentication error, will retry with new token")
+                        raise RuntimeError(
+                            "10 successive HTTP 401 errors, exiting")
+                    log.warning(
+                        "Authentication error, will retry with new token")
                     client = _get_client()
                     continue
                 elif status in retry_on:
@@ -100,6 +106,9 @@ def poll_for_job_completion(job_id,
                             "Exceeded runtime {r} of {t}".format(
                                 r=run_time, t=time_out))
                 time.sleep(sleep_time)
+                # after an hour, poll less frequently (capped at SLEEP_TIME_MAX)
+                if run_time > 3600:
+                    sleep_time = min(SLEEP_TIME_MAX, sleep_time * 5)
     except KeyboardInterrupt:
         if abort_on_interrupt:
             client.terminate_job_id(job_id)
diff --git a/pbcommand/services/resolver.py b/pbcommand/services/resolver.py
index 1dbdb98..483471b 100644
--- a/pbcommand/services/resolver.py
+++ b/pbcommand/services/resolver.py
@@ -11,7 +11,7 @@ import os.path as op
 import sys
 
 from pbcommand.models.common import FileTypes
-from pbcommand.services._service_access_layer import get_smrtlink_client
+from pbcommand.services._service_access_layer import get_smrtlink_client, run_client_with_retry
 from pbcommand.utils import setup_log
 from pbcommand.cli import get_default_argparser_with_base_opts, pacbio_args_runner
 
@@ -80,16 +80,8 @@ def _find_alignments(datastore):
 
 
 class Resolver:
-    def __init__(self,
-                 host,
-                 port,
-                 user=None,
-                 password=None):
-        self._host = host
-        self._port = port
-        self._user = user
-        self._password = password
-        self._client = get_smrtlink_client(host, port, user, password)
+    def __init__(self, client):
+        self._client = client
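+        # the client is typically obtained via get_smrtlink_client(...), or
+        # passed in by run_client_with_retry as in run_args() below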
 
     def _get_job_datastore_reports(self, job_id):
         datastore = self._client.get_analysis_job_datastore(job_id)
@@ -130,24 +122,31 @@ class Resolver:
 
 
 def run_args(args):
-    resolver = Resolver(args.host, args.port, args.user, args.password)
-    resource = None
-    if args.resource_type == ResourceTypes.JOB_PATH:
-        resource = resolver.resolve_job(args.job_id)
-    elif args.resource_type == ResourceTypes.ALIGNMENTS:
-        resource = resolver.resolve_alignments(args.job_id)
-    elif args.resource_type == ResourceTypes.PREASSEMBLY:
-        resource = resolver.resolve_preassembly_stats(args.job_id)
-    elif args.resource_type == ResourceTypes.POLISHED_ASSEMBLY:
-        resource = resolver.resolve_polished_assembly_stats(args.job_id)
-    elif args.resource_type == ResourceTypes.MAPPING_STATS:
-        resource = resolver.resolve_mapping_stats(args.job_id)
-    elif args.resource_type == ResourceTypes.SUBREADS_ENTRY:
-        resource = resolver.resolve_input_subreads(args.job_id)
-    else:
-        raise NotImplementedError(
-            "Can't retrieve resource type '%s'" %
-            args.resource_type)
+    def _run_resolver(client):
+        resolver = Resolver(client)
+        resource = None
+        if args.resource_type == ResourceTypes.JOB_PATH:
+            resource = resolver.resolve_job(args.job_id)
+        elif args.resource_type == ResourceTypes.ALIGNMENTS:
+            resource = resolver.resolve_alignments(args.job_id)
+        elif args.resource_type == ResourceTypes.PREASSEMBLY:
+            resource = resolver.resolve_preassembly_stats(args.job_id)
+        elif args.resource_type == ResourceTypes.POLISHED_ASSEMBLY:
+            resource = resolver.resolve_polished_assembly_stats(args.job_id)
+        elif args.resource_type == ResourceTypes.MAPPING_STATS:
+            resource = resolver.resolve_mapping_stats(args.job_id)
+        elif args.resource_type == ResourceTypes.SUBREADS_ENTRY:
+            resource = resolver.resolve_input_subreads(args.job_id)
+        else:
+            raise NotImplementedError(
+                "Can't retrieve resource type '%s'" % args.resource_type)
+        return resource
+
+    resource = run_client_with_retry(_run_resolver,
+                                     args.host,
+                                     args.port,
+                                     args.user,
+                                     args.password)
     print(resource)
     if args.make_symlink is not None:
         if op.exists(args.make_symlink):
diff --git a/pbcommand/utils.py b/pbcommand/utils.py
index 6b22bd8..1f17e6e 100644
--- a/pbcommand/utils.py
+++ b/pbcommand/utils.py
@@ -470,16 +470,17 @@ def get_dataset_metadata(path):
     :raises: ValueError
     :return: DataSetMetaData
     """
-    uuid = mt = None
+    uuid = mt = name = None
     for event, element in ET.iterparse(path, events=("start",)):
         uuid = element.get("UniqueId")
         mt = element.get("MetaType")
+        name = element.get("Name")
         break
     else:
         raise ValueError(
             'Did not find events=("start",) in XML path={}'.format(path))
     if mt in FileTypes.ALL_DATASET_TYPES().keys():
-        return DataSetMetaData(uuid, mt)
+        return DataSetMetaData(uuid, mt, name)
     else:
         raise ValueError("Unsupported dataset type '{t}'".format(t=mt))
 
diff --git a/setup.py b/setup.py
index 0dbf94c..cfbe0a6 100644
--- a/setup.py
+++ b/setup.py
@@ -15,7 +15,7 @@ test_deps = [
 
 setup(
     name='pbcommand',
-    version='2.2.2',
+    version='2.3.2',
     author='Pacific Biosciences',
     author_email='devnet@pacificbiosciences.com',
     description='Library and Tools for interfacing with PacBio® data CLI tools',
@@ -45,5 +45,5 @@ setup(
         'interactive': [
             'prompt_toolkit',
         ]},
-    python_requires='>=3.7',
+    python_requires='>=3.9',
 )
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 1587ecb..9514c03 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -105,6 +105,7 @@ class TestUtils:
         import pbtestdata
         md = get_dataset_metadata(pbtestdata.get_file("subreads-xml"))
         assert md.metatype == "PacBio.DataSet.SubreadSet"
+        assert md.name == "subreads-xml"
 
         from pbcore.io import SubreadSet
         ds = SubreadSet(pbtestdata.get_file("subreads-xml"))