| #!/usr/bin/python3 |
| |
| import dbus |
| import dbus.service |
| from dbus.mainloop.glib import DBusGMainLoop |
| from gi.repository import GObject |
| from gi.repository import GLib |
| import os |
| import selinux |
| from subprocess import Popen, PIPE, STDOUT |
| |
| |
class selinux_server(dbus.service.Object):
    """D-Bus object exporting SELinux management calls on ``org.selinux``.

    Every exported method first asks polkit whether the calling bus name is
    authorized for the method's action id and raises
    ``dbus.exceptions.DBusException("Not authorized")`` when it is not.
    """

    default_polkit_auth_required = "org.selinux.semanage"

    def __init__(self, *p, **k):
        super(selinux_server, self).__init__(*p, **k)

    def is_authorized(self, sender, action_id):
        """Return polkit's verdict on whether *sender* may perform *action_id*.

        Args:
            sender: unique D-Bus name of the caller.
            action_id: polkit action identifier, e.g. "org.selinux.semanage".

        Returns:
            The first element of the CheckAuthorization reply (the
            "is authorized" flag).
        """
        bus = dbus.SystemBus()
        proxy = bus.get_object('org.freedesktop.PolicyKit1', '/org/freedesktop/PolicyKit1/Authority')
        authority = dbus.Interface(proxy, dbus_interface='org.freedesktop.PolicyKit1.Authority')
        subject = ('system-bus-name', {'name': sender})
        # Flags=1 asks polkit to allow user interaction (an authentication
        # prompt); the empty string means no cancellation id is supplied.
        result = authority.CheckAuthorization(subject, action_id, {}, 1, '')
        return result[0]

    def _require_authorization(self, sender, action_id):
        """Raise DBusException("Not authorized") unless polkit approves."""
        if not self.is_authorized(sender, action_id):
            raise dbus.exceptions.DBusException("Not authorized")

    def _run_command(self, argv, stdin_text=None):
        """Run *argv* and return ``(returncode, stdout, stderr)`` as text.

        All pipe traffic goes through ``communicate()`` so that a child
        producing a lot of output on either stream (or consuming a large
        input) cannot deadlock against us.  stdin is only piped when
        *stdin_text* is given; otherwise it is inherited.
        """
        proc = Popen(
            argv,
            stdin=PIPE if stdin_text is not None else None,
            stdout=PIPE,
            stderr=PIPE,
            universal_newlines=True,
        )
        out, err = proc.communicate(stdin_text)
        return proc.returncode, out, err

    #
    # The semanage method runs a transaction on a series of semanage commands;
    # it accepts the text produced by the customized method as its input.
    #
    @dbus.service.method("org.selinux", in_signature='s', sender_keyword="sender")
    def semanage(self, buf, sender):
        """Feed *buf* to ``semanage import``.

        Raises:
            dbus.exceptions.DBusException: if the caller is not authorized,
                or (carrying the command's stderr) if the command exits
                with a non-zero status.
        """
        self._require_authorization(sender, "org.selinux.semanage")
        returncode, _, err = self._run_command(["/usr/sbin/semanage", "import"], stdin_text=buf)
        if returncode:
            raise dbus.exceptions.DBusException(err)

    #
    # The customized method will return all of the customizations for policy
    # on the server.  This output can be used with the semanage method on
    # another server to make the two systems have duplicate policy.
    #
    @dbus.service.method("org.selinux", in_signature='', out_signature='s', sender_keyword="sender")
    def customized(self, sender):
        """Return the standard output of ``semanage export``.

        Raises:
            dbus.exceptions.DBusException: if the caller is not authorized.
            OSError: if the command exits with a non-zero status.
        """
        self._require_authorization(sender, "org.selinux.customized")
        returncode, out, err = self._run_command(["/usr/sbin/semanage", "export"])
        if returncode:
            raise OSError("Failed to read SELinux configuration: %s" % err)
        return out

    #
    # The semodule_list method will return the output of semodule --list=full.
    # It is a read-only operation, authorized via the
    # org.selinux.semodule_list polkit action.
    #
    @dbus.service.method("org.selinux", in_signature='', out_signature='s', sender_keyword="sender")
    def semodule_list(self, sender):
        """Return the standard output of ``semodule --list=full``.

        Raises:
            dbus.exceptions.DBusException: if the caller is not authorized.
            OSError: if the command exits with a non-zero status.
        """
        self._require_authorization(sender, "org.selinux.semodule_list")
        returncode, out, err = self._run_command(["/usr/sbin/semodule", "--list=full"])
        if returncode:
            raise OSError("Failed to list SELinux modules: %s" % err)
        return out

    #
    # The restorecon method modifies any file path to the default system label
    #
    @dbus.service.method("org.selinux", in_signature='s', sender_keyword="sender")
    def restorecon(self, path, sender):
        """Recursively restore the default label on *path*."""
        self._require_authorization(sender, "org.selinux.restorecon")
        # str() turns the dbus string wrapper into a plain str for libselinux.
        selinux.restorecon(str(path), recursive=1)

    #
    # The setenforce method sets the current (runtime) enforcement mode of
    # SELinux to the given value
    #
    @dbus.service.method("org.selinux", in_signature='i', sender_keyword="sender")
    def setenforce(self, value, sender):
        """Pass *value* to ``selinux.security_setenforce``."""
        self._require_authorization(sender, "org.selinux.setenforce")
        selinux.security_setenforce(value)

    #
    # The relabel_on_boot method creates (value == 1) or removes (any other
    # value) the /.autorelabel flag file
    #
    @dbus.service.method("org.selinux", in_signature='i', sender_keyword="sender")
    def relabel_on_boot(self, value, sender):
        """Create ``/.autorelabel`` when *value* is 1, otherwise remove it."""
        self._require_authorization(sender, "org.selinux.relabel_on_boot")
        if value == 1:
            # Only the existence of the file matters; leave it empty.
            with open("/.autorelabel", "w"):
                pass
        else:
            try:
                os.unlink("/.autorelabel")
            except FileNotFoundError:
                # Already absent: nothing to do.
                pass

    def write_selinux_config(self, enforcing=None, policy=None):
        """Rewrite the SELinux config file with new SELINUX= / SELINUXTYPE= values.

        Args:
            enforcing: replacement value for the ``SELINUX=`` line, or a
                falsy value to leave that line untouched.
            policy: replacement value for the ``SELINUXTYPE=`` line, or a
                falsy value to leave that line untouched.

        Only lines that already start with the respective key are replaced;
        all other lines are copied unchanged.
        """
        # selinux_path() is concatenated directly, so it is expected to end
        # with a path separator.
        path = selinux.selinux_path() + "config"
        backup_path = path + ".bck"
        with open(path) as fd:
            lines = fd.readlines()
        # Write the new content next to the original, then rename it over
        # the original so readers never see a half-written config.
        with open(backup_path, "w") as fd:
            for line in lines:
                if enforcing and line.startswith("SELINUX="):
                    fd.write("SELINUX=%s\n" % enforcing)
                elif policy and line.startswith("SELINUXTYPE="):
                    fd.write("SELINUXTYPE=%s\n" % policy)
                else:
                    fd.write(line)
        os.rename(backup_path, path)

    #
    # The change_default_mode method modifies the enforcement mode stored in
    # the SELinux config file
    #
    @dbus.service.method("org.selinux", in_signature='s', sender_keyword="sender")
    def change_default_mode(self, value, sender):
        """Set ``SELINUX=`` in the config file to *value*.

        Raises:
            dbus.exceptions.DBusException: if the caller is not authorized.
            ValueError: if *value* is not enforcing, permissive or disabled.
        """
        self._require_authorization(sender, "org.selinux.change_default_mode")
        values = ["enforcing", "permissive", "disabled"]
        if value not in values:
            raise ValueError("Enforcement mode must be %s" % ", ".join(values))
        self.write_selinux_config(enforcing=value)

    #
    # The change_default_policy method modifies the policy type
    #
    @dbus.service.method("org.selinux", in_signature='s', sender_keyword="sender")
    def change_default_policy(self, value, sender):
        """Set ``SELINUXTYPE=`` in the config file to *value*.

        *value* must name a directory located directly inside the SELinux
        configuration directory.

        Raises:
            dbus.exceptions.DBusException: if the caller is not authorized.
            ValueError: if *value* is not a plain directory name or the
                directory does not exist.
        """
        self._require_authorization(sender, "org.selinux.change_default_policy")
        # The value comes from the bus and is joined into a filesystem path:
        # refuse empty names, "."/"..", and anything with a path separator
        # so the check cannot be steered outside the SELinux directory.
        if not value or value in (".", "..") or os.path.basename(value) != value:
            raise ValueError("%s is not a valid policy name" % value)
        path = selinux.selinux_path() + value
        if os.path.isdir(path):
            return self.write_selinux_config(policy=value)
        raise ValueError("%s does not exist" % path)
| |
if __name__ == "__main__":
    # Register the GLib main loop integration as the default before any bus
    # connection is opened.
    DBusGMainLoop(set_as_default=True)

    bus = dbus.SystemBus()
    # NOTE(review): both objects are bound to names so they stay referenced
    # for the lifetime of the process; presumably dropping the BusName would
    # give up the "org.selinux" name -- confirm against dbus-python docs.
    bus_name = dbus.service.BusName("org.selinux", bus)
    service = selinux_server(bus, "/org/selinux/object")

    # Serve requests until the process is terminated.
    GLib.MainLoop().run()