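Fix more flake8 warnings in the SSL tests
Also skip Python 2-only and OS X-specific tests with pytest.mark.skipif instead of conditionally defining them.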
diff --git a/OpenSSL/test/test_ssl.py b/OpenSSL/test/test_ssl.py
index 4a23ef7..f3a3a7c 100644
--- a/OpenSSL/test/test_ssl.py
+++ b/OpenSSL/test/test_ssl.py
@@ -99,6 +99,9 @@
"""
+skip_if_py3 = pytest.mark.skipif(PY3, reason="Python 2 only")
+
+
def join_bytes_or_unicode(prefix, suffix):
"""
Join two path components of either ``bytes`` or ``unicode``.
@@ -368,13 +371,13 @@
self.assertRaises(TypeError, Context, "")
self.assertRaises(ValueError, Context, 10)
- if not PY3:
- def test_method_long(self):
- """
- On Python 2 :py:class:`Context` accepts values of type
- :py:obj:`long` as well as :py:obj:`int`.
- """
- Context(long(TLSv1_METHOD))
+ @skip_if_py3
+ def test_method_long(self):
+ """
+ On Python 2 :py:class:`Context` accepts values of type
+ :py:obj:`long` as well as :py:obj:`int`.
+ """
+ Context(long(TLSv1_METHOD))
def test_type(self):
"""
@@ -440,13 +443,13 @@
FILETYPE_PEM,
)
- if not PY3:
- def test_use_privatekey_file_long(self):
- """
- On Python 2 :py:obj:`Context.use_privatekey_file` accepts a
- filetype of type :py:obj:`long` as well as :py:obj:`int`.
- """
- self._use_privatekey_file_test(self.mktemp(), long(FILETYPE_PEM))
+ @skip_if_py3
+ def test_use_privatekey_file_long(self):
+ """
+ On Python 2 :py:obj:`Context.use_privatekey_file` accepts a
+ filetype of type :py:obj:`long` as well as :py:obj:`int`.
+ """
+ self._use_privatekey_file_test(self.mktemp(), long(FILETYPE_PEM))
def test_use_certificate_wrong_args(self):
"""
@@ -545,18 +548,18 @@
filename = self.mktemp().decode(getfilesystemencoding()) + NON_ASCII
self._use_certificate_file_test(filename)
- if not PY3:
- def test_use_certificate_file_long(self):
- """
- On Python 2 :py:obj:`Context.use_certificate_file` accepts a
- filetype of type :py:obj:`long` as well as :py:obj:`int`.
- """
- pem_filename = self.mktemp()
- with open(pem_filename, "wb") as pem_file:
- pem_file.write(cleartextCertificatePEM)
+ @skip_if_py3
+ def test_use_certificate_file_long(self):
+ """
+ On Python 2 :py:obj:`Context.use_certificate_file` accepts a
+ filetype of type :py:obj:`long` as well as :py:obj:`int`.
+ """
+ pem_filename = self.mktemp()
+ with open(pem_filename, "wb") as pem_file:
+ pem_file.write(cleartextCertificatePEM)
- ctx = Context(TLSv1_METHOD)
- ctx.use_certificate_file(pem_filename, long(FILETYPE_PEM))
+ ctx = Context(TLSv1_METHOD)
+ ctx.use_certificate_file(pem_filename, long(FILETYPE_PEM))
def test_check_privatekey_valid(self):
"""
@@ -637,15 +640,15 @@
options = context.set_options(OP_NO_SSLv2)
self.assertTrue(OP_NO_SSLv2 & options)
- if not PY3:
- def test_set_options_long(self):
- """
- On Python 2 :py:obj:`Context.set_options` accepts values of type
- :py:obj:`long` as well as :py:obj:`int`.
- """
- context = Context(TLSv1_METHOD)
- options = context.set_options(long(OP_NO_SSLv2))
- self.assertTrue(OP_NO_SSLv2 & options)
+ @skip_if_py3
+ def test_set_options_long(self):
+ """
+ On Python 2 :py:obj:`Context.set_options` accepts values of type
+ :py:obj:`long` as well as :py:obj:`int`.
+ """
+ context = Context(TLSv1_METHOD)
+ options = context.set_options(long(OP_NO_SSLv2))
+ self.assertTrue(OP_NO_SSLv2 & options)
def test_set_mode_wrong_args(self):
"""
@@ -667,15 +670,15 @@
self.assertTrue(
MODE_RELEASE_BUFFERS & context.set_mode(MODE_RELEASE_BUFFERS))
- if not PY3:
- def test_set_mode_long(self):
- """
- On Python 2 :py:obj:`Context.set_mode` accepts values of type
- :py:obj:`long` as well as :py:obj:`int`.
- """
- context = Context(TLSv1_METHOD)
- mode = context.set_mode(long(MODE_RELEASE_BUFFERS))
- self.assertTrue(MODE_RELEASE_BUFFERS & mode)
+ @skip_if_py3
+ def test_set_mode_long(self):
+ """
+ On Python 2 :py:obj:`Context.set_mode` accepts values of type
+ :py:obj:`long` as well as :py:obj:`int`.
+ """
+ context = Context(TLSv1_METHOD)
+ mode = context.set_mode(long(MODE_RELEASE_BUFFERS))
+ self.assertTrue(MODE_RELEASE_BUFFERS & mode)
else:
"MODE_RELEASE_BUFFERS unavailable - OpenSSL version may be too old"
@@ -707,15 +710,15 @@
context.set_timeout(1234)
self.assertEquals(context.get_timeout(), 1234)
- if not PY3:
- def test_timeout_long(self):
- """
- On Python 2 :py:obj:`Context.set_timeout` accepts values of type
- `long` as well as int.
- """
- context = Context(TLSv1_METHOD)
- context.set_timeout(long(1234))
- self.assertEquals(context.get_timeout(), 1234)
+ @skip_if_py3
+ def test_timeout_long(self):
+ """
+ On Python 2 :py:obj:`Context.set_timeout` accepts values of type
+ `long` as well as int.
+ """
+ context = Context(TLSv1_METHOD)
+ context.set_timeout(long(1234))
+ self.assertEquals(context.get_timeout(), 1234)
def test_set_verify_depth_wrong_args(self):
"""
@@ -745,15 +748,15 @@
context.set_verify_depth(11)
self.assertEquals(context.get_verify_depth(), 11)
- if not PY3:
- def test_verify_depth_long(self):
- """
- On Python 2 :py:obj:`Context.set_verify_depth` accepts values of
- type `long` as well as int.
- """
- context = Context(TLSv1_METHOD)
- context.set_verify_depth(long(11))
- self.assertEquals(context.get_verify_depth(), 11)
+ @skip_if_py3
+ def test_verify_depth_long(self):
+ """
+ On Python 2 :py:obj:`Context.set_verify_depth` accepts values of
+ type `long` as well as int.
+ """
+ context = Context(TLSv1_METHOD)
+ context.set_verify_depth(long(11))
+ self.assertEquals(context.get_verify_depth(), 11)
def _write_encrypted_pem(self, passphrase):
"""
@@ -1309,18 +1312,18 @@
self.assertEquals(
context.get_verify_mode(), VERIFY_PEER | VERIFY_CLIENT_ONCE)
- if not PY3:
- def test_set_verify_mode_long(self):
- """
- On Python 2 :py:obj:`Context.set_verify_mode` accepts values of
- type :py:obj:`long` as well as :py:obj:`int`.
- """
- context = Context(TLSv1_METHOD)
- self.assertEquals(context.get_verify_mode(), 0)
- context.set_verify(
- long(VERIFY_PEER | VERIFY_CLIENT_ONCE), lambda *args: None)
- self.assertEquals(
- context.get_verify_mode(), VERIFY_PEER | VERIFY_CLIENT_ONCE)
+ @skip_if_py3
+ def test_set_verify_mode_long(self):
+ """
+ On Python 2 :py:obj:`Context.set_verify_mode` accepts values of
+ type :py:obj:`long` as well as :py:obj:`int`.
+ """
+ context = Context(TLSv1_METHOD)
+ self.assertEquals(context.get_verify_mode(), 0)
+ context.set_verify(
+ long(VERIFY_PEER | VERIFY_CLIENT_ONCE), lambda *args: None)
+ self.assertEquals(
+ context.get_verify_mode(), VERIFY_PEER | VERIFY_CLIENT_ONCE)
def test_load_tmp_dh_wrong_args(self):
"""
@@ -1448,16 +1451,16 @@
self.assertEqual(SESS_CACHE_OFF, off)
self.assertEqual(SESS_CACHE_BOTH, context.get_session_cache_mode())
- if not PY3:
- def test_session_cache_mode_long(self):
- """
- On Python 2 :py:obj:`Context.set_session_cache_mode` accepts values
- of type :py:obj:`long` as well as :py:obj:`int`.
- """
- context = Context(TLSv1_METHOD)
- context.set_session_cache_mode(long(SESS_CACHE_BOTH))
- self.assertEqual(
- SESS_CACHE_BOTH, context.get_session_cache_mode())
+ @skip_if_py3
+ def test_session_cache_mode_long(self):
+ """
+ On Python 2 :py:obj:`Context.set_session_cache_mode` accepts values
+ of type :py:obj:`long` as well as :py:obj:`int`.
+ """
+ context = Context(TLSv1_METHOD)
+ context.set_session_cache_mode(long(SESS_CACHE_BOTH))
+ self.assertEqual(
+ SESS_CACHE_BOTH, context.get_session_cache_mode())
def test_get_cert_store(self):
"""
@@ -1595,9 +1598,11 @@
"""
advertise_args = []
select_args = []
+
def advertise(conn):
advertise_args.append((conn,))
return [b'http/1.1', b'spdy/2']
+
def select(conn, options):
select_args.append((conn, options))
return b'spdy/2'
@@ -1629,7 +1634,6 @@
self.assertEqual(server.get_next_proto_negotiated(), b'spdy/2')
self.assertEqual(client.get_next_proto_negotiated(), b'spdy/2')
-
def test_npn_client_fail(self):
"""
Tests that when clients and servers cannot agree on what protocol
@@ -1637,9 +1641,11 @@
"""
advertise_args = []
select_args = []
+
def advertise(conn):
advertise_args.append((conn,))
return [b'http/1.1', b'spdy/2']
+
def select(conn, options):
select_args.append((conn, options))
return b''
@@ -1669,16 +1675,17 @@
self.assertEqual([(server,)], advertise_args)
self.assertEqual([(client, [b'http/1.1', b'spdy/2'])], select_args)
-
def test_npn_select_error(self):
"""
Test that we can handle exceptions in the select callback. If
select fails it should be fatal to the connection.
"""
advertise_args = []
+
def advertise(conn):
advertise_args.append((conn,))
return [b'http/1.1', b'spdy/2']
+
def select(conn, options):
raise TypeError
@@ -1707,15 +1714,16 @@
)
self.assertEqual([(server,)], advertise_args)
-
def test_npn_advertise_error(self):
"""
Test that we can handle exceptions in the advertise callback. If
advertise fails no NPN is advertised to the client.
"""
select_args = []
+
def advertise(conn):
raise TypeError
+
def select(conn, options):
select_args.append((conn, options))
return b''
@@ -1768,7 +1776,6 @@
self.assertRaises(NotImplementedError, method)
-
class ApplicationLayerProtoNegotiationTests(TestCase, _LoopbackMixin):
"""
Tests for ALPN in PyOpenSSL.
@@ -1783,6 +1790,7 @@
by the connections.
"""
select_args = []
+
def select(conn, options):
select_args.append((conn, options))
return b'spdy/2'
@@ -1813,13 +1821,13 @@
self.assertEqual(server.get_alpn_proto_negotiated(), b'spdy/2')
self.assertEqual(client.get_alpn_proto_negotiated(), b'spdy/2')
-
def test_alpn_set_on_connection(self):
"""
The same as test_alpn_success, but setting the ALPN protocols on
the connection rather than the context.
"""
select_args = []
+
def select(conn, options):
select_args.append((conn, options))
return b'spdy/2'
@@ -1852,13 +1860,13 @@
self.assertEqual(server.get_alpn_proto_negotiated(), b'spdy/2')
self.assertEqual(client.get_alpn_proto_negotiated(), b'spdy/2')
-
def test_alpn_server_fail(self):
"""
When clients and servers cannot agree on what protocol to use next
the TLS connection does not get established.
"""
select_args = []
+
def select(conn, options):
select_args.append((conn, options))
return b''
@@ -1887,7 +1895,6 @@
self.assertEqual([(server, [b'http/1.1', b'spdy/2'])], select_args)
-
def test_alpn_no_server(self):
"""
When clients and servers cannot agree on what protocol to use next
@@ -1916,12 +1923,12 @@
self.assertEqual(client.get_alpn_proto_negotiated(), b'')
-
def test_alpn_callback_exception(self):
"""
We can handle exceptions in the ALPN select callback.
"""
select_args = []
+
def select(conn, options):
select_args.append((conn, options))
raise TypeError()
@@ -1972,20 +1979,18 @@
)
-
class SessionTests(TestCase):
"""
Unit tests for :py:obj:`OpenSSL.SSL.Session`.
"""
def test_construction(self):
"""
- :py:class:`Session` can be constructed with no arguments, creating a new
- instance of that type.
+ :py:class:`Session` can be constructed with no arguments, creating
+ a new instance of that type.
"""
new_session = Session()
self.assertTrue(isinstance(new_session, Session))
-
def test_construction_wrong_args(self):
"""
If any arguments are passed to :py:class:`Session`, :py:obj:`TypeError`
@@ -1996,7 +2001,6 @@
self.assertRaises(TypeError, Session, object())
-
class ConnectionTests(TestCase, _LoopbackMixin):
"""
Unit tests for :py:obj:`OpenSSL.SSL.Connection`.
@@ -2019,38 +2023,35 @@
def test_type(self):
"""
- :py:obj:`Connection` and :py:obj:`ConnectionType` refer to the same type object and
- can be used to create instances of that type.
+ :py:obj:`Connection` and :py:obj:`ConnectionType` refer to the same
+ type object and can be used to create instances of that type.
"""
self.assertIdentical(Connection, ConnectionType)
ctx = Context(TLSv1_METHOD)
self.assertConsistentType(Connection, 'Connection', ctx, None)
-
def test_get_context(self):
"""
- :py:obj:`Connection.get_context` returns the :py:obj:`Context` instance used to
- construct the :py:obj:`Connection` instance.
+ :py:obj:`Connection.get_context` returns the :py:obj:`Context` instance
+ used to construct the :py:obj:`Connection` instance.
"""
context = Context(TLSv1_METHOD)
connection = Connection(context, None)
self.assertIdentical(connection.get_context(), context)
-
def test_get_context_wrong_args(self):
"""
- :py:obj:`Connection.get_context` raises :py:obj:`TypeError` if called with any
- arguments.
+ :py:obj:`Connection.get_context` raises :py:obj:`TypeError` if called
+ with any arguments.
"""
connection = Connection(Context(TLSv1_METHOD), None)
self.assertRaises(TypeError, connection.get_context, None)
-
def test_set_context_wrong_args(self):
"""
- :py:obj:`Connection.set_context` raises :py:obj:`TypeError` if called with a
- non-:py:obj:`Context` instance argument or with any number of arguments other
- than 1.
+ :py:obj:`Connection.set_context` raises :py:obj:`TypeError` if called
+ with a non-:py:obj:`Context` instance argument or with any number of
+ arguments other than 1.
"""
ctx = Context(TLSv1_METHOD)
connection = Connection(ctx, None)
@@ -2063,27 +2064,26 @@
TypeError, connection.set_context, Context(TLSv1_METHOD), 2)
self.assertIdentical(ctx, connection.get_context())
-
def test_set_context(self):
"""
- :py:obj:`Connection.set_context` specifies a new :py:obj:`Context` instance to be used
- for the connection.
+ :py:obj:`Connection.set_context` specifies a new :py:obj:`Context`
+ instance to be used for the connection.
"""
original = Context(SSLv23_METHOD)
replacement = Context(TLSv1_METHOD)
connection = Connection(original, None)
connection.set_context(replacement)
self.assertIdentical(replacement, connection.get_context())
- # Lose our references to the contexts, just in case the Connection isn't
- # properly managing its own contributions to their reference counts.
+ # Lose our references to the contexts, just in case the Connection
+ # isn't properly managing its own contributions to their reference
+ # counts.
del original, replacement
collect()
-
def test_set_tlsext_host_name_wrong_args(self):
"""
- If :py:obj:`Connection.set_tlsext_host_name` is called with a non-byte string
- argument or a byte string with an embedded NUL or other than one
+ If :py:obj:`Connection.set_tlsext_host_name` is called with a non-byte
+ string argument or a byte string with an embedded NUL or other than one
argument, :py:obj:`TypeError` is raised.
"""
conn = Connection(Context(TLSv1_METHOD), None)
@@ -2099,18 +2099,16 @@
TypeError,
conn.set_tlsext_host_name, b("example.com").decode("ascii"))
-
def test_get_servername_wrong_args(self):
"""
- :py:obj:`Connection.get_servername` raises :py:obj:`TypeError` if called with any
- arguments.
+ :py:obj:`Connection.get_servername` raises :py:obj:`TypeError` if
+ called with any arguments.
"""
connection = Connection(Context(TLSv1_METHOD), None)
self.assertRaises(TypeError, connection.get_servername, object())
self.assertRaises(TypeError, connection.get_servername, 1)
self.assertRaises(TypeError, connection.get_servername, "hello")
-
def test_pending(self):
"""
:py:obj:`Connection.pending` returns the number of bytes available for
@@ -2119,18 +2117,18 @@
connection = Connection(Context(TLSv1_METHOD), None)
self.assertEquals(connection.pending(), 0)
-
def test_pending_wrong_args(self):
"""
- :py:obj:`Connection.pending` raises :py:obj:`TypeError` if called with any arguments.
+ :py:obj:`Connection.pending` raises :py:obj:`TypeError` if called with
+ any arguments.
"""
connection = Connection(Context(TLSv1_METHOD), None)
self.assertRaises(TypeError, connection.pending, None)
-
def test_peek(self):
"""
- :py:obj:`Connection.recv` peeks into the connection if :py:obj:`socket.MSG_PEEK` is passed.
+ :py:obj:`Connection.recv` peeks into the connection if
+ :py:obj:`socket.MSG_PEEK` is passed.
"""
server, client = self._loopback()
server.send(b('xy'))
@@ -2138,21 +2136,22 @@
self.assertEqual(client.recv(2, MSG_PEEK), b('xy'))
self.assertEqual(client.recv(2), b('xy'))
-
def test_connect_wrong_args(self):
"""
- :py:obj:`Connection.connect` raises :py:obj:`TypeError` if called with a non-address
- argument or with the wrong number of arguments.
+ :py:obj:`Connection.connect` raises :py:obj:`TypeError` if called with
+ a non-address argument or with the wrong number of arguments.
"""
connection = Connection(Context(TLSv1_METHOD), socket())
self.assertRaises(TypeError, connection.connect, None)
self.assertRaises(TypeError, connection.connect)
- self.assertRaises(TypeError, connection.connect, ("127.0.0.1", 1), None)
+ self.assertRaises(
+ TypeError, connection.connect, ("127.0.0.1", 1), None
+ )
def test_connection_undefined_attr(self):
"""
- :py:obj:`Connection.connect` raises :py:obj:`TypeError` if called with a non-address
- argument or with the wrong number of arguments.
+ :py:obj:`Connection.connect` raises :py:obj:`TypeError` if called with
+ a non-address argument or with the wrong number of arguments.
"""
def attr_access_test(connection):
@@ -2163,8 +2162,8 @@
def test_connect_refused(self):
"""
- :py:obj:`Connection.connect` raises :py:obj:`socket.error` if the underlying socket
- connect method raises it.
+ :py:obj:`Connection.connect` raises :py:obj:`socket.error` if the
+ underlying socket connect method raises it.
"""
client = socket()
context = Context(TLSv1_METHOD)
@@ -2179,7 +2178,8 @@
def test_connect(self):
"""
- :py:obj:`Connection.connect` establishes a connection to the specified address.
+ :py:obj:`Connection.connect` establishes a connection to the specified
+ address.
"""
port = socket()
port.bind(('', 0))
@@ -2189,40 +2189,39 @@
clientSSL.connect(('127.0.0.1', port.getsockname()[1]))
# XXX An assertion? Or something?
+ @pytest.mark.skipif(
+ platform == "darwin",
+ reason="connect_ex sometimes causes a kernel panic on OS X 10.6.4"
+ )
+ def test_connect_ex(self):
+ """
+ If there is a connection error, :py:obj:`Connection.connect_ex`
+ returns the errno instead of raising an exception.
+ """
+ port = socket()
+ port.bind(('', 0))
+ port.listen(3)
- if platform == "darwin":
- "connect_ex sometimes causes a kernel panic on OS X 10.6.4"
- else:
- def test_connect_ex(self):
- """
- If there is a connection error, :py:obj:`Connection.connect_ex` returns the
- errno instead of raising an exception.
- """
- port = socket()
- port.bind(('', 0))
- port.listen(3)
-
- clientSSL = Connection(Context(TLSv1_METHOD), socket())
- clientSSL.setblocking(False)
- result = clientSSL.connect_ex(port.getsockname())
- expected = (EINPROGRESS, EWOULDBLOCK)
- self.assertTrue(
- result in expected, "%r not in %r" % (result, expected))
-
+ clientSSL = Connection(Context(TLSv1_METHOD), socket())
+ clientSSL.setblocking(False)
+ result = clientSSL.connect_ex(port.getsockname())
+ expected = (EINPROGRESS, EWOULDBLOCK)
+ self.assertTrue(
+ result in expected, "%r not in %r" % (result, expected))
def test_accept_wrong_args(self):
"""
- :py:obj:`Connection.accept` raises :py:obj:`TypeError` if called with any arguments.
+ :py:obj:`Connection.accept` raises :py:obj:`TypeError` if called with
+ any arguments.
"""
connection = Connection(Context(TLSv1_METHOD), socket())
self.assertRaises(TypeError, connection.accept, None)
-
def test_accept(self):
"""
- :py:obj:`Connection.accept` accepts a pending connection attempt and returns a
- tuple of a new :py:obj:`Connection` (the accepted client) and the address the
- connection originated from.
+ :py:obj:`Connection.accept` accepts a pending connection attempt and
+ returns a tuple of a new :py:obj:`Connection` (the accepted client) and
+ the address the connection originated from.
"""
ctx = Context(TLSv1_METHOD)
ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
@@ -2234,8 +2233,8 @@
clientSSL = Connection(Context(TLSv1_METHOD), socket())
- # Calling portSSL.getsockname() here to get the server IP address sounds
- # great, but frequently fails on Windows.
+ # Calling portSSL.getsockname() here to get the server IP address
+ # sounds great, but frequently fails on Windows.
clientSSL.connect(('127.0.0.1', portSSL.getsockname()[1]))
serverSSL, address = portSSL.accept()
@@ -2244,11 +2243,10 @@
self.assertIdentical(serverSSL.get_context(), ctx)
self.assertEquals(address, clientSSL.getsockname())
-
def test_shutdown_wrong_args(self):
"""
- :py:obj:`Connection.shutdown` raises :py:obj:`TypeError` if called with the wrong
- number of arguments or with arguments other than integers.
+ :py:obj:`Connection.shutdown` raises :py:obj:`TypeError` if called with
+ the wrong number of arguments or with arguments other than integers.
"""
connection = Connection(Context(TLSv1_METHOD), None)
self.assertRaises(TypeError, connection.shutdown, None)
@@ -2257,10 +2255,10 @@
self.assertRaises(TypeError, connection.set_shutdown, None)
self.assertRaises(TypeError, connection.set_shutdown, 0, 1)
-
def test_shutdown(self):
"""
- :py:obj:`Connection.shutdown` performs an SSL-level connection shutdown.
+ :py:obj:`Connection.shutdown` performs an SSL-level connection
+ shutdown.
"""
server, client = self._loopback()
self.assertFalse(server.shutdown())
@@ -2268,15 +2266,18 @@
self.assertRaises(ZeroReturnError, client.recv, 1024)
self.assertEquals(client.get_shutdown(), RECEIVED_SHUTDOWN)
client.shutdown()
- self.assertEquals(client.get_shutdown(), SENT_SHUTDOWN | RECEIVED_SHUTDOWN)
+ self.assertEquals(
+ client.get_shutdown(), SENT_SHUTDOWN | RECEIVED_SHUTDOWN
+ )
self.assertRaises(ZeroReturnError, server.recv, 1024)
- self.assertEquals(server.get_shutdown(), SENT_SHUTDOWN | RECEIVED_SHUTDOWN)
-
+ self.assertEquals(
+ server.get_shutdown(), SENT_SHUTDOWN | RECEIVED_SHUTDOWN
+ )
def test_shutdown_closed(self):
"""
- If the underlying socket is closed, :py:obj:`Connection.shutdown` propagates the
- write error from the low level write call.
+ If the underlying socket is closed, :py:obj:`Connection.shutdown`
+ propagates the write error from the low level write call.
"""
server, client = self._loopback()
server.sock_shutdown(2)
@@ -2286,7 +2287,6 @@
else:
self.assertEqual(exc.args[0], EPIPE)
-
def test_shutdown_truncated(self):
"""
If the underlying connection is truncated, :obj:`Connection.shutdown`
@@ -2306,40 +2306,36 @@
server.bio_shutdown()
self.assertRaises(Error, server.shutdown)
-
def test_set_shutdown(self):
"""
- :py:obj:`Connection.set_shutdown` sets the state of the SSL connection shutdown
- process.
+ :py:obj:`Connection.set_shutdown` sets the state of the SSL connection
+ shutdown process.
"""
connection = Connection(Context(TLSv1_METHOD), socket())
connection.set_shutdown(RECEIVED_SHUTDOWN)
self.assertEquals(connection.get_shutdown(), RECEIVED_SHUTDOWN)
-
- if not PY3:
- def test_set_shutdown_long(self):
- """
- On Python 2 :py:obj:`Connection.set_shutdown` accepts an argument
- of type :py:obj:`long` as well as :py:obj:`int`.
- """
- connection = Connection(Context(TLSv1_METHOD), socket())
- connection.set_shutdown(long(RECEIVED_SHUTDOWN))
- self.assertEquals(connection.get_shutdown(), RECEIVED_SHUTDOWN)
-
+ @skip_if_py3
+ def test_set_shutdown_long(self):
+ """
+ On Python 2 :py:obj:`Connection.set_shutdown` accepts an argument
+ of type :py:obj:`long` as well as :py:obj:`int`.
+ """
+ connection = Connection(Context(TLSv1_METHOD), socket())
+ connection.set_shutdown(long(RECEIVED_SHUTDOWN))
+ self.assertEquals(connection.get_shutdown(), RECEIVED_SHUTDOWN)
def test_app_data_wrong_args(self):
"""
- :py:obj:`Connection.set_app_data` raises :py:obj:`TypeError` if called with other than
- one argument. :py:obj:`Connection.get_app_data` raises :py:obj:`TypeError` if called
- with any arguments.
+ :py:obj:`Connection.set_app_data` raises :py:obj:`TypeError` if called
+ with other than one argument. :py:obj:`Connection.get_app_data` raises
+ :py:obj:`TypeError` if called with any arguments.
"""
conn = Connection(Context(TLSv1_METHOD), None)
self.assertRaises(TypeError, conn.get_app_data, None)
self.assertRaises(TypeError, conn.set_app_data)
self.assertRaises(TypeError, conn.set_app_data, None, None)
-
def test_app_data(self):
"""
Any object can be set as app data by passing it to
@@ -2351,20 +2347,18 @@
conn.set_app_data(app_data)
self.assertIdentical(conn.get_app_data(), app_data)
-
def test_makefile(self):
"""
- :py:obj:`Connection.makefile` is not implemented and calling that method raises
- :py:obj:`NotImplementedError`.
+ :py:obj:`Connection.makefile` is not implemented and calling that
+ method raises :py:obj:`NotImplementedError`.
"""
conn = Connection(Context(TLSv1_METHOD), None)
self.assertRaises(NotImplementedError, conn.makefile)
-
def test_get_peer_cert_chain_wrong_args(self):
"""
- :py:obj:`Connection.get_peer_cert_chain` raises :py:obj:`TypeError` if called with any
- arguments.
+ :py:obj:`Connection.get_peer_cert_chain` raises :py:obj:`TypeError` if
+ called with any arguments.
"""
conn = Connection(Context(TLSv1_METHOD), None)
self.assertRaises(TypeError, conn.get_peer_cert_chain, 1)
@@ -2372,11 +2366,10 @@
self.assertRaises(TypeError, conn.get_peer_cert_chain, object())
self.assertRaises(TypeError, conn.get_peer_cert_chain, [])
-
def test_get_peer_cert_chain(self):
"""
- :py:obj:`Connection.get_peer_cert_chain` returns a list of certificates which
- the connected server returned for the certification verification.
+ :py:obj:`Connection.get_peer_cert_chain` returns a list of certificates
+ which the connected server returned for the certification verification.
"""
chain = _create_certificate_chain()
[(cakey, cacert), (ikey, icert), (skey, scert)] = chain
@@ -2406,11 +2399,10 @@
self.assertEqual(
"Authority Certificate", chain[2].get_subject().CN)
-
def test_get_peer_cert_chain_none(self):
"""
- :py:obj:`Connection.get_peer_cert_chain` returns :py:obj:`None` if the peer sends no
- certificate chain.
+ :py:obj:`Connection.get_peer_cert_chain` returns :py:obj:`None` if the
+ peer sends no certificate chain.
"""
ctx = Context(TLSv1_METHOD)
ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem))
@@ -2422,7 +2414,6 @@
self._interactInMemory(client, server)
self.assertIdentical(None, server.get_peer_cert_chain())
-
def test_get_session_wrong_args(self):
"""
:py:obj:`Connection.get_session` raises :py:obj:`TypeError` if called
@@ -2434,7 +2425,6 @@
self.assertRaises(TypeError, server.get_session, "hello")
self.assertRaises(TypeError, server.get_session, object())
-
def test_get_session_unconnected(self):
"""
:py:obj:`Connection.get_session` returns :py:obj:`None` when used with
@@ -2445,7 +2435,6 @@
session = server.get_session()
self.assertIdentical(None, session)
-
def test_server_get_session(self):
"""
On the server side of a connection, :py:obj:`Connection.get_session`
@@ -2456,7 +2445,6 @@
session = server.get_session()
self.assertIsInstance(session, Session)
-
def test_client_get_session(self):
"""
On the client side of a connection, :py:obj:`Connection.get_session`
@@ -2467,7 +2455,6 @@
session = client.get_session()
self.assertIsInstance(session, Session)
-
def test_set_session_wrong_args(self):
"""
If called with an object that is not an instance of :py:class:`Session`,
@@ -2483,7 +2470,6 @@
self.assertRaises(
TypeError, connection.set_session, Session(), Session())
-
def test_client_set_session(self):
"""
:py:obj:`Connection.set_session`, when used prior to a connection being
@@ -2516,20 +2502,19 @@
clientFactory=makeClient)
# This is a proxy: in general, we have no access to any unique
- # identifier for the session (new enough versions of OpenSSL expose a
- # hash which could be usable, but "new enough" is very, very new).
+ # identifier for the session (new enough versions of OpenSSL expose
+ # a hash which could be usable, but "new enough" is very, very new).
# Instead, exploit the fact that the master key is re-used if the
- # session is re-used. As long as the master key for the two connections
- # is the same, the session was re-used!
+ # session is re-used. As long as the master key for the two
+ # connections is the same, the session was re-used!
self.assertEqual(
originalServer.master_key(), resumedServer.master_key())
-
def test_set_session_wrong_method(self):
"""
If :py:obj:`Connection.set_session` is passed a :py:class:`Session`
- instance associated with a context using a different SSL method than the
- :py:obj:`Connection` is using, a :py:class:`OpenSSL.SSL.Error` is
+ instance associated with a context using a different SSL method than
+ the :py:obj:`Connection` is using, a :py:class:`OpenSSL.SSL.Error` is
raised.
"""
key = load_privatekey(FILETYPE_PEM, server_key_pem)
@@ -2559,7 +2544,6 @@
Error,
self._loopback, clientFactory=makeClient, serverFactory=makeServer)
-
def test_wantWriteError(self):
"""
:py:obj:`Connection` methods which generate output raise
@@ -2602,7 +2586,6 @@
connection = Connection(ctx, None)
self.assertEqual(connection.get_finished(), None)
-
def test_get_peer_finished_before_connect(self):
"""
:py:obj:`Connection.get_peer_finished` returns :py:obj:`None` before
@@ -2612,7 +2595,6 @@
connection = Connection(ctx, None)
self.assertEqual(connection.get_peer_finished(), None)
-
def test_get_finished(self):
"""
:py:obj:`Connection.get_finished` method returns the TLS Finished
@@ -2625,7 +2607,6 @@
self.assertNotEqual(server.get_finished(), None)
self.assertTrue(len(server.get_finished()) > 0)
-
def test_get_peer_finished(self):
"""
:py:obj:`Connection.get_peer_finished` method returns the TLS Finished
@@ -2637,7 +2618,6 @@
self.assertNotEqual(server.get_peer_finished(), None)
self.assertTrue(len(server.get_peer_finished()) > 0)
-
def test_tls_finished_message_symmetry(self):
"""
The TLS Finished message send by server must be the TLS Finished message
@@ -2651,7 +2631,6 @@
self.assertEqual(server.get_finished(), client.get_peer_finished())
self.assertEqual(client.get_finished(), server.get_peer_finished())
-
def test_get_cipher_name_before_connect(self):
"""
:py:obj:`Connection.get_cipher_name` returns :py:obj:`None` if no
@@ -2661,7 +2640,6 @@
conn = Connection(ctx, None)
self.assertIdentical(conn.get_cipher_name(), None)
-
def test_get_cipher_name(self):
"""
:py:obj:`Connection.get_cipher_name` returns a :py:class:`unicode`
@@ -2676,7 +2654,6 @@
self.assertEqual(server_cipher_name, client_cipher_name)
-
def test_get_cipher_version_before_connect(self):
"""
:py:obj:`Connection.get_cipher_version` returns :py:obj:`None` if no
@@ -2686,7 +2663,6 @@
conn = Connection(ctx, None)
self.assertIdentical(conn.get_cipher_version(), None)
-
def test_get_cipher_version(self):
"""
:py:obj:`Connection.get_cipher_version` returns a :py:class:`unicode`
@@ -2701,7 +2677,6 @@
self.assertEqual(server_cipher_version, client_cipher_version)
-
def test_get_cipher_bits_before_connect(self):
"""
:py:obj:`Connection.get_cipher_bits` returns :py:obj:`None` if no
@@ -2711,7 +2686,6 @@
conn = Connection(ctx, None)
self.assertIdentical(conn.get_cipher_bits(), None)
-
def test_get_cipher_bits(self):
"""
:py:obj:`Connection.get_cipher_bits` returns the number of secret bits
@@ -2726,7 +2700,6 @@
self.assertEqual(server_cipher_bits, client_cipher_bits)
-
def test_get_protocol_version_name(self):
"""
:py:obj:`Connection.get_protocol_version_name()` returns a string
@@ -2739,8 +2712,9 @@
self.assertIsInstance(server_protocol_version_name, text_type)
self.assertIsInstance(client_protocol_version_name, text_type)
- self.assertEqual(server_protocol_version_name, client_protocol_version_name)
-
+ self.assertEqual(
+ server_protocol_version_name, client_protocol_version_name
+ )
def test_get_protocol_version(self):
"""
@@ -2757,7 +2731,6 @@
self.assertEqual(server_protocol_version, client_protocol_version)
-
class ConnectionGetCipherListTests(TestCase):
"""
Tests for :py:obj:`Connection.get_cipher_list`.