blob: 4c5563f42cb18eebb93f36eea7fa7714dd391640 [file] [log] [blame]
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
18from __future__ import print_function
19
20import contextlib
21import hashlib
22import os
23import posixpath
24import random
25import re
26import shlex
27import shutil
28import signal
29import socket
30import string
31import subprocess
32import sys
33import tempfile
Josh Gaofe50bb72016-06-22 18:27:22 -070034import time
Josh Gao191c1542015-12-09 11:26:11 -080035import unittest
36
37import mock
38
39import adb
40
41
def requires_root(func):
    """Decorate a device test so that it runs with the device shell as root.

    The wrapped test is skipped when the `ro.debuggable` property is not '1'.
    If `id -un` on the device does not report 'root', the device is switched
    to root before the test and switched back afterwards, even when the test
    raises.

    Args:
        func: Test method; its first argument must expose a `device`
            attribute.

    Returns:
        A wrapper that keeps the name and docstring of `func` and returns
        whatever `func` returns.
    """
    # Imported locally so that this decorator does not depend on the
    # file-level import block.
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if not was_root:
            self.device.root()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Only undo the switch this wrapper performed itself.
            if not was_root:
                self.device.unroot()
                self.device.wait()

    return wrapper
60
61
def requires_non_root(func):
    """Decorate a device test so that it runs with a non-root device shell.

    If `id -un` on the device reports 'root', the device is unrooted before
    the test and switched back to root afterwards, even when the test raises.

    Args:
        func: Test method; its first argument must expose a `device`
            attribute.

    Returns:
        A wrapper that keeps the name and docstring of `func` and returns
        whatever `func` returns.
    """
    # Imported locally so that this decorator does not depend on the
    # file-level import block.
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if was_root:
            self.device.unroot()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Only undo the switch this wrapper performed itself.
            if was_root:
                self.device.root()
                self.device.wait()

    return wrapper
77
78
class GetDeviceTest(unittest.TestCase):
    """Tests for how adb.get_device() picks a device serial.

    `adb.device.get_devices` is patched in every test, so the list of
    available serials is whatever the test sets as the mock's return value.
    """

    def setUp(self):
        # Remember and clear ANDROID_SERIAL so each test starts without it.
        self.android_serial = os.environ.pop('ANDROID_SERIAL', None)

    def tearDown(self):
        # Restore the value seen at setUp, or make sure the variable a test
        # may have set is gone again.
        if self.android_serial is not None:
            os.environ['ANDROID_SERIAL'] = self.android_serial
        else:
            os.environ.pop('ANDROID_SERIAL', None)

    @mock.patch('adb.device.get_devices')
    def test_explicit(self, mock_get_devices):
        """A serial passed as an argument is used."""
        mock_get_devices.return_value = ['foo', 'bar']
        device = adb.get_device('foo')
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_from_env(self, mock_get_devices):
        """Without an argument, ANDROID_SERIAL selects the device."""
        mock_get_devices.return_value = ['foo', 'bar']
        os.environ['ANDROID_SERIAL'] = 'foo'
        device = adb.get_device()
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_arg_beats_env(self, mock_get_devices):
        """An explicit serial wins over ANDROID_SERIAL."""
        mock_get_devices.return_value = ['foo', 'bar']
        os.environ['ANDROID_SERIAL'] = 'bar'
        device = adb.get_device('foo')
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_no_such_device(self, mock_get_devices):
        """An unknown serial raises DeviceNotFoundError."""
        mock_get_devices.return_value = ['foo', 'bar']
        # assertRaises forwards extra positional arguments to the callable
        # as-is, so the serial is passed as a plain string, not a list.
        self.assertRaises(adb.DeviceNotFoundError, adb.get_device, 'baz')

        os.environ['ANDROID_SERIAL'] = 'baz'
        self.assertRaises(adb.DeviceNotFoundError, adb.get_device)

    @mock.patch('adb.device.get_devices')
    def test_unique_device(self, mock_get_devices):
        """With a single device and no serial, that device is returned."""
        mock_get_devices.return_value = ['foo']
        device = adb.get_device()
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_no_unique_device(self, mock_get_devices):
        """With several devices and no serial, NoUniqueDeviceError is raised."""
        mock_get_devices.return_value = ['foo', 'bar']
        self.assertRaises(adb.NoUniqueDeviceError, adb.get_device)
130
131
class DeviceTest(unittest.TestCase):
    """Base class for tests that need a device.

    Before each test, the result of adb.get_device() is stored in
    `self.device`.
    """

    def setUp(self):
        device = adb.get_device()
        self.device = device
