security_DeviceJail: add AllowDeny test
Adds test base for device_jail and a simple test to make sure
allow and deny verdicts have expected results.
BUG=chromium:644338
TEST=test_that on kevin
Change-Id: Ia83c3b3cd9280f34a4ed786c52739ae9c65eea89
Reviewed-on: https://chromium-review.googlesource.com/444037
Commit-Ready: Eric Caruso <ejcaruso@chromium.org>
Tested-by: Eric Caruso <ejcaruso@chromium.org>
Reviewed-by: Andrew de los Reyes <adlr@chromium.org>
diff --git a/client/cros/device_jail_test_base.py b/client/cros/device_jail_test_base.py
new file mode 100644
index 0000000..bcb3507
--- /dev/null
+++ b/client/cros/device_jail_test_base.py
@@ -0,0 +1,30 @@
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import os.path
+import upstart
+from autotest_lib.client.bin import test
+from autotest_lib.client.common_lib import error
+from autotest_lib.client.cros import device_jail_utils
+
+
+class DeviceJailTestBase(test.test):
+ """
+ An abstract base class for device jail tests. This checks to
+ make sure the device jail module is loaded in the kernel and
+ makes it easier to use.
+ """
+
+ def warmup(self):
+ super(DeviceJailTestBase, self).warmup()
+ if not (os.path.exists(device_jail_utils.JAIL_CONTROL_PATH) and
+ os.path.exists(device_jail_utils.JAIL_REQUEST_PATH)):
+ raise error.TestNAError('Device jail is not present')
+ if upstart.is_running('permission_broker'):
+ upstart.stop_job('permission_broker')
+
+
+ def cleanup(self):
+ super(DeviceJailTestBase, self).cleanup()
+ upstart.restart_job('permission_broker')
diff --git a/client/cros/device_jail_utils.py b/client/cros/device_jail_utils.py
new file mode 100644
index 0000000..2e6706a
--- /dev/null
+++ b/client/cros/device_jail_utils.py
@@ -0,0 +1,163 @@
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import logging
+import os
+import pyudev
+import re
+import select
+import struct
+import subprocess
+import threading
+import time
+from autotest_lib.client.common_lib import error
+
+
+JAIL_CONTROL_PATH = '/dev/jail-control'
+JAIL_REQUEST_PATH = '/dev/jail-request'
+
+# From linux/device_jail.h.
+REQUEST_ALLOW = 0
+REQUEST_ALLOW_WITH_LOCKDOWN = 1
+REQUEST_ALLOW_WITH_DETACH = 2
+REQUEST_DENY = 3
+
+
+class OSFile:
+ """Simple context manager for file descriptors."""
+ def __init__(self, path, flag):
+ self._fd = os.open(path, flag)
+
+ def close(self):
+ os.close(self._fd)
+
+ def __enter__(self):
+ """Returns the fd so it can be used in with-blocks."""
+ return self._fd
+
+ def __exit__(self, exc_type, exc_val, traceback):
+ self.close()
+
+
+class ConcurrentFunc:
+ """Simple context manager that starts and joins a thread."""
+ def __init__(self, target_func, timeout_func):
+ self._thread = threading.Thread(target=target_func)
+ self._timeout_func = timeout_func
+ self._target_name = target_func.__name__
+
+ def __enter__(self):
+ self._thread.start()
+
+ def __exit__(self, exc_type, exc_val, traceback):
+ self._thread.join(self._timeout_func())
+ if self._thread.is_alive() and not exc_val:
+ raise error.TestError('Function %s timed out' % self._target_name)
+
+
+class JailDevice:
+ TIMEOUT_SEC = 3
+ PATH_MAX = 4096
+
+ def __init__(self, path_to_jail):
+ self._path_to_jail = path_to_jail
+
+
+ def __enter__(self):
+ """
+ Creates a jail device for the device located at self._path_to_jail.
+ If the jail already exists, don't take ownership of it.
+ """
+ try:
+ output = subprocess.check_output(
+ ['device_jail_utility',
+ '--add={0}'.format(self._path_to_jail)],
+ stderr=subprocess.STDOUT)
+
+ match = re.search('created jail at (.*)', output)
+ if match:
+ self._path = match.group(1)
+ self._owns_device = True
+ return self
+
+ match = re.search('jail already exists at (.*)', output)
+ if match:
+ self._path = match.group(1)
+ self._owns_device = False
+ return self
+
+ raise error.TestError('Failed to create device jail')
+ except subprocess.CalledProcessError as e:
+ raise error.TestError('Failed to call device_jail_utility')
+
+
+ def expect_open(self, verdict):
+ """
+ Tries to open the jail device. This method mocks out the
+ device_jail request server which is normally run by permission_broker.
+ This allows us to set the verdict we want to test. Since the open
+ call will block until we return the verdict, we have to use a
+ separate thread to perform the open call, as well.
+ """
+ # Python 2 does not support "nonlocal" so this closure can't
+ # set the values of identifiers it closes over unless they
+ # are in global scope. Work around this by using a list and
+ # value-mutation.
+ dev_file_wrapper = [None]
+ def open_device():
+ try:
+ dev_file_wrapper[0] = OSFile(self._path, os.O_RDWR)
+ except OSError as e:
+ # We don't throw an error because this might be intentional,
+ # such as when the verdict is REQUEST_DENY.
+ logging.info("Failed to open jail device: %s", e.strerror)
+
+ # timeout_sec should be used for the timeouts below.
+ # This ensures we don't spend much longer than TIMEOUT_SEC in
+ # this method.
+ deadline = time.time() + self.TIMEOUT_SEC
+ def timeout_sec():
+ return max(deadline - time.time(), 0.01)
+
+ # We have to use FDs because polling works with FDs and
+ # buffering is silly.
+ try:
+ req_f = OSFile(JAIL_REQUEST_PATH, os.O_RDWR)
+ except OSError as e:
+ raise error.TestError(
+ 'Failed to open request device: %s' % e.strerror)
+
+ with req_f as req_fd:
+ poll_obj = select.poll()
+ poll_obj.register(req_fd, select.POLLIN)
+
+ # Starting open_device should ensure we have a request waiting
+ # on the request device.
+ with ConcurrentFunc(open_device, timeout_sec):
+ ready_fds = poll_obj.poll(timeout_sec() * 1000)
+ if not ready_fds:
+ raise error.TestError('Timed out waiting for jail-request')
+
+ # Sanity check the request.
+ path = os.read(req_fd, self.PATH_MAX)
+ logging.info('Received jail-request for path %s', path)
+ if path != self._path_to_jail:
+ raise error.TestError('Got request for the wrong path')
+
+ os.write(req_fd, struct.pack('I', verdict))
+ logging.info('Responded to jail-request')
+
+ return dev_file_wrapper[0]
+
+
+ def __exit__(self, exc_type, exc_val, traceback):
+ if self._owns_device:
+ subprocess.call(['device_jail_utility',
+ '--remove={0}'.format(self._path)])
+
+
+def get_usb_devices():
+ context = pyudev.Context()
+ return [device for device in context.list_devices()
+ if device.device_node and device.device_node.startswith('/dev/bus/usb')]
diff --git a/client/site_tests/security_DeviceJail_AllowDeny/control b/client/site_tests/security_DeviceJail_AllowDeny/control
new file mode 100644
index 0000000..f180260
--- /dev/null
+++ b/client/site_tests/security_DeviceJail_AllowDeny/control
@@ -0,0 +1,21 @@
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+AUTHOR = "ejcaruso@chromium.org"
+NAME = "security_DeviceJail_AllowDeny"
+PURPOSE = "Verify that the device jail module is functioning."
+TIME = "SHORT"
+ATTRIBUTES = "suite:security"
+TEST_CATEGORY = "Functional"
+TEST_CLASS = "security"
+TEST_TYPE = "client"
+DOC = """
+This test is not applicable if the control devices for device jail are
+not present. This means the kernel must be 3.14 or later.
+The test will fail if the program is allowed to open the jail without
+permission from the server listening on /dev/jail-request, or is denied
+access when permission is granted.
+"""
+
+job.run_test('security_DeviceJail_AllowDeny')
diff --git a/client/site_tests/security_DeviceJail_AllowDeny/security_DeviceJail_AllowDeny.py b/client/site_tests/security_DeviceJail_AllowDeny/security_DeviceJail_AllowDeny.py
new file mode 100644
index 0000000..19077e0
--- /dev/null
+++ b/client/site_tests/security_DeviceJail_AllowDeny/security_DeviceJail_AllowDeny.py
@@ -0,0 +1,37 @@
+# Copyright 2017 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+from autotest_lib.client.bin import test, utils
+from autotest_lib.client.common_lib import error
+from autotest_lib.client.cros import device_jail_test_base
+from autotest_lib.client.cros import device_jail_utils
+
+
+class security_DeviceJail_AllowDeny(device_jail_test_base.DeviceJailTestBase):
+ """
+ Ensures that if device jail is present, it is functioning properly
+ in that it allows access if and only if instructed (generally
+ by permission_broker) and correctly locks down devices or detaches
+ kernel drivers as instructed.
+ """
+ version = 1
+
+ def run_once(self):
+ usb_devices = device_jail_utils.get_usb_devices()
+ if not usb_devices:
+ error.TestNAError('No USB devices found')
+
+ dev_path = usb_devices[0].device_node
+ with device_jail_utils.JailDevice(dev_path) as jail:
+ # This should succeed and return a file.
+ f = jail.expect_open(device_jail_utils.REQUEST_ALLOW)
+ if not f:
+ raise error.TestError('Failed to open allowed jail')
+ else:
+ f.close()
+
+ # This should not return a file.
+ f = jail.expect_open(device_jail_utils.REQUEST_DENY)
+ if f:
+ raise error.TestError('Successfully opened denied jail')