test-scripts: Add cellular_proxy.py and some constants to shill_proxy.py

(1) cellular_proxy.py provides functions and constants over and above
    shill_proxy.py for cellular tests.
(2) Extended shill_proxy.py for tests.

BUG=chromium:249149
TEST=None

Change-Id: I88555d64b6c7f2b625aed7593b22d994a6ff6f52
Reviewed-on: https://gerrit.chromium.org/gerrit/62932
Commit-Queue: Prathmesh Prabhu <pprabhu@chromium.org>
Reviewed-by: Prathmesh Prabhu <pprabhu@chromium.org>
Tested-by: Prathmesh Prabhu <pprabhu@chromium.org>
diff --git a/test-scripts/cellular_proxy.py b/test-scripts/cellular_proxy.py
new file mode 100644
index 0000000..694909e
--- /dev/null
+++ b/test-scripts/cellular_proxy.py
@@ -0,0 +1,201 @@
+# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import dbus
+import logging
+import time
+
+import shill_proxy
+
+class CellularProxy(shill_proxy.ShillProxy):
+    """Wrapper around shill dbus interface used by cellular tests."""
+    # Properties exposed by shill.
+    DEVICE_PROPERTY_DBUS_OBJECT = 'DBus.Object'
+    DEVICE_PROPERTY_ICCID = 'Cellular.ICCID'
+    DEVICE_PROPERTY_SIM_LOCK_STATUS = 'Cellular.SIMLockStatus'
+
+    # Keys into the dictionaries exposed as properties.
+    PROPERTY_KEY_SIM_LOCK_TYPE = 'LockType'
+    PROPERTY_KEY_SIM_LOCK_ENABLED = 'LockEnabled'
+    PROPERTY_KEY_SIM_LOCK_RETRIES_LEFT = 'RetriesLeft'
+
+    # Valid values taken by properties exposed by shill.
+    VALUE_SIM_LOCK_TYPE_PIN = 'sim-pin'
+    VALUE_SIM_LOCK_TYPE_PUK = 'sim-puk'
+
+    # DBus errors raised by shill.
+    ERROR_INCORRECT_PIN = 'org.chromium.flimflam.Error.IncorrectPin'
+    ERROR_PIN_BLOCKED = 'org.chromium.flimflam.Error.PinBlocked'
+
+    def set_logging_for_cellular_test(self):
+        """Set the logging in shill for a test of cellular technology.
+
+        Set the log level to |ShillProxy.LOG_LEVEL_FOR_TEST| and the log scopes
+        to the ones defined in |ShillProxy.LOG_SCOPES_FOR_TEST| for
+        |ShillProxy.TECHNOLOGY_CELLULAR|.
+
+        """
+        self.set_logging_for_test(self.TECHNOLOGY_CELLULAR)
+
+
+    def find_cellular_device_object(self):
+        """Returns the first dbus object found that is a cellular device.
+
+        @return DBus object for the first cellular device found. None if no
+                device found.
+
+        """
+        return self.find_object('Device', {'Type': self.TECHNOLOGY_CELLULAR})
+
+
+    def reset_modem(self, modem, expect_device=True, expect_powered=True,
+                    expect_service=True):
+        """Reset |modem|.
+
+        Do, in sequence,
+        (1) Ensure that the current device object disappears.
+        (2) If |expect_device|, ensure that the device reappears.
+        (3) If |expect_powered|, ensure that the device is powered.
+        (4) If |expect_service|, ensure that the service reappears.
+
+        This function does not check the service state for the device after
+        reset.
+
+        @param modem: DBus object for the modem to reset.
+        @param expect_device: If True, ensure that a DBus object reappears for
+                the same modem after the reset.
+        @param expect_powered: If True, ensure that the modem is powered on
+                after the reset.
+        @param expect_service: If True, ensure that a service managing the
+                reappeared modem also reappears.
+
+        @return (device, service)
+                device: DBus object for the reappeared Device after the reset.
+                service: DBus object for the reappeared Service after the reset.
+                Either of these may be None, if the object is not expected to
+                reappear.
+
+        @raises ShillProxyError if any of the conditions (1)-(4) fail.
+
+        """
+        logging.info('Resetting modem')
+        # Obtain identifying information about the modem.
+        # ICCID is the only property we are guaranteed to obtain whenever the
+        # device is present. Others, like imei/imsi are not present if the
+        # device is locked.
+        properties = modem.GetProperties(utf8_strings=True)
+        iccid = properties.get(self.DEVICE_PROPERTY_ICCID)
+        if not iccid:
+            raise shill_proxy.ShillProxyError(
+                    'Failed to get identifying information for the modem.')
+        old_modem_path = modem.object_path
+        old_modem_mm_object = properties.get(self.DEVICE_PROPERTY_DBUS_OBJECT)
+        if not old_modem_mm_object:
+            raise shill_proxy.ShillProxyError(
+                    'Failed to get the mm object path for the modem.')
+
+        modem.Reset()
+
+        # (1) Wait for the old modem to disappear
+        CellularProxy._poll_for_condition(
+                lambda: self._is_old_modem_gone(old_modem_path,
+                                                old_modem_mm_object),
+                'Old modem disappeared')
+
+        # (2) Wait for the device to reappear
+        if not expect_device:
+            return None, None
+        # The timeout here should be sufficient for our slowest modem to
+        # reappear.
+        CellularProxy._poll_for_condition(
+                lambda: self._get_reappeared_modem(iccid, old_modem_mm_object),
+                desc='The modem reappeared after reset.',
+                timeout=60)
+        new_modem = self._get_reappeared_modem(iccid, old_modem_mm_object)
+
+        # (3) Check powered state of the device
+        if not expect_powered:
+            return new_modem, None
+        self.wait_for_property_in(new_modem, self.DEVICE_PROPERTY_POWERED,
+                                  [self.VALUE_POWERED_ON], timeout_seconds=10)
+
+        # (4) Check that service reappears
+        if not expect_service:
+            return new_modem, None
+        new_service = self.get_service_for_device(new_modem)
+        if not new_service:
+            raise shill_proxy.ShillProxyError(
+                    'Failed to find a shill service managing the reappeared '
+                    'device.')
+        return new_modem, new_service
+
+
+    # TODO(pprabhu) Use utils.poll_for_condition instead, once
+    # cellular_proxy.py moves to autotest.
+    SLEEP_INTERVAL = 0.1
+    @staticmethod
+    def _poll_for_condition(condition, desc, timeout=10):
+        """Poll till |condition| is satisfied.
+
+        @param condition: A function taking no arguments. The condition is
+                met when the return value can be cast to the bool True.
+        @param desc: The description given when we timeout waiting for
+                |condition|.
+
+        """
+        start_time = time.time()
+        while True:
+            value = condition()
+            if value:
+                return value
+            if(time.time() + CellularProxy.SLEEP_INTERVAL - start_time >
+               timeout):
+                raise shill_proxy.ShillProxyError(
+                        'Timed out waiting for condition %s.' % desc)
+            time.sleep(CellularProxy.SLEEP_INTERVAL)
+
+
+    def _is_old_modem_gone(self, modem_path, modem_mm_object):
+        """Tests if the DBus object for modem disappears after Reset.
+
+        @param modem_path: The DBus path for the modem object that must vanish.
+        @param modem_mm_object: The modemmanager object path reported by the
+            old modem. This is unique everytime a new modem is (re)exposed.
+
+        @return True if the object disappeared, false otherwise.
+
+        """
+        device = self.get_dbus_object(self.DBUS_TYPE_DEVICE, modem_path)
+        try:
+            properties = device.GetProperties()
+            # DBus object exists, perhaps a reappeared device?
+            return (properties.get(self.DEVICE_PROPERTY_DBUS_OBJECT) !=
+                    modem_mm_object)
+        except dbus.DBusException as e:
+            if e.get_dbus_name() == self.DBUS_ERROR_UNKNOWN_OBJECT:
+                return True
+            return False
+
+
+    def _get_reappeared_modem(self, iccid, old_modem_mm_object):
+        """Check that a vanished modem reappers.
+
+        @param iccid: The unique constant ICCID reported by the vanished modem.
+        @param old_modem_mm_object: The previously reported modemmanager object
+                path for this modem.
+
+        @return The reappeared DBus object, if any. None otherwise.
+
+        """
+        # TODO(pprabhu) This will break if we have multiple cellular devices
+        # in the system at the same time.
+        device = self.find_cellular_device_object()
+        if not device:
+            return None
+        properties = device.GetProperties(utf8_strings=True)
+        if (iccid == properties.get(self.DEVICE_PROPERTY_ICCID) and
+            (old_modem_mm_object !=
+             properties.get(self.DEVICE_PROPERTY_DBUS_OBJECT))):
+            return device
+        return None
diff --git a/test-scripts/shill_proxy.py b/test-scripts/shill_proxy.py
index 8951a42..f0bf88d 100644
--- a/test-scripts/shill_proxy.py
+++ b/test-scripts/shill_proxy.py
@@ -6,8 +6,17 @@
 import logging
 import time
 