135
136
class ForwardReverseTest(DeviceTest):
    """Tests for the forward/reverse port-mapping methods of the device.

    Tests that create mappings remove them again in a `finally` clause so
    that a failed assertion does not leave mappings behind for the tests
    that require an empty list.
    """

    def _test_no_rebind(self, description, direction_list, direction,
                        direction_no_rebind, direction_remove_all):
        """Exercise the no-rebind variant of one mapping direction.

        Args:
            description: Direction name used in the precondition message.
            direction_list: Callable returning the current mapping list text.
            direction: Callable (local, remote) creating a mapping.
            direction_no_rebind: Callable (local, remote) creating a mapping
                with --no-rebind.
            direction_remove_all: Callable removing all mappings.
        """
        msg = direction_list()
        self.assertEqual('', msg.strip(),
                         description + ' list must be empty to run this test.')

        try:
            # Use --no-rebind with no existing binding
            direction_no_rebind('tcp:5566', 'tcp:6655')
            msg = direction_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use --no-rebind with existing binding
            with self.assertRaises(subprocess.CalledProcessError):
                direction_no_rebind('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use the absence of --no-rebind with existing binding
            direction('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))
        finally:
            # Runs on success and on failure, so mappings never leak.
            direction_remove_all()

        msg = direction_list()
        self.assertEqual('', msg.strip())

    def test_forward_no_rebind(self):
        """Run the no-rebind checks for forward mappings."""
        self._test_no_rebind('forward', self.device.forward_list,
                             self.device.forward, self.device.forward_no_rebind,
                             self.device.forward_remove_all)

    def test_reverse_no_rebind(self):
        """Run the no-rebind checks for reverse mappings."""
        self._test_no_rebind('reverse', self.device.reverse_list,
                             self.device.reverse, self.device.reverse_no_rebind,
                             self.device.reverse_remove_all)

    def test_forward(self):
        """Add two forward mappings, remove one, then remove all."""
        msg = self.device.forward_list()
        self.assertEqual('', msg.strip(),
                         'Forwarding list must be empty to run this test.')
        try:
            self.device.forward('tcp:5566', 'tcp:6655')
            msg = self.device.forward_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.device.forward('tcp:7788', 'tcp:8877')
            msg = self.device.forward_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.forward_remove('tcp:5566')
            msg = self.device.forward_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        finally:
            # Runs on success and on failure, so mappings never leak.
            self.device.forward_remove_all()
        msg = self.device.forward_list()
        self.assertEqual('', msg.strip())

    def test_forward_tcp_port_0(self):
        """Forward tcp:0 and check that the reported port is listed."""
        self.assertEqual('', self.device.forward_list().strip(),
                         'Forwarding list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb forward` will print
            # the actual port number.
            port = self.device.forward('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Forwarding tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.forward_list()))
        finally:
            self.device.forward_remove_all()

    def test_reverse(self):
        """Add two reverse mappings, remove one, then remove all."""
        msg = self.device.reverse_list()
        self.assertEqual('', msg.strip(),
                         'Reverse forwarding list must be empty to run this test.')
        try:
            self.device.reverse('tcp:5566', 'tcp:6655')
            msg = self.device.reverse_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.device.reverse('tcp:7788', 'tcp:8877')
            msg = self.device.reverse_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.reverse_remove('tcp:5566')
            msg = self.device.reverse_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        finally:
            # Runs on success and on failure, so mappings never leak.
            self.device.reverse_remove_all()
        msg = self.device.reverse_list()
        self.assertEqual('', msg.strip())

    def test_reverse_tcp_port_0(self):
        """Reverse tcp:0 and check that the reported port is listed."""
        self.assertEqual('', self.device.reverse_list().strip(),
                         'Reverse list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb reverse` will print
            # the actual port number.
            port = self.device.reverse('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Reversing tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.reverse_list()))
        finally:
            self.device.reverse_remove_all()

    # Note: If you run this test when adb connect'd to a physical device over
    # TCP, it will fail in adb reverse due to https://code.google.com/p/android/issues/detail?id=189821
    def test_forward_reverse_echo(self):
        """Send data through adb forward and read it back via adb reverse"""
        forward_port = 12345
        reverse_port = forward_port + 1
        forward_spec = 'tcp:' + str(forward_port)
        reverse_spec = 'tcp:' + str(reverse_port)
        # Track which mappings were created so that only those are removed.
        forward_setup = False
        reverse_setup = False

        try:
            # listen on localhost:forward_port, connect to remote:forward_port
            self.device.forward(forward_spec, forward_spec)
            forward_setup = True
            # listen on remote:forward_port, connect to localhost:reverse_port
            self.device.reverse(forward_spec, reverse_spec)
            reverse_setup = True

            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            with contextlib.closing(listener):
                # Use SO_REUSEADDR so that subsequent runs of the test can grab
                # the port even if it is in TIME_WAIT.
                listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                # Listen on localhost:reverse_port before connecting to
                # localhost:forward_port because that will cause adb to connect
                # back to localhost:reverse_port.
                listener.bind(('127.0.0.1', reverse_port))
                listener.listen(4)

                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                with contextlib.closing(client):
                    # Connect to the listener.
                    client.connect(('127.0.0.1', forward_port))

                    # Accept the client connection.
                    accepted_connection, addr = listener.accept()
                    with contextlib.closing(accepted_connection) as server:
                        data = 'hello'

                        # Send data into the port setup by adb forward.
                        client.sendall(data)
                        # Explicitly close() so that server gets EOF.
                        client.close()

                        # Verify that the data came back via adb reverse.
                        self.assertEqual(data, server.makefile().read())
        finally:
            if reverse_setup:
                self.device.reverse_remove(forward_spec)
            if forward_setup:
                self.device.forward_remove(forward_spec)
299
300
class ShellTest(DeviceTest):
    """Tests for running shell commands on the device."""

    def _interactive_shell(self, shell_args, input):
        """Runs an interactive adb shell.

        Args:
          shell_args: List of string arguments to `adb shell`.
          input: String input to send to the interactive shell.

        Returns:
          The remote exit code.

        Raises:
          unittest.SkipTest: The device doesn't support exit codes.
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('exit codes are unavailable on this device')

        proc = subprocess.Popen(
            self.device.adb_cmd + ['shell'] + shell_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
        # to explicitly add an exit command to close the session from the device
        # side, plus the necessary newline to complete the interactive command.
        proc.communicate(input + '; exit\n')
        return proc.returncode

    def test_cat(self):
        """Check that we can at least cat a file."""
        out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
        elements = out.split()
        self.assertEqual(len(elements), 2)

        uptime, idle = elements
        self.assertGreater(float(uptime), 0.0)
        self.assertGreater(float(idle), 0.0)

    def test_throws_on_failure(self):
        """A failing command makes shell() raise adb.ShellError."""
        self.assertRaises(adb.ShellError, self.device.shell, ['false'])

    def test_output_not_stripped(self):
        """shell() returns stdout including the trailing line separator."""
        out = self.device.shell(['echo', 'foo'])[0]
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_shell_nocheck_failure(self):
        """shell_nocheck() reports a non-zero exit code instead of raising."""
        rc, out, _ = self.device.shell_nocheck(['false'])
        self.assertNotEqual(rc, 0)
        self.assertEqual(out, '')

    def test_shell_nocheck_output_not_stripped(self):
        """shell_nocheck() returns stdout including the line separator."""
        rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_can_distinguish_tricky_results(self):
        """Output that looks like an exit code is not mistaken for one."""
        # If result checking on ADB shell is naively implemented as
        # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
        # output from the result for a cmd of `echo -n 1`.
        rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, '1')

    def test_line_endings(self):
        """Ensure that line ending translation is not happening in the pty.

        Bug: http://b/19735063
        """
        output = self.device.shell(['uname'])[0]
        self.assertEqual(output, 'Linux' + self.device.linesep)

    def test_pty_logic(self):
        """Tests that a PTY is allocated when it should be.

        PTY allocation behavior should match ssh.
        """
        def check_pty(args):
            """Checks adb shell PTY allocation.

            Tests |args| for terminal and non-terminal stdin.

            Args:
                args: -Tt args in a list (e.g. ['-t', '-t']).

            Returns:
                A tuple (<terminal>, <non-terminal>). True indicates
                the corresponding shell allocated a remote PTY.
            """
            test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']

            # stdin=None: the child inherits this process's stdin.
            terminal = subprocess.Popen(
                test_cmd, stdin=None,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            terminal.communicate()

            non_terminal = subprocess.Popen(
                test_cmd, stdin=subprocess.PIPE,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            non_terminal.communicate()

            return (terminal.returncode == 0, non_terminal.returncode == 0)

        # -T: never allocate PTY.
        self.assertEqual((False, False), check_pty(['-T']))

        # These tests require a new device.
        if self.device.has_shell_protocol() and os.isatty(sys.stdin.fileno()):
            # No args: PTY only if stdin is a terminal and shell is interactive,
            # which is difficult to reliably test from a script.
            self.assertEqual((False, False), check_pty([]))

            # -t: PTY if stdin is a terminal.
            self.assertEqual((True, False), check_pty(['-t']))

        # -t -t: always allocate PTY.
        self.assertEqual((True, True), check_pty(['-t', '-t']))

        # -tt: always allocate PTY, POSIX style (http://b/32216152).
        self.assertEqual((True, True), check_pty(['-tt']))

        # -ttt: ssh has weird even/odd behavior with multiple -t flags, but
        # we follow the man page instead.
        self.assertEqual((True, True), check_pty(['-ttt']))

        # -ttx: -x and -tt aren't incompatible (though -Tx would be an error).
        self.assertEqual((True, True), check_pty(['-ttx']))

        # -Ttt: -tt cancels out -T.
        self.assertEqual((True, True), check_pty(['-Ttt']))

        # -ttT: -T cancels out -tt.
        self.assertEqual((False, False), check_pty(['-ttT']))

    def test_shell_protocol(self):
        """Tests the shell protocol on the device.

        If the device supports shell protocol, this gives us the ability
        to separate stdout/stderr and return the exit code directly.

        Bug: http://b/19734861
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Shell protocol should be used by default.
        result = self.device.shell_nocheck(
            shlex.split('echo foo; echo bar >&2; exit 17'))
        self.assertEqual(17, result[0])
        self.assertEqual('foo' + self.device.linesep, result[1])
        self.assertEqual('bar' + self.device.linesep, result[2])

        self.assertEqual(17, self._interactive_shell([], 'exit 17'))

        # -x flag should disable shell protocol.
        result = self.device.shell_nocheck(
            shlex.split('-x echo foo; echo bar >&2; exit 17'))
        self.assertEqual(0, result[0])
        self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
        self.assertEqual('', result[2])

        self.assertEqual(0, self._interactive_shell(['-x'], 'exit 17'))

    def test_non_interactive_sigint(self):
        """Tests that SIGINT in a non-interactive shell kills the process.

        This requires the shell protocol in order to detect the broken
        pipe; raw data transfer mode will only see the break once the
        subprocess tries to read or write.

        Bug: http://b/23825725
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Start a long-running process.
        sleep_proc = subprocess.Popen(
            self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT)
        remote_pid = sleep_proc.stdout.readline().strip()
        # Popen.returncode is only updated by poll()/wait()/communicate(), so
        # poll() is required for this check to be able to notice an early exit.
        self.assertIsNone(sleep_proc.poll(), 'subprocess terminated early')
        proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))

        # Verify that the process is running, send signal, verify it stopped.
        self.device.shell(proc_query)
        os.kill(sleep_proc.pid, signal.SIGINT)
        sleep_proc.communicate()
        self.assertEqual(1, self.device.shell_nocheck(proc_query)[0],
                         'subprocess failed to terminate')

    def test_non_interactive_stdin(self):
        """Tests that non-interactive shells send stdin."""
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('non-interactive stdin unsupported '
                                    'on this device')

        # Test both small and large inputs.
        small_input = 'foo'
        large_input = '\n'.join(c * 100 for c in (string.ascii_letters +
                                                  string.digits))

        for stdin_data in (small_input, large_input):
            proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            stdout, stderr = proc.communicate(stdin_data)
            self.assertEqual(stdin_data.splitlines(), stdout.splitlines())
            self.assertEqual('', stderr)

    def test_sighup(self):
        """Ensure that SIGHUP gets sent upon non-interactive ctrl-c"""
        log_path = "/data/local/tmp/adb_signal_test.log"

        # Clear the output file.
        self.device.shell_nocheck(["echo", ">", log_path])

        # Each trap records the signal it received in the log file.
        script = """
            trap "echo SIGINT > {path}; exit 0" SIGINT
            trap "echo SIGHUP > {path}; exit 0" SIGHUP
            echo Waiting
            while true; do sleep 100; done
        """.format(path=log_path)

        # Collapse the script into a single ';'-separated line.
        script = ";".join([x.strip() for x in script.strip().splitlines()])

        process = self.device.shell_popen(
            ["sh", "-c", "'{}'".format(script)], kill_atexit=False, stdout=subprocess.PIPE)

        self.assertEqual("Waiting\n", process.stdout.readline())
        process.send_signal(signal.SIGINT)
        process.wait()

        # Waiting for the local adb to finish is insufficient, since it hangs
        # up immediately.
        time.sleep(0.25)

        stdout, _ = self.device.shell(["cat", log_path])
        self.assertEqual(stdout.strip(), "SIGHUP")
539
Josh Gao191c1542015-12-09 11:26:11 -0800540
class ArgumentEscapingTest(DeviceTest):
    """Tests for how command-line arguments are escaped on their way to the device."""

    def test_shell_escaping(self):
        """Make sure that argument escaping is somewhat sane."""

        # http://b/19734868
        # Note that this actually matches ssh(1)'s behavior --- it's
        # converted to `sh -c echo hello; echo world` which sh interprets
        # as `sh -c echo` (with an argument to that shell of "hello"),
        # and then `echo world` back in the first shell.
        result = self.device.shell(
            shlex.split("sh -c 'echo hello; echo world'"))[0]
        result = result.splitlines()
        self.assertEqual(['', 'world'], result)
        # If you really wanted "hello" and "world", here's what you'd do:
        result = self.device.shell(
            shlex.split(r'echo hello\;echo world'))[0].splitlines()
        self.assertEqual(['hello', 'world'], result)

        # http://b/15479704
        result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split("sh -c 'true && echo t'"))[0].strip()
        self.assertEqual('t', result)

        # http://b/20564385
        result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split(r'echo -n 123\;uname'))[0].strip()
        self.assertEqual('123Linux', result)

    def test_install_argument_escaping(self):
        """Make sure that install argument escaping works."""
        # http://b/20323053, http://b/3090932.
        for file_suffix in ('-text;ls;1.apk', "-Live Hold'em.apk"):
            tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
                                             delete=False)
            tf.close()

            try:
                # Installing bogus .apks fails if the device supports exit codes.
                try:
                    output = self.device.install(tf.name)
                except subprocess.CalledProcessError as e:
                    output = e.output

                self.assertIn(file_suffix, output)
            finally:
                # Remove the host file even when the assertion fails.
                os.remove(tf.name)
589
590
class RootUnrootTest(DeviceTest):
    """Tests for switching the device between root and non-root."""

    def _test_root(self):
        """Switch to root and check that `id -un` reports 'root'.

        Returns without checking when the root() message says that adbd
        cannot run as root in production builds.
        """
        message = self.device.root()
        if 'adbd cannot run as root in production builds' not in message:
            self.device.wait()
            current_user = self.device.shell(['id', '-un'])[0].strip()
            self.assertEqual('root', current_user)

    def _test_unroot(self):
        """Switch to non-root and check that `id -un` reports 'shell'."""
        self.device.unroot()
        self.device.wait()
        current_user = self.device.shell(['id', '-un'])[0].strip()
        self.assertEqual('shell', current_user)

    def test_root_unroot(self):
        """Make sure that adb root and adb unroot work, using id(1)."""
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        original_user = self.device.shell(['id', '-un'])[0].strip()
        try:
            # Start with the switch away from the current user, then switch
            # back; any other user runs no steps at all.
            if original_user == 'root':
                steps = (self._test_unroot, self._test_root)
            elif original_user == 'shell':
                steps = (self._test_root, self._test_unroot)
            else:
                steps = ()
            for step in steps:
                step()
        finally:
            # Put the device back into the state it started in.
            if original_user == 'root':
                restore = self.device.root
            else:
                restore = self.device.unroot
            restore()
            self.device.wait()
623
624
class TcpIpTest(DeviceTest):
    """Tests for the device's tcpip() method."""

    def test_tcpip_failure_raises(self):
        """adb tcpip requires a port.

        Bug: http://b/22636927
        """
        # Both an empty and a non-numeric port must be rejected.
        for bad_port in ('', 'foo'):
            with self.assertRaises(subprocess.CalledProcessError):
                self.device.tcpip(bad_port)
635
636
class SystemPropertiesTest(DeviceTest):
    """Tests for reading and writing system properties on the device."""

    def test_get_prop(self):
        """get_prop() reports 'running' for init.svc.adbd."""
        adbd_state = self.device.get_prop('init.svc.adbd')
        self.assertEqual(adbd_state, 'running')

    @requires_root
    def test_set_prop(self):
        """A value written with set_prop() is returned by `getprop`."""
        prop_name = 'foo.bar'
        # Reset the property through the shell before setting it.
        self.device.shell(['setprop', prop_name, '""'])

        self.device.set_prop(prop_name, 'qux')
        getprop_output = self.device.shell(['getprop', prop_name])[0]
        self.assertEqual(getprop_output.strip(), 'qux')
649
650
def compute_md5(string):
    """Return the hexadecimal MD5 digest of `string`."""
    return hashlib.md5(string).hexdigest()
655
656
def get_md5_prog(device):
    """Return the name of the MD5 program to use on `device`.

    Older platforms (pre-L) had the name md5 rather than md5sum. The choice
    is made by trying to run `md5sum /proc/uptime` on the device.
    """
    probe_cmd = ['md5sum', '/proc/uptime']
    try:
        device.shell(probe_cmd)
    except adb.ShellError:
        return 'md5'
    return 'md5sum'
664
665
class HostFile(object):
    """A file on the host, described by its handle and a checksum."""

    def __init__(self, handle, checksum):
        # The path comes from the handle; the base name is its last component.
        path = handle.name
        self.handle = handle
        self.checksum = checksum
        self.full_path = path
        self.base_name = os.path.split(path)[1]
672
673
class DeviceFile(object):
    """A file on the device, described by its POSIX path and a checksum."""

    def __init__(self, checksum, full_path):
        self.full_path = full_path
        self.checksum = checksum
        # Device paths are always POSIX-style, whatever the host OS is.
        self.base_name = posixpath.split(full_path)[1]
679
680
def make_random_host_files(in_dir, num_files):
    """Create `num_files` files with random contents in `in_dir`.

    Each file holds between 1 KiB and 15 KiB of os.urandom() data, in 1 KiB
    steps. The files are not deleted when closed; the caller removes them.

    Args:
        in_dir: Host directory in which to create the files.
        num_files: Number of files to create.

    Returns:
        A list of HostFile objects carrying each file's (closed) handle and
        the MD5 of the data written to it.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    files = []
    # range() instead of xrange() so this runs on Python 2 and Python 3.
    for _ in range(num_files):
        size = random.randrange(min_size, max_size, 1024)
        rand_str = os.urandom(size)

        # The with-block flushes and closes the handle even if write() fails.
        with tempfile.NamedTemporaryFile(dir=in_dir,
                                         delete=False) as file_handle:
            file_handle.write(rand_str)

        files.append(HostFile(file_handle, compute_md5(rand_str)))
    return files
698
699
def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
    """Create `num_files` files with random contents in `in_dir` on the device.

    Each file is written by a single `dd` block read from /dev/urandom, with
    a block size between 1 KiB and 15 KiB in 1 KiB steps.

    Args:
        device: Device object providing shell().
        in_dir: Device directory in which to create the files.
        num_files: Number of files to create.
        prefix: File name prefix; the file index is appended to it.

    Returns:
        A list of DeviceFile objects carrying each file's device path and
        the MD5 reported by the device.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    # The MD5 program does not change between files, so probe for it once
    # instead of paying a shell round trip per file.
    md5_prog = get_md5_prog(device)

    files = []
    # range() instead of xrange() so this runs on Python 2 and Python 3.
    for file_num in range(num_files):
        size = random.randrange(min_size, max_size, 1024)

        base_name = prefix + str(file_num)
        full_path = posixpath.join(in_dir, base_name)

        device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
                      'bs={}'.format(size), 'count=1'])
        dev_md5, _ = device.shell([md5_prog, full_path])[0].split()

        files.append(DeviceFile(dev_md5, full_path))
    return files
717
718
719class FileOperationsTest(DeviceTest):
720 SCRATCH_DIR = '/data/local/tmp'
721 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
722 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
723
724 def _verify_remote(self, checksum, remote_path):
725 dev_md5, _ = self.device.shell([get_md5_prog(self.device),
726 remote_path])[0].split()
727 self.assertEqual(checksum, dev_md5)
728
729 def _verify_local(self, checksum, local_path):
730 with open(local_path, 'rb') as host_file:
731 host_md5 = compute_md5(host_file.read())
732 self.assertEqual(host_md5, checksum)
733
734 def test_push(self):
735 """Push a randomly generated file to specified device."""
736 kbytes = 512
737 tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
738 rand_str = os.urandom(1024 * kbytes)
739 tmp.write(rand_str)
740 tmp.close()
741
742 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
743 self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)
744
745 self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
746 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
747
748 os.remove(tmp.name)
749
750 def test_push_dir(self):
751 """Push a randomly generated directory of files to the device."""
752 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
753 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
754
755 try:
756 host_dir = tempfile.mkdtemp()
757
758 # Make sure the temp directory isn't setuid, or else adb will complain.
759 os.chmod(host_dir, 0o700)
760
761 # Create 32 random files.
762 temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
763 self.device.push(host_dir, self.DEVICE_TEMP_DIR)
764
765 for temp_file in temp_files:
766 remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
767 os.path.basename(host_dir),
768 temp_file.base_name)
769 self._verify_remote(temp_file.checksum, remote_path)
770 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
771 finally:
772 if host_dir is not None:
773 shutil.rmtree(host_dir)
774
775 @unittest.expectedFailure # b/25566053
776 def test_push_empty(self):
777 """Push a directory containing an empty directory to the device."""
778 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
779 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
780
781 try:
782 host_dir = tempfile.mkdtemp()
783
784 # Make sure the temp directory isn't setuid, or else adb will complain.
785 os.chmod(host_dir, 0o700)
786
787 # Create an empty directory.
788 os.mkdir(os.path.join(host_dir, 'empty'))
789
790 self.device.push(host_dir, self.DEVICE_TEMP_DIR)
791
792 test_empty_cmd = ['[', '-d',
793 os.path.join(self.DEVICE_TEMP_DIR, 'empty')]
794 rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
795 self.assertEqual(rc, 0)
796 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
797 finally:
798 if host_dir is not None:
799 shutil.rmtree(host_dir)
800
Josh Gao94dc19f2016-09-14 16:13:50 -0700801 @unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows")
802 def test_push_symlink(self):
803 """Push a symlink.
804
805 Bug: http://b/31491920
806 """
807 try:
808 host_dir = tempfile.mkdtemp()
809
810 # Make sure the temp directory isn't setuid, or else adb will
811 # complain.
812 os.chmod(host_dir, 0o700)
813
814 with open(os.path.join(host_dir, 'foo'), 'w') as f:
815 f.write('foo')
816
817 symlink_path = os.path.join(host_dir, 'symlink')
818 os.symlink('foo', symlink_path)
819
820 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
821 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
822 self.device.push(symlink_path, self.DEVICE_TEMP_DIR)
823 rc, out, _ = self.device.shell_nocheck(
824 ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')])
825 self.assertEqual(0, rc)
826 self.assertEqual(out.strip(), 'foo')
827 finally:
828 if host_dir is not None:
829 shutil.rmtree(host_dir)
830
Josh Gao191c1542015-12-09 11:26:11 -0800831 def test_multiple_push(self):
832 """Push multiple files to the device in one adb push command.
833
834 Bug: http://b/25324823
835 """
836
837 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
838 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
839
840 try:
841 host_dir = tempfile.mkdtemp()
842
843 # Create some random files and a subdirectory containing more files.
844 temp_files = make_random_host_files(in_dir=host_dir, num_files=4)
845
Josh Gao255c5c82016-03-03 14:49:02 -0800846 subdir = os.path.join(host_dir, 'subdir')
Josh Gao191c1542015-12-09 11:26:11 -0800847 os.mkdir(subdir)
848 subdir_temp_files = make_random_host_files(in_dir=subdir,
849 num_files=4)
850
851 paths = map(lambda temp_file: temp_file.full_path, temp_files)
852 paths.append(subdir)
853 self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])
854
855 for temp_file in temp_files:
856 remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
857 temp_file.base_name)
858 self._verify_remote(temp_file.checksum, remote_path)
859
860 for subdir_temp_file in subdir_temp_files:
861 remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
862 # BROKEN: http://b/25394682
Josh Gao255c5c82016-03-03 14:49:02 -0800863 # 'subdir';
Josh Gao191c1542015-12-09 11:26:11 -0800864 temp_file.base_name)
865 self._verify_remote(temp_file.checksum, remote_path)
866
867
868 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
869 finally:
870 if host_dir is not None:
871 shutil.rmtree(host_dir)
872
Josh Gaoafcdcd72016-02-19 15:55:55 -0800873 @requires_non_root
874 def test_push_error_reporting(self):
875 """Make sure that errors that occur while pushing a file get reported
876
877 Bug: http://b/26816782
878 """
879 with tempfile.NamedTemporaryFile() as tmp_file:
880 tmp_file.write('\0' * 1024 * 1024)
881 tmp_file.flush()
882 try:
883 self.device.push(local=tmp_file.name, remote='/system/')
Josh Gao255c5c82016-03-03 14:49:02 -0800884 self.fail('push should not have succeeded')
Josh Gaoafcdcd72016-02-19 15:55:55 -0800885 except subprocess.CalledProcessError as e:
886 output = e.output
887
Josh Gao255c5c82016-03-03 14:49:02 -0800888 self.assertIn('Permission denied', output)
Josh Gao191c1542015-12-09 11:26:11 -0800889
890 def _test_pull(self, remote_file, checksum):
891 tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
892 tmp_write.close()
893 self.device.pull(remote=remote_file, local=tmp_write.name)
894 with open(tmp_write.name, 'rb') as tmp_read:
895 host_contents = tmp_read.read()
896 host_md5 = compute_md5(host_contents)
897 self.assertEqual(checksum, host_md5)
898 os.remove(tmp_write.name)
899
@requires_non_root
def test_pull_error_reporting(self):
    """Make sure that errors that occur while pulling a file get reported.

    An unreadable file is created on the device; the output of pulling it
    (whether returned or carried by CalledProcessError) must mention
    'Permission denied'.
    """
    # Pull into a throwaway directory so a pull that unexpectedly succeeds
    # does not leave a stray file in the current working directory.
    host_dir = tempfile.mkdtemp()
    try:
        self.device.shell(['touch', self.DEVICE_TEMP_FILE])
        try:
            self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])

            try:
                output = self.device.pull(remote=self.DEVICE_TEMP_FILE,
                                          local=os.path.join(host_dir, 'x'))
            except subprocess.CalledProcessError as e:
                output = e.output

            self.assertIn('Permission denied', output)
        finally:
            # Remove the device file even when the assertion above fails.
            self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
    finally:
        shutil.rmtree(host_dir)
