Allow using an OpenSSL hashed directory for verification in X509Store (#943)

Add X509Store.load_locations() to set a CA bundle file and/or an OpenSSL-
style hashed CA/CRL lookup directory, similar to the already existing
SSL.Context.load_verify_locations().

Co-authored-by: Sandor Oroszi <sandor.oroszi@balabit.com>
diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index e8c52c2..e5f08d2 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -24,6 +24,9 @@
 Changes:
 ^^^^^^^^
 
+- Added ``OpenSSL.crypto.X509Store.load_locations`` to set trusted
+  certificate file bundles and/or directories for verification.
+  `#943 <https://github.com/pyca/pyopenssl/pull/943>`_
 - Added ``Context.set_keylog_callback`` to log key material.
   `#910 <https://github.com/pyca/pyopenssl/pull/910>`_
 - Added ``OpenSSL.SSL.Connection.get_verified_chain`` to retrieve the
diff --git a/src/OpenSSL/crypto.py b/src/OpenSSL/crypto.py
index fee5259..880c2b3 100644
--- a/src/OpenSSL/crypto.py
+++ b/src/OpenSSL/crypto.py
@@ -19,6 +19,7 @@
     exception_from_error_queue as _exception_from_error_queue,
     byte_string as _byte_string,
     native as _native,
+    path_string as _path_string,
     UNSPECIFIED as _UNSPECIFIED,
     text_to_bytes_and_warn as _text_to_bytes_and_warn,
     make_assert as _make_assert,
@@ -1674,6 +1675,53 @@
         _lib.X509_VERIFY_PARAM_set_time(param, int(vfy_time.strftime("%s")))
         _openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0)
 
+    def load_locations(self, cafile, capath=None):
+        """
+        Let X509Store know where we can find trusted certificates for the
+        certificate chain.  Note that the certificates have to be in PEM
+        format.
+
+        If *capath* is passed, it must be a directory prepared using the
+        ``c_rehash`` tool included with OpenSSL.  Either, but not both, of
+        *cafile* or *capath* may be ``None``.
+
+        .. note::
+
+          Both *cafile* and *capath* may be set simultaneously.
+
+          Call this method multiple times to add more than one location.
+          For example, CA certificates, and certificate revocation list bundles
+          may be passed in *cafile* in subsequent calls to this method.
+
+        .. versionadded:: 20.0
+
+        :param cafile: In which file we can find the certificates (``bytes`` or
+                       ``unicode``).
+        :param capath: In which directory we can find the certificates
+                       (``bytes`` or ``unicode``).
+
+        :return: ``None`` if the locations were set successfully.
+
+        :raises OpenSSL.crypto.Error: If both *cafile* and *capath* is ``None``
+            or the locations could not be set for any reason.
+
+        """
+        if cafile is None:
+            cafile = _ffi.NULL
+        else:
+            cafile = _path_string(cafile)
+
+        if capath is None:
+            capath = _ffi.NULL
+        else:
+            capath = _path_string(capath)
+
+        load_result = _lib.X509_STORE_load_locations(
+            self._store, cafile, capath
+        )
+        if not load_result:
+            _raise_current_error()
+
 
 class X509StoreContextError(Exception):
     """
diff --git a/tests/test_crypto.py b/tests/test_crypto.py
index ac4e729..820a4b2 100644
--- a/tests/test_crypto.py
+++ b/tests/test_crypto.py
@@ -10,6 +10,7 @@
 import base64
 from subprocess import PIPE, Popen
 from datetime import datetime, timedelta
+import sys
 
 import pytest
 
@@ -46,7 +47,14 @@
     get_elliptic_curves,
 )
 
-from .util import EqualityTestsMixin, is_consistent_type, WARNING_TYPE_EXPECTED
+from OpenSSL._util import ffi as _ffi, lib as _lib
+
+from .util import (
+    EqualityTestsMixin,
+    is_consistent_type,
+    WARNING_TYPE_EXPECTED,
+    NON_ASCII,
+)
 
 
 def normalize_privatekey_pem(pem):
@@ -2228,6 +2236,68 @@
         store.add_cert(cert)
         store.add_cert(cert)
 
+    @pytest.mark.parametrize(
+        "cafile, capath, call_cafile, call_capath",
+        [
+            (
+                "/cafile" + NON_ASCII,
+                None,
+                b"/cafile" + NON_ASCII.encode(sys.getfilesystemencoding()),
+                _ffi.NULL,
+            ),
+            (
+                b"/cafile" + NON_ASCII.encode("utf-8"),
+                None,
+                b"/cafile" + NON_ASCII.encode("utf-8"),
+                _ffi.NULL,
+            ),
+            (
+                None,
+                "/capath" + NON_ASCII,
+                _ffi.NULL,
+                b"/capath" + NON_ASCII.encode(sys.getfilesystemencoding()),
+            ),
+            (
+                None,
+                b"/capath" + NON_ASCII.encode("utf-8"),
+                _ffi.NULL,
+                b"/capath" + NON_ASCII.encode("utf-8"),
+            ),
+        ],
+    )
+    def test_load_locations_parameters(
+        self, cafile, capath, call_cafile, call_capath, monkeypatch
+    ):
+        class LibMock(object):
+            def load_locations(self, store, cafile, capath):
+                self.cafile = cafile
+                self.capath = capath
+                return 1
+
+        lib_mock = LibMock()
+        monkeypatch.setattr(
+            _lib, "X509_STORE_load_locations", lib_mock.load_locations
+        )
+
+        store = X509Store()
+        store.load_locations(cafile=cafile, capath=capath)
+
+        assert call_cafile == lib_mock.cafile
+        assert call_capath == lib_mock.capath
+
+    def test_load_locations_fails_when_all_args_are_none(self):
+        store = X509Store()
+        with pytest.raises(Error):
+            store.load_locations(None, None)
+
+    def test_load_locations_raises_error_on_failure(self, tmpdir):
+        invalid_ca_file = tmpdir.join("invalid.pem")
+        invalid_ca_file.write("This is not a certificate")
+
+        store = X509Store()
+        with pytest.raises(Error):
+            store.load_locations(cafile=str(invalid_ca_file))
+
 
 class TestPKCS12(object):
     """
