| # |
| # Copyright (C) 2015 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| import logging |
| import os |
| import re |
| import subprocess |
| |
| |
| class FindDeviceError(RuntimeError): |
| pass |
| |
| |
| class DeviceNotFoundError(FindDeviceError): |
| def __init__(self, serial): |
| self.serial = serial |
| super(DeviceNotFoundError, self).__init__( |
| 'No device with serial {}'.format(serial)) |
| |
| |
| class NoUniqueDeviceError(FindDeviceError): |
| def __init__(self): |
| super(NoUniqueDeviceError, self).__init__('No unique device') |
| |
| |
| def get_devices(): |
| with open(os.devnull, 'wb') as devnull: |
| subprocess.check_call(['adb', 'start-server'], stdout=devnull, |
| stderr=devnull) |
| out = subprocess.check_output(['adb', 'devices']).splitlines() |
| |
| # The first line of `adb devices` just says "List of attached devices", so |
| # skip that. |
| devices = [] |
| for line in out[1:]: |
| if not line.strip(): |
| continue |
| if 'offline' in line: |
| continue |
| |
| serial, _ = re.split(r'\s+', line, maxsplit=1) |
| devices.append(serial) |
| return devices |
| |
| |
| def _get_unique_device(product=None): |
| devices = get_devices() |
| if len(devices) != 1: |
| raise NoUniqueDeviceError() |
| return AndroidDevice(devices[0], product) |
| |
| |
| def _get_device_by_serial(serial, product=None): |
| for device in get_devices(): |
| if device == serial: |
| return AndroidDevice(serial, product) |
| raise DeviceNotFoundError(serial) |
| |
| |
| def get_device(serial=None, product=None): |
| """Get a uniquely identified AndroidDevice if one is available. |
| |
| Raises: |
| DeviceNotFoundError: |
| The serial specified by `serial` or $ANDROID_SERIAL is not |
| connected. |
| |
| NoUniqueDeviceError: |
| Neither `serial` nor $ANDROID_SERIAL was set, and the number of |
| devices connected to the system is not 1. Having 0 connected |
| devices will also result in this error. |
| |
| Returns: |
| An AndroidDevice associated with the first non-None identifier in the |
| following order of preference: |
| |
| 1) The `serial` argument. |
| 2) The environment variable $ANDROID_SERIAL. |
| 3) The single device connnected to the system. |
| """ |
| if serial is not None: |
| return _get_device_by_serial(serial, product) |
| |
| android_serial = os.getenv('ANDROID_SERIAL') |
| if android_serial is not None: |
| return _get_device_by_serial(android_serial, product) |
| |
| return _get_unique_device(product) |
| |
| |
| class AndroidDevice(object): |
| def __init__(self, serial, product=None): |
| self.serial = serial |
| self.product = product |
| self.adb_cmd = ['adb'] |
| if self.serial is not None: |
| self.adb_cmd.extend(['-s', serial]) |
| if self.product is not None: |
| self.adb_cmd.extend(['-p', product]) |
| self._linesep = None |
| self._shell_result_pattern = None |
| |
| @property |
| def linesep(self): |
| if self._linesep is None: |
| self._linesep = subprocess.check_output(['adb', 'shell', 'echo']) |
| return self._linesep |
| |
| def _make_shell_cmd(self, user_cmd): |
| # Follow any shell command with `; echo; echo $?` to get the exit |
| # status of a program since this isn't propagated by adb. |
| # |
| # The leading newline is needed because `printf 1; echo $?` would print |
| # "10", and we wouldn't be able to distinguish the exit code. |
| rc_probe = '; echo "\n$?"' |
| return self.adb_cmd + ['shell'] + user_cmd + [rc_probe] |
| |
| def _parse_shell_output(self, out): # pylint: disable=no-self-use |
| search_text = out |
| max_result_len = len('{0}255{0}'.format(self.linesep)) |
| if len(search_text) > max_result_len: |
| # We don't want to regex match over massive amounts of data when we |
| # know the part we want is right at the end. |
| search_text = search_text[-max_result_len:] |
| if self._shell_result_pattern is None: |
| self._shell_result_pattern = re.compile( |
| r'({0}\d+{0})$'.format(self.linesep), re.MULTILINE) |
| m = self._shell_result_pattern.search(search_text) |
| if m is None: |
| raise RuntimeError('Could not find exit status in shell output.') |
| |
| result_text = m.group(1) |
| result = int(result_text.strip()) |
| out = out[:-len(result_text)] # Trim the result text from the output. |
| return result, out |
| |
| def _simple_call(self, cmd): |
| logging.info(' '.join(self.adb_cmd + cmd)) |
| return subprocess.check_output( |
| self.adb_cmd + cmd, stderr=subprocess.STDOUT) |
| |
| def shell(self, cmd): |
| logging.info(' '.join(self.adb_cmd + ['shell'] + cmd)) |
| cmd = self._make_shell_cmd(cmd) |
| out = subprocess.check_output(cmd) |
| rc, out = self._parse_shell_output(out) |
| if rc != 0: |
| error = subprocess.CalledProcessError(rc, cmd) |
| error.out = out |
| raise error |
| return out |
| |
| def shell_nocheck(self, cmd): |
| cmd = self._make_shell_cmd(cmd) |
| logging.info(' '.join(cmd)) |
| p = subprocess.Popen( |
| cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) |
| out, _ = p.communicate() |
| return self._parse_shell_output(out) |
| |
| def install(self, filename): |
| return self._simple_call(['install', filename]) |
| |
| def push(self, local, remote): |
| return self._simple_call(['push', local, remote]) |
| |
| def pull(self, remote, local): |
| return self._simple_call(['pull', remote, local]) |
| |
| def sync(self, directory=None): |
| cmd = ['sync'] |
| if directory is not None: |
| cmd.append(directory) |
| return self._simple_call(cmd) |
| |
| def forward(self, local, remote): |
| return self._simple_call(['forward', local, remote]) |
| |
| def tcpip(self, port): |
| return self._simple_call(['tcpip', port]) |
| |
| def usb(self): |
| return self._simple_call(['usb']) |
| |
| def reboot(self): |
| return self._simple_call(['reboot']) |
| |
| def root(self): |
| return self._simple_call(['root']) |
| |
| def unroot(self): |
| return self._simple_call(['unroot']) |
| |
| def forward_remove(self, local): |
| return self._simple_call(['forward', '--remove', local]) |
| |
| def forward_remove_all(self): |
| return self._simple_call(['forward', '--remove-all']) |
| |
| def connect(self, host): |
| return self._simple_call(['connect', host]) |
| |
| def disconnect(self, host): |
| return self._simple_call(['disconnect', host]) |
| |
| def reverse(self, remote, local): |
| return self._simple_call(['reverse', remote, local]) |
| |
| def reverse_remove_all(self): |
| return self._simple_call(['reverse', '--remove-all']) |
| |
| def reverse_remove(self, remote): |
| return self._simple_call(['reverse', '--remove', remote]) |
| |
| def wait(self): |
| return self._simple_call(['wait-for-device']) |
| |
| def get_prop(self, prop_name): |
| output = self.shell(['getprop', prop_name]).splitlines() |
| if len(output) != 1: |
| raise RuntimeError('Too many lines in getprop output:\n' + |
| '\n'.join(output)) |
| value = output[0] |
| if not value.strip(): |
| return None |
| return value |
| |
| def set_prop(self, prop_name, value): |
| self.shell(['setprop', prop_name, value]) |