913
def test_pull(self):
    """Pull a randomly generated file from specified device."""
    size_kb = 512

    # Remove any leftover file, then fill a fresh one from /dev/urandom.
    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
    dd_cmd = [
        'dd',
        'if=/dev/urandom',
        'of={}'.format(self.DEVICE_TEMP_FILE),
        'bs=1024',
        'count={}'.format(size_kb),
    ]
    self.device.shell(dd_cmd)

    # The md5 tool's output is split into exactly two fields; the first one
    # is used as the checksum.
    md5_cmd = [get_md5_prog(self.device), self.DEVICE_TEMP_FILE]
    md5_stdout = self.device.shell(md5_cmd)[0]
    dev_md5, _ = md5_stdout.split()

    self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
    self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])
926
def test_pull_dir(self):
    """Pull a randomly generated directory of files from the device."""
    # Bind host_dir before entering the try block: if mkdtemp() raises, the
    # `finally` clause must see None rather than an unbound local (which
    # would replace the real error with an UnboundLocalError).
    host_dir = None
    try:
        host_dir = tempfile.mkdtemp()

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)

        # The test expects each file under
        # host_dir/<basename of the device directory>/.
        for temp_file in temp_files:
            host_path = os.path.join(
                host_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
                temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        if host_dir is not None:
            shutil.rmtree(host_dir)
951
def test_pull_dir_symlink(self):
    """Pull a directory into a symlink to a directory.

    Bug: http://b/27362811
    """
    if os.name != 'posix':
        raise unittest.SkipTest('requires POSIX')

    # Bind host_dir before entering the try block: if mkdtemp() raises, the
    # `finally` clause must see None rather than an unbound local.
    host_dir = None
    try:
        host_dir = tempfile.mkdtemp()
        real_dir = os.path.join(host_dir, 'dir')
        symlink = os.path.join(host_dir, 'symlink')
        os.mkdir(real_dir)
        os.symlink(real_dir, symlink)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        # Pull through the symlink; the files are then checked via the
        # real directory it points to.
        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)

        for temp_file in temp_files:
            host_path = os.path.join(
                real_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
                temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        if host_dir is not None:
            shutil.rmtree(host_dir)
986
def test_pull_dir_symlink_collision(self):
    """Pull a directory into a colliding symlink to directory."""
    if os.name != 'posix':
        raise unittest.SkipTest('requires POSIX')

    # Bind host_dir before entering the try block: if mkdtemp() raises, the
    # `finally` clause must see None rather than an unbound local.
    host_dir = None
    try:
        host_dir = tempfile.mkdtemp()
        real_dir = os.path.join(host_dir, 'real')
        # DEVICE_TEMP_DIR is a device-side path, so take its basename with
        # posixpath (identical to os.path here, since the test is
        # POSIX-only).
        tmp_dirname = posixpath.basename(self.DEVICE_TEMP_DIR)
        symlink = os.path.join(host_dir, tmp_dirname)
        os.mkdir(real_dir)
        os.symlink(real_dir, symlink)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)

        # host_dir/<tmp_dirname> is a symlink to real_dir, so the test
        # expects the pulled files directly inside real_dir.
        for temp_file in temp_files:
            host_path = os.path.join(real_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        if host_dir is not None:
            shutil.rmtree(host_dir)
1017
def test_pull_dir_nonexistent(self):
    """Pull a directory of files from the device to a nonexistent path."""
    # Bind host_dir before entering the try block: if mkdtemp() raises, the
    # `finally` clause must see None rather than an unbound local.
    host_dir = None
    try:
        host_dir = tempfile.mkdtemp()
        # dest_dir is deliberately never created on the host.
        dest_dir = os.path.join(host_dir, 'dest')

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)

        # The test expects the files directly inside dest_dir, without an
        # intermediate directory named after the device directory.
        for temp_file in temp_files:
            host_path = os.path.join(dest_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        if host_dir is not None:
            shutil.rmtree(host_dir)
1041
def test_pull_symlink_dir(self):
    """Pull a symlink to a directory of symlinks to files."""
    # Bind host_dir before entering the try block: if mkdtemp() raises, the
    # `finally` clause must see None rather than an unbound local.
    host_dir = None
    try:
        host_dir = tempfile.mkdtemp()

        # Device layout built below:
        #   contents/  the real random files
        #   links/     one relative symlink per file in contents/
        #   symlink    a symlink to links/
        remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
        remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
        remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', remote_dir, remote_links])
        self.device.shell(['ln', '-s', remote_links, remote_symlink])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=remote_dir, num_files=32)

        for temp_file in temp_files:
            self.device.shell(
                ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
                 posixpath.join(remote_links, temp_file.base_name)])

        self.device.pull(remote=remote_symlink, local=host_dir)

        # The files are expected under host_dir/symlink with the contents
        # of the files the device-side links point to.
        for temp_file in temp_files:
            host_path = os.path.join(
                host_dir, 'symlink', temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        if host_dir is not None:
            shutil.rmtree(host_dir)
1075
def test_pull_empty(self):
    """Pull an empty directory from the device.

    The directory 'empty' is created under DEVICE_TEMP_DIR and pulled on
    its own; it must show up as a directory inside the host target.
    """
    # Bind host_dir before entering the try block: if mkdtemp() raises, the
    # `finally` clause must see None rather than an unbound local.
    host_dir = None
    try:
        host_dir = tempfile.mkdtemp()

        remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', remote_empty_path])

        self.device.pull(remote=remote_empty_path, local=host_dir)
        self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))

        # Leave the device clean, as the other pull tests do.
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        if host_dir is not None:
            shutil.rmtree(host_dir)
