diff --git a/Doc/library/email.contentmanager.rst b/Doc/library/email.contentmanager.rst
new file mode 100644
index 0000000..f3ba794
--- /dev/null
+++ b/Doc/library/email.contentmanager.rst
@@ -0,0 +1,427 @@
+:mod:`email.contentmanager`: Managing MIME Content
+--------------------------------------------------
+
+.. module:: email.contentmanager
+   :synopsis: Storing and Retrieving Content from MIME Parts
+
+.. moduleauthor:: R. David Murray <rdmurray@bitdance.com>
+.. sectionauthor:: R. David Murray <rdmurray@bitdance.com>
+
+
+.. note::
+
+   The contentmanager module has been included in the standard library on a
+   :term:`provisional basis <provisional package>`. Backwards incompatible
+   changes (up to and including removal of the module) may occur if deemed
+   necessary by the core developers.
+
+.. versionadded:: 3.4
+   as a :term:`provisional module <provisional package>`.
+
+The :mod:`~email.message` module provides a class that can represent an
+arbitrary email message.  That basic message model has a useful and flexible
+API, but it provides only a lower-level API for interacting with the generic
+parts of a message (the headers, generic header parameters, and the payload,
+which may be a list of sub-parts).  This module provides classes and tools
+that provide an enhanced and extensible API for dealing with various specific
+types of content, including the ability to retrieve the content of the message
+as a specialized object type rather than as a simple bytes object.  The module
+automatically takes care of the RFC-specified MIME details (required headers
+and parameters, etc.) for the certain common content types content properties,
+and support for additional types can be added by an application using the
+extension mechanisms.
+
+This module defines the eponymous "Content Manager" classes.  The base
+:class:`.ContentManager` class defines an API for registering content
+management functions which extract data from ``Message`` objects or insert data
+and headers into ``Message`` objects, thus providing a way of converting
+between ``Message`` objects containing data and other representations of that
+data (Python data types, specialized Python objects, external files, etc).  The
+module also defines one concrete content manager: :data:`raw_data_manager`
+converts between MIME content types and ``str`` or ``bytes`` data.  It also
+provides a convenient API for managing the MIME parameters when inserting
+content into ``Message``\ s.  It also handles inserting and extracting
+``Message`` objects when dealing with the ``message/rfc822`` content type.
+
+Another part of the enhanced interface is subclasses of
+:class:`~email.message.Message` that provide new convenience API functions,
+including convenience methods for calling the Content Managers derived from
+this module.
+
+.. note::
+
+   Although :class:`.EmailMessage` and :class:`.MIMEPart` are currently
+   documented in this module because of the provisional nature of the code, the
+   implementation lives in the :mod:`email.message` module.
+
+
+.. class:: EmailMessage(policy=default)
+
+   If *policy* is specified (it must be an instance of a :mod:`~email.policy`
+   class) use the rules it specifies to udpate and serialize the representation
+   of the message.  If *policy* is not set, use the
+   :class:`~email.policy.default` policy, which follows the rules of the email
+   RFCs except for line endings (instead of the RFC mandated ``\r\n``, it uses
+   the Python standard ``\n`` line endings).  For more information see the
+   :mod:`~email.policy` documentation.
+
+   This class is a subclass of :class:`~email.message.Message`.  It adds
+   the following methods:
+
+
+   .. attribute:: is_attachment
+
+      Set to ``True`` if there is a :mailheader:`Content-Disposition` header
+      and its (case insensitive) value is ``attachment``, ``False`` otherwise.
+
+
+   .. method:: get_body(preferencelist=('related', 'html', 'plain'))
+
+      Return the MIME part that is the best candidate to be the "body" of the
+      message.
+
+      *preferencelist* must be a sequence of strings from the set ``related``,
+      ``html``, and ``plain``, and indicates the order of preference for the
+      content type of the part returned.
+
+      Start looking for candidate matches with the object on which the
+      ``get_body`` method is called.
+
+      If ``related`` is not included in *preferencelist*, consider the root
+      part (or subpart of the root part) of any related encountered as a
+      candidate if the (sub-)part matches a preference.
+
+      When encountering a ``multipart/related``, check the ``start`` parameter
+      and if a part with a matching :mailheader:`Content-ID` is found, consider
+      only it when looking for candidate matches.  Otherwise consider only the
+      first (default root) part of the ``multipart/related``.
+
+      If a part has a :mailheader:``Content-Disposition`` header, only consider
+      the part a candidate match if the value of the header is ``inline``.
+
+      If none of the candidates matches any of the preferences in
+      *preferneclist*, return ``None``.
+
+      Notes: (1) For most applications the only *preferencelist* combinations
+      that really make sense are ``('plain',)``, ``('html', 'plain')``, and the
+      default, ``('related', 'html', 'plain')``.  (2) Because matching starts
+      with the object on which ``get_body`` is called, calling ``get_body`` on
+      a ``multipart/related`` will return the object itself unless
+      *preferencelist* has a non-default value. (3) Messages (or message parts)
+      that do not specify a :mailheader:`Content-Type` or whose
+      :mailheader:`Content-Type` header is invalid will be treated as if they
+      are of type ``text/plain``, which may occasionally cause ``get_body`` to
+      return unexpected results.
+
+
+   .. method:: iter_attachments()
+
+      Return an iterator over all of the parts of the message that are not
+      candidate "body" parts.  That is, skip the first occurrence of each of
+      ``text/plain``, ``text/html``, ``multipart/related``, or
+      ``multipart/alternative`` (unless they are explicitly marked as
+      attachments via :mailheader:`Content-Disposition: attachment`), and
+      return all remaining parts.  When applied directly to a
+      ``multipart/related``, return an iterator over the all the related parts
+      except the root part (ie: the part pointed to by the ``start`` parameter,
+      or the first part if there is no ``start`` parameter or the ``start``
+      parameter doesn't match the :mailheader:`Content-ID` of any of the
+      parts).  When applied directly to a ``multipart/alternative`` or a
+      non-``multipart``, return an empty iterator.
+
+
+   .. method:: iter_parts()
+
+      Return an iterator over all of the immediate sub-parts of the message,
+      which will be empty for a non-``multipart``.  (See also
+      :meth:``~email.message.walk``.)
+
+
+   .. method:: get_content(*args, content_manager=None, **kw)
+
+      Call the ``get_content`` method of the *content_manager*, passing self
+      as the message object, and passing along any other arguments or keywords
+      as additional arguments.  If *content_manager* is not specified, use
+      the ``content_manager`` specified by the current :mod:`~email.policy`.
+
+
+   .. method:: set_content(*args, content_manager=None, **kw)
+
+      Call the ``set_content`` method of the *content_manager*, passing self
+      as the message object, and passing along any other arguments or keywords
+      as additional arguments.  If *content_manager* is not specified, use
+      the ``content_manager`` specified by the current :mod:`~email.policy`.
+
+
+   .. method:: make_related(boundary=None)
+
+      Convert a non-``multipart`` message into a ``multipart/related`` message,
+      moving any existing :mailheader:`Content-` headers and payload into a
+      (new) first part of the ``multipart``.  If *boundary* is specified, use
+      it as the boundary string in the multipart, otherwise leave the boundary
+      to be automatically created when it is needed (for example, when the
+      message is serialized).
+
+
+   .. method:: make_alternative(boundary=None)
+
+      Convert a non-``multipart`` or a ``multipart/related`` into a
+      ``multipart/alternative``, moving any existing :mailheader:`Content-`
+      headers and payload into a (new) first part of the ``multipart``.  If
+      *boundary* is specified, use it as the boundary string in the multipart,
+      otherwise leave the boundary to be automatically created when it is
+      needed (for example, when the message is serialized).
+
+
+   .. method:: make_mixed(boundary=None)
+
+      Convert a non-``multipart``, a ``multipart/related``, or a
+      ``multipart-alternative`` into a ``multipart/mixed``, moving any existing
+      :mailheader:`Content-` headers and payload into a (new) first part of the
+      ``multipart``.  If *boundary* is specified, use it as the boundary string
+      in the multipart, otherwise leave the boundary to be automatically
+      created when it is needed (for example, when the message is serialized).
+
+
+   .. method:: add_related(*args, content_manager=None, **kw)
+
+      If the message is a ``multipart/related``, create a new message
+      object, pass all of the arguments to its :meth:`set_content` method,
+      and :meth:`~email.message.Message.attach` it to the ``multipart``.  If
+      the message is a non-``multipart``, call :meth:`make_related` and then
+      proceed as above.  If the message is any other type of ``multipart``,
+      raise a :exc:`TypeError`. If *content_manager* is not specified, use
+      the ``content_manager`` specified by the current :mod:`~email.policy`.
+      If the added part has no :mailheader:`Content-Disposition` header,
+      add one with the value ``inline``.
+
+
+   .. method:: add_alternative(*args, content_manager=None, **kw)
+
+      If the message is a ``multipart/alternative``, create a new message
+      object, pass all of the arguments to its :meth:`set_content` method, and
+      :meth:`~email.message.Message.attach` it to the ``multipart``.  If the
+      message is a non-``multipart`` or ``multipart/related``, call
+      :meth:`make_alternative` and then proceed as above.  If the message is
+      any other type of ``multipart``, raise a :exc:`TypeError`. If
+      *content_manager* is not specified, use the ``content_manager`` specified
+      by the current :mod:`~email.policy`.
+
+
+   .. method:: add_attachment(*args, content_manager=None, **kw)
+
+      If the message is a ``multipart/mixed``, create a new message object,
+      pass all of the arguments to its :meth:`set_content` method, and
+      :meth:`~email.message.Message.attach` it to the ``multipart``.  If the
+      message is a non-``multipart``, ``multipart/related``, or
+      ``multipart/alternative``, call :meth:`make_mixed` and then proceed as
+      above. If *content_manager* is not specified, use the ``content_manager``
+      specified by the current :mod:`~email.policy`.  If the added part
+      has no :mailheader:`Content-Disposition` header, add one with the value
+      ``attachment``.  This method can be used both for explicit attachments
+      (:mailheader:`Content-Disposition: attachment` and ``inline`` attachments
+      (:mailheader:`Content-Disposition: inline`), by passing appropriate
+      options to the ``content_manager``.
+
+
+   .. method:: clear()
+
+      Remove the payload and all of the headers.
+
+
+   .. method:: clear_content()
+
+      Remove the payload and all of the :exc:`Content-` headers, leaving
+      all other headers intact and in their original order.
+
+
+.. class:: ContentManager()
+
+   Base class for content managers.  Provides the standard registry mechanisms
+   to register converters between MIME content and other representations, as
+   well as the ``get_content`` and ``set_content`` dispatch methods.
+
+
+   .. method:: get_content(msg, *args, **kw)
+
+      Look up a handler function based on the ``mimetype`` of *msg* (see next
+      paragraph), call it, passing through all arguments, and return the result
+      of the call.  The expectation is that the handler will extract the
+      payload from *msg* and return an object that encodes information about
+      the extracted data.
+
+      To find the handler, look for the following keys in the registry,
+      stopping with the first one found:
+
+            * the string representing the full MIME type (``maintype/subtype``)
+            * the string representing the ``maintype``
+            * the empty string
+
+      If none of these keys produce a handler, raise a :exc:`KeyError` for the
+      full MIME type.
+
+
+   .. method:: set_content(msg, obj, *args, **kw)
+
+      If the ``maintype`` is ``multipart``, raise a :exc:`TypeError`; otherwise
+      look up a handler function based on the type of *obj* (see next
+      paragraph), call :meth:`~email.message.EmailMessage.clear_content` on the
+      *msg*, and call the handler function, passing through all arguments.  The
+      expectation is that the handler will transform and store *obj* into
+      *msg*, possibly making other changes to *msg* as well, such as adding
+      various MIME headers to encode information needed to interpret the stored
+      data.
+
+      To find the handler, obtain the type of *obj* (``typ = type(obj)``), and
+      look for the following keys in the registry, stopping with the first one
+      found:
+
+           * the type itself (``typ``)
+           * the type's fully qualified name (``typ.__module__ + '.' +
+             typ.__qualname__``).
+           * the type's qualname (``typ.__qualname__``)
+           * the type's name (``typ.__name__``).
+
+      If none of the above match, repeat all of the checks above for each of
+      the types in the :term:`MRO` (``typ.__mro__``).  Finally, if no other key
+      yields a handler, check for a handler for the key ``None``.  If there is
+      no handler for ``None``, raise a :exc:`KeyError` for the fully
+      qualified name of the type.
+
+      Also add a :mailheader:`MIME-Version` header if one is not present (see
+      also :class:`.MIMEPart`).
+
+
+   .. method:: add_get_handler(key, handler)
+
+      Record the function *handler* as the handler for *key*.  For the possible
+      values of *key*, see :meth:`get_content`.
+
+
+   .. method:: add_set_handler(typekey, handler)
+
+      Record *handler* as the function to call when an object of a type
+      matching *typekey* is passed to :meth:`set_content`.  For the possible
+      values of *typekey*, see :meth:`set_content`.
+
+
+.. class:: MIMEPart(policy=default)
+
+    This class represents a subpart of a MIME message.  It is identical to
+    :class:`EmailMessage`, except that no :mailheader:`MIME-Version` headers are
+    added when :meth:`~EmailMessage.set_content` is called, since sub-parts do
+    not need their own :mailheader:`MIME-Version` headers.
+
+
+Content Manager Instances
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Currently the email package provides only one concrete content manager,
+:data:`raw_data_manager`, although more may be added in the future.
+:data:`raw_data_manager` is the
+:attr:`~email.policy.EmailPolicy.content_manager` provided by
+:attr:`~email.policy.EmailPolicy` and its derivatives.
+
+
+.. data:: raw_data_manager
+
+   This content manager provides only a minimum interface beyond that provided
+   by :class:`~email.message.Message` itself:  it deals only with text, raw
+   byte strings, and :class:`~email.message.Message` objects.  Nevertheless, it
+   provides significant advantages compared to the base API: ``get_content`` on
+   a text part will return a unicode string without the application needing to
+   manually decode it, ``set_content`` provides a rich set of options for
+   controlling the headers added to a part and controlling the content transfer
+   encoding, and it enables the use of the various ``add_`` methods, thereby
+   simplifying the creation of multipart messages.
+
+   .. method:: get_content(msg, errors='replace')
+
+      Return the payload of the part as either a string (for ``text`` parts), a
+      :class:`~email.message.EmailMessage` object (for ``message/rfc822``
+      parts), or a ``bytes`` object (for all other non-multipart types).  Raise
+      a :exc:`KeyError` if called on a ``multipart``.  If the part is a
+      ``text`` part and *errors* is specified, use it as the error handler when
+      decoding the payload to unicode.  The default error handler is
+      ``replace``.
+
+   .. method:: set_content(msg, <'str'>, subtype="plain", charset='utf-8' \
+                           cte=None, \
+                           disposition=None, filename=None, cid=None, \
+                           params=None, headers=None)
+               set_content(msg, <'bytes'>, maintype, subtype, cte="base64", \
+                           disposition=None, filename=None, cid=None, \
+                           params=None, headers=None)
+               set_content(msg, <'Message'>, cte=None, \
+                           disposition=None, filename=None, cid=None, \
+                           params=None, headers=None)
+               set_content(msg, <'list'>, subtype='mixed', \
+                           disposition=None, filename=None, cid=None, \
+                           params=None, headers=None)
+
+       Add headers and payload to *msg*:
+
+       Add a :mailheader:`Content-Type` header with a ``maintype/subtype``
+       value.
+
+           * For ``str``, set the MIME ``maintype`` to ``text``, and set the
+             subtype to *subtype* if it is specified, or ``plain`` if it is not.
+           * For ``bytes``, use the specified *maintype* and *subtype*, or
+             raise a :exc:`TypeError` if they are not specified.
+           * For :class:`~email.message.Message` objects, set the maintype to
+             ``message``, and set the subtype to *subtype* if it is specified
+             or ``rfc822`` if it is not.  If *subtype* is ``partial``, raise an
+             error (``bytes`` objects must be used to construct
+             ``message/partial`` parts).
+           * For *<'list'>*, which should be a list of
+             :class:`~email.message.Message` objects, set the ``maintype`` to
+             ``multipart``, and the ``subtype`` to *subtype* if it is
+             specified, and ``mixed`` if it is not.  If the message parts in
+             the *<'list'>* have :mailheader:`MIME-Version` headers, remove
+             them.
+
+       If *charset* is provided (which is valid only for ``str``), encode the
+       string to bytes using the specified character set.  The default is
+       ``utf-8``.  If the specified *charset* is a known alias for a standard
+       MIME charset name, use the standard charset instead.
+
+       If *cte* is set, encode the payload using the specified content transfer
+       encoding, and set the :mailheader:`Content-Transfer-Endcoding` header to
+       that value.  For ``str`` objects, if it is not set use heuristics to
+       determine the most compact encoding.  Possible values for *cte* are
+       ``quoted-printable``, ``base64``, ``7bit``, ``8bit``, and ``binary``.
+       If the input cannot be encoded in the specified encoding (eg: ``7bit``),
+       raise a :exc:`ValueError`.  For :class:`~email.message.Message`, per
+       :rfc:`2046`, raise an error if a *cte* of ``quoted-printable`` or
+       ``base64`` is requested for *subtype* ``rfc822``, and for any *cte*
+       other than ``7bit`` for *subtype* ``external-body``.  For
+       ``message/rfc822``, use ``8bit`` if *cte* is not specified.  For all
+       other values of *subtype*, use ``7bit``.
+
+       .. note:: A *cte* of ``binary`` does not actually work correctly yet.
+          The ``Message`` object as modified by ``set_content`` is correct, but
+          :class:`~email.generator.BytesGenerator` does not serialize it
+          correctly.
+
+       If *disposition* is set, use it as the value of the
+       :mailheader:`Content-Disposition` header.  If not specified, and
+       *filename* is specified, add the header with the value ``attachment``.
+       If it is not specified and *filename* is also not specified, do not add
+       the header.  The only valid values for *disposition* are ``attachment``
+       and ``inline``.
+
+       If *filename* is specified, use it as the value of the ``filename``
+       parameter of the :mailheader:`Content-Disposition` header.  There is no
+       default.
+
+       If *cid* is specified, add a :mailheader:`Content-ID` header with
+       *cid* as its value.
+
+       If *params* is specified, iterate its ``items`` method and use the
+       resulting ``(key, value)`` pairs to set additional paramters on the
+       :mailheader:`Content-Type` header.
+
+       If *headers* is specified and is a list of strings of the form
+       ``headername: headervalue`` or a list of ``header`` objects
+       (distinguised from strings by having a ``name`` attribute), add the
+       headers to *msg*.
diff --git a/Doc/library/email.message.rst b/Doc/library/email.message.rst
index 4a34c36..f278a0a 100644
--- a/Doc/library/email.message.rst
+++ b/Doc/library/email.message.rst
@@ -33,10 +33,11 @@
 
 .. class:: Message(policy=compat32)
 
