| import pdb |
| import re |
| from xml.etree.ElementTree import Element, SubElement, tostring |
| |
| #define equivalents |
| TYPE = 0 |
| ATTRIBUTE = 1 |
| TYPEATTRIBUTE = 2 |
| CLASS = 3 |
| COMMON = 4 |
| ALLOW_RULE = 5 |
| NEVERALLOW_RULE = 6 |
| OTHER = 7 |
| |
| #define helper methods |
| # advance_past_whitespace(): helper function to skip whitespace at current |
| # position in file. |
| # returns: the non-whitespace character at the file's new position |
| #TODO: should I deal with comments here as well? |
| def advance_past_whitespace(file_obj): |
| c = file_obj.read(1) |
| while c.isspace(): |
| c = file_obj.read(1) |
| file_obj.seek(-1, 1) |
| return c |
| |
| # advance_until_whitespace(): helper function to grab the string represented |
| # by the current position in file until next whitespace. |
| # returns: string until next whitespace. overlooks comments. |
| def advance_until_whitespace(file_obj): |
| ret_string = "" |
| c = file_obj.read(1) |
| #TODO: make a better way to deal with ':' and ';' |
| while not (c.isspace() or c == ':' or c == '' or c == ';'): |
| #don't count comments |
| if c == '#': |
| file_obj.readline() |
| return ret_string |
| else: |
| ret_string+=c |
| c = file_obj.read(1) |
| if not c == ':': |
| file_obj.seek(-1, 1) |
| return ret_string |
| |
| # expand_avc_rule - takes a processed avc rule and converts it into a list of |
| # 4-tuples for use in an access check of form: |
| # (source_type, target_type, class, permission) |
| def expand_avc_rule(policy, avc_rule): |
| ret_list = [ ] |
| |
| #expand source_types |
| source_types = avc_rule['source_types']['set'] |
| source_types = policy.expand_types(source_types) |
| if(avc_rule['source_types']['flags']['complement']): |
| #TODO: deal with negated 'self', not present in current policy.conf, though (I think) |
| source_types = policy.types - source_types #complement these types |
| if len(source_types) == 0: |
| print "ERROR: source_types empty after expansion" |
| print "Before: " |
| print avc_rule['source_types']['set'] |
| return |
| |
| #expand target_types |
| target_types = avc_rule['target_types']['set'] |
| target_types = policy.expand_types(target_types) |
| if(avc_rule['target_types']['flags']['complement']): |
| #TODO: deal with negated 'self', not present in current policy.conf, though (I think) |
| target_types = policy.types - target_types #complement these types |
| if len(target_types) == 0: |
| print "ERROR: target_types empty after expansion" |
| print "Before: " |
| print avc_rule['target_types']['set'] |
| return |
| |
| # get classes |
| rule_classes = avc_rule['classes']['set'] |
| if '' in rule_classes: |
| print "FOUND EMPTY STRING IN CLASSES" |
| print "Total sets:" |
| print avc_rule['source_types']['set'] |
| print avc_rule['target_types']['set'] |
| print rule_classes |
| print avc_rule['permissions']['set'] |
| |
| if len(rule_classes) == 0: |
| print "ERROR: empy set of object classes in avc rule" |
| return |
| |
| # get permissions |
| permissions = avc_rule['permissions']['set'] |
| if len(permissions) == 0: |
| print "ERROR: empy set of permissions in avc rule\n" |
| return |
| |
| #create the list with collosal nesting, n^4 baby! |
| for s in source_types: |
| for t in target_types: |
| for c in rule_classes: |
| if c == '': |
| continue |
| #expand permissions on a per-class basis |
| exp_permissions = policy.expand_permissions(c, permissions) |
| if(avc_rule['permissions']['flags']['complement']): |
| exp_permissions = policy.classes[c] - exp_permissions |
| if len(exp_permissions) == 0: |
| print "ERROR: permissions empty after expansion\n" |
| print "Before: " |
| print avc_rule['permissions']['set'] |
| return |
| for p in exp_permissions: |
| source = s |
| if t == 'self': |
| target = s |
| else: |
| target = t |
| obj_class = c |
| permission = p |
| ret_list.append((source, target, obj_class, permission)) |
| return ret_list |
| |
| # expand_avc_rule - takes a processed avc rule and converts it into an xml |
| # representation with the information needed in a checkSELinuxAccess() call. |
| # (source_type, target_type, class, permission) |
| def expand_avc_rule_to_xml(policy, avc_rule, rule_name, rule_type): |
| rule_xml = Element('avc_rule') |
| rule_xml.set('name', rule_name) |
| rule_xml.set('type', rule_type) |
| |
| #expand source_types |
| source_types = avc_rule['source_types']['set'] |
| source_types = policy.expand_types(source_types) |
| if(avc_rule['source_types']['flags']['complement']): |
| #TODO: deal with negated 'self', not present in current policy.conf, though (I think) |
| source_types = policy.types - source_types #complement these types |
| if len(source_types) == 0: |
| print "ERROR: source_types empty after expansion" |
| print "Before: " |
| print avc_rule['source_types']['set'] |
| return |
| for s in source_types: |
| elem = SubElement(rule_xml, 'type') |
| elem.set('type', 'source') |
| elem.text = s |
| |
| #expand target_types |
| target_types = avc_rule['target_types']['set'] |
| target_types = policy.expand_types(target_types) |
| if(avc_rule['target_types']['flags']['complement']): |
| #TODO: deal with negated 'self', not present in current policy.conf, though (I think) |
| target_types = policy.types - target_types #complement these types |
| if len(target_types) == 0: |
| print "ERROR: target_types empty after expansion" |
| print "Before: " |
| print avc_rule['target_types']['set'] |
| return |
| for t in target_types: |
| elem = SubElement(rule_xml, 'type') |
| elem.set('type', 'target') |
| elem.text = t |
| |
| # get classes |
| rule_classes = avc_rule['classes']['set'] |
| |
| if len(rule_classes) == 0: |
| print "ERROR: empy set of object classes in avc rule" |
| return |
| |
| # get permissions |
| permissions = avc_rule['permissions']['set'] |
| if len(permissions) == 0: |
| print "ERROR: empy set of permissions in avc rule\n" |
| return |
| |
| # permissions are class-dependent, so bundled together |
| for c in rule_classes: |
| if c == '': |
| print "AH!!! empty class found!\n" |
| continue |
| c_elem = SubElement(rule_xml, 'obj_class') |
| c_elem.set('name', c) |
| #expand permissions on a per-class basis |
| exp_permissions = policy.expand_permissions(c, permissions) |
| if(avc_rule['permissions']['flags']['complement']): |
| exp_permissions = policy.classes[c] - exp_permissions |
| if len(exp_permissions) == 0: |
| print "ERROR: permissions empty after expansion\n" |
| print "Before: " |
| print avc_rule['permissions']['set'] |
| return |
| |
| for p in exp_permissions: |
| p_elem = SubElement(c_elem, 'permission') |
| p_elem.text = p |
| |
| return rule_xml |
| |
| # expand_brackets - helper function which reads a file into a string until '{ }'s |
| # are balanced. Brackets are removed from the string. This function is based |
| # on the understanding that nested brackets in our policy.conf file occur only due |
| # to macro expansion, and we just need to know how much is included in a given |
| # policy sub-component. |
| def expand_brackets(file_obj): |
| ret_string = "" |
| c = file_obj.read(1) |
| if not c == '{': |
| print "Invalid bracket expression: " + c + "\n" |
| file_obj.seek(-1, 1) |
| return "" |
| else: |
| bracket_count = 1 |
| while bracket_count > 0: |
| c = file_obj.read(1) |
| if c == '{': |
| bracket_count+=1 |
| elif c == '}': |
| bracket_count-=1 |
| elif c == '#': |
| #get rid of comment and replace with whitespace |
| file_obj.readline() |
| ret_string+=' ' |
| else: |
| ret_string+=c |
| return ret_string |
| |
| # get_avc_rule_component - grabs the next component from an avc rule. Basically, |
| # just reads the next word or bracketed set of words. |
| # returns - a set of the word, or words with metadata |
| def get_avc_rule_component(file_obj): |
| ret_dict = { 'flags': {}, 'set': set() } |
| c = advance_past_whitespace(file_obj) |
| if c == '~': |
| ret_dict['flags']['complement'] = True |
| file_obj.read(1) #move to next char |
| c = advance_past_whitespace(file_obj) |
| else: |
| ret_dict['flags']['complement'] = False |
| if not c == '{': |
| #TODO: change operations on file to operations on string? |
| single_type = advance_until_whitespace(file_obj) |
| ret_dict['set'].add(single_type) |
| else: |
| mult_types = expand_brackets(file_obj) |
| mult_types = mult_types.split() |
| for t in mult_types: |
| ret_dict['set'].add(t) |
| return ret_dict |
| |
| def get_line_type(line): |
| if re.search(r'^type\s', line): |
| return TYPE |
| if re.search(r'^attribute\s', line): |
| return ATTRIBUTE |
| if re.search(r'^typeattribute\s', line): |
| return TYPEATTRIBUTE |
| if re.search(r'^class\s', line): |
| return CLASS |
| if re.search(r'^common\s', line): |
| return COMMON |
| if re.search(r'^allow\s', line): |
| return ALLOW_RULE |
| if re.search(r'^neverallow\s', line): |
| return NEVERALLOW_RULE |
| else: |
| return OTHER |
| |
| def is_multi_line(line_type): |
| if line_type == CLASS: |
| return True |
| elif line_type == COMMON: |
| return True |
| elif line_type == ALLOW_RULE: |
| return True |
| elif line_type == NEVERALLOW_RULE: |
| return True |
| else: |
| return False |
| |
| |
| #should only be called with file pointing to the 'i' in 'inherits' segment |
| def process_inherits_segment(file_obj): |
| inherit_keyword = file_obj.read(8) |
| if not inherit_keyword == 'inherits': |
| #TODO: handle error, invalid class statement |
| print "ERROR: invalid inherits statement" |
| return |
| else: |
| advance_past_whitespace(file_obj) |
| ret_inherited_common = advance_until_whitespace(file_obj) |
| return ret_inherited_common |
| |
| class SELinuxPolicy: |
| |
| def __init__(self): |
| self.types = set() |
| self.attributes = { } |
| self.classes = { } |
| self.common_classes = { } |
| self.allow_rules = [ ] |
| self.neverallow_rules = [ ] |
| |
| # create policy directly from policy file |
| #@classmethod |
| def from_file_name(self, policy_file_name): |
| self.types = set() |
| self.attributes = { } |
| self.classes = { } |
| self.common_classes = { } |
| self.allow_rules = [ ] |
| self.neverallow_rules = [ ] |
| with open(policy_file_name, 'r') as policy_file: |
| line = policy_file.readline() |
| while line: |
| line_type = get_line_type(line) |
| if is_multi_line(line_type): |
| self.parse_multi_line(line, line_type, policy_file) |
| else: |
| self.parse_single_line(line, line_type) |
| line = policy_file.readline() |
| |
| # expand_permissions - generates the actual permission set based on the listed |
| # permissions with wildcards and the given class on which they're based. |
| def expand_permissions(self, obj_class, permission_set): |
| ret_set = set() |
| neg_set = set() |
| for p in permission_set: |
| if p[0] == '-': |
| real_p = p[1:] |
| if real_p in self.classes[obj_class]: |
| neg_set.add(real_p) |
| else: |
| print "ERROR: invalid permission in avc rule " + real_t + "\n" |
| return |
| else: |
| if p in self.classes[obj_class]: |
| ret_set.add(p) |
| elif p == '*': #pretty sure this can't be negated? eg -* |
| ret_set |= self.classes[obj_class] #All of the permissions |
| else: |
| print "ERROR: invalid permission in avc rule " + p + "\n" |
| return |
| return ret_set - neg_set |
| |
| # expand_types - generates the actual type set based on the listed types, |
| # attributes, wildcards and negation. self is left as-is, and is processed |
| # specially when generating checkAccess() 4-tuples |
| def expand_types(self, type_set): |
| ret_set = set() |
| neg_set = set() |
| for t in type_set: |
| if t[0] == '-': |
| real_t = t[1:] |
| if real_t in self.attributes: |
| neg_set |= self.attributes[real_t] |
| elif real_t in self.types: |
| neg_set.add(real_t) |
| elif real_t == 'self': |
| ret_set |= real_t |
| else: |
| print "ERROR: invalid type in avc rule " + real_t + "\nTYPE SET:" |
| print type_set |
| return |
| else: |
| if t in self.attributes: |
| ret_set |= self.attributes[t] |
| elif t in self.types: |
| ret_set.add(t) |
| elif t == 'self': |
| ret_set.add(t) |
| elif t == '*': #pretty sure this can't be negated? |
| ret_set |= self.types #All of the types |
| else: |
| print "ERROR: invalid type in avc rule " + t + "\nTYPE SET" |
| print type_set |
| return |
| return ret_set - neg_set |
| |
| def parse_multi_line(self, line, line_type, file_obj): |
| if line_type == CLASS: |
| self.process_class_line(line, file_obj) |
| elif line_type == COMMON: |
| self.process_common_line(line, file_obj) |
| elif line_type == ALLOW_RULE: |
| self.process_avc_rule_line(line, file_obj) |
| elif line_type == NEVERALLOW_RULE: |
| self.process_avc_rule_line(line, file_obj) |
| else: |
| print "Error: This is not a multi-line input" |
| |
| def parse_single_line(self, line, line_type): |
| if line_type == TYPE: |
| self.process_type_line(line) |
| elif line_type == ATTRIBUTE: |
| self.process_attribute_line(line) |
| elif line_type == TYPEATTRIBUTE: |
| self.process_typeattribute_line(line) |
| return |
| |
| def process_attribute_line(self, line): |
| match = re.search(r'^attribute\s+(.+);', line) |
| if match: |
| declared_attribute = match.group(1) |
| self.attributes[declared_attribute] = set() |
| else: |
| #TODO: handle error? (no state changed) |
| return |
| |
| def process_class_line(self, line, file_obj): |
| match = re.search(r'^class\s([^\s]+)\s(.*$)', line) |
| if match: |
| declared_class = match.group(1) |
| #first class declaration has no perms |
| if not declared_class in self.classes: |
| self.classes[declared_class] = set() |
| return |
| else: |
| #need to parse file from after class name until end of '{ }'s |
| file_obj.seek(-(len(match.group(2)) + 1), 1) |
| c = advance_past_whitespace(file_obj) |
| if not (c == 'i' or c == '{'): |
| print "ERROR: invalid class statement" |
| return |
| elif c == 'i': |
| #add inherited permissions |
| inherited = process_inherits_segment(file_obj) |
| self.classes[declared_class] |= self.common_classes[inherited] |
| c = advance_past_whitespace(file_obj) |
| if c == '{': |
| permissions = expand_brackets(file_obj) |
| permissions = re.sub(r'#[^\n]*\n','\n' , permissions) #get rid of all comments |
| permissions = permissions.split() |
| for p in permissions: |
| self.classes[declared_class].add(p) |
| |
| def process_common_line(self, line, file_obj): |
| match = re.search(r'^common\s([^\s]+)(.*$)', line) |
| if match: |
| declared_common_class = match.group(1) |
| #TODO: common classes should only be declared once... |
| if not declared_common_class in self.common_classes: |
| self.common_classes[declared_common_class] = set() |
| #need to parse file from after common_class name until end of '{ }'s |
| file_obj.seek(-(len(match.group(2)) + 1), 1) |
| c = advance_past_whitespace(file_obj) |
| if not c == '{': |
| print "ERROR: invalid common statement" |
| return |
| permissions = expand_brackets(file_obj) |
| permissions = permissions.split() |
| for p in permissions: |
| self.common_classes[declared_common_class].add(p) |
| return |
| |
| def process_avc_rule_line(self, line, file_obj): |
| match = re.search(r'^(never)?allow\s(.*$)', line) |
| if match: |
| if(match.group(1)): |
| rule_type = 'neverallow' |
| else: |
| rule_type = 'allow' |
| #need to parse file from after class name until end of '{ }'s |
| file_obj.seek(-(len(match.group(2)) + 1), 1) |
| |
| #grab source type(s) |
| source_types = get_avc_rule_component(file_obj) |
| if len(source_types['set']) == 0: |
| print "ERROR: no source types for avc rule at line: " + line |
| return |
| |
| #grab target type(s) |
| target_types = get_avc_rule_component(file_obj) |
| if len(target_types['set']) == 0: |
| print "ERROR: no target types for avc rule at line: " + line |
| return |
| |
| #skip ':' potentially already handled by advance_until_whitespace |
| c = advance_past_whitespace(file_obj) |
| if c == ':': |
| file_obj.read(1) |
| |
| #grab class(es) |
| classes = get_avc_rule_component(file_obj) |
| if len(classes['set']) == 0: |
| print "ERROR: no classes for avc rule at line: " + line |
| return |
| |
| #grab permission(s) |
| permissions = get_avc_rule_component(file_obj) |
| if len(permissions['set']) == 0: |
| print "ERROR: no permissions for avc rule at line: " + line |
| return |
| rule_dict = { |
| 'source_types': source_types, |
| 'target_types': target_types, |
| 'classes': classes, |
| 'permissions': permissions } |
| |
| if rule_type == 'allow': |
| self.allow_rules.append(rule_dict) |
| elif rule_type == 'neverallow': |
| self.neverallow_rules.append(rule_dict) |
| |
| def process_type_line(self, line): |
| #TODO: add support for aliases (not yet in current policy.conf) |
| match = re.search(r'^type\s([^,]+),?(.*);', line) |
| if match: |
| declared_type = match.group(1) |
| self.types.add(declared_type) |
| if match.group(2): |
| declared_attributes = match.group(2) |
| declared_attributes = declared_attributes.replace(" ", "") #remove whitespace |
| declared_attributes = declared_attributes.split(',') #separate based on delimiter |
| for a in declared_attributes: |
| if not a in self.attributes: |
| #TODO: hanlde error? attribute should already exist |
| self.attributes[a] = set() |
| self.attributes[a].add(declared_type) |
| else: |
| #TODO: handle error? (no state changed) |
| return |
| |
| def process_typeattribute_line(self, line): |
| match = re.search(r'^typeattribute\s([^\s]+)\s(.*);', line) |
| if match: |
| declared_type = match.group(1) |
| if not declared_type in self.types: |
| #TODO: handle error? type should already exist |
| self.types.add(declared_type) |
| if match.group(2): |
| declared_attributes = match.group(2) |
| declared_attributes = declared_attributes.replace(" ", "") #remove whitespace |
| declared_attributes = declared_attributes.split(',') #separate based on delimiter |
| for a in declared_attributes: |
| if not a in self.attributes: |
| #TODO: hanlde error? attribute should already exist |
| self.attributes[a] = set() |
| self.attributes[a].add(declared_type) |
| else: |
| return |
| else: |
| #TODO: handle error? (no state changed) |
| return |