| # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import collections |
| import logging |
| import os |
| import re |
| |
| from autotest_lib.client.bin import local_host |
| from autotest_lib.client.bin import utils |
| from autotest_lib.client.common_lib import error |
| from autotest_lib.client.common_lib.cros.network import netblock |
| |
| # A tuple consisting of a readable part number (one of NAME_* below) |
| # and a kernel module that provides the driver for this part (e.g. ath9k). |
| DeviceDescription = collections.namedtuple('DeviceDescription', |
| ['name', 'kernel_module']) |
| |
| |
| # A tuple describing a default route, consisting of an interface name, |
| # gateway IP address, and the metric value visible in the routing table. |
| DefaultRoute = collections.namedtuple('DefaultRoute', ['interface_name', |
| 'gateway', |
| 'metric']) |
| |
| NAME_MARVELL_88W8797_SDIO = 'Marvell 88W8797 SDIO' |
| NAME_MARVELL_88W8887_SDIO = 'Marvell 88W8887 SDIO' |
| NAME_MARVELL_88W8897_SDIO = 'Marvell 88W8897 SDIO' |
| NAME_MARVELL_88W8897_PCIE = 'Marvell 88W8897 PCIE' |
| NAME_MARVELL_88W8997_PCIE = 'Marvell 88W8997 PCIE' |
| NAME_ATHEROS_AR9280 = 'Atheros AR9280' |
| NAME_ATHEROS_AR9382 = 'Atheros AR9382' |
| NAME_ATHEROS_AR9462 = 'Atheros AR9462' |
| NAME_QUALCOMM_ATHEROS_QCA6174 = 'Qualcomm Atheros QCA6174' |
| NAME_QUALCOMM_ATHEROS_QCA6174_SDIO = 'Qualcomm Atheros QCA6174 SDIO' |
| NAME_QUALCOMM_WCN3990 = 'Qualcomm WCN3990' |
| NAME_INTEL_7260 = 'Intel 7260' |
| NAME_INTEL_7265 = 'Intel 7265' |
| NAME_INTEL_9000 = 'Intel 9000' |
| NAME_INTEL_9260 = 'Intel 9260' |
| NAME_INTEL_22260 = 'Intel 22260' |
| NAME_BROADCOM_BCM4354_SDIO = 'Broadcom BCM4354 SDIO' |
| NAME_BROADCOM_BCM4356_PCIE = 'Broadcom BCM4356 PCIE' |
| NAME_BROADCOM_BCM4371_PCIE = 'Broadcom BCM4371 PCIE' |
| NAME_UNKNOWN = 'Unknown WiFi Device' |
| |
| DEVICE_INFO_ROOT = '/sys/class/net' |
| |
| DeviceInfo = collections.namedtuple('DeviceInfo', ['vendor', 'device', |
| 'compatible']) |
| # Provide default values for parameters. |
| DeviceInfo.__new__.__defaults__ = (None, None, None) |
| |
| DEVICE_NAME_LOOKUP = { |
| DeviceInfo('0x02df', '0x9129'): NAME_MARVELL_88W8797_SDIO, |
| DeviceInfo('0x02df', '0x912d'): NAME_MARVELL_88W8897_SDIO, |
| DeviceInfo('0x02df', '0x9135'): NAME_MARVELL_88W8887_SDIO, |
| DeviceInfo('0x11ab', '0x2b38'): NAME_MARVELL_88W8897_PCIE, |
| DeviceInfo('0x1b4b', '0x2b42'): NAME_MARVELL_88W8997_PCIE, |
| DeviceInfo('0x168c', '0x002a'): NAME_ATHEROS_AR9280, |
| DeviceInfo('0x168c', '0x0030'): NAME_ATHEROS_AR9382, |
| DeviceInfo('0x168c', '0x0034'): NAME_ATHEROS_AR9462, |
| DeviceInfo('0x168c', '0x003e'): NAME_QUALCOMM_ATHEROS_QCA6174, |
| DeviceInfo('0x105b', '0xe09d'): NAME_QUALCOMM_ATHEROS_QCA6174, |
| DeviceInfo('0x0271', '0x050a'): NAME_QUALCOMM_ATHEROS_QCA6174_SDIO, |
| DeviceInfo('0x8086', '0x08b1'): NAME_INTEL_7260, |
| DeviceInfo('0x8086', '0x08b2'): NAME_INTEL_7260, |
| DeviceInfo('0x8086', '0x095a'): NAME_INTEL_7265, |
| DeviceInfo('0x8086', '0x095b'): NAME_INTEL_7265, |
| DeviceInfo('0x8086', '0x9df0'): NAME_INTEL_9000, |
| DeviceInfo('0x8086', '0x31dc'): NAME_INTEL_9000, |
| DeviceInfo('0x8086', '0x2526'): NAME_INTEL_9260, |
| DeviceInfo('0x8086', '0x2723'): NAME_INTEL_22260, |
| DeviceInfo('0x02d0', '0x4354'): NAME_BROADCOM_BCM4354_SDIO, |
| DeviceInfo('0x14e4', '0x43ec'): NAME_BROADCOM_BCM4356_PCIE, |
| DeviceInfo('0x14e4', '0x440d'): NAME_BROADCOM_BCM4371_PCIE, |
| |
| DeviceInfo(compatible='qcom,wcn3990-wifi'): NAME_QUALCOMM_WCN3990, |
| } |
| |
| class Interface: |
| """Interace is a class that contains the queriable address properties |
| of an network device. |
| """ |
| ADDRESS_TYPE_MAC = 'link/ether' |
| ADDRESS_TYPE_IPV4 = 'inet' |
| ADDRESS_TYPE_IPV6 = 'inet6' |
| ADDRESS_TYPES = [ ADDRESS_TYPE_MAC, ADDRESS_TYPE_IPV4, ADDRESS_TYPE_IPV6 ] |
| |
| |
| @staticmethod |
| def get_connected_ethernet_interface(ignore_failures=False): |
| """Get an interface object representing a connected ethernet device. |
| |
| Raises an exception if no such interface exists. |
| |
| @param ignore_failures bool function will return None instead of raising |
| an exception on failures. |
| @return an Interface object except under the conditions described above. |
| |
| """ |
| # Assume that ethernet devices are called ethX until proven otherwise. |
| for device_name in ['eth%d' % i for i in range(5)]: |
| ethernet_if = Interface(device_name) |
| if ethernet_if.exists and ethernet_if.ipv4_address: |
| return ethernet_if |
| |
| else: |
| if ignore_failures: |
| return None |
| |
| raise error.TestFail('Failed to find ethernet interface.') |
| |
| |
| def __init__(self, name, host=None): |
| self._name = name |
| if host is None: |
| self.host = local_host.LocalHost() |
| else: |
| self.host = host |
| self._run = self.host.run |
| |
| |
| @property |
| def name(self): |
| """@return name of the interface (e.g. 'wlan0').""" |
| return self._name |
| |
| |
| @property |
| def addresses(self): |
| """@return the addresses (MAC, IP) associated with interface.""" |
| # "ip addr show %s 2> /dev/null" returns something that looks like: |
| # |
| # 2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast |
| # link/ether ac:16:2d:07:51:0f brd ff:ff:ff:ff:ff:ff |
| # inet 172.22.73.124/22 brd 172.22.75.255 scope global eth0 |
| # inet6 2620:0:1000:1b02:ae16:2dff:fe07:510f/64 scope global dynamic |
| # valid_lft 2591982sec preferred_lft 604782sec |
| # inet6 fe80::ae16:2dff:fe07:510f/64 scope link |
| # valid_lft forever preferred_lft forever |
| # |
| # We extract the second column from any entry for which the first |
| # column is an address type we are interested in. For example, |
| # for "inet 172.22.73.124/22 ...", we will capture "172.22.73.124/22". |
| result = self._run('ip addr show %s 2> /dev/null' % self._name, |
| ignore_status=True) |
| address_info = result.stdout |
| if result.exit_status != 0: |
| # The "ip" command will return non-zero if the interface does |
| # not exist. |
| return {} |
| |
| addresses = {} |
| for address_line in address_info.splitlines(): |
| address_parts = address_line.lstrip().split() |
| if len(address_parts) < 2: |
| continue |
| address_type, address_value = address_parts[:2] |
| if address_type in self.ADDRESS_TYPES: |
| if address_type not in addresses: |
| addresses[address_type] = [] |
| addresses[address_type].append(address_value) |
| return addresses |
| |
| |
| @property |
| def device_path(self): |
| """@return the sysfs path of the interface device""" |
| # This assumes that our path separator is the same as the remote host. |
| device_path = os.path.join(DEVICE_INFO_ROOT, self._name, 'device') |
| if not self.host.path_exists(device_path): |
| logging.error('No device information found at %s', device_path) |
| return None |
| |
| return device_path |
| |
| |
| @property |
| def wiphy_name(self): |
| """ |
| @return name of the wiphy (e.g., 'phy0'), if available. |
| Otherwise None. |
| """ |
| readlink_result = self._run('readlink "%s"' % |
| os.path.join(DEVICE_INFO_ROOT, self._name, 'phy80211'), |
| ignore_status=True) |
| if readlink_result.exit_status != 0: |
| return None |
| |
| return os.path.basename(readlink_result.stdout.strip()) |
| |
| |
| @property |
| def module_name(self): |
| """@return Name of kernel module in use by this interface.""" |
| module_readlink_result = self._run('readlink "%s"' % |
| os.path.join(self.device_path, 'driver', 'module'), |
| ignore_status=True) |
| if module_readlink_result.exit_status != 0: |
| return None |
| |
| return os.path.basename(module_readlink_result.stdout.strip()) |
| |
| @property |
| def parent_device_name(self): |
| """ |
| @return Name of device at which wiphy device is present. For example, |
| for a wifi NIC present on a PCI bus, this would be the same as |
| PCI_SLOT_PATH. """ |
| path_readlink_result = self._run('readlink "%s"' % self.device_path) |
| if path_readlink_result.exit_status != 0: |
| return None |
| |
| return os.path.basename(path_readlink_result.stdout.strip()) |
| |
| @property |
| def device_description(self): |
| """@return DeviceDescription object for a WiFi interface, or None.""" |
| read_file = (lambda path: self._run('cat "%s"' % path).stdout.rstrip() |
| if self.host.path_exists(path) else None) |
| if not self.is_wifi_device(): |
| logging.error('Device description not supported on non-wifi ' |
| 'interface: %s.', self._name) |
| return None |
| |
| device_path = self.device_path |
| if not device_path: |
| logging.error('No device path found') |
| return None |
| |
| # Try to identify using either vendor/product ID, or using device tree |
| # "OF_COMPATIBLE_x". |
| vendor_id = read_file(os.path.join(device_path, 'vendor')) |
| product_id = read_file(os.path.join(device_path, 'device')) |
| uevent = read_file(os.path.join(device_path, 'uevent')) |
| |
| # Vendor/product ID. |
| infos = [DeviceInfo(vendor_id, product_id)] |
| |
| # Compatible value(s). |
| for line in uevent.splitlines(): |
| key, _, value = line.partition('=') |
| if re.match('^OF_COMPATIBLE_[0-9]+$', key): |
| infos += [DeviceInfo(compatible=value)] |
| |
| for info in infos: |
| if info in DEVICE_NAME_LOOKUP: |
| device_name = DEVICE_NAME_LOOKUP[info] |
| logging.debug('Device is %s', device_name) |
| break |
| else: |
| logging.error('Device is unknown. Info: %r', infos) |
| device_name = NAME_UNKNOWN |
| module_name = self.module_name |
| kernel_release = self._run('uname -r').stdout.strip() |
| net_drivers_path = '/lib/modules/%s/kernel/drivers/net' % kernel_release |
| if module_name is not None and self.host.path_exists(net_drivers_path): |
| module_path = self._run('find %s -name %s.ko -printf %%P' % ( |
| net_drivers_path, module_name)).stdout |
| else: |
| module_path = 'Unknown (kernel might have modules disabled)' |
| return DeviceDescription(device_name, module_path) |
| |
| |
| @property |
| def exists(self): |
| """@return True if this interface exists, False otherwise.""" |
| # No valid interface has no addresses at all. |
| return bool(self.addresses) |
| |
| |
| |
| def get_ip_flags(self): |
| """@return List of flags from 'ip addr show'.""" |
| # "ip addr show %s 2> /dev/null" returns something that looks like: |
| # |
| # 2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast |
| # link/ether ac:16:2d:07:51:0f brd ff:ff:ff:ff:ff:ff |
| # inet 172.22.73.124/22 brd 172.22.75.255 scope global eth0 |
| # inet6 2620:0:1000:1b02:ae16:2dff:fe07:510f/64 scope global dynamic |
| # valid_lft 2591982sec preferred_lft 604782sec |
| # inet6 fe80::ae16:2dff:fe07:510f/64 scope link |
| # valid_lft forever preferred_lft forever |
| # |
| # We only cares about the flags in the first line. |
| result = self._run('ip addr show %s 2> /dev/null' % self._name, |
| ignore_status=True) |
| address_info = result.stdout |
| if result.exit_status != 0: |
| # The "ip" command will return non-zero if the interface does |
| # not exist. |
| return [] |
| status_line = address_info.splitlines()[0] |
| flags_str = status_line[status_line.find('<')+1:status_line.find('>')] |
| return flags_str.split(',') |
| |
| |
| @property |
| def is_up(self): |
| """@return True if this interface is UP, False otherwise.""" |
| return 'UP' in self.get_ip_flags() |
| |
| |
| @property |
| def is_lower_up(self): |
| """ |
| Check if the interface is in LOWER_UP state. This usually means (e.g., |
| for ethernet) a link is detected. |
| |
| @return True if this interface is LOWER_UP, False otherwise.""" |
| return 'LOWER_UP' in self.get_ip_flags() |
| |
| |
| def is_link_operational(self): |
| """@return True if RFC 2683 IfOperStatus is UP (i.e., is able to pass |
| packets). |
| """ |
| command = 'ip link show %s' % self._name |
| result = self._run(command, ignore_status=True) |
| if result.exit_status: |
| return False |
| return result.stdout.find('state UP') >= 0 |
| |
| |
| @property |
| def mac_address(self): |
| """@return the (first) MAC address, e.g., "00:11:22:33:44:55".""" |
| return self.addresses.get(self.ADDRESS_TYPE_MAC, [None])[0] |
| |
| |
| @property |
| def ipv4_address_and_prefix(self): |
| """@return the IPv4 address/prefix, e.g., "192.186.0.1/24".""" |
| return self.addresses.get(self.ADDRESS_TYPE_IPV4, [None])[0] |
| |
| |
| @property |
| def ipv4_address(self): |
| """@return the (first) IPv4 address, e.g., "192.168.0.1".""" |
| netblock_addr = self.netblock |
| return netblock_addr.addr if netblock_addr else None |
| |
| |
| @property |
| def ipv4_prefix(self): |
| """@return the IPv4 address prefix e.g., 24.""" |
| addr = self.netblock |
| return addr.prefix_len if addr else None |
| |
| |
| @property |
| def ipv4_subnet(self): |
| """@return string subnet of IPv4 address (e.g. '192.168.0.0')""" |
| addr = self.netblock |
| return addr.subnet if addr else None |
| |
| |
| @property |
| def ipv4_subnet_mask(self): |
| """@return the IPv4 subnet mask e.g., "255.255.255.0".""" |
| addr = self.netblock |
| return addr.netmask if addr else None |
| |
| |
| def is_wifi_device(self): |
| """@return True if iw thinks this is a wifi device.""" |
| if self._run('iw dev %s info' % self._name, |
| ignore_status=True).exit_status: |
| logging.debug('%s does not seem to be a wireless device.', |
| self._name) |
| return False |
| return True |
| |
| |
| @property |
| def netblock(self): |
| """Return Netblock object for this interface's IPv4 address. |
| |
| @return Netblock object (or None if no IPv4 address found). |
| |
| """ |
| netblock_str = self.ipv4_address_and_prefix |
| return netblock.from_addr(netblock_str) if netblock_str else None |
| |
| |
| @property |
| def signal_level(self): |
| """Get the signal level for an interface. |
| |
| This is currently only defined for WiFi interfaces. |
| |
| localhost test # iw dev mlan0 link |
| Connected to 04:f0:21:03:7d:b2 (on mlan0) |
| SSID: Perf_slvf0_ch36 |
| freq: 5180 |
| RX: 699407596 bytes (8165441 packets) |
| TX: 58632580 bytes (9923989 packets) |
| signal: -54 dBm |
| tx bitrate: 130.0 MBit/s MCS 15 |
| |
| bss flags: |
| dtim period: 2 |
| beacon int: 100 |
| |
| @return signal level in dBm (a negative, integral number). |
| |
| """ |
| if not self.is_wifi_device(): |
| return None |
| |
| result_lines = self._run('iw dev %s link' % |
| self._name).stdout.splitlines() |
| signal_pattern = re.compile('signal:\s+([-0-9]+)\s+dbm') |
| for line in result_lines: |
| cleaned = line.strip().lower() |
| match = re.search(signal_pattern, cleaned) |
| if match is not None: |
| return int(match.group(1)) |
| |
| logging.error('Failed to find signal level for %s.', self._name) |
| return None |
| |
| |
| @property |
| def mtu(self): |
| """@return the interface configured maximum transmission unit (MTU).""" |
| # "ip addr show %s 2> /dev/null" returns something that looks like: |
| # |
| # 2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast |
| # link/ether ac:16:2d:07:51:0f brd ff:ff:ff:ff:ff:ff |
| # inet 172.22.73.124/22 brd 172.22.75.255 scope global eth0 |
| # inet6 2620:0:1000:1b02:ae16:2dff:fe07:510f/64 scope global dynamic |
| # valid_lft 2591982sec preferred_lft 604782sec |
| # inet6 fe80::ae16:2dff:fe07:510f/64 scope link |
| # valid_lft forever preferred_lft forever |
| # |
| # We extract the 'mtu' value (in this example "1500") |
| try: |
| result = self._run('ip addr show %s 2> /dev/null' % self._name) |
| address_info = result.stdout |
| except error.CmdError, e: |
| # The "ip" command will return non-zero if the interface does |
| # not exist. |
| return None |
| |
| match = re.search('mtu\s+(\d+)', address_info) |
| if not match: |
| raise error.TestFail('MTU information is not available.') |
| return int(match.group(1)) |
| |
| |
| def noise_level(self, frequency_mhz): |
| """Get the noise level for an interface at a given frequency. |
| |
| This is currently only defined for WiFi interfaces. |
| |
| This only works on some devices because 'iw survey dump' (the method |
| used to get the noise) only works on some devices. On other devices, |
| this method returns None. |
| |
| @param frequency_mhz: frequency at which the noise level should be |
| measured and reported. |
| @return noise level in dBm (a negative, integral number) or None. |
| |
| """ |
| if not self.is_wifi_device(): |
| return None |
| |
| # This code has to find the frequency and then find the noise |
| # associated with that frequency because 'iw survey dump' output looks |
| # like this: |
| # |
| # localhost test # iw dev mlan0 survey dump |
| # ... |
| # Survey data from mlan0 |
| # frequency: 5805 MHz |
| # noise: -91 dBm |
| # channel active time: 124 ms |
| # channel busy time: 1 ms |
| # channel receive time: 1 ms |
| # channel transmit time: 0 ms |
| # Survey data from mlan0 |
| # frequency: 5825 MHz |
| # ... |
| |
| result_lines = self._run('iw dev %s survey dump' % |
| self._name).stdout.splitlines() |
| my_frequency_pattern = re.compile('frequency:\s*%d mhz' % |
| frequency_mhz) |
| any_frequency_pattern = re.compile('frequency:\s*\d{4} mhz') |
| inside_desired_frequency_block = False |
| noise_pattern = re.compile('noise:\s*([-0-9]+)\s+dbm') |
| for line in result_lines: |
| cleaned = line.strip().lower() |
| if my_frequency_pattern.match(cleaned): |
| inside_desired_frequency_block = True |
| elif inside_desired_frequency_block: |
| match = noise_pattern.match(cleaned) |
| if match is not None: |
| return int(match.group(1)) |
| if any_frequency_pattern.match(cleaned): |
| inside_desired_frequency_block = False |
| |
| logging.error('Failed to find noise level for %s at %d MHz.', |
| self._name, frequency_mhz) |
| return None |
| |
| |
| def get_interfaces(): |
| """ |
| Retrieve the list of network interfaces found on the system. |
| |
| @return List of interfaces. |
| |
| """ |
| return [Interface(nic.strip()) for nic in os.listdir(DEVICE_INFO_ROOT)] |
| |
| |
| def get_prioritized_default_route(host=None, interface_name_regex=None): |
| """ |
| Query a local or remote host for its prioritized default interface |
| and route. |
| |
| @param interface_name_regex string regex to filter routes by interface. |
| @return DefaultRoute tuple, or None if no default routes are found. |
| |
| """ |
| # Build a list of default routes, filtered by interface if requested. |
| # Example command output: 'default via 172.23.188.254 dev eth0 metric 2' |
| run = host.run if host is not None else utils.run |
| output = run('ip route show').stdout |
| output_regex_str = 'default\s+via\s+(\S+)\s+dev\s+(\S+)\s+metric\s+(\d+)' |
| output_regex = re.compile(output_regex_str) |
| defaults = [] |
| for item in output.splitlines(): |
| if 'default' not in item: |
| continue |
| match = output_regex.match(item.strip()) |
| if match is None: |
| raise error.TestFail('Unexpected route output: %s' % item) |
| gateway = match.group(1) |
| interface_name = match.group(2) |
| metric = int(match.group(3)) |
| if interface_name_regex is not None: |
| if re.match(interface_name_regex, interface_name) is None: |
| continue |
| defaults.append(DefaultRoute(interface_name=interface_name, |
| gateway=gateway, metric=metric)) |
| if not defaults: |
| return None |
| |
| # Sort and return the route with the lowest metric value. |
| defaults.sort(key=lambda x: x.metric) |
| return defaults[0] |
| |