#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16import os
17import re
18import subprocess
19
20
class FindDeviceError(RuntimeError):
    """Common base class for errors raised while locating a device."""
23
24
class DeviceNotFoundError(FindDeviceError):
    """Error raised when no device with the requested serial is found.

    Attributes:
        serial: The serial number that was looked up.
    """

    def __init__(self, serial):
        message = 'No device with serial {}'.format(serial)
        super(DeviceNotFoundError, self).__init__(message)
        self.serial = serial
30
31
class NoUniqueDeviceError(FindDeviceError):
    """Error raised when a single device cannot be chosen unambiguously."""

    def __init__(self):
        message = 'No unique device'
        super(NoUniqueDeviceError, self).__init__(message)
35
36
def get_devices():
    """Return the serials of all attached devices that are not offline.

    Starts the adb server first (discarding its output) and then parses the
    output of `adb devices`.

    Returns:
        A list of serial number strings, in the order adb reports them.

    Raises:
        subprocess.CalledProcessError: An adb invocation exited non-zero.
    """
    with open(os.devnull, 'wb') as devnull:
        subprocess.check_call(['adb', 'start-server'], stdout=devnull,
                              stderr=devnull)
    # Text mode keeps every line a `str` on both Python 2 and Python 3, so
    # the string comparisons below never mix bytes and text.
    out = subprocess.check_output(
        ['adb', 'devices'], universal_newlines=True).splitlines()

    # The first line of `adb devices` just says "List of attached devices", so
    # skip that.
    devices = []
    for line in out[1:]:
        fields = line.split()
        # Every device row is "<serial><whitespace><state>"; blank or
        # malformed rows carry no usable device.
        if len(fields) < 2:
            continue
        serial, state = fields[0], fields[1]
        # Compare the state column exactly: a substring test on the whole
        # line would also drop a device whose serial contains "offline".
        if state == 'offline':
            continue
        devices.append(serial)
    return devices
55
56
def _get_unique_device(product=None):
    """Return an AndroidDevice for the only device reported by get_devices().

    Raises:
        NoUniqueDeviceError: get_devices() returned zero or several devices.
    """
    candidates = get_devices()
    if len(candidates) == 1:
        return AndroidDevice(candidates[0], product)
    raise NoUniqueDeviceError()
62
def _get_device_by_serial(serial, product=None):
    """Return an AndroidDevice for `serial` if get_devices() lists it.

    Raises:
        DeviceNotFoundError: `serial` is not among the listed devices.
    """
    if serial not in get_devices():
        raise DeviceNotFoundError(serial)
    return AndroidDevice(serial, product)
68
69
def get_device(serial=None, product=None):
    """Get a uniquely identified AndroidDevice if one is available.

    The device is chosen by the first identifier that is not None, in this
    order of preference:

        1) The `serial` argument.
        2) The environment variable $ANDROID_SERIAL.
        3) The single device connected to the system.

    Raises:
        DeviceNotFoundError:
            The serial specified by `serial` or $ANDROID_SERIAL is not
            connected.

        NoUniqueDeviceError:
            Neither `serial` nor $ANDROID_SERIAL was set, and the number of
            devices connected to the system is not 1. Having 0 connected
            devices will also result in this error.

    Returns:
        An AndroidDevice for the selected device.
    """
    chosen = serial
    if chosen is None:
        # Fall back to the environment when no explicit serial was given.
        chosen = os.getenv('ANDROID_SERIAL')
    if chosen is None:
        return _get_unique_device(product)
    return _get_device_by_serial(chosen, product)
