[autotest] Add test for the re-taking of ownership after the owner key is lost

BUG=chromium-os:11193
TEST=run this test

Change-Id: I95b402d47ec430a8556d012d9a2ee2614507a1e4

Review URL: http://codereview.chromium.org/6880288
diff --git a/client/cros/ownership.py b/client/cros/ownership.py
index 008f531..5dd1dcf 100644
--- a/client/cros/ownership.py
+++ b/client/cros/ownership.py
@@ -14,6 +14,11 @@
 from autotest_lib.client.common_lib import autotemp, error
 
 
+class OwnershipError(error.TestError):
+    """Generic error for ownership-related failures."""
+    pass
+
+
 class scoped_tempfile(object):
     """A wrapper that provides scoped semantics for temporary files.
 
@@ -61,7 +66,6 @@
 
 def clear_ownership():
     __unlink(constants.OWNER_KEY_FILE)
-    __unlink(constants.SIGNED_PREFERENCES_FILE)
     __unlink(constants.SIGNED_POLICY_FILE)
 
 
@@ -77,6 +81,94 @@
     return dbus.Interface(proxy, 'org.chromium.SessionManagerInterface')
 
 
+def listen_to_session_manager_signal(callback, signal):
+    """Register a callback for a signal on the DBus system bus.
+
+    Adds |callback| as the receiver for |signal| on the
+    'org.chromium.Chromium' interface at path '/'.  Returns nothing.
+    """
+    bus = dbus.SystemBus()
+    bus.add_signal_receiver(
+        handler_function=callback,
+        signal_name=signal,
+        dbus_interface='org.chromium.Chromium',
+        bus_name=None,
+        path='/')
+
+POLICY_TYPE = 'google/chromeos/device'
+
+
+def assert_has_policy_data(response_proto):
+    if not response_proto.HasField("policy_data"):
+        raise OwnershipError('Malformatted response.')
+
+
+def assert_has_device_settings(data_proto):
+    if (not data_proto.HasField("policy_type") or
+        data_proto.policy_type != POLICY_TYPE or
+        not data_proto.HasField("policy_value")):
+        raise OwnershipError('Malformatted response.')
+
+
+def assert_username(data_proto, username):
+    if data_proto.username != username:
+        raise OwnershipError('Incorrect username.')
+
+
+def assert_guest_setting(settings, guests):
+    if not settings.HasField("guest_mode_enabled"):
+        raise OwnershipError('No guest mode setting protobuf.')
+    if not settings.guest_mode_enabled.HasField("guest_mode_enabled"):
+        raise OwnershipError('No guest mode setting.')
+    if settings.guest_mode_enabled.guest_mode_enabled != guests:
+        raise OwnershipError('Incorrect guest mode setting.')
+
+
+def assert_show_users(settings, show_users):
+    if not settings.HasField("show_user_names"):
+        raise OwnershipError('No show users setting protobuf.')
+    if not settings.show_user_names.HasField("show_user_names"):
+        raise OwnershipError('No show users setting.')
+    if settings.show_user_names.show_user_names != show_users:
+        raise OwnershipError('Incorrect show users setting.')
+
+
+def assert_roaming(settings, roaming):
+    if not settings.HasField("data_roaming_enabled"):
+        raise OwnershipError('No roaming setting protobuf.')
+    if not settings.data_roaming_enabled.HasField("data_roaming_enabled"):
+        raise OwnershipError('No roaming setting.')
+    if settings.data_roaming_enabled.data_roaming_enabled != roaming:
+        raise OwnershipError('Incorrect roaming setting.')
+
+
+def assert_new_users(settings, new_users):
+    if not settings.HasField("allow_new_users"):
+        raise OwnershipError('No allow new users setting protobuf.')
+    if not settings.allow_new_users.HasField("allow_new_users"):
+        raise OwnershipError('No allow new users setting.')
+    if settings.allow_new_users.allow_new_users != new_users:
+        raise OwnershipError('Incorrect allow new users setting.')
+
+
+def assert_users_on_whitelist(settings, users):
+    """Raises OwnershipError unless every user in |users| is whitelisted."""
+    if not settings.HasField("user_whitelist"):
+        raise OwnershipError('No user whitelist.')
+    for user in users:
+        if user not in settings.user_whitelist.user_whitelist:
+            raise OwnershipError(user + ' not whitelisted.')
+
+
+def assert_proxy_settings(settings, proxies):
+    if not settings.HasField("device_proxy_settings"):
+        raise OwnershipError('No proxy settings protobuf.')
+    if not settings.device_proxy_settings.HasField("proxy_mode"):
+        raise OwnershipError('No proxy_mode setting.')
+    if settings.device_proxy_settings.proxy_mode != proxies['proxy_mode']:
+        raise OwnershipError('Incorrect proxies: %s' % proxies)
+
+
 NSSDB = constants.CRYPTOHOME_MOUNT_PT + '/.pki/nssdb'
 PK12UTIL = 'nsspk12util'
 OPENSSLP12 = 'openssl pkcs12'
@@ -86,6 +178,35 @@
 OPENSSLCRYPTO = 'openssl sha1'
 
 
+def use_known_ownerkeys():
+    """Sets the system up to use a well-known keypair for owner operations.
+
+    Assuming the appropriate cryptohome is already mounted, configures the
+    device to accept policies signed with the checked-in 'mock' owner key.
+    """
+    dirname = os.path.dirname(__file__)
+    mock_keyfile = os.path.join(dirname, constants.MOCK_OWNER_KEY)
+    mock_certfile = os.path.join(dirname, constants.MOCK_OWNER_CERT)
+    push_to_nss(mock_keyfile, mock_certfile, NSSDB)
+    utils.open_write_close(constants.OWNER_KEY_FILE,
+                           cert_extract_pubkey_der(mock_certfile))
+
+
+def known_privkey():
+    """Returns the contents of the mock owner private key file (PEM)."""
+    here = os.path.dirname(__file__)
+    key_path = os.path.join(here, constants.MOCK_OWNER_KEY)
+    return utils.read_file(key_path)
+
+
+def known_pubkey():
+    """Returns the mock owner public key, extracted from the cert as DER.
+    """
+    here = os.path.dirname(__file__)
+    cert_path = os.path.join(here, constants.MOCK_OWNER_CERT)
+    return cert_extract_pubkey_der(cert_path)
+
+
 def pairgen():
     """Generate a self-signed cert and associated private key.
 
@@ -212,5 +333,5 @@
     sig.fo.seek(0)
     sig_data = sig.fo.read()
     if not sig_data:
-        raise error.TestFail('Empty signature!')
+        raise OwnershipError('Empty signature!')
     return sig_data