Merge branch 'master' into commoncrypto-cipher-backend
* master:
expand tox backend example
On OS X at build time compile the CC bindings
fix docs
update docs for name attribute
revert fixture decorator for now, switch to append. no more globals
docs for explicit backend selection and document name attribute of backend
modify backend selection to allow multiple backends via comma delimiter
better name for the variable
don't mutate _ALL_BACKENDS
pass posargs via tox so --backend can be used for tox envs
support --backend as a pytest flag to limit to one backend for testing
diff --git a/cryptography/hazmat/backends/commoncrypto/backend.py b/cryptography/hazmat/backends/commoncrypto/backend.py
index 603edc4..8b9fe08 100644
--- a/cryptography/hazmat/backends/commoncrypto/backend.py
+++ b/cryptography/hazmat/backends/commoncrypto/backend.py
@@ -18,10 +18,16 @@
from cryptography import utils
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.backends.interfaces import (
- HashBackend, HMACBackend,
+ HashBackend, HMACBackend, CipherBackend
)
from cryptography.hazmat.bindings.commoncrypto.binding import Binding
from cryptography.hazmat.primitives import interfaces
+from cryptography.hazmat.primitives.ciphers.algorithms import (
+ AES, Blowfish, TripleDES, ARC4
+)
+from cryptography.hazmat.primitives.ciphers.modes import (
+ CBC, CTR, ECB, OFB, CFB
+)
HashMethods = namedtuple(
@@ -29,6 +35,7 @@
)
+@utils.register_interface(CipherBackend)
@utils.register_interface(HashBackend)
@utils.register_interface(HMACBackend)
class Backend(object):
@@ -42,6 +49,8 @@
self._ffi = self._binding.ffi
self._lib = self._binding.lib
+ self._cipher_registry = {}
+ self._register_default_ciphers()
self._hash_mapping = {
"md5": HashMethods(
"CC_MD5_CTX *", self._lib.CC_MD5_Init,
@@ -100,6 +109,182 @@
def create_hmac_ctx(self, key, algorithm):
return _HMACContext(self, key, algorithm)
+ def cipher_supported(self, cipher, mode):
+ try:
+ self._cipher_registry[type(cipher), type(mode)]
+ except KeyError:
+ return False
+ return True
+
+ def create_symmetric_encryption_ctx(self, cipher, mode):
+ return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT)
+
+ def create_symmetric_decryption_ctx(self, cipher, mode):
+ return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT)
+
+ def _register_cipher_adapter(self, cipher_cls, cipher_const, mode_cls,
+ mode_const):
+ if (cipher_cls, mode_cls) in self._cipher_registry:
+ raise ValueError("Duplicate registration for: {0} {1}".format(
+ cipher_cls, mode_cls)
+ )
+ self._cipher_registry[cipher_cls, mode_cls] = (cipher_const,
+ mode_const)
+
+ def _register_default_ciphers(self):
+ for mode_cls, mode_const in [
+ (CBC, self._lib.kCCModeCBC), (ECB, self._lib.kCCModeECB),
+ (CFB, self._lib.kCCModeCFB), (OFB, self._lib.kCCModeOFB),
+ (CTR, self._lib.kCCModeCTR)
+ ]:
+ self._register_cipher_adapter(
+ AES,
+ self._lib.kCCAlgorithmAES128,
+ mode_cls,
+ mode_const
+ )
+ for mode_cls, mode_const in [
+ (CBC, self._lib.kCCModeCBC), (CFB, self._lib.kCCModeCFB),
+ (OFB, self._lib.kCCModeOFB),
+ ]:
+ self._register_cipher_adapter(
+ TripleDES,
+ self._lib.kCCAlgorithm3DES,
+ mode_cls,
+ mode_const
+ )
+ for mode_cls, mode_const in [
+ (CBC, self._lib.kCCModeCBC), (ECB, self._lib.kCCModeECB),
+ (CFB, self._lib.kCCModeCFB), (OFB, self._lib.kCCModeOFB),
+ ]:
+ self._register_cipher_adapter(
+ Blowfish,
+ self._lib.kCCAlgorithmBlowfish,
+ mode_cls,
+ mode_const
+ )
+ self._register_cipher_adapter(
+ ARC4,
+ self._lib.kCCAlgorithmRC4,
+ type(None),
+ self._lib.kCCModeRC4
+ )
+
+ def _check_response(self, response):
+ if response == self._lib.kCCSuccess:
+ return
+ elif response == self._lib.kCCAlignmentError:
+ # This error is not currently triggered due to a bug filed as
+ # rdar://15589470
+ raise ValueError(
+ "The length of the provided data is not a multiple of "
+ "the block length"
+ )
+ else:
+ raise SystemError(
+ "The backend returned an error. Code: {0}".format(response)
+ )
+
+
+def _release_cipher_ctx(ctx):
+ """
+ Called by the garbage collector and used to safely dereference and
+ release the context.
+ """
+ if ctx[0] != backend._ffi.NULL:
+ res = backend._lib.CCCryptorRelease(ctx[0])
+ backend._check_response(res)
+ ctx[0] = backend._ffi.NULL
+
+
+@utils.register_interface(interfaces.CipherContext)
+class _CipherContext(object):
+ _ENCRYPT = 0 # kCCEncrypt
+ _DECRYPT = 1 # kCCDecrypt
+
+ def __init__(self, backend, cipher, mode, operation):
+ self._backend = backend
+ self._cipher = cipher
+ self._mode = mode
+ self._operation = operation
+ # There is a bug in CommonCrypto where block ciphers do not raise
+ # kCCAlignmentError when finalizing if you supply non-block aligned
+ # data. To work around this we need to keep track of the block
+ # alignment ourselves, but only for alg+mode combos that require
+ # block alignment. OFB, CFB, and CTR make a block cipher algorithm
+ # into a stream cipher so we don't need to track them (and thus their
+ # block size is effectively 1 byte just like OpenSSL/CommonCrypto
+ # treat RC4 and other stream cipher block sizes).
+ # This bug has been filed as rdar://15589470
+ self._bytes_processed = 0
+ if (isinstance(cipher, interfaces.BlockCipherAlgorithm) and not
+ isinstance(mode, (OFB, CFB, CTR))):
+ self._byte_block_size = cipher.block_size // 8
+ else:
+ self._byte_block_size = 1
+
+ registry = self._backend._cipher_registry
+ try:
+ cipher_enum, mode_enum = registry[type(cipher), type(mode)]
+ except KeyError:
+ raise UnsupportedAlgorithm(
+ "cipher {0} in {1} mode is not supported "
+ "by this backend".format(
+ cipher.name, mode.name if mode else mode)
+ )
+
+ ctx = self._backend._ffi.new("CCCryptorRef *")
+ ctx = self._backend._ffi.gc(ctx, _release_cipher_ctx)
+
+ if isinstance(mode, interfaces.ModeWithInitializationVector):
+ iv_nonce = mode.initialization_vector
+ elif isinstance(mode, interfaces.ModeWithNonce):
+ iv_nonce = mode.nonce
+ else:
+ iv_nonce = self._backend._ffi.NULL
+
+ if isinstance(mode, CTR):
+ mode_option = self._backend._lib.kCCModeOptionCTR_BE
+ else:
+ mode_option = 0
+
+ res = self._backend._lib.CCCryptorCreateWithMode(
+ operation,
+ mode_enum, cipher_enum,
+ self._backend._lib.ccNoPadding, iv_nonce,
+ cipher.key, len(cipher.key),
+ self._backend._ffi.NULL, 0, 0, mode_option, ctx)
+ self._backend._check_response(res)
+
+ self._ctx = ctx
+
+ def update(self, data):
+ # Count bytes processed to handle block alignment.
+ self._bytes_processed += len(data)
+ buf = self._backend._ffi.new(
+ "unsigned char[]", len(data) + self._byte_block_size - 1)
+ outlen = self._backend._ffi.new("size_t *")
+ res = self._backend._lib.CCCryptorUpdate(
+ self._ctx[0], data, len(data), buf,
+ len(data) + self._byte_block_size - 1, outlen)
+ self._backend._check_response(res)
+ return self._backend._ffi.buffer(buf)[:outlen[0]]
+
+ def finalize(self):
+ # Raise error if block alignment is wrong.
+ if self._bytes_processed % self._byte_block_size:
+ raise ValueError(
+ "The length of the provided data is not a multiple of "
+ "the block length"
+ )
+ buf = self._backend._ffi.new("unsigned char[]", self._byte_block_size)
+ outlen = self._backend._ffi.new("size_t *")
+ res = self._backend._lib.CCCryptorFinal(
+ self._ctx[0], buf, len(buf), outlen)
+ self._backend._check_response(res)
+ _release_cipher_ctx(self._ctx)
+ return self._backend._ffi.buffer(buf)[:outlen[0]]
+
@utils.register_interface(interfaces.HashContext)
class _HashContext(object):
diff --git a/docs/changelog.rst b/docs/changelog.rst
index 819b426..c3859c2 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -6,7 +6,7 @@
**In development**
-* Added :doc:`/hazmat/backends/commoncrypto` with hash and HMAC support.
+* Added :doc:`/hazmat/backends/commoncrypto`.
* Added initial :doc:`/hazmat/bindings/commoncrypto`.
0.1 - 2014-01-08
diff --git a/tests/hazmat/backends/test_commoncrypto.py b/tests/hazmat/backends/test_commoncrypto.py
new file mode 100644
index 0000000..68ab6bc
--- /dev/null
+++ b/tests/hazmat/backends/test_commoncrypto.py
@@ -0,0 +1,46 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from cryptography.hazmat.bindings.commoncrypto.binding import Binding
+from cryptography.hazmat.primitives.ciphers.algorithms import AES
+from cryptography.hazmat.primitives.ciphers.modes import CBC
+
+
+@pytest.mark.skipif(not Binding.is_available(),
+ reason="CommonCrypto not available")
+class TestCommonCrypto(object):
+ def test_supports_cipher(self):
+ from cryptography.hazmat.backends.commoncrypto.backend import backend
+ assert backend.cipher_supported(None, None) is False
+
+ def test_register_duplicate_cipher_adapter(self):
+ from cryptography.hazmat.backends.commoncrypto.backend import backend
+ with pytest.raises(ValueError):
+ backend._register_cipher_adapter(
+ AES, backend._lib.kCCAlgorithmAES128,
+ CBC, backend._lib.kCCModeCBC
+ )
+
+ def test_handle_response(self):
+ from cryptography.hazmat.backends.commoncrypto.backend import backend
+
+ with pytest.raises(ValueError):
+ backend._check_response(backend._lib.kCCAlignmentError)
+
+ with pytest.raises(SystemError):
+ backend._check_response(backend._lib.kCCMemoryFailure)
+
+ with pytest.raises(SystemError):
+ backend._check_response(backend._lib.kCCDecodeError)