-   The *policy* argument determiens the :mod:`~email.policy` that will be used
-   to update the message model.  The default value, :class:`compat32
-   <email.policy.Compat32>` maintains backward compatibility with the
-   Python 3.2 version of the email package.  For more information see the
+   If *policy* is specified (it must be an instance of a :mod:`~email.policy`
+   class) use the rules it specifies to udpate and serialize the representation
+   of the message.  If *policy* is not set, use the :class`compat32
+   <email.policy.Compat32>` policy, which maintains backward compatibility with
+   the Python 3.2 version of the email package.  For more information see the
    :mod:`~email.policy` documentation.
 
    .. versionchanged:: 3.3 The *policy* keyword argument was added.
@@ -465,7 +466,8 @@
       to ``False``.
 
 
-   .. method:: set_param(param, value, header='Content-Type', requote=True, charset=None, language='')
+   .. method:: set_param(param, value, header='Content-Type', requote=True,
+                         charset=None, language='', replace=False)
 
       Set a parameter in the :mailheader:`Content-Type` header.  If the
       parameter already exists in the header, its value will be replaced with
@@ -482,6 +484,12 @@
       language, defaulting to the empty string.  Both *charset* and *language*
       should be strings.
 
+      If *replace* is ``False`` (the default) the header is moved to the
+      end of the list of headers.  If *replace* is ``True``, the header
+      will be updated in place.
+
+      .. versionchanged: 3.4 ``replace`` keyword was added.
+
 
    .. method:: del_param(param, header='content-type', requote=True)
 
diff --git a/Doc/library/email.policy.rst b/Doc/library/email.policy.rst
index 5856879..c2f9e6a 100644
--- a/Doc/library/email.policy.rst
+++ b/Doc/library/email.policy.rst
@@ -371,7 +371,7 @@
    to) :rfc:`5322`, :rfc:`2047`, and the current MIME RFCs.
 
    This policy adds new header parsing and folding algorithms.  Instead of
-   simple strings, headers are custom objects with custom attributes depending
+   simple strings, headers are ``str`` subclasses with attributes that depend
    on the type of the field.  The parsing and folding algorithm fully implement
    :rfc:`2047` and :rfc:`5322`.
 
@@ -408,6 +408,20 @@
       fields are treated as unstructured.  This list will be completed before
       the extension is marked stable.)
 
+   .. attribute:: content_manager
+
+      An object with at least two methods: get_content and set_content.  When
+      the :meth:`~email.message.Message.get_content` or
+      :meth:`~email.message.Message.set_content` method of a
+      :class:`~email.message.Message` object is called, it calls the
+      corresponding method of this object, passing it the message object as its
+      first argument, and any arguments or keywords that were passed to it as
+      additional arguments.  By default ``content_manager`` is set to
+      :data:`~email.contentmanager.raw_data_manager`.
+
+      .. versionadded 3.4
+
+
    The class provides the following concrete implementations of the abstract
    methods of :class:`Policy`:
 
@@ -427,7 +441,7 @@
       The name is returned unchanged.  If the input value has a ``name``
       attribute and it matches *name* ignoring case, the value is returned
       unchanged.  Otherwise the *name* and *value* are passed to
-      ``header_factory``, and the resulting custom header object is returned as
+      ``header_factory``, and the resulting header object is returned as
       the value.  In this case a ``ValueError`` is raised if the input value
       contains CR or LF characters.
 
@@ -435,7 +449,7 @@
 
       If the value has a ``name`` attribute, it is returned to unmodified.
       Otherwise the *name*, and the *value* with any CR or LF characters
-      removed, are passed to the ``header_factory``, and the resulting custom
+      removed, are passed to the ``header_factory``, and the resulting
       header object is returned.  Any surrogateescaped bytes get turned into
       the unicode unknown-character glyph.
 
@@ -445,9 +459,9 @@
       A value is considered to be a 'source value' if and only if it does not
       have a ``name`` attribute (having a ``name`` attribute means it is a
       header object of some sort).  If a source value needs to be refolded
-      according to the policy, it is converted into a custom header object by
+      according to the policy, it is converted into a header object by
       passing the *name* and the *value* with any CR and LF characters removed
-      to the ``header_factory``.  Folding of a custom header object is done by
+      to the ``header_factory``.  Folding of a header object is done by
       calling its ``fold`` method with the current policy.
 
       Source values are split into lines using :meth:`~str.splitlines`.  If
@@ -502,23 +516,23 @@
 the email package is changed from the Python 3.2 API in the following ways:
 
    * Setting a header on a :class:`~email.message.Message` results in that
-     header being parsed and a custom header object created.
+     header being parsed and a header object created.
 
    * Fetching a header value from a :class:`~email.message.Message` results
-     in that header being parsed and a custom header object created and
+     in that header being parsed and a header object created and
      returned.
 
-   * Any custom header object, or any header that is refolded due to the
+   * Any header object, or any header that is refolded due to the
      policy settings, is folded using an algorithm that fully implements the
      RFC folding algorithms, including knowing where encoded words are required
      and allowed.
 
 From the application view, this means that any header obtained through the
-:class:`~email.message.Message` is a custom header object with custom
+:class:`~email.message.Message` is a header object with extra
 attributes, whose string value is the fully decoded unicode value of the
 header.  Likewise, a header may be assigned a new value, or a new header
 created, using a unicode string, and the policy will take care of converting
 the unicode string into the correct RFC encoded form.
 
-The custom header objects and their attributes are described in
+The header objects and their attributes are described in
 :mod:`~email.headerregistry`.
diff --git a/Doc/library/email.rst b/Doc/library/email.rst
index a6cbbce..331d2ef 100644
--- a/Doc/library/email.rst
+++ b/Doc/library/email.rst
@@ -53,6 +53,7 @@
    email.generator.rst
    email.policy.rst
    email.headerregistry.rst
+   email.contentmanager.rst
    email.mime.rst
    email.header.rst
    email.charset.rst
diff --git a/Doc/whatsnew/3.4.rst b/Doc/whatsnew/3.4.rst
index d809c12..3610dbf 100644
--- a/Doc/whatsnew/3.4.rst
+++ b/Doc/whatsnew/3.4.rst
@@ -280,6 +280,21 @@
 
 (Contributed by R. David Murray in :issue:`18600`.)
 
+A pair of new subclasses of :class:`~email.message.Message` have been added,
+along with a new sub-module, :mod:`~email.contentmanager`.  All documentation
+is currently in the new module, which is being added as part of the new
+:term:`provisional <provosional package>` email API.  These classes provide a
+number of new methods that make extracting content from and inserting content
+into email messages much easier.  See the :mod:`~email.contentmanager`
+documentation for details.
+
+These API additions complete the bulk of the work that was planned as part of
+the email6 project.  The currently provisional API is scheduled to become final
+in Python 3.5 (possibly with a few minor additions in the area of error
+handling).
+
+(Contributed by R. David Murray in :issue:`18891`.)
+
 
 functools
 ---------
