Fixed #529 -- use the native bytes syntax (#536)
diff --git a/tests/test_ssl.py b/tests/test_ssl.py
index ac57280..a2aad55 100644
--- a/tests/test_ssl.py
+++ b/tests/test_ssl.py
@@ -75,7 +75,7 @@
SSL_CB_ACCEPT_EXIT, SSL_CB_CONNECT_LOOP, SSL_CB_CONNECT_EXIT,
SSL_CB_HANDSHAKE_START, SSL_CB_HANDSHAKE_DONE)
-from .util import WARNING_TYPE_EXPECTED, NON_ASCII, TestCase, b
+from .util import WARNING_TYPE_EXPECTED, NON_ASCII, TestCase
from .test_crypto import (
cleartextCertificatePEM, cleartextPrivateKeyPEM,
client_cert_pem, client_key_pem, server_cert_pem, server_key_pem,
@@ -138,10 +138,10 @@
# Let's pass some unencrypted data to make sure our socket connection is
# fine. Just one byte, so we don't have to worry about buffers getting
# filled up or fragmentation.
- server.send(b("x"))
- assert client.recv(1024) == b("x")
- client.send(b("y"))
- assert server.recv(1024) == b("y")
+ server.send(b"x")
+ assert client.recv(1024) == b"x"
+ client.send(b"y")
+ assert server.recv(1024) == b"y"
# Most of our callers want non-blocking sockets, make it easy for them.
server.setblocking(False)
@@ -170,7 +170,7 @@
2. A new intermediate certificate signed by cacert (icert)
3. A new server certificate signed by icert (scert)
"""
- caext = X509Extension(b('basicConstraints'), False, b('CA:true'))
+ caext = X509Extension(b'basicConstraints', False, b'CA:true')
# Step 1
cakey = PKey()
@@ -179,8 +179,8 @@
cacert.get_subject().commonName = "Authority Certificate"
cacert.set_issuer(cacert.get_subject())
cacert.set_pubkey(cakey)
- cacert.set_notBefore(b("20000101000000Z"))
- cacert.set_notAfter(b("20200101000000Z"))
+ cacert.set_notBefore(b"20000101000000Z")
+ cacert.set_notAfter(b"20200101000000Z")
cacert.add_extensions([caext])
cacert.set_serial_number(0)
cacert.sign(cakey, "sha1")
@@ -192,8 +192,8 @@
icert.get_subject().commonName = "Intermediate Certificate"
icert.set_issuer(cacert.get_subject())
icert.set_pubkey(ikey)
- icert.set_notBefore(b("20000101000000Z"))
- icert.set_notAfter(b("20200101000000Z"))
+ icert.set_notBefore(b"20000101000000Z")
+ icert.set_notAfter(b"20200101000000Z")
icert.add_extensions([caext])
icert.set_serial_number(0)
icert.sign(cakey, "sha1")
@@ -205,10 +205,10 @@
scert.get_subject().commonName = "Server Certificate"
scert.set_issuer(icert.get_subject())
scert.set_pubkey(skey)
- scert.set_notBefore(b("20000101000000Z"))
- scert.set_notAfter(b("20200101000000Z"))
+ scert.set_notBefore(b"20000101000000Z")
+ scert.set_notAfter(b"20200101000000Z")
scert.add_extensions([
- X509Extension(b('basicConstraints'), True, b('CA:false'))])
+ X509Extension(b'basicConstraints', True, b'CA:false')])
scert.set_serial_number(0)
scert.sign(ikey, "sha1")
@@ -925,7 +925,7 @@
:py:obj:`Context.set_passwd_cb` accepts a callable which will be
invoked when a private key is loaded from an encrypted PEM.
"""
- passphrase = b("foobar")
+ passphrase = b"foobar"
pemFile = self._write_encrypted_pem(passphrase)
calledWith = []
@@ -945,7 +945,7 @@
:py:obj:`Context.use_privatekey_file` propagates any exception raised
by the passphrase callback.
"""
- pemFile = self._write_encrypted_pem(b("monkeys are nice"))
+ pemFile = self._write_encrypted_pem(b"monkeys are nice")
def passphraseCallback(maxlen, verify, extra):
raise RuntimeError("Sorry, I am a fail.")
@@ -960,7 +960,7 @@
:py:obj:`OpenSSL.SSL.Error` if the passphrase callback returns a false
value.
"""
- pemFile = self._write_encrypted_pem(b("monkeys are nice"))
+ pemFile = self._write_encrypted_pem(b"monkeys are nice")
def passphraseCallback(maxlen, verify, extra):
return b""
@@ -975,7 +975,7 @@
:py:obj:`OpenSSL.SSL.Error` if the passphrase callback returns a true
non-string value.
"""
- pemFile = self._write_encrypted_pem(b("monkeys are nice"))
+ pemFile = self._write_encrypted_pem(b"monkeys are nice")
def passphraseCallback(maxlen, verify, extra):
return 10
@@ -990,12 +990,12 @@
longer than the indicated maximum length, it is truncated.
"""
# A priori knowledge!
- passphrase = b("x") * 1024
+ passphrase = b"x" * 1024
pemFile = self._write_encrypted_pem(passphrase)
def passphraseCallback(maxlen, verify, extra):
assert maxlen == 1024
- return passphrase + b("y")
+ return passphrase + b"y"
context = Context(TLSv1_METHOD)
context.set_passwd_cb(passphraseCallback)
@@ -1680,11 +1680,11 @@
client = Connection(Context(TLSv1_METHOD), None)
client.set_connect_state()
- client.set_tlsext_host_name(b("foo1.example.com"))
+ client.set_tlsext_host_name(b"foo1.example.com")
self._interactInMemory(server, client)
- self.assertEqual([(server, b("foo1.example.com"))], args)
+ self.assertEqual([(server, b"foo1.example.com")], args)
class NextProtoNegotiationTests(TestCase, _LoopbackMixin):
@@ -2194,13 +2194,13 @@
self.assertRaises(TypeError, conn.set_tlsext_host_name, object())
self.assertRaises(TypeError, conn.set_tlsext_host_name, 123, 456)
self.assertRaises(
- TypeError, conn.set_tlsext_host_name, b("with\0null"))
+ TypeError, conn.set_tlsext_host_name, b"with\0null")
if PY3:
# On Python 3.x, don't accidentally implicitly convert from text.
self.assertRaises(
TypeError,
- conn.set_tlsext_host_name, b("example.com").decode("ascii"))
+ conn.set_tlsext_host_name, b"example.com".decode("ascii"))
def test_get_servername_wrong_args(self):
"""
@@ -2234,10 +2234,10 @@
:py:obj:`socket.MSG_PEEK` is passed.
"""
server, client = self._loopback()
- server.send(b('xy'))
- self.assertEqual(client.recv(2, MSG_PEEK), b('xy'))
- self.assertEqual(client.recv(2, MSG_PEEK), b('xy'))
- self.assertEqual(client.recv(2), b('xy'))
+ server.send(b'xy')
+ self.assertEqual(client.recv(2, MSG_PEEK), b'xy')
+ self.assertEqual(client.recv(2, MSG_PEEK), b'xy')
+ self.assertEqual(client.recv(2), b'xy')
def test_connect_wrong_args(self):
"""
@@ -2892,9 +2892,9 @@
all of it and returns the number of bytes sent.
"""
server, client = self._loopback()
- count = server.send(b('xy'))
+ count = server.send(b'xy')
self.assertEquals(count, 2)
- self.assertEquals(client.recv(2), b('xy'))
+ self.assertEquals(client.recv(2), b'xy')
def test_text(self):
"""
@@ -2923,9 +2923,9 @@
of bytes sent.
"""
server, client = self._loopback()
- count = server.send(memoryview(b('xy')))
+ count = server.send(memoryview(b'xy'))
self.assertEquals(count, 2)
- self.assertEquals(client.recv(2), b('xy'))
+ self.assertEquals(client.recv(2), b'xy')
@skip_if_py3
def test_short_buffer(self):
@@ -2935,9 +2935,9 @@
of bytes sent.
"""
server, client = self._loopback()
- count = server.send(buffer(b('xy')))
+ count = server.send(buffer(b'xy'))
self.assertEquals(count, 2)
- self.assertEquals(client.recv(2), b('xy'))
+ self.assertEquals(client.recv(2), b'xy')
def _make_memoryview(size):
@@ -2961,10 +2961,10 @@
output_buffer = factory(5)
server, client = self._loopback()
- server.send(b('xy'))
+ server.send(b'xy')
self.assertEqual(client.recv_into(output_buffer), 2)
- self.assertEqual(output_buffer, bytearray(b('xy\x00\x00\x00')))
+ self.assertEqual(output_buffer, bytearray(b'xy\x00\x00\x00'))
def test_bytearray_no_length(self):
"""
@@ -2982,11 +2982,11 @@
output_buffer = factory(10)
server, client = self._loopback()
- server.send(b('abcdefghij'))
+ server.send(b'abcdefghij')
self.assertEqual(client.recv_into(output_buffer, 5), 5)
self.assertEqual(
- output_buffer, bytearray(b('abcde\x00\x00\x00\x00\x00'))
+ output_buffer, bytearray(b'abcde\x00\x00\x00\x00\x00')
)
def test_bytearray_respects_length(self):
@@ -3007,12 +3007,12 @@
output_buffer = factory(5)
server, client = self._loopback()
- server.send(b('abcdefghij'))
+ server.send(b'abcdefghij')
self.assertEqual(client.recv_into(output_buffer), 5)
- self.assertEqual(output_buffer, bytearray(b('abcde')))
+ self.assertEqual(output_buffer, bytearray(b'abcde'))
rest = client.recv(5)
- self.assertEqual(b('fghij'), rest)
+ self.assertEqual(b'fghij', rest)
def test_bytearray_doesnt_overfill(self):
"""
@@ -3034,13 +3034,13 @@
def test_peek(self):
server, client = self._loopback()
- server.send(b('xy'))
+ server.send(b'xy')
for _ in range(2):
output_buffer = bytearray(5)
self.assertEqual(
client.recv_into(output_buffer, flags=MSG_PEEK), 2)
- self.assertEqual(output_buffer, bytearray(b('xy\x00\x00\x00')))
+ self.assertEqual(output_buffer, bytearray(b'xy\x00\x00\x00'))
@skip_if_py26
def test_memoryview_no_length(self):
@@ -3101,8 +3101,8 @@
passed to it.
"""
server, client = self._loopback()
- server.sendall(b('x'))
- self.assertEquals(client.recv(1), b('x'))
+ server.sendall(b'x')
+ self.assertEquals(client.recv(1), b'x')
def test_text(self):
"""
@@ -3129,8 +3129,8 @@
:py:obj:`Connection.sendall` transmits all of them.
"""
server, client = self._loopback()
- server.sendall(memoryview(b('x')))
- self.assertEquals(client.recv(1), b('x'))
+ server.sendall(memoryview(b'x'))
+ self.assertEquals(client.recv(1), b'x')
@skip_if_py3
def test_short_buffers(self):
@@ -3139,8 +3139,8 @@
:py:obj:`Connection.sendall` transmits all of them.
"""
server, client = self._loopback()
- server.sendall(buffer(b('x')))
- self.assertEquals(client.recv(1), b('x'))
+ server.sendall(buffer(b'x'))
+ self.assertEquals(client.recv(1), b'x')
def test_long(self):
"""
@@ -3152,7 +3152,7 @@
# Should be enough, underlying SSL_write should only do 16k at a time.
# On Windows, after 32k of bytes the write will block (forever
# - because no one is yet reading).
- message = b('x') * (1024 * 32 - 1) + b('y')
+ message = b'x' * (1024 * 32 - 1) + b'y'
server.sendall(message)
accum = []
received = 0
@@ -3160,7 +3160,7 @@
data = client.recv(1024)
accum.append(data)
received += len(data)
- self.assertEquals(message, b('').join(accum))
+ self.assertEquals(message, b''.join(accum))
def test_closed(self):
"""
@@ -3449,7 +3449,7 @@
client_conn.client_random(), client_conn.server_random())
# Here are the bytes we'll try to send.
- important_message = b('One if by land, two if by sea.')
+ important_message = b'One if by land, two if by sea.'
server_conn.write(important_message)
self.assertEquals(
@@ -3473,7 +3473,7 @@
"""
server_conn, client_conn = self._loopback()
- important_message = b("Help me Obi Wan Kenobi, you're my only hope.")
+ important_message = b"Help me Obi Wan Kenobi, you're my only hope."
client_conn.send(important_message)
msg = server_conn.recv(1024)
self.assertEqual(msg, important_message)