99
100
class AndroidDevice(object):
    """Wrapper that runs `adb` commands against one selected device.

    Attributes:
        serial: Value passed to `adb -s`, or None to omit the flag.
        product: Value passed to `adb -p`, or None to omit the flag.
        adb_cmd: Argument list prefix used for every adb invocation made
            through this object.
    """

    def __init__(self, serial, product=None):
        self.serial = serial
        self.product = product
        self.adb_cmd = ['adb']
        if self.serial is not None:
            self.adb_cmd.extend(['-s', serial])
        if self.product is not None:
            self.adb_cmd.extend(['-p', product])
        # Both values are computed lazily on first use and then cached.
        self._linesep = None
        self._shell_result_pattern = None

    @property
    def linesep(self):
        """Output of `adb shell echo` on this device, fetched once."""
        if self._linesep is None:
            # Use self.adb_cmd so the probe honours -s/-p and targets this
            # device rather than whichever device bare `adb` would pick.
            self._linesep = subprocess.check_output(
                self.adb_cmd + ['shell', 'echo'])
        return self._linesep

    def _make_shell_cmd(self, user_cmd):
        """Return the adb argv that runs `user_cmd` and reports its status.

        Args:
            user_cmd: List of argument strings forming the shell command.
        """
        # Follow any shell command with `; echo "\n$?"` to get the exit
        # status of a program since this isn't propagated by adb.
        #
        # The leading newline is needed because `printf 1; echo $?` would print
        # "10", and we wouldn't be able to distinguish the exit code.
        rc_probe = '; echo "\n$?"'
        return self.adb_cmd + ['shell'] + user_cmd + [rc_probe]

    def _parse_shell_output(self, out):
        """Split raw shell output into (exit status, command output).

        Args:
            out: Output of a command built by _make_shell_cmd().

        Returns:
            A tuple `(rc, output)` where `rc` is the integer exit status and
            `output` is `out` with the trailing status text removed.

        Raises:
            RuntimeError: No exit status was found at the end of `out`.
        """
        search_text = out
        max_result_len = len('{0}255{0}'.format(self.linesep))
        if len(search_text) > max_result_len:
            # We don't want to regex match over massive amounts of data when we
            # know the part we want is right at the end.
            search_text = search_text[-max_result_len:]
        if self._shell_result_pattern is None:
            # Escape the separator so it is always matched literally.
            self._shell_result_pattern = re.compile(
                r'({0}\d+{0})$'.format(re.escape(self.linesep)),
                re.MULTILINE)
        m = self._shell_result_pattern.search(search_text)
        if m is None:
            raise RuntimeError('Could not find exit status in shell output.')

        result_text = m.group(1)
        result = int(result_text.strip())
        out = out[:-len(result_text)]  # Trim the result text from the output.
        return result, out

    def _simple_call(self, cmd):
        """Run `adb <cmd...>` and return its stdout and stderr combined.

        Raises:
            subprocess.CalledProcessError: adb exited with a non-zero status.
        """
        return subprocess.check_output(
            self.adb_cmd + cmd, stderr=subprocess.STDOUT)

    def shell(self, cmd):
        """Run a shell command on the device and return its output.

        Args:
            cmd: List of argument strings forming the shell command.

        Raises:
            subprocess.CalledProcessError: The command's exit status was
                non-zero. The command output is available as both `output`
                and `out` on the exception.
        """
        cmd = self._make_shell_cmd(cmd)
        out = subprocess.check_output(cmd)
        rc, out = self._parse_shell_output(out)
        if rc != 0:
            error = subprocess.CalledProcessError(rc, cmd, out)
            # Kept for callers that read the historical `out` attribute.
            error.out = out
            raise error
        return out

    def shell_nocheck(self, cmd):
        """Run a shell command and return `(exit status, output)`.

        Unlike shell(), a non-zero status is returned rather than raised, and
        stderr is merged into the returned output.
        """
        cmd = self._make_shell_cmd(cmd)
        p = subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        out, _ = p.communicate()
        return self._parse_shell_output(out)

    def install(self, filename):
        """Run `adb install <filename>`."""
        return self._simple_call(['install', filename])

    def push(self, local, remote):
        """Run `adb push <local> <remote>`."""
        return self._simple_call(['push', local, remote])

    def pull(self, remote, local):
        """Run `adb pull <remote> <local>`."""
        return self._simple_call(['pull', remote, local])

    def sync(self, directory=None):
        """Run `adb sync`, optionally restricted to `directory`."""
        cmd = ['sync']
        if directory is not None:
            cmd.append(directory)
        return self._simple_call(cmd)

    def forward(self, local, remote):
        """Run `adb forward <local> <remote>`."""
        return self._simple_call(['forward', local, remote])

    def tcpip(self, port):
        """Run `adb tcpip <port>`; `port` may be an int or a string."""
        # subprocess requires string arguments, so accept integers too.
        return self._simple_call(['tcpip', str(port)])

    def usb(self):
        """Run `adb usb`."""
        return self._simple_call(['usb'])

    def root(self):
        """Run `adb root`."""
        return self._simple_call(['root'])

    def unroot(self):
        """Run `adb unroot`."""
        return self._simple_call(['unroot'])

    def forward_remove(self, local):
        """Run `adb forward --remove <local>`."""
        return self._simple_call(['forward', '--remove', local])

    def forward_remove_all(self):
        """Run `adb forward --remove-all`."""
        return self._simple_call(['forward', '--remove-all'])

    def connect(self, host):
        """Run `adb connect <host>`."""
        return self._simple_call(['connect', host])

    def disconnect(self, host):
        """Run `adb disconnect <host>`."""
        return self._simple_call(['disconnect', host])

    def reverse(self, remote, local):
        """Run `adb reverse <remote> <local>`."""
        return self._simple_call(['reverse', remote, local])

    def reverse_remove_all(self):
        """Run `adb reverse --remove-all`."""
        return self._simple_call(['reverse', '--remove-all'])

    def reverse_remove(self, remote):
        """Run `adb reverse --remove <remote>`."""
        return self._simple_call(['reverse', '--remove', remote])

    def wait(self):
        """Run `adb wait-for-device`."""
        return self._simple_call(['wait-for-device'])

    def get_prop(self, prop_name):
        """Return the value of a device property via `getprop`.

        Returns:
            The property value as a single line, or None when the value is
            empty or only whitespace.

        Raises:
            RuntimeError: The getprop output is not exactly one line.
        """
        # shell() returns one string; split it so the checks below operate
        # on lines rather than on individual characters.
        output = self.shell(['getprop', prop_name]).splitlines()
        if len(output) != 1:
            raise RuntimeError('Too many lines in getprop output:\n' +
                               '\n'.join(output))
        value = output[0]
        if not value.strip():
            return None
        return value

    def set_prop(self, prop_name, value):
        """Set a device property via `setprop`."""
        self.shell(['setprop', prop_name, value])