diff --git a/Lib/email/contentmanager.py b/Lib/email/contentmanager.py
new file mode 100644
index 0000000..d363652
--- /dev/null
+++ b/Lib/email/contentmanager.py
@@ -0,0 +1,249 @@
+import binascii
+import email.charset
+import email.message
+import email.errors
+from email import quoprimime
+
+class ContentManager:
+
+    def __init__(self):
+        self.get_handlers = {}
+        self.set_handlers = {}
+
+    def add_get_handler(self, key, handler):
+        self.get_handlers[key] = handler
+
+    def get_content(self, msg, *args, **kw):
+        content_type = msg.get_content_type()
+        if content_type in self.get_handlers:
+            return self.get_handlers[content_type](msg, *args, **kw)
+        maintype = msg.get_content_maintype()
+        if maintype in self.get_handlers:
+            return self.get_handlers[maintype](msg, *args, **kw)
+        if '' in self.get_handlers:
+            return self.get_handlers[''](msg, *args, **kw)
+        raise KeyError(content_type)
+
+    def add_set_handler(self, typekey, handler):
+        self.set_handlers[typekey] = handler
+
+    def set_content(self, msg, obj, *args, **kw):
+        if msg.get_content_maintype() == 'multipart':
+            # XXX: is this error a good idea or not?  We can remove it later,
+            # but we can't add it later, so do it for now.
+            raise TypeError("set_content not valid on multipart")
+        handler = self._find_set_handler(msg, obj)
+        msg.clear_content()
+        handler(msg, obj, *args, **kw)
+
+    def _find_set_handler(self, msg, obj):
+        full_path_for_error = None
+        for typ in type(obj).__mro__:
+            if typ in self.set_handlers:
+                return self.set_handlers[typ]
+            qname = typ.__qualname__
+            modname = getattr(typ, '__module__', '')
+            full_path = '.'.join((modname, qname)) if modname else qname
+            if full_path_for_error is None:
+                full_path_for_error = full_path
+            if full_path in self.set_handlers:
+                return self.set_handlers[full_path]
+            if qname in self.set_handlers:
+                return self.set_handlers[qname]
+            name = typ.__name__
+            if name in self.set_handlers:
+                return self.set_handlers[name]
+        if None in self.set_handlers:
+            return self.set_handlers[None]
+        raise KeyError(full_path_for_error)
+
+
+raw_data_manager = ContentManager()
+
+
+def get_text_content(msg, errors='replace'):
+    content = msg.get_payload(decode=True)
+    charset = msg.get_param('charset', 'ASCII')
+    return content.decode(charset, errors=errors)
+raw_data_manager.add_get_handler('text', get_text_content)
+
+
+def get_non_text_content(msg):
+    return msg.get_payload(decode=True)
+for maintype in 'audio image video application'.split():
+    raw_data_manager.add_get_handler(maintype, get_non_text_content)
+
+
+def get_message_content(msg):
+    return msg.get_payload(0)
+for subtype in 'rfc822 external-body'.split():
+    raw_data_manager.add_get_handler('message/'+subtype, get_message_content)
+
+
+def get_and_fixup_unknown_message_content(msg):
+    # If we don't understand a message subtype, we are supposed to treat it as
+    # if it were application/octet-stream, per
+    # tools.ietf.org/html/rfc2046#section-5.2.4.  Feedparser doesn't do that,
+    # so do our best to fix things up.  Note that it is *not* appropriate to
+    # model message/partial content as Message objects, so they are handled
+    # here as well.  (How to reassemble them is out of scope for this comment :)
+    return bytes(msg.get_payload(0))
+raw_data_manager.add_get_handler('message',
+                                 get_and_fixup_unknown_message_content)
+
+
+def _prepare_set(msg, maintype, subtype, headers):
+    msg['Content-Type'] = '/'.join((maintype, subtype))
+    if headers:
+        if not hasattr(headers[0], 'name'):
+            mp = msg.policy
+            headers = [mp.header_factory(*mp.header_source_parse([header]))
+                       for header in headers]
+        try:
+            for header in headers:
+                if header.defects:
+                    raise header.defects[0]
+                msg[header.name] = header
+        except email.errors.HeaderDefect as exc:
+            raise ValueError("Invalid header: {}".format(
+                                header.fold(policy=msg.policy))) from exc
+
+
+def _finalize_set(msg, disposition, filename, cid, params):
+    if disposition is None and filename is not None:
+        disposition = 'attachment'
+    if disposition is not None:
+        msg['Content-Disposition'] = disposition
+    if filename is not None:
+        msg.set_param('filename',
+                      filename,
+                      header='Content-Disposition',
+                      replace=True)
+    if cid is not None:
+        msg['Content-ID'] = cid
+    if params is not None:
+        for key, value in params.items():
+            msg.set_param(key, value)
+
+
+# XXX: This is a cleaned-up version of base64mime.body_encode.  It would
+# be nice to drop both this and quoprimime.body_encode in favor of
+# enhanced binascii routines that accepted a max_line_length parameter.
+def _encode_base64(data, max_line_length):
+    encoded_lines = []
+    unencoded_bytes_per_line = max_line_length * 3 // 4
+    for i in range(0, len(data), unencoded_bytes_per_line):
+        thisline = data[i:i+unencoded_bytes_per_line]
+        encoded_lines.append(binascii.b2a_base64(thisline).decode('ascii'))
+    return ''.join(encoded_lines)
+
+
+def _encode_text(string, charset, cte, policy):
+    lines = string.encode(charset).splitlines()
+    linesep = policy.linesep.encode('ascii')
+    def embeded_body(lines): return linesep.join(lines) + linesep
+    def normal_body(lines): return b'\n'.join(lines) + b'\n'
+    if cte==None:
+        # Use heuristics to decide on the "best" encoding.
+        try:
+            return '7bit', normal_body(lines).decode('ascii')
+        except UnicodeDecodeError:
+            pass
+        if (policy.cte_type == '8bit' and
+                max(len(x) for x in lines) <= policy.max_line_length):
+            return '8bit', normal_body(lines).decode('ascii', 'surrogateescape')
+        sniff = embeded_body(lines[:10])
+        sniff_qp = quoprimime.body_encode(sniff.decode('latin-1'),
+                                          policy.max_line_length)
+        sniff_base64 = binascii.b2a_base64(sniff)
+        # This is a little unfair to qp; it includes lineseps, base64 doesn't.
+        if len(sniff_qp) > len(sniff_base64):
+            cte = 'base64'
+        else:
+            cte = 'quoted-printable'
+            if len(lines) <= 10:
+                return cte, sniff_qp
+    if cte == '7bit':
+        data = normal_body(lines).decode('ascii')
+    elif cte == '8bit':
+        data = normal_body(lines).decode('ascii', 'surrogateescape')
+    elif cte == 'quoted-printable':
+        data = quoprimime.body_encode(normal_body(lines).decode('latin-1'),
+                                      policy.max_line_length)
+    elif cte == 'base64':
+        data = _encode_base64(embeded_body(lines), policy.max_line_length)
+    else:
+        raise ValueError("Unknown content transfer encoding {}".format(cte))
+    return cte, data
+
+
+def set_text_content(msg, string, subtype="plain", charset='utf-8', cte=None,
+                     disposition=None, filename=None, cid=None,
+                     params=None, headers=None):
+    _prepare_set(msg, 'text', subtype, headers)
+    cte, payload = _encode_text(string, charset, cte, msg.policy)
+    msg.set_payload(payload)
+    msg.set_param('charset',
+                  email.charset.ALIASES.get(charset, charset),
+                  replace=True)
+    msg['Content-Transfer-Encoding'] = cte
+    _finalize_set(msg, disposition, filename, cid, params)
+raw_data_manager.add_set_handler(str, set_text_content)
+
+
+def set_message_content(msg, message, subtype="rfc822", cte=None,
+                       disposition=None, filename=None, cid=None,
+                       params=None, headers=None):
+    if subtype == 'partial':
+        raise ValueError("message/partial is not supported for Message objects")
+    if subtype == 'rfc822':
+        if cte not in (None, '7bit', '8bit', 'binary'):
+            # http://tools.ietf.org/html/rfc2046#section-5.2.1 mandate.
+            raise ValueError(
+                "message/rfc822 parts do not support cte={}".format(cte))
+        # 8bit will get coerced on serialization if policy.cte_type='7bit'.  We
+        # may end up claiming 8bit when it isn't needed, but the only negative
+        # result of that should be a gateway that needs to coerce to 7bit
+        # having to look through the whole embedded message to discover whether
+        # or not it actually has to do anything.
+        cte = '8bit' if cte is None else cte
+    elif subtype == 'external-body':
+        if cte not in (None, '7bit'):
+            # http://tools.ietf.org/html/rfc2046#section-5.2.3 mandate.
+            raise ValueError(
+                "message/external-body parts do not support cte={}".format(cte))
+        cte = '7bit'
+    elif cte is None:
+        # http://tools.ietf.org/html/rfc2046#section-5.2.4 says all future
+        # subtypes should be restricted to 7bit, so assume that.
+        cte = '7bit'
+    _prepare_set(msg, 'message', subtype, headers)
+    msg.set_payload([message])
+    msg['Content-Transfer-Encoding'] = cte
+    _finalize_set(msg, disposition, filename, cid, params)
+raw_data_manager.add_set_handler(email.message.Message, set_message_content)
+
+
+def set_bytes_content(msg, data, maintype, subtype, cte='base64',
+                     disposition=None, filename=None, cid=None,
+                     params=None, headers=None):
+    _prepare_set(msg, maintype, subtype, headers)
+    if cte == 'base64':
+        data = _encode_base64(data, max_line_length=msg.policy.max_line_length)
+    elif cte == 'quoted-printable':
+        # XXX: quoprimime.body_encode won't encode newline characters in data,
+        # so we can't use it.  This means max_line_length is ignored.  Another
+        # bug to fix later.  (Note: encoders.quopri is broken on line ends.)
+        data = binascii.b2a_qp(data, istext=False, header=False, quotetabs=True)
+        data = data.decode('ascii')
+    elif cte == '7bit':
+        # Make sure it really is only ASCII.  The early warning here seems
+        # worth the overhead...if you care write your own content manager :).
+        data.encode('ascii')
+    elif cte in ('8bit', 'binary'):
+        data = data.decode('ascii', 'surrogateescape')
+    msg.set_payload(data)
+    msg['Content-Transfer-Encoding'] = cte
+    _finalize_set(msg, disposition, filename, cid, params)
+for typ in (bytes, bytearray, memoryview):
+    raw_data_manager.add_set_handler(typ, set_bytes_content)
diff --git a/Lib/email/message.py b/Lib/email/message.py
index ebaf1c1..cb2be21 100644
--- a/Lib/email/message.py
+++ b/Lib/email/message.py
@@ -8,8 +8,6 @@
 
 import re
 import uu
-import base64
-import binascii
 from io import BytesIO, StringIO
 
 # Intrapackage imports
@@ -679,7 +677,7 @@
         return failobj
 
     def set_param(self, param, value, header='Content-Type', requote=True,
-                  charset=None, language=''):
+                  charset=None, language='', replace=False):
         """Set a parameter in the Content-Type header.
 
         If the parameter already exists in the header, its value will be
@@ -723,8 +721,11 @@
                 else:
                     ctype = SEMISPACE.join([ctype, append_param])
         if ctype != self.get(header):
-            del self[header]
-            self[header] = ctype
+            if replace:
+                self.replace_header(header, ctype)
+            else:
+                del self[header]
+                self[header] = ctype
 
     def del_param(self, param, header='content-type', requote=True):
         """Remove the given parameter completely from the Content-Type header.
@@ -905,3 +906,208 @@
 
     # I.e. def walk(self): ...
     from email.iterators import walk