@@ -3884,6 +3954,70 @@
         assert exc.value.args[0][2] == "unable to get issuer certificate"
         assert exc.value.certificate.get_subject().CN == "intermediate"
 
+    @pytest.fixture
+    def root_ca_file(self, tmpdir):
+        return self._create_ca_file(tmpdir, "root_ca_hash_dir", self.root_cert)
+
+    @pytest.fixture
+    def intermediate_ca_file(self, tmpdir):
+        return self._create_ca_file(
+            tmpdir, "intermediate_ca_hash_dir", self.intermediate_cert
+        )
+
+    @staticmethod
+    def _create_ca_file(base_path, hash_directory, cacert):
+        ca_hash = "{:08x}.0".format(cacert.subject_name_hash())
+        cafile = base_path.join(hash_directory, ca_hash)
+        cafile.write_binary(
+            dump_certificate(FILETYPE_PEM, cacert), ensure=True
+        )
+        return cafile
+
+    def test_verify_with_ca_file_location(self, root_ca_file):
+        store = X509Store()
+        store.load_locations(str(root_ca_file))
+
+        store_ctx = X509StoreContext(store, self.intermediate_cert)
+        store_ctx.verify_certificate()
+
+    def test_verify_with_ca_path_location(self, root_ca_file):
+        store = X509Store()
+        store.load_locations(None, str(root_ca_file.dirname))
+
+        store_ctx = X509StoreContext(store, self.intermediate_cert)
+        store_ctx.verify_certificate()
+
+    def test_verify_with_cafile_and_capath(
+        self, root_ca_file, intermediate_ca_file
+    ):
+        store = X509Store()
+        store.load_locations(
+            cafile=str(root_ca_file), capath=str(intermediate_ca_file.dirname)
+        )
+
+        store_ctx = X509StoreContext(store, self.intermediate_server_cert)
+        store_ctx.verify_certificate()
+
+    def test_verify_with_multiple_ca_files(
+        self, root_ca_file, intermediate_ca_file
+    ):
+        store = X509Store()
+        store.load_locations(str(root_ca_file))
+        store.load_locations(str(intermediate_ca_file))
+
+        store_ctx = X509StoreContext(store, self.intermediate_server_cert)
+        store_ctx.verify_certificate()
+
+    def test_verify_failure_with_empty_ca_directory(self, tmpdir):
+        store = X509Store()
+        store.load_locations(None, str(tmpdir))
+
+        store_ctx = X509StoreContext(store, self.intermediate_cert)
+        with pytest.raises(X509StoreContextError) as exc:
+            store_ctx.verify_certificate()
+
+        assert exc.value.args[0][2] == "unable to get local issuer certificate"
+
 
 class TestSignVerify(object):
     """