| # Copyright 2015 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| import logging |
| import re |
| import time |
| |
| from autotest_lib.client.bin import utils |
| from autotest_lib.client.common_lib import error |
| |
| # en-US key matrix (from "kb membrane pin matrix.pdf") |
| KEYMATRIX = {'`': (3, 1), '1': (6, 1), '2': (6, 4), '3': (6, 2), '4': (6, 3), |
| '5': (3, 3), '6': (3, 6), '7': (6, 6), '8': (6, 5), '9': (6, 9), |
| '0': (6, 8), '-': (3, 8), '=': (0, 8), 'q': (7, 1), 'w': (7, 4), |
| 'e': (7, 2), 'r': (7, 3), 't': (2, 3), 'y': (2, 6), 'u': (7, 6), |
| 'i': (7, 5), 'o': (7, 9), 'p': (7, 8), '[': (2, 8), ']': (2, 5), |
| '\\': (3, 11), 'a': (4, 1), 's': (4, 4), 'd': (4, 2), 'f': (4, 3), |
| 'g': (1, 3), 'h': (1, 6), 'j': (4, 6), 'k': (4, 5), 'l': (4, 9), |
| ';': (4, 8), '\'': (1, 8), 'z': (5, 1), 'x': (5, 4), 'c': (5, 2), |
| 'v': (5, 3), 'b': (0, 3), 'n': (0, 6), 'm': (5, 6), ',': (5, 5), |
| '.': (5, 9), '/': (5, 8), ' ': (5, 11), '<right>': (6, 12), |
| '<alt_r>': (0, 10), '<down>': (6, 11), '<tab>': (2, 1), |
| '<f10>': (0, 4), '<shift_r>': (7, 7), '<ctrl_r>': (4, 0), |
| '<esc>': (1, 1), '<backspace>': (1, 11), '<f2>': (3, 2), |
| '<alt_l>': (6, 10), '<ctrl_l>': (2, 0), '<f1>': (0, 2), |
| '<search>': (0, 1), '<f3>': (2, 2), '<f4>': (1, 2), '<f5>': (3, 4), |
| '<f6>': (2, 4), '<f7>': (1, 4), '<f8>': (2, 9), '<f9>': (1, 9), |
| '<up>': (7, 11), '<shift_l>': (5, 7), '<enter>': (4, 11), |
| '<left>': (7, 12)} |
| |
| |
| def has_ectool(): |
| """Determine if ectool shell command is present. |
| |
| Returns: |
| boolean true if avail, false otherwise. |
| """ |
| cmd = 'which ectool' |
| return (utils.system(cmd, ignore_status=True) == 0) |
| |
| |
| class ECError(Exception): |
| """Base class for a failure when communicating with EC.""" |
| pass |
| |
| |
| class EC_Common(object): |
| """Class for EC common. |
| |
| This incredibly brief base class is intended to encapsulate common elements |
| across various CrOS MCUs (ec proper, USB-PD, Sensor Hub). At the moment |
| that includes only the use of ectool. |
| """ |
| |
| def __init__(self, target='cros_ec'): |
| """Constructor. |
| |
| @param target: target name of ec to communicate with. |
| """ |
| if not has_ectool(): |
| ec_info = utils.system_output("mosys ec info", |
| ignore_status=True) |
| logging.warning("Ectool absent on this platform ( %s )", |
| ec_info) |
| raise error.TestNAError("Platform doesn't support ectool") |
| self._target = target |
| |
| def ec_command(self, cmd, **kwargs): |
| """Executes ec command and returns results. |
| |
| @param cmd: string of command to execute. |
| @param kwargs: optional params passed to utils.system_output |
| |
| @returns: string of results from ec command. |
| """ |
| full_cmd = 'ectool --name=%s %s' % (self._target, cmd) |
| logging.debug('Command: %s', full_cmd) |
| result = utils.system_output(full_cmd, **kwargs) |
| logging.debug('Result: %s', result) |
| return result |
| |
| |
| class EC(EC_Common): |
| """Class for CrOS embedded controller (EC).""" |
| HELLO_RE = "EC says hello" |
| GET_FANSPEED_RE = "Current fan RPM: ([0-9]*)" |
| SET_FANSPEED_RE = "Fan target RPM set." |
| TEMP_SENSOR_TEMP_RE = "Reading temperature...([0-9]*)" |
| # <sensor idx>: <sensor type> <sensor name> |
| TEMP_SENSOR_INFO_RE = "(\d+):\s+(\d+)\s+([a-zA-Z_0-9]+)" |
| TOGGLE_AUTO_FAN_RE = "Automatic fan control is now on" |
| # For battery, check we can see a non-zero capacity value. |
| BATTERY_RE = "Design capacity:\s+[1-9]\d*\s+mAh" |
| LIGHTBAR_RE = "^ 05\s+3f\s+3f$" |
| |
| def __init__(self): |
| """Constructor.""" |
| super(EC, self).__init__() |
| self._temperature_dict = None |
| |
| def hello(self, **kwargs): |
| """Test EC hello command. |
| |
| @param kwargs: optional params passed to utils.system_output |
| |
| @returns True if success False otherwise. |
| """ |
| response = self.ec_command('hello', **kwargs) |
| return (re.search(self.HELLO_RE, response) is not None) |
| |
| def auto_fan_ctrl(self): |
| """Turns auto fan ctrl on. |
| |
| @returns True if success False otherwise. |
| """ |
| response = self.ec_command('autofanctrl') |
| logging.info('Turned on auto fan control.') |
| return (re.search(self.TOGGLE_AUTO_FAN_RE, response) is not None) |
| |
| def get_fanspeed(self): |
| """Gets fanspeed. |
| |
| @raises error.TestError if regexp fails to match. |
| |
| @returns integer of fan speed RPM. |
| """ |
| response = self.ec_command('pwmgetfanrpm') |
| match = re.search(self.GET_FANSPEED_RE, response) |
| if not match: |
| raise error.TestError('Unable to read fan speed') |
| |
| rpm = int(match.group(1)) |
| logging.info('Fan speed: %d', rpm) |
| return rpm |
| |
| def set_fanspeed(self, rpm): |
| """Sets fan speed. |
| |
| @param rpm: integer of fan speed RPM to set |
| |
| @returns True if success False otherwise. |
| """ |
| response = self.ec_command('pwmsetfanrpm %d' % rpm) |
| logging.info('Set fan speed: %d', rpm) |
| return (re.search(self.SET_FANSPEED_RE, response) is not None) |
| |
| def _get_temperature_dict(self): |
| """Read EC temperature name and idx into a dict. |
| |
| @returns dict where key=<sensor name>, value =<sensor idx> |
| """ |
| # The sensor (name, idx) mapping does not change. |
| if self._temperature_dict: |
| return self._temperature_dict |
| |
| temperature_dict = {} |
| response = self.ec_command('tempsinfo all') |
| for rline in response.split('\n'): |
| match = re.search(self.TEMP_SENSOR_INFO_RE, rline) |
| if match: |
| temperature_dict[match.group(3)] = int(match.group(1)) |
| |
| self._temperature_dict = temperature_dict |
| return temperature_dict |
| |
| def get_temperature(self, idx=None, name=None): |
| """Gets temperature from idx sensor. |
| |
| Reads temperature either directly if idx is provided or by discovering |
| idx using name. |
| |
| @param idx: integer of temp sensor to read. Default=None |
| @param name: string of temp sensor to read. Default=None. |
| For example: Battery, Ambient, Charger, DRAM, eMMC, Gyro |
| |
| @raises ECError if fails to find idx of name. |
| @raises error.TestError if fails to read sensor or fails to identify |
| sensor to read from idx & name param. |
| |
| @returns integer of temperature reading in degrees Kelvin. |
| """ |
| if idx is None: |
| temperature_dict = self._get_temperature_dict() |
| if name in temperature_dict: |
| idx = temperature_dict[name] |
| else: |
| raise ECError('Finding temp idx for name %s' % name) |
| |
| response = self.ec_command('temps %d' % idx) |
| match = re.search(self.TEMP_SENSOR_TEMP_RE, response) |
| if not match: |
| raise error.TestError('Reading temperature idx %d' % idx) |
| |
| return int(match.group(1)) |
| |
| def get_battery(self): |
| """Get battery presence (design capacity found). |
| |
| @returns True if success False otherwise. |
| """ |
| try: |
| response = self.ec_command('battery') |
| except error.CmdError: |
| raise ECError('calling EC battery command') |
| |
| return (re.search(self.BATTERY_RE, response) is not None) |
| |
| def get_lightbar(self): |
| """Test lightbar. |
| |
| @returns True if success False otherwise. |
| """ |
| self.ec_command('lightbar on') |
| self.ec_command('lightbar init') |
| self.ec_command('lightbar 4 255 255 255') |
| response = self.ec_command('lightbar') |
| self.ec_command('lightbar off') |
| return (re.search(self.LIGHTBAR_RE, response, re.MULTILINE) is not None) |
| |
| def key_press(self, key): |
| """Emit key down and up signal of the keyboard. |
| |
| @param key: name of a key defined in KEYMATRIX. |
| """ |
| self.key_down(key) |
| self.key_up(key) |
| |
| def _key_action(self, key, action_type): |
| if not key in KEYMATRIX: |
| raise error.TestError('Unknown key: ' + key) |
| row, col = KEYMATRIX[key] |
| self.ec_command('kbpress %d %d %d' % (row, col, action_type)) |
| |
| def key_down(self, key): |
| """Emit key down signal of the keyboard. |
| |
| @param key: name of a key defined in KEYMATRIX. |
| """ |
| self._key_action(key, 1) |
| |
| def key_up(self, key): |
| """Emit key up signal of the keyboard. |
| |
| @param key: name of a key defined in KEYMATRIX. |
| """ |
| self._key_action(key, 0) |
| |
| |
| class EC_USBPD_Port(EC_Common): |
| """Class for CrOS embedded controller for USB-PD Port. |
| |
| Public attributes: |
| index: integer of USB type-C port index. |
| |
| Public Methods: |
| is_dfp: Determine if data role is Downstream Facing Port (DFP). |
| is_amode_supported: Check if alternate mode is supported by port. |
| is_amode_entered: Check if alternate mode is entered. |
| set_amode: Set an alternate mode. |
| |
| Private attributes: |
| _port: integer of USB type-C port id. |
| _port_info: holds usbpd protocol info. |
| _amodes: holds alternate mode info. |
| |
| Private methods: |
| _invalidate_port_data: Remove port data to force re-eval. |
| _get_port_info: Get USB-PD port info. |
| _get_amodes: parse and return port's svid info. |
| """ |
| def __init__(self, index): |
| """Constructor. |
| |
| @param index: integer of USB type-C port index. |
| """ |
| self.index = index |
| # TODO(crosbug.com/p/38133) target= only works for samus |
| super(EC_USBPD_Port, self).__init__(target='cros_pd') |
| |
| # Interrogate port at instantiation. Use invalidate to force re-eval. |
| self._port_info = self._get_port_info() |
| self._amodes = self._get_amodes() |
| |
| def _invalidate_port_data(self): |
| """Remove port data to force re-eval.""" |
| self._port_info = None |
| self._amodes = None |
| |
| def _get_port_info(self): |
| """Get USB-PD port info. |
| |
| ectool command usbpd provides the following information about the port: |
| - Enabled/Disabled |
| - Power & Data Role |
| - Polarity |
| - Protocol State |
| |
| At time of authoring it looks like: |
| Port C0 is enabled, Role:SNK UFP Polarity:CC2 State:SNK_READY |
| |
| @raises error.TestError if ... |
| port info not parseable. |
| |
| @returns dictionary for <port> with keyval pairs: |
| enabled: True | False | None |
| power_role: sink | source | None |
| data_role: UFP | DFP | None |
| is_reversed: True | False | None |
| state: various strings | None |
| """ |
| PORT_INFO_RE = 'Port\s+C(\d+)\s+is\s+(\w+),\s+Role:(\w+)\s+(\w+)\s+' + \ |
| 'Polarity:CC(\d+)\s+State:(\w+)' |
| |
| match = re.search(PORT_INFO_RE, |
| self.ec_command("usbpd %s" % (self.index))) |
| if not match or int(match.group(1)) != self.index: |
| error.TestError('Unable to determine port %d info' % self.index) |
| |
| pinfo = dict(enabled=None, power_role=None, data_role=None, |
| is_reversed=None, state=None) |
| pinfo['enabled'] = match.group(2) == 'enabled' |
| pinfo['power_role'] = 'sink' if match.group(3) == 'SNK' else 'source' |
| pinfo['data_role'] = match.group(4) |
| pinfo['is_reversed'] = True if match.group(5) == '2' else False |
| pinfo['state'] = match.group(6) |
| logging.debug('port_info = %s', pinfo) |
| return pinfo |
| |
| def _get_amodes(self): |
| """Parse alternate modes from pdgetmode. |
| |
| Looks like ... |
| *SVID:0xff01 *0x00000485 0x00000000 ... |
| SVID:0x18d1 0x00000001 0x00000000 ... |
| |
| @returns dictionary of format: |
| <svid>: {active: True|False, configs: <config_list>, opos:<opos>} |
| where: |
| <svid> : USB-IF Standard or vendor id as |
| hex string (i.e. 0xff01) |
| <config_list> : list of uint32_t configs |
| <opos> : integer of active object position. |
| Note, this is the config list index + 1 |
| """ |
| SVID_RE = r'(\*?)SVID:(\S+)\s+(.*)' |
| svids = dict() |
| cmd = 'pdgetmode %d' % self.index |
| for line in self.ec_command(cmd, ignore_status=True).split('\n'): |
| if line.strip() == '': |
| continue |
| logging.debug('pdgetmode line: %s', line) |
| match = re.search(SVID_RE, line) |
| if not match: |
| logging.warning("Unable to parse SVID line %s", line) |
| continue |
| active = match.group(1) == '*' |
| svid = match.group(2) |
| configs_str = match.group(3) |
| configs = list() |
| opos = None |
| for i,config in enumerate(configs_str.split(), 1): |
| if config.startswith('*'): |
| opos = i |
| config = config[1:] |
| config = int(config, 16) |
| # ignore unpopulated configs |
| if config == 0: |
| continue |
| configs.append(config) |
| svids[svid] = dict(active=active, configs=configs, opos=opos) |
| |
| logging.debug("Port %d svids = %s", self.index, svids) |
| return svids |
| |
| def is_dfp(self): |
| """Determine if data role is Downstream Facing Port (DFP). |
| |
| @returns True if DFP False otherwise. |
| """ |
| if self._port_info is None: |
| self._port_info = self._get_port_info() |
| |
| return self._port_info['data_role'] == 'DFP' |
| |
| def is_amode_supported(self, svid): |
| """Check if alternate mode is supported by port partner. |
| |
| @param svid: alternate mode SVID hexstring (i.e. 0xff01) |
| """ |
| if self._amodes is None: |
| self._amodes = self._get_amodes() |
| |
| if svid in self._amodes.keys(): |
| return True |
| return False |
| |
| def is_amode_entered(self, svid, opos): |
| """Check if alternate mode is entered. |
| |
| @param svid: alternate mode SVID hexstring (i.e. 0xff01). |
| @param opos: object position of config to act on. |
| |
| @returns True if entered False otherwise |
| """ |
| if self._amodes is None: |
| self._amodes = self._get_amodes() |
| |
| if not self.is_amode_supported(svid): |
| return False |
| |
| if self._amodes[svid]['active'] and self._amodes[svid]['opos'] == opos: |
| return True |
| |
| return False |
| |
| def set_amode(self, svid, opos, enter, delay_secs=2): |
| """Set alternate mode. |
| |
| @param svid: alternate mode SVID hexstring (i.e. 0xff01). |
| @param opos: object position of config to act on. |
| @param enter: Boolean of whether to enter mode. |
| |
| @raises error.TestError if ... |
| mode not supported. |
| opos is > number of configs. |
| |
| @returns True if successful False otherwise |
| """ |
| if self._amodes is None: |
| self._amodes = self._get_amodes() |
| |
| if svid not in self._amodes.keys(): |
| raise error.TestError("SVID %s not supported", svid) |
| |
| if opos > len(self._amodes[svid]['configs']): |
| raise error.TestError("opos > available configs") |
| |
| cmd = "pdsetmode %d %s %d %d" % (self.index, svid, opos, |
| 1 if enter else 0) |
| self.ec_command(cmd, ignore_status=True) |
| self._invalidate_port_data() |
| |
| # allow some time for mode entry/exit |
| time.sleep(delay_secs) |
| return self.is_amode_entered(svid, opos) == enter |
| |
| def get_flash_info(self): |
| mat1_re = r'.*ptype:(\d+)\s+vid:(\w+)\s+pid:(\w+).*' |
| mat2_re = r'.*DevId:(\d+)\.(\d+)\s+Hash:\s*(\w+.*)\s*CurImg:(\w+).*' |
| flash_dict = dict.fromkeys(['ptype', 'vid', 'pid', 'dev_major', |
| 'dev_minor', 'rw_hash', 'image_status']) |
| |
| cmd = 'infopddev %d' % self.index |
| |
| tries = 3 |
| while (tries): |
| res = self.ec_command(cmd, ignore_status=True) |
| if not 'has no discovered device' in res: |
| break |
| |
| tries -= 1 |
| time.sleep(1) |
| |
| for ln in res.split('\n'): |
| mat1 = re.match(mat1_re, ln) |
| if mat1: |
| flash_dict['ptype'] = int(mat1.group(1)) |
| flash_dict['vid'] = mat1.group(2) |
| flash_dict['pid'] = mat1.group(3) |
| continue |
| |
| mat2 = re.match(mat2_re, ln) |
| if mat2: |
| flash_dict['dev_major'] = int(mat2.group(1)) |
| flash_dict['dev_minor'] = int(mat2.group(2)) |
| flash_dict['rw_hash'] = mat2.group(3) |
| flash_dict['image_status'] = mat2.group(4) |
| break |
| |
| return flash_dict |
| |
| |
| class EC_USBPD(EC_Common): |
| """Class for CrOS embedded controller for USB-PD. |
| |
| Public attributes: |
| ports: list EC_USBPD_Port instances |
| |
| Public Methods: |
| get_num_ports: get number of USB-PD ports device has. |
| |
| Private attributes: |
| _num_ports: integer number of USB-PD ports device has. |
| """ |
| def __init__(self, num_ports=None): |
| """Constructor. |
| |
| @param num_ports: total number of USB-PD ports on device. This is an |
| override. If left 'None' will try to determine. |
| """ |
| self._num_ports = num_ports |
| self.ports = list() |
| |
| # TODO(crosbug.com/p/38133) target= only works for samus |
| super(EC_USBPD, self).__init__(target='cros_pd') |
| |
| if (self.get_num_ports() == 0): |
| raise error.TestNAError("Device has no USB-PD ports") |
| |
| for i in xrange(self._num_ports): |
| self.ports.append(EC_USBPD_Port(i)) |
| |
| def get_num_ports(self): |
| """Determine the number of ports for device. |
| |
| Uses ectool's usbpdpower command which in turn makes host command call |
| to EC_CMD_USB_PD_PORTS to determine the number of ports. |
| |
| TODO(tbroch) May want to consider adding separate ectool command to |
| surface the number of ports directly instead of via usbpdpower |
| |
| @returns number of ports. |
| """ |
| if (self._num_ports is not None): |
| return self._num_ports |
| |
| self._num_ports = len(self.ec_command("usbpdpower").split(b'\n')) |
| return self._num_ports |