+
+
+class MIMEPart(Message):
+
+    def __init__(self, policy=None):
+        if policy is None:
+            from email.policy import default
+            policy = default
+        Message.__init__(self, policy)
+
+    @property
+    def is_attachment(self):
+        c_d = self.get('content-disposition')
+        if c_d is None:
+            return False
+        return c_d.lower() == 'attachment'
+
+    def _find_body(self, part, preferencelist):
+        if part.is_attachment:
+            return
+        maintype, subtype = part.get_content_type().split('/')
+        if maintype == 'text':
+            if subtype in preferencelist:
+                yield (preferencelist.index(subtype), part)
+            return
+        if maintype != 'multipart':
+            return
+        if subtype != 'related':
+            for subpart in part.iter_parts():
+                yield from self._find_body(subpart, preferencelist)
+            return
+        if 'related' in preferencelist:
+            yield (preferencelist.index('related'), part)
+        candidate = None
+        start = part.get_param('start')
+        if start:
+            for subpart in part.iter_parts():
+                if subpart['content-id'] == start:
+                    candidate = subpart
+                    break
+        if candidate is None:
+            subparts = part.get_payload()
+            candidate = subparts[0] if subparts else None
+        if candidate is not None:
+            yield from self._find_body(candidate, preferencelist)
+
+    def get_body(self, preferencelist=('related', 'html', 'plain')):
+        """Return best candidate mime part for display as 'body' of message.
+
+        Do a depth first search, starting with self, looking for the first part
+        matching each of the items in preferencelist, and return the part
+        corresponding to the first item that has a match, or None if no items
+        have a match.  If 'related' is not included in preferencelist, consider
+        the root part of any multipart/related encountered as a candidate
+        match.  Ignore parts with 'Content-Disposition: attachment'.
+        """
+        best_prio = len(preferencelist)
+        body = None
+        for prio, part in self._find_body(self, preferencelist):
+            if prio < best_prio:
+                best_prio = prio
+                body = part
+                if prio == 0:
+                    break
+        return body
+
+    _body_types = {('text', 'plain'),
+                   ('text', 'html'),
+                   ('multipart', 'related'),
+                   ('multipart', 'alternative')}
+    def iter_attachments(self):
+        """Return an iterator over the non-main parts of a multipart.
+
+        Skip the first of each occurrence of text/plain, text/html,
+        multipart/related, or multipart/alternative in the multipart (unless
+        they have a 'Content-Disposition: attachment' header) and include all
+        remaining subparts in the returned iterator.  When applied to a
+        multipart/related, return all parts except the root part.  Return an
+        empty iterator when applied to a multipart/alternative or a
+        non-multipart.
+        """
+        maintype, subtype = self.get_content_type().split('/')
+        if maintype != 'multipart' or subtype == 'alternative':
+            return
+        parts = self.get_payload()
+        if maintype == 'multipart' and subtype == 'related':
+            # For related, we treat everything but the root as an attachment.
+            # The root may be indicated by 'start'; if there's no start or we
+            # can't find the named start, treat the first subpart as the root.
+            start = self.get_param('start')
+            if start:
+                found = False
+                attachments = []
+                for part in parts:
+                    if part.get('content-id') == start:
+                        found = True
+                    else:
+                        attachments.append(part)
+                if found:
+                    yield from attachments
+                    return
+            parts.pop(0)
+            yield from parts
+            return
+        # Otherwise we more or less invert the remaining logic in get_body.
+        # This only really works in edge cases (ex: non-text relateds or
+        # alternatives) if the sending agent sets content-disposition.
+        seen = []   # Only skip the first example of each candidate type.
+        for part in parts:
+            maintype, subtype = part.get_content_type().split('/')
+            if ((maintype, subtype) in self._body_types and
+                    not part.is_attachment and subtype not in seen):
+                seen.append(subtype)
+                continue
+            yield part
+
+    def iter_parts(self):
+        """Return an iterator over all immediate subparts of a multipart.
+
+        Return an empty iterator for a non-multipart.
+        """
+        if self.get_content_maintype() == 'multipart':
+            yield from self.get_payload()
+
+    def get_content(self, *args, content_manager=None, **kw):
+        if content_manager is None:
+            content_manager = self.policy.content_manager
+        return content_manager.get_content(self, *args, **kw)
+
+    def set_content(self, *args, content_manager=None, **kw):
+        if content_manager is None:
+            content_manager = self.policy.content_manager
+        content_manager.set_content(self, *args, **kw)
+
+    def _make_multipart(self, subtype, disallowed_subtypes, boundary):
+        if self.get_content_maintype() == 'multipart':
+            existing_subtype = self.get_content_subtype()
+            disallowed_subtypes = disallowed_subtypes + (subtype,)
+            if existing_subtype in disallowed_subtypes:
+                raise ValueError("Cannot convert {} to {}".format(
+                    existing_subtype, subtype))
+        keep_headers = []
+        part_headers = []
+        for name, value in self._headers:
+            if name.lower().startswith('content-'):
+                part_headers.append((name, value))
+            else:
+                keep_headers.append((name, value))
+        if part_headers:
+            # There is existing content, move it to the first subpart.
+            part = type(self)(policy=self.policy)
+            part._headers = part_headers
+            part._payload = self._payload
+            self._payload = [part]
+        else:
+            self._payload = []
+        self._headers = keep_headers
+        self['Content-Type'] = 'multipart/' + subtype
+        if boundary is not None:
+            self.set_param('boundary', boundary)
+
+    def make_related(self, boundary=None):
+        self._make_multipart('related', ('alternative', 'mixed'), boundary)
+
+    def make_alternative(self, boundary=None):
+        self._make_multipart('alternative', ('mixed',), boundary)
+
+    def make_mixed(self, boundary=None):
+        self._make_multipart('mixed', (), boundary)
+
+    def _add_multipart(self, _subtype, *args, _disp=None, **kw):
+        if (self.get_content_maintype() != 'multipart' or
+                self.get_content_subtype() != _subtype):
+            getattr(self, 'make_' + _subtype)()
+        part = type(self)(policy=self.policy)
+        part.set_content(*args, **kw)
+        if _disp and 'content-disposition' not in part:
+            part['Content-Disposition'] = _disp
+        self.attach(part)
+
+    def add_related(self, *args, **kw):
+        self._add_multipart('related', *args, _disp='inline', **kw)
+
+    def add_alternative(self, *args, **kw):
+        self._add_multipart('alternative', *args, **kw)
+
+    def add_attachment(self, *args, **kw):
+        self._add_multipart('mixed', *args, _disp='attachment', **kw)
+
+    def clear(self):
+        self._headers = []
+        self._payload = None
+
+    def clear_content(self):
+        self._headers = [(n, v) for n, v in self._headers
+                         if not n.lower().startswith('content-')]
+        self._payload = None
+
+
+class EmailMessage(MIMEPart):
+
+    def set_content(self, *args, **kw):
+        super().set_content(*args, **kw)
+        if 'MIME-Version' not in self:
+            self['MIME-Version'] = '1.0'
diff --git a/Lib/email/policy.py b/Lib/email/policy.py
index 38e88af..f0b20f4 100644
--- a/Lib/email/policy.py
+++ b/Lib/email/policy.py
@@ -5,6 +5,7 @@
 from email._policybase import Policy, Compat32, compat32, _extend_docstrings
 from email.utils import _has_surrogates
 from email.headerregistry import HeaderRegistry as HeaderRegistry
+from email.contentmanager import raw_data_manager
 
 __all__ = [
     'Compat32',
@@ -58,10 +59,22 @@
                            special treatment, while all other fields are
                            treated as unstructured.  This list will be
                            completed before the extension is marked stable.)
+
+    content_manager     -- an object with at least two methods: get_content
+                           and set_content.  When the get_content or
+                           set_content method of a Message object is called,
+                           it calls the corresponding method of this object,
+                           passing it the message object as its first argument,
+                           and any arguments or keywords that were passed to
+                           it as additional arguments.  The default
+                           content_manager is
+                           :data:`~email.contentmanager.raw_data_manager`.
+
     """
 
     refold_source = 'long'
     header_factory = HeaderRegistry()
+    content_manager = raw_data_manager
 
     def __init__(self, **kw):
         # Ensure that each new instance gets a unique header factory
diff --git a/Lib/email/utils.py b/Lib/email/utils.py
index b3b42bb..25b0d56 100644
--- a/Lib/email/utils.py
+++ b/Lib/email/utils.py
@@ -68,9 +68,13 @@
 # How to deal with a string containing bytes before handing it to the
 # application through the 'normal' interface.
 def _sanitize(string):
-    # Turn any escaped bytes into unicode 'unknown' char.
-    original_bytes = string.encode('ascii', 'surrogateescape')
-    return original_bytes.decode('ascii', 'replace')
+    # Turn any escaped bytes into unicode 'unknown' char.  If the escaped
+    # bytes happen to be utf-8 they will instead get decoded, even if they
+    # were invalid in the charset the source was supposed to be in.  This
+    # seems like it is not a bad thing; a defect was still registered.
+    original_bytes = string.encode('utf-8', 'surrogateescape')
+    return original_bytes.decode('utf-8', 'replace')
+
 
 
 # Helpers
diff --git a/Lib/test/test_email/__init__.py b/Lib/test/test_email/__init__.py
index f206ace..d8896ee 100644
--- a/Lib/test/test_email/__init__.py
+++ b/Lib/test/test_email/__init__.py
@@ -2,6 +2,7 @@
 import sys
 import unittest
 import test.support
+import collections
 import email
 from email.message import Message
 from email._policybase import compat32
@@ -42,6 +43,8 @@
     # here we make minimal changes in the test_email tests compared to their
     # pre-3.3 state.
     policy = compat32
+    # Likewise, the default message object is Message.
+    message = Message
 
     def __init__(self, *args, **kw):
         super().__init__(*args, **kw)
@@ -54,11 +57,23 @@
         with openfile(filename) as fp:
             return email.message_from_file(fp, policy=self.policy)
 
-    def _str_msg(self, string, message=Message, policy=None):
+    def _str_msg(self, string, message=None, policy=None):
         if policy is None:
             policy = self.policy
+        if message is None:
+            message = self.message
         return email.message_from_string(string, message, policy=policy)
 
+    def _bytes_msg(self, bytestring, message=None, policy=None):
+        if policy is None:
+            policy = self.policy
+        if message is None:
+            message = self.message
+        return email.message_from_bytes(bytestring, message, policy=policy)
+
+    def _make_message(self):
+        return self.message(policy=self.policy)
+
     def _bytes_repr(self, b):
         return [repr(x) for x in b.splitlines(keepends=True)]
 
@@ -123,6 +138,7 @@
 
     """
     paramdicts = {}
+    testers = collections.defaultdict(list)
     for name, attr in cls.__dict__.items():
         if name.endswith('_params'):
             if not hasattr(attr, 'keys'):
@@ -134,7 +150,15 @@
                     d[n] = x
                 attr = d
             paramdicts[name[:-7] + '_as_'] = attr
+        if '_as_' in name:
+            testers[name.split('_as_')[0] + '_as_'].append(name)
     testfuncs = {}
+    for name in paramdicts:
+        if name not in testers:
+            raise ValueError("No tester found for {}".format(name))
+    for name in testers:
+        if name not in paramdicts:
+            raise ValueError("No params found for {}".format(name))
     for name, attr in cls.__dict__.items():
         for paramsname, paramsdict in paramdicts.items():
             if name.startswith(paramsname):