+class ShillProxyError(Exception):
+    """Exceptions raised by ShillProxy and it's children."""
+    pass
+
 
 class ShillProxy(object):
+    # Core DBus error names
+    DBUS_ERROR_UNKNOWN_OBJECT = 'org.freedesktop.DBus.Error.UnknownObject'
+    # Shill error names
+    ERROR_FAILURE = 'org.chromium.flimflam.Error.Failure'
+
     DBUS_INTERFACE = 'org.chromium.flimflam'
     DBUS_SERVICE_UNKNOWN = 'org.freedesktop.DBus.Error.ServiceUnknown'
     DBUS_TYPE_DEVICE = 'org.chromium.flimflam.Device'
@@ -31,6 +40,7 @@
     SERVICE_DISCONNECT_TIMEOUT = 5
 
     SERVICE_PROPERTY_AUTOCONNECT = 'AutoConnect'
+    SERVICE_PROPERTY_DEVICE = 'Device'
     SERVICE_PROPERTY_GUID = 'GUID'
     SERVICE_PROPERTY_HIDDEN = 'WiFi.HiddenSSID'
     SERVICE_PROPERTY_MODE = 'Mode'
@@ -47,12 +57,26 @@
                                     'ibss': 'adhoc',
                                     None: 'managed'}
 
