OCSP response builder (#4485)

* ocsp response builder

* better prose

* review changes
diff --git a/tests/x509/test_ocsp.py b/tests/x509/test_ocsp.py
index 3ee6a26..fad48da 100644
--- a/tests/x509/test_ocsp.py
+++ b/tests/x509/test_ocsp.py
@@ -13,10 +13,12 @@
 from cryptography import x509
 from cryptography.exceptions import UnsupportedAlgorithm
 from cryptography.hazmat.primitives import hashes, serialization
+from cryptography.hazmat.primitives.asymmetric import ec
 from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
 from cryptography.x509 import ocsp
 
 from .test_x509 import _load_cert
+from ..hazmat.primitives.fixtures_ec import EC_KEY_SECP256R1
 from ..utils import load_vectors_from_file
 
 
@@ -43,6 +45,33 @@
     return cert, issuer
 
 
+def _generate_root():
+    """Return a self-signed EC root certificate and its private key."""
+    from cryptography.hazmat.backends.openssl.backend import backend
+
+    private_key = EC_KEY_SECP256R1.private_key(backend)
+    subject = x509.Name([
+        x509.NameAttribute(x509.NameOID.COUNTRY_NAME, u'US'),
+        x509.NameAttribute(x509.NameOID.COMMON_NAME, u'Cryptography CA'),
+    ])
+
+    # The x509 builder expects validity times in UTC, so take a single UTC
+    # timestamp (not local time) and derive both bounds from it.
+    now = datetime.datetime.utcnow()
+    builder = x509.CertificateBuilder().serial_number(
+        123456789
+    ).issuer_name(subject).subject_name(subject).public_key(
+        private_key.public_key()
+    ).not_valid_before(
+        now
+    ).not_valid_after(
+        now + datetime.timedelta(days=3650)
+    )
+
+    cert = builder.sign(private_key, hashes.SHA256(), backend)
+    return cert, private_key
+
+
 class TestOCSPRequest(object):
     def test_bad_request(self):
         with pytest.raises(ValueError):
@@ -182,6 +211,359 @@
         assert req.extensions[0].critical is critical
 
 
+class TestOCSPResponseBuilder(object):
+    """Cover OCSPResponseBuilder validation, signing and error responses."""
+    def test_add_response_twice(self):
+        cert, issuer = _cert_and_issuer()
+        # Naive UTC timestamp, consistent with the other tests in this class.
+        time = datetime.datetime.utcnow()
+        builder = ocsp.OCSPResponseBuilder()
+        builder = builder.add_response(
+            cert, issuer, hashes.SHA256(), ocsp.OCSPCertStatus.GOOD, time,
+            time, None, None
+        )
+        with pytest.raises(ValueError):
+            builder.add_response(
+                cert, issuer, hashes.SHA256(), ocsp.OCSPCertStatus.GOOD, time,
+                time, None, None
+            )
+
+    def test_invalid_add_response(self):
+        cert, issuer = _cert_and_issuer()
+        time = datetime.datetime.utcnow()
+        reason = x509.ReasonFlags.cessation_of_operation
+        builder = ocsp.OCSPResponseBuilder()
+        with pytest.raises(TypeError):
+            builder.add_response(
+                'bad', issuer, hashes.SHA256(), ocsp.OCSPCertStatus.GOOD,
+                time, time, None, None
+            )
+        with pytest.raises(TypeError):
+            builder.add_response(
+                cert, 'bad', hashes.SHA256(), ocsp.OCSPCertStatus.GOOD,
+                time, time, None, None
+            )
+        with pytest.raises(ValueError):
+            builder.add_response(
+                cert, issuer, 'notahash', ocsp.OCSPCertStatus.GOOD,
+                time, time, None, None
+            )
+        with pytest.raises(TypeError):
+            builder.add_response(
+                cert, issuer, hashes.SHA256(), ocsp.OCSPCertStatus.GOOD,
+                'bad', time, None, None
+            )
+        with pytest.raises(TypeError):
+            builder.add_response(
+                cert, issuer, hashes.SHA256(), ocsp.OCSPCertStatus.GOOD,
+                time, 'bad', None, None
+            )
+
+        with pytest.raises(TypeError):
+            builder.add_response(
+                cert, issuer, hashes.SHA256(), 0, time, time, None, None
+            )
+        with pytest.raises(ValueError):
+            builder.add_response(
+                cert, issuer, hashes.SHA256(), ocsp.OCSPCertStatus.GOOD,
+                time, time, time, None
+            )
+        with pytest.raises(ValueError):
+            builder.add_response(
+                cert, issuer, hashes.SHA256(), ocsp.OCSPCertStatus.GOOD,
+                time, time, None, reason
+            )
+        with pytest.raises(TypeError):
+            builder.add_response(
+                cert, issuer, hashes.SHA256(), ocsp.OCSPCertStatus.REVOKED,
+                time, time, None, reason
+            )
+        with pytest.raises(TypeError):
+            builder.add_response(
+                cert, issuer, hashes.SHA256(), ocsp.OCSPCertStatus.REVOKED,
+                time, time, time, 0
+            )
+        with pytest.raises(ValueError):
+            builder.add_response(
+                cert, issuer, hashes.SHA256(), ocsp.OCSPCertStatus.REVOKED,
+                time, time, time - datetime.timedelta(days=36500), None
+            )
+
+    def test_invalid_certificates(self):
+        builder = ocsp.OCSPResponseBuilder()
+        with pytest.raises(ValueError):
+            builder.certificates([])
+        with pytest.raises(TypeError):
+            builder.certificates(['notacert'])
+        with pytest.raises(TypeError):
+            builder.certificates('invalid')
+
+        _, issuer = _cert_and_issuer()
+        builder = builder.certificates([issuer])
+        with pytest.raises(ValueError):
+            builder.certificates([issuer])
+
+    def test_invalid_responder_id(self):
+        builder = ocsp.OCSPResponseBuilder()
+        cert, _ = _cert_and_issuer()
+        with pytest.raises(TypeError):
+            builder.responder_id(ocsp.OCSPResponderEncoding.HASH, 'invalid')
+        with pytest.raises(TypeError):
+            builder.responder_id('notanenum', cert)
+
+        builder = builder.responder_id(ocsp.OCSPResponderEncoding.NAME, cert)
+        with pytest.raises(ValueError):
+            builder.responder_id(ocsp.OCSPResponderEncoding.NAME, cert)
+
+    def test_invalid_extension(self):
+        builder = ocsp.OCSPResponseBuilder()
+        with pytest.raises(TypeError):
+            builder.add_extension("notanextension", True)
+
+    def test_sign_no_response(self):
+        builder = ocsp.OCSPResponseBuilder()
+        root_cert, private_key = _generate_root()
+        builder = builder.responder_id(
+            ocsp.OCSPResponderEncoding.NAME, root_cert
+        )
+        with pytest.raises(ValueError):
+            builder.sign(private_key, hashes.SHA256())
+
+    def test_sign_no_responder_id(self):
+        builder = ocsp.OCSPResponseBuilder()
+        cert, issuer = _cert_and_issuer()
+        _, private_key = _generate_root()
+        current_time = datetime.datetime.utcnow().replace(microsecond=0)
+        this_update = current_time - datetime.timedelta(days=1)
+        next_update = this_update + datetime.timedelta(days=7)
+        builder = builder.add_response(
+            cert, issuer, hashes.SHA1(), ocsp.OCSPCertStatus.GOOD, this_update,
+            next_update, None, None
+        )
+        with pytest.raises(ValueError):
+            builder.sign(private_key, hashes.SHA256())
+
+    def test_sign_invalid_hash_algorithm(self):
+        builder = ocsp.OCSPResponseBuilder()
+        cert, issuer = _cert_and_issuer()
+        root_cert, private_key = _generate_root()
+        current_time = datetime.datetime.utcnow().replace(microsecond=0)
+        this_update = current_time - datetime.timedelta(days=1)
+        next_update = this_update + datetime.timedelta(days=7)
+        builder = builder.responder_id(
+            ocsp.OCSPResponderEncoding.NAME, root_cert
+        ).add_response(
+            cert, issuer, hashes.SHA1(), ocsp.OCSPCertStatus.GOOD, this_update,
+            next_update, None, None
+        )
+        with pytest.raises(TypeError):
+            builder.sign(private_key, 'notahash')
+
+    def test_sign_good_cert(self):
+        builder = ocsp.OCSPResponseBuilder()
+        cert, issuer = _cert_and_issuer()
+        root_cert, private_key = _generate_root()
+        current_time = datetime.datetime.utcnow().replace(microsecond=0)
+        this_update = current_time - datetime.timedelta(days=1)
+        next_update = this_update + datetime.timedelta(days=7)
+        builder = builder.responder_id(
+            ocsp.OCSPResponderEncoding.NAME, root_cert
+        ).add_response(
+            cert, issuer, hashes.SHA1(), ocsp.OCSPCertStatus.GOOD, this_update,
+            next_update, None, None
+        )
+        resp = builder.sign(private_key, hashes.SHA256())
+        assert resp.responder_name == root_cert.subject
+        assert resp.responder_key_hash is None
+        # produced_at is presumably stamped during sign(), i.e. after
+        # current_time; check the absolute gap so a bad timestamp fails.
+        assert abs((resp.produced_at - current_time).total_seconds()) < 10
+        assert (resp.signature_algorithm_oid ==
+                x509.SignatureAlgorithmOID.ECDSA_WITH_SHA256)
+        assert resp.certificate_status == ocsp.OCSPCertStatus.GOOD
+        assert resp.revocation_time is None
+        assert resp.revocation_reason is None
+        assert resp.this_update == this_update
+        assert resp.next_update == next_update
+        private_key.public_key().verify(
+            resp.signature, resp.tbs_response_bytes, ec.ECDSA(hashes.SHA256())
+        )
+
+    def test_sign_revoked_cert(self):
+        builder = ocsp.OCSPResponseBuilder()
+        cert, issuer = _cert_and_issuer()
+        root_cert, private_key = _generate_root()
+        current_time = datetime.datetime.utcnow().replace(microsecond=0)
+        this_update = current_time - datetime.timedelta(days=1)
+        next_update = this_update + datetime.timedelta(days=7)
+        revoked_date = this_update - datetime.timedelta(days=300)
+        builder = builder.responder_id(
+            ocsp.OCSPResponderEncoding.NAME, root_cert
+        ).add_response(
+            cert, issuer, hashes.SHA1(), ocsp.OCSPCertStatus.REVOKED,
+            this_update, next_update, revoked_date, None
+        )
+        resp = builder.sign(private_key, hashes.SHA256())
+        assert resp.certificate_status == ocsp.OCSPCertStatus.REVOKED
+        assert resp.revocation_time == revoked_date
+        assert resp.revocation_reason is None
+        assert resp.this_update == this_update
+        assert resp.next_update == next_update
+        private_key.public_key().verify(
+            resp.signature, resp.tbs_response_bytes, ec.ECDSA(hashes.SHA256())
+        )
+
+    def test_sign_with_appended_certs(self):
+        builder = ocsp.OCSPResponseBuilder()
+        cert, issuer = _cert_and_issuer()
+        root_cert, private_key = _generate_root()
+        current_time = datetime.datetime.utcnow().replace(microsecond=0)
+        this_update = current_time - datetime.timedelta(days=1)
+        next_update = this_update + datetime.timedelta(days=7)
+        builder = builder.responder_id(
+            ocsp.OCSPResponderEncoding.NAME, root_cert
+        ).add_response(
+            cert, issuer, hashes.SHA1(), ocsp.OCSPCertStatus.GOOD, this_update,
+            next_update, None, None
+        ).certificates([root_cert])
+        resp = builder.sign(private_key, hashes.SHA256())
+        assert resp.certificates == [root_cert]
+
+    def test_sign_revoked_no_next_update(self):
+        builder = ocsp.OCSPResponseBuilder()
+        cert, issuer = _cert_and_issuer()
+        root_cert, private_key = _generate_root()
+        current_time = datetime.datetime.utcnow().replace(microsecond=0)
+        this_update = current_time - datetime.timedelta(days=1)
+        revoked_date = this_update - datetime.timedelta(days=300)
+        builder = builder.responder_id(
+            ocsp.OCSPResponderEncoding.NAME, root_cert
+        ).add_response(
+            cert, issuer, hashes.SHA1(), ocsp.OCSPCertStatus.REVOKED,
+            this_update, None, revoked_date, None
+        )
+        resp = builder.sign(private_key, hashes.SHA256())
+        assert resp.certificate_status == ocsp.OCSPCertStatus.REVOKED
+        assert resp.revocation_time == revoked_date
+        assert resp.revocation_reason is None
+        assert resp.this_update == this_update
+        assert resp.next_update is None
+        private_key.public_key().verify(
+            resp.signature, resp.tbs_response_bytes, ec.ECDSA(hashes.SHA256())
+        )
+
+    def test_sign_revoked_with_reason(self):
+        builder = ocsp.OCSPResponseBuilder()
+        cert, issuer = _cert_and_issuer()
+        root_cert, private_key = _generate_root()
+        current_time = datetime.datetime.utcnow().replace(microsecond=0)
+        this_update = current_time - datetime.timedelta(days=1)
+        next_update = this_update + datetime.timedelta(days=7)
+        revoked_date = this_update - datetime.timedelta(days=300)
+        builder = builder.responder_id(
+            ocsp.OCSPResponderEncoding.NAME, root_cert
+        ).add_response(
+            cert, issuer, hashes.SHA1(), ocsp.OCSPCertStatus.REVOKED,
+            this_update, next_update, revoked_date,
+            x509.ReasonFlags.key_compromise
+        )
+        resp = builder.sign(private_key, hashes.SHA256())
+        assert resp.certificate_status == ocsp.OCSPCertStatus.REVOKED
+        assert resp.revocation_time == revoked_date
+        assert resp.revocation_reason is x509.ReasonFlags.key_compromise
+        assert resp.this_update == this_update
+        assert resp.next_update == next_update
+        private_key.public_key().verify(
+            resp.signature, resp.tbs_response_bytes, ec.ECDSA(hashes.SHA256())
+        )
+
+    def test_sign_responder_id_key_hash(self):
+        builder = ocsp.OCSPResponseBuilder()
+        cert, issuer = _cert_and_issuer()
+        root_cert, private_key = _generate_root()
+        current_time = datetime.datetime.utcnow().replace(microsecond=0)
+        this_update = current_time - datetime.timedelta(days=1)
+        next_update = this_update + datetime.timedelta(days=7)
+        builder = builder.responder_id(
+            ocsp.OCSPResponderEncoding.HASH, root_cert
+        ).add_response(
+            cert, issuer, hashes.SHA1(), ocsp.OCSPCertStatus.GOOD, this_update,
+            next_update, None, None
+        )
+        resp = builder.sign(private_key, hashes.SHA256())
+        assert resp.responder_name is None
+        assert resp.responder_key_hash == (
+            b'\x8ca\x94\xe0\x948\xed\x89\xd8\xd4N\x89p\t\xd6\xf9^_\xec}'
+        )
+        private_key.public_key().verify(
+            resp.signature, resp.tbs_response_bytes, ec.ECDSA(hashes.SHA256())
+        )
+
+    def test_invalid_sign_responder_cert_does_not_match_private_key(self):
+        builder = ocsp.OCSPResponseBuilder()
+        cert, issuer = _cert_and_issuer()
+        root_cert, private_key = _generate_root()
+        current_time = datetime.datetime.utcnow().replace(microsecond=0)
+        this_update = current_time - datetime.timedelta(days=1)
+        next_update = this_update + datetime.timedelta(days=7)
+        builder = builder.responder_id(
+            ocsp.OCSPResponderEncoding.HASH, root_cert
+        ).add_response(
+            cert, issuer, hashes.SHA1(), ocsp.OCSPCertStatus.GOOD, this_update,
+            next_update, None, None
+        )
+        from cryptography.hazmat.backends.openssl.backend import backend
+        diff_key = ec.generate_private_key(ec.SECP256R1(), backend)
+        with pytest.raises(ValueError):
+            builder.sign(diff_key, hashes.SHA256())
+
+    def test_sign_with_extension(self):
+        builder = ocsp.OCSPResponseBuilder()
+        cert, issuer = _cert_and_issuer()
+        root_cert, private_key = _generate_root()
+        current_time = datetime.datetime.utcnow().replace(microsecond=0)
+        this_update = current_time - datetime.timedelta(days=1)
+        next_update = this_update + datetime.timedelta(days=7)
+        builder = builder.responder_id(
+            ocsp.OCSPResponderEncoding.HASH, root_cert
+        ).add_response(
+            cert, issuer, hashes.SHA1(), ocsp.OCSPCertStatus.GOOD, this_update,
+            next_update, None, None
+        ).add_extension(x509.OCSPNonce(b"012345"), False)
+        resp = builder.sign(private_key, hashes.SHA256())
+        assert len(resp.extensions) == 1
+        assert resp.extensions[0].value == x509.OCSPNonce(b"012345")
+        assert resp.extensions[0].critical is False
+        private_key.public_key().verify(
+            resp.signature, resp.tbs_response_bytes, ec.ECDSA(hashes.SHA256())
+        )
+
+    @pytest.mark.parametrize(
+        ("status", "der"),
+        [
+            (ocsp.OCSPResponseStatus.MALFORMED_REQUEST, b"0\x03\n\x01\x01"),
+            (ocsp.OCSPResponseStatus.INTERNAL_ERROR, b"0\x03\n\x01\x02"),
+            (ocsp.OCSPResponseStatus.TRY_LATER, b"0\x03\n\x01\x03"),
+            (ocsp.OCSPResponseStatus.SIG_REQUIRED, b"0\x03\n\x01\x05"),
+            (ocsp.OCSPResponseStatus.UNAUTHORIZED, b"0\x03\n\x01\x06"),
+        ]
+    )
+    def test_build_non_successful_statuses(self, status, der):
+        resp = ocsp.OCSPResponseBuilder.build_unsuccessful(status)
+        assert resp.response_status is status
+        assert resp.public_bytes(serialization.Encoding.DER) == der
+
+    def test_invalid_build_not_a_status(self):
+        with pytest.raises(TypeError):
+            ocsp.OCSPResponseBuilder.build_unsuccessful("notastatus")
+
+    def test_invalid_build_successful_status(self):
+        with pytest.raises(ValueError):
+            ocsp.OCSPResponseBuilder.build_unsuccessful(
+                ocsp.OCSPResponseStatus.SUCCESSFUL
+            )
+
+
 class TestOCSPResponse(object):
     def test_bad_response(self):
         with pytest.raises(ValueError):