diff --git a/Lib/test/test_email/test_contentmanager.py b/Lib/test/test_email/test_contentmanager.py
new file mode 100644
index 0000000..1629e2d
--- /dev/null
+++ b/Lib/test/test_email/test_contentmanager.py
@@ -0,0 +1,796 @@
+import unittest
+from test.test_email import TestEmailBase, parameterize
+import textwrap
+from email import policy
+from email.message import EmailMessage
+from email.contentmanager import ContentManager, raw_data_manager
+
+
+@parameterize
+class TestContentManager(TestEmailBase):
+
+    policy = policy.default
+    message = EmailMessage
+
+    get_key_params = {
+        'full_type':        (1, 'text/plain',),
+        'maintype_only':    (2, 'text',),
+        'null_key':         (3, '',),
+        }
+
+    def get_key_as_get_content_key(self, order, key):
+        def foo_getter(msg, foo=None):
+            bar = msg['X-Bar-Header']
+            return foo, bar
+        cm = ContentManager()
+        cm.add_get_handler(key, foo_getter)
+        m = self._make_message()
+        m['Content-Type'] = 'text/plain'
+        m['X-Bar-Header'] = 'foo'
+        self.assertEqual(cm.get_content(m, foo='bar'), ('bar', 'foo'))
+
+    def get_key_as_get_content_key_order(self, order, key):
+        def bar_getter(msg):
+            return msg['X-Bar-Header']
+        def foo_getter(msg):
+            return msg['X-Foo-Header']
+        cm = ContentManager()
+        cm.add_get_handler(key, foo_getter)
+        for precedence, key in self.get_key_params.values():
+            if precedence > order:
+                cm.add_get_handler(key, bar_getter)
+        m = self._make_message()
+        m['Content-Type'] = 'text/plain'
+        m['X-Bar-Header'] = 'bar'
+        m['X-Foo-Header'] = 'foo'
+        self.assertEqual(cm.get_content(m), ('foo'))
+
+    def test_get_content_raises_if_unknown_mimetype_and_no_default(self):
+        cm = ContentManager()
+        m = self._make_message()
+        m['Content-Type'] = 'text/plain'
+        with self.assertRaisesRegex(KeyError, 'text/plain'):
+            cm.get_content(m)
+
+    class BaseThing(str):
+        pass
+    baseobject_full_path = __name__ + '.' + 'TestContentManager.BaseThing'
+    class Thing(BaseThing):
+        pass
+    testobject_full_path = __name__ + '.' + 'TestContentManager.Thing'
+
+    set_key_params = {
+        'type':             (0,  Thing,),
+        'full_path':        (1,  testobject_full_path,),
+        'qualname':         (2,  'TestContentManager.Thing',),
+        'name':             (3,  'Thing',),
+        'base_type':        (4,  BaseThing,),
+        'base_full_path':   (5,  baseobject_full_path,),
+        'base_qualname':    (6,  'TestContentManager.BaseThing',),
+        'base_name':        (7,  'BaseThing',),
+        'str_type':         (8,  str,),
+        'str_full_path':    (9,  'builtins.str',),
+        'str_name':         (10, 'str',),   # str name and qualname are the same
+        'null_key':         (11, None,),
+        }
+
+    def set_key_as_set_content_key(self, order, key):
+        def foo_setter(msg, obj, foo=None):
+            msg['X-Foo-Header'] = foo
+            msg.set_payload(obj)
+        cm = ContentManager()
+        cm.add_set_handler(key, foo_setter)
+        m = self._make_message()
+        msg_obj = self.Thing()
+        cm.set_content(m, msg_obj, foo='bar')
+        self.assertEqual(m['X-Foo-Header'], 'bar')
+        self.assertEqual(m.get_payload(), msg_obj)
+
+    def set_key_as_set_content_key_order(self, order, key):
+        def foo_setter(msg, obj):
+            msg['X-FooBar-Header'] = 'foo'
+            msg.set_payload(obj)
+        def bar_setter(msg, obj):
+            msg['X-FooBar-Header'] = 'bar'
+        cm = ContentManager()
+        cm.add_set_handler(key, foo_setter)
+        for precedence, key in self.get_key_params.values():
+            if precedence > order:
+                cm.add_set_handler(key, bar_setter)
+        m = self._make_message()
+        msg_obj = self.Thing()
+        cm.set_content(m, msg_obj)
+        self.assertEqual(m['X-FooBar-Header'], 'foo')
+        self.assertEqual(m.get_payload(), msg_obj)
+
+    def test_set_content_raises_if_unknown_type_and_no_default(self):
+        cm = ContentManager()
+        m = self._make_message()
+        msg_obj = self.Thing()
+        with self.assertRaisesRegex(KeyError, self.testobject_full_path):
+            cm.set_content(m, msg_obj)
+
+    def test_set_content_raises_if_called_on_multipart(self):
+        cm = ContentManager()
+        m = self._make_message()
+        m['Content-Type'] = 'multipart/foo'
+        with self.assertRaises(TypeError):
+            cm.set_content(m, 'test')
+
+    def test_set_content_calls_clear_content(self):
+        m = self._make_message()
+        m['Content-Foo'] = 'bar'
+        m['Content-Type'] = 'text/html'
+        m['To'] = 'test'
+        m.set_payload('abc')
+        cm = ContentManager()
+        cm.add_set_handler(str, lambda *args, **kw: None)
+        m.set_content('xyz', content_manager=cm)
+        self.assertIsNone(m['Content-Foo'])
+        self.assertIsNone(m['Content-Type'])
+        self.assertEqual(m['To'], 'test')
+        self.assertIsNone(m.get_payload())
+
+
+@parameterize
+class TestRawDataManager(TestEmailBase):
+    # Note: these tests are dependent on the order in which headers are added
+    # to the message objects by the code.  There's no defined ordering in
+    # RFC5322/MIME, so this makes the tests more fragile than the standards
+    # require.  However, if the header order changes it is best to understand
+    # *why*, and make sure it isn't a subtle bug in whatever change was
+    # applied.
+
+    policy = policy.default.clone(max_line_length=60,
+                                  content_manager=raw_data_manager)
+    message = EmailMessage
+
+    def test_get_text_plain(self):
+        m = self._str_msg(textwrap.dedent("""\
+            Content-Type: text/plain
+
+            Basic text.
+            """))
+        self.assertEqual(raw_data_manager.get_content(m), "Basic text.\n")
+
+    def test_get_text_html(self):
+        m = self._str_msg(textwrap.dedent("""\
+            Content-Type: text/html
+
+            <p>Basic text.</p>
+            """))
+        self.assertEqual(raw_data_manager.get_content(m),
+                         "<p>Basic text.</p>\n")
+
+    def test_get_text_plain_latin1(self):
+        m = self._bytes_msg(textwrap.dedent("""\
+            Content-Type: text/plain; charset=latin1
+
+            Basìc tëxt.
+            """).encode('latin1'))
+        self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")
+
+    def test_get_text_plain_latin1_quoted_printable(self):
+        m = self._str_msg(textwrap.dedent("""\
+            Content-Type: text/plain; charset="latin-1"
+            Content-Transfer-Encoding: quoted-printable
+
+            Bas=ECc t=EBxt.
+            """))
+        self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")
+
+    def test_get_text_plain_utf8_base64(self):
+        m = self._str_msg(textwrap.dedent("""\
+            Content-Type: text/plain; charset="utf8"
+            Content-Transfer-Encoding: base64
+
+            QmFzw6xjIHTDq3h0Lgo=
+            """))
+        self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt.\n")
+
+    def test_get_text_plain_bad_utf8_quoted_printable(self):
+        m = self._str_msg(textwrap.dedent("""\
+            Content-Type: text/plain; charset="utf8"
+            Content-Transfer-Encoding: quoted-printable
+
+            Bas=c3=acc t=c3=abxt=fd.
+            """))
+        self.assertEqual(raw_data_manager.get_content(m), "Basìc tëxt�.\n")
+
+    def test_get_text_plain_bad_utf8_quoted_printable_ignore_errors(self):
+        m = self._str_msg(textwrap.dedent("""\
+            Content-Type: text/plain; charset="utf8"
+            Content-Transfer-Encoding: quoted-printable
+
+            Bas=c3=acc t=c3=abxt=fd.
+            """))
+        self.assertEqual(raw_data_manager.get_content(m, errors='ignore'),
+                         "Basìc tëxt.\n")
+
+    def test_get_text_plain_utf8_base64_recoverable_bad_CTE_data(self):
+        m = self._str_msg(textwrap.dedent("""\
+            Content-Type: text/plain; charset="utf8"
+            Content-Transfer-Encoding: base64
+
+            QmFzw6xjIHTDq3h0Lgo\xFF=
+            """))
+        self.assertEqual(raw_data_manager.get_content(m, errors='ignore'),
+                         "Basìc tëxt.\n")
+
+    def test_get_text_invalid_keyword(self):
+        m = self._str_msg(textwrap.dedent("""\
+            Content-Type: text/plain
+
+            Basic text.
+            """))
+        with self.assertRaises(TypeError):
+            raw_data_manager.get_content(m, foo='ignore')
+
+    def test_get_non_text(self):
+        template = textwrap.dedent("""\
+            Content-Type: {}
+            Content-Transfer-Encoding: base64
+
+            Ym9ndXMgZGF0YQ==
+            """)
+        for maintype in 'audio image video application'.split():
+            with self.subTest(maintype=maintype):
+                m = self._str_msg(template.format(maintype+'/foo'))
+                self.assertEqual(raw_data_manager.get_content(m), b"bogus data")
+
+    def test_get_non_text_invalid_keyword(self):
+        m = self._str_msg(textwrap.dedent("""\
+            Content-Type: image/jpg
+            Content-Transfer-Encoding: base64
+
+            Ym9ndXMgZGF0YQ==
+            """))
+        with self.assertRaises(TypeError):
+            raw_data_manager.get_content(m, errors='ignore')
+
+    def test_get_raises_on_multipart(self):
+        m = self._str_msg(textwrap.dedent("""\
+            Content-Type: multipart/mixed; boundary="==="
+
+            --===
+            --===--
+            """))
+        with self.assertRaises(KeyError):
+            raw_data_manager.get_content(m)
+
+    def test_get_message_rfc822_and_external_body(self):
+        template = textwrap.dedent("""\
+            Content-Type: message/{}
+
+            To: foo@example.com
+            From: bar@example.com
+            Subject: example
+
+            an example message
+            """)
+        for subtype in 'rfc822 external-body'.split():
+            with self.subTest(subtype=subtype):
+                m = self._str_msg(template.format(subtype))
+                sub_msg = raw_data_manager.get_content(m)
+                self.assertIsInstance(sub_msg, self.message)
+                self.assertEqual(raw_data_manager.get_content(sub_msg),
+                                 "an example message\n")
+                self.assertEqual(sub_msg['to'], 'foo@example.com')
+                self.assertEqual(sub_msg['from'].addresses[0].username, 'bar')
+
+    def test_get_message_non_rfc822_or_external_body_yields_bytes(self):
+        m = self._str_msg(textwrap.dedent("""\
+            Content-Type: message/partial
+
+            To: foo@example.com
+            From: bar@example.com
+            Subject: example
+
+            The real body is in another message.
+            """))
+        self.assertEqual(raw_data_manager.get_content(m)[:10], b'To: foo@ex')
+
+    def test_set_text_plain(self):
+        m = self._make_message()
+        content = "Simple message.\n"
+        raw_data_manager.set_content(m, content)
+        self.assertEqual(str(m), textwrap.dedent("""\
+            Content-Type: text/plain; charset="utf-8"
+            Content-Transfer-Encoding: 7bit
+
+            Simple message.
+            """))
+        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
+        self.assertEqual(m.get_content(), content)
+
+    def test_set_text_html(self):
+        m = self._make_message()
+        content = "<p>Simple message.</p>\n"
+        raw_data_manager.set_content(m, content, subtype='html')
+        self.assertEqual(str(m), textwrap.dedent("""\
+            Content-Type: text/html; charset="utf-8"
+            Content-Transfer-Encoding: 7bit
+
+            <p>Simple message.</p>
+            """))
+        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
+        self.assertEqual(m.get_content(), content)
+
+    def test_set_text_charset_latin_1(self):
+        m = self._make_message()
+        content = "Simple message.\n"
+        raw_data_manager.set_content(m, content, charset='latin-1')
+        self.assertEqual(str(m), textwrap.dedent("""\
+            Content-Type: text/plain; charset="iso-8859-1"
+            Content-Transfer-Encoding: 7bit
+
+            Simple message.
+            """))
+        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
+        self.assertEqual(m.get_content(), content)
+
+    def test_set_text_short_line_minimal_non_ascii_heuristics(self):
+        m = self._make_message()
+        content = "et là il est monté sur moi et il commence à m'éto.\n"
+        raw_data_manager.set_content(m, content)
+        self.assertEqual(bytes(m), textwrap.dedent("""\
+            Content-Type: text/plain; charset="utf-8"
+            Content-Transfer-Encoding: 8bit
+
+            et là il est monté sur moi et il commence à m'éto.
+            """).encode('utf-8'))
+        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
+        self.assertEqual(m.get_content(), content)
+
+    def test_set_text_long_line_minimal_non_ascii_heuristics(self):
+        m = self._make_message()
+        content = ("j'ai un problème de python. il est sorti de son"
+                   " vivarium.  et là il est monté sur moi et il commence"
+                   " à m'éto.\n")
+        raw_data_manager.set_content(m, content)
+        self.assertEqual(bytes(m), textwrap.dedent("""\
+            Content-Type: text/plain; charset="utf-8"
+            Content-Transfer-Encoding: quoted-printable
+
+            j'ai un probl=C3=A8me de python. il est sorti de son vivari=
+            um.  et l=C3=A0 il est mont=C3=A9 sur moi et il commence =
+            =C3=A0 m'=C3=A9to.
+            """).encode('utf-8'))
+        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
+        self.assertEqual(m.get_content(), content)
+
+    def test_set_text_11_lines_long_line_minimal_non_ascii_heuristics(self):
+        m = self._make_message()
+        content = '\n'*10 + (
+                  "j'ai un problème de python. il est sorti de son"
+                  " vivarium.  et là il est monté sur moi et il commence"
+                  " à m'éto.\n")
+        raw_data_manager.set_content(m, content)
+        self.assertEqual(bytes(m), textwrap.dedent("""\
+            Content-Type: text/plain; charset="utf-8"
+            Content-Transfer-Encoding: quoted-printable
+            """ + '\n'*10 + """
+            j'ai un probl=C3=A8me de python. il est sorti de son vivari=
+            um.  et l=C3=A0 il est mont=C3=A9 sur moi et il commence =
+            =C3=A0 m'=C3=A9to.
+            """).encode('utf-8'))
+        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
+        self.assertEqual(m.get_content(), content)
+
+    def test_set_text_maximal_non_ascii_heuristics(self):
+        m = self._make_message()
+        content = "áàäéèęöő.\n"
+        raw_data_manager.set_content(m, content)
+        self.assertEqual(bytes(m), textwrap.dedent("""\
+            Content-Type: text/plain; charset="utf-8"
+            Content-Transfer-Encoding: 8bit
+
+            áàäéèęöő.
+            """).encode('utf-8'))
+        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
+        self.assertEqual(m.get_content(), content)
+
+    def test_set_text_11_lines_maximal_non_ascii_heuristics(self):
+        m = self._make_message()
+        content = '\n'*10 + "áàäéèęöő.\n"
+        raw_data_manager.set_content(m, content)
+        self.assertEqual(bytes(m), textwrap.dedent("""\
+            Content-Type: text/plain; charset="utf-8"
+            Content-Transfer-Encoding: 8bit
+            """ + '\n'*10 + """
+            áàäéèęöő.
+            """).encode('utf-8'))
+        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
+        self.assertEqual(m.get_content(), content)
+
+    def test_set_text_long_line_maximal_non_ascii_heuristics(self):
+        m = self._make_message()
+        content = ("áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
+                   "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
+                   "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
+        raw_data_manager.set_content(m, content)
+        self.assertEqual(bytes(m), textwrap.dedent("""\
+            Content-Type: text/plain; charset="utf-8"
+            Content-Transfer-Encoding: base64
+
+            w6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOoxJnD
+            tsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TDqcOo
+            xJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOgw6TD
+            qcOoxJnDtsWRw6HDoMOkw6nDqMSZw7bFkcOhw6DDpMOpw6jEmcO2xZHDocOg
+            w6TDqcOoxJnDtsWRLgo=
+            """).encode('utf-8'))
+        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
+        self.assertEqual(m.get_content(), content)
+
+    def test_set_text_11_lines_long_line_maximal_non_ascii_heuristics(self):
+        # Yes, it chooses "wrong" here.  It's a heuristic.  So this result
+        # could change if we come up with a better heuristic.
+        m = self._make_message()
+        content = ('\n'*10 +
+                   "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
+                   "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
+                   "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
+        raw_data_manager.set_content(m, "\n"*10 +
+                                        "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
+                                        "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő"
+                                        "áàäéèęöőáàäéèęöőáàäéèęöőáàäéèęöő.\n")
+        self.assertEqual(bytes(m), textwrap.dedent("""\
+            Content-Type: text/plain; charset="utf-8"
+            Content-Transfer-Encoding: quoted-printable
+            """ + '\n'*10 + """
+            =C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=
+            =A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=
+            =C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=
+            =A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=
+            =C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=
+            =91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=
+            =C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=
+            =A4=C3=A9=C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=
+            =C3=A8=C4=99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=
+            =99=C3=B6=C5=91=C3=A1=C3=A0=C3=A4=C3=A9=C3=A8=C4=99=C3=B6=
+            =C5=91.
+            """).encode('utf-8'))
+        self.assertEqual(m.get_payload(decode=True).decode('utf-8'), content)
+        self.assertEqual(m.get_content(), content)
+
+    def test_set_text_non_ascii_with_cte_7bit_raises(self):
+        m = self._make_message()
+        with self.assertRaises(UnicodeError):
+            raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit')
+
+    def test_set_text_non_ascii_with_charset_ascii_raises(self):
+        m = self._make_message()
+        with self.assertRaises(UnicodeError):
+            raw_data_manager.set_content(m,"áàäéèęöő.\n", charset='ascii')
+
+    def test_set_text_non_ascii_with_cte_7bit_and_charset_ascii_raises(self):
+        m = self._make_message()
+        with self.assertRaises(UnicodeError):
+            raw_data_manager.set_content(m,"áàäéèęöő.\n", cte='7bit', charset='ascii')
+
+    def test_set_message(self):
+        m = self._make_message()
+        m['Subject'] = "Forwarded message"
+        content = self._make_message()
+        content['To'] = 'python@vivarium.org'
+        content['From'] = 'police@monty.org'
+        content['Subject'] = "get back in your box"
+        content.set_content("Or face the comfy chair.")
+        raw_data_manager.set_content(m, content)
+        self.assertEqual(str(m), textwrap.dedent("""\
+            Subject: Forwarded message
+            Content-Type: message/rfc822
+            Content-Transfer-Encoding: 8bit
+
+            To: python@vivarium.org
+            From: police@monty.org
+            Subject: get back in your box
+            Content-Type: text/plain; charset="utf-8"
+            Content-Transfer-Encoding: 7bit
+            MIME-Version: 1.0
+
+            Or face the comfy chair.
+            """))
+        payload = m.get_payload(0)
+        self.assertIsInstance(payload, self.message)
+        self.assertEqual(str(payload), str(content))
+        self.assertIsInstance(m.get_content(), self.message)
+        self.assertEqual(str(m.get_content()), str(content))
+
+    def test_set_message_with_non_ascii_and_coercion_to_7bit(self):
+        m = self._make_message()
+        m['Subject'] = "Escape report"
+        content = self._make_message()
+        content['To'] = 'police@monty.org'
+        content['From'] = 'victim@monty.org'
+        content['Subject'] = "Help"
+        content.set_content("j'ai un problème de python. il est sorti de son"
+                            " vivarium.")
+        raw_data_manager.set_content(m, content)
+        self.assertEqual(bytes(m), textwrap.dedent("""\
+            Subject: Escape report
+            Content-Type: message/rfc822
+            Content-Transfer-Encoding: 8bit
+
+            To: police@monty.org
+            From: victim@monty.org
+            Subject: Help
+            Content-Type: text/plain; charset="utf-8"
+            Content-Transfer-Encoding: 8bit
+            MIME-Version: 1.0
+
+            j'ai un problème de python. il est sorti de son vivarium.
+            """).encode('utf-8'))
+        # The choice of base64 for the body encoding is because generator
+        # doesn't bother with heuristics and uses it unconditionally for utf-8
+        # text.
+        # XXX: the first cte should be 7bit, too...that's a generator bug.
+        # XXX: the line length in the body also looks like a generator bug.
+        self.assertEqual(m.as_string(maxheaderlen=self.policy.max_line_length),
+                         textwrap.dedent("""\
+            Subject: Escape report
+            Content-Type: message/rfc822
+            Content-Transfer-Encoding: 8bit
+
+            To: police@monty.org
+            From: victim@monty.org
+            Subject: Help
+            Content-Type: text/plain; charset="utf-8"
+            MIME-Version: 1.0
+            Content-Transfer-Encoding: base64
+
+            aidhaSB1biBwcm9ibMOobWUgZGUgcHl0aG9uLiBpbCBlc3Qgc29ydGkgZGUgc29uIHZpdmFyaXVt
+            Lgo=
+            """))
+        self.assertIsInstance(m.get_content(), self.message)
+        self.assertEqual(str(m.get_content()), str(content))
+
+    def test_set_message_invalid_cte_raises(self):
+        m = self._make_message()
+        content = self._make_message()
+        for cte in 'quoted-printable base64'.split():
+            for subtype in 'rfc822 external-body'.split():
+                with self.subTest(cte=cte, subtype=subtype):
+                    with self.assertRaises(ValueError) as ar:
+                        m.set_content(content, subtype, cte=cte)
+                    exc = str(ar.exception)
+                    self.assertIn(cte, exc)
+                    self.assertIn(subtype, exc)
+        subtype = 'external-body'
+        for cte in '8bit binary'.split():
+            with self.subTest(cte=cte, subtype=subtype):
+                with self.assertRaises(ValueError) as ar:
+                    m.set_content(content, subtype, cte=cte)
+                exc = str(ar.exception)
+                self.assertIn(cte, exc)
+                self.assertIn(subtype, exc)
+
+    def test_set_image_jpg(self):
+        for content in (b"bogus content",
+                        bytearray(b"bogus content"),
+                        memoryview(b"bogus content")):
+            with self.subTest(content=content):
+                m = self._make_message()
+                raw_data_manager.set_content(m, content, 'image', 'jpeg')
+                self.assertEqual(str(m), textwrap.dedent("""\
+                    Content-Type: image/jpeg
+                    Content-Transfer-Encoding: base64
+
+                    Ym9ndXMgY29udGVudA==
+                    """))
+                self.assertEqual(m.get_payload(decode=True), content)
+                self.assertEqual(m.get_content(), content)
+
+    def test_set_audio_aif_with_quoted_printable_cte(self):
+        # Why you would use qp, I don't know, but it is technically supported.
+        # XXX: the incorrect line length is because binascii.b2a_qp doesn't
+        # support a line length parameter, but we must use it to get newline
+        # encoding.
+        # XXX: what about that lack of tailing newline?  Do we actually handle
+        # that correctly in all cases?  That is, if the *source* has an
+        # unencoded newline, do we add an extra newline to the returned payload
+        # or not?  And can that actually be disambiguated based on the RFC?
+        m = self._make_message()
+        content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100
+        m.set_content(content, 'audio', 'aif', cte='quoted-printable')
+        self.assertEqual(bytes(m), textwrap.dedent("""\
+            Content-Type: audio/aif
+            Content-Transfer-Encoding: quoted-printable
+            MIME-Version: 1.0
+
+            b=FFgus=09con=0At=0Dent=20zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz=
+            zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz""").encode('latin-1'))
+        self.assertEqual(m.get_payload(decode=True), content)
+        self.assertEqual(m.get_content(), content)
+
+    def test_set_video_mpeg_with_binary_cte(self):
+        m = self._make_message()
+        content = b'b\xFFgus\tcon\nt\rent ' + b'z'*100
+        m.set_content(content, 'video', 'mpeg', cte='binary')
+        self.assertEqual(bytes(m), textwrap.dedent("""\
+            Content-Type: video/mpeg
+            Content-Transfer-Encoding: binary
+            MIME-Version: 1.0
+
+            """).encode('ascii') +
+            # XXX: the second \n ought to be a \r, but generator gets it wrong.
+            # THIS MEANS WE DON'T ACTUALLY SUPPORT THE 'binary' CTE.
+            b'b\xFFgus\tcon\nt\nent zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz' +
+            b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz')
+        self.assertEqual(m.get_payload(decode=True), content)
+        self.assertEqual(m.get_content(), content)
+
+    def test_set_application_octet_stream_with_8bit_cte(self):
+        # In 8bit mode, univeral line end logic applies.  It is up to the
+        # application to make sure the lines are short enough; we don't check.
+        m = self._make_message()
+        content = b'b\xFFgus\tcon\nt\rent\n' + b'z'*60 + b'\n'
+        m.set_content(content, 'application', 'octet-stream', cte='8bit')
+        self.assertEqual(bytes(m), textwrap.dedent("""\
+            Content-Type: application/octet-stream
+            Content-Transfer-Encoding: 8bit
+            MIME-Version: 1.0
+
+            """).encode('ascii') +
+            b'b\xFFgus\tcon\nt\nent\n' +
+            b'zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz\n')
+        self.assertEqual(m.get_payload(decode=True), content)
+        self.assertEqual(m.get_content(), content)
+
+    def test_set_headers_from_header_objects(self):
+        m = self._make_message()
+        content = "Simple message.\n"
+        header_factory = self.policy.header_factory
+        raw_data_manager.set_content(m, content, headers=(
+            header_factory("To", "foo@example.com"),
+            header_factory("From", "foo@example.com"),
+            header_factory("Subject", "I'm talking to myself.")))
+        self.assertEqual(str(m), textwrap.dedent("""\
+            Content-Type: text/plain; charset="utf-8"
+            To: foo@example.com
+            From: foo@example.com
+            Subject: I'm talking to myself.
+            Content-Transfer-Encoding: 7bit
+
+            Simple message.
+            """))
+
+    def test_set_headers_from_strings(self):
+        m = self._make_message()
+        content = "Simple message.\n"
+        raw_data_manager.set_content(m, content, headers=(
+            "X-Foo-Header: foo",
+            "X-Bar-Header: bar",))
+        self.assertEqual(str(m), textwrap.dedent("""\
+            Content-Type: text/plain; charset="utf-8"
+            X-Foo-Header: foo
+            X-Bar-Header: bar
+            Content-Transfer-Encoding: 7bit
+
+            Simple message.
+            """))
+
+    def test_set_headers_with_invalid_duplicate_string_header_raises(self):
+        m = self._make_message()
+        content = "Simple message.\n"
+        with self.assertRaisesRegex(ValueError, 'Content-Type'):
+            raw_data_manager.set_content(m, content, headers=(
+                "Content-Type: foo/bar",)
+                )
+
+    def test_set_headers_with_invalid_duplicate_header_header_raises(self):
+        m = self._make_message()
+        content = "Simple message.\n"
+        header_factory = self.policy.header_factory
+        with self.assertRaisesRegex(ValueError, 'Content-Type'):
+            raw_data_manager.set_content(m, content, headers=(
+                header_factory("Content-Type", " foo/bar"),)
+                )
+
+    def test_set_headers_with_defective_string_header_raises(self):
+        m = self._make_message()
+        content = "Simple message.\n"
+        with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'):
+            raw_data_manager.set_content(m, content, headers=(
+                'To: a@fairly@@invalid@address',)
+                )
+            print(m['To'].defects)
+
+    def test_set_headers_with_defective_header_header_raises(self):
+        m = self._make_message()
+        content = "Simple message.\n"
+        header_factory = self.policy.header_factory
+        with self.assertRaisesRegex(ValueError, 'a@fairly@@invalid@address'):
+            raw_data_manager.set_content(m, content, headers=(
+                header_factory('To', 'a@fairly@@invalid@address'),)
+                )
+            print(m['To'].defects)
+
+    def test_set_disposition_inline(self):
+        m = self._make_message()
+        m.set_content('foo', disposition='inline')
+        self.assertEqual(m['Content-Disposition'], 'inline')
+
+    def test_set_disposition_attachment(self):
+        m = self._make_message()
+        m.set_content('foo', disposition='attachment')
+        self.assertEqual(m['Content-Disposition'], 'attachment')
+
+    def test_set_disposition_foo(self):
+        m = self._make_message()
+        m.set_content('foo', disposition='foo')
+        self.assertEqual(m['Content-Disposition'], 'foo')
+
+    # XXX: we should have a 'strict' policy mode (beyond raise_on_defect) that
+    # would cause 'foo' above to raise.
+
+    def test_set_filename(self):
+        m = self._make_message()
+        m.set_content('foo', filename='bar.txt')
+        self.assertEqual(m['Content-Disposition'],
+                         'attachment; filename="bar.txt"')
+
+    def test_set_filename_and_disposition_inline(self):
+        m = self._make_message()
+        m.set_content('foo', disposition='inline', filename='bar.txt')
+        self.assertEqual(m['Content-Disposition'], 'inline; filename="bar.txt"')
+
+    def test_set_non_ascii_filename(self):
+        m = self._make_message()
+        m.set_content('foo', filename='ábárî.txt')
+        self.assertEqual(bytes(m), textwrap.dedent("""\
+            Content-Type: text/plain; charset="utf-8"
+            Content-Transfer-Encoding: 7bit
+            Content-Disposition: attachment;
+             filename*=utf-8''%C3%A1b%C3%A1r%C3%AE.txt
+            MIME-Version: 1.0
+
+            foo
+            """).encode('ascii'))
+
+    content_object_params = {
+        'text_plain': ('content', ()),
+        'text_html': ('content', ('html',)),
+        'application_octet_stream': (b'content',
+                                     ('application', 'octet_stream')),
+        'image_jpeg': (b'content', ('image', 'jpeg')),
+        'message_rfc822': (message(), ()),
+        'message_external_body': (message(), ('external-body',)),
+        }
+
+    def content_object_as_header_receiver(self, obj, mimetype):
+        m = self._make_message()
+        m.set_content(obj, *mimetype, headers=(
+            'To: foo@example.com',
+            'From: bar@simple.net'))
+        self.assertEqual(m['to'], 'foo@example.com')
+        self.assertEqual(m['from'], 'bar@simple.net')
+
+    def content_object_as_disposition_inline_receiver(self, obj, mimetype):
+        m = self._make_message()
+        m.set_content(obj, *mimetype, disposition='inline')
+        self.assertEqual(m['Content-Disposition'], 'inline')
+
+    def content_object_as_non_ascii_filename_receiver(self, obj, mimetype):
+        m = self._make_message()
+        m.set_content(obj, *mimetype, disposition='inline', filename='bár.txt')
+        self.assertEqual(m['Content-Disposition'], 'inline; filename="bár.txt"')
+        self.assertEqual(m.get_filename(), "bár.txt")
+        self.assertEqual(m['Content-Disposition'].params['filename'], "bár.txt")
+
+    def content_object_as_cid_receiver(self, obj, mimetype):
+        m = self._make_message()
+        m.set_content(obj, *mimetype, cid='some_random_stuff')
+        self.assertEqual(m['Content-ID'], 'some_random_stuff')
+
+    def content_object_as_params_receiver(self, obj, mimetype):
+        m = self._make_message()
+        params = {'foo': 'bár', 'abc': 'xyz'}
+        m.set_content(obj, *mimetype, params=params)
+        if isinstance(obj, str):
+            params['charset'] = 'utf-8'
+        self.assertEqual(m['Content-Type'].params, params)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/Lib/test/test_email/test_headerregistry.py b/Lib/test/test_email/test_headerregistry.py
index f829f83..f2df372 100644
--- a/Lib/test/test_email/test_headerregistry.py
+++ b/Lib/test/test_email/test_headerregistry.py
@@ -661,7 +661,7 @@
             'text/plain; name="ascii_is_the_default"'),
 
         'rfc2231_bad_character_in_charset_parameter_value': (
-            "text/plain; charset*=ascii''utf-8%E2%80%9D",
+            "text/plain; charset*=ascii''utf-8%F1%F2%F3",
             'text/plain',
             'text',
             'plain',
@@ -669,6 +669,18 @@
             [errors.UndecodableBytesDefect],
             'text/plain; charset="utf-8\uFFFD\uFFFD\uFFFD"'),
 
+        'rfc2231_utf_8_in_supposedly_ascii_charset_parameter_value': (
+            "text/plain; charset*=ascii''utf-8%E2%80%9D",
+            'text/plain',
+            'text',
+            'plain',
+            {'charset': 'utf-8”'},
+            [errors.UndecodableBytesDefect],
+            'text/plain; charset="utf-8”"',
+            ),
+            # XXX: if the above were *re*folded, it would get tagged as utf-8
+            # instead of ascii in the param, since it now contains non-ASCII.
+
         'rfc2231_encoded_then_unencoded_segments': (
             ('application/x-foo;'
                 '\tname*0*="us-ascii\'en-us\'My";'
diff --git a/Lib/test/test_email/test_message.py b/Lib/test/test_email/test_message.py
index 8cc3f80..e0ebb8a 100644
--- a/Lib/test/test_email/test_message.py
+++ b/Lib/test/test_email/test_message.py
@@ -1,6 +1,13 @@
 import unittest
+import textwrap
 from email import policy
-from test.test_email import TestEmailBase
+from email.message import EmailMessage, MIMEPart
+from test.test_email import TestEmailBase, parameterize
+
+
+# Helper.
+def first(iterable):
+    return next(filter(lambda x: x is not None, iterable), None)
 
 
 class Test(TestEmailBase):
@@ -14,5 +21,738 @@
             m['To'] = 'xyz@abc'
 
 
+@parameterize
+class TestEmailMessageBase:
+
+    policy = policy.default
+
+    # The first argument is a triple (related, html, plain) of indices into the
+    # list returned by 'walk' called on a Message constructed from the third.
+    # The indices indicate which part should match the corresponding part-type
+    # when passed to get_body (ie: the "first" part of that type in the
+    # message).  The second argument is a list of indices into the 'walk' list
+    # of the attachments that should be returned by a call to
+    # 'iter_attachments'.  The third argument is a list of indices into 'walk'
+    # that should be returned by a call to 'iter_parts'.  Note that the first
+    # item returned by 'walk' is the Message itself.
+
+    message_params = {
+
+        'empty_message': (
+            (None, None, 0),
+            (),
+            (),
+            ""),
+
+        'non_mime_plain': (
+            (None, None, 0),
+            (),
+            (),
+            textwrap.dedent("""\
+                To: foo@example.com
+
+                simple text body
+                """)),
+
+        'mime_non_text': (
+            (None, None, None),
+            (),
+            (),
+            textwrap.dedent("""\
+                To: foo@example.com
+                MIME-Version: 1.0
+                Content-Type: image/jpg
+
+                bogus body.
+                """)),
+
+        'plain_html_alternative': (
+            (None, 2, 1),
+            (),
+            (1, 2),
+            textwrap.dedent("""\
+                To: foo@example.com
+                MIME-Version: 1.0
+                Content-Type: multipart/alternative; boundary="==="
+
+                preamble
+
+                --===
+                Content-Type: text/plain
+
+                simple body
+
+                --===
+                Content-Type: text/html
+
+                <p>simple body</p>
+                --===--
+                """)),
+
+        'plain_html_mixed': (
+            (None, 2, 1),
+            (),
+            (1, 2),
+            textwrap.dedent("""\
+                To: foo@example.com
+                MIME-Version: 1.0
+                Content-Type: multipart/mixed; boundary="==="
+
+                preamble
+
+                --===
+                Content-Type: text/plain
+
+                simple body
+
+                --===
+                Content-Type: text/html
+
+                <p>simple body</p>
+
+                --===--
+                """)),
+
+        'plain_html_attachment_mixed': (
+            (None, None, 1),
+            (2,),
+            (1, 2),
+            textwrap.dedent("""\
+                To: foo@example.com
+                MIME-Version: 1.0
+                Content-Type: multipart/mixed; boundary="==="
+
+                --===
+                Content-Type: text/plain
+
+                simple body
+
+                --===
+                Content-Type: text/html
+                Content-Disposition: attachment
+
+                <p>simple body</p>
+
+                --===--
+                """)),
+
+        'html_text_attachment_mixed': (
+            (None, 2, None),
+            (1,),
+            (1, 2),
+            textwrap.dedent("""\
+                To: foo@example.com
+                MIME-Version: 1.0
+                Content-Type: multipart/mixed; boundary="==="
+
+                --===
+                Content-Type: text/plain
+                Content-Disposition: AtTaChment
+
+                simple body
+
+                --===
+                Content-Type: text/html
+
+                <p>simple body</p>
+
+                --===--
+                """)),
+
+        'html_text_attachment_inline_mixed': (
+            (None, 2, 1),
+            (),
+            (1, 2),
+            textwrap.dedent("""\
+                To: foo@example.com
+                MIME-Version: 1.0
+                Content-Type: multipart/mixed; boundary="==="
+
+                --===
+                Content-Type: text/plain
+                Content-Disposition: InLine
+
+                simple body
+
+                --===
+                Content-Type: text/html
+                Content-Disposition: inline
+
+                <p>simple body</p>
+
+                --===--
+                """)),
+
+        # RFC 2387
+        'related': (
+            (0, 1, None),
+            (2,),
+            (1, 2),
+            textwrap.dedent("""\
+                To: foo@example.com
+                MIME-Version: 1.0
+                Content-Type: multipart/related; boundary="==="; type=text/html
+
+                --===
+                Content-Type: text/html
+
+                <p>simple body</p>
+
+                --===
+                Content-Type: image/jpg
+                Content-ID: <image1>
+
+                bogus data
+
+                --===--
+                """)),
+
+        # This message structure will probably never be seen in the wild, but
+        # it proves we distinguish between text parts based on 'start'.  The
+        # content would not, of course, actually work :)
+        'related_with_start': (
+            (0, 2, None),
+            (1,),
+            (1, 2),
+            textwrap.dedent("""\
+                To: foo@example.com
+                MIME-Version: 1.0
+                Content-Type: multipart/related; boundary="==="; type=text/html;
+                 start="<body>"
+
+                --===
+                Content-Type: text/html
+                Content-ID: <include>
+
+                useless text
+
+                --===
+                Content-Type: text/html
+                Content-ID: <body>
+
+                <p>simple body</p>
+                <!--#include file="<include>"-->
+
+                --===--
+                """)),
+
+
+        'mixed_alternative_plain_related': (
+            (3, 4, 2),
+            (6, 7),
+            (1, 6, 7),
+            textwrap.dedent("""\
+                To: foo@example.com
+                MIME-Version: 1.0
+                Content-Type: multipart/mixed; boundary="==="
+
+                --===
+                Content-Type: multipart/alternative; boundary="+++"
+
+                --+++
+                Content-Type: text/plain
+
+                simple body
+
+                --+++
+                Content-Type: multipart/related; boundary="___"
+
+                --___
+                Content-Type: text/html
+
+                <p>simple body</p>
+
+                --___
+                Content-Type: image/jpg
+                Content-ID: <image1@cid>
+
+                bogus jpg body
+
+                --___--
+
+                --+++--
+
+                --===
+                Content-Type: image/jpg
+                Content-Disposition: attachment
+
+                bogus jpg body
+
+                --===
+                Content-Type: image/jpg
+                Content-Disposition: AttacHmenT
+
+                another bogus jpg body
+
+                --===--
+                """)),
+
+        # This structure suggested by Stephen J. Turnbull...may not exist/be
+        # supported in the wild, but we want to support it.
+        'mixed_related_alternative_plain_html': (
+            (1, 4, 3),
+            (6, 7),
+            (1, 6, 7),
+            textwrap.dedent("""\
+                To: foo@example.com
+                MIME-Version: 1.0
+                Content-Type: multipart/mixed; boundary="==="
+
+                --===
+                Content-Type: multipart/related; boundary="+++"
+
+                --+++
+                Content-Type: multipart/alternative; boundary="___"
+
+                --___
+                Content-Type: text/plain
+
+                simple body
+
+                --___
+                Content-Type: text/html
+
+                <p>simple body</p>
+
+                --___--
+
+                --+++
+                Content-Type: image/jpg
+                Content-ID: <image1@cid>
+
+                bogus jpg body
+
+                --+++--
+
+                --===
+                Content-Type: image/jpg
+                Content-Disposition: attachment
+
+                bogus jpg body
+
+                --===
+                Content-Type: image/jpg
+                Content-Disposition: attachment
+
+                another bogus jpg body
+
+                --===--
+                """)),
+
+        # Same thing, but proving we only look at the root part, which is the
+        # first one if there isn't any start parameter.  That is, this is a
+        # broken related.
+        'mixed_related_alternative_plain_html_wrong_order': (
+            (1, None, None),
+            (6, 7),
+            (1, 6, 7),
+            textwrap.dedent("""\
+                To: foo@example.com
+                MIME-Version: 1.0
+                Content-Type: multipart/mixed; boundary="==="
+
+                --===
+                Content-Type: multipart/related; boundary="+++"
+
+                --+++
+                Content-Type: image/jpg
+                Content-ID: <image1@cid>
+
+                bogus jpg body
+
+                --+++
+                Content-Type: multipart/alternative; boundary="___"
+
+                --___
+                Content-Type: text/plain
+
+                simple body
+
+                --___
+                Content-Type: text/html
+
+                <p>simple body</p>
+
+                --___--
+
+                --+++--
+
+                --===
+                Content-Type: image/jpg
+                Content-Disposition: attachment
+
+                bogus jpg body
+
+                --===
+                Content-Type: image/jpg
+                Content-Disposition: attachment
+
+                another bogus jpg body
+
+                --===--
+                """)),
+
+        'message_rfc822': (
+            (None, None, None),
+            (),
+            (),
+            textwrap.dedent("""\
+                To: foo@example.com
+                MIME-Version: 1.0
+                Content-Type: message/rfc822
+
+                To: bar@example.com
+                From: robot@examp.com
+
+                this is a message body.
+                """)),
+
+        'mixed_text_message_rfc822': (
+            (None, None, 1),
+            (2,),
+            (1, 2),
+            textwrap.dedent("""\
+                To: foo@example.com
+                MIME-Version: 1.0
+                Content-Type: multipart/mixed; boundary="==="
+
+                --===
+                Content-Type: text/plain
+
+                Your message has bounced, ser.
+
+                --===
+                Content-Type: message/rfc822
+
+                To: bar@example.com
+                From: robot@examp.com
+
+                this is a message body.
+
+                --===--
+                """)),
+
+         }
+
+    def message_as_get_body(self, body_parts, attachments, parts, msg):
+        m = self._str_msg(msg)
+        allparts = list(m.walk())
+        expected = [None if n is None else allparts[n] for n in body_parts]
+        related = 0; html = 1; plain = 2
+        self.assertEqual(m.get_body(), first(expected))
+        self.assertEqual(m.get_body(preferencelist=(
+                                        'related', 'html', 'plain')),
+                         first(expected))
+        self.assertEqual(m.get_body(preferencelist=('related', 'html')),
+                         first(expected[related:html+1]))
+        self.assertEqual(m.get_body(preferencelist=('related', 'plain')),
+                         first([expected[related], expected[plain]]))
+        self.assertEqual(m.get_body(preferencelist=('html', 'plain')),
+                         first(expected[html:plain+1]))
+        self.assertEqual(m.get_body(preferencelist=['related']),
+                         expected[related])
+        self.assertEqual(m.get_body(preferencelist=['html']), expected[html])
+        self.assertEqual(m.get_body(preferencelist=['plain']), expected[plain])
+        self.assertEqual(m.get_body(preferencelist=('plain', 'html')),
+                         first(expected[plain:html-1:-1]))
+        self.assertEqual(m.get_body(preferencelist=('plain', 'related')),
+                         first([expected[plain], expected[related]]))
+        self.assertEqual(m.get_body(preferencelist=('html', 'related')),
+                         first(expected[html::-1]))
+        self.assertEqual(m.get_body(preferencelist=('plain', 'html', 'related')),
+                         first(expected[::-1]))
+        self.assertEqual(m.get_body(preferencelist=('html', 'plain', 'related')),
+                         first([expected[html],
+                                expected[plain],
+                                expected[related]]))
+
+    def message_as_iter_attachment(self, body_parts, attachments, parts, msg):
+        m = self._str_msg(msg)
+        allparts = list(m.walk())
+        attachments = [allparts[n] for n in attachments]
+        self.assertEqual(list(m.iter_attachments()), attachments)
+
+    def message_as_iter_parts(self, body_parts, attachments, parts, msg):
+        m = self._str_msg(msg)
+        allparts = list(m.walk())
+        parts = [allparts[n] for n in parts]
+        self.assertEqual(list(m.iter_parts()), parts)
+
+    class _TestContentManager:
+        def get_content(self, msg, *args, **kw):
+            return msg, args, kw
+        def set_content(self, msg, *args, **kw):
+            self.msg = msg
+            self.args = args
+            self.kw = kw
+
+    def test_get_content_with_cm(self):
+        m = self._str_msg('')
+        cm = self._TestContentManager()
+        self.assertEqual(m.get_content(content_manager=cm), (m, (), {}))
+        msg, args, kw = m.get_content('foo', content_manager=cm, bar=1, k=2)
+        self.assertEqual(msg, m)
+        self.assertEqual(args, ('foo',))
+        self.assertEqual(kw, dict(bar=1, k=2))
+
+    def test_get_content_default_cm_comes_from_policy(self):
+        p = policy.default.clone(content_manager=self._TestContentManager())
+        m = self._str_msg('', policy=p)
+        self.assertEqual(m.get_content(), (m, (), {}))
+        msg, args, kw = m.get_content('foo', bar=1, k=2)
+        self.assertEqual(msg, m)
+        self.assertEqual(args, ('foo',))
+        self.assertEqual(kw, dict(bar=1, k=2))
+
+    def test_set_content_with_cm(self):
+        m = self._str_msg('')
+        cm = self._TestContentManager()
+        m.set_content(content_manager=cm)
+        self.assertEqual(cm.msg, m)
+        self.assertEqual(cm.args, ())
+        self.assertEqual(cm.kw, {})
+        m.set_content('foo', content_manager=cm, bar=1, k=2)
+        self.assertEqual(cm.msg, m)
+        self.assertEqual(cm.args, ('foo',))
+        self.assertEqual(cm.kw, dict(bar=1, k=2))
+
+    def test_set_content_default_cm_comes_from_policy(self):
+        cm = self._TestContentManager()
+        p = policy.default.clone(content_manager=cm)
+        m = self._str_msg('', policy=p)
+        m.set_content()
+        self.assertEqual(cm.msg, m)
+        self.assertEqual(cm.args, ())
+        self.assertEqual(cm.kw, {})
+        m.set_content('foo', bar=1, k=2)
+        self.assertEqual(cm.msg, m)
+        self.assertEqual(cm.args, ('foo',))
+        self.assertEqual(cm.kw, dict(bar=1, k=2))
+
+    # outcome is whether xxx_method should raise ValueError error when called
+    # on multipart/subtype.  Blank outcome means it depends on xxx (add
+    # succeeds, make raises).  Note: 'none' means there are content-type
+    # headers but payload is None...this happening in practice would be very
+    # unusual, so treating it as if there were content seems reasonable.
+    #    method          subtype           outcome
+    subtype_params = (
+        ('related',      'no_content',     'succeeds'),
+        ('related',      'none',           'succeeds'),
+        ('related',      'plain',          'succeeds'),
+        ('related',      'related',        ''),
+        ('related',      'alternative',    'raises'),
+        ('related',      'mixed',          'raises'),
+        ('alternative',  'no_content',     'succeeds'),
+        ('alternative',  'none',           'succeeds'),
+        ('alternative',  'plain',          'succeeds'),
+        ('alternative',  'related',        'succeeds'),
+        ('alternative',  'alternative',    ''),
+        ('alternative',  'mixed',          'raises'),
+        ('mixed',        'no_content',     'succeeds'),
+        ('mixed',        'none',           'succeeds'),
+        ('mixed',        'plain',          'succeeds'),
+        ('mixed',        'related',        'succeeds'),
+        ('mixed',        'alternative',    'succeeds'),
+        ('mixed',        'mixed',          ''),
+        )
+
+    def _make_subtype_test_message(self, subtype):
+        m = self.message()
+        payload = None
+        msg_headers =  [
+            ('To', 'foo@bar.com'),
+            ('From', 'bar@foo.com'),
+            ]
+        if subtype != 'no_content':
+            ('content-shadow', 'Logrus'),
+        msg_headers.append(('X-Random-Header', 'Corwin'))
+        if subtype == 'text':
+            payload = ''
+            msg_headers.append(('Content-Type', 'text/plain'))
+            m.set_payload('')
+        elif subtype != 'no_content':
+            payload = []
+            msg_headers.append(('Content-Type', 'multipart/' + subtype))
+        msg_headers.append(('X-Trump', 'Random'))
+        m.set_payload(payload)
+        for name, value in msg_headers:
+            m[name] = value
+        return m, msg_headers, payload
+
+    def _check_disallowed_subtype_raises(self, m, method_name, subtype, method):
+        with self.assertRaises(ValueError) as ar:
+            getattr(m, method)()
+        exc_text = str(ar.exception)
+        self.assertIn(subtype, exc_text)
+        self.assertIn(method_name, exc_text)
+
+    def _check_make_multipart(self, m, msg_headers, payload):
+        count = 0
+        for name, value in msg_headers:
+            if not name.lower().startswith('content-'):
+                self.assertEqual(m[name], value)
+                count += 1
+        self.assertEqual(len(m), count+1) # +1 for new Content-Type
+        part = next(m.iter_parts())
+        count = 0
+        for name, value in msg_headers:
+            if name.lower().startswith('content-'):
+                self.assertEqual(part[name], value)
+                count += 1
+        self.assertEqual(len(part), count)
+        self.assertEqual(part.get_payload(), payload)
+
+    def subtype_as_make(self, method, subtype, outcome):
+        m, msg_headers, payload = self._make_subtype_test_message(subtype)
+        make_method = 'make_' + method
+        if outcome in ('', 'raises'):
+            self._check_disallowed_subtype_raises(m, method, subtype, make_method)
+            return
+        getattr(m, make_method)()
+        self.assertEqual(m.get_content_maintype(), 'multipart')
+        self.assertEqual(m.get_content_subtype(), method)
+        if subtype == 'no_content':
+            self.assertEqual(len(m.get_payload()), 0)
+            self.assertEqual(m.items(),
+                             msg_headers + [('Content-Type',
+                                             'multipart/'+method)])
+        else:
+            self.assertEqual(len(m.get_payload()), 1)
+            self._check_make_multipart(m, msg_headers, payload)
+
+    def subtype_as_make_with_boundary(self, method, subtype, outcome):
+        # Doing all variation is a bit of overkill...
+        m = self.message()
+        if outcome in ('', 'raises'):
+            m['Content-Type'] = 'multipart/' + subtype
+            with self.assertRaises(ValueError) as cm:
+                getattr(m, 'make_' + method)()
+            return
+        if subtype == 'plain':
+            m['Content-Type'] = 'text/plain'
+        elif subtype != 'no_content':
+            m['Content-Type'] = 'multipart/' + subtype
+        getattr(m, 'make_' + method)(boundary="abc")
+        self.assertTrue(m.is_multipart())
+        self.assertEqual(m.get_boundary(), 'abc')
+
+    def test_policy_on_part_made_by_make_comes_from_message(self):
+        for method in ('make_related', 'make_alternative', 'make_mixed'):
+            m = self.message(policy=self.policy.clone(content_manager='foo'))
+            m['Content-Type'] = 'text/plain'
+            getattr(m, method)()
+            self.assertEqual(m.get_payload(0).policy.content_manager, 'foo')
+
+    class _TestSetContentManager:
+        def set_content(self, msg, content, *args, **kw):
+            msg['Content-Type'] = 'text/plain'
+            msg.set_payload(content)
+
+    def subtype_as_add(self, method, subtype, outcome):
+        m, msg_headers, payload = self._make_subtype_test_message(subtype)
+        cm = self._TestSetContentManager()
+        add_method = 'add_attachment' if method=='mixed' else 'add_' + method
+        if outcome == 'raises':
+            self._check_disallowed_subtype_raises(m, method, subtype, add_method)
+            return
+        getattr(m, add_method)('test', content_manager=cm)
+        self.assertEqual(m.get_content_maintype(), 'multipart')
+        self.assertEqual(m.get_content_subtype(), method)
+        if method == subtype or subtype == 'no_content':
+            self.assertEqual(len(m.get_payload()), 1)
+            for name, value in msg_headers:
+                self.assertEqual(m[name], value)
+            part = m.get_payload()[0]
+        else:
+            self.assertEqual(len(m.get_payload()), 2)
+            self._check_make_multipart(m, msg_headers, payload)
+            part = m.get_payload()[1]
+        self.assertEqual(part.get_content_type(), 'text/plain')
+        self.assertEqual(part.get_payload(), 'test')
+        if method=='mixed':
+            self.assertEqual(part['Content-Disposition'], 'attachment')
+        elif method=='related':
+            self.assertEqual(part['Content-Disposition'], 'inline')
+        else:
+            # Otherwise we don't guess.
+            self.assertIsNone(part['Content-Disposition'])
+
+    class _TestSetRaisingContentManager:
+        def set_content(self, msg, content, *args, **kw):
+            raise Exception('test')
+
+    def test_default_content_manager_for_add_comes_from_policy(self):
+        cm = self._TestSetRaisingContentManager()
+        m = self.message(policy=self.policy.clone(content_manager=cm))
+        for method in ('add_related', 'add_alternative', 'add_attachment'):
+            with self.assertRaises(Exception) as ar:
+                getattr(m, method)('')
+            self.assertEqual(str(ar.exception), 'test')
+
+    def message_as_clear(self, body_parts, attachments, parts, msg):
+        m = self._str_msg(msg)
+        m.clear()
+        self.assertEqual(len(m), 0)
+        self.assertEqual(list(m.items()), [])
+        self.assertIsNone(m.get_payload())
+        self.assertEqual(list(m.iter_parts()), [])
+
+    def message_as_clear_content(self, body_parts, attachments, parts, msg):
+        m = self._str_msg(msg)
+        expected_headers = [h for h in m.keys()
+                            if not h.lower().startswith('content-')]
+        m.clear_content()
+        self.assertEqual(list(m.keys()), expected_headers)
+        self.assertIsNone(m.get_payload())
+        self.assertEqual(list(m.iter_parts()), [])
+
+    def test_is_attachment(self):
+        m = self._make_message()
+        self.assertFalse(m.is_attachment)
+        m['Content-Disposition'] = 'inline'
+        self.assertFalse(m.is_attachment)
+        m.replace_header('Content-Disposition', 'attachment')
+        self.assertTrue(m.is_attachment)
+        m.replace_header('Content-Disposition', 'AtTachMent')
+        self.assertTrue(m.is_attachment)
+
+
+
+class TestEmailMessage(TestEmailMessageBase, TestEmailBase):
+    message = EmailMessage
+
+    def test_set_content_adds_MIME_Version(self):
+        m = self._str_msg('')
+        cm = self._TestContentManager()
+        self.assertNotIn('MIME-Version', m)
+        m.set_content(content_manager=cm)
+        self.assertEqual(m['MIME-Version'], '1.0')
+
+    class _MIME_Version_adding_CM:
+        def set_content(self, msg, *args, **kw):
+            msg['MIME-Version'] = '1.0'
+
+    def test_set_content_does_not_duplicate_MIME_Version(self):
+        m = self._str_msg('')
+        cm = self._MIME_Version_adding_CM()
+        self.assertNotIn('MIME-Version', m)
+        m.set_content(content_manager=cm)
+        self.assertEqual(m['MIME-Version'], '1.0')
+
+
+class TestMIMEPart(TestEmailMessageBase, TestEmailBase):
+    # Doing the full test run here may seem a bit redundant, since the two
+    # classes are almost identical.  But what if they drift apart?  So we do
+    # the full tests so that any future drift doesn't introduce bugs.
+    message = MIMEPart
+
+    def test_set_content_does_not_add_MIME_Version(self):
+        m = self._str_msg('')
+        cm = self._TestContentManager()
+        self.assertNotIn('MIME-Version', m)
+        m.set_content(content_manager=cm)
+        self.assertNotIn('MIME-Version', m)
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/Lib/test/test_email/test_policy.py b/Lib/test/test_email/test_policy.py
index 983bd49..06ad5f2 100644
--- a/Lib/test/test_email/test_policy.py
+++ b/Lib/test/test_email/test_policy.py
@@ -30,6 +30,7 @@
         'raise_on_defect':          False,
         'header_factory':           email.policy.EmailPolicy.header_factory,
         'refold_source':            'long',
+        'content_manager':          email.policy.EmailPolicy.content_manager,
         })
 
     # For each policy under test, we give here what we expect the defaults to
diff --git a/Misc/NEWS b/Misc/NEWS
index 8bda593..ddca1f5 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -42,6 +42,9 @@
 Library
 -------
 
+- Issue #18891: Completed the new email package (provisional) API additions
+  by adding new classes EmailMessage, MIMEPart, and ContentManager.
+
 - Issue #18468: The re.split, re.findall, and re.sub functions and the group()
   and groups() methods of match object now always return a string or a bytes
   object.
