Chris Masone | d976e0e | 2013-05-06 13:10:07 -0700 | [diff] [blame] | 1 | # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| 2 | # Use of this source code is governed by a BSD-style license that can be |
| 3 | # found in the LICENSE file. |
| 4 | |
Chris Masone | 3d68fe8 | 2014-03-26 13:18:27 -0700 | [diff] [blame] | 5 | import dbus, gobject, sys |
Chris Masone | d976e0e | 2013-05-06 13:10:07 -0700 | [diff] [blame] | 6 | |
| 7 | import common |
| 8 | from autotest_lib.client.common_lib import error |
Chris Masone | 3d68fe8 | 2014-03-26 13:18:27 -0700 | [diff] [blame] | 9 | from autotest_lib.client.common_lib.cros import session_manager |
| 10 | from autotest_lib.client.cros import ownership |
Chris Masone | d976e0e | 2013-05-06 13:10:07 -0700 | [diff] [blame] | 11 | |
| 12 | |
| 13 | """Utility class for tests that generate, push and fetch policies. |
| 14 | |
| 15 | As the python bindings for the protobufs used in policies are built as a part |
| 16 | of tests that use them, callers must pass in their location at call time.""" |
| 17 | |
| 18 | |
| 19 | def compare_policy_response(proto_binding_location, policy_response, |
| 20 | owner=None, guests=None, new_users=None, |
Xiyuan Xia | b6f3b08 | 2016-02-04 10:39:33 -0800 | [diff] [blame^] | 21 | roaming=None, whitelist=None): |
Chris Masone | d976e0e | 2013-05-06 13:10:07 -0700 | [diff] [blame] | 22 | """Check the contents of |policy_response| against given args. |
| 23 | |
| 24 | Deserializes |policy_response| into a PolicyFetchResponse protobuf, |
| 25 | with an embedded (serialized) PolicyData protobuf that embeds a |
| 26 | (serialized) ChromeDeviceSettingsProto, and checks to see if this |
| 27 | protobuf turducken contains the information passed in. |
| 28 | |
| 29 | @param proto_binding_location: the location of generated python bindings |
| 30 | for policy protobufs. |
| 31 | @param policy_response: string serialization of a PolicyData protobuf. |
| 32 | @param owner: string representing the owner's name/account. |
| 33 | @param guests: boolean indicating whether guests should be allowed. |
| 34 | @param new_users: boolean indicating if user pods are on login screen. |
| 35 | @param roaming: boolean indicating whether data roaming is enabled. |
| 36 | @param whitelist: list of accounts that are allowed to log in. |
Chris Masone | d976e0e | 2013-05-06 13:10:07 -0700 | [diff] [blame] | 37 | |
| 38 | @return True if |policy_response| has all the provided data, else False. |
| 39 | """ |
| 40 | # Pull in protobuf bindings. |
| 41 | sys.path.append(proto_binding_location) |
| 42 | from device_management_backend_pb2 import PolicyFetchResponse |
| 43 | from device_management_backend_pb2 import PolicyData |
| 44 | from chrome_device_policy_pb2 import ChromeDeviceSettingsProto |
| 45 | from chrome_device_policy_pb2 import AllowNewUsersProto |
| 46 | from chrome_device_policy_pb2 import GuestModeEnabledProto |
| 47 | from chrome_device_policy_pb2 import ShowUserNamesOnSigninProto |
| 48 | from chrome_device_policy_pb2 import DataRoamingEnabledProto |
Chris Masone | d976e0e | 2013-05-06 13:10:07 -0700 | [diff] [blame] | 49 | |
| 50 | response_proto = PolicyFetchResponse() |
| 51 | response_proto.ParseFromString(policy_response) |
| 52 | ownership.assert_has_policy_data(response_proto) |
| 53 | |
| 54 | data_proto = PolicyData() |
| 55 | data_proto.ParseFromString(response_proto.policy_data) |
| 56 | ownership.assert_has_device_settings(data_proto) |
| 57 | if owner: ownership.assert_username(data_proto, owner) |
| 58 | |
| 59 | settings = ChromeDeviceSettingsProto() |
| 60 | settings.ParseFromString(data_proto.policy_value) |
| 61 | if guests: ownership.assert_guest_setting(settings, guests) |
| 62 | if new_users: ownership.assert_show_users(settings, new_users) |
| 63 | if roaming: ownership.assert_roaming(settings, roaming) |
| 64 | if whitelist: |
| 65 | ownership.assert_new_users(settings, False) |
| 66 | ownership.assert_users_on_whitelist(settings, whitelist) |
Chris Masone | d976e0e | 2013-05-06 13:10:07 -0700 | [diff] [blame] | 67 | |
| 68 | |
| 69 | def build_policy_data(proto_binding_location, owner=None, guests=None, |
Xiyuan Xia | b6f3b08 | 2016-02-04 10:39:33 -0800 | [diff] [blame^] | 70 | new_users=None, roaming=None, whitelist=None): |
Chris Masone | d976e0e | 2013-05-06 13:10:07 -0700 | [diff] [blame] | 71 | """Generate and serialize a populated device policy protobuffer. |
| 72 | |
| 73 | Creates a PolicyData protobuf, with an embedded |
| 74 | ChromeDeviceSettingsProto, containing the information passed in. |
| 75 | |
| 76 | @param proto_binding_location: the location of generated python bindings |
| 77 | for policy protobufs. |
| 78 | @param owner: string representing the owner's name/account. |
| 79 | @param guests: boolean indicating whether guests should be allowed. |
| 80 | @param new_users: boolean indicating if user pods are on login screen. |
| 81 | @param roaming: boolean indicating whether data roaming is enabled. |
| 82 | @param whitelist: list of accounts that are allowed to log in. |
Chris Masone | d976e0e | 2013-05-06 13:10:07 -0700 | [diff] [blame] | 83 | |
| 84 | @return serialization of the PolicyData proto that we build. |
| 85 | """ |
| 86 | # Pull in protobuf bindings. |
| 87 | sys.path.append(proto_binding_location) |
| 88 | from device_management_backend_pb2 import PolicyData |
| 89 | from chrome_device_policy_pb2 import ChromeDeviceSettingsProto |
| 90 | from chrome_device_policy_pb2 import AllowNewUsersProto |
| 91 | from chrome_device_policy_pb2 import GuestModeEnabledProto |
| 92 | from chrome_device_policy_pb2 import ShowUserNamesOnSigninProto |
| 93 | from chrome_device_policy_pb2 import DataRoamingEnabledProto |
Chris Masone | d976e0e | 2013-05-06 13:10:07 -0700 | [diff] [blame] | 94 | |
| 95 | data_proto = PolicyData() |
| 96 | data_proto.policy_type = ownership.POLICY_TYPE |
| 97 | if owner: data_proto.username = owner |
| 98 | |
| 99 | settings = ChromeDeviceSettingsProto() |
| 100 | if guests: |
| 101 | settings.guest_mode_enabled.guest_mode_enabled = guests |
| 102 | if new_users: |
| 103 | settings.show_user_names.show_user_names = new_users |
| 104 | if roaming: |
| 105 | settings.data_roaming_enabled.data_roaming_enabled = roaming |
| 106 | if whitelist: |
| 107 | settings.allow_new_users.allow_new_users = False |
| 108 | for user in whitelist: |
| 109 | settings.user_whitelist.user_whitelist.append(user) |
Chris Masone | d976e0e | 2013-05-06 13:10:07 -0700 | [diff] [blame] | 110 | |
| 111 | data_proto.policy_value = settings.SerializeToString() |
| 112 | return data_proto.SerializeToString() |
| 113 | |
| 114 | |
| 115 | def generate_policy(proto_binding_location, key, pubkey, policy, old_key=None): |
| 116 | """Generate and serialize a populated, signed device policy protobuffer. |
| 117 | |
| 118 | Creates a protobuf containing the device policy |policy|, signed with |
| 119 | |key|. Also includes the public key |pubkey|, signed with |old_key| |
| 120 | if provided. If not, |pubkey| is signed with |key|. The protobuf |
| 121 | is serialized to a string and returned. |
| 122 | |
| 123 | @param proto_binding_location: the location of generated python bindings |
| 124 | for policy protobufs. |
| 125 | @param key: new policy signing key. |
| 126 | @param pubkey: new public key to be signed and embedded in generated |
| 127 | PolicyFetchResponse. |
| 128 | @param policy: policy data to be embedded in generated PolicyFetchResponse. |
| 129 | @param old_key: if provided, this implies the generated PolicyFetchRespone |
| 130 | is intended to represent a key rotation. pubkey will be |
| 131 | signed with this key before embedding. |
| 132 | |
| 133 | @return serialization of the PolicyFetchResponse proto that we build. |
| 134 | """ |
| 135 | # Pull in protobuf bindings. |
| 136 | sys.path.append(proto_binding_location) |
| 137 | from device_management_backend_pb2 import PolicyFetchResponse |
| 138 | |
| 139 | if old_key == None: |
| 140 | old_key = key |
| 141 | policy_proto = PolicyFetchResponse() |
| 142 | policy_proto.policy_data = policy |
| 143 | policy_proto.policy_data_signature = ownership.sign(key, policy) |
| 144 | policy_proto.new_public_key = pubkey |
| 145 | policy_proto.new_public_key_signature = ownership.sign(old_key, pubkey) |
| 146 | return policy_proto.SerializeToString() |
| 147 | |
| 148 | |
| 149 | def push_policy_and_verify(policy_string, sm): |
| 150 | """Push a device policy to the session manager over DBus. |
| 151 | |
| 152 | The serialized device policy |policy_string| is sent to the session |
| 153 | manager with the StorePolicy DBus call. Success of the store is |
| 154 | validated by fetching the policy again and comparing. |
| 155 | |
| 156 | @param policy_string: serialized policy to push to the session manager. |
| 157 | @param sm: a connected SessionManagerInterface. |
| 158 | |
| 159 | @raises error.TestFail if policy push failed. |
| 160 | """ |
Chris Masone | 3d68fe8 | 2014-03-26 13:18:27 -0700 | [diff] [blame] | 161 | listener = session_manager.OwnershipSignalListener(gobject.MainLoop()) |
| 162 | listener.listen_for_new_policy() |
Chris Masone | d976e0e | 2013-05-06 13:10:07 -0700 | [diff] [blame] | 163 | sm.StorePolicy(dbus.ByteArray(policy_string), byte_arrays=True) |
Chris Masone | 3d68fe8 | 2014-03-26 13:18:27 -0700 | [diff] [blame] | 164 | listener.wait_for_signals(desc='Policy push.') |
Chris Masone | d976e0e | 2013-05-06 13:10:07 -0700 | [diff] [blame] | 165 | |
| 166 | retrieved_policy = sm.RetrievePolicy(byte_arrays=True) |
| 167 | if retrieved_policy != policy_string: |
| 168 | raise error.TestFail('Policy should not be %s' % retrieved_policy) |
| 169 | |
| 170 | |
| 171 | def get_policy(sm): |
| 172 | """Get a device policy from the session manager over DBus. |
| 173 | |
| 174 | Provided mainly for symmetry with push_policy_and_verify(). |
| 175 | |
| 176 | @param sm: a connected SessionManagerInterface. |
| 177 | |
| 178 | @return Serialized PolicyFetchResponse. |
| 179 | """ |
| 180 | return sm.RetrievePolicy(byte_arrays=True) |