Factor the warning code into a shared helper. Remove some of the unnecessary repetition from the tests.
diff --git a/OpenSSL/SSL.py b/OpenSSL/SSL.py
index 01d654f..dc4ed39 100644
--- a/OpenSSL/SSL.py
+++ b/OpenSSL/SSL.py
@@ -1,4 +1,3 @@
-from warnings import warn
from sys import platform
from functools import wraps, partial
from itertools import count
@@ -7,13 +6,13 @@
from six import text_type as _text_type
from six import integer_types as integer_types
-from six import PY2 as _PY2
from OpenSSL._util import (
ffi as _ffi,
lib as _lib,
exception_from_error_queue as _exception_from_error_queue,
- native as _native)
+ native as _native,
+ warn_text as _warn_text)
from OpenSSL.crypto import (
FILETYPE_PEM, _PassphraseHelper, PKey, X509Name, X509, X509Store)
@@ -312,14 +311,7 @@
:param capath: In which directory we can find the certificates
:return: None
"""
-
- # Backward compatibility
- if isinstance(cafile, _text_type):
- if _PY2:
- warn("unicode in cafile is no longer accepted, use bytes", DeprecationWarning)
- else:
- warn("str in cafile is no longer accepted, use bytes", DeprecationWarning)
- cafile = cafile.encode('utf-8')
+ cafile = _warn_text("cafile", cafile)
if cafile is None:
cafile = _ffi.NULL
@@ -981,14 +973,8 @@
API, the value is ignored
:return: The number of bytes written
"""
-
# Backward compatibility
- if isinstance(buf, _text_type):
- if _PY2:
- warn("unicode in buf is no longer accepted, use bytes", DeprecationWarning)
- else:
- warn("str in buf is no longer accepted, use bytes", DeprecationWarning)
- buf = buf.encode('utf-8')
+ buf = _warn_text("buf", buf)
if isinstance(buf, _memoryview):
buf = buf.tobytes()
@@ -1014,14 +1000,7 @@
API, the value is ignored
:return: The number of bytes written
"""
-
- # Backward compatibility
- if isinstance(buf, _text_type):
- if _PY2:
- warn("unicode in buf is no longer accepted, use bytes", DeprecationWarning)
- else:
- warn("str in buf is no longer accepted, use bytes", DeprecationWarning)
- buf = buf.encode('utf-8')
+ buf = _warn_text("buf", buf)
if isinstance(buf, _memoryview):
buf = buf.tobytes()
@@ -1108,14 +1087,7 @@
:param buf: The string to put into the memory BIO.
:return: The number of bytes written
"""
-
- # Backward compatibility
- if isinstance(buf, _text_type):
- if _PY2:
- warn("unicode in buf is no longer accepted, use bytes", DeprecationWarning)
- else:
- warn("str in buf is no longer accepted, use bytes", DeprecationWarning)
- buf = buf.encode("ascii")
+ buf = _warn_text("buf", buf)
if self._into_ssl is None:
raise TypeError("Connection sock was not None")
diff --git a/OpenSSL/_util.py b/OpenSSL/_util.py
index baeecc6..da8270b 100644
--- a/OpenSSL/_util.py
+++ b/OpenSSL/_util.py
@@ -1,3 +1,5 @@
+from warnings import warn
+
from six import PY3, binary_type, text_type
from cryptography.hazmat.bindings.openssl.binding import Binding
@@ -51,3 +53,17 @@
else:
def byte_string(s):
return s
+
+_TEXT_WARNING = u"{} for {{}} is no longer accepted, use bytes".format(
+ text_type.__name__
+)
+
+def warn_text(label, obj):
+ if isinstance(obj, text_type):
+ warn(
+ _TEXT_WARNING.format(label),
+ category=DeprecationWarning,
+ stacklevel=3
+ )
+ return obj.encode('utf-8')
+ return obj
diff --git a/OpenSSL/crypto.py b/OpenSSL/crypto.py
index cd49c21..52320f3 100644
--- a/OpenSSL/crypto.py
+++ b/OpenSSL/crypto.py
@@ -1,4 +1,3 @@
-from warnings import warn
from time import time
from base64 import b16encode
from functools import partial
@@ -14,7 +13,8 @@
lib as _lib,
exception_from_error_queue as _exception_from_error_queue,
byte_string as _byte_string,
- native as _native)
+ native as _native,
+ warn_text as _warn_text)
FILETYPE_PEM = _lib.SSL_FILETYPE_PEM
FILETYPE_ASN1 = _lib.SSL_FILETYPE_ASN1
@@ -1953,14 +1953,7 @@
:return: The string containing the PKCS12
"""
-
- # Backward compatibility
- if isinstance(passphrase, _text_type):
- if _PY3:
- warn("str in passphrase is no longer accepted, use bytes", DeprecationWarning)
- else:
- warn("unicode in passphrase is no longer accepted, use bytes", DeprecationWarning)
- passphrase = passphrase.encode('utf-8')
+ passphrase = _warn_text("passphrase", passphrase)
if self._cacerts is None:
cacerts = _ffi.NULL
@@ -2259,14 +2252,7 @@
:param digest: message digest to use
:return: signature
"""
-
- # Backward compatibility
- if isinstance(data, _text_type):
- if _PY3:
- warn("str in data is no longer accepted, use bytes", DeprecationWarning)
- else:
- warn("unicode in data is no longer accepted, use bytes", DeprecationWarning)
- data = data.encode('utf-8')
+ data = _warn_text("data", data)
digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest))
if digest_obj == _ffi.NULL:
@@ -2302,14 +2288,7 @@
:param digest: message digest to use
:return: None if the signature is correct, raise exception otherwise
"""
-
- # Backward compatibility
- if isinstance(data, _text_type):
- if _PY3:
- warn("str in data is no longer accepted, use bytes", DeprecationWarning)
- else:
- warn("unicode in data is no longer accepted, use bytes", DeprecationWarning)
- data = data.encode('utf-8')
+ data = _warn_text("data", data)
digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest))
if digest_obj == _ffi.NULL:
@@ -2402,14 +2381,7 @@
:param passphrase: (Optional) The password to decrypt the PKCS12 lump
:returns: The PKCS12 object
"""
-
- # Backward compatibility
- if isinstance(passphrase, _text_type):
- if _PY3:
- warn("str in passphrase is no longer accepted, use bytes", DeprecationWarning)
- else:
- warn("unicode in passphrase is no longer accepted, use bytes", DeprecationWarning)
- passphrase = passphrase.encode('utf-8')
+ passphrase = _warn_text("passphrase", passphrase)
if isinstance(buffer, _text_type):
buffer = buffer.encode("ascii")
diff --git a/OpenSSL/test/test_crypto.py b/OpenSSL/test/test_crypto.py
index ba4e0d5..0773342 100644
--- a/OpenSSL/test/test_crypto.py
+++ b/OpenSSL/test/test_crypto.py
@@ -31,7 +31,9 @@
from OpenSSL.crypto import NetscapeSPKI, NetscapeSPKIType
from OpenSSL.crypto import (
sign, verify, get_elliptic_curve, get_elliptic_curves)
-from OpenSSL.test.util import EqualityTestsMixin, TestCase
+from OpenSSL.test.util import (
+ EqualityTestsMixin, TestCase, WARNING_TYPE_EXPECTED
+)
from OpenSSL._util import native, lib
def normalize_certificate_pem(pem):
@@ -2006,14 +2008,16 @@
b"-passout", b"pass:" + passwd)
with catch_warnings(record=True) as w:
simplefilter("always")
- if not PY3:
- p12 = load_pkcs12(p12_str, passphrase=unicode("whatever"))
- self.assertTrue("unicode in passphrase is no longer accepted, "
- "use bytes" in str(w[-1].message))
- else:
- p12 = load_pkcs12(p12_str, passphrase=b"whatever".decode())
- self.assertTrue("str in passphrase is no longer accepted, "
- "use bytes" in str(w[-1].message))
+ p12 = load_pkcs12(p12_str, passphrase=u"whatever")
+
+ self.assertEqual(
+ u"{} for passphrase is no longer accepted, use bytes".format(
+ WARNING_TYPE_EXPECTED
+ ),
+ str(w[-1].message)
+ )
+ self.assertIs(w[-1].category, DeprecationWarning)
+
self.verify_pkcs12_container(p12)
@@ -2226,14 +2230,14 @@
with catch_warnings(record=True) as w:
simplefilter("always")
- if not PY3:
- dumped_p12 = p12.export(passphrase=unicode('randomtext'))
- self.assertTrue("unicode in passphrase is no longer accepted, "
- "use bytes" in str(w[-1].message))
- else:
- dumped_p12 = p12.export(passphrase=b'randomtext'.decode())
- self.assertTrue("str in passphrase is no longer accepted, "
- "use bytes" in str(w[-1].message))
+ dumped_p12 = p12.export(passphrase=u"randomtext")
+ self.assertEqual(
+ u"{} for passphrase is no longer accepted, use bytes".format(
+ WARNING_TYPE_EXPECTED
+ ),
+ str(w[-1].message)
+ )
+ self.assertIs(w[-1].category, DeprecationWarning)
self.check_recovery(
dumped_p12, key=server_key_pem, cert=server_cert_pem, passwd=b"randomtext")
@@ -3201,20 +3205,13 @@
:py:obj:`sign` generates a cryptographic signature which :py:obj:`verify` can check.
Deprecation warnings raised because using text instead of bytes as content
"""
- if not PY3:
- content = unicode(
- "It was a bright cold day in April, and the clocks were striking "
- "thirteen. Winston Smith, his chin nuzzled into his breast in an "
- "effort to escape the vile wind, slipped quickly through the "
- "glass doors of Victory Mansions, though not quickly enough to "
- "prevent a swirl of gritty dust from entering along with him.")
- else:
- content = b(
- "It was a bright cold day in April, and the clocks were striking "
- "thirteen. Winston Smith, his chin nuzzled into his breast in an "
- "effort to escape the vile wind, slipped quickly through the "
- "glass doors of Victory Mansions, though not quickly enough to "
- "prevent a swirl of gritty dust from entering along with him.").decode()
+ content = (
+ u"It was a bright cold day in April, and the clocks were striking "
+ u"thirteen. Winston Smith, his chin nuzzled into his breast in an "
+ u"effort to escape the vile wind, slipped quickly through the "
+ u"glass doors of Victory Mansions, though not quickly enough to "
+ u"prevent a swirl of gritty dust from entering along with him."
+ )
priv_key = load_privatekey(FILETYPE_PEM, root_key_pem)
cert = load_certificate(FILETYPE_PEM, root_cert_pem)
@@ -3222,21 +3219,26 @@
with catch_warnings(record=True) as w:
simplefilter("always")
sig = sign(priv_key, content, digest)
- if not PY3:
- self.assertTrue("unicode in data is no longer accepted, "
- "use bytes" in str(w[-1].message))
- else:
- self.assertTrue("str in data is no longer accepted, "
- "use bytes" in str(w[-1].message))
+
+ self.assertEqual(
+ u"{} for data is no longer accepted, use bytes".format(
+ WARNING_TYPE_EXPECTED
+ ),
+ str(w[-1].message)
+ )
+ self.assertIs(w[-1].category, DeprecationWarning)
+
with catch_warnings(record=True) as w:
simplefilter("always")
verify(cert, sig, content, digest)
- if not PY3:
- self.assertTrue("unicode in data is no longer accepted, "
- "use bytes" in str(w[-1].message))
- else:
- self.assertTrue("str in data is no longer accepted, "
- "use bytes" in str(w[-1].message))
+
+ self.assertEqual(
+ u"{} for data is no longer accepted, use bytes".format(
+ WARNING_TYPE_EXPECTED
+ ),
+ str(w[-1].message)
+ )
+ self.assertIs(w[-1].category, DeprecationWarning)
def test_sign_nulls(self):
diff --git a/OpenSSL/test/test_ssl.py b/OpenSSL/test/test_ssl.py
index 960c109..16bbf39 100644
--- a/OpenSSL/test/test_ssl.py
+++ b/OpenSSL/test/test_ssl.py
@@ -1,3 +1,5 @@
+
+
# Copyright (C) Jean-Paul Calderone
# See LICENSE for details.
@@ -44,7 +46,7 @@
from OpenSSL.SSL import (
Context, ContextType, Session, Connection, ConnectionType, SSLeay_version)
-from OpenSSL.test.util import TestCase, b
+from OpenSSL.test.util import WARNING_TYPE_EXPECTED, TestCase, b
from OpenSSL.test.test_crypto import (
cleartextCertificatePEM, cleartextPrivateKeyPEM)
from OpenSSL.test.test_crypto import (
@@ -920,24 +922,24 @@
def test_load_verify_warning(self):
"""
- :py:obj:`Context.load_verify_locations` accepts a file name and uses the
- certificates within for verification purposes. Raises a warning when
- using a text in cafile.
+ :py:obj:`Context.load_verify_locations` accepts a file name and uses
+ the certificates within for verification purposes. Raises a warning
+ when using a text in cafile.
"""
- cafile = self.mktemp()
- fObj = open(cafile, 'w')
- fObj.write(cleartextCertificatePEM.decode('ascii'))
- fObj.close()
+ cafile = self.mktemp().decode("utf-8")
+ with open(cafile, 'w') as fObj:
+ fObj.write(cleartextCertificatePEM.decode("ascii"))
with catch_warnings(record=True) as w:
simplefilter("always")
- if not PY3:
- self._load_verify_locations_test(unicode(cafile))
- self.assertTrue("unicode in cafile is no longer accepted, use bytes" in str(w[-1].message))
- else:
- self._load_verify_locations_test(cafile.decode())
- self.assertTrue("str in cafile is no longer accepted, use bytes" in str(w[-1].message))
- self.assertTrue(issubclass(w[-1].category, DeprecationWarning))
+ self._load_verify_locations_test(cafile)
+ self.assertEqual(
+ u"{} for cafile is no longer accepted, use bytes".format(
+ WARNING_TYPE_EXPECTED
+ ),
+ str(w[-1].message)
+ )
+ self.assertIs(DeprecationWarning, w[-1].category)
def test_load_verify_invalid_file(self):
@@ -2259,21 +2261,22 @@
def test_text(self):
"""
- When passed a text, :py:obj:`Connection.send` transmits all of it and returns
- the number of bytes sent. It also raises a DeprecationWarning.
+ When passed a text, :py:obj:`Connection.send` transmits all of it and
+ returns the number of bytes sent. It also raises a DeprecationWarning.
"""
server, client = self._loopback()
with catch_warnings(record=True) as w:
simplefilter("always")
- if PY3:
- count = server.send(b'xy'.decode())
- self.assertTrue("str in buf is no longer accepted, use bytes" in str(w[-1].message))
- else:
- count = server.send(unicode('xy'))
- self.assertTrue("unicode in buf is no longer accepted, use bytes" in str(w[-1].message))
- self.assertTrue(issubclass(w[-1].category, DeprecationWarning))
+ count = server.send(u"xy")
+ self.assertEqual(
+ u"{} for buf is no longer accepted, use bytes".format(
+ WARNING_TYPE_EXPECTED
+ ),
+ str(w[-1].message)
+ )
+ self.assertIs(w[-1].category, DeprecationWarning)
self.assertEquals(count, 2)
- self.assertEquals(client.recv(2), b('xy'))
+ self.assertEquals(client.recv(2), b"xy")
try:
memoryview
@@ -2345,14 +2348,15 @@
server, client = self._loopback()
with catch_warnings(record=True) as w:
simplefilter("always")
- if PY3:
- server.sendall(b'x'.decode())
- self.assertTrue("str in buf is no longer accepted, use bytes" in str(w[-1].message))
- else:
- server.sendall(unicode('x'))
- self.assertTrue("unicode in buf is no longer accepted, use bytes" in str(w[-1].message))
- self.assertTrue(issubclass(w[-1].category, DeprecationWarning))
- self.assertEquals(client.recv(1), b('x'))
+ server.sendall(u"x")
+ self.assertEqual(
+ u"{} for buf is no longer accepted, use bytes".format(
+ WARNING_TYPE_EXPECTED
+ ),
+ str(w[-1].message)
+ )
+ self.assertIs(w[-1].category, DeprecationWarning)
+ self.assertEquals(client.recv(1), b"x")
try:
diff --git a/OpenSSL/test/util.py b/OpenSSL/test/util.py
index 4260eb0..b69e538 100644
--- a/OpenSSL/test/util.py
+++ b/OpenSSL/test/util.py
@@ -14,6 +14,8 @@
from unittest import TestCase
import sys
+from six import PY3
+
from OpenSSL._util import exception_from_error_queue
from OpenSSL.crypto import Error
@@ -447,3 +449,10 @@
a = self.anInstance()
b = Delegate()
self.assertEqual(a != b, [b])
+
+
+# The type name expected in warnings about using the wrong string type.
+if PY3:
+ WARNING_TYPE_EXPECTED = "str"
+else:
+ WARNING_TYPE_EXPECTED = "unicode"