diff --git a/PKG-INFO b/PKG-INFO
index e83782b..b891599 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 2.1
 Name: eliot
-Version: 1.11.0
+Version: 1.13.0
 Summary: Logging library that tells you why it happened
 Home-page: https://github.com/itamarst/eliot/
 Maintainer: Itamar Turner-Trauring
@@ -37,7 +37,7 @@ Description: Eliot: Logging that tells you *why* it happened
         
         Eliot is only used to generate your logs; you might need tools like Logstash and ElasticSearch to aggregate and store logs if you are using multiple processes across multiple machines.
         
-        Eliot supports Python 3.5, 3.6, 3.7, and 3.8, as well as PyPy3.
+        Eliot supports Python 3.6, 3.7, 3.8, and 3.9, as well as PyPy3.
         It is maintained by Itamar Turner-Trauring, and released under the Apache 2.0 License.
         
         Python 2.7 is in legacy support mode, with the last release supported being 1.7; see `here <https://eliot.readthedocs.io/en/stable/python2.html>`_ for details.
@@ -64,14 +64,14 @@ Classifier: License :: OSI Approved :: Apache Software License
 Classifier: Operating System :: OS Independent
 Classifier: Programming Language :: Python
 Classifier: Programming Language :: Python :: 3
-Classifier: Programming Language :: Python :: 3.5
 Classifier: Programming Language :: Python :: 3.6
 Classifier: Programming Language :: Python :: 3.7
 Classifier: Programming Language :: Python :: 3.8
+Classifier: Programming Language :: Python :: 3.9
 Classifier: Programming Language :: Python :: Implementation :: CPython
 Classifier: Programming Language :: Python :: Implementation :: PyPy
 Classifier: Topic :: System :: Logging
-Requires-Python: >=3.5.3
-Provides-Extra: test
+Requires-Python: >=3.6.0
 Provides-Extra: journald
+Provides-Extra: test
 Provides-Extra: dev
diff --git a/README.rst b/README.rst
index 8965acd..6b80c0a 100644
--- a/README.rst
+++ b/README.rst
@@ -29,7 +29,7 @@ Eliot supports a range of use cases and 3rd party libraries:
 
 Eliot is only used to generate your logs; you might need tools like Logstash and ElasticSearch to aggregate and store logs if you are using multiple processes across multiple machines.
 
-Eliot supports Python 3.5, 3.6, 3.7, and 3.8, as well as PyPy3.
+Eliot supports Python 3.6, 3.7, 3.8, and 3.9, as well as PyPy3.
 It is maintained by Itamar Turner-Trauring, and released under the Apache 2.0 License.
 
 Python 2.7 is in legacy support mode, with the last release supported being 1.7; see `here <https://eliot.readthedocs.io/en/stable/python2.html>`_ for details.
diff --git a/debian/changelog b/debian/changelog
index 88896e0..cb2878d 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,4 +1,4 @@
-python-eliot (1.11.0-2) UNRELEASED; urgency=medium
+python-eliot (1.13.0-1) UNRELEASED; urgency=medium
 
   [ Ondřej Nový ]
   * d/control: Update Maintainer field with new Debian Python Team
@@ -11,7 +11,7 @@ python-eliot (1.11.0-2) UNRELEASED; urgency=medium
     Repository-Browse.
   * Update standards version to 4.5.0, no changes needed.
 
- -- Ondřej Nový <onovy@debian.org>  Thu, 24 Sep 2020 08:44:30 +0200
+ -- Ondřej Nový <onovy@debian.org>  Mon, 29 Mar 2021 01:21:48 -0000
 
 python-eliot (1.11.0-1) unstable; urgency=medium
 
diff --git a/docs/source/generating/testing.rst b/docs/source/generating/testing.rst
index cb18bd6..86ce2a8 100644
--- a/docs/source/generating/testing.rst
+++ b/docs/source/generating/testing.rst
@@ -233,6 +233,35 @@ Or we can simplify further by using ``assertHasMessage`` and ``assertHasAction``
             self.assertEqual(servers, [msg.message["server"] for msg in messages])
 
 
+Custom JSON encoding
+--------------------
+
+Just as a ``FileDestination`` can have a custom JSON encoder, so can your tests, letting you validate your messages with that same encoder:
+
+.. code-block:: python
+
+   from unittest import TestCase
+   from eliot.json import EliotJSONEncoder
+   from eliot.testing import capture_logging
+
+   class MyClass:
+       def __init__(self, x):
+           self.x = x
+
+   class MyEncoder(EliotJSONEncoder):
+       def default(self, obj):
+           if isinstance(obj, MyClass):
+               return {"x": obj.x}
+           return EliotJSONEncoder.default(self, obj)
+
+   class LoggingTests(TestCase):
+       @capture_logging(None, encoder_=MyEncoder)
+       def test_logging(self, logger):
+           # Logged messages will be validated using MyEncoder....
+           ...
+
+Notice that the trailing underscore in ``encoder_`` is deliberate: by default keyword arguments are passed to the assertion function (the first argument to ``@capture_logging``), so the underscore marks this argument as part of Eliot's own API.
+
 Custom testing setup
 --------------------
 
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 2615746..c701a0f 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -28,7 +28,7 @@ Eliot is only used to generate your logs; you might still need tools like Logsta
 * **Start here:** :doc:`Quickstart documentation <quickstart>`
 * Need help or have any questions? `File an issue <https://github.com/itamarst/eliot/issues/new>`_.
 * Eliot is licensed under the `Apache 2.0 license <https://github.com/itamarst/eliot/blob/master/LICENSE>`_, and the source code is `available on GitHub <https://github.com/itamarst/eliot>`_.
-* Eliot supports Python 3.8, 3.7, 3.6, and 3.5, as well as PyPy3.
+* Eliot supports Python 3.9, 3.8, 3.7, and 3.6, as well as PyPy3.
   Python 2.7 is in legacy support mode (see :ref:`python2` for details).
 * **Commercial support** is available from `Python⇒Speed <https://pythonspeed.com/services/#eliot>`_.
 * Read on for the full documentation.
diff --git a/docs/source/news.rst b/docs/source/news.rst
index bc82497..eb9fb0e 100644
--- a/docs/source/news.rst
+++ b/docs/source/news.rst
@@ -1,6 +1,29 @@
 What's New
 ==========
 
+1.13.0
+^^^^^^
+
+Features:
+
+* ``@capture_logging`` and ``MemoryLogger`` now support specifying a custom JSON encoder, and use Eliot's own encoder by default. This means tests can match the encoding used by a ``FileDestination``.
+* Added support for Python 3.9.
+
+Deprecation:
+
+* Python 3.5 is no longer supported.
+
+1.12.0
+^^^^^^
+
+Features:
+
+* Dask support now includes tracing of ``dask.persist()``, via the wrapper API ``eliot.dask.persist_with_trace()``.
+
+Bug fixes:
+
+* Dask graph edge cases that previously weren't handled correctly (for example, keys whose values are lists or plain literals) are now handled.
+
 1.11.0
 ^^^^^^
 
diff --git a/docs/source/reading/reading.rst b/docs/source/reading/reading.rst
index 18c84a9..cba0b45 100644
--- a/docs/source/reading/reading.rst
+++ b/docs/source/reading/reading.rst
@@ -18,7 +18,7 @@ Eliot includes a command-line tool that makes it easier to read JSON-formatted E
 
 Run ``eliot-prettyprint --help`` to see the various formatting options; you can for example use a more compact one-message-per-line format.
 
-Additionally, the **highly recommended third-party `eliot-tree`_ tool** renders JSON-formatted Eliot messages into a tree visualizing the tasks' actions.
+Additionally, the **highly recommended** third-party `eliot-tree`_ tool renders JSON-formatted Eliot messages into a tree visualizing the tasks' actions.
 
 
 Filtering logs
diff --git a/docs/source/scientific-computing.rst b/docs/source/scientific-computing.rst
index 900335b..9f7cec9 100644
--- a/docs/source/scientific-computing.rst
+++ b/docs/source/scientific-computing.rst
@@ -44,8 +44,9 @@ In order to do this you will need to:
 * Ensure all worker processes write the Eliot logs to disk (if you're using the ``multiprocessing`` or ``distributed`` backends).
 * If you're using multiple worker machines, aggregate all log files into a single place, so you can more easily analyze them with e.g. `eliot-tree <https://github.com/jonathanj/eliottree>`_.
 * Replace ``dask.compute()`` with ``eliot.dask.compute_with_trace()``.
+* Replace ``dask.persist()`` with ``eliot.dask.persist_with_trace()``.
 
-In the following example, you can see how this works for a Dask run using ``distributed``, the recommended Dask scheduler.
+In the following example, you can see how this works for a Dask run using ``distributed``, the recommended Dask scheduler for more sophisticated use cases.
 We'll be using multiple worker processes, but only use a single machine:
 
 .. literalinclude:: ../../examples/dask_eliot.py
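
For orientation, here is a minimal sketch of just that substitution (it assumes each worker process also adds a log destination, as in the full example; the action type and file name are illustrative):

.. code-block:: python

   from dask.bag import from_sequence
   from eliot import to_file, start_action
   from eliot.dask import compute_with_trace, persist_with_trace

   to_file(open("eliot.log", "a"))  # worker processes need their own destination too

   def main():
       bag = from_sequence([1, 2, 3]).map(lambda x: x * 2)
       with start_action(action_type="main_computation"):
           [persisted] = persist_with_trace(bag)       # instead of dask.persist(bag)
           (result,) = compute_with_trace(persisted)   # instead of dask.compute(bag)
       return result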
diff --git a/eliot.egg-info/PKG-INFO b/eliot.egg-info/PKG-INFO
index e83782b..b891599 100644
--- a/eliot.egg-info/PKG-INFO
+++ b/eliot.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 2.1
 Name: eliot
-Version: 1.11.0
+Version: 1.13.0
 Summary: Logging library that tells you why it happened
 Home-page: https://github.com/itamarst/eliot/
 Maintainer: Itamar Turner-Trauring
@@ -37,7 +37,7 @@ Description: Eliot: Logging that tells you *why* it happened
         
         Eliot is only used to generate your logs; you might need tools like Logstash and ElasticSearch to aggregate and store logs if you are using multiple processes across multiple machines.
         
-        Eliot supports Python 3.5, 3.6, 3.7, and 3.8, as well as PyPy3.
+        Eliot supports Python 3.6, 3.7, 3.8, and 3.9, as well as PyPy3.
         It is maintained by Itamar Turner-Trauring, and released under the Apache 2.0 License.
         
         Python 2.7 is in legacy support mode, with the last release supported being 1.7; see `here <https://eliot.readthedocs.io/en/stable/python2.html>`_ for details.
@@ -64,14 +64,14 @@ Classifier: License :: OSI Approved :: Apache Software License
 Classifier: Operating System :: OS Independent
 Classifier: Programming Language :: Python
 Classifier: Programming Language :: Python :: 3
-Classifier: Programming Language :: Python :: 3.5
 Classifier: Programming Language :: Python :: 3.6
 Classifier: Programming Language :: Python :: 3.7
 Classifier: Programming Language :: Python :: 3.8
+Classifier: Programming Language :: Python :: 3.9
 Classifier: Programming Language :: Python :: Implementation :: CPython
 Classifier: Programming Language :: Python :: Implementation :: PyPy
 Classifier: Topic :: System :: Logging
-Requires-Python: >=3.5.3
-Provides-Extra: test
+Requires-Python: >=3.6.0
 Provides-Extra: journald
+Provides-Extra: test
 Provides-Extra: dev
diff --git a/eliot.egg-info/requires.txt b/eliot.egg-info/requires.txt
index 1b72e65..22999a7 100644
--- a/eliot.egg-info/requires.txt
+++ b/eliot.egg-info/requires.txt
@@ -22,3 +22,4 @@ cffi>=1.1.2
 hypothesis>=1.14.0
 testtools
 pytest
+pytest-xdist
diff --git a/eliot/_output.py b/eliot/_output.py
index 6f3b0e0..debff7c 100644
--- a/eliot/_output.py
+++ b/eliot/_output.py
@@ -262,8 +262,12 @@ class MemoryLogger(object):
         not mutate this list.
     """
 
-    def __init__(self):
+    def __init__(self, encoder=EliotJSONEncoder):
+        """
+        @param encoder: A JSONEncoder subclass to use when encoding JSON.
+        """
         self._lock = Lock()
+        self._encoder = encoder
         self.reset()
 
     @exclusively
@@ -344,8 +348,7 @@ class MemoryLogger(object):
             serializer.serialize(dictionary)
 
         try:
-            bytesjson.dumps(dictionary)
-            pyjson.dumps(dictionary)
+            pyjson.dumps(dictionary, cls=self._encoder)
         except Exception as e:
             raise TypeError("Message %s doesn't encode to JSON: %s" % (dictionary, e))
 
@@ -462,6 +465,8 @@ def to_file(output_file, encoder=EliotJSONEncoder):
     Add a destination that writes a JSON message per line to the given file.
 
     @param output_file: A file-like object.
+
+    @param encoder: A JSONEncoder subclass to use when encoding JSON.
     """
     Logger._destinations.add(FileDestination(file=output_file, encoder=encoder))
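
Continuing the sketch above, the same encoder class can be passed to a file destination so file output matches what tests validate:

.. code-block:: python

   from eliot import to_file

   # Messages written to this destination are encoded with MyEncoder
   # (the illustrative encoder defined in the sketch above):
   to_file(open("out.log", "a"), encoder=MyEncoder)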
 
diff --git a/eliot/_validation.py b/eliot/_validation.py
index 766f9ef..c69dc43 100644
--- a/eliot/_validation.py
+++ b/eliot/_validation.py
@@ -388,7 +388,7 @@ class ActionType(object):
         this action's start message.
 
     @ivar successFields: A C{list} of L{Field} instances which can appear in
-        this action's succesful finish message.
+        this action's successful finish message.
 
     @ivar failureFields: A C{list} of L{Field} instances which can appear in
         this action's failed finish message (in addition to the built-in
diff --git a/eliot/_version.py b/eliot/_version.py
index b349f64..7ec23e7 100644
--- a/eliot/_version.py
+++ b/eliot/_version.py
@@ -8,11 +8,11 @@ import json
 
 version_json = '''
 {
- "date": "2019-12-07T14:22:41-0500",
+ "date": "2020-12-15T14:09:24-0500",
  "dirty": false,
  "error": null,
- "full-revisionid": "4ca0fa7519321aceec860e982123a5c448a9debd",
- "version": "1.11.0"
+ "full-revisionid": "e858c8ef7302e22ca05f37565d929db8e0fab153",
+ "version": "1.13.0"
 }
 '''  # END VERSION_JSON
 
diff --git a/eliot/dask.py b/eliot/dask.py
index 6e0695d..2f8d07e 100644
--- a/eliot/dask.py
+++ b/eliot/dask.py
@@ -2,8 +2,17 @@
 
 from pyrsistent import PClass, field
 
-from dask import compute, optimize
-from dask.core import toposort, get_dependencies
+from dask import compute, optimize, persist
+
+try:
+    from dask.distributed import Future
+except ImportError:
+
+    class Future(object):
+        pass
+
+
+from dask.core import toposort, get_dependencies, ishashable
 from . import start_action, current_action, Action
 
 
@@ -75,6 +84,22 @@ def compute_with_trace(*args):
         return compute(*optimized, optimize_graph=False)
 
 
+def persist_with_trace(*args):
+    """Do Dask persist(), but with added Eliot tracing.
+
+    Known issues:
+
+        1. Retries will confuse Eliot.  Probably need different
+           distributed-tree mechanism within Eliot to solve that.
+    """
+    # 1. Create top-level Eliot Action:
+    with start_action(action_type="dask:persist"):
+        # In order to reduce logging verbosity, add logging to the already
+        # optimized graph:
+        optimized = optimize(*args, optimizations=[_add_logging])
+        return persist(*optimized, optimize_graph=False)
+
+
 def _add_logging(dsk, ignore=None):
     """
     Add logging to a Dask graph.
@@ -101,34 +126,43 @@ def _add_logging(dsk, ignore=None):
     key_names = {}
     for key in keys:
         value = dsk[key]
-        if not callable(value) and value in keys:
+        if not callable(value) and ishashable(value) and value in keys:
             # It's an alias for another key:
             key_names[key] = key_names[value]
         else:
             key_names[key] = simplify(key)
 
-    # 2. Create Eliot child Actions for each key, in topological order:
-    key_to_action_id = {key: str(ctx.serialize_task_id(), "utf-8") for key in keys}
+    # Values in the graph can be either:
+    #
+    # 1. A list of other values.
+    # 2. A tuple, where the first value might be a callable, i.e. a task.
+    # 3. A literal of some sort.
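+    #
+    # For example, a (hypothetical) graph like
+    # {"x": 1, "y": (operator.add, "x", 10), "z": ["x", "y"]}
+    # has a literal, a task tuple, and a list value.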
+    def maybe_wrap(key, value):
+        if isinstance(value, list):
+            return [maybe_wrap(key, v) for v in value]
+        elif isinstance(value, tuple):
+            func = value[0]
+            args = value[1:]
+            if not callable(func):
+                # Not a callable, so nothing to wrap.
+                return value
+            wrapped_func = _RunWithEliotContext(
+                task_id=str(ctx.serialize_task_id(), "utf-8"),
+                func=func,
+                key=key_names[key],
+                dependencies=[key_names[k] for k in get_dependencies(dsk, key)],
+            )
+            return (wrapped_func,) + args
+        else:
+            return value
 
-    # 3. Replace function with wrapper that logs appropriate Action:
+    # Replace function with wrapper that logs appropriate Action; iterate in
+    # topological order so action task levels are in reasonable order.
     for key in keys:
-        func = dsk[key][0]
-        args = dsk[key][1:]
-        if not callable(func):
-            # This key is just an alias for another key, no need to add
-            # logging:
-            result[key] = dsk[key]
-            continue
-        wrapped_func = _RunWithEliotContext(
-            task_id=key_to_action_id[key],
-            func=func,
-            key=key_names[key],
-            dependencies=[key_names[k] for k in get_dependencies(dsk, key)],
-        )
-        result[key] = (wrapped_func,) + tuple(args)
+        result[key] = maybe_wrap(key, dsk[key])
 
     assert set(result.keys()) == set(dsk.keys())
     return result
 
 
-__all__ = ["compute_with_trace"]
+__all__ = ["compute_with_trace", "persist_with_trace"]
diff --git a/eliot/testing.py b/eliot/testing.py
index 4e0ba2c..7fdcba4 100644
--- a/eliot/testing.py
+++ b/eliot/testing.py
@@ -20,6 +20,7 @@ from ._action import (
 from ._message import MESSAGE_TYPE_FIELD, TASK_LEVEL_FIELD, TASK_UUID_FIELD
 from ._output import MemoryLogger
 from . import _output
+from .json import EliotJSONEncoder
 
 COMPLETED_STATUSES = (FAILED_STATUS, SUCCEEDED_STATUS)
 
@@ -298,7 +299,9 @@ def swap_logger(logger):
     return previous_logger
 
 
-def validateLogging(assertion, *assertionArgs, **assertionKwargs):
+def validateLogging(
+    assertion, *assertionArgs, encoder_=EliotJSONEncoder, **assertionKwargs
+):
     """
     Decorator factory for L{unittest.TestCase} methods to add logging
     validation.
@@ -330,6 +333,8 @@ def validateLogging(assertion, *assertionArgs, **assertionKwargs):
 
     @param assertionKwargs: Additional keyword arguments to pass to
         C{assertion}.
+
+    @param encoder_: C{json.JSONEncoder} subclass to use when validating JSON.
     """
 
     def decorator(function):
@@ -337,7 +342,7 @@ def validateLogging(assertion, *assertionArgs, **assertionKwargs):
         def wrapper(self, *args, **kwargs):
             skipped = False
 
-            kwargs["logger"] = logger = MemoryLogger()
+            kwargs["logger"] = logger = MemoryLogger(encoder=encoder_)
             self.addCleanup(check_for_errors, logger)
             # TestCase runs cleanups in reverse order, and we want this to
             # run *before* tracebacks are checked:
@@ -361,7 +366,9 @@ def validateLogging(assertion, *assertionArgs, **assertionKwargs):
 validate_logging = validateLogging
 
 
-def capture_logging(assertion, *assertionArgs, **assertionKwargs):
+def capture_logging(
+    assertion, *assertionArgs, encoder_=EliotJSONEncoder, **assertionKwargs
+):
     """
     Capture and validate all logging that doesn't specify a L{Logger}.
 
@@ -369,7 +376,9 @@ def capture_logging(assertion, *assertionArgs, **assertionKwargs):
     """
 
     def decorator(function):
-        @validate_logging(assertion, *assertionArgs, **assertionKwargs)
+        @validate_logging(
+            assertion, *assertionArgs, encoder_=encoder_, **assertionKwargs
+        )
         @wraps(function)
         def wrapper(self, *args, **kwargs):
             logger = kwargs["logger"]
diff --git a/eliot/tests/common.py b/eliot/tests/common.py
index 6e26b26..7aa042b 100644
--- a/eliot/tests/common.py
+++ b/eliot/tests/common.py
@@ -3,6 +3,20 @@ Common testing infrastructure.
 """
 
 from io import BytesIO
+from json import JSONEncoder
+
+
+class CustomObject(object):
+    """Gets encoded to JSON."""
+
+
+class CustomJSONEncoder(JSONEncoder):
+    """JSONEncoder that knows about L{CustomObject}."""
+
+    def default(self, o):
+        if isinstance(o, CustomObject):
+            return "CUSTOM!"
+        return JSONEncoder.default(self, o)
 
 
 class FakeSys(object):
diff --git a/eliot/tests/test_dask.py b/eliot/tests/test_dask.py
index 672aa9e..f652604 100644
--- a/eliot/tests/test_dask.py
+++ b/eliot/tests/test_dask.py
@@ -3,15 +3,23 @@
 from unittest import TestCase, skipUnless
 
 from ..testing import capture_logging, LoggedAction, LoggedMessage
-from .. import start_action, Message
+from .. import start_action, log_message
 
 try:
     import dask
     from dask.bag import from_sequence
+    from dask.distributed import Client
+    import dask.dataframe as dd
+    import pandas as pd
 except ImportError:
     dask = None
 else:
-    from ..dask import compute_with_trace, _RunWithEliotContext, _add_logging
+    from ..dask import (
+        compute_with_trace,
+        _RunWithEliotContext,
+        _add_logging,
+        persist_with_trace,
+    )
 
 
 @skipUnless(dask, "Dask not available.")
@@ -28,22 +36,66 @@ class DaskTests(TestCase):
         bag = bag.fold(lambda x, y: x + y)
         self.assertEqual(dask.compute(bag), compute_with_trace(bag))
 
+    def test_future(self):
+        """compute_with_trace() can handle Futures."""
+        client = Client(processes=False)
+        self.addCleanup(client.shutdown)
+        [bag] = dask.persist(from_sequence([1, 2, 3]))
+        bag = bag.map(lambda x: x * 5)
+        result = dask.compute(bag)
+        self.assertEqual(result, ([5, 10, 15],))
+        self.assertEqual(result, compute_with_trace(bag))
+
+    def test_persist_result(self):
+        """persist_with_trace() runs the same logic as process()."""
+        client = Client(processes=False)
+        self.addCleanup(client.shutdown)
+        bag = from_sequence([1, 2, 3])
+        bag = bag.map(lambda x: x * 7)
+        self.assertEqual(
+            [b.compute() for b in dask.persist(bag)],
+            [b.compute() for b in persist_with_trace(bag)],
+        )
+
+    def test_persist_pandas(self):
+        """persist_with_trace() with a Pandas dataframe.
+
+        This ensures we don't blow up, as used to happen.
+        """
+        df = pd.DataFrame()
+        df = dd.from_pandas(df, npartitions=1)
+        persist_with_trace(df)
+
+    @capture_logging(None)
+    def test_persist_logging(self, logger):
+        """persist_with_trace() preserves Eliot context."""
+
+        def persister(bag):
+            [bag] = persist_with_trace(bag)
+            return dask.compute(bag)
+
+        self.assert_logging(logger, persister, "dask:persist")
+
     @capture_logging(None)
-    def test_logging(self, logger):
+    def test_compute_logging(self, logger):
         """compute_with_trace() preserves Eliot context."""
+        self.assert_logging(logger, compute_with_trace, "dask:compute")
+
+    def assert_logging(self, logger, run_with_trace, top_action_name):
+        """Utility function for _with_trace() logging tests."""
 
         def mult(x):
-            Message.log(message_type="mult")
+            log_message(message_type="mult")
             return x * 4
 
         def summer(x, y):
-            Message.log(message_type="finally")
+            log_message(message_type="finally")
             return x + y
 
         bag = from_sequence([1, 2])
         bag = bag.map(mult).fold(summer)
         with start_action(action_type="act1"):
-            compute_with_trace(bag)
+            run_with_trace(bag)
 
         [logged_action] = LoggedAction.ofType(logger.messages, "act1")
         self.assertEqual(
@@ -51,9 +103,11 @@ class DaskTests(TestCase):
             {
                 "act1": [
                     {
-                        "dask:compute": [
+                        top_action_name: [
                             {"eliot:remote_task": ["dask:task", "mult"]},
                             {"eliot:remote_task": ["dask:task", "mult"]},
+                            {"eliot:remote_task": ["dask:task"]},
+                            {"eliot:remote_task": ["dask:task"]},
                             {"eliot:remote_task": ["dask:task", "finally"]},
                         ]
                     }
@@ -62,20 +116,36 @@ class DaskTests(TestCase):
         )
 
         # Make sure dependencies are tracked:
-        mult1_msg, mult2_msg, final_msg = LoggedMessage.ofType(
-            logger.messages, "dask:task"
+        (
+            mult1_msg,
+            mult2_msg,
+            reduce1_msg,
+            reduce2_msg,
+            final_msg,
+        ) = LoggedMessage.ofType(logger.messages, "dask:task")
+        self.assertEqual(
+            reduce1_msg.message["dependencies"], [mult1_msg.message["key"]]
+        )
+        self.assertEqual(
+            reduce2_msg.message["dependencies"], [mult2_msg.message["key"]]
         )
         self.assertEqual(
             sorted(final_msg.message["dependencies"]),
-            sorted([mult1_msg.message["key"], mult2_msg.message["key"]]),
+            sorted([reduce1_msg.message["key"], reduce2_msg.message["key"]]),
         )
 
         # Make sure dependencies are logically earlier in the logs:
         self.assertTrue(
-            mult1_msg.message["task_level"] < final_msg.message["task_level"]
+            mult1_msg.message["task_level"] < reduce1_msg.message["task_level"]
+        )
+        self.assertTrue(
+            mult2_msg.message["task_level"] < reduce2_msg.message["task_level"]
+        )
+        self.assertTrue(
+            reduce1_msg.message["task_level"] < final_msg.message["task_level"]
         )
         self.assertTrue(
-            mult2_msg.message["task_level"] < final_msg.message["task_level"]
+            reduce2_msg.message["task_level"] < final_msg.message["task_level"]
         )
 
 
@@ -83,6 +153,8 @@ class DaskTests(TestCase):
 class AddLoggingTests(TestCase):
     """Tests for _add_logging()."""
 
+    maxDiff = None
+
     def test_add_logging_to_full_graph(self):
         """_add_logging() recreates Dask graph with wrappers."""
         bag = from_sequence([1, 2, 3])
@@ -104,3 +176,52 @@ class AddLoggingTests(TestCase):
             logging_removed[key] = value
 
         self.assertEqual(logging_removed, graph)
+
+    def test_add_logging_explicit(self):
+        """_add_logging() on more edge cases of the graph."""
+
+        def add(s):
+            return s + "s"
+
+        def add2(s):
+            return s + "s"
+
+        # b runs first, then d, then a and c.
+        graph = {
+            "a": "d",
+            "d": [1, 2, (add, "b")],
+            ("b", 0): 1,
+            "c": (add2, "d"),
+        }
+
+        with start_action(action_type="bleh") as action:
+            task_id = action.task_uuid
+            self.assertEqual(
+                _add_logging(graph),
+                {
+                    "d": [
+                        1,
+                        2,
+                        (
+                            _RunWithEliotContext(
+                                task_id=task_id + "@/2",
+                                func=add,
+                                key="d",
+                                dependencies=["b"],
+                            ),
+                            "b",
+                        ),
+                    ],
+                    "a": "d",
+                    ("b", 0): 1,
+                    "c": (
+                        _RunWithEliotContext(
+                            task_id=task_id + "@/3",
+                            func=add2,
+                            key="c",
+                            dependencies=["d"],
+                        ),
+                        "d",
+                    ),
+                },
+            )
diff --git a/eliot/tests/test_output.py b/eliot/tests/test_output.py
index fedad15..6b608c6 100644
--- a/eliot/tests/test_output.py
+++ b/eliot/tests/test_output.py
@@ -32,6 +32,7 @@ from .._output import (
 from .._validation import ValidationError, Field, _MessageSerializer
 from .._traceback import write_traceback
 from ..testing import assertContainsFields
+from .common import CustomObject, CustomJSONEncoder
 
 
 class MemoryLoggerTests(TestCase):
@@ -122,6 +123,27 @@ class MemoryLoggerTests(TestCase):
         )
         self.assertRaises(TypeError, logger.validate)
 
+    @skipUnless(np, "NumPy is not installed.")
+    def test_EliotJSONEncoder(self):
+        """
+        L{MemoryLogger.validate} uses the EliotJSONEncoder by default to do
+        encoding testing.
+        """
+        logger = MemoryLogger()
+        logger.write({"message_type": "type", "foo": np.uint64(12)}, None)
+        logger.validate()
+
+    def test_JSON_custom_encoder(self):
+        """
+        L{MemoryLogger.validate} will use a custom JSON encoder if one was given.
+        """
+        logger = MemoryLogger(encoder=CustomJSONEncoder)
+        logger.write(
+            {"message_type": "type", "custom": CustomObject()},
+            None,
+        )
+        logger.validate()
+
     def test_serialize(self):
         """
         L{MemoryLogger.serialize} returns a list of serialized versions of the
diff --git a/eliot/tests/test_testing.py b/eliot/tests/test_testing.py
index e507607..8df9874 100644
--- a/eliot/tests/test_testing.py
+++ b/eliot/tests/test_testing.py
@@ -4,7 +4,12 @@ Tests for L{eliot.testing}.
 
 from __future__ import unicode_literals
 
-from unittest import SkipTest, TestResult, TestCase
+from unittest import SkipTest, TestResult, TestCase, skipUnless
+
+try:
+    import numpy as np
+except ImportError:
+    np = None
 
 from ..testing import (
     issuperset,
@@ -25,7 +30,8 @@ from .._action import start_action
 from .._message import Message
 from .._validation import ActionType, MessageType, ValidationError, Field
 from .._traceback import write_traceback
-from .. import add_destination, remove_destination, _output
+from .. import add_destination, remove_destination, _output, log_message
+from .common import CustomObject, CustomJSONEncoder
 
 
 class IsSuperSetTests(TestCase):
@@ -740,6 +746,28 @@ class CaptureLoggingTests(ValidateLoggingTestsMixin, TestCase):
         )
 
 
+class JSONEncodingTests(TestCase):
+    """Tests for L{capture_logging} JSON encoder support."""
+
+    @skipUnless(np, "NumPy is not installed.")
+    @capture_logging(None)
+    def test_default_JSON_encoder(self, logger):
+        """
+        L{capture_logging} validates using L{EliotJSONEncoder} by default.
+        """
+        # Default JSON encoder can't handle NumPy:
+        log_message(message_type="hello", number=np.uint32(12))
+
+    @capture_logging(None, encoder_=CustomJSONEncoder)
+    def test_custom_JSON_encoder(self, logger):
+        """
+        L{capture_logging} can be called with a custom JSON encoder, which is then
+        used for validation.
+        """
+        # Default JSON encoder can't handle this custom object:
+        log_message(message_type="hello", object=CustomObject())
+
+
 MESSAGE1 = MessageType(
     "message1", [Field.forTypes("x", [int], "A number")], "A message for testing."
 )
diff --git a/setup.py b/setup.py
index 903a131..e7de861 100644
--- a/setup.py
+++ b/setup.py
@@ -18,10 +18,10 @@ setup(
         "Operating System :: OS Independent",
         "Programming Language :: Python",
         "Programming Language :: Python :: 3",
-        "Programming Language :: Python :: 3.5",
         "Programming Language :: Python :: 3.6",
         "Programming Language :: Python :: 3.7",
         "Programming Language :: Python :: 3.8",
+        "Programming Language :: Python :: 3.9",
         "Programming Language :: Python :: Implementation :: CPython",
         "Programming Language :: Python :: Implementation :: PyPy",
         "Topic :: System :: Logging",
@@ -30,7 +30,7 @@ setup(
     version=versioneer.get_version(),
     cmdclass=versioneer.get_cmdclass(),
     description="Logging library that tells you why it happened",
-    python_requires=">=3.5.3",
+    python_requires=">=3.6.0",
     install_requires=[
         # Python 3 compatibility:
         "six",
@@ -54,6 +54,7 @@ setup(
             # Tasteful testing for Python:
             "testtools",
             "pytest",
+            "pytest-xdist",
         ],
         "dev": [
             # Ensure we can do python_requires correctly: