| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| # DEPRECATED |
| # Do not use flimflam.py in future development. |
| # Extend / migrate to shill_proxy suite of scripts instead. |
| |
| import logging, time |
| |
| import dbus |
| |
| DEFAULT_CELLULAR_TIMEOUT = 60 |
| |
| def make_dbus_boolean(value): |
| value = value.upper() |
| if value in ["ON", "TRUE"]: |
| return dbus.Boolean(1) |
| elif value in ["OFF", "FALSE"]: |
| return dbus.Boolean(0) |
| else: |
| return dbus.Boolean(int(value)) |
| |
| # |
| # Convert a DBus value to a printable value; used |
| # to print properties returned via DBus |
| # |
| def convert_dbus_value(value, indent=0): |
| # DEPRECATED |
| spacer = ' ' * indent |
| if value.__class__ == dbus.Byte: |
| return int(value) |
| elif value.__class__ == dbus.Boolean: |
| return bool(value) |
| elif value.__class__ == dbus.Dictionary: |
| valstr = "{" |
| for key in value: |
| valstr += "\n" + spacer + " " + \ |
| key + ": " + str(convert_dbus_value(value[key], indent + 4)) |
| valstr += "\n" + spacer + "}" |
| return valstr |
| elif value.__class__ == dbus.Array: |
| valstr = "[" |
| for val in value: |
| valstr += "\n" + spacer + " " + \ |
| str(convert_dbus_value(val, indent + 4)) |
| valstr += "\n" + spacer + "]" |
| return valstr |
| else: |
| return str(value) |
| |
| class FlimFlam(object): |
| # DEPRECATED |
| |
| SHILL_DBUS_INTERFACE = "org.chromium.flimflam" |
| UNKNOWN_METHOD = 'org.freedesktop.DBus.Error.UnknownMethod' |
| UNKNOWN_OBJECT = 'org.freedesktop.DBus.Error.UnknownObject' |
| |
| DEVICE_WIMAX = 'wimax' |
| DEVICE_CELLULAR = 'cellular' |
| |
| @staticmethod |
| def _GetContainerName(kind): |
| """Map shill element names to the names of their collections.""" |
| # For example, Device - > Devices. |
| # Just pulling this out so we can use a map if we start |
| # caring about "AvailableTechnologies" |
| return kind + "s" |
| |
| @staticmethod |
| def WaitForServiceState(service, expected_states, timeout, |
| ignore_failure=False, property_name="State"): |
| """Wait until service enters a state in expected_states or times out. |
| Args: |
| service: service to watch |
| expected_states: list of exit states |
| timeout: in seconds |
| ignore_failure: should the failure state be ignored? |
| property_name: name of service property |
| |
| Returns: (state, seconds waited) |
| |
| If the state is "failure" and ignore_failure is False we return |
| immediately without waiting for the timeout. |
| """ |
| |
| state = None |
| start_time = time.time() |
| timeout = start_time + timeout |
| while time.time() < timeout: |
| properties = service.GetProperties(utf8_strings = True) |
| state = properties.get(property_name, None) |
| if ((state == "failure" and not ignore_failure) or |
| state in expected_states): |
| break |
| time.sleep(.5) |
| |
| config_time = time.time() - start_time |
| # str() to remove DBus boxing |
| return (str(state), config_time) |
| |
| @staticmethod |
| def DisconnectService(service, wait_timeout=15): |
| try: |
| service.Disconnect() |
| except dbus.exceptions.DBusException, error: |
| if error.get_dbus_name() not in [ |
| FlimFlam.SHILL_DBUS_INTERFACE + ".Error.InProgress", |
| FlimFlam.SHILL_DBUS_INTERFACE + ".Error.NotConnected", ]: |
| raise error |
| return FlimFlam.WaitForServiceState(service, ['idle'], wait_timeout) |
| |
| def __init__(self, bus=None): |
| if not bus: |
| bus = dbus.SystemBus() |
| self.bus = bus |
| shill = bus.get_object(FlimFlam.SHILL_DBUS_INTERFACE, "/") |
| self.manager = dbus.Interface( |
| shill, |
| FlimFlam.SHILL_DBUS_INTERFACE + ".Manager") |
| |
| def _FindDevice(self, device_type, timeout): |
| """ Return the first device object that matches a given device type. |
| |
| Wait until the device type is avilable or until timeout |
| |
| Args: |
| device_type: string format of the type of device. |
| timeout: in seconds |
| |
| Returns: Device or None |
| """ |
| timeout = time.time() + timeout |
| device_obj = None |
| while time.time() < timeout: |
| device_obj = self.FindElementByPropertySubstring('Device', |
| 'Type', |
| device_type) |
| if device_obj: |
| break |
| time.sleep(1) |
| return device_obj |
| |
| def FindCellularDevice(self, timeout=DEFAULT_CELLULAR_TIMEOUT): |
| return self._FindDevice(self.DEVICE_CELLULAR, timeout) |
| |
| def FindWimaxDevice(self, timeout=30): |
| return self._FindDevice(self.DEVICE_WIMAX, timeout) |
| |
| def _FindService(self, device_type, timeout): |
| """Return the first service object that matches the device type. |
| |
| Wait until a service is available or until the timeout. |
| |
| Args: |
| device_type: string format of the type of device. |
| timeout: in seconds |
| |
| Returns: service or None |
| """ |
| start_time = time.time() |
| timeout = start_time + timeout |
| service = None |
| while time.time() < timeout: |
| service = self.FindElementByPropertySubstring('Service', |
| 'Type', device_type) |
| if service: |
| break |
| time.sleep(.5) |
| return service |
| |
| def FindCellularService(self, timeout=DEFAULT_CELLULAR_TIMEOUT): |
| return self._FindService(self.DEVICE_CELLULAR, timeout) |
| |
| def FindWimaxService(self, timeout=30): |
| return self._FindService(self.DEVICE_WIMAX, timeout) |
| |
| def GetService(self, params): |
| path = self.manager.GetService(params) |
| return self.GetObjectInterface("Service", path) |
| |
| def ConnectService(self, assoc_timeout=15, config_timeout=15, |
| async=False, service=None, service_type='', |
| retry=False, retries=1, retry_sleep=15, |
| save_creds=False, |
| **kwargs): |
| """Connect to a service and wait until connection is up |
| Args: |
| assoc_timeout, config_timeout: Timeouts in seconds. |
| async: return immediately. do not wait for connection. |
| service: DBus service |
| service_type: If supplied, invoke type-specific code to find service. |
| retry: Retry connection after Connect failure. |
| retries: Number of retries to allow. |
| retry_sleep: Number of seconds to wait before retrying. |
| kwargs: Additional args for type-specific code |
| |
| Returns: |
| (success, dictionary), where dictionary contains stats and diagnostics. |
| """ |
| output = {} |
| connected_states = ["ready", "portal", "online"] |
| |
| # Retry connections on failure. Need to call GetService again as some |
| # Connect failure states are unrecoverable. |
| connect_success = False |
| while not connect_success: |
| if service_type == "wifi": |
| try: |
| # Sanity check to make sure the caller hasn't provided |
| # both a service and a service type. At which point its |
| # unclear what they actually want to do, so err on the |
| # side of caution and except out. |
| if service: |
| raise Exception('supplied service and service type') |
| params = { |
| "Type": service_type, |
| "Mode": kwargs["mode"], |
| "SSID": kwargs["ssid"], |
| "Security": kwargs.get("security", "none"), |
| "SaveCredentials": save_creds } |
| # Supply a passphrase only if it is non-empty. |
| passphrase = kwargs.get("passphrase", "") |
| if passphrase: |
| params["Passphrase"] = passphrase |
| path = self.manager.GetService(params) |
| service = self.GetObjectInterface("Service", path) |
| except Exception, e: |
| output["reason"] = "FAIL(GetService): exception %s" % e |
| return (False, output) |
| |
| output["service"] = service |
| |
| try: |
| service.Connect() |
| connect_success = True |
| except Exception, e: |
| if not retry or retries == 0: |
| output["reason"] = "FAIL(Connect): exception %s" % e |
| return (False, output) |
| else: |
| logging.info("INFO(Connect): connect failed. Retrying...") |
| retries -= 1 |
| |
| if not connect_success: |
| # FlimFlam can be a little funny sometimes. At least for In |
| # Progress errors, even though the service state may be failed, |
| # it is actually still trying to connect. As such, while we're |
| # waiting for retry, keep checking the service state to see if |
| # it actually succeeded in connecting. |
| state = FlimFlam.WaitForServiceState( |
| service=service, |
| expected_states=connected_states, |
| timeout=retry_sleep, |
| ignore_failure=True)[0] |
| |
| if state in connected_states: |
| return (True, output) |
| |
| # While service can be caller provided, it is also set by the |
| # GetService call above. If service was not caller provided we |
| # need to reset it to None so we don't fail the sanity check |
| # above. |
| if service_type != '': |
| service = None |
| |
| if async: |
| return (True, output) |
| |
| logging.info("Associating...") |
| (state, assoc_time) = ( |
| FlimFlam.WaitForServiceState(service, |
| ["configuration"] + connected_states, |
| assoc_timeout)) |
| output["state"] = state |
| if state == "failure": |
| output["reason"] = "FAIL(assoc)" |
| if assoc_time > assoc_timeout: |
| output["reason"] = "TIMEOUT(assoc)" |
| output["assoc_time"] = assoc_time |
| if "reason" in output: |
| return (False, output) |
| |
| |
| (state, config_time) = ( |
| FlimFlam.WaitForServiceState(service, |
| connected_states, config_timeout)) |
| output["state"] = state |
| if state == "failure": |
| output["reason"] = "FAIL(config)" |
| if config_time > config_timeout: |
| output["reason"] = "TIMEOUT(config)" |
| output["config_time"] = config_time |
| |
| if "reason" in output: |
| return (False, output) |
| |
| return (True, output) |
| |
| def GetObjectInterface(self, kind, path): |
| return dbus.Interface( |
| self.bus.get_object(FlimFlam.SHILL_DBUS_INTERFACE, path), |
| FlimFlam.SHILL_DBUS_INTERFACE + "." + kind) |
| |
| def FindElementByNameSubstring(self, kind, substring): |
| properties = self.manager.GetProperties(utf8_strings = True) |
| for path in properties[FlimFlam._GetContainerName(kind)]: |
| if path.find(substring) >= 0: |
| return self.GetObjectInterface(kind, path) |
| return None |
| |
| def FindElementByPropertySubstring(self, kind, prop, substring): |
| properties = self.manager.GetProperties(utf8_strings = True) |
| for path in properties[FlimFlam._GetContainerName(kind)]: |
| obj = self.GetObjectInterface(kind, path) |
| try: |
| obj_properties = obj.GetProperties(utf8_strings = True) |
| except dbus.exceptions.DBusException, error: |
| if (error.get_dbus_name() == self.UNKNOWN_METHOD or |
| error.get_dbus_name() == self.UNKNOWN_OBJECT): |
| # object disappeared; ignore and keep looking |
| continue |
| else: |
| raise error |
| if (prop in obj_properties and |
| obj_properties[prop].find(substring) >= 0): |
| return obj |
| return None |
| |
| def GetObjectList(self, kind, properties=None): |
| if properties is None: |
| properties = self.manager.GetProperties(utf8_strings = True) |
| return [self.GetObjectInterface(kind, path) |
| for path in properties[FlimFlam._GetContainerName(kind)]] |
| |
| def GetActiveProfile(self): |
| properties = self.manager.GetProperties(utf8_strings = True) |
| return self.GetObjectInterface("Profile", properties["ActiveProfile"]) |
| |
| def CreateProfile(self, ident): |
| path = self.manager.CreateProfile(ident) |
| return self.GetObjectInterface("Profile", path) |
| |
| def RemoveProfile(self, ident): |
| self.manager.RemoveProfile(ident) |
| |
| def PushProfile(self, ident): |
| path = self.manager.PushProfile(ident) |
| return self.GetObjectInterface("Profile", path) |
| |
| def PopProfile(self, ident): |
| self.manager.PopProfile(ident) |
| |
| def PopAnyProfile(self): |
| self.manager.PopAnyProfile() |
| |
| def GetSystemState(self): |
| properties = self.manager.GetProperties(utf8_strings = True) |
| return properties["State"] |
| |
| def GetDebugTags(self): |
| return self.manager.GetDebugTags() |
| |
| def ListDebugTags(self): |
| return self.manager.ListDebugTags() |
| |
| def SetDebugTags(self, taglist): |
| try: |
| self.manager.SetDebugTags(taglist) |
| self.SetDebugLevel(-4) |
| except dbus.exceptions.DBusException, error: |
| if error.get_dbus_name() not in [ |
| "org.freedesktop.DBus.Error.UnknownMethod" ]: |
| raise error |
| |
| def SetDebugLevel(self, level): |
| self.manager.SetDebugLevel(level) |
| |
| def GetServiceOrder(self): |
| return self.manager.GetServiceOrder() |
| |
| def SetServiceOrder(self, new_order): |
| old_order = self.GetServiceOrder() |
| self.manager.SetServiceOrder(new_order) |
| return (old_order, new_order) |
| |
| def EnableTechnology(self, tech): |
| try: |
| self.manager.EnableTechnology(tech) |
| except dbus.exceptions.DBusException, error: |
| if error.get_dbus_name() not in [ |
| FlimFlam.SHILL_DBUS_INTERFACE + ".Error.AlreadyEnabled", |
| FlimFlam.SHILL_DBUS_INTERFACE + ".Error.InProgress" ]: |
| raise error |
| |
| def DisableTechnology(self, tech): |
| self.manager.DisableTechnology(tech, timeout=60) |
| |
| def RequestScan(self, technology): |
| self.manager.RequestScan(technology) |
| |
| def GetCountry(self): |
| properties = self.manager.GetProperties(utf8_strings = True) |
| return properties["Country"] |
| |
| def SetCountry(self, country): |
| self.manager.SetProperty("Country", country) |
| |
| def GetCheckPortalList(self): |
| properties = self.manager.GetProperties(utf8_strings = True) |
| return properties["CheckPortalList"] |
| |
| def SetCheckPortalList(self, tech_list): |
| self.manager.SetProperty("CheckPortalList", tech_list) |
| |
| def GetPortalURL(self): |
| properties = self.manager.GetProperties(utf8_strings = True) |
| return properties["PortalURL"] |
| |
| def SetPortalURL(self, url): |
| self.manager.SetProperty("PortalURL", url) |
| |
| def GetArpGateway(self): |
| properties = self.manager.GetProperties() |
| return properties["ArpGateway"] |
| |
| def SetArpGateway(self, do_arp_gateway): |
| self.manager.SetProperty("ArpGateway", do_arp_gateway) |
| |
| |
| class DeviceManager(object): |
| # DEPRECATED |
| """Use flimflam to isolate a given interface for testing. |
| |
| DeviceManager can be used to turn off network devices that are not |
| under test so that they will not interfere with testing. |
| |
| NB: Ethernet devices are special inside Flimflam. You will need to |
| take care of them via other means (like, for example, the |
| backchannel ethernet code in client autotests) |
| |
| Sample usage: |
| |
| device_manager = flimflam.DeviceManager() |
| try: |
| device_manager.ShutdownAllExcept('cellular') |
| use routing.getRouteFor() |
| to verify that only the expected device is used |
| do stuff to test cellular connections |
| finally: |
| device_manager.RestoreDevices() |
| """ |
| |
| @staticmethod |
| def _EnableDevice(device, enable): |
| """Enables/Disables a device in shill.""" |
| if enable: |
| device.Enable() |
| else: |
| device.Disable() |
| |
| def __init__(self, flim=None): |
| self.flim_ = flim or FlimFlam() |
| self.devices_to_restore_ = [] |
| |
| def ShutdownAllExcept(self, device_type): |
| """Shutdown all devices except device_type ones.""" |
| for device in self.flim_.GetObjectList('Device'): |
| device_properties = device.GetProperties(utf8_strings = True) |
| if (device_properties["Type"] != device_type): |
| logging.info("Powering off %s device %s", |
| device_properties["Type"], |
| device.object_path) |
| self.devices_to_restore_.append(device.object_path) |
| DeviceManager._EnableDevice(device, False) |
| |
| def RestoreDevices(self): |
| """Restore devices powered down in ShutdownAllExcept.""" |
| should_raise = False |
| to_raise = Exception("Nothing to raise") |
| for device_path in self.devices_to_restore_: |
| try: |
| logging.info("Attempting to power on device %s", device_path) |
| device = self.flim_.GetObjectInterface("Device", device_path) |
| DeviceManager._EnableDevice(device, True) |
| except Exception, e: |
| # We want to keep on trying to power things on, so save an |
| # exception and continue |
| should_raise = True |
| to_raise = e |
| if should_raise: |
| raise to_raise |