| # coding: utf-8 |
| |
| """ |
| ASN.1 type classes for the online certificate status protocol (OCSP). Exports |
| the following items: |
| |
| - OCSPRequest() |
| - OCSPResponse() |
| |
| Other type classes are defined that help compose the types listed above. |
| """ |
| |
| from __future__ import unicode_literals, division, absolute_import, print_function |
| |
| from ._errors import unwrap |
| from .algos import DigestAlgorithm, SignedDigestAlgorithm |
| from .core import ( |
| Boolean, |
| Choice, |
| Enumerated, |
| GeneralizedTime, |
| IA5String, |
| Integer, |
| Null, |
| ObjectIdentifier, |
| OctetBitString, |
| OctetString, |
| ParsableOctetString, |
| Sequence, |
| SequenceOf, |
| ) |
| from .crl import AuthorityInfoAccessSyntax, CRLReason |
| from .keys import PublicKeyAlgorithm |
| from .x509 import Certificate, GeneralName, GeneralNames, Name |
| |
| |
| # The structures in this file are taken from https://tools.ietf.org/html/rfc6960 |
| |
| |
| class Version(Integer): |
| _map = { |
| 0: 'v1' |
| } |
| |
| |
| class CertId(Sequence): |
| _fields = [ |
| ('hash_algorithm', DigestAlgorithm), |
| ('issuer_name_hash', OctetString), |
| ('issuer_key_hash', OctetString), |
| ('serial_number', Integer), |
| ] |
| |
| |
| class ServiceLocator(Sequence): |
| _fields = [ |
| ('issuer', Name), |
| ('locator', AuthorityInfoAccessSyntax), |
| ] |
| |
| |
| class RequestExtensionId(ObjectIdentifier): |
| _map = { |
| '1.3.6.1.5.5.7.48.1.7': 'service_locator', |
| } |
| |
| |
| class RequestExtension(Sequence): |
| _fields = [ |
| ('extn_id', RequestExtensionId), |
| ('critical', Boolean, {'default': False}), |
| ('extn_value', ParsableOctetString), |
| ] |
| |
| _oid_pair = ('extn_id', 'extn_value') |
| _oid_specs = { |
| 'service_locator': ServiceLocator, |
| } |
| |
| |
| class RequestExtensions(SequenceOf): |
| _child_spec = RequestExtension |
| |
| |
| class Request(Sequence): |
| _fields = [ |
| ('req_cert', CertId), |
| ('single_request_extensions', RequestExtensions, {'explicit': 0, 'optional': True}), |
| ] |
| |
| _processed_extensions = False |
| _critical_extensions = None |
| _service_locator_value = None |
| |
| def _set_extensions(self): |
| """ |
| Sets common named extensions to private attributes and creates a list |
| of critical extensions |
| """ |
| |
| self._critical_extensions = set() |
| |
| for extension in self['single_request_extensions']: |
| name = extension['extn_id'].native |
| attribute_name = '_%s_value' % name |
| if hasattr(self, attribute_name): |
| setattr(self, attribute_name, extension['extn_value'].parsed) |
| if extension['critical'].native: |
| self._critical_extensions.add(name) |
| |
| self._processed_extensions = True |
| |
| @property |
| def critical_extensions(self): |
| """ |
| Returns a set of the names (or OID if not a known extension) of the |
| extensions marked as critical |
| |
| :return: |
| A set of unicode strings |
| """ |
| |
| if not self._processed_extensions: |
| self._set_extensions() |
| return self._critical_extensions |
| |
| @property |
| def service_locator_value(self): |
| """ |
| This extension is used when communicating with an OCSP responder that |
| acts as a proxy for OCSP requests |
| |
| :return: |
| None or a ServiceLocator object |
| """ |
| |
| if self._processed_extensions is False: |
| self._set_extensions() |
| return self._service_locator_value |
| |
| |
| class Requests(SequenceOf): |
| _child_spec = Request |
| |
| |
| class ResponseType(ObjectIdentifier): |
| _map = { |
| '1.3.6.1.5.5.7.48.1.1': 'basic_ocsp_response', |
| } |
| |
| |
| class AcceptableResponses(SequenceOf): |
| _child_spec = ResponseType |
| |
| |
| class PreferredSignatureAlgorithm(Sequence): |
| _fields = [ |
| ('sig_identifier', SignedDigestAlgorithm), |
| ('cert_identifier', PublicKeyAlgorithm, {'optional': True}), |
| ] |
| |
| |
| class PreferredSignatureAlgorithms(SequenceOf): |
| _child_spec = PreferredSignatureAlgorithm |
| |
| |
| class TBSRequestExtensionId(ObjectIdentifier): |
| _map = { |
| '1.3.6.1.5.5.7.48.1.2': 'nonce', |
| '1.3.6.1.5.5.7.48.1.4': 'acceptable_responses', |
| '1.3.6.1.5.5.7.48.1.8': 'preferred_signature_algorithms', |
| } |
| |
| |
| class TBSRequestExtension(Sequence): |
| _fields = [ |
| ('extn_id', TBSRequestExtensionId), |
| ('critical', Boolean, {'default': False}), |
| ('extn_value', ParsableOctetString), |
| ] |
| |
| _oid_pair = ('extn_id', 'extn_value') |
| _oid_specs = { |
| 'nonce': OctetString, |
| 'acceptable_responses': AcceptableResponses, |
| 'preferred_signature_algorithms': PreferredSignatureAlgorithms, |
| } |
| |
| |
| class TBSRequestExtensions(SequenceOf): |
| _child_spec = TBSRequestExtension |
| |
| |
| class TBSRequest(Sequence): |
| _fields = [ |
| ('version', Version, {'explicit': 0, 'default': 'v1'}), |
| ('requestor_name', GeneralName, {'explicit': 1, 'optional': True}), |
| ('request_list', Requests), |
| ('request_extensions', TBSRequestExtensions, {'explicit': 2, 'optional': True}), |
| ] |
| |
| |
| class Certificates(SequenceOf): |
| _child_spec = Certificate |
| |
| |
| class Signature(Sequence): |
| _fields = [ |
| ('signature_algorithm', SignedDigestAlgorithm), |
| ('signature', OctetBitString), |
| ('certs', Certificates, {'explicit': 0, 'optional': True}), |
| ] |
| |
| |
| class OCSPRequest(Sequence): |
| _fields = [ |
| ('tbs_request', TBSRequest), |
| ('optional_signature', Signature, {'explicit': 0, 'optional': True}), |
| ] |
| |
| _processed_extensions = False |
| _critical_extensions = None |
| _nonce_value = None |
| _acceptable_responses_value = None |
| _preferred_signature_algorithms_value = None |
| |
| def _set_extensions(self): |
| """ |
| Sets common named extensions to private attributes and creates a list |
| of critical extensions |
| """ |
| |
| self._critical_extensions = set() |
| |
| for extension in self['tbs_request']['request_extensions']: |
| name = extension['extn_id'].native |
| attribute_name = '_%s_value' % name |
| if hasattr(self, attribute_name): |
| setattr(self, attribute_name, extension['extn_value'].parsed) |
| if extension['critical'].native: |
| self._critical_extensions.add(name) |
| |
| self._processed_extensions = True |
| |
| @property |
| def critical_extensions(self): |
| """ |
| Returns a set of the names (or OID if not a known extension) of the |
| extensions marked as critical |
| |
| :return: |
| A set of unicode strings |
| """ |
| |
| if not self._processed_extensions: |
| self._set_extensions() |
| return self._critical_extensions |
| |
| @property |
| def nonce_value(self): |
| """ |
| This extension is used to prevent replay attacks by including a unique, |
| random value with each request/response pair |
| |
| :return: |
| None or an OctetString object |
| """ |
| |
| if self._processed_extensions is False: |
| self._set_extensions() |
| return self._nonce_value |
| |
| @property |
| def acceptable_responses_value(self): |
| """ |
| This extension is used to allow the client and server to communicate |
| with alternative response formats other than just basic_ocsp_response, |
| although no other formats are defined in the standard. |
| |
| :return: |
| None or an AcceptableResponses object |
| """ |
| |
| if self._processed_extensions is False: |
| self._set_extensions() |
| return self._acceptable_responses_value |
| |
| @property |
| def preferred_signature_algorithms_value(self): |
| """ |
| This extension is used by the client to define what signature algorithms |
| are preferred, including both the hash algorithm and the public key |
| algorithm, with a level of detail down to even the public key algorithm |
| parameters, such as curve name. |
| |
| :return: |
| None or a PreferredSignatureAlgorithms object |
| """ |
| |
| if self._processed_extensions is False: |
| self._set_extensions() |
| return self._preferred_signature_algorithms_value |
| |
| |
| class OCSPResponseStatus(Enumerated): |
| _map = { |
| 0: 'successful', |
| 1: 'malformed_request', |
| 2: 'internal_error', |
| 3: 'try_later', |
| 5: 'sign_required', |
| 6: 'unauthorized', |
| } |
| |
| |
| class ResponderId(Choice): |
| _alternatives = [ |
| ('by_name', Name, {'explicit': 1}), |
| ('by_key', OctetString, {'explicit': 2}), |
| ] |
| |
| |
| # Custom class to return a meaningful .native attribute from CertStatus() |
| class StatusGood(Null): |
| def set(self, value): |
| """ |
| Sets the value of the object |
| |
| :param value: |
| None or 'good' |
| """ |
| |
| if value is not None and value != 'good' and not isinstance(value, Null): |
| raise ValueError(unwrap( |
| ''' |
| value must be one of None, "good", not %s |
| ''', |
| repr(value) |
| )) |
| |
| self.contents = b'' |
| |
| @property |
| def native(self): |
| return 'good' |
| |
| |
| # Custom class to return a meaningful .native attribute from CertStatus() |
| class StatusUnknown(Null): |
| def set(self, value): |
| """ |
| Sets the value of the object |
| |
| :param value: |
| None or 'unknown' |
| """ |
| |
| if value is not None and value != 'unknown' and not isinstance(value, Null): |
| raise ValueError(unwrap( |
| ''' |
| value must be one of None, "unknown", not %s |
| ''', |
| repr(value) |
| )) |
| |
| self.contents = b'' |
| |
| @property |
| def native(self): |
| return 'unknown' |
| |
| |
| class RevokedInfo(Sequence): |
| _fields = [ |
| ('revocation_time', GeneralizedTime), |
| ('revocation_reason', CRLReason, {'explicit': 0, 'optional': True}), |
| ] |
| |
| |
| class CertStatus(Choice): |
| _alternatives = [ |
| ('good', StatusGood, {'implicit': 0}), |
| ('revoked', RevokedInfo, {'implicit': 1}), |
| ('unknown', StatusUnknown, {'implicit': 2}), |
| ] |
| |
| |
| class CrlId(Sequence): |
| _fields = [ |
| ('crl_url', IA5String, {'explicit': 0, 'optional': True}), |
| ('crl_num', Integer, {'explicit': 1, 'optional': True}), |
| ('crl_time', GeneralizedTime, {'explicit': 2, 'optional': True}), |
| ] |
| |
| |
| class SingleResponseExtensionId(ObjectIdentifier): |
| _map = { |
| '1.3.6.1.5.5.7.48.1.3': 'crl', |
| '1.3.6.1.5.5.7.48.1.6': 'archive_cutoff', |
| # These are CRLEntryExtension values from |
| # https://tools.ietf.org/html/rfc5280 |
| '2.5.29.21': 'crl_reason', |
| '2.5.29.24': 'invalidity_date', |
| '2.5.29.29': 'certificate_issuer', |
| # https://tools.ietf.org/html/rfc6962.html#page-13 |
| '1.3.6.1.4.1.11129.2.4.5': 'signed_certificate_timestamp_list', |
| } |
| |
| |
| class SingleResponseExtension(Sequence): |
| _fields = [ |
| ('extn_id', SingleResponseExtensionId), |
| ('critical', Boolean, {'default': False}), |
| ('extn_value', ParsableOctetString), |
| ] |
| |
| _oid_pair = ('extn_id', 'extn_value') |
| _oid_specs = { |
| 'crl': CrlId, |
| 'archive_cutoff': GeneralizedTime, |
| 'crl_reason': CRLReason, |
| 'invalidity_date': GeneralizedTime, |
| 'certificate_issuer': GeneralNames, |
| 'signed_certificate_timestamp_list': OctetString, |
| } |
| |
| |
| class SingleResponseExtensions(SequenceOf): |
| _child_spec = SingleResponseExtension |
| |
| |
| class SingleResponse(Sequence): |
| _fields = [ |
| ('cert_id', CertId), |
| ('cert_status', CertStatus), |
| ('this_update', GeneralizedTime), |
| ('next_update', GeneralizedTime, {'explicit': 0, 'optional': True}), |
| ('single_extensions', SingleResponseExtensions, {'explicit': 1, 'optional': True}), |
| ] |
| |
| _processed_extensions = False |
| _critical_extensions = None |
| _crl_value = None |
| _archive_cutoff_value = None |
| _crl_reason_value = None |
| _invalidity_date_value = None |
| _certificate_issuer_value = None |
| |
| def _set_extensions(self): |
| """ |
| Sets common named extensions to private attributes and creates a list |
| of critical extensions |
| """ |
| |
| self._critical_extensions = set() |
| |
| for extension in self['single_extensions']: |
| name = extension['extn_id'].native |
| attribute_name = '_%s_value' % name |
| if hasattr(self, attribute_name): |
| setattr(self, attribute_name, extension['extn_value'].parsed) |
| if extension['critical'].native: |
| self._critical_extensions.add(name) |
| |
| self._processed_extensions = True |
| |
| @property |
| def critical_extensions(self): |
| """ |
| Returns a set of the names (or OID if not a known extension) of the |
| extensions marked as critical |
| |
| :return: |
| A set of unicode strings |
| """ |
| |
| if not self._processed_extensions: |
| self._set_extensions() |
| return self._critical_extensions |
| |
| @property |
| def crl_value(self): |
| """ |
| This extension is used to locate the CRL that a certificate's revocation |
| is contained within. |
| |
| :return: |
| None or a CrlId object |
| """ |
| |
| if self._processed_extensions is False: |
| self._set_extensions() |
| return self._crl_value |
| |
| @property |
| def archive_cutoff_value(self): |
| """ |
| This extension is used to indicate the date at which an archived |
| (historical) certificate status entry will no longer be available. |
| |
| :return: |
| None or a GeneralizedTime object |
| """ |
| |
| if self._processed_extensions is False: |
| self._set_extensions() |
| return self._archive_cutoff_value |
| |
| @property |
| def crl_reason_value(self): |
| """ |
| This extension indicates the reason that a certificate was revoked. |
| |
| :return: |
| None or a CRLReason object |
| """ |
| |
| if self._processed_extensions is False: |
| self._set_extensions() |
| return self._crl_reason_value |
| |
| @property |
| def invalidity_date_value(self): |
| """ |
| This extension indicates the suspected date/time the private key was |
| compromised or the certificate became invalid. This would usually be |
| before the revocation date, which is when the CA processed the |
| revocation. |
| |
| :return: |
| None or a GeneralizedTime object |
| """ |
| |
| if self._processed_extensions is False: |
| self._set_extensions() |
| return self._invalidity_date_value |
| |
| @property |
| def certificate_issuer_value(self): |
| """ |
| This extension indicates the issuer of the certificate in question. |
| |
| :return: |
| None or an x509.GeneralNames object |
| """ |
| |
| if self._processed_extensions is False: |
| self._set_extensions() |
| return self._certificate_issuer_value |
| |
| |
| class Responses(SequenceOf): |
| _child_spec = SingleResponse |
| |
| |
| class ResponseDataExtensionId(ObjectIdentifier): |
| _map = { |
| '1.3.6.1.5.5.7.48.1.2': 'nonce', |
| '1.3.6.1.5.5.7.48.1.9': 'extended_revoke', |
| } |
| |
| |
| class ResponseDataExtension(Sequence): |
| _fields = [ |
| ('extn_id', ResponseDataExtensionId), |
| ('critical', Boolean, {'default': False}), |
| ('extn_value', ParsableOctetString), |
| ] |
| |
| _oid_pair = ('extn_id', 'extn_value') |
| _oid_specs = { |
| 'nonce': OctetString, |
| 'extended_revoke': Null, |
| } |
| |
| |
| class ResponseDataExtensions(SequenceOf): |
| _child_spec = ResponseDataExtension |
| |
| |
| class ResponseData(Sequence): |
| _fields = [ |
| ('version', Version, {'explicit': 0, 'default': 'v1'}), |
| ('responder_id', ResponderId), |
| ('produced_at', GeneralizedTime), |
| ('responses', Responses), |
| ('response_extensions', ResponseDataExtensions, {'explicit': 1, 'optional': True}), |
| ] |
| |
| |
| class BasicOCSPResponse(Sequence): |
| _fields = [ |
| ('tbs_response_data', ResponseData), |
| ('signature_algorithm', SignedDigestAlgorithm), |
| ('signature', OctetBitString), |
| ('certs', Certificates, {'explicit': 0, 'optional': True}), |
| ] |
| |
| |
| class ResponseBytes(Sequence): |
| _fields = [ |
| ('response_type', ResponseType), |
| ('response', ParsableOctetString), |
| ] |
| |
| _oid_pair = ('response_type', 'response') |
| _oid_specs = { |
| 'basic_ocsp_response': BasicOCSPResponse, |
| } |
| |
| |
| class OCSPResponse(Sequence): |
| _fields = [ |
| ('response_status', OCSPResponseStatus), |
| ('response_bytes', ResponseBytes, {'explicit': 0, 'optional': True}), |
| ] |
| |
| _processed_extensions = False |
| _critical_extensions = None |
| _nonce_value = None |
| _extended_revoke_value = None |
| |
| def _set_extensions(self): |
| """ |
| Sets common named extensions to private attributes and creates a list |
| of critical extensions |
| """ |
| |
| self._critical_extensions = set() |
| |
| for extension in self['response_bytes']['response'].parsed['tbs_response_data']['response_extensions']: |
| name = extension['extn_id'].native |
| attribute_name = '_%s_value' % name |
| if hasattr(self, attribute_name): |
| setattr(self, attribute_name, extension['extn_value'].parsed) |
| if extension['critical'].native: |
| self._critical_extensions.add(name) |
| |
| self._processed_extensions = True |
| |
| @property |
| def critical_extensions(self): |
| """ |
| Returns a set of the names (or OID if not a known extension) of the |
| extensions marked as critical |
| |
| :return: |
| A set of unicode strings |
| """ |
| |
| if not self._processed_extensions: |
| self._set_extensions() |
| return self._critical_extensions |
| |
| @property |
| def nonce_value(self): |
| """ |
| This extension is used to prevent replay attacks on the request/response |
| exchange |
| |
| :return: |
| None or an OctetString object |
| """ |
| |
| if self._processed_extensions is False: |
| self._set_extensions() |
| return self._nonce_value |
| |
| @property |
| def extended_revoke_value(self): |
| """ |
| This extension is used to signal that the responder will return a |
| "revoked" status for non-issued certificates. |
| |
| :return: |
| None or a Null object (if present) |
| """ |
| |
| if self._processed_extensions is False: |
| self._set_extensions() |
| return self._extended_revoke_value |
| |
| @property |
| def basic_ocsp_response(self): |
| """ |
| A shortcut into the BasicOCSPResponse sequence |
| |
| :return: |
| None or an asn1crypto.ocsp.BasicOCSPResponse object |
| """ |
| |
| return self['response_bytes']['response'].parsed |
| |
| @property |
| def response_data(self): |
| """ |
| A shortcut into the parsed, ResponseData sequence |
| |
| :return: |
| None or an asn1crypto.ocsp.ResponseData object |
| """ |
| |
| return self['response_bytes']['response'].parsed['tbs_response_data'] |