1090
def test_multiple_pull(self):
    """Pull several files and a subdirectory with a single pull command."""
    # Bind host_dir before entering the try block: if mkdtemp() raises, the
    # `finally` clause must see None rather than an unbound local.
    host_dir = None
    try:
        host_dir = tempfile.mkdtemp()

        subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', subdir])

        # Create some random files and a subdirectory containing more files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)

        subdir_temp_files = make_random_device_files(
            self.device, in_dir=subdir, num_files=4, prefix='subdir_')

        # Build a real list: map() returns an iterator without append() on
        # Python 3, and the list is concatenated into the command below.
        paths = [temp_file.full_path for temp_file in temp_files]
        paths.append(subdir)
        self.device._simple_call(['pull'] + paths + [host_dir])

        # Top-level files are expected directly in host_dir ...
        for temp_file in temp_files:
            local_path = os.path.join(host_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, local_path)

        # ... and the subdirectory's files under host_dir/subdir.
        for subdir_temp_file in subdir_temp_files:
            local_path = os.path.join(host_dir,
                                      'subdir',
                                      subdir_temp_file.base_name)
            self._verify_local(subdir_temp_file.checksum, local_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        if host_dir is not None:
            shutil.rmtree(host_dir)
1126
def test_sync(self):
    """Sync a randomly generated directory of files to specified device."""
    # Bind base_dir before entering the try block: if mkdtemp() raises, the
    # `finally` clause must see None rather than an unbound local.
    base_dir = None
    try:
        base_dir = tempfile.mkdtemp()

        # Create mirror device directory hierarchy within base_dir.
        # NOTE(review): plain concatenation looks deliberate; presumably
        # DEVICE_TEMP_DIR is an absolute device path, in which case
        # os.path.join() would discard base_dir. Confirm before changing.
        full_dir_path = base_dir + self.DEVICE_TEMP_DIR
        os.makedirs(full_dir_path)

        # Create 32 random files within the host mirror.
        temp_files = make_random_host_files(in_dir=full_dir_path,
                                            num_files=32)

        # Clean up any trash on the device.
        # A separate device handle is created with product=base_dir and
        # used for the sync and the checksum queries below.
        device = adb.get_device(product=base_dir)
        device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])

        device.sync('data')

        # Confirm that every file on the device mirrors that on the host.
        for temp_file in temp_files:
            device_full_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                              temp_file.base_name)
            dev_md5, _ = device.shell(
                [get_md5_prog(self.device), device_full_path])[0].split()
            self.assertEqual(temp_file.checksum, dev_md5)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        if base_dir is not None:
            shutil.rmtree(base_dir)
