blob: 291e32929332a99a73a2a96b4a64dc4dcd7b4aec [file] [log] [blame]
Josh Gao191c1542015-12-09 11:26:11 -08001#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2015 The Android Open Source Project
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from __future__ import print_function
19
20import contextlib
21import hashlib
22import os
23import posixpath
24import random
25import re
26import shlex
27import shutil
28import signal
29import socket
30import string
31import subprocess
32import sys
33import tempfile
Josh Gaofe50bb72016-06-22 18:27:22 -070034import time
Josh Gao191c1542015-12-09 11:26:11 -080035import unittest
36
37import mock
38
39import adb
40
41
def requires_root(func):
    """Decorate a device test so that it runs with adbd restarted as root.

    The wrapped test is skipped unless the device reports
    ``ro.debuggable == '1'``. If the device shell is not already running as
    root, ``root()`` is called before the test and ``unroot()`` afterwards,
    so the device is handed back in the state it was found in.

    Args:
        func: Test method; its first argument must expose a ``device``
            attribute.

    Returns:
        A wrapper carrying ``func``'s name and docstring, so test reports
        still show the real test rather than ``wrapper``.

    Raises:
        unittest.SkipTest: Raised by the wrapper when the build is not
            debuggable.
    """
    # Imported locally so the decorator does not rely on the module's
    # top-level import block.
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if not was_root:
            self.device.root()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Only undo what this decorator changed.
            if not was_root:
                self.device.unroot()
                self.device.wait()

    return wrapper
60
61
def requires_non_root(func):
    """Decorate a device test so that it runs with adbd *not* running as root.

    If the device shell is running as root, ``unroot()`` is called before the
    test and ``root()`` afterwards, so the device is handed back in the state
    it was found in.

    Args:
        func: Test method; its first argument must expose a ``device``
            attribute.

    Returns:
        A wrapper carrying ``func``'s name and docstring, so test reports
        still show the real test rather than ``wrapper``.
    """
    # Imported locally so the decorator does not rely on the module's
    # top-level import block.
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if was_root:
            self.device.unroot()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Only undo what this decorator changed.
            if was_root:
                self.device.root()
                self.device.wait()

    return wrapper
77
78
class GetDeviceTest(unittest.TestCase):
    """Checks how adb.get_device() picks a device.

    adb.device.get_devices is patched in every test, so the list of attached
    serials is fully controlled by the test. ANDROID_SERIAL is removed from
    the environment before each test and put back afterwards.
    """

    def setUp(self):
        # pop() both remembers the caller's value (None when unset) and
        # clears it so it cannot influence the test.
        self.android_serial = os.environ.pop('ANDROID_SERIAL', None)

    def tearDown(self):
        if self.android_serial is not None:
            os.environ['ANDROID_SERIAL'] = self.android_serial
        else:
            # A test may have set the variable; make sure it is gone again.
            os.environ.pop('ANDROID_SERIAL', None)

    @mock.patch('adb.device.get_devices')
    def test_explicit(self, mock_get_devices):
        """A serial passed as an argument selects that device."""
        mock_get_devices.return_value = ['foo', 'bar']
        device = adb.get_device('foo')
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_from_env(self, mock_get_devices):
        """Without an argument, ANDROID_SERIAL selects the device."""
        mock_get_devices.return_value = ['foo', 'bar']
        os.environ['ANDROID_SERIAL'] = 'foo'
        device = adb.get_device()
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_arg_beats_env(self, mock_get_devices):
        """An explicit serial wins over ANDROID_SERIAL."""
        mock_get_devices.return_value = ['foo', 'bar']
        os.environ['ANDROID_SERIAL'] = 'bar'
        device = adb.get_device('foo')
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_no_such_device(self, mock_get_devices):
        """An unknown serial raises, whether given as argument or via env."""
        mock_get_devices.return_value = ['foo', 'bar']
        # Pass the serial as a string, like every other caller does; the
        # extra positional arguments of assertRaises are forwarded as-is.
        self.assertRaises(adb.DeviceNotFoundError, adb.get_device, 'baz')

        os.environ['ANDROID_SERIAL'] = 'baz'
        self.assertRaises(adb.DeviceNotFoundError, adb.get_device)

    @mock.patch('adb.device.get_devices')
    def test_unique_device(self, mock_get_devices):
        """With exactly one device attached, no serial is needed."""
        mock_get_devices.return_value = ['foo']
        device = adb.get_device()
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_no_unique_device(self, mock_get_devices):
        """With several devices and no serial, the choice is ambiguous."""
        mock_get_devices.return_value = ['foo', 'bar']
        self.assertRaises(adb.NoUniqueDeviceError, adb.get_device)
130
131
class DeviceTest(unittest.TestCase):
    """Base class for tests that talk to a device through ``self.device``."""

    def setUp(self):
        # No serial is passed; adb.get_device() decides which device to use.
        device = adb.get_device()
        self.device = device
135
136
class ForwardReverseTest(DeviceTest):
    """Tests for `adb forward` and `adb reverse` on the attached device.

    Every test requires the relevant binding list to be empty when it starts
    and removes the bindings it created even when an assertion fails, so one
    failure does not cascade into the following tests.
    """

    def _test_no_rebind(self, description, direction_list, direction,
                        direction_no_rebind, direction_remove_all):
        """Checks --no-rebind semantics for one direction.

        Args:
            description: Name of the direction, used in the precondition
                failure message.
            direction_list: Callable returning the current binding list.
            direction: Callable(local, remote) that (re)binds.
            direction_no_rebind: Callable(local, remote) that binds with
                --no-rebind.
            direction_remove_all: Callable removing every binding.
        """
        msg = direction_list()
        self.assertEqual('', msg.strip(),
                         description + ' list must be empty to run this test.')

        try:
            # Use --no-rebind with no existing binding
            direction_no_rebind('tcp:5566', 'tcp:6655')
            msg = direction_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use --no-rebind with existing binding
            with self.assertRaises(subprocess.CalledProcessError):
                direction_no_rebind('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use the absence of --no-rebind with existing binding
            direction('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))
        finally:
            direction_remove_all()

        msg = direction_list()
        self.assertEqual('', msg.strip())

    def _test_add_remove(self, precondition_msg, direction_list, direction,
                         direction_remove, direction_remove_all):
        """Adds two bindings, removes one, then removes all, checking the list.

        Args:
            precondition_msg: Failure message used when the list is not empty
                at the start.
            direction_list: Callable returning the current binding list.
            direction: Callable(local, remote) that binds.
            direction_remove: Callable(local) removing a single binding.
            direction_remove_all: Callable removing every binding.
        """
        msg = direction_list()
        self.assertEqual('', msg.strip(), precondition_msg)

        try:
            direction('tcp:5566', 'tcp:6655')
            msg = direction_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            direction('tcp:7788', 'tcp:8877')
            msg = direction_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            direction_remove('tcp:5566')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        finally:
            direction_remove_all()

        msg = direction_list()
        self.assertEqual('', msg.strip())

    def test_forward_no_rebind(self):
        self._test_no_rebind('forward', self.device.forward_list,
                             self.device.forward, self.device.forward_no_rebind,
                             self.device.forward_remove_all)

    def test_reverse_no_rebind(self):
        self._test_no_rebind('reverse', self.device.reverse_list,
                             self.device.reverse, self.device.reverse_no_rebind,
                             self.device.reverse_remove_all)

    def test_forward(self):
        self._test_add_remove(
            'Forwarding list must be empty to run this test.',
            self.device.forward_list, self.device.forward,
            self.device.forward_remove, self.device.forward_remove_all)

    def test_forward_tcp_port_0(self):
        self.assertEqual('', self.device.forward_list().strip(),
                         'Forwarding list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb forward` will print
            # the actual port number.
            port = self.device.forward('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Forwarding tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.forward_list()))
        finally:
            self.device.forward_remove_all()

    def test_reverse(self):
        self._test_add_remove(
            'Reverse forwarding list must be empty to run this test.',
            self.device.reverse_list, self.device.reverse,
            self.device.reverse_remove, self.device.reverse_remove_all)

    def test_reverse_tcp_port_0(self):
        self.assertEqual('', self.device.reverse_list().strip(),
                         'Reverse list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb reverse` will print
            # the actual port number.
            port = self.device.reverse('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Reversing tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.reverse_list()))
        finally:
            self.device.reverse_remove_all()

    # Note: If you run this test when adb connect'd to a physical device over
    # TCP, it will fail in adb reverse due to
    # https://code.google.com/p/android/issues/detail?id=189821
    def test_forward_reverse_echo(self):
        """Send data through adb forward and read it back via adb reverse"""
        forward_port = 12345
        reverse_port = forward_port + 1
        forward_spec = 'tcp:' + str(forward_port)
        reverse_spec = 'tcp:' + str(reverse_port)
        forward_setup = False
        reverse_setup = False

        try:
            # listen on localhost:forward_port, connect to remote:forward_port
            self.device.forward(forward_spec, forward_spec)
            forward_setup = True
            # listen on remote:forward_port, connect to localhost:reverse_port
            self.device.reverse(forward_spec, reverse_spec)
            reverse_setup = True

            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            with contextlib.closing(listener):
                # Use SO_REUSEADDR so that subsequent runs of the test can grab
                # the port even if it is in TIME_WAIT.
                listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                # Listen on localhost:reverse_port before connecting to
                # localhost:forward_port because that will cause adb to connect
                # back to localhost:reverse_port.
                listener.bind(('127.0.0.1', reverse_port))
                listener.listen(4)

                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                with contextlib.closing(client):
                    # Connect to the listener.
                    client.connect(('127.0.0.1', forward_port))

                    # Accept the client connection.
                    accepted_connection, addr = listener.accept()
                    with contextlib.closing(accepted_connection) as server:
                        # Sockets carry bytes; a bytes literal and a binary
                        # reader behave the same on Python 2 and 3.
                        data = b'hello'

                        # Send data into the port setup by adb forward.
                        client.sendall(data)
                        # Explicitly close() so that server gets EOF.
                        client.close()

                        # Verify that the data came back via adb reverse.
                        self.assertEqual(data, server.makefile('rb').read())
        finally:
            if reverse_setup:
                self.device.reverse_remove(forward_spec)
            if forward_setup:
                self.device.forward_remove(forward_spec)
299
300
class ShellTest(DeviceTest):
    """Tests for `adb shell`: output, exit codes, PTYs, stdin and signals."""

    def _interactive_shell(self, shell_args, input):
        """Runs an interactive adb shell.

        Args:
          shell_args: List of string arguments to `adb shell`.
          input: String input to send to the interactive shell.

        Returns:
          The remote exit code.

        Raises:
          unittest.SkipTest: The device doesn't support exit codes.
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('exit codes are unavailable on this device')

        proc = subprocess.Popen(
            self.device.adb_cmd + ['shell'] + shell_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
        # to explicitly add an exit command to close the session from the device
        # side, plus the necessary newline to complete the interactive command.
        proc.communicate(input + '; exit\n')
        return proc.returncode

    def test_cat(self):
        """Check that we can at least cat a file."""
        out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
        elements = out.split()
        self.assertEqual(len(elements), 2)

        uptime, idle = elements
        self.assertGreater(float(uptime), 0.0)
        self.assertGreater(float(idle), 0.0)

    def test_throws_on_failure(self):
        self.assertRaises(adb.ShellError, self.device.shell, ['false'])

    def test_output_not_stripped(self):
        out = self.device.shell(['echo', 'foo'])[0]
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_shell_nocheck_failure(self):
        rc, out, _ = self.device.shell_nocheck(['false'])
        self.assertNotEqual(rc, 0)
        self.assertEqual(out, '')

    def test_shell_nocheck_output_not_stripped(self):
        rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_can_distinguish_tricky_results(self):
        # If result checking on ADB shell is naively implemented as
        # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
        # output from the result for a cmd of `echo -n 1`.
        rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, '1')

    def test_line_endings(self):
        """Ensure that line ending translation is not happening in the pty.

        Bug: http://b/19735063
        """
        output = self.device.shell(['uname'])[0]
        self.assertEqual(output, 'Linux' + self.device.linesep)

    def test_pty_logic(self):
        """Tests that a PTY is allocated when it should be.

        PTY allocation behavior should match ssh; some behavior requires
        a terminal stdin to test so this test will be skipped if stdin
        is not a terminal.
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('PTY arguments unsupported on this device')
        if not os.isatty(sys.stdin.fileno()):
            raise unittest.SkipTest('PTY tests require stdin terminal')

        def check_pty(args):
            """Checks adb shell PTY allocation.

            Tests |args| for terminal and non-terminal stdin.

            Args:
                args: -Tt args in a list (e.g. ['-t', '-t']).

            Returns:
                A tuple (<terminal>, <non-terminal>). True indicates
                the corresponding shell allocated a remote PTY.
            """
            test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']

            # stdin=None inherits this process's stdin, which was checked
            # above to be a terminal.
            terminal = subprocess.Popen(
                test_cmd, stdin=None,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            terminal.communicate()

            non_terminal = subprocess.Popen(
                test_cmd, stdin=subprocess.PIPE,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            non_terminal.communicate()

            return (terminal.returncode == 0, non_terminal.returncode == 0)

        # -T: never allocate PTY.
        self.assertEqual((False, False), check_pty(['-T']))

        # No args: PTY only if stdin is a terminal and shell is interactive,
        # which is difficult to reliably test from a script.
        self.assertEqual((False, False), check_pty([]))

        # -t: PTY if stdin is a terminal.
        self.assertEqual((True, False), check_pty(['-t']))

        # -t -t: always allocate PTY.
        self.assertEqual((True, True), check_pty(['-t', '-t']))

    def test_shell_protocol(self):
        """Tests the shell protocol on the device.

        If the device supports shell protocol, this gives us the ability
        to separate stdout/stderr and return the exit code directly.

        Bug: http://b/19734861
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Shell protocol should be used by default.
        result = self.device.shell_nocheck(
            shlex.split('echo foo; echo bar >&2; exit 17'))
        self.assertEqual(17, result[0])
        self.assertEqual('foo' + self.device.linesep, result[1])
        self.assertEqual('bar' + self.device.linesep, result[2])

        self.assertEqual(17, self._interactive_shell([], 'exit 17'))

        # -x flag should disable shell protocol.
        result = self.device.shell_nocheck(
            shlex.split('-x echo foo; echo bar >&2; exit 17'))
        self.assertEqual(0, result[0])
        self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
        self.assertEqual('', result[2])

        self.assertEqual(0, self._interactive_shell(['-x'], 'exit 17'))

    def test_non_interactive_sigint(self):
        """Tests that SIGINT in a non-interactive shell kills the process.

        This requires the shell protocol in order to detect the broken
        pipe; raw data transfer mode will only see the break once the
        subprocess tries to read or write.

        Bug: http://b/23825725
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Start a long-running process.
        sleep_proc = subprocess.Popen(
            self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT)
        try:
            remote_pid = sleep_proc.stdout.readline().strip()
            # poll() refreshes the exit status; the bare returncode attribute
            # stays None until poll()/wait()/communicate() has been called,
            # so checking it directly could never fail.
            self.assertIsNone(sleep_proc.poll(), 'subprocess terminated early')
            proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))

            # Verify that the process is running, send signal, verify it
            # stopped.
            self.device.shell(proc_query)
            os.kill(sleep_proc.pid, signal.SIGINT)
            sleep_proc.communicate()
        finally:
            # If a check above failed, don't leave the local adb client
            # running for the rest of the 60 s sleep.
            if sleep_proc.poll() is None:
                sleep_proc.kill()
                sleep_proc.wait()

        # It can take some time for the process to receive the signal and die.
        end_time = time.time() + 3
        while self.device.shell_nocheck(proc_query)[0] != 1:
            self.assertFalse(time.time() > end_time,
                             'subprocess failed to terminate in time')

    def test_non_interactive_stdin(self):
        """Tests that non-interactive shells send stdin."""
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('non-interactive stdin unsupported '
                                    'on this device')

        # Test both small and large inputs.
        small_input = 'foo'
        large_input = '\n'.join(c * 100 for c in (string.ascii_letters +
                                                  string.digits))

        # Named stdin_data rather than `input` to avoid shadowing the builtin.
        for stdin_data in (small_input, large_input):
            proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            stdout, stderr = proc.communicate(stdin_data)
            self.assertEqual(stdin_data.splitlines(), stdout.splitlines())
            self.assertEqual('', stderr)

    def test_sighup(self):
        """Ensure that SIGHUP gets sent upon non-interactive ctrl-c"""
        log_path = "/data/local/tmp/adb_signal_test.log"

        # Clear the output file.
        self.device.shell_nocheck(["echo", ">", log_path])

        try:
            # Each trap records which signal the remote shell received.
            script = """
                trap "echo SIGINT > {path}; exit 0" SIGINT
                trap "echo SIGHUP > {path}; exit 0" SIGHUP
                echo Waiting
                read
            """.format(path=log_path)

            script = ";".join([x.strip() for x in script.strip().splitlines()])

            process = self.device.shell_popen([script], kill_atexit=False,
                                              stdin=subprocess.PIPE,
                                              stdout=subprocess.PIPE)

            self.assertEqual("Waiting\n", process.stdout.readline())
            process.send_signal(signal.SIGINT)
            process.wait()

            # Waiting for the local adb to finish is insufficient, since it
            # hangs up immediately.
            time.sleep(1)

            stdout, _ = self.device.shell(["cat", log_path])
            self.assertEqual(stdout.strip(), "SIGHUP")
        finally:
            # Best-effort removal of the scratch log from the device.
            self.device.shell_nocheck(["rm", "-f", log_path])
533
Josh Gao191c1542015-12-09 11:26:11 -0800534
class ArgumentEscapingTest(DeviceTest):
    """Checks how adb quotes arguments handed to the device."""

    def test_shell_escaping(self):
        """Make sure that argument escaping is somewhat sane."""

        # http://b/19734868
        # Note that this actually matches ssh(1)'s behavior --- it's
        # converted to `sh -c echo hello; echo world` which sh interprets
        # as `sh -c echo` (with an argument to that shell of "hello"),
        # and then `echo world` back in the first shell.
        result = self.device.shell(
            shlex.split("sh -c 'echo hello; echo world'"))[0]
        result = result.splitlines()
        self.assertEqual(['', 'world'], result)
        # If you really wanted "hello" and "world", here's what you'd do:
        result = self.device.shell(
            shlex.split(r'echo hello\;echo world'))[0].splitlines()
        self.assertEqual(['hello', 'world'], result)

        # http://b/15479704
        result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split("sh -c 'true && echo t'"))[0].strip()
        self.assertEqual('t', result)

        # http://b/20564385
        result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split(r'echo -n 123\;uname'))[0].strip()
        self.assertEqual('123Linux', result)

    def test_install_argument_escaping(self):
        """Make sure that install argument escaping works."""
        # http://b/20323053, http://b/3090932.
        for file_suffix in ('-text;ls;1.apk', "-Live Hold'em.apk"):
            # delete=False: adb needs to open the file by name after we close
            # it, so removal is done by hand below.
            tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
                                             delete=False)
            tf.close()

            try:
                # Installing bogus .apks fails if the device supports exit
                # codes.
                try:
                    output = self.device.install(tf.name)
                except subprocess.CalledProcessError as e:
                    output = e.output

                self.assertIn(file_suffix, output)
            finally:
                # Remove the host file even when install raised something
                # unexpected or the assertion failed.
                os.remove(tf.name)
583
584
class RootUnrootTest(DeviceTest):
    """Exercises `adb root` / `adb unroot` and checks the resulting user."""

    def _test_root(self):
        """Switch to root and check that the shell user is then 'root'."""
        message = self.device.root()
        # Production builds refuse to run adbd as root; nothing to verify.
        if 'adbd cannot run as root in production builds' not in message:
            self.device.wait()
            current_user = self.device.shell(['id', '-un'])[0].strip()
            self.assertEqual('root', current_user)

    def _test_unroot(self):
        """Switch to non-root and check that the shell user is then 'shell'."""
        self.device.unroot()
        self.device.wait()
        current_user = self.device.shell(['id', '-un'])[0].strip()
        self.assertEqual('shell', current_user)

    def test_root_unroot(self):
        """Make sure that adb root and adb unroot work, using id(1)."""
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        original_user = self.device.shell(['id', '-un'])[0].strip()

        # Start with the transition away from the current state, then come
        # back. Any other starting user runs no steps at all.
        steps_by_user = {
            'root': (self._test_unroot, self._test_root),
            'shell': (self._test_root, self._test_unroot),
        }
        try:
            for step in steps_by_user.get(original_user, ()):
                step()
        finally:
            # Put the device back the way it was found.
            if original_user == 'root':
                restore = self.device.root
            else:
                restore = self.device.unroot
            restore()
            self.device.wait()
617
618
class TcpIpTest(DeviceTest):
    """Checks that `adb tcpip` reports bad arguments as failures."""

    def test_tcpip_failure_raises(self):
        """adb tcpip requires a port.

        Bug: http://b/22636927
        """
        # An empty argument and a non-numeric one must both be rejected.
        for bad_port in ('', 'foo'):
            self.assertRaises(
                subprocess.CalledProcessError, self.device.tcpip, bad_port)
629
630
class SystemPropertiesTest(DeviceTest):
    """Reads and writes system properties through the device object."""

    def test_get_prop(self):
        """adbd's init service state is readable and reports 'running'."""
        adbd_state = self.device.get_prop('init.svc.adbd')
        self.assertEqual(adbd_state, 'running')

    @requires_root
    def test_set_prop(self):
        """A property written with set_prop() is visible through getprop."""
        prop_name = 'foo.bar'
        # Reset the property first so a stale value cannot satisfy the check.
        reset_cmd = ['setprop', prop_name, '""']
        self.device.shell(reset_cmd)

        self.device.set_prop(prop_name, 'qux')
        read_back = self.device.shell(['getprop', prop_name])[0].strip()
        self.assertEqual(read_back, 'qux')
643
644
def compute_md5(string):
    """Return the hexadecimal MD5 digest of the byte string ``string``."""
    return hashlib.md5(string).hexdigest()
649
650
def get_md5_prog(device):
    """Return the name of the device's MD5 tool.

    Older platforms (pre-L) had the name md5 rather than md5sum, so md5sum is
    tried on /proc/uptime and 'md5' is returned when that shell command fails.
    """
    try:
        device.shell(['md5sum', '/proc/uptime'])
    except adb.ShellError:
        return 'md5'
    else:
        return 'md5sum'
658
659
class HostFile(object):
    """Describes a file on the host together with its expected checksum."""

    def __init__(self, handle, checksum):
        # The path is taken from the handle's ``name`` attribute.
        path = handle.name
        self.full_path = path
        self.base_name = os.path.basename(path)
        self.checksum = checksum
        self.handle = handle
666
667
class DeviceFile(object):
    """Describes a file on the device together with its expected checksum."""

    def __init__(self, checksum, full_path):
        # Device paths always use '/' separators, hence posixpath.
        self.base_name = posixpath.basename(full_path)
        self.full_path = full_path
        self.checksum = checksum
673
674
def make_random_host_files(in_dir, num_files):
    """Create files filled with random bytes in a host directory.

    Args:
        in_dir: Host directory in which the files are created.
        num_files: Number of files to create.

    Returns:
        A list with one HostFile per created file, carrying the MD5 of the
        bytes that were written. The files are left on disk; removing them is
        up to the caller.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    files = []
    # range() instead of xrange(): identical here and valid on Python 2 and 3.
    for _ in range(num_files):
        # delete=False keeps the file on disk after the handle is closed; the
        # with-block closes the handle even if the write fails.
        with tempfile.NamedTemporaryFile(dir=in_dir,
                                         delete=False) as file_handle:
            # Sizes are multiples of 1 KiB in [min_size, max_size).
            size = random.randrange(min_size, max_size, 1024)
            rand_str = os.urandom(size)
            file_handle.write(rand_str)

        files.append(HostFile(file_handle, compute_md5(rand_str)))
    return files
692
693
def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
    """Create files filled with random bytes in a device directory.

    Each file is written on the device with dd from /dev/urandom and then
    hashed on the device.

    Args:
        device: Device object providing shell().
        in_dir: Device directory in which the files are created.
        num_files: Number of files to create.
        prefix: File name prefix; the file index is appended to it.

    Returns:
        A list with one DeviceFile per created file, carrying the checksum
        reported by the device.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    # Resolved on first use and then reused: probing for the tool costs a
    # device shell command, and the answer cannot change between files.
    md5_prog = None

    files = []
    # range() instead of xrange(): identical here and valid on Python 2 and 3.
    for file_num in range(num_files):
        # Sizes are multiples of 1 KiB in [min_size, max_size).
        size = random.randrange(min_size, max_size, 1024)

        base_name = prefix + str(file_num)
        full_path = posixpath.join(in_dir, base_name)

        device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
                      'bs={}'.format(size), 'count=1'])
        if md5_prog is None:
            md5_prog = get_md5_prog(device)
        # The output is split into two fields; the first is the checksum.
        dev_md5, _ = device.shell([md5_prog, full_path])[0].split()

        files.append(DeviceFile(dev_md5, full_path))
    return files
711
712
713class FileOperationsTest(DeviceTest):
714 SCRATCH_DIR = '/data/local/tmp'
715 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
716 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
717
718 def _verify_remote(self, checksum, remote_path):
719 dev_md5, _ = self.device.shell([get_md5_prog(self.device),
720 remote_path])[0].split()
721 self.assertEqual(checksum, dev_md5)
722
723 def _verify_local(self, checksum, local_path):
724 with open(local_path, 'rb') as host_file:
725 host_md5 = compute_md5(host_file.read())
726 self.assertEqual(host_md5, checksum)
727
728 def test_push(self):
729 """Push a randomly generated file to specified device."""
730 kbytes = 512
731 tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
732 rand_str = os.urandom(1024 * kbytes)
733 tmp.write(rand_str)
734 tmp.close()
735
736 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
737 self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)
738
739 self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
740 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
741
742 os.remove(tmp.name)
743
744 def test_push_dir(self):
745 """Push a randomly generated directory of files to the device."""
746 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
747 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
748
749 try:
750 host_dir = tempfile.mkdtemp()
751
752 # Make sure the temp directory isn't setuid, or else adb will complain.
753 os.chmod(host_dir, 0o700)
754
755 # Create 32 random files.
756 temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
757 self.device.push(host_dir, self.DEVICE_TEMP_DIR)
758
759 for temp_file in temp_files:
760 remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
761 os.path.basename(host_dir),
762 temp_file.base_name)
763 self._verify_remote(temp_file.checksum, remote_path)
764 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
765 finally:
766 if host_dir is not None:
767 shutil.rmtree(host_dir)
768
769 @unittest.expectedFailure # b/25566053
770 def test_push_empty(self):
771 """Push a directory containing an empty directory to the device."""
772 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
773 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
774
775 try:
776 host_dir = tempfile.mkdtemp()
777
778 # Make sure the temp directory isn't setuid, or else adb will complain.
779 os.chmod(host_dir, 0o700)
780
781 # Create an empty directory.
782 os.mkdir(os.path.join(host_dir, 'empty'))
783
784 self.device.push(host_dir, self.DEVICE_TEMP_DIR)
785
786 test_empty_cmd = ['[', '-d',
787 os.path.join(self.DEVICE_TEMP_DIR, 'empty')]
788 rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
789 self.assertEqual(rc, 0)
790 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
791 finally:
792 if host_dir is not None:
793 shutil.rmtree(host_dir)
794
Josh Gao94dc19f2016-09-14 16:13:50 -0700795 @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows")
796 def test_push_symlink(self):
797 """Push a symlink.
798
799 Bug: http://b/31491920
800 """
801 try:
802 host_dir = tempfile.mkdtemp()
803
804 # Make sure the temp directory isn't setuid, or else adb will
805 # complain.
806 os.chmod(host_dir, 0o700)
807
808 with open(os.path.join(host_dir, 'foo'), 'w') as f:
809 f.write('foo')
810
811 symlink_path = os.path.join(host_dir, 'symlink')
812 os.symlink('foo', symlink_path)
813
814 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
815 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
816 self.device.push(symlink_path, self.DEVICE_TEMP_DIR)
817 rc, out, _ = self.device.shell_nocheck(
818 ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')])
819 self.assertEqual(0, rc)
820 self.assertEqual(out.strip(), 'foo')
821 finally:
822 if host_dir is not None:
823 shutil.rmtree(host_dir)
824
Josh Gao191c1542015-12-09 11:26:11 -0800825 def test_multiple_push(self):
826 """Push multiple files to the device in one adb push command.
827
828 Bug: http://b/25324823
829 """
830
831 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
832 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
833
834 try:
835 host_dir = tempfile.mkdtemp()
836
837 # Create some random files and a subdirectory containing more files.
838 temp_files = make_random_host_files(in_dir=host_dir, num_files=4)
839
Josh Gao255c5c82016-03-03 14:49:02 -0800840 subdir = os.path.join(host_dir, 'subdir')
Josh Gao191c1542015-12-09 11:26:11 -0800841 os.mkdir(subdir)
842 subdir_temp_files = make_random_host_files(in_dir=subdir,
843 num_files=4)
844
845 paths = map(lambda temp_file: temp_file.full_path, temp_files)
846 paths.append(subdir)
847 self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])
848
849 for temp_file in temp_files:
850 remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
851 temp_file.base_name)
852 self._verify_remote(temp_file.checksum, remote_path)
853
854 for subdir_temp_file in subdir_temp_files:
855 remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
856 # BROKEN: http://b/25394682
Josh Gao255c5c82016-03-03 14:49:02 -0800857 # 'subdir';
Josh Gao191c1542015-12-09 11:26:11 -0800858 temp_file.base_name)
859 self._verify_remote(temp_file.checksum, remote_path)
860
861
862 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
863 finally:
864 if host_dir is not None:
865 shutil.rmtree(host_dir)
866
Josh Gaoafcdcd72016-02-19 15:55:55 -0800867 @requires_non_root
868 def test_push_error_reporting(self):
869 """Make sure that errors that occur while pushing a file get reported
870
871 Bug: http://b/26816782
872 """
873 with tempfile.NamedTemporaryFile() as tmp_file:
874 tmp_file.write('\0' * 1024 * 1024)
875 tmp_file.flush()
876 try:
877 self.device.push(local=tmp_file.name, remote='/system/')
Josh Gao255c5c82016-03-03 14:49:02 -0800878 self.fail('push should not have succeeded')
Josh Gaoafcdcd72016-02-19 15:55:55 -0800879 except subprocess.CalledProcessError as e:
880 output = e.output
881
Josh Gao255c5c82016-03-03 14:49:02 -0800882 self.assertIn('Permission denied', output)
Josh Gao191c1542015-12-09 11:26:11 -0800883
884 def _test_pull(self, remote_file, checksum):
885 tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
886 tmp_write.close()
887 self.device.pull(remote=remote_file, local=tmp_write.name)
888 with open(tmp_write.name, 'rb') as tmp_read:
889 host_contents = tmp_read.read()
890 host_md5 = compute_md5(host_contents)
891 self.assertEqual(checksum, host_md5)
892 os.remove(tmp_write.name)
893
@requires_non_root
def test_pull_error_reporting(self):
    """Make sure that a failed pull reports the reason.

    Creates DEVICE_TEMP_FILE on the device, strips every permission bit
    from it, and expects 'Permission denied' in the pull output.
    """
    self.device.shell(['touch', self.DEVICE_TEMP_FILE])
    try:
        self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])

        try:
            output = self.device.pull(remote=self.DEVICE_TEMP_FILE,
                                      local='x')
        except subprocess.CalledProcessError as e:
            output = e.output

        self.assertIn('Permission denied', output)
    finally:
        # Remove the device file even when the assertion above fails.
        self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
