| # coding: utf-8 |
| |
| """ |
| ASN.1 type classes for universal types. Exports the following items: |
| |
| - Any() |
| - Asn1Value() |
| - BitString() |
| - BMPString() |
| - Boolean() |
| - CharacterString() |
| - Choice() |
| - EmbeddedPdv() |
| - Enumerated() |
| - GeneralizedTime() |
| - GeneralString() |
| - GraphicString() |
| - IA5String() |
| - InstanceOf() |
| - Integer() |
| - IntegerBitString() |
| - IntegerOctetString() |
| - NoValue() |
| - Null() |
| - NumericString() |
| - ObjectDescriptor() |
| - ObjectIdentifier() |
| - OctetBitString() |
| - OctetString() |
| - PrintableString() |
| - Real() |
| - RelativeOid() |
| - Sequence() |
| - SequenceOf() |
| - Set() |
| - SetOf() |
| - TeletexString() |
| - UniversalString() |
| - UTCTime() |
| - UTF8String() |
| - VideotexString() |
| - VisibleString() |
| |
| Other type classes are defined that help compose the types listed above. |
| """ |
| |
| from __future__ import unicode_literals, division, absolute_import, print_function |
| |
| import sys |
| import re |
| from collections import OrderedDict |
| from datetime import datetime, timedelta, tzinfo |
| from pprint import pprint |
| |
| from . import _teletex_codec |
| from ._int import int_to_bytes, int_from_bytes |
| |
| # Python 2 |
| if sys.version_info <= (3,): |
| str_cls = unicode #pylint: disable=E0602 |
| byte_cls = str |
| int_types = (int, long) #pylint: disable=E0602 |
| py2 = True |
| chr_cls = chr |
| range = xrange #pylint: disable=E0602,W0622 |
| |
| class utc(tzinfo): |
| |
| def tzname(self, _): |
| return 'UTC+00:00' |
| |
| def utcoffset(self, _): |
| return timedelta(0) |
| |
| def dst(self, _): |
| return None |
| |
| class timezone(): |
| |
| utc = utc() |
| |
| # Python 3 |
| else: |
| str_cls = str |
| byte_cls = bytes |
| int_types = int |
| py2 = False |
| |
| def chr_cls(num): |
| return bytes([num]) |
| |
| from datetime import timezone |
| |
| |
| |
| _teletex_codec.register() |
| |
| |
| CLASS_NUM_TO_NAME_MAP = { |
| 0: 'universal', |
| 1: 'application', |
| 2: 'context', |
| 3: 'private', |
| } |
| |
| CLASS_NAME_TO_NUM_MAP = { |
| 'universal': 0, |
| 'application': 1, |
| 'context': 2, |
| 'private': 3, |
| 0: 0, |
| 1: 1, |
| 2: 2, |
| 3: 3, |
| } |
| |
| METHOD_NUM_TO_NAME_MAP = { |
| 0: 'primitive', |
| 1: 'constructed', |
| } |
| |
| |
| # A global tracker to ensure that _setup() is called for every class, even |
| # if is has been called for a parent class. This allows different _fields |
| # definitions for child classes. Without such a construct, the child classes |
| # would just see the parent class attributes and would use them. |
| _SETUP_CLASSES = {} |
| |
| |
| class Asn1Value(): |
| """ |
| The basis of all ASN.1 values |
| """ |
| |
| # The integer 0 for primitive, 1 for constructed |
| method = None |
| |
| # An integer 0 through 3 - see CLASS_NUM_TO_NAME_MAP for value |
| class_ = None |
| |
| # An integer 1 or greater indicating the tag number |
| tag = None |
| |
| # A unicode string or None - "explicit" or "implicit" for |
| # tagged values, None for normal |
| tag_type = None |
| |
| # If "explicit"ly tagged, the class and tag for the wrapped header |
| explicit_class = None |
| explicit_tag = None |
| |
| # The BER/DER header bytes |
| header = None |
| |
| # Raw encoded value bytes not including class, method, tag, length header |
| contents = None |
| |
| # The BER/DER trailer bytes |
| trailer = b'' |
| |
| # The native python representation of the value |
| _native = None |
| |
| @classmethod |
| def load(cls, encoded_data, **kwargs): |
| """ |
| Loads a BER/DER-encoded byte string using the current class as the spec |
| |
| :param encoded_data: |
| A byte string of BER or DER encoded data |
| |
| :return: |
| A instance of the current class |
| """ |
| |
| spec = None |
| if cls.tag is not None: |
| spec = cls |
| |
| value, _ = _parse_build(encoded_data, spec=spec, spec_params=kwargs) |
| return value |
| |
| #pylint: disable=W0613 |
| def __init__(self, tag_type=None, class_=None, tag=None, optional=None, default=None): |
| """ |
| The optional parameter is not used, but rather included so we don't |
| have to delete it from the parameter dictionary when passing as keyword |
| args |
| |
| :param tag_type: |
| None for normal values, or one of "implicit", "explicit" for tagged |
| values |
| |
| :param class_: |
| The class for the value - defaults to "universal" if tag_type is |
| None, otherwise defaults to "context". Valid values include: |
| - "universal" |
| - "application" |
| - "context" |
| - "private" |
| |
| :param tag: |
| The integer tag to override - usually this is used with tag_type or |
| class_ |
| |
| :param optional: |
| Dummy parameter that allows "optional" key in spec param dicts |
| |
| :param default: |
| The default value to use if the value is currently None |
| |
| :raises: |
| ValueError - when tag_type, class_ or tag are invalid values |
| """ |
| |
| if self.__class__ not in _SETUP_CLASSES: |
| cls = self.__class__ |
| if hasattr(cls, '_setup'): |
| self._setup() |
| _SETUP_CLASSES[cls] = True |
| |
| if tag_type is not None: |
| if tag_type not in ('implicit', 'explicit'): |
| raise ValueError('tag_type must be one of "implicit", "explicit" - is %s' % repr(tag_type)) |
| self.tag_type = tag_type |
| |
| if class_ is None: |
| class_ = 'context' |
| if class_ not in CLASS_NAME_TO_NUM_MAP: |
| raise ValueError('class_ must be one of "universal", "application", "context", "private" - is %s' % repr(class_)) |
| class_ = CLASS_NAME_TO_NUM_MAP[class_] |
| |
| if tag is not None: |
| if not isinstance(tag, int_types): |
| raise ValueError('tag must be an integer, not %s' % tag.__class__.__name__) |
| |
| if tag_type == 'implicit': |
| self.class_ = class_ |
| self.tag = tag |
| else: |
| self.explicit_class = class_ |
| self.explicit_tag = tag |
| else: |
| if class_ is not None: |
| if class_ not in CLASS_NUM_TO_NAME_MAP: |
| raise ValueError('class_ must be one of "universal", "application", "context", "private" - is %s' % repr(class_)) |
| self.class_ = CLASS_NAME_TO_NUM_MAP[class_] |
| |
| if tag is not None: |
| self.tag = tag |
| |
| if default is not None: |
| self.set(default) |
| |
| def __str__(self): |
| """ |
| Since str is differnt in Python 2 and 3, this calls the appropriate |
| method, __unicode__() or __bytes__() |
| |
| :return: |
| A unicode string |
| """ |
| |
| if py2: |
| return self.__bytes__() |
| else: |
| return self.__unicode__() |
| |
| def __repr__(self): |
| """ |
| :return: |
| A unicode string |
| """ |
| return '<%s %s>' % (self.__class__.__name__, repr(self.contents)) |
| |
| def retag(self, tag_type, tag): |
| """ |
| Copies the object, applying a new tagging to it |
| |
| :param tag_type: |
| A unicode string of "implicit" or "explicit" |
| |
| :param tag: |
| A integer tag number |
| |
| :return: |
| An Asn1Value object |
| """ |
| |
| new_obj = self.__class__(tag_type=tag_type, tag=tag) |
| new_obj._copy(self) #pylint: disable=W0212 |
| return new_obj |
| |
| def untag(self): |
| """ |
| Copies the object, removing any special tagging from it |
| |
| :return: |
| An Asn1Value object |
| """ |
| |
| new_obj = self.__class__() |
| new_obj._copy(self) #pylint: disable=W0212 |
| return new_obj |
| |
| #pylint: disable=W0212 |
| def _copy(self, other): |
| """ |
| Copies the contents of another Asn1Value object to itself |
| |
| :param object: |
| Another instance of the same class |
| """ |
| |
| if self.__class__ != other.__class__: |
| raise ValueError('Can not copy values from %s object to %s object' % (other.__class__.__name__, self.__class__.__name__)) |
| |
| self.contents = other.contents |
| self._native = other._native |
| if hasattr(other, '_parsed'): |
| self._parsed = other._parsed |
| |
| def dump(self, force=False): |
| """ |
| Encodes the value using DER |
| |
| :param force: |
| If the encoded contents already exist, clear them and regenerate |
| to ensure they are in DER format instead of BER format |
| |
| :return: |
| A byte string of the DER-encoded value |
| """ |
| |
| if self.header is None: |
| header = _dump_header(self.class_, self.method, self.tag, self.contents) |
| trailer = b'' |
| |
| if self.tag_type == 'explicit': |
| container = Asn1Value() |
| container.method = 1 |
| container.class_ = self.explicit_class |
| container.tag = self.explicit_tag |
| container.contents = header + self.contents + trailer |
| # Force the container to generate the header and footer |
| container.dump() |
| header = container.header + header |
| trailer += container.trailer |
| |
| self.header = header |
| self.trailer = trailer |
| |
| return self.header + self.contents + self.trailer |
| |
| def pprint(self): |
| """ |
| Pretty prints the native representation of the value |
| """ |
| |
| pprint(self.native) |
| |
| |
| class ValueMap(): |
| """ |
| Basic functionality that allows for mapping values from ints or OIDs to |
| python unicode strings |
| """ |
| |
| # A dict from primitive value (int or OID) to unicode string. This needs |
| # to be defined in the source code |
| _map = None |
| |
| # A dict from unicode string to int/OID. This is automatically generated |
| # from _map the first time it is needed |
| _reverse_map = None |
| |
| #pylint: disable=W0212 |
| def _setup(self): |
| """ |
| Generates _reverse_map from _map |
| """ |
| |
| cls = self.__class__ |
| if cls._map is None: |
| return |
| cls._reverse_map = {} |
| for key, value in cls._map.items(): |
| cls._reverse_map[value] = key |
| |
| |
| class NoValue(Asn1Value): |
| """ |
| A representation of an optional value that is not present. Has .native |
| property and .dump() method to be compatible with other value classes. |
| """ |
| |
| def __len__(self): |
| return 0 |
| |
| def __iter__(self): |
| return iter(()) |
| |
| @property |
| def native(self): |
| """ |
| The a native Python datatype representation of this value |
| |
| :return: |
| None |
| """ |
| |
| return None |
| |
| def dump(self, force=False): |
| """ |
| Encodes the value using DER |
| |
| :param force: |
| If the encoded contents already exist, clear them and regenerate |
| to ensure they are in DER format instead of BER format |
| |
| :return: |
| A byte string of the DER-encoded value |
| """ |
| |
| return b'' |
| |
| |
| |
| class Any(Asn1Value): |
| """ |
| A value class that can contain any value, and allows for easy parsing of |
| the underlying encoded value using a spec. This is normally contained in |
| a Structure that has an ObjectIdentifier field and _oid_pair and _oid_specs |
| defined. |
| """ |
| |
| # The parsed value object |
| _parsed = None |
| |
| @property |
| def native(self): |
| """ |
| The a native Python datatype representation of this value |
| |
| :return: |
| The .native value from the parsed value object |
| """ |
| |
| if self._parsed is None: |
| self.parse() |
| |
| return self._parsed[0].native |
| |
| @property |
| def parsed(self): |
| """ |
| Returns the parsed object from .parse() |
| |
| :return: |
| The object returned by .parse() |
| """ |
| |
| if self._parsed is None: |
| self.parse() |
| |
| return self._parsed[0] |
| |
| def parse(self, spec=None, spec_params=None): |
| """ |
| Parses the contents generically, or using a spec with optional params |
| |
| :param spec: |
| A class derived from Asn1Value that defines what class_ and tag the |
| value should have, and the semantics of the encoded value. The |
| return value will be of this type. If omitted, the encoded value |
| will be decoded using the standard universal tag based on the |
| encoded tag number. |
| |
| :param spec_params: |
| A dict of params to pass to the spec object |
| |
| :return: |
| An object of the type spec, or if not present, a child of Asn1Value |
| """ |
| |
| if self._parsed is None or self._parsed[1:3] != (spec, spec_params): |
| try: |
| passed_params = spec_params |
| if self.tag_type == 'explicit': |
| passed_params = {} if not spec_params else spec_params.copy() |
| passed_params['tag_type'] = self.tag_type |
| passed_params['tag'] = self.tag |
| parsed_value, _ = _parse_build(self.header + self.contents + self.trailer, spec=spec, spec_params=passed_params) |
| self._parsed = (parsed_value, spec, spec_params) |
| except (ValueError) as e: |
| args = e.args[1:] |
| e.args = (e.args[0] + '\n while parsing %s' % self.__class__.__name__,) + args |
| raise e |
| return self._parsed[0] |
| |
| def dump(self, force=False): |
| """ |
| Encodes the value using DER |
| |
| :param force: |
| If the encoded contents already exist, clear them and regenerate |
| to ensure they are in DER format instead of BER format |
| |
| :return: |
| A byte string of the DER-encoded value |
| """ |
| |
| if self._parsed is None: |
| self.parse() |
| |
| return self._parsed[0].dump(force=force) |
| |
| |
| class Choice(Asn1Value): |
| """ |
| A class to handle when a value may be one of several options |
| """ |
| |
| # The index in _alternatives of the validated alternative |
| _choice = None |
| |
| # The name of the chosen alternative |
| _name = None |
| |
| # The Asn1Value object for the chosen alternative |
| _parsed = None |
| |
| # A list of tuples in one of the following forms. |
| # |
| # Option 1, a unicode string field name and a value class |
| # |
| # ("name", Asn1ValueClass) |
| # |
| # Option 2, same as Option 1, but with a dict of class params |
| # |
| # ("name", Asn1ValueClass, {'tag_type': 'explicit', 'tag': 5}) |
| _alternatives = None |
| |
| # A dict that maps tuples of (class_, tag) to an index in _alternatives |
| _id_map = None |
| |
| # A dict that maps alternative names to an index in _alternatives |
| _name_map = None |
| |
| #pylint: disable=W0212 |
| def _setup(self): |
| """ |
| Generates _id_map from _alternatives to allow validating contents |
| """ |
| |
| cls = self.__class__ |
| cls._id_map = {} |
| cls._name_map = {} |
| for index, info in enumerate(cls._alternatives): |
| params = info[2] if len(info) > 2 else {} |
| id_ = _build_id_tuple(params, info[1]) |
| cls._id_map[id_] = index |
| cls._name_map[info[0]] = index |
| |
| def __init__(self, name=None, value=None, tag_type=None, **kwargs): |
| """ |
| Checks to ensure implicit tagging is not being used since it is |
| incompatible with Choice, then forwards on to Asn1Value.__init__() |
| |
| :param name: |
| The name of the alternative to be set - used with value |
| |
| :param value: |
| The alternative value to set - used with name |
| |
| :param tag_type: |
| The tag_type of the value - None, "implicit" or "explicit" |
| |
| :raises: |
| ValueError - when tag_type is "implicit" |
| """ |
| |
| if tag_type == 'implicit': |
| raise ValueError('The Choice type can not be implicitly tagged even if in an implicit module - due to its nature any tagging must be explicit') |
| kwargs['tag_type'] = tag_type |
| Asn1Value.__init__(self, **kwargs) |
| |
| if name is not None: |
| if name not in self._name_map: |
| raise ValueError('The name specified, "%s", is not a valid alternative for %s' % (name, self.__class__.__name__)) |
| |
| self._choice = self._name_map[name] |
| info = self._alternatives[self._choice] |
| spec = info[1] |
| params = {} if len(info) < 3 else info[2] |
| |
| if not isinstance(value, spec): |
| value = spec(value, **params) |
| self._parsed = value |
| |
| @property |
| def name(self): |
| """ |
| :return: |
| A unicode string of the field name of the chosen alternative |
| """ |
| if not self._name: |
| self._name = self._alternatives[self._choice][0] |
| return self._name |
| |
| def parse(self): |
| """ |
| Parses the detected alternative |
| |
| :return: |
| An Asn1Value object of the chosen alternative |
| """ |
| |
| if self._parsed is not None: |
| return self._parsed |
| |
| try: |
| info = self._alternatives[self._choice] |
| params = info[2] if len(info) > 2 else {} |
| self._parsed, _ = _parse_build(self.contents, spec=info[1], spec_params=params) |
| except (ValueError) as e: |
| args = e.args[1:] |
| e.args = (e.args[0] + '\n while parsing %s' % self.__class__.__name__,) + args |
| raise e |
| |
| @property |
| def chosen(self): |
| """ |
| :return: |
| An Asn1Value object of the chosen alternative |
| """ |
| |
| return self.parse() |
| |
| @property |
| def native(self): |
| """ |
| The a native Python datatype representation of this value |
| |
| :return: |
| The .native value from the contained value object |
| """ |
| |
| return self.chosen.native |
| |
| def validate(self, class_, tag): |
| """ |
| Ensures that the class and tag specified exist as an alternative |
| |
| :param class_: |
| The integer class_ from the encoded value header |
| |
| :param tag: |
| The interger tag from the encoded value header |
| |
| :raises: |
| ValueError - when value is not a valid alternative |
| """ |
| |
| id_ = (class_, tag) |
| |
| if id_ in self._id_map: |
| self._choice = self._id_map[id_] |
| return |
| |
| # This means the Choice was implicitly tagged |
| if self.class_ is not None and self.tag is not None: |
| if len(self._alternatives) > 1: |
| raise ValueError('%s was implicitly tagged, but more than one alternative exists' % self.__class__.__name__) |
| if id_ == (self.class_, self.tag): |
| self._choice = 0 |
| return |
| |
| asn1 = self._format_class_tag(class_, tag) |
| asn1s = [self._format_class_tag(id_[0], id_[1]) for id_ in self._id_map] |
| |
| raise ValueError('Value %s did not match the class and tag of any of the alternatives in %s: %s' % (asn1, self.__class__.__name__, '. '.join(asn1s))) |
| |
| def _format_class_tag(self, class_, tag): |
| """ |
| :return: |
| A unicode string of a human-friendly representation of the class and tag |
| """ |
| |
| return '[%s %s]' % (CLASS_NUM_TO_NAME_MAP[class_].upper(), tag) |
| |
| #pylint: disable=W0212 |
| def _copy(self, other): |
| """ |
| Copies the contents of another Asn1Value object to itself |
| |
| :param object: |
| Another instance of the same class |
| """ |
| |
| if self.__class__ != other.__class__: |
| raise ValueError('Can not copy values from %s object to %s object' % (other.__class__.__name__, self.__class__.__name__)) |
| |
| self.contents = other.contents |
| self._native = other._native |
| self._choice = other._choice |
| self._name = other._name |
| self._parsed = other._parsed |
| |
| def dump(self, force=False): |
| """ |
| Encodes the value using DER |
| |
| :param force: |
| If the encoded contents already exist, clear them and regenerate |
| to ensure they are in DER format instead of BER format |
| |
| :return: |
| A byte string of the DER-encoded value |
| """ |
| |
| self.contents = self.chosen.dump(force=force) |
| if self.header is None: |
| if self.tag_type == 'explicit': |
| self.header = _dump_header(self.explicit_class, 1, self.explicit_tag, self.contents) |
| else: |
| self.header = b'' |
| return self.header + self.contents |
| |
| |
| class Primitive(Asn1Value): |
| """ |
| Sets the class_ and method attributes for primitive, universal values |
| """ |
| |
| class_ = 0 |
| |
| method = 0 |
| |
| def __init__(self, value=None, default=None, **kwargs): |
| """ |
| Sets the value of the object before passing to Asn1Value.__init__() |
| |
| :param value: |
| A native Python datatype to initialize the object value with |
| |
| :param default: |
| The default value if no value is specified |
| """ |
| |
| Asn1Value.__init__(self, **kwargs) |
| |
| if value is not None: |
| self.set(value) |
| |
| elif default is not None: |
| self.set(default) |
| |
| def set(self, value): |
| """ |
| Sets the value of the object |
| |
| :param value: |
| A byte string |
| """ |
| |
| if not isinstance(value, byte_cls): |
| raise ValueError('%s value must be a byte string, not %s' % (self.__class__.__name__, value.__class__.__name__)) |
| |
| self._native = value |
| self.contents = value |
| self.header = None |
| if self.trailer != b'': |
| self.trailer = b'' |
| |
| def dump(self, force=False): |
| """ |
| Encodes the value using DER |
| |
| :param force: |
| If the encoded contents already exist, clear them and regenerate |
| to ensure they are in DER format instead of BER format |
| |
| :return: |
| A byte string of the DER-encoded value |
| """ |
| |
| if force: |
| native = self.native |
| self.contents = None |
| self.set(native) |
| |
| return Asn1Value.dump(self) |
| |
| |
| class AbstractString(Primitive): |
| """ |
| A base class for all strings that have a known encoding. In general, we do |
| not worry ourselves with confirming that the decoded values match a specific |
| set of characters, only that they are decoded into a Python unicode string |
| """ |
| |
| # The Python encoding name to use when decoding or encoded the contents |
| _encoding = 'latin1' |
| |
| def set(self, value): |
| """ |
| Sets the value of the string |
| |
| :param value: |
| A unicode string |
| """ |
| |
| if not isinstance(value, str_cls): |
| raise ValueError('%s value must be a unicode string, not %s' % (self.__class__.__name__, value.__class__.__name__)) |
| |
| self._native = value |
| self.contents = value.encode(self._encoding) |
| self.header = None |
| if self.trailer != b'': |
| self.trailer = b'' |
| |
| def __unicode__(self): |
| """ |
| :return: |
| A unicode string |
| """ |
| return self.contents.decode(self._encoding) |
| |
| @property |
| def native(self): |
| """ |
| The a native Python datatype representation of this value |
| |
| :return: |
| A unicode string or None |
| """ |
| |
| if self.contents is None: |
| return None |
| |
| if self._native is None: |
| self._native = self.__unicode__() |
| return self._native |
| |
| |
| class Boolean(Primitive): |
| """ |
| Represents a boolean in both ASN.1 and Python |
| """ |
| |
| tag = 1 |
| |
| def set(self, value): |
| """ |
| Sets the value of the object |
| |
| :param value: |
| True, False or another value that works with bool() |
| """ |
| |
| self._native = bool(value) |
| self.contents = b'\x00' if not value else b'\xff' |
| self.header = None |
| if self.trailer != b'': |
| self.trailer = b'' |
| |
| # Python 2 |
| def __nonzero__(self): |
| """ |
| :return: |
| True or False |
| """ |
| return self.__bool__() |
| |
| def __bool__(self): |
| """ |
| :return: |
| True or False |
| """ |
| return self.contents != b'\x00' |
| |
| @property |
| def native(self): |
| """ |
| The a native Python datatype representation of this value |
| |
| :return: |
| True, False or None |
| """ |
| |
| if self.contents is None: |
| return None |
| |
| if self._native is None: |
| self._native = self.__bool__() |
| return self._native |
| |
| |
| class Integer(Primitive, ValueMap): |
| """ |
| Represents an integer in both ASN.1 and Python |
| """ |
| |
| tag = 2 |
| |
| def set(self, value): |
| """ |
| Sets the value of the object |
| |
| :param value: |
| An integer, or a unicode string if _map is set |
| |
| :raises: |
| ValueError - when an invalid value is passed |
| """ |
| |
| if isinstance(value, str_cls): |
| if self._map is None: |
| raise ValueError('%s value is a unicode string, but no _map provided' % self.__class__.__name__) |
| |
| if value not in self._reverse_map: |
| raise ValueError('%s value, %s, is not present in the _map' % (self.__class__.__name__, value)) |
| |
| value = self._reverse_map[value] |
| |
| elif not isinstance(value, int_types): |
| raise ValueError('%s value must be an integer or unicode string when a name_map is provided, not %s' % (self.__class__.__name__, value.__class__.__name__)) |
| |
| self._native = self._map[value] if self._map and value in self._map else value |
| |
| self.contents = int_to_bytes(value, signed=True) |
| self.header = None |
| if self.trailer != b'': |
| self.trailer = b'' |
| |
| def __int__(self): |
| """ |
| :return: |
| An integer |
| """ |
| return int_from_bytes(self.contents, signed=True) |
| |
| @property |
| def native(self): |
| """ |
| The a native Python datatype representation of this value |
| |
| :return: |
| An integer or None |
| """ |
| |
| if self.contents is None: |
| return None |
| |
| if self._native is None: |
| self._native = self.__int__() |
| if self._map is not None and self._native in self._map: |
| self._native = self._map[self._native] |
| return self._native |
| |
| |
| class BitString(Primitive, ValueMap, object): |
| """ |
| Represents a bit string from ASN.1 as a Python tuple of 1s and 0s |
| """ |
| |
| tag = 3 |
| |
| def set(self, value): |
| """ |
| Sets the value of the object |
| |
| :param value: |
| An integer or a tuple of integers 0 and 1 |
| |
| :raises: |
| ValueError - when an invalid value is passed |
| """ |
| |
| if not isinstance(value, int_types) and not isinstance(value, tuple): |
| raise ValueError('%s value must be an integer or a tuple of ones and zeros, not %s' % (self.__class__.__name__, value.__class__.__name__)) |
| |
| if isinstance(value, tuple): |
| self._native = value |
| value = ''.join(map(str_cls, value)) |
| |
| elif isinstance(value, int_types): |
| value = '{0:b}'.format(value) |
| self._native = tuple(map(int, tuple(value))) |
| |
| size = max(self._map.keys()) + 1 |
| if len(value) != size: |
| raise ValueError('%s value must be %s bits long, specified was only %s long' % (self.__class__.__name__, size, len(value))) |
| |
| extra_bits = (size % 8) |
| if extra_bits != 0: |
| value += '0' * extra_bits |
| |
| self.contents = int_to_bytes(extra_bits) + int_to_bytes(int(value, 2)) |
| self.header = None |
| if self.trailer != b'': |
| self.trailer = b'' |
| |
| def __getitem__(self, key): |
| """ |
| Retrieves a boolean version of one of the bits based on a name from the |
| _map |
| |
| :param key: |
| The unicode string of one of the bit names |
| |
| :raises: |
| ValueError - when _map is not set or the key name is invalid |
| |
| :return: |
| A boolean if the bit is set |
| """ |
| |
| if not isinstance(self._map, dict): |
| raise ValueError('%s bit map has not been defined' % self.__class__.__name__) |
| |
| if key not in self._map: |
| raise ValueError('%s map does not contain an entry for "%s"' % (self.__class__.__name__, key)) |
| |
| if self._native is None: |
| _ = self.native |
| |
| return self._native[key] |
| |
| def __setitem__(self, key, value): |
| """ |
| Sets one of the bits based on a name from the _map |
| |
| :param key: |
| The unicode string of one of the bit names |
| |
| :param value: |
| A boolean value |
| |
| :raises: |
| ValueError - when _map is not set or the key name is invalid |
| """ |
| |
| if not isinstance(self._map, dict) or key not in self._map: |
| return super(BitString, self).__setattr__(key, value) |
| |
| if self._native is None: |
| _ = self.native |
| |
| self._native[key] = bool(value) |
| self.set(self._native) |
| |
| @property |
| def native(self): |
| """ |
| The a native Python datatype representation of this value |
| |
| :return: |
| If a _map is set, an OrdredDict of names as keys and boolean values |
| or if no _map is set, a tuple of integers 1 and 0. None if no value. |
| """ |
| |
| # For BitString we default the value to be all zeros |
| if self.contents is None: |
| size = max(self._map.keys()) + 1 |
| self.set((0,) * size) |
| |
| if self._native is None: |
| extra_bits = int_from_bytes(self.contents[0:1]) |
| bit_string = '{0:b}'.format(int_from_bytes(self.contents[1:])) |
| if extra_bits > 0: |
| bit_string = bit_string[0:0-extra_bits] |
| bits = tuple(map(int, tuple(bit_string))) |
| if self._map: |
| self._native = OrderedDict() |
| for i, bit in enumerate(bits): |
| self._native[self._map.get(i, i)] = bool(bit) |
| for i, name in self._map.items(): |
| if name not in self._native: |
| self._native[name] = False |
| else: |
| self._native = bits |
| return self._native |
| |
| |
| class OctetBitString(Primitive): |
| """ |
| Represents a bit string in ASN.1 as a Python byte string |
| """ |
| |
| tag = 3 |
| |
| _parsed = None |
| |
| def set(self, value): |
| """ |
| Sets the value of the object |
| |
| :param value: |
| A byte string |
| |
| :raises: |
| ValueError - when an invalid value is passed |
| """ |
| |
| if not isinstance(value, byte_cls): |
| raise ValueError('%s value must be a byte string, not %s' % (self.__class__.__name__, value.__class__.__name__)) |
| |
| self._native = value |
| # Set the unused bits to 0 |
| self.contents = b'\x00' + value |
| self.header = None |
| if self.trailer != b'': |
| self.trailer = b'' |
| |
| def parse(self, spec=None, spec_params=None): |
| """ |
| Parses the contents generically, or using a spec with optional params |
| |
| :param spec: |
| A class derived from Asn1Value that defines what class_ and tag the |
| value should have, and the semantics of the encoded value. The |
| return value will be of this type. If omitted, the encoded value |
| will be decoded using the standard universal tag based on the |
| encoded tag number. |
| |
| :param spec_params: |
| A dict of params to pass to the spec object |
| |
| :return: |
| An object of the type spec, or if not present, a child of Asn1Value |
| """ |
| |
| if self._parsed is None or self._parsed[1:3] != (spec, spec_params): |
| parsed_value, _ = _parse_build(self.__bytes__(), spec=spec, spec_params=spec_params) |
| self._parsed = (parsed_value, spec, spec_params) |
| return self._parsed[0] |
| |
| def __bytes__(self): |
| """ |
| :return: |
| A byte string |
| """ |
| |
| # Whenever dealing with octet-based bit strings, we really want the |
| # bytes, so we just ignore the unused bits portion since it isn't |
| # applicable to the current use case |
| # unused_bits = struct.unpack('>B', self.contents[0:1])[0] |
| return self.contents[1:] |
| |
| @property |
| def native(self): |
| """ |
| The a native Python datatype representation of this value |
| |
| :return: |
| A byte string or None |
| """ |
| |
| if self.contents is None: |
| return None |
| |
| if self._native is None: |
| if self._parsed is not None: |
| self._native = self._parsed[0].native |
| else: |
| self._native = self.__bytes__() |
| return self._native |
| |
| @property |
| def parsed(self): |
| """ |
| Returns the parsed object from .parse() |
| |
| :return: |
| The object returned by .parse() |
| """ |
| |
| if self._parsed is None: |
| self.parse() |
| |
| return self._parsed[0] |
| |
| def dump(self, force=False): |
| """ |
| Encodes the value using DER |
| |
| :param force: |
| If the encoded contents already exist, clear them and regenerate |
| to ensure they are in DER format instead of BER format |
| |
| :return: |
| A byte string of the DER-encoded value |
| """ |
| |
| if force: |
| if self._parsed is not None: |
| native = self.parsed.dump(force=force) |
| else: |
| native = self.native |
| self.contents = None |
| self.set(native) |
| |
| return Asn1Value.dump(self) |
| |
| |
| class IntegerBitString(Primitive): |
| """ |
| Represents a bit string in ASN.1 as a Python integer |
| """ |
| |
| tag = 3 |
| |
| def set(self, value): |
| """ |
| Sets the value of the object |
| |
| :param value: |
| An integer |
| |
| :raises: |
| ValueError - when an invalid value is passed |
| """ |
| |
| if not isinstance(value, int_types): |
| raise ValueError('%s value must be an integer, not %s' % (self.__class__.__name__, value.__class__.__name__)) |
| |
| self._native = value |
| # Set the unused bits to 0 |
| self.contents = b'\x00' + int_to_bytes(value, signed=True) |
| self.header = None |
| if self.trailer != b'': |
| self.trailer = b'' |
| |
| @property |
| def native(self): |
| """ |
| The a native Python datatype representation of this value |
| |
| :return: |
| An integer or None |
| """ |
| |
| if self.contents is None: |
| return None |
| |
| if self._native is None: |
| extra_bits = int_from_bytes(self.contents[0:1]) |
| if extra_bits > 0: |
| bit_string = '{0:b}'.format(int_from_bytes(self.contents[1:])) |
| bit_string = bit_string[0:0-extra_bits] |
| self._native = int(bit_string, 2) |
| else: |
| self._native = int_from_bytes(self.contents[1:]) |
| return self._native |
| |
| |
| class OctetString(Primitive): |
| """ |
| Represents a byte string in both ASN.1 and Python |
| """ |
| |
| tag = 4 |
| |
| _parsed = None |
| |
| def parse(self, spec=None, spec_params=None): |
| """ |
| Parses the contents generically, or using a spec with optional params |
| |
| :param spec: |
| A class derived from Asn1Value that defines what class_ and tag the |
| value should have, and the semantics of the encoded value. The |
| return value will be of this type. If omitted, the encoded value |
| will be decoded using the standard universal tag based on the |
| encoded tag number. |
| |
| :param spec_params: |
| A dict of params to pass to the spec object |
| |
| :return: |
| An object of the type spec, or if not present, a child of Asn1Value |
| """ |
| |
| if self._parsed is None or self._parsed[1:3] != (spec, spec_params): |
| parsed_value, _ = _parse_build(byte_cls(self), spec=spec, spec_params=spec_params) |
| self._parsed = (parsed_value, spec, spec_params) |
| return self._parsed[0] |
| |
| def __bytes__(self): |
| """ |
| :return: |
| A byte string |
| """ |
| return self.contents |
| |
| @property |
| def native(self): |
| """ |
| The a native Python datatype representation of this value |
| |
| :return: |
| A byte string or None |
| """ |
| |
| if self.contents is None: |
| return None |
| |
| if self._native is None: |
| if self._parsed is not None: |
| self._native = self._parsed[0].native |
| else: |
| self._native = self.__bytes__() |
| return self._native |
| |
| @property |
| def parsed(self): |
| """ |
| Returns the parsed object from .parse() |
| |
| :return: |
| The object returned by .parse() |
| """ |
| |
| if self._parsed is None: |
| self.parse() |
| |
| return self._parsed[0] |
| |
| def dump(self, force=False): |
| """ |
| Encodes the value using DER |
| |
| :param force: |
| If the encoded contents already exist, clear them and regenerate |
| to ensure they are in DER format instead of BER format |
| |
| :return: |
| A byte string of the DER-encoded value |
| """ |
| |
| if force: |
| if self._parsed is not None: |
| native = self.parsed.dump(force=force) |
| else: |
| native = self.native |
| self.contents = None |
| self.set(native) |
| |
| return Asn1Value.dump(self) |
| |
| |
| class IntegerOctetString(OctetString): |
| """ |
| Represents a byte string in ASN.1 as a Python integer |
| """ |
| |
| def set(self, value): |
| """ |
| Sets the value of the object |
| |
| :param value: |
| An integer |
| |
| :raises: |
| ValueError - when an invalid value is passed |
| """ |
| |
| if not isinstance(value, int_types): |
| raise ValueError('%s value must be an integer, not %s' % (self.__class__.__name__, value.__class__.__name__)) |
| |
| self._native = value |
| # Set the unused bits to 0 |
| self.contents = int_to_bytes(value, signed=True) |
| self.header = None |
| if self.trailer != b'': |
| self.trailer = b'' |
| |
| @property |
| def native(self): |
| """ |
| The a native Python datatype representation of this value |
| |
| :return: |
| An integer or None |
| """ |
| |
| if self.contents is None: |
| return None |
| |
| if self._native is None: |
| self._native = int_from_bytes(self.contents) |
| return self._native |
| |
| |
| class Null(Primitive): |
| """ |
| Represents a null value in ASN.1 as None in Python |
| """ |
| |
| tag = 5 |
| |
| contents = b'' |
| |
| def set(self, value): |
| """ |
| Sets the value of the object |
| |
| :param value: |
| None |
| """ |
| |
| self.contents = b'' |
| |
| @property |
| def native(self): |
| """ |
| The a native Python datatype representation of this value |
| |
| :return: |
| None |
| """ |
| |
| return None |
| |
| |
| class ObjectIdentifier(Primitive, ValueMap): |
| """ |
| Represents an object identifier in ASN.1 as a Python unicode dotted |
| integer string |
| """ |
| |
| tag = 6 |
| |
| def set(self, value): |
| """ |
| Sets the value of the object |
| |
| :param value: |
| A unicode string. May be a dotted integer string, or if _map is |
| provided, one of the mapped values. |
| |
| :raises: |
| ValueError - when an invalid value is passed |
| """ |
| |
| if not isinstance(value, str_cls): |
| raise ValueError('%s value must be a unicode string, not %s' % (self.__class__.__name__, value.__class__.__name__)) |
| |
| self._native = value |
| |
| if self._map is not None: |
| if value in self._reverse_map: |
| value = self._reverse_map[value] |
| |
| self.contents = b'' |
| first = None |
| for index, part in enumerate(value.split('.')): |
| part = int(part) |
| |
| # The first two parts are merged into a single byte |
| if index == 0: |
| first = part |
| continue |
| elif index == 1: |
| part = (first * 40) + part |
| |
| encoded_part = chr_cls(0x7F & part) |
| part = part >> 7 |
| while part > 0: |
| encoded_part = chr_cls(0x80 | (0x7F & part)) + encoded_part |
| part = part >> 7 |
| self.contents += encoded_part |
| |
| self.header = None |
| if self.trailer != b'': |
| self.trailer = b'' |
| |
| def __unicode__(self): |
| """ |
| :return: |
| A unicode string |
| """ |
| output = [] |
| |
| part = 0 |
| for byte in self.contents: |
| if py2: |
| byte = ord(byte) |
| part = part * 128 |
| part += byte & 127 |
| # Last byte in subidentifier has the eighth bit set to 0 |
| if byte & 0x80 == 0: |
| if len(output) == 0: |
| output.append(str_cls(part // 40)) |
| output.append(str_cls(part % 40)) |
| else: |
| output.append(str_cls(part)) |
| part = 0 |
| |
| return '.'.join(output) |
| |
| @property |
| def native(self): |
| """ |
| The a native Python datatype representation of this value |
| |
| :return: |
| A unicode string or None. If _map is not defined, the unicode string |
| is a string of dotted integers. If _map is defined and the dotted |
| string is present in the _map, the mapped value is returned. |
| """ |
| |
| if self.contents is None: |
| return None |
| |
| if self._native is None: |
| self._native = self.__unicode__() |
| if self._map is not None and self._native in self._map: |
| self._native = self._map[self._native] |
| return self._native |
| |
| |
| class ObjectDescriptor(Primitive): |
| """ |
| Represents an object descriptor from ASN.1 - no Python implementation |
| """ |
| |
| tag = 7 |
| |
| |
| class InstanceOf(Primitive): |
| """ |
| Represents an instance from ASN.1 - no Python implementation |
| """ |
| |
| tag = 8 |
| |
| |
| class Real(Primitive): |
| """ |
| Represents a real number from ASN.1 - no Python implementation |
| """ |
| |
| tag = 9 |
| |
| |
| class Enumerated(Integer): |
| """ |
| Represents a enumerated list of integers from ASN.1 as a Python |
| unicode string |
| """ |
| |
| tag = 10 |
| |
| def set(self, value): |
| """ |
| Sets the value of the object |
| |
| :param value: |
| An integer or a unicode string from _map |
| |
| :raises: |
| ValueError - when an invalid value is passed |
| """ |
| |
| if not isinstance(value, int_types) and not isinstance(value, str_cls): |
| raise ValueError('%s value must be an integer or a unicode string, not %s' % (self.__class__.__name__, value.__class__.__name__)) |
| |
| if isinstance(value, str_cls): |
| if value not in self._reverse_map: |
| raise ValueError('%s value "%s" is not a valid value' % (self.__class__.__name__, value)) |
| |
| value = self._reverse_map[value] |
| |
| elif value not in self._map: |
| raise ValueError('%s value %s is not a valid value' % (self.__class__.__name__, value)) |
| |
| Integer.set(self, value) |
| |
| @property |
| def native(self): |
| """ |
| The a native Python datatype representation of this value |
| |
| :return: |
| A unicode string or None |
| """ |
| |
| if self.contents is None: |
| return None |
| |
| if self._native is None: |
| self._native = self._map[self.__int__()] |
| return self._native |
| |
| |
| class UTF8String(AbstractString): |
| """ |
| Represents a UTF-8 string from ASN.1 as a Python unicode string |
| """ |
| |
| tag = 12 |
| _encoding = 'utf-8' |
| |
| |
| class RelativeOid(ObjectIdentifier): |
| """ |
| Represents an object identifier in ASN.1 as a Python unicode dotted |
| integer string |
| """ |
| |
| tag = 13 |
| |
| |
| class Sequence(Asn1Value): |
| """ |
| Represents a sequence of fields from ASN.1 as a Python object with a |
| dict-like interface |
| """ |
| |
| tag = 16 |
| |
| class_ = 0 |
| method = 1 |
| |
| # A list of child objects, in order of _fields |
| children = None |
| |
| # A list of tuples in one of the following forms. |
| # |
| # Option 1, a unicode string field name and a value class |
| # |
| # ("name", Asn1ValueClass) |
| # |
| # Option 2, same as Option 1, but with a dict of class params |
| # |
| # ("name", Asn1ValueClass, {'tag_type': 'explicit', 'tag': 5}) |
| _fields = [] |
| |
| # A dict with keys being the name of a field and the value being a unicode |
| # string of the method name on self to call to get the spec for that field |
| _spec_callbacks = None |
| |
| # A dict that maps unicode string field names to an index in _fields |
| _field_map = None |
| |
| # A list in the same order as _fields that has tuples in the form (class_, tag) |
| _field_ids = None |
| |
| # An optional 2-element tuple that defines the field names of an OID field |
| # and the field that the OID should be used to help decode. Works with the |
| # _oid_specs attribute. |
| _oid_pair = None |
| |
| # A dict with keys that are unicode string OID values and values that are |
| # Asn1Value classes to use for decoding a variable-type field. |
| _oid_specs = None |
| |
| # A 2-element tuple of the indexes in _fields of the OID and value fields |
| _oid_nums = None |
| |
| def __init__(self, value=None, default=None, **kwargs): |
| """ |
| Allows setting field values before passing everything else along to |
| Asn1Value.__init__() |
| |
| :param value: |
| A native Python datatype to initialize the object value with |
| |
| :param default: |
| The default value if no value is specified |
| """ |
| |
| Asn1Value.__init__(self, **kwargs) |
| |
| if value is None and default is not None: |
| value = default |
| |
| if value is not None: |
| for key, child in value.items(): |
| self.__setitem__(key, child) |
| |
| def _lazy_child(self, index): |
| """ |
| Builds a child object if the child has only been parsed into a tuple so far |
| """ |
| |
| child = self.children[index] |
| if isinstance(child, tuple): |
| child = _build(*child) |
| self.children[index] = child |
| return child |
| |
| def __len__(self): |
| """ |
| :return: |
| Integer |
| """ |
| # We inline this check to prevent method invocation each time |
| if self.children is None: |
| self._parse_children() |
| |
| return len(self.children) |
| |
| def __getitem__(self, key): |
| """ |
| Allows accessing fields by name or index |
| |
| :param key: |
| A unicode string of the field name, or an integer of the field index |
| |
| :raises: |
| ValueError - when a field name or index is invalid |
| |
| :return: |
| The Asn1Value object of the field specified |
| """ |
| |
| # We inline this check to prevent method invocation each time |
| if self.children is None: |
| self._parse_children() |
| |
| if not isinstance(key, int_types): |
| if key not in self._field_map: |
| raise KeyError('No field named "%s" defined for %s' % (key, self.__class__.__name__)) |
| key = self._field_map[key] |
| |
| if key >= len(self.children): |
| raise KeyError('No field numbered %s is present in this %s' % (key, self.__class__.__name__)) |
| |
| return self._lazy_child(key) |
| |
| def __setitem__(self, key, value): |
| """ |
| Allows settings fields by name or index |
| |
| :param key: |
| A unicode string of the field name, or an integer of the field index |
| |
| :param value: |
| A native Python datatype to set the field value to. This method will |
| construct the appropriate Asn1Value object from _fields. |
| |
| :raises: |
| ValueError - when a field name or index is invalid |
| """ |
| |
| # We inline this check to prevent method invocation each time |
| if self.children is None: |
| self._parse_children() |
| |
| if not isinstance(key, int_types): |
| if key not in self._field_map: |
| raise KeyError('No field named "%s" defined for %s' % (key, self.__class__.__name__)) |
| key = self._field_map[key] |
| |
| field_name, field_spec, value_spec, field_params, _ = self._determine_spec(key) |
| |
| new_value = self._make_value(field_name, field_spec, value_spec, field_params, value) |
| |
| if new_value.contents is None: |
| raise ValueError('Value for field "%s" of %s is not set' % (field_name, self.__class__.__name__)) |
| |
| self.children[key] = new_value |
| |
| if self._native is not None: |
| self._native[self._fields[key][0]] = self.children[key].native |
| self._set_contents() |
| |
| def __delitem__(self, key): |
| """ |
| Allows deleting optional or default fields by name or index |
| |
| :param key: |
| A unicode string of the field name, or an integer of the field index |
| |
| :raises: |
| ValueError - when a field name or index is invalid, or the field is not optional or defaulted |
| """ |
| |
| # We inline this check to prevent method invocation each time |
| if self.children is None: |
| self._parse_children() |
| |
| if not isinstance(key, int_types): |
| if key not in self._field_map: |
| raise KeyError('No field named "%s" defined for %s' % (key, self.__class__.__name__)) |
| key = self._field_map[key] |
| |
| info = self._fields[key] |
| if len(info) < 3 or ('default' not in info[2] and 'optional' not in info[2]): |
| raise ValueError('Can not delete the value for the field "%s" of %s since it is not optional or defaulted' % (info[0], self.__class__.__name__)) |
| |
| if 'optional' in info[2]: |
| self.children[key] = NoValue() |
| if self._native is not None: |
| self._native[info[0]] = None |
| else: |
| self.__setitem__(key, None) |
| self._set_contents() |
| |
| def __iter__(self): #pylint: disable=W0234 |
| """ |
| :return: |
| An iterator of field key names |
| """ |
| |
| for info in self._fields: |
| yield info[0] |
| |
| def _set_contents(self, force=False): |
| """ |
| Updates the .contents attribute of the value with the encoded value of |
| all of the child objects |
| |
| :param force: |
| Ensure all contents are in DER format instead of possibly using |
| cached BER-encoded data |
| """ |
| |
| if self.children is None: |
| self._parse_children() |
| |
| self.contents = b'' |
| for index, info in enumerate(self._fields): |
| child = self.children[index] |
| if child is None: |
| child_dump = b'' |
| elif isinstance(child, tuple): |
| if force: |
| child_dump = self._lazy_child(index).dump(force=force) |
| else: |
| child_dump = child[3] + child[4] + child[5] |
| else: |
| child_dump = child.dump(force=force) |
| # Skip values that are the same as the default |
| if len(info) > 2 and 'default' in info[2]: |
| default_value = info[1](**info[2]) |
| if default_value.dump() == child_dump: |
| continue |
| self.contents += child_dump |
| self.header = None |
| if self.trailer != b'': |
| self.trailer = b'' |
| |
| #pylint: disable=W0212 |
| def _setup(self): |
| """ |
| Generates _field_map, _field_ids and _oid_nums for use in parsing |
| """ |
| |
| cls = self.__class__ |
| cls._field_map = {} |
| cls._field_ids = [] |
| for index, field in enumerate(cls._fields): |
| cls._field_map[field[0]] = index |
| params = field[2] if len(field) > 2 else {} |
| cls._field_ids.append(_build_id_tuple(params, field[1])) |
| |
| if cls._oid_pair is not None: |
| cls._oid_nums = (cls._field_map[cls._oid_pair[0]], cls._field_map[cls._oid_pair[1]]) |
| |
| def _determine_spec(self, index): |
| """ |
| Determine how a value for a field should be constructed |
| |
| :param index: |
| The field number |
| |
| :return: |
| A tuple containing the following elements: |
| - unicode string of the field name |
| - Ans1Value class of the field spec |
| - Asn1Value class of the value spec |
| - None or dict of params to pass to the field spec |
| - None or Asn1Value class indicating the value spec was derived fomr an OID or a spec callback |
| """ |
| |
| info = self._fields[index] |
| field_params = info[2] if len(info) > 2 else {} |
| |
| field_spec = info[1] |
| value_spec = field_spec |
| spec_override = None |
| |
| if self._spec_callbacks is not None and info[0] in self._spec_callbacks: |
| callback = self._spec_callbacks[info[0]] |
| spec_override = callback(self) |
| if spec_override: |
| # Allow a spec callback to specify both the base spec and |
| # the override, for situations such as OctetString and parse_as |
| if isinstance(spec_override, tuple) and len(spec_override) == 2: |
| field_spec, value_spec = spec_override #pylint: disable=W0633 |
| else: |
| value_spec = spec_override |
| |
| elif self._oid_nums is not None and self._oid_nums[1] == index: |
| oid = self._lazy_child(self._oid_nums[0]).native |
| if oid in self._oid_specs: |
| spec_override = self._oid_specs[oid] |
| value_spec = spec_override |
| |
| return (info[0], field_spec, value_spec, field_params, spec_override) |
| |
| def _make_value(self, field_name, field_spec, value_spec, field_params, value): |
| """ |
| Contructs an appropriate Asn1Value object for a field |
| |
| :param field_name: |
| A unicode string of the field name |
| |
| :param field_spec: |
| An Asn1Value class that is the field spec |
| |
| :param value_spec: |
| An Asn1Value class that is the vaue spec |
| |
| :param field_params: |
| None or a dict of params for the field spec |
| |
| :param value: |
| The value to construct an Asn1Value object from |
| |
| :return: |
| An instance of a child class of Asn1Value |
| """ |
| |
| specs_different = field_spec != value_spec |
| is_any = issubclass(field_spec, Any) |
| |
| if issubclass(value_spec, Choice): |
| if not isinstance(value, Asn1Value): |
| raise ValueError('Can not set a native python value to %s, which has the choice type of %s – value must be an instance of Asn1Value' % (field_name, value_spec.__name__)) |
| if not isinstance(value, value_spec): |
| wrapper = value_spec() |
| wrapper.validate(value.class_, value.tag) |
| wrapper._parsed = value #pylint: disable=W0212 |
| new_value = wrapper |
| else: |
| new_value = value |
| |
| elif isinstance(value, field_spec): |
| new_value = value |
| if specs_different: |
| new_value.parse(value_spec) |
| |
| elif (not specs_different or is_any) and not isinstance(value, value_spec): |
| new_value = value_spec(value, **field_params) |
| |
| else: |
| if isinstance(value, value_spec): |
| new_value = value |
| else: |
| new_value = value_spec(value) |
| |
| # For when the field is OctetString or OctetBitString with embedded |
| # values we need to wrap the value in the field spec to get the |
| # appropriate encoded value. |
| if specs_different and not is_any: |
| wrapper = field_spec(value=new_value.dump(), **field_params) |
| wrapper._parsed = new_value #pylint: disable=W0212 |
| new_value = wrapper |
| |
| if field_params and 'tag_type' in field_params and 'tag' in field_params: |
| if field_params['tag_type'] != new_value.tag_type or field_params['tag'] != new_value.tag: |
| new_value = new_value.retag(tag_type=field_params['tag_type'], tag=field_params['tag']) |
| |
| return new_value |
| |
| def _parse_children(self, recurse=False): |
| """ |
| Parses the contents and generates Asn1Value objects based on the |
| definitions from _fields. |
| |
| :param recurse: |
| If child objects that are Sequence or SequenceOf objects should |
| be recursively parsed |
| |
| :raises: |
| ValueError - when an error occurs parsing child objects |
| """ |
| |
| if self.contents is None: |
| if self._fields: |
| self.children = [NoValue()] * len(self._fields) |
| for index, info in enumerate(self._fields): |
| if len(info) > 2 and 'default' in info[2]: |
| field_name, field_spec, value_spec, field_params, _ = self._determine_spec(index) |
| self.children[index] = self._make_value(field_name, field_spec, value_spec, field_params, None) |
| return |
| |
| try: |
| self.children = [] |
| contents_length = len(self.contents) |
| child_pointer = 0 |
| field = 0 |
| seen_field = 1 |
| while child_pointer < contents_length: |
| parts, num_bytes = _parse(self.contents, pointer=child_pointer) |
| |
| if field < len(self._fields): |
| _, field_spec, value_spec, field_params, spec_override = self._determine_spec(field) |
| |
| # If the next value is optional or default, allow it to be absent |
| if 'optional' in field_params or 'default' in field_params: |
| id_ = (parts[0], parts[2]) |
| |
| no_id_match = self._field_ids[field] != id_ |
| not_any = field_spec != Any |
| if no_id_match and not_any: |
| |
| # See if the value is a valid choice before assuming |
| # that we have a missing optional or default value |
| choice_match = False |
| if issubclass(field_spec, Choice): |
| try: |
| tester = field_spec(**field_params) |
| tester.validate(*id_) |
| choice_match = True |
| except (ValueError): #pylint: disable=W0704 |
| pass |
| |
| if not choice_match: |
| if 'optional' in field_params: |
| self.children.append(NoValue()) |
| else: |
| self.children.append(field_spec(**field_params)) |
| field += 1 |
| continue |
| |
| if field_spec is None or (issubclass(field_spec, Any) and spec_override): |
| field_spec = value_spec |
| spec_override = None |
| |
| if spec_override: |
| child = parts + (field_spec, field_params, value_spec) |
| else: |
| child = parts + (field_spec, field_params) |
| |
| # Handle situations where an optional or defaulted field definition is incorrect |
| elif len(self._fields) > 0 and seen_field <= len(self._fields): |
| missed_fields = [] |
| prev_field = field - 1 |
| while prev_field >= 0: |
| prev_field_info = self._fields[prev_field] |
| if len(prev_field_info) < 3: |
| break |
| if 'optional' in prev_field_info[2] or 'default' in prev_field_info[2]: |
| missed_fields.append(prev_field_info[0]) |
| prev_field -= 1 |
| plural = 's' if len(missed_fields) > 1 else '' |
| missed_field_names = ', '.join(missed_fields) |
| raise ValueError( |
| 'Data for field %s (%s class, %s method, tag %s) does not match the field definition%s of %s' % |
| ( |
| seen_field, |
| CLASS_NUM_TO_NAME_MAP.get(parts[0]), |
| METHOD_NUM_TO_NAME_MAP.get(parts[1]), |
| parts[2], |
| plural, |
| missed_field_names |
| ) |
| ) |
| |
| else: |
| child = parts |
| |
| if recurse: |
| child = _build(*child) |
| if isinstance(child, (Sequence, SequenceOf)): |
| child._parse_children(recurse=True) #pylint: disable=W0212 |
| |
| self.children.append(child) |
| child_pointer += num_bytes |
| field += 1 |
| seen_field += 1 |
| |
| total_fields = len(self._fields) |
| index = len(self.children) |
| while index < total_fields: |
| field_info = self._fields[index] |
| field_spec = field_info[1] |
| field_params = field_info[2] if len(field_info) > 2 else {} |
| if 'default' in field_params: |
| self.children.append(field_spec(**field_params)) |
| elif 'optional' in field_params: |
| self.children.append(NoValue()) |
| else: |
| raise ValueError('Field "%s" is missing from structure' % field_info[0]) |
| index += 1 |
| |
| except (ValueError) as e: |
| args = e.args[1:] |
| e.args = (e.args[0] + '\n while parsing %s' % self.__class__.__name__,) + args |
| raise e |
| |
| @property |
| def native(self): |
| """ |
| The a native Python datatype representation of this value |
| |
| :return: |
| An OrderedDict or None. If an OrderedDict, all child values are |
| recursively converted to native representation also. |
| """ |
| |
| if self.contents is None: |
| return None |
| |
| if self._native is None: |
| try: |
| if self.children is None: |
| self._parse_children(recurse=True) |
| self._native = OrderedDict() |
| for index, child in enumerate(self.children): |
| if isinstance(child, tuple): |
| child = _build(*child) |
| self.children[index] = child |
| try: |
| name = self._fields[index][0] |
| except (IndexError): |
| name = str_cls(index) |
| self._native[name] = child.native |
| except (ValueError) as e: |
| args = e.args[1:] |
| e.args = (e.args[0] + '\n while parsing %s' % self.__class__.__name__,) + args |
| raise e |
| return self._native |
| |
| #pylint: disable=W0212 |
| def _copy(self, other): |
| """ |
| Copies the contents of another Asn1Value object to itself |
| |
| :param object: |
| Another instance of the same class |
| """ |
| |
| if self.__class__ != other.__class__: |
| raise ValueError('Can not copy values from %s object to %s object' % (other.__class__.__name__, self.__class__.__name__)) |
| |
| self.contents = other.contents |
| self._native = other._native |
| self.children = other.children |
| |
| def dump(self, force=False): |
| """ |
| Encodes the value using DER |
| |
| :param force: |
| If the encoded contents already exist, clear them and regenerate |
| to ensure they are in DER format instead of BER format |
| |
| :return: |
| A byte string of the DER-encoded value |
| """ |
| |
| if force: |
| self._set_contents(force=force) |
| |
| return Asn1Value.dump(self) |
| |
| |
| class SequenceOf(Asn1Value): |
| """ |
| Represents a sequence (ordered) of a single type of values from ASN.1 as a |
| Python object with a list-like interface |
| """ |
| |
| tag = 16 |
| |
| class_ = 0 |
| method = 1 |
| |
| # A list of child objects |
| children = None |
| |
| # An Asn1Value class to use when parsing children |
| _child_spec = None |
| |
| def __init__(self, value=None, default=None, spec=None, **kwargs): |
| """ |
| Allows setting child objects and the _child_spec via the spec parameter |
| before passing everything else along to Asn1Value.__init__() |
| |
| :param value: |
| A native Python datatype to initialize the object value with |
| |
| :param default: |
| The default value if no value is specified |
| |
| :param spec: |
| A class derived from Asn1Value to use to parse children |
| """ |
| |
| if spec: |
| self._child_spec = spec |
| |
| Asn1Value.__init__(self, **kwargs) |
| |
| if value is None and default is not None: |
| value = default |
| |
| if value is not None: |
| for index, child in enumerate(value): |
| self.__setitem__(index, child) |
| |
| def _lazy_child(self, index): |
| """ |
| Builds a child object if the child has only been parsed into a tuple so far |
| """ |
| |
| child = self.children[index] |
| if isinstance(child, tuple): |
| child = _build(*child) |
| self.children[index] = child |
| return child |
| |
| def __len__(self): |
| """ |
| :return: |
| An integer |
| """ |
| # We inline this checks to prevent method invocation each time |
| if self.children is None: |
| self._parse_children() |
| |
| return len(self.children) |
| |
| def __getitem__(self, key): |
| """ |
| Allows accessing children via index |
| |
| :param key: |
| Integer index of child |
| """ |
| |
| # We inline this checks to prevent method invocation each time |
| if self.children is None: |
| self._parse_children() |
| |
| return self._lazy_child(key) |
| |
| def __setitem__(self, key, value): |
| """ |
| Allows overriding a child via index |
| |
| :param key: |
| Integer index of child |
| |
| :param value: |
| Native python datatype that will be passed to _child_spec to create |
| new child object |
| """ |
| |
| # We inline this checks to prevent method invocation each time |
| if self.children is None: |
| self._parse_children() |
| |
| # If adding at the end, create a space for the new value |
| if key == len(self.children): |
| self.children.append(None) |
| |
| if issubclass(self._child_spec, Any): |
| if isinstance(value, Asn1Value): |
| self.chilren[key] = value |
| else: |
| raise ValueError('Can not set a native python value to %s where the _child_spec is Any – value must be an instance of Asn1Value' % self.__class__.__name__) |
| |
| elif issubclass(self._child_spec, Choice): |
| if not isinstance(value, Asn1Value): |
| raise ValueError('Can not set a native python value to %s where the _child_spec is the choice type %s – value must be an instance of Asn1Value' % (self.__class__.__name__, self._child_spec.__name__)) |
| if not isinstance(value, self._child_spec): |
| wrapper = self._child_spec() |
| wrapper.validate(value.class_, value.tag) |
| wrapper._parsed = value #pylint: disable=W0212 |
| value = wrapper |
| self.children[key] = value |
| |
| elif isinstance(value, self._child_spec): |
| self.children[key] = value |
| |
| else: |
| self.children[key] = self._child_spec(value=value) |
| |
| if self._native is not None: |
| self._native[key] = self.children[key].native |
| self._set_contents() |
| |
| def __delitem__(self, key): |
| """ |
| Allows removing a child via index |
| |
| :param key: |
| Integer index of child |
| """ |
| |
| # We inline this checks to prevent method invocation each time |
| if self.children is None: |
| self._parse_children() |
| |
| self.children.remove(key) |
| if self._native is not None: |
| self._native.remove(key) |
| self._set_contents() |
| |
| def __iter__(self): #pylint: disable=W0234 |
| """ |
| :return: |
| An iter() of child objects |
| """ |
| |
| # We inline this checks to prevent method invocation each time |
| if self.children is None: |
| self._parse_children() |
| |
| for index in range(0, len(self.children)): |
| yield self._lazy_child(index) |
| |
| def _set_contents(self, force=False): |
| """ |
| Encodes all child objects into the contents for this object |
| |
| :param force: |
| Ensure all contents are in DER format instead of possibly using |
| cached BER-encoded data |
| """ |
| |
| if self.children is None: |
| self._parse_children() |
| |
| self.contents = b'' |
| for child in self: |
| child_dump = child.dump(force=force) |
| self.contents += child_dump |
| self.header = None |
| if self.trailer != b'': |
| self.trailer = b'' |
| |
| def _parse_children(self, recurse=False): |
| """ |
| Parses the contents and generates Asn1Value objects based on the |
| definitions from _child_spec. |
| |
| :param recurse: |
| If child objects that are Sequence or SequenceOf objects should |
| be recursively parsed |
| |
| :raises: |
| ValueError - when an error occurs parsing child objects |
| """ |
| |
| try: |
| self.children = [] |
| if self.contents is None: |
| return |
| contents_length = len(self.contents) |
| child_pointer = 0 |
| while child_pointer < contents_length: |
| parts, num_bytes = _parse(self.contents, pointer=child_pointer) |
| if self._child_spec: |
| child = parts + (self._child_spec,) |
| else: |
| child = parts |
| if recurse: |
| child = _build(*child) |
| if isinstance(child, (Sequence, SequenceOf)): |
| child._parse_children(recurse=True) #pylint: disable=W0212 |
| self.children.append(child) |
| child_pointer += num_bytes |
| except (ValueError) as e: |
| args = e.args[1:] |
| e.args = (e.args[0] + '\n while parsing %s' % self.__class__.__name__,) + args |
| raise e |
| |
| @property |
| def native(self): |
| """ |
| The a native Python datatype representation of this value |
| |
| :return: |
| A list or None. If a list, all child values are recursively |
| converted to native representation also. |
| """ |
| |
| if self.contents is None: |
| return None |
| |
| if self._native is None: |
| if self.children is None: |
| self._parse_children(recurse=True) |
| self._native = [child.native for child in self] |
| return self._native |
| |
| #pylint: disable=W0212 |
| def _copy(self, other): |
| """ |
| Copies the contents of another Asn1Value object to itself |
| |
| :param object: |
| Another instance of the same class |
| """ |
| |
| if self.__class__ != other.__class__: |
| raise ValueError('Can not copy values from %s object to %s object' % (other.__class__.__name__, self.__class__.__name__)) |
| |
| self.contents = other.contents |
| self._native = other._native |
| self.children = other.children |
| |
| def dump(self, force=False): |
| """ |
| Encodes the value using DER |
| |
| :param force: |
| If the encoded contents already exist, clear them and regenerate |
| to ensure they are in DER format instead of BER format |
| |
| :return: |
| A byte string of the DER-encoded value |
| """ |
| |
| if force: |
| self._set_contents(force=force) |
| |
| return Asn1Value.dump(self) |
| |
| |
| class Set(Sequence): |
| """ |
| Represents a set of fields (unordered) from ASN.1 as a Python object with a |
| dict-like interface |
| """ |
| |
| method = 1 |
| class_ = 0 |
| tag = 17 |
| |
| # A dict of 2-element tuples in the form (class_, tag) as keys and integers |
| # as values that are the index of the field in _fields |
| _field_ids = None |
| |
| #pylint: disable=W0212 |
| def _setup(self): |
| """ |
| Generates _field_map, _field_ids and _oid_nums for use in parsing |
| """ |
| |
| cls = self.__class__ |
| cls._field_map = {} |
| cls._field_ids = {} |
| for index, field in enumerate(cls._fields): |
| cls._field_map[field[0]] = index |
| params = field[2] if len(field) > 2 else {} |
| cls._field_ids[_build_id_tuple(params, field[1])] = index |
| |
| if cls._oid_pair is not None: |
| cls._oid_nums = (cls._field_map[cls._oid_pair[0]], cls._field_map[cls._oid_pair[1]]) |
| |
| def _parse_children(self, recurse=False): |
| """ |
| Parses the contents and generates Asn1Value objects based on the |
| definitions from _fields. |
| |
| :param recurse: |
| If child objects that are Sequence or SequenceOf objects should |
| be recursively parsed |
| |
| :raises: |
| ValueError - when an error occurs parsing child objects |
| """ |
| |
| try: |
| child_map = {} |
| contents_length = len(self.contents) |
| child_pointer = 0 |
| while child_pointer < contents_length: |
| parts, num_bytes = _parse(self.contents, pointer=child_pointer) |
| |
| id_ = (parts[0], parts[2]) |
| |
| field = self._field_ids[id_] |
| field_info = self._fields[field] |
| field_params = field_info[2] if len(field_info) > 2 else {} |
| |
| spec = field_info[1] |
| parse_as = None |
| |
| if self._oid_nums is not None and self._oid_nums[1] == field: |
| oid = self.children[self._oid_nums[0]].native |
| if isinstance(spec, Any): |
| spec = self._oid_specs[oid] |
| else: |
| parse_as = self._oid_specs[oid] |
| |
| if parse_as: |
| child = parts + (spec, field_params, parse_as) |
| else: |
| child = parts + (spec, field_params) |
| |
| if recurse: |
| child = _build(*child) |
| if isinstance(child, (Sequence, SequenceOf)): |
| child._parse_children(recurse=True) #pylint: disable=W0212 |
| |
| child_map[field] = child |
| child_pointer += num_bytes |
| |
| total_fields = len(self._fields) |
| |
| for index in range(0, total_fields): |
| if index in child_map: |
| continue |
| field_info = self._fields[index] |
| |
| missing = False |
| |
| if len(field_info) < 3: |
| missing = True |
| elif 'optional' not in field_info[2] and 'default' not in field_info[2]: |
| missing = True |
| elif 'optional' in field_info[2]: |
| child_map[index] = NoValue() |
| elif 'default' in field_info[2]: |
| child_map[index] = field_info[1](**field_info[2]) |
| |
| if missing: |
| raise ValueError('Missing required field "%s" from %s' % (field_info[0], self.__class__.__name__)) |
| |
| self.children = [] |
| for index in range(0, total_fields): |
| self.children.append(child_map[index]) |
| |
| except (ValueError) as e: |
| args = e.args[1:] |
| e.args = (e.args[0] + '\n while parsing %s' % self.__class__.__name__,) + args |
| raise e |
| |
| |
| class SetOf(SequenceOf): |
| """ |
| Represents a set (unordered) of a single type of values from ASN.1 as a |
| Python object with a list-like interface |
| """ |
| |
| tag = 17 |
| |
| |
| class EmbeddedPdv(Sequence): |
| """ |
| A sequence structure |
| """ |
| |
| tag = 11 |
| |
| |
| class NumericString(AbstractString): |
| """ |
| Represents a numeric string from ASN.1 as a Python unicode string |
| """ |
| |
| tag = 18 |
| _encoding = 'latin1' |
| |
| |
| class PrintableString(AbstractString): |
| """ |
| Represents a printable string from ASN.1 as a Python unicode string |
| """ |
| |
| tag = 19 |
| _encoding = 'latin1' |
| |
| |
| class TeletexString(AbstractString): |
| """ |
| Represents a teletex string from ASN.1 as a Python unicode string |
| """ |
| |
| tag = 20 |
| _encoding = 'teletex' |
| |
| |
| class VideotexString(OctetString): |
| """ |
| Represents a videotex string from ASN.1 as a Python byte string |
| """ |
| |
| tag = 21 |
| |
| |
| class IA5String(AbstractString): |
| """ |
| Represents an IA5 string from ASN.1 as a Python unicode string |
| """ |
| |
| tag = 22 |
| _encoding = 'latin1' |
| |
| |
| class AbstractTime(AbstractString): |
| """ |
| Represents a time from ASN.1 as a Python datetime.datetime object |
| """ |
| |
| @property |
| def native(self): |
| """ |
| The a native Python datatype representation of this value |
| |
| :return: |
| A datetime.datetime object in the UTC timezone or None |
| """ |
| |
| if self.contents is None: |
| return None |
| |
| if self._native is None: |
| string = str_cls(self) |
| has_timezone = re.search('[-\\+]', string) |
| |
| # We don't know what timezone it is in, or it is UTC because of a Z |
| # suffix, so we just assume UTC |
| if not has_timezone: |
| string = string.rstrip('Z') |
| date = self._date_by_len(string) |
| self._native = date.replace(tzinfo=timezone.utc) |
| |
| else: |
| # Python 2 doesn't support the %z format code, so we have to manually |
| # process the timezone offset. |
| date = self._date_by_len(string[0:-5]) |
| |
| hours = int(string[-4:-2]) |
| minutes = int(string[-2:]) |
| delta = timedelta(hours=abs(hours), minutes=minutes) |
| if hours < 0: |
| date -= delta |
| else: |
| date += delta |
| |
| self._native = date.replace(tzinfo=timezone.utc) |
| |
| return self._native |
| |
| |
| class UTCTime(AbstractTime): |
| """ |
| Represents a UTC time from ASN.1 as a Python datetime.datetime object in UTC |
| """ |
| |
| tag = 23 |
| |
| def set(self, value): |
| """ |
| Sets the value of the object |
| |
| :param value: |
| A unicode string or a datetime.datetime object |
| |
| :raises: |
| ValueError - when an invalid value is passed |
| """ |
| |
| if isinstance(value, datetime): |
| value = value.strftime('%y%m%d%H%M%SZ') |
| |
| AbstractString.set(self, value) |
| # Set it to None and let the class take care of converting the next |
| # time that .native is called |
| self._native = None |
| |
| def _date_by_len(self, string): |
| """ |
| Parses a date from a string based on its length |
| |
| :param string: |
| A unicode string to parse |
| |
| :return: |
| A datetime.datetime object or a unicode string |
| """ |
| |
| strlen = len(string) |
| |
| if strlen == 10: |
| return datetime.strptime(string, '%y%m%d%H%M') |
| |
| if strlen == 12: |
| return datetime.strptime(string, '%y%m%d%H%M%S') |
| |
| return string |
| |
| class GeneralizedTime(AbstractTime): |
| """ |
| Represents a generalized time from ASN.1 as a Python datetime.datetime |
| object in UTC |
| """ |
| |
| tag = 24 |
| |
| def set(self, value): |
| """ |
| Sets the value of the object |
| |
| :param value: |
| A unicode string or a datetime.datetime object |
| |
| :raises: |
| ValueError - when an invalid value is passed |
| """ |
| |
| if isinstance(value, datetime): |
| value = value.strftime('%Y%m%d%H%M%SZ') |
| |
| AbstractString.set(self, value) |
| # Set it to None and let the class take care of converting the next |
| # time that .native is called |
| self._native = None |
| |
| def _date_by_len(self, string): |
| """ |
| Parses a date from a string based on its length |
| |
| :param string: |
| A unicode string to parse |
| |
| :return: |
| A datetime.datetime object or a unicode string |
| """ |
| |
| strlen = len(string) |
| |
| if strlen == 10: |
| return datetime.strptime(string, '%Y%m%d%H') |
| |
| if strlen == 12: |
| return datetime.strptime(string, '%Y%m%d%H%M') |
| |
| if strlen == 14: |
| return datetime.strptime(string, '%Y%m%d%H%M%S') |
| |
| if strlen == 18: |
| return datetime.strptime(string, '%Y%m%d%H%M%S.%f') |
| |
| return string |
| |
| |
| class GraphicString(AbstractString): |
| """ |
| Represents a graphic string from ASN.1 as a Python unicode string |
| """ |
| |
| tag = 25 |
| # This is technically not correct since this type can contain any charset |
| _encoding = 'latin1' |
| |
| |
| class VisibleString(AbstractString): |
| """ |
| Represents a visible string from ASN.1 as a Python unicode string |
| """ |
| |
| tag = 26 |
| _encoding = 'latin1' |
| |
| |
| class GeneralString(AbstractString): |
| """ |
| Represents a general string from ASN.1 as a Python unicode string |
| """ |
| |
| tag = 27 |
| # This is technically not correct since this type can contain any charset |
| _encoding = 'latin1' |
| |
| |
| class UniversalString(AbstractString): |
| """ |
| Represents a universal string from ASN.1 as a Python unicode string |
| """ |
| |
| tag = 28 |
| _encoding = 'utf-32-be' |
| |
| |
| class CharacterString(AbstractString): |
| """ |
| Represents a character string from ASN.1 as a Python unicode string |
| """ |
| |
| tag = 29 |
| # This is technically not correct since this type can contain any charset |
| _encoding = 'latin1' |
| |
| |
| class BMPString(AbstractString): |
| """ |
| Represents a BMP string from ASN.1 as a Python unicode string |
| """ |
| |
| tag = 30 |
| _encoding = 'utf-16-be' |
| |
| |
| def _dump_header(class_, method, tag, contents): |
| header = b'' |
| |
| id_num = 0 |
| id_num |= class_ << 6 |
| id_num |= method << 5 |
| |
| tag = tag |
| |
| if tag >= 31: |
| header += chr_cls(id_num | 31) |
| while tag > 0: |
| continuation_bit = 0x80 if tag > 0x7F else 0 |
| header += chr_cls(continuation_bit | (tag & 0x7F)) |
| tag = tag >> 7 |
| else: |
| header += chr_cls(id_num | tag) |
| |
| length = len(contents) |
| if length <= 127: |
| header += chr_cls(length) |
| else: |
| length_bytes = int_to_bytes(length) |
| header += chr_cls(0x80 | len(length_bytes)) |
| header += length_bytes |
| |
| return header |
| |
| |
| def _build_id_tuple(params, spec): |
| """ |
| Builds a 2-element tuple used to identify fields by grabbing the class_ |
| and tag from an Asn1Value class and the params dict being passed to it |
| |
| :param params: |
| A dict of params to pass to spec |
| |
| :param spec: |
| An Asn1Value class |
| |
| :return: |
| A 2-element integer tuple in the form (class_, tag) |
| """ |
| |
| # Handle situations where the the spec is not known at setup time |
| if spec is None: |
| return (None, None) |
| |
| required_class = spec.class_ |
| required_tag = spec.tag |
| |
| tag_type = params.get('tag_type', spec.tag_type) |
| if tag_type is not None: |
| required_class = 2 |
| |
| required_class = params.get('class_', required_class) |
| required_tag = params.get('tag', required_tag) |
| |
| return (required_class, required_tag) |
| |
| |
| def _parse_id(encoded_data, pointer): |
| """ |
| Peeks ahead into a byte string and parses the ASN.1 header |
| |
| :param encoded_data: |
| A byte string |
| |
| :param pointer: |
| The index in the byte string to parse the header from |
| |
| :return: |
| A 4-element tuple of (class_, method, tag, number_of_bytes_consumed) |
| """ |
| |
| original_pointer = pointer |
| |
| first_octet = ord(encoded_data[pointer:pointer+1]) |
| pointer += 1 |
| |
| class_ = first_octet >> 6 |
| method = (first_octet >> 5) & 1 |
| |
| tag = first_octet & 31 |
| # Base 128 length using 8th bit as continuation indicator |
| if tag == 31: |
| tag = 0 |
| while True: |
| num = ord(encoded_data[pointer:pointer+1]) |
| pointer += 1 |
| tag *= 128 |
| tag += num & 127 |
| if num >> 7 == 0: |
| break |
| |
| num_bytes = pointer - original_pointer |
| |
| return (class_, method, tag, num_bytes) |
| |
| |
| def _build(class_, method, tag, header, contents, trailer, spec=None, spec_params=None, nested_spec=None): |
| """ |
| Builds an Asn1Value object generically, or using a spec with optional params |
| |
| :param class_: |
| An integer representing the ASN1 class |
| |
| :param method: |
| An integer representing the ASN1 method |
| |
| :param tag: |
| An integer representing the ASN1 tag |
| |
| :param header: |
| A byte string of the ASN1 header (class, method, tag, length) |
| |
| :param contents: |
| A byte string of the ASN1 value |
| |
| :param trailer: |
| A byte string of any ASN1 trailer (only used by indefinite length encodings) |
| |
| :param spec: |
| A class derived from Asn1Value that defines what class_ and tag the |
| value should have, and the semantics of the encoded value. The |
| return value will be of this type. If omitted, the encoded value |
| will be decoded using the standard universal tag based on the |
| encoded tag number. |
| |
| :param spec_params: |
| A dict of params to pass to the spec object |
| |
| :param nested_spec: |
| For certain Asn1Value classes (such as OctetString and BitString), the |
| contents can be further parsed and interpreted as another Asn1Value. |
| This parameter controls the spec for that sub-parsing. |
| |
| :return: |
| An object of the type spec, or if not specified, a child of Asn1Value |
| """ |
| |
| if header is None: |
| return NoValue() |
| |
| # If an explicit specification was passed in, make sure it matches |
| if spec is not None: |
| if spec_params: |
| value = spec(**spec_params) |
| else: |
| value = spec() |
| |
| if isinstance(value, Any): |
| pass |
| |
| elif value.tag_type == 'explicit': |
| if class_ != value.explicit_class: |
| raise ValueError( |
| 'Error parsing %s - explicitly-tagged class should have been %s, but %s was found' % |
| ( |
| value.__class__.__name__, |
| CLASS_NUM_TO_NAME_MAP.get(value.explicit_class), |
| CLASS_NUM_TO_NAME_MAP.get(class_, class_) |
| ) |
| ) |
| if method != 1: |
| raise ValueError( |
| 'Error parsing %s - explicitly-tagged method should have been %s, but %s was found' % |
| ( |
| value.__class__.__name__, |
| METHOD_NUM_TO_NAME_MAP.get(1), |
| METHOD_NUM_TO_NAME_MAP.get(method, method) |
| ) |
| ) |
| if tag != value.explicit_tag: |
| raise ValueError( |
| 'Error parsing %s - explicitly-tagged tag should have been %s, but %s was found' % |
| ( |
| value.__class__.__name__, |
| value.explicit_tag, |
| tag |
| ) |
| ) |
| |
| elif isinstance(value, Choice): |
| value.validate(class_, tag) |
| |
| else: |
| if class_ != value.class_: |
| raise ValueError( |
| 'Error parsing %s - class should have been %s, but %s was found' % |
| ( |
| value.__class__.__name__, |
| CLASS_NUM_TO_NAME_MAP.get(value.class_), |
| CLASS_NUM_TO_NAME_MAP.get(class_, class_) |
| ) |
| ) |
| if method != value.method: |
| raise ValueError( |
| 'Error parsing %s - method should have been %s, but %s was found' % |
| ( |
| value.__class__.__name__, |
| METHOD_NUM_TO_NAME_MAP.get(value.method), |
| METHOD_NUM_TO_NAME_MAP.get(method, method) |
| ) |
| ) |
| if tag != value.tag: |
| raise ValueError( |
| 'Error parsing %s - tag should have been %s, but %s was found' % |
| ( |
| value.__class__.__name__, |
| value.tag, |
| tag |
| ) |
| ) |
| |
| # For explicitly tagged, un-speced parsings, we use a generic container |
| # since we will be parsing the contents and discarding the outer object |
| # anyway a little further on |
| elif spec_params and 'tag_type' in spec_params and spec_params['tag_type'] == 'explicit': |
| value = Asn1Value(**spec_params) |
| |
| # If no spec was specified, allow anything and just process what |
| # is in the input data |
| else: |
| universal_specs = { |
| 1: Boolean, |
| 2: Integer, |
| 3: BitString, |
| 4: OctetString, |
| 5: Null, |
| 6: ObjectIdentifier, |
| 7: ObjectDescriptor, |
| 8: InstanceOf, |
| 9: Real, |
| 10: Enumerated, |
| 11: EmbeddedPdv, |
| 12: UTF8String, |
| 13: RelativeOid, |
| 16: Sequence, |
| 17: Set, |
| 18: NumericString, |
| 19: PrintableString, |
| 20: TeletexString, |
| 21: VideotexString, |
| 22: IA5String, |
| 23: UTCTime, |
| 24: GeneralizedTime, |
| 25: GraphicString, |
| 26: VisibleString, |
| 27: GeneralString, |
| 28: UniversalString, |
| 29: CharacterString, |
| 30: BMPString |
| } |
| if tag not in universal_specs: |
| raise ValueError( |
| 'Unknown element - %s class, %s method, tag %s' % |
| ( |
| CLASS_NUM_TO_NAME_MAP.get(class_), |
| METHOD_NUM_TO_NAME_MAP.get(method), |
| tag, |
| ) |
| ) |
| |
| spec = universal_specs[tag] |
| |
| value = spec(class_=class_) |
| |
| value.header = header |
| value.contents = contents |
| if trailer is not None and trailer != b'': |
| value.trailer = trailer |
| |
| # Destroy any default value that our contents have overwritten |
| value._native = None #pylint: disable=W0212 |
| |
| # For explicitly tagged values, parse the inner value and pull it out |
| if value.tag_type == 'explicit': |
| original_value = value |
| (class_, method, tag, header, contents, trailer), _ = _parse(value.contents) |
| value = _build(class_, method, tag, header, contents, trailer, spec=spec) |
| value.header = original_value.header + header |
| value.trailer += original_value.trailer |
| value.tag_type = 'explicit' |
| value.explicit_class = original_value.explicit_class |
| value.explicit_tag = original_value.explicit_tag |
| elif isinstance(value, Choice): |
| value.contents = value.header + value.contents |
| value.header = b'' |
| |
| try: |
| # Force parsing the Choice now |
| if isinstance(value, Choice): |
| value.parse() |
| |
| if nested_spec: |
| value.parse(nested_spec) |
| except (ValueError) as e: |
| args = e.args[1:] |
| e.args = (e.args[0] + '\n while parsing %s' % value.__class__.__name__,) + args |
| raise e |
| |
| return value |
| |
| |
| def _parse(encoded_data, pointer=0): |
| """ |
| Parses a byte string into component parts |
| |
| :param encoded_data: |
| A byte string that contains BER-encoded data |
| |
| :param pointer: |
| The index in the byte string to parse from |
| |
| :return: |
| A 2-element tuple: |
| - 0: A tuple of (class_, method, tag, header, content, trailer) |
| - 1: An integer indicating how many bytes were consumed |
| """ |
| |
| if len(encoded_data) == 0: |
| return ((None, None, None, None, None, None), 0) |
| |
| start = pointer |
| |
| class_, method, tag, num_bytes = _parse_id(encoded_data, pointer) |
| pointer += num_bytes |
| |
| length_octet = ord(encoded_data[pointer:pointer+1]) |
| pointer += 1 |
| length_type = length_octet >> 7 |
| if length_type == 1: |
| length = 0 |
| remaining_length_octets = length_octet & 127 |
| while remaining_length_octets > 0: |
| length *= 256 |
| length += ord(encoded_data[pointer:pointer+1]) |
| pointer += 1 |
| remaining_length_octets -= 1 |
| else: |
| length = length_octet & 127 |
| |
| header = encoded_data[start:pointer] |
| |
| # Indefinite length |
| if length_type == 1 and length == 0: |
| end_token = encoded_data.find(b'\x00\x00', pointer) |
| contents = encoded_data[pointer:end_token] |
| pointer = end_token + 2 |
| trailer = b'\x00\x00' |
| else: |
| contents = encoded_data[pointer:pointer+length] |
| pointer += length |
| trailer = b'' |
| |
| num_bytes = pointer - start |
| |
| return ((class_, method, tag, header, contents, trailer), num_bytes) |
| |
| |
| def _parse_build(encoded_data, pointer=0, spec=None, spec_params=None): |
| """ |
| Parses a byte string generically, or using a spec with optional params |
| |
| :param encoded_data: |
| A byte string that contains BER-encoded data |
| |
| :param pointer: |
| The index in the byte string to parse from |
| |
| :param spec: |
| A class derived from Asn1Value that defines what class_ and tag the |
| value should have, and the semantics of the encoded value. The |
| return value will be of this type. If omitted, the encoded value |
| will be decoded using the standard universal tag based on the |
| encoded tag number. |
| |
| :param spec_params: |
| A dict of params to pass to the spec object |
| |
| :return: |
| A 2-element tuple: |
| - 0: An object of the type spec, or if not specified, a child of Asn1Value |
| - 1: An integer indicating how many bytes were consumed |
| """ |
| |
| (class_, method, tag, header, contents, trailer), num_bytes = _parse(encoded_data, pointer) |
| value = _build(class_, method, tag, header, contents, trailer, spec=spec, spec_params=spec_params) |
| return (value, num_bytes) |