1158
def test_unicode_paths(self):
    """Ensure that we can support non-ASCII paths, even on Windows."""
    name = u'로보카 폴리'
    remote_glob = '/data/local/tmp/adb-test-*'
    remote_path = u'/data/local/tmp/adb-test-{}'.format(name)

    self.device.shell(['rm', '-f', remote_glob])

    # Push: create an empty host file whose name ends with the non-ASCII
    # suffix, push it, then delete the host copy.
    host_file = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
    host_file.close()
    host_path = host_file.name
    self.device.push(host_path, remote_path)
    os.remove(host_path)
    self.assertFalse(os.path.exists(host_path))

    # Verify that the device ended up with the expected UTF-8 path.
    listing = self.device.shell(['ls', remote_glob])[0].strip()
    self.assertEqual(remote_path.encode('utf-8'), listing)

    # Pull: fetch the file back to the same host path and check it exists.
    self.device.pull(remote_path, host_path)
    self.assertTrue(os.path.exists(host_path))
    os.remove(host_path)
    self.device.shell(['rm', '-f', remote_glob])
1183
1184
def main():
    """Load the tests in this module and run them if a device is attached.

    The random module is seeded with 0 first. When adb reports no devices,
    a message is printed and no tests are run.
    """
    random.seed(0)
    if len(adb.get_devices()) == 0:
        print('Test suite must be run with attached devices')
        return

    suite = unittest.TestLoader().loadTestsFromName(__name__)
    runner = unittest.TextTestRunner(verbosity=3)
    runner.run(suite)
1192
1193
# Run the suite only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()