blob: 6849ffed6d027ff1f7d8b77ecee2041442b26885 [file] [log] [blame]
## This file is part of Scapy
## See http://www.secdev.org/projects/scapy for more informations
## Copyright (C) Philippe Biondi <phil@secdev.org>
## This program is published under a GPLv2 license
"""
Classes that implement ASN.1 data structures.
"""
from asn1.asn1 import *
from asn1.ber import *
from volatile import *
from base_classes import BasePacket
#####################
#### ASN1 Fields ####
#####################
class ASN1F_badsequence(Exception):
pass
class ASN1F_element:
pass
class ASN1F_optionnal(ASN1F_element):
def __init__(self, field):
self._field=field
def __getattr__(self, attr):
return getattr(self._field,attr)
def dissect(self,pkt,s):
try:
return self._field.dissect(pkt,s)
except ASN1F_badsequence:
self._field.set_val(pkt,None)
return s
except BER_Decoding_Error:
self._field.set_val(pkt,None)
return s
def build(self, pkt):
if self._field.is_empty(pkt):
return ""
return self._field.build(pkt)
class ASN1F_field(ASN1F_element):
holds_packets=0
islist=0
ASN1_tag = ASN1_Class_UNIVERSAL.ANY
context=ASN1_Class_UNIVERSAL
def __init__(self, name, default, context=None):
if context is not None:
self.context = context
self.name = name
self.default = default
def i2repr(self, pkt, x):
return repr(x)
def i2h(self, pkt, x):
return x
def any2i(self, pkt, x):
return x
def m2i(self, pkt, x):
return self.ASN1_tag.get_codec(pkt.ASN1_codec).safedec(x, context=self.context)
def i2m(self, pkt, x):
if x is None:
x = 0
if isinstance(x, ASN1_Object):
if ( self.ASN1_tag == ASN1_Class_UNIVERSAL.ANY
or x.tag == ASN1_Class_UNIVERSAL.RAW
or x.tag == ASN1_Class_UNIVERSAL.ERROR
or self.ASN1_tag == x.tag ):
return x.enc(pkt.ASN1_codec)
else:
raise ASN1_Error("Encoding Error: got %r instead of an %r for field [%s]" % (x, self.ASN1_tag, self.name))
return self.ASN1_tag.get_codec(pkt.ASN1_codec).enc(x)
def do_copy(self, x):
if hasattr(x, "copy"):
return x.copy()
if type(x) is list:
x = x[:]
for i in xrange(len(x)):
if isinstance(x[i], BasePacket):
x[i] = x[i].copy()
return x
def build(self, pkt):
return self.i2m(pkt, getattr(pkt, self.name))
def set_val(self, pkt, val):
setattr(pkt, self.name, val)
def is_empty(self, pkt):
return getattr(pkt,self.name) is None
def dissect(self, pkt, s):
v,s = self.m2i(pkt, s)
self.set_val(pkt, v)
return s
def get_fields_list(self):
return [self]
def __hash__(self):
return hash(self.name)
def __str__(self):
return self.name
def __eq__(self, other):
return self.name == other
def __repr__(self):
return self.name
def randval(self):
return RandInt()
class ASN1F_INTEGER(ASN1F_field):
ASN1_tag= ASN1_Class_UNIVERSAL.INTEGER
def randval(self):
return RandNum(-2**64, 2**64-1)
class ASN1F_BOOLEAN(ASN1F_field):
ASN1_tag= ASN1_Class_UNIVERSAL.BOOLEAN
def randval(self):
return RandChoice(True,False)
class ASN1F_NULL(ASN1F_INTEGER):
ASN1_tag= ASN1_Class_UNIVERSAL.NULL
class ASN1F_SEP(ASN1F_NULL):
ASN1_tag= ASN1_Class_UNIVERSAL.SEP
class ASN1F_enum_INTEGER(ASN1F_INTEGER):
def __init__(self, name, default, enum):
ASN1F_INTEGER.__init__(self, name, default)
i2s = self.i2s = {}
s2i = self.s2i = {}
if type(enum) is list:
keys = xrange(len(enum))
else:
keys = enum.keys()
if filter(lambda x: type(x) is str, keys):
i2s,s2i = s2i,i2s
for k in keys:
i2s[k] = enum[k]
s2i[enum[k]] = k
def any2i_one(self, pkt, x):
if type(x) is str:
x = self.s2i[x]
return x
def i2repr_one(self, pkt, x):
return self.i2s.get(x, repr(x))
def any2i(self, pkt, x):
if type(x) is list:
return map(lambda z,pkt=pkt:self.any2i_one(pkt,z), x)
else:
return self.any2i_one(pkt,x)
def i2repr(self, pkt, x):
if type(x) is list:
return map(lambda z,pkt=pkt:self.i2repr_one(pkt,z), x)
else:
return self.i2repr_one(pkt,x)
class ASN1F_ENUMERATED(ASN1F_enum_INTEGER):
ASN1_tag = ASN1_Class_UNIVERSAL.ENUMERATED
class ASN1F_STRING(ASN1F_field):
ASN1_tag = ASN1_Class_UNIVERSAL.STRING
def randval(self):
return RandString(RandNum(0, 1000))
class ASN1F_PRINTABLE_STRING(ASN1F_STRING):
ASN1_tag = ASN1_Class_UNIVERSAL.PRINTABLE_STRING
class ASN1F_BIT_STRING(ASN1F_STRING):
ASN1_tag = ASN1_Class_UNIVERSAL.BIT_STRING
class ASN1F_IPADDRESS(ASN1F_STRING):
ASN1_tag = ASN1_Class_UNIVERSAL.IPADDRESS
class ASN1F_TIME_TICKS(ASN1F_INTEGER):
ASN1_tag = ASN1_Class_UNIVERSAL.TIME_TICKS
class ASN1F_UTC_TIME(ASN1F_STRING):
ASN1_tag = ASN1_Class_UNIVERSAL.UTC_TIME
class ASN1F_GENERALIZED_TIME(ASN1F_STRING):
ASN1_tag = ASN1_Class_UNIVERSAL.GENERALIZED_TIME
class ASN1F_OID(ASN1F_field):
ASN1_tag = ASN1_Class_UNIVERSAL.OID
def randval(self):
return RandOID()
class ASN1F_SEQUENCE(ASN1F_field):
ASN1_tag = ASN1_Class_UNIVERSAL.SEQUENCE
def __init__(self, *seq, **kargs):
if "ASN1_tag" in kargs:
self.ASN1_tag = kargs["ASN1_tag"]
self.seq = seq
def __repr__(self):
return "<%s%r>" % (self.__class__.__name__,self.seq,)
def set_val(self, pkt, val):
for f in self.seq:
f.set_val(pkt,val)
def is_empty(self, pkt):
for f in self.seq:
if not f.is_empty(pkt):
return False
return True
def get_fields_list(self):
return reduce(lambda x,y: x+y.get_fields_list(), self.seq, [])
def build(self, pkt):
s = reduce(lambda x,y: x+y.build(pkt), self.seq, "")
return self.i2m(pkt, s)
def dissect(self, pkt, s):
codec = self.ASN1_tag.get_codec(pkt.ASN1_codec)
try:
i,s,remain = codec.check_type_check_len(s)
for obj in self.seq:
s = obj.dissect(pkt,s)
if s:
warning("Too many bytes to decode sequence: [%r]" % s) # XXX not reversible!
return remain
except ASN1_Error,e:
raise ASN1F_badsequence(e)
class ASN1F_SET(ASN1F_SEQUENCE):
ASN1_tag = ASN1_Class_UNIVERSAL.SET
class ASN1F_SEQUENCE_OF(ASN1F_SEQUENCE):
holds_packets = 1
islist = 1
def __init__(self, name, default, asn1pkt, ASN1_tag=0x30):
self.asn1pkt = asn1pkt
self.tag = chr(ASN1_tag)
self.name = name
self.default = default
def i2repr(self, pkt, i):
if i is None:
return []
return i
def get_fields_list(self):
return [self]
def set_val(self, pkt, val):
ASN1F_field.set_val(self, pkt, val)
def is_empty(self, pkt):
return ASN1F_field.is_empty(self, pkt)
def build(self, pkt):
val = getattr(pkt, self.name)
if isinstance(val, ASN1_Object) and val.tag == ASN1_Class_UNIVERSAL.RAW:
s = val
elif val is None:
s = ""
else:
s = "".join(map(str, val ))
return self.i2m(pkt, s)
def dissect(self, pkt, s):
codec = self.ASN1_tag.get_codec(pkt.ASN1_codec)
i,s1,remain = codec.check_type_check_len(s)
lst = []
while s1:
try:
p = self.asn1pkt(s1)
except ASN1F_badsequence,e:
lst.append(packet.Raw(s1))
break
lst.append(p)
if packet.Raw in p:
s1 = p[packet.Raw].load
del(p[packet.Raw].underlayer.payload)
else:
break
self.set_val(pkt, lst)
return remain
def randval(self):
return fuzz(self.asn1pkt())
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__,self.name)
class ASN1F_PACKET(ASN1F_field):
holds_packets = 1
def __init__(self, name, default, cls):
ASN1_field.__init__(self, name, default)
self.cls = cls
def i2m(self, pkt, x):
if x is None:
x = ""
return str(x)
def extract_packet(self, cls, x):
try:
c = cls(x)
except ASN1F_badsequence:
c = packet.Raw(x)
cpad = c.getlayer(packet.Padding)
x = ""
if cpad is not None:
x = cpad.load
del(cpad.underlayer.payload)
return c,x
def m2i(self, pkt, x):
return self.extract_packet(self.cls, x)
class ASN1F_CHOICE(ASN1F_PACKET):
ASN1_tag = ASN1_Class_UNIVERSAL.NONE
def __init__(self, name, default, *args):
self.name=name
self.choice = {}
for p in args:
self.choice[p.ASN1_root.ASN1_tag] = p
# self.context=context
self.default=default
def m2i(self, pkt, x):
if len(x) == 0:
return packet.Raw(),""
raise ASN1_Error("ASN1F_CHOICE: got empty string")
if ord(x[0]) not in self.choice:
return packet.Raw(x),"" # XXX return RawASN1 packet ? Raise error
raise ASN1_Error("Decoding Error: choice [%i] not found in %r" % (ord(x[0]), self.choice.keys()))
z = ASN1F_PACKET.extract_packet(self, self.choice[ord(x[0])], x)
return z
def randval(self):
return RandChoice(*map(lambda x:fuzz(x()), self.choice.values()))
# This import must come in last to avoid problems with cyclic dependencies
import packet