+    DEVICE_PROPERTY_ADDRESS = 'Address'
+    DEVICE_PROPERTY_EAP_AUTHENTICATION_COMPLETED = 'EapAuthenticationCompleted'
+    DEVICE_PROPERTY_EAP_AUTHENTICATOR_DETECTED = 'EapAuthenticatorDetected'
+    DEVICE_PROPERTY_IP_CONFIG = 'IpConfig'
+    DEVICE_PROPERTY_INTERFACE = 'Interface'
+    DEVICE_PROPERTY_NAME = 'Name'
+    DEVICE_PROPERTY_POWERED = 'Powered'
+    DEVICE_PROPERTY_RECEIVE_BYTE_COUNT = 'ReceiveByteCount'
+    DEVICE_PROPERTY_TRANSMIT_BYTE_COUNT = 'TransmitByteCount'
+    DEVICE_PROPERTY_TYPE = 'Type'
+
     TECHNOLOGY_CELLULAR = 'cellular'
     TECHNOLOGY_ETHERNET = 'ethernet'
     TECHNOLOGY_VPN = 'vpn'
     TECHNOLOGY_WIFI = 'wifi'
     TECHNOLOGY_WIMAX = 'wimax'
 
+    VALUE_POWERED_ON = 'on'
+    VALUE_POWERED_OFF = 'off'
+
     POLLING_INTERVAL_SECONDS = 0.2
 
     # Default log level used in connectivity tests.
@@ -88,6 +112,8 @@
             return bool(value)
         elif isinstance(value, int):
             return int(value)
+        elif isinstance(value, dbus.UInt32):
+            return long(value)
         elif isinstance(value, float):
             return float(value)
         elif isinstance(value, str):
@@ -191,7 +217,8 @@
 
     def wait_for_property_in(self, dbus_object, property_name,
                              expected_values, timeout_seconds):
-        """
+        """Wait till a property is in a list of expected values.
+
         Block until the property |property_name| in |dbus_object| is in
         |expected_values|, or |timeout_seconds|.
 
@@ -230,8 +257,7 @@
 
 
     def get_active_profile(self):
-        """
-        Get the active profile in shill.
+        """Get the active profile in shill.
 
         @return dbus object representing the active profile.
 
@@ -243,8 +269,7 @@
 
 
     def get_dbus_object(self, type_str, path):
-        """
-        Return the DBus object of type |type_str| at |path| in shill.
+        """Return the DBus object of type |type_str| at |path| in shill.
 
         @param type_str string (e.g. self.DBUS_TYPE_SERVICE).
         @param path path to object in shill (e.g. '/service/12').
@@ -275,6 +300,31 @@
         return self.get_dbus_object(self.DBUS_TYPE_SERVICE, path)
 
 
+    def get_service_for_device(self, device):
+        """Attempt to find a service that manages |device|.
+
+        @param device a dbus object interface representing a device.
+        @return Dbus object interface representing a service if found. None
+                otherwise.
+
+        """
+        properties = self.manager.GetProperties(utf8_strings=True)
+        all_services = properties.get(self.MANAGER_PROPERTY_ALL_SERVICES,
+                                      None)
+        if not all_services:
+            return None
+
+        for service_path in all_services:
+            service = self.get_dbus_object(self.DBUS_TYPE_SERVICE,
+                                           service_path)
+            properties = service.GetProperties(utf8_strings=True)
+            device_path = properties.get(self.SERVICE_PROPERTY_DEVICE, None)
+            if device_path == device.object_path:
+                return service
+
+        return None
+
+
     def find_object(self, object_type, properties):
         """Find a shill object with the specified type and properties.
 
@@ -305,4 +355,3 @@
             else:
                 return test_object
         return None
-