907
def test_pull(self):
    """Pull a randomly generated file from specified device."""
    remote = self.DEVICE_TEMP_FILE
    size_kb = 512

    # Start from a clean slate, then fill the file from /dev/urandom.
    self.device.shell(['rm', '-rf', remote])
    self.device.shell([
        'dd',
        'if=/dev/urandom',
        'of={}'.format(remote),
        'bs=1024',
        'count={}'.format(size_kb),
    ])

    # The checksum tool's stdout is split into exactly two
    # whitespace-separated fields; the first one is the digest.
    md5_stdout = self.device.shell([get_md5_prog(self.device), remote])[0]
    expected_md5, _ = md5_stdout.split()

    self._test_pull(remote, expected_md5)
    self.device.shell_nocheck(['rm', remote])
920
def test_pull_dir(self):
    """Pull a randomly generated directory of files from the device."""
    # Create the scratch directory before entering the try block so the
    # cleanup below never refers to an unbound name if mkdtemp() fails.
    host_dir = tempfile.mkdtemp()
    try:
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)

        # Files are checked under a host subdirectory named after the
        # remote directory.
        pulled_dir = os.path.join(
            host_dir, posixpath.basename(self.DEVICE_TEMP_DIR))
        for temp_file in temp_files:
            host_path = os.path.join(pulled_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
945
def test_pull_dir_symlink(self):
    """Pull a directory into a symlink to a directory.

    The pull destination is a host symlink; the pulled files are then
    checked underneath the symlink's real target directory.

    Bug: http://b/27362811
    """
    if os.name != 'posix':
        raise unittest.SkipTest('requires POSIX')

    # Create the scratch directory before entering the try block so the
    # cleanup below never refers to an unbound name if mkdtemp() fails.
    host_dir = tempfile.mkdtemp()
    try:
        real_dir = os.path.join(host_dir, 'dir')
        symlink = os.path.join(host_dir, 'symlink')
        os.mkdir(real_dir)
        os.symlink(real_dir, symlink)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)

        # Verify through the real directory, not through the symlink.
        pulled_dir = os.path.join(
            real_dir, posixpath.basename(self.DEVICE_TEMP_DIR))
        for temp_file in temp_files:
            host_path = os.path.join(pulled_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
980
def test_pull_dir_symlink_collision(self):
    """Pull a directory into a colliding symlink to directory.

    host_dir already contains a symlink named like the remote directory
    and pointing at real_dir; after pulling into host_dir the files are
    checked directly inside real_dir.
    """
    if os.name != 'posix':
        raise unittest.SkipTest('requires POSIX')

    # Create the scratch directory before entering the try block so the
    # cleanup below never refers to an unbound name if mkdtemp() fails.
    host_dir = tempfile.mkdtemp()
    try:
        real_dir = os.path.join(host_dir, 'real')
        # DEVICE_TEMP_DIR is a device path, so take its basename with
        # posixpath like the other pull tests do.
        tmp_dirname = posixpath.basename(self.DEVICE_TEMP_DIR)
        symlink = os.path.join(host_dir, tmp_dirname)
        os.mkdir(real_dir)
        os.symlink(real_dir, symlink)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)

        for temp_file in temp_files:
            host_path = os.path.join(real_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
1011
def test_pull_dir_nonexistent(self):
    """Pull a directory of files from the device to a nonexistent path.

    The destination 'dest' is never created by the test itself; the
    pulled files are checked directly inside it.
    """
    # Create the scratch directory before entering the try block so the
    # cleanup below never refers to an unbound name if mkdtemp() fails.
    host_dir = tempfile.mkdtemp()
    try:
        dest_dir = os.path.join(host_dir, 'dest')

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)

        for temp_file in temp_files:
            host_path = os.path.join(dest_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
1035
def test_pull_symlink_dir(self):
    """Pull a symlink to a directory of symlinks to files.

    Device layout under DEVICE_TEMP_DIR:
        contents/  the real random files
        links/     one relative symlink per file in contents/
        symlink    a symlink to links/
    Pulling 'symlink' is expected to produce real file contents on the
    host under host_dir/symlink.
    """
    # Create the scratch directory before entering the try block so the
    # cleanup below never refers to an unbound name if mkdtemp() fails.
    host_dir = tempfile.mkdtemp()
    try:
        remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
        remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
        remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', remote_dir, remote_links])
        self.device.shell(['ln', '-s', remote_links, remote_symlink])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=remote_dir, num_files=32)

        # Mirror every file into links/ as a relative symlink.
        for temp_file in temp_files:
            self.device.shell(
                ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
                 posixpath.join(remote_links, temp_file.base_name)])

        self.device.pull(remote=remote_symlink, local=host_dir)

        for temp_file in temp_files:
            host_path = os.path.join(
                host_dir, 'symlink', temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
1069
def test_pull_empty(self):
    """Pull a directory containing an empty directory from the device."""
    # Create the scratch directory before entering the try block so the
    # cleanup below never refers to an unbound name if mkdtemp() fails.
    host_dir = tempfile.mkdtemp()
    try:
        remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', remote_empty_path])

        self.device.pull(remote=remote_empty_path, local=host_dir)
        self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))

        # Leave the device clean, as the other pull tests do.
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
1084
def test_multiple_pull(self):
    """Pull several files plus a subdirectory in a single pull command.

    The individual files are expected directly in host_dir and the
    subdirectory's files under host_dir/subdir.
    """
    # Create the scratch directory before entering the try block so the
    # cleanup below never refers to an unbound name if mkdtemp() fails.
    host_dir = tempfile.mkdtemp()
    try:
        subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', subdir])

        # Create some random files and a subdirectory containing more files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)

        subdir_temp_files = make_random_device_files(
            self.device, in_dir=subdir, num_files=4, prefix='subdir_')

        # Build a real list: map() returns an iterator on Python 3, which
        # supports neither append() nor list concatenation.
        paths = [temp_file.full_path for temp_file in temp_files]
        paths.append(subdir)
        self.device._simple_call(['pull'] + paths + [host_dir])

        for temp_file in temp_files:
            local_path = os.path.join(host_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, local_path)

        for subdir_temp_file in subdir_temp_files:
            local_path = os.path.join(host_dir,
                                      'subdir',
                                      subdir_temp_file.base_name)
            self._verify_local(subdir_temp_file.checksum, local_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
1120
def test_sync(self):
    """Sync a randomly generated directory of files to specified device."""
    # Create the scratch directory before entering the try block so the
    # cleanup below never refers to an unbound name if mkdtemp() fails.
    base_dir = tempfile.mkdtemp()
    try:
        # Create mirror device directory hierarchy within base_dir.
        # NOTE(review): plain concatenation is presumably deliberate --
        # DEVICE_TEMP_DIR looks like an absolute device path, which
        # os.path.join would treat as replacing base_dir. Confirm.
        full_dir_path = base_dir + self.DEVICE_TEMP_DIR
        os.makedirs(full_dir_path)

        # Create 32 random files within the host mirror.
        temp_files = make_random_host_files(in_dir=full_dir_path,
                                            num_files=32)

        # Clean up any trash on the device.
        # A separate device handle is obtained with product=base_dir and
        # used for the sync and the checksum queries.
        device = adb.get_device(product=base_dir)
        device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])

        device.sync('data')

        # Confirm that every file on the device mirrors that on the host.
        for temp_file in temp_files:
            device_full_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                              temp_file.base_name)
            dev_md5, _ = device.shell(
                [get_md5_prog(self.device), device_full_path])[0].split()
            self.assertEqual(temp_file.checksum, dev_md5)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(base_dir)
1152
def test_unicode_paths(self):
    """Ensure that we can support non-ASCII paths, even on Windows."""
    name = u'로보카 폴리'
    remote_glob = '/data/local/tmp/adb-test-*'
    remote_path = u'/data/local/tmp/adb-test-{}'.format(name)

    # Drop leftovers from earlier runs.
    self.device.shell(['rm', '-f', remote_glob])

    # Push: create an empty host file whose name ends with the non-ASCII
    # suffix, send it to the device, then delete the host copy.
    host_file = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
    host_file.close()
    host_path = host_file.name
    self.device.push(host_path, remote_path)
    os.remove(host_path)
    self.assertFalse(os.path.exists(host_path))

    # Verify that the device ended up with the expected UTF-8 path
    listing = self.device.shell(['ls', remote_glob])[0]
    self.assertEqual(remote_path.encode('utf-8'), listing.strip())

    # Pull: fetch the file back to the same host path and check that it
    # reappears, then clean up both sides.
    self.device.pull(remote_path, host_path)
    self.assertTrue(os.path.exists(host_path))
    os.remove(host_path)
    self.device.shell(['rm', '-f', remote_glob])
1177
1178
def main():
    """Run every test in this module, provided a device is attached.

    The random module is seeded with a fixed value first. When
    adb.get_devices() reports no devices, a hint is printed and nothing
    is run.
    """
    random.seed(0)
    if len(adb.get_devices()) == 0:
        print('Test suite must be run with attached devices')
        return

    loader = unittest.TestLoader()
    suite = loader.loadTestsFromName(__name__)
    runner = unittest.TextTestRunner(verbosity=3)
    runner.run(suite)
1186
1187
# Run the suite only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()