Put some shared code into a shared module and start using it from all three of the main implementation modules.
Also add a test for the MemoryError behavior of OpenSSL.rand.bytes.
Also make the other error handling behavior of that function probably correct maybe.
At least, give it a better comment about why it is untested.
diff --git a/OpenSSL/SSL.py b/OpenSSL/SSL.py
index 48c8227..1e85795 100644
--- a/OpenSSL/SSL.py
+++ b/OpenSSL/SSL.py
@@ -1,17 +1,17 @@
-from functools import wraps
+from functools import wraps, partial
from itertools import count
from weakref import WeakValueDictionary
from errno import errorcode
-from cryptography.hazmat.backends.openssl import backend
-_ffi = backend.ffi
-_lib = backend.lib
+from OpenSSL._util import (
+ ffi as _ffi,
+ lib as _lib,
+ new_mem_buf as _new_mem_buf,
+ exception_from_error_queue as _exception_from_error_queue)
from OpenSSL.crypto import (
- FILETYPE_PEM, _PassphraseHelper, PKey, X509Name, X509, X509Store,
-
- _raise_current_error, _new_mem_buf)
+ FILETYPE_PEM, _PassphraseHelper, PKey, X509Name, X509, X509Store)
_unspecified = object()
@@ -104,6 +104,39 @@
SSL_CB_HANDSHAKE_DONE = _lib.SSL_CB_HANDSHAKE_DONE
+class Error(Exception):
+ pass
+
+
+
+_raise_current_error = partial(_exception_from_error_queue, Error)
+
+
+class WantReadError(Error):
+ pass
+
+
+
+class WantWriteError(Error):
+ pass
+
+
+
+class WantX509LookupError(Error):
+ pass
+
+
+
+class ZeroReturnError(Error):
+ pass
+
+
+
+class SysCallError(Error):
+ pass
+
+
+
class _VerifyHelper(object):
def __init__(self, connection, callback):
self._problems = []
@@ -134,41 +167,13 @@
def raise_if_problem(self):
if self._problems:
try:
- _raise_current_error(Error)
+ _raise_current_error()
except Error:
pass
raise self._problems.pop(0)
-class Error(Exception):
- pass
-
-
-
-class WantReadError(Error):
- pass
-
-
-
-class WantWriteError(Error):
- pass
-
-
-
-class WantX509LookupError(Error):
- pass
-
-
-
-class ZeroReturnError(Error):
- pass
-
-
-class SysCallError(Error):
- pass
-
-
def _asFileDescriptor(obj):
fd = None
@@ -284,7 +289,7 @@
load_result = _lib.SSL_CTX_load_verify_locations(self._context, cafile, capath)
if not load_result:
- _raise_current_error(Error)
+ _raise_current_error()
def _wrap_callback(self, callback):
@@ -323,7 +328,7 @@
set_result = _lib.SSL_CTX_set_default_verify_paths(self._context)
if not set_result:
1/0
- _raise_current_error(Error)
+ _raise_current_error()
def use_certificate_chain_file(self, certfile):
@@ -338,7 +343,7 @@
result = _lib.SSL_CTX_use_certificate_chain_file(self._context, certfile)
if not result:
- _raise_current_error(Error)
+ _raise_current_error()
def use_certificate_file(self, certfile, filetype=FILETYPE_PEM):
@@ -356,7 +361,7 @@
use_result = _lib.SSL_CTX_use_certificate_file(self._context, certfile, filetype)
if not use_result:
- _raise_current_error(Error)
+ _raise_current_error()
def use_certificate(self, cert):
@@ -371,7 +376,7 @@
use_result = _lib.SSL_CTX_use_certificate(self._context, cert._x509)
if not use_result:
- _raise_current_error(Error)
+ _raise_current_error()
def add_extra_chain_cert(self, certobj):
@@ -388,13 +393,13 @@
add_result = _lib.SSL_CTX_add_extra_chain_cert(self._context, copy)
if not add_result:
# _lib.X509_free(copy)
- # _raise_current_error(Error)
+ # _raise_current_error()
1/0
def _raise_passphrase_exception(self):
if self._passphrase_helper is None:
- _raise_current_error(Error)
+ _raise_current_error()
exception = self._passphrase_helper.raise_if_problem(Error)
if exception is not None:
raise exception
@@ -550,7 +555,7 @@
bio = _lib.BIO_new_file(dhfile, "r")
if bio == _ffi.NULL:
- _raise_current_error(Error)
+ _raise_current_error()
bio = _ffi.gc(bio, _lib.BIO_free)
dh = _lib.PEM_read_bio_DHparams(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL)
@@ -570,7 +575,7 @@
result = _lib.SSL_CTX_set_cipher_list(self._context, cipher_list)
if not result:
- _raise_current_error(Error)
+ _raise_current_error()
def set_client_ca_list(self, certificate_authorities):
@@ -586,7 +591,7 @@
name_stack = _lib.sk_X509_NAME_new_null()
if name_stack == _ffi.NULL:
1/0
- _raise_current_error(Error)
+ _raise_current_error()
try:
for ca_name in certificate_authorities:
@@ -597,11 +602,11 @@
copy = _lib.X509_NAME_dup(ca_name._name)
if copy == _ffi.NULL:
1/0
- _raise_current_error(Error)
+ _raise_current_error()
push_result = _lib.sk_X509_NAME_push(name_stack, copy)
if not push_result:
_lib.X509_NAME_free(copy)
- _raise_current_error(Error)
+ _raise_current_error()
except:
_lib.sk_X509_NAME_free(name_stack)
raise
@@ -626,7 +631,7 @@
self._context, certificate_authority._x509)
if not add_result:
1/0
- _raise_current_error(Error)
+ _raise_current_error()
def set_timeout(self, timeout):
@@ -822,11 +827,11 @@
else:
# TODO Untested
1/0
- _raise_current_error(Error)
+ _raise_current_error()
elif error == _lib.SSL_ERROR_NONE:
pass
else:
- _raise_current_error(Error)
+ _raise_current_error()
def get_context(self):
@@ -979,7 +984,7 @@
else:
1/0
# TODO Untested
- _raise_current_error(Error)
+ _raise_current_error()
def bio_read(self, bufsiz):
@@ -1162,7 +1167,7 @@
copy = _lib.X509_NAME_dup(name)
if copy == _ffi.NULL:
1/0
- _raise_current_error(Error)
+ _raise_current_error()
pyname = X509Name.__new__(X509Name)
pyname._name = _ffi.gc(copy, _lib.X509_NAME_free)
@@ -1379,6 +1384,6 @@
result = _lib.SSL_set_session(self._ssl, session._session)
if not result:
- _raise_current_error(Error)
+ _raise_current_error()
ConnectionType = Connection
diff --git a/OpenSSL/_util.py b/OpenSSL/_util.py
new file mode 100644
index 0000000..4062fe5
--- /dev/null
+++ b/OpenSSL/_util.py
@@ -0,0 +1,35 @@
+from cryptography.hazmat.backends.openssl import backend
+ffi = backend.ffi
+lib = backend.lib
+
+def exception_from_error_queue(exceptionType):
+ errors = []
+ while True:
+ error = lib.ERR_get_error()
+ if error == 0:
+ break
+ errors.append((
+ ffi.string(lib.ERR_lib_error_string(error)),
+ ffi.string(lib.ERR_func_error_string(error)),
+ ffi.string(lib.ERR_reason_error_string(error))))
+
+ raise exceptionType(errors)
+
+
+
+def new_mem_buf(buffer=None):
+ if buffer is None:
+ bio = lib.BIO_new(lib.BIO_s_mem())
+ free = lib.BIO_free
+ else:
+ data = ffi.new("char[]", buffer)
+ bio = lib.BIO_new_mem_buf(data, len(buffer))
+ # Keep the memory alive as long as the bio is alive!
+ def free(bio, ref=data):
+ return lib.BIO_free(bio)
+
+ if bio == ffi.NULL:
+ 1/0
+
+ bio = ffi.gc(bio, free)
+ return bio
diff --git a/OpenSSL/crypto.py b/OpenSSL/crypto.py
index 1a4db38..e699f37 100644
--- a/OpenSSL/crypto.py
+++ b/OpenSSL/crypto.py
@@ -1,8 +1,11 @@
from time import time
+from functools import partial
-from cryptography.hazmat.backends.openssl import backend
-_ffi = backend.ffi
-_lib = backend.lib
+from OpenSSL._util import (
+ ffi as _ffi,
+ lib as _lib,
+ new_mem_buf as _new_mem_buf,
+ exception_from_error_queue as _exception_from_error_queue)
FILETYPE_PEM = _lib.SSL_FILETYPE_PEM
FILETYPE_ASN1 = _lib.SSL_FILETYPE_ASN1
@@ -14,6 +17,13 @@
TYPE_DSA = _lib.EVP_PKEY_DSA
+class Error(Exception):
+ pass
+
+
+_raise_current_error = partial(_exception_from_error_queue, Error)
+
+
def _bio_to_string(bio):
"""
Copy the contents of an OpenSSL BIO object into a Python byte string.
@@ -24,25 +34,6 @@
-def _new_mem_buf(buffer=None):
- if buffer is None:
- bio = _lib.BIO_new(_lib.BIO_s_mem())
- free = _lib.BIO_free
- else:
- data = _ffi.new("char[]", buffer)
- bio = _lib.BIO_new_mem_buf(data, len(buffer))
- # Keep the memory alive as long as the bio is alive!
- def free(bio, ref=data):
- return _lib.BIO_free(bio)
-
- if bio == _ffi.NULL:
- 1/0
-
- bio = _ffi.gc(bio, free)
- return bio
-
-
-
def _set_asn1_time(boundary, when):
if not isinstance(when, bytes):
raise TypeError("when must be a byte string")
@@ -83,28 +74,6 @@
-class Error(Exception):
- pass
-
-
-
-def _raise_current_error(exceptionType=Error):
- errors = []
- while True:
- error = _lib.ERR_get_error()
- if error == 0:
- break
- errors.append((
- _ffi.string(_lib.ERR_lib_error_string(error)),
- _ffi.string(_lib.ERR_func_error_string(error)),
- _ffi.string(_lib.ERR_reason_error_string(error))))
-
- raise exceptionType(errors)
-
-_exception_from_error_queue = _raise_current_error
-
-
-
class PKey(object):
_only_public = False
_initialized = True
@@ -660,7 +629,7 @@
result = _lib.X509_REQ_verify(self._req, pkey._pkey)
if result <= 0:
- _raise_current_error(Error)
+ _raise_current_error()
return result
@@ -1105,7 +1074,7 @@
result = _lib.X509_STORE_add_cert(self._store, cert._x509)
if not result:
- _raise_current_error(Error)
+ _raise_current_error()
X509StoreType = X509Store
@@ -1848,7 +1817,7 @@
def raise_if_problem(self, exceptionType=Error):
try:
- _raise_current_error(exceptionType)
+ _exception_from_error_queue(exceptionType)
except exceptionType as e:
pass
if self._problems:
diff --git a/OpenSSL/rand.py b/OpenSSL/rand.py
index 8295854..0204118 100644
--- a/OpenSSL/rand.py
+++ b/OpenSSL/rand.py
@@ -6,14 +6,18 @@
import __builtin__
-from cryptography.hazmat.backends.openssl import backend
-_lib = backend.lib
-_ffi = backend.ffi
+from functools import partial
-# TODO Nothing tests the existence or use of this
+from OpenSSL._util import (
+ ffi as _ffi,
+ lib as _lib,
+ exception_from_error_queue as _exception_from_error_queue)
+
+
class Error(Exception):
pass
+_raise_current_error = partial(_exception_from_error_queue, Error)
_unspecified = object()
@@ -33,7 +37,10 @@
result_buffer = _ffi.new("char[]", num_bytes)
result_code = _lib.RAND_bytes(result_buffer, num_bytes)
if result_code == -1:
- raise Exception("zoops") # TODO
+ # TODO: No tests for this code path. Triggering a RAND_bytes failure
+ # might involve supplying a custom ENGINE? That's hard.
+ _raise_current_error()
+
return _ffi.buffer(result_buffer)[:]
diff --git a/OpenSSL/test/test_rand.py b/OpenSSL/test/test_rand.py
index 8a3c5fe..ffbb731 100644
--- a/OpenSSL/test/test_rand.py
+++ b/OpenSSL/test/test_rand.py
@@ -23,7 +23,14 @@
self.assertRaises(TypeError, rand.bytes, None)
self.assertRaises(TypeError, rand.bytes, 3, None)
- # XXX Test failure of the malloc() in rand_bytes.
+
+ def test_insufficientMemory(self):
+ """
+ :py:obj:`OpenSSL.rand.bytes` raises :py:obj:`MemoryError` if more bytes
+ are requested than will fit in memory.
+ """
+ self.assertRaises(MemoryError, rand.bytes, 1024 * 1024 * 1024 * 1024)
+
def test_bytes(self):
"""
diff --git a/OpenSSL/test/util.py b/OpenSSL/test/util.py
index 31242ce..a1a0cd9 100644
--- a/OpenSSL/test/util.py
+++ b/OpenSSL/test/util.py
@@ -14,7 +14,8 @@
from unittest import TestCase
import sys
-from OpenSSL.crypto import Error, _exception_from_error_queue
+from OpenSSL._util import exception_from_error_queue
+from OpenSSL.crypto import Error
import memdbg
@@ -167,7 +168,7 @@
elif os.path.exists(temp):
os.unlink(temp)
try:
- _exception_from_error_queue()
+ exception_from_error_queue(Error)
except Error:
e = sys.exc_info()[1]
if e.args != ([],):