blob: 2efac9dd47c775b5a04152ac5b92e036a587b25a [file] [log] [blame]
Josh Gao49e3c632015-12-09 11:26:11 -08001#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2015 The Android Open Source Project
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from __future__ import print_function
19
20import contextlib
21import hashlib
22import os
23import posixpath
24import random
25import re
26import shlex
27import shutil
28import signal
29import socket
30import string
31import subprocess
32import sys
33import tempfile
Josh Gao2eae66e2016-06-22 18:27:22 -070034import time
Josh Gao49e3c632015-12-09 11:26:11 -080035import unittest
36
37import mock
38
39import adb
40
41
def requires_root(func):
    """Decorate a test method so that it runs while the device is rooted.

    The wrapper skips the test when ``ro.debuggable`` is not ``'1'``. If the
    device user reported by ``id -un`` is not ``root``, the device is switched
    to root before the test and switched back afterwards, even when the test
    raises.

    Args:
        func: Test method; its first argument must expose a ``device``
            attribute.

    Returns:
        A wrapper that forwards positional and keyword arguments to ``func``
        and returns its result.

    Raises:
        unittest.SkipTest: Raised by the wrapper on non-rootable builds.
    """
    # Imported locally so the decorator does not rely on a file-level import.
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if not was_root:
            self.device.root()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Only undo the switch that this wrapper performed.
            if not was_root:
                self.device.unroot()
                self.device.wait()

    return wrapper
60
61
def requires_non_root(func):
    """Decorate a test method so that it runs while the device is unrooted.

    If the device user reported by ``id -un`` is ``root``, the device is
    unrooted before the test and re-rooted afterwards, even when the test
    raises.

    Args:
        func: Test method; its first argument must expose a ``device``
            attribute.

    Returns:
        A wrapper that forwards positional and keyword arguments to ``func``
        and returns its result.
    """
    # Imported locally so the decorator does not rely on a file-level import.
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if was_root:
            self.device.unroot()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Only undo the switch that this wrapper performed.
            if was_root:
                self.device.root()
                self.device.wait()

    return wrapper
77
78
class GetDeviceTest(unittest.TestCase):
    """Tests for adb.get_device() with adb.device.get_devices mocked out."""

    def setUp(self):
        # Remember and clear ANDROID_SERIAL so the caller's environment does
        # not influence which serial each test resolves.
        self.android_serial = os.environ.pop('ANDROID_SERIAL', None)

    def tearDown(self):
        # Restore the original value, or drop whatever a test may have set.
        if self.android_serial is not None:
            os.environ['ANDROID_SERIAL'] = self.android_serial
        else:
            os.environ.pop('ANDROID_SERIAL', None)

    @mock.patch('adb.device.get_devices')
    def test_explicit(self, mock_get_devices):
        """An explicitly passed serial is used."""
        mock_get_devices.return_value = ['foo', 'bar']
        device = adb.get_device('foo')
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_from_env(self, mock_get_devices):
        """ANDROID_SERIAL is used when no serial is passed."""
        mock_get_devices.return_value = ['foo', 'bar']
        os.environ['ANDROID_SERIAL'] = 'foo'
        device = adb.get_device()
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_arg_beats_env(self, mock_get_devices):
        """An explicit serial takes precedence over ANDROID_SERIAL."""
        mock_get_devices.return_value = ['foo', 'bar']
        os.environ['ANDROID_SERIAL'] = 'bar'
        device = adb.get_device('foo')
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_no_such_device(self, mock_get_devices):
        """An unknown serial raises DeviceNotFoundError."""
        mock_get_devices.return_value = ['foo', 'bar']
        # assertRaises forwards extra arguments as-is, so the serial must be
        # passed as a plain string rather than wrapped in a list.
        self.assertRaises(adb.DeviceNotFoundError, adb.get_device, 'baz')

        os.environ['ANDROID_SERIAL'] = 'baz'
        self.assertRaises(adb.DeviceNotFoundError, adb.get_device)

    @mock.patch('adb.device.get_devices')
    def test_unique_device(self, mock_get_devices):
        """With one device attached and no serial given, that device is used."""
        mock_get_devices.return_value = ['foo']
        device = adb.get_device()
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_no_unique_device(self, mock_get_devices):
        """With several devices and no serial, NoUniqueDeviceError is raised."""
        mock_get_devices.return_value = ['foo', 'bar']
        self.assertRaises(adb.NoUniqueDeviceError, adb.get_device)
130
131
class DeviceTest(unittest.TestCase):
    """Base test case that obtains a device handle before each test."""

    def setUp(self):
        # Subclasses in this file reach the device through self.device.
        device = adb.get_device()
        self.device = device
135
136
class ForwardReverseTest(DeviceTest):
    """Tests for `adb forward` and `adb reverse` port mappings."""

    def _test_no_rebind(self, description, direction_list, direction,
                        direction_no_rebind, direction_remove_all):
        """Check --no-rebind semantics for one mapping direction.

        Args:
            description: Name used in the precondition failure message.
            direction_list: Callable returning the current mapping list text.
            direction: Callable creating a mapping, rebinding if needed.
            direction_no_rebind: Callable creating a mapping that must fail
                if the local spec is already bound.
            direction_remove_all: Callable removing every mapping.
        """
        msg = direction_list()
        self.assertEqual('', msg.strip(),
                         description + ' list must be empty to run this test.')

        # Mappings are only created after the precondition passed, so the
        # cleanup below never removes mappings that existed beforehand.
        try:
            # Use --no-rebind with no existing binding
            direction_no_rebind('tcp:5566', 'tcp:6655')
            msg = direction_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use --no-rebind with existing binding
            with self.assertRaises(subprocess.CalledProcessError):
                direction_no_rebind('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use the absence of --no-rebind with existing binding
            direction('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))

            direction_remove_all()
            msg = direction_list()
            self.assertEqual('', msg.strip())
        finally:
            # Leave a clean list behind even when an assertion above failed.
            direction_remove_all()

    def test_forward_no_rebind(self):
        """Run the --no-rebind checks against `adb forward`."""
        self._test_no_rebind('forward', self.device.forward_list,
                             self.device.forward,
                             self.device.forward_no_rebind,
                             self.device.forward_remove_all)

    def test_reverse_no_rebind(self):
        """Run the --no-rebind checks against `adb reverse`."""
        self._test_no_rebind('reverse', self.device.reverse_list,
                             self.device.reverse,
                             self.device.reverse_no_rebind,
                             self.device.reverse_remove_all)

    def test_forward(self):
        """Create, list and remove forward mappings."""
        msg = self.device.forward_list()
        self.assertEqual('', msg.strip(),
                         'Forwarding list must be empty to run this test.')
        try:
            self.device.forward('tcp:5566', 'tcp:6655')
            msg = self.device.forward_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.device.forward('tcp:7788', 'tcp:8877')
            msg = self.device.forward_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.forward_remove('tcp:5566')
            msg = self.device.forward_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.forward_remove_all()
            msg = self.device.forward_list()
            self.assertEqual('', msg.strip())
        finally:
            # Leave a clean list behind even when an assertion above failed.
            self.device.forward_remove_all()

    def test_forward_tcp_port_0(self):
        """Forward tcp:0 and check that the resolved port is listed."""
        self.assertEqual('', self.device.forward_list().strip(),
                         'Forwarding list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb forward` will print
            # the actual port number.
            port = self.device.forward('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Forwarding tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.forward_list()))
        finally:
            self.device.forward_remove_all()

    def test_reverse(self):
        """Create, list and remove reverse mappings."""
        msg = self.device.reverse_list()
        self.assertEqual(
            '', msg.strip(),
            'Reverse forwarding list must be empty to run this test.')
        try:
            self.device.reverse('tcp:5566', 'tcp:6655')
            msg = self.device.reverse_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.device.reverse('tcp:7788', 'tcp:8877')
            msg = self.device.reverse_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.reverse_remove('tcp:5566')
            msg = self.device.reverse_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.reverse_remove_all()
            msg = self.device.reverse_list()
            self.assertEqual('', msg.strip())
        finally:
            # Leave a clean list behind even when an assertion above failed.
            self.device.reverse_remove_all()

    def test_reverse_tcp_port_0(self):
        """Reverse tcp:0 and check that the resolved port is listed."""
        self.assertEqual('', self.device.reverse_list().strip(),
                         'Reverse list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb reverse` will print
            # the actual port number.
            port = self.device.reverse('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Reversing tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.reverse_list()))
        finally:
            self.device.reverse_remove_all()

    # Note: If you run this test when adb connect'd to a physical device over
    # TCP, it will fail in adb reverse due to
    # https://code.google.com/p/android/issues/detail?id=189821
    def test_forward_reverse_echo(self):
        """Send data through adb forward and read it back via adb reverse"""
        forward_port = 12345
        reverse_port = forward_port + 1
        forward_spec = 'tcp:' + str(forward_port)
        reverse_spec = 'tcp:' + str(reverse_port)
        # Track which mappings were created so cleanup removes only those.
        forward_setup = False
        reverse_setup = False

        try:
            # listen on localhost:forward_port, connect to remote:forward_port
            self.device.forward(forward_spec, forward_spec)
            forward_setup = True
            # listen on remote:forward_port, connect to localhost:reverse_port
            self.device.reverse(forward_spec, reverse_spec)
            reverse_setup = True

            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            with contextlib.closing(listener):
                # Use SO_REUSEADDR so that subsequent runs of the test can grab
                # the port even if it is in TIME_WAIT.
                listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                # Listen on localhost:reverse_port before connecting to
                # localhost:forward_port because that will cause adb to connect
                # back to localhost:reverse_port.
                listener.bind(('127.0.0.1', reverse_port))
                listener.listen(4)

                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                with contextlib.closing(client):
                    # Connect to the listener.
                    client.connect(('127.0.0.1', forward_port))

                    # Accept the client connection.
                    accepted_connection, addr = listener.accept()
                    with contextlib.closing(accepted_connection) as server:
                        data = 'hello'

                        # Send data into the port setup by adb forward.
                        client.sendall(data)
                        # Explicitly close() so that server gets EOF.
                        client.close()

                        # Verify that the data came back via adb reverse.
                        self.assertEqual(data, server.makefile().read())
        finally:
            if reverse_setup:
                self.device.reverse_remove(forward_spec)
            if forward_setup:
                self.device.forward_remove(forward_spec)
299
300
class ShellTest(DeviceTest):
    """Tests for `adb shell` behavior: output, exit codes, PTYs, signals."""

    def _interactive_shell(self, shell_args, input):
        """Runs an interactive adb shell.

        Args:
          shell_args: List of string arguments to `adb shell`.
          input: String input to send to the interactive shell.

        Returns:
          The remote exit code.

        Raises:
          unittest.SkipTest: The device doesn't support exit codes.
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('exit codes are unavailable on this device')

        proc = subprocess.Popen(
            self.device.adb_cmd + ['shell'] + shell_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
        # to explicitly add an exit command to close the session from the device
        # side, plus the necessary newline to complete the interactive command.
        proc.communicate(input + '; exit\n')
        return proc.returncode

    def test_cat(self):
        """Check that we can at least cat a file."""
        out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
        elements = out.split()
        self.assertEqual(len(elements), 2)

        uptime, idle = elements
        self.assertGreater(float(uptime), 0.0)
        self.assertGreater(float(idle), 0.0)

    def test_throws_on_failure(self):
        """A failing command makes shell() raise adb.ShellError."""
        self.assertRaises(adb.ShellError, self.device.shell, ['false'])

    def test_output_not_stripped(self):
        """shell() returns stdout including the trailing line separator."""
        out = self.device.shell(['echo', 'foo'])[0]
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_shell_nocheck_failure(self):
        """shell_nocheck() reports a non-zero exit code instead of raising."""
        rc, out, _ = self.device.shell_nocheck(['false'])
        self.assertNotEqual(rc, 0)
        self.assertEqual(out, '')

    def test_shell_nocheck_output_not_stripped(self):
        """shell_nocheck() returns stdout including the line separator."""
        rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_can_distinguish_tricky_results(self):
        """Output that looks like an exit code is not confused with one."""
        # If result checking on ADB shell is naively implemented as
        # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
        # output from the result for a cmd of `echo -n 1`.
        rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, '1')

    def test_line_endings(self):
        """Ensure that line ending translation is not happening in the pty.

        Bug: http://b/19735063
        """
        output = self.device.shell(['uname'])[0]
        self.assertEqual(output, 'Linux' + self.device.linesep)

    def test_pty_logic(self):
        """Tests that a PTY is allocated when it should be.

        PTY allocation behavior should match ssh; some behavior requires
        a terminal stdin to test so this test will be skipped if stdin
        is not a terminal.
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('PTY arguments unsupported on this device')
        if not os.isatty(sys.stdin.fileno()):
            raise unittest.SkipTest('PTY tests require stdin terminal')

        def check_pty(args):
            """Checks adb shell PTY allocation.

            Tests |args| for terminal and non-terminal stdin.

            Args:
              args: -Tt args in a list (e.g. ['-t', '-t']).

            Returns:
              A tuple (<terminal>, <non-terminal>). True indicates
              the corresponding shell allocated a remote PTY.
            """
            test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']

            # stdin=None inherits this process's stdin, which was verified to
            # be a terminal above.
            terminal = subprocess.Popen(
                test_cmd, stdin=None,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            terminal.communicate()

            non_terminal = subprocess.Popen(
                test_cmd, stdin=subprocess.PIPE,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            non_terminal.communicate()

            return (terminal.returncode == 0, non_terminal.returncode == 0)

        # -T: never allocate PTY.
        self.assertEqual((False, False), check_pty(['-T']))

        # No args: PTY only if stdin is a terminal and shell is interactive,
        # which is difficult to reliably test from a script.
        self.assertEqual((False, False), check_pty([]))

        # -t: PTY if stdin is a terminal.
        self.assertEqual((True, False), check_pty(['-t']))

        # -t -t: always allocate PTY.
        self.assertEqual((True, True), check_pty(['-t', '-t']))

    def test_shell_protocol(self):
        """Tests the shell protocol on the device.

        If the device supports shell protocol, this gives us the ability
        to separate stdout/stderr and return the exit code directly.

        Bug: http://b/19734861
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Shell protocol should be used by default.
        result = self.device.shell_nocheck(
            shlex.split('echo foo; echo bar >&2; exit 17'))
        self.assertEqual(17, result[0])
        self.assertEqual('foo' + self.device.linesep, result[1])
        self.assertEqual('bar' + self.device.linesep, result[2])

        self.assertEqual(17, self._interactive_shell([], 'exit 17'))

        # -x flag should disable shell protocol.
        result = self.device.shell_nocheck(
            shlex.split('-x echo foo; echo bar >&2; exit 17'))
        self.assertEqual(0, result[0])
        self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
        self.assertEqual('', result[2])

        self.assertEqual(0, self._interactive_shell(['-x'], 'exit 17'))

    def test_non_interactive_sigint(self):
        """Tests that SIGINT in a non-interactive shell kills the process.

        This requires the shell protocol in order to detect the broken
        pipe; raw data transfer mode will only see the break once the
        subprocess tries to read or write.

        Bug: http://b/23825725
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Start a long-running process.
        sleep_proc = subprocess.Popen(
            self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT)
        remote_pid = sleep_proc.stdout.readline().strip()
        # Popen.returncode stays None until the process is polled or waited
        # on, so poll() is needed for this check to be able to fail.
        self.assertIsNone(sleep_proc.poll(), 'subprocess terminated early')
        proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))

        # Verify that the process is running, send signal, verify it stopped.
        self.device.shell(proc_query)
        os.kill(sleep_proc.pid, signal.SIGINT)
        sleep_proc.communicate()
        self.assertEqual(1, self.device.shell_nocheck(proc_query)[0],
                         'subprocess failed to terminate')

    def test_non_interactive_stdin(self):
        """Tests that non-interactive shells send stdin."""
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('non-interactive stdin unsupported '
                                    'on this device')

        # Test both small and large inputs.
        small_input = 'foo'
        large_input = '\n'.join(c * 100 for c in (string.ascii_letters +
                                                  string.digits))

        for text in (small_input, large_input):
            proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            stdout, stderr = proc.communicate(text)
            self.assertEqual(text.splitlines(), stdout.splitlines())
            self.assertEqual('', stderr)

    def test_sighup(self):
        """Ensure that SIGHUP gets sent upon non-interactive ctrl-c"""
        log_path = "/data/local/tmp/adb_signal_test.log"

        # Clear the output file.
        self.device.shell_nocheck(["echo", ">", log_path])

        script = """
            trap "echo SIGINT > {path}; exit 0" SIGINT
            trap "echo SIGHUP > {path}; exit 0" SIGHUP
            echo Waiting
            while true; do sleep 100; done
        """.format(path=log_path)

        # Collapse the script to a single ';'-separated line; per-line
        # stripping makes the literal's indentation irrelevant.
        script = ";".join([x.strip() for x in script.strip().splitlines()])

        process = self.device.shell_popen(
            ["sh", "-c", "'{}'".format(script)], kill_atexit=False,
            stdout=subprocess.PIPE)

        self.assertEqual("Waiting\n", process.stdout.readline())
        process.send_signal(signal.SIGINT)
        process.wait()

        # Waiting for the local adb to finish is insufficient, since it hangs
        # up immediately.
        time.sleep(0.25)

        stdout, _ = self.device.shell(["cat", log_path])
        self.assertEqual(stdout.strip(), "SIGHUP")
528
Josh Gao49e3c632015-12-09 11:26:11 -0800529
class ArgumentEscapingTest(DeviceTest):
    """Tests for how adb passes command-line arguments to the device."""

    def test_shell_escaping(self):
        """Make sure that argument escaping is somewhat sane."""

        # http://b/19734868
        # Note that this actually matches ssh(1)'s behavior --- it's
        # converted to `sh -c echo hello; echo world` which sh interprets
        # as `sh -c echo` (with an argument to that shell of "hello"),
        # and then `echo world` back in the first shell.
        result = self.device.shell(
            shlex.split("sh -c 'echo hello; echo world'"))[0]
        result = result.splitlines()
        self.assertEqual(['', 'world'], result)
        # If you really wanted "hello" and "world", here's what you'd do:
        result = self.device.shell(
            shlex.split(r'echo hello\;echo world'))[0].splitlines()
        self.assertEqual(['hello', 'world'], result)

        # http://b/15479704
        result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split("sh -c 'true && echo t'"))[0].strip()
        self.assertEqual('t', result)

        # http://b/20564385
        result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split(r'echo -n 123\;uname'))[0].strip()
        self.assertEqual('123Linux', result)

    def test_install_argument_escaping(self):
        """Make sure that install argument escaping works."""
        # http://b/20323053, http://b/3090932.
        for file_suffix in ('-text;ls;1.apk', "-Live Hold'em.apk"):
            tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
                                             delete=False)
            tf.close()

            try:
                # Installing bogus .apks fails if the device supports exit
                # codes.
                try:
                    output = self.device.install(tf.name)
                except subprocess.CalledProcessError as e:
                    output = e.output

                self.assertIn(file_suffix, output)
            finally:
                # Remove the bogus .apk even when the assertion fails.
                os.remove(tf.name)
578
579
class RootUnrootTest(DeviceTest):
    """Tests for switching the device between root and non-root."""

    def _test_root(self):
        """Switch to root and check the user, unless the build refuses."""
        response = self.device.root()
        refused = 'adbd cannot run as root in production builds' in response
        if not refused:
            self.device.wait()
            current_user = self.device.shell(['id', '-un'])[0].strip()
            self.assertEqual('root', current_user)

    def _test_unroot(self):
        """Switch to non-root and check that the user is `shell`."""
        self.device.unroot()
        self.device.wait()
        current_user = self.device.shell(['id', '-un'])[0].strip()
        self.assertEqual('shell', current_user)

    def test_root_unroot(self):
        """Make sure that adb root and adb unroot work, using id(1)."""
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        original_user = self.device.shell(['id', '-un'])[0].strip()
        # Leave the current state first, then return to it. Any other
        # starting user runs no steps.
        plans = {
            'root': (self._test_unroot, self._test_root),
            'shell': (self._test_root, self._test_unroot),
        }
        try:
            for step in plans.get(original_user, ()):
                step()
        finally:
            started_as_root = original_user == 'root'
            restore = self.device.root if started_as_root else self.device.unroot
            restore()
            self.device.wait()
612
613
class TcpIpTest(DeviceTest):
    """Tests for `adb tcpip`."""

    def test_tcpip_failure_raises(self):
        """adb tcpip requires a port.

        Bug: http://b/22636927
        """
        # An empty string and a non-numeric string must both be rejected.
        for bad_port in ('', 'foo'):
            with self.assertRaises(subprocess.CalledProcessError):
                self.device.tcpip(bad_port)
624
625
class SystemPropertiesTest(DeviceTest):
    """Tests for reading and writing system properties."""

    def test_get_prop(self):
        """get_prop() reports init.svc.adbd as running."""
        adbd_state = self.device.get_prop('init.svc.adbd')
        self.assertEqual(adbd_state, 'running')

    @requires_root
    def test_set_prop(self):
        """set_prop() stores a value that getprop then reports."""
        prop_name = 'foo.bar'
        # Overwrite any previous value before setting the one under test.
        self.device.shell(['setprop', prop_name, '""'])

        self.device.set_prop(prop_name, 'qux')
        reported = self.device.shell(['getprop', prop_name])[0].strip()
        self.assertEqual(reported, 'qux')
638
639
def compute_md5(string):
    """Return the hexadecimal MD5 digest of the given data."""
    return hashlib.md5(string).hexdigest()
644
645
def get_md5_prog(device):
    """Older platforms (pre-L) had the name md5 rather than md5sum.

    Probes by running `md5sum /proc/uptime` on the device; returns 'md5sum'
    when that succeeds and 'md5' when it raises adb.ShellError.
    """
    probe_cmd = ['md5sum', '/proc/uptime']
    try:
        device.shell(probe_cmd)
    except adb.ShellError:
        return 'md5'
    else:
        return 'md5sum'
653
654
class HostFile(object):
    """Record of a host-side file: its handle, checksum, path and base name."""

    def __init__(self, handle, checksum):
        path = handle.name
        self.handle, self.checksum = handle, checksum
        self.full_path = path
        # Host paths use the host OS's path rules.
        self.base_name = os.path.basename(path)
661
662
class DeviceFile(object):
    """Record of a device-side file: its checksum, path and base name."""

    def __init__(self, checksum, full_path):
        self.checksum, self.full_path = checksum, full_path
        # Device paths always use POSIX separators.
        self.base_name = posixpath.basename(full_path)
668
669
def make_random_host_files(in_dir, num_files):
    """Create files filled with random bytes in a host directory.

    Args:
        in_dir: Host directory in which the files are created.
        num_files: Number of files to create.

    Returns:
        A list of HostFile objects, one per created file, each carrying the
        MD5 checksum of the bytes written. The files are not deleted.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    files = []
    # range() rather than xrange() so the helper runs on Python 2 and 3.
    for _ in range(num_files):
        # Size is a multiple of 1024 in [min_size, max_size).
        size = random.randrange(min_size, max_size, 1024)
        rand_str = os.urandom(size)

        # The context manager closes (and thereby flushes) the handle even if
        # the write fails; delete=False keeps the file on disk afterwards.
        with tempfile.NamedTemporaryFile(dir=in_dir,
                                         delete=False) as file_handle:
            file_handle.write(rand_str)

        md5 = compute_md5(rand_str)
        files.append(HostFile(file_handle, md5))
    return files
687
688
def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
    """Create files filled with random bytes in a device directory.

    Args:
        device: Device object providing ``shell()``.
        in_dir: Device directory in which the files are created.
        num_files: Number of files to create.
        prefix: File name prefix; the file index is appended to it.

    Returns:
        A list of DeviceFile objects, one per created file, each carrying the
        checksum reported by the device's md5 program.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    # Probing for the md5 program costs a device shell call, so do it once
    # instead of once per file, and not at all when no file is requested.
    md5_prog = get_md5_prog(device) if num_files > 0 else None

    files = []
    # range() rather than xrange() so the helper runs on Python 2 and 3.
    for file_num in range(num_files):
        # Size is a multiple of 1024 in [min_size, max_size).
        size = random.randrange(min_size, max_size, 1024)

        base_name = prefix + str(file_num)
        full_path = posixpath.join(in_dir, base_name)

        device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
                      'bs={}'.format(size), 'count=1'])
        # The first of the two whitespace-separated output fields is the
        # checksum.
        dev_md5, _ = device.shell([md5_prog, full_path])[0].split()

        files.append(DeviceFile(dev_md5, full_path))
    return files
706
707
708class FileOperationsTest(DeviceTest):
709 SCRATCH_DIR = '/data/local/tmp'
710 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
711 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
712
def _verify_remote(self, checksum, remote_path):
    """Assert that the device-reported MD5 of remote_path equals checksum."""
    md5_cmd = [get_md5_prog(self.device), remote_path]
    md5_output = self.device.shell(md5_cmd)[0]
    # The first of the two whitespace-separated fields is the checksum.
    dev_md5, _ = md5_output.split()
    self.assertEqual(checksum, dev_md5)
717
def _verify_local(self, checksum, local_path):
    """Assert that the MD5 of the host file at local_path equals checksum."""
    with open(local_path, 'rb') as host_file:
        contents = host_file.read()
    self.assertEqual(compute_md5(contents), checksum)
722
def test_push(self):
    """Push a randomly generated file to specified device."""
    kbytes = 512
    rand_str = os.urandom(1024 * kbytes)
    tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
    try:
        tmp.write(rand_str)
        # Close before pushing so the whole payload is on disk.
        tmp.close()

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
        self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)

        self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
        self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
    finally:
        # Remove the host file even when the push or verification fails.
        tmp.close()
        os.remove(tmp.name)
738
def test_push_dir(self):
    """Push a randomly generated directory of files to the device."""
    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])

    # Created outside the try block so the cleanup below never refers to an
    # unbound name if mkdtemp() itself fails.
    host_dir = tempfile.mkdtemp()
    try:
        # Make sure the temp directory isn't setuid, or else adb will complain.
        os.chmod(host_dir, 0o700)

        # Create 32 random files.
        temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
        self.device.push(host_dir, self.DEVICE_TEMP_DIR)

        for temp_file in temp_files:
            # The expected path includes host_dir's base name under the
            # destination directory.
            remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                         os.path.basename(host_dir),
                                         temp_file.base_name)
            self._verify_remote(temp_file.checksum, remote_path)
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
763
@unittest.expectedFailure  # b/25566053
def test_push_empty(self):
    """Push a directory containing an empty directory to the device."""
    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])

    # Created outside the try block so the cleanup below never refers to an
    # unbound name if mkdtemp() itself fails.
    host_dir = tempfile.mkdtemp()
    try:
        # Make sure the temp directory isn't setuid, or else adb will complain.
        os.chmod(host_dir, 0o700)

        # Create an empty directory.
        os.mkdir(os.path.join(host_dir, 'empty'))

        self.device.push(host_dir, self.DEVICE_TEMP_DIR)

        # Device paths are POSIX paths regardless of the host OS.
        # NOTE(review): the `[` command has no closing `]`, and the path
        # omits host_dir's base name that test_push_dir expects; both are
        # left unchanged because this test is marked expectedFailure --
        # revisit together with b/25566053.
        test_empty_cmd = ['[', '-d',
                          posixpath.join(self.DEVICE_TEMP_DIR, 'empty')]
        rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
        self.assertEqual(rc, 0)
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
789
def test_multiple_push(self):
    """Push multiple files to the device in one adb push command.

    Bug: http://b/25324823
    """

    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])

    # Created outside the try block so the cleanup below never refers to an
    # unbound name if mkdtemp() itself fails.
    host_dir = tempfile.mkdtemp()
    try:
        # Create some random files and a subdirectory containing more files.
        temp_files = make_random_host_files(in_dir=host_dir, num_files=4)

        subdir = os.path.join(host_dir, 'subdir')
        os.mkdir(subdir)
        subdir_temp_files = make_random_host_files(in_dir=subdir,
                                                   num_files=4)

        # A real list (not a map object) so the subdirectory can be appended.
        paths = [temp_file.full_path for temp_file in temp_files]
        paths.append(subdir)
        self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])

        for temp_file in temp_files:
            remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                         temp_file.base_name)
            self._verify_remote(temp_file.checksum, remote_path)

        for subdir_temp_file in subdir_temp_files:
            # BROKEN: http://b/25394682 -- 'subdir' is deliberately left out
            # of the expected remote path.
            # NOTE(review): this loop previously re-checked the last entry of
            # temp_files instead of each subdir file; confirm the expected
            # path against current adb behavior.
            remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                         subdir_temp_file.base_name)
            self._verify_remote(subdir_temp_file.checksum, remote_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
831
@requires_non_root
def test_push_error_reporting(self):
    """Make sure that errors that occur while pushing a file get reported

    Bug: http://b/26816782
    """
    with tempfile.NamedTemporaryFile() as tmp_file:
        payload = '\0' * 1024 * 1024
        tmp_file.write(payload)
        tmp_file.flush()
        try:
            self.device.push(local=tmp_file.name, remote='/system/')
        except subprocess.CalledProcessError as e:
            output = e.output
        else:
            # Reaching this branch means the push did not raise.
            self.fail('push should not have succeeded')

        self.assertIn('Permission denied', output)
Josh Gao49e3c632015-12-09 11:26:11 -0800848
def _test_pull(self, remote_file, checksum):
    """Pull remote_file to a host temp file and compare its MD5 to checksum.

    Args:
        remote_file: Device path of the file to pull.
        checksum: Expected hexadecimal MD5 digest of the file contents.
    """
    tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
    tmp_write.close()
    try:
        self.device.pull(remote=remote_file, local=tmp_write.name)
        with open(tmp_write.name, 'rb') as tmp_read:
            host_contents = tmp_read.read()
            host_md5 = compute_md5(host_contents)
        self.assertEqual(checksum, host_md5)
    finally:
        # Remove the host file even when the pull or comparison fails.
        os.remove(tmp_write.name)
858
@requires_non_root
def test_pull_error_reporting(self):
    """Make sure that errors that occur while pulling a file get reported."""
    self.device.shell(['touch', self.DEVICE_TEMP_FILE])
    self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])

    # Pull into a scratch directory so that nothing is written to (or
    # overwritten in) the current working directory.
    host_dir = tempfile.mkdtemp()
    try:
        try:
            output = self.device.pull(remote=self.DEVICE_TEMP_FILE,
                                      local=os.path.join(host_dir, 'x'))
        except subprocess.CalledProcessError as e:
            output = e.output

        self.assertIn('Permission denied', output)
    finally:
        # Clean up on both sides even when the assertion fails.
        shutil.rmtree(host_dir)
        self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
872
def test_pull(self):
    """Pull a randomly generated file from specified device."""
    kbytes = 512
    remote = self.DEVICE_TEMP_FILE
    self.device.shell(['rm', '-rf', remote])

    # Fill the device file with kbytes blocks of 1024 random bytes.
    dd_cmd = [
        'dd',
        'if=/dev/urandom',
        'of={}'.format(remote),
        'bs=1024',
        'count={}'.format(kbytes),
    ]
    self.device.shell(dd_cmd)

    md5_output = self.device.shell([get_md5_prog(self.device), remote])[0]
    dev_md5, _ = md5_output.split()
    self._test_pull(remote, dev_md5)
    self.device.shell_nocheck(['rm', remote])
885
def test_pull_dir(self):
    """Pull a randomly generated directory of files from the device."""
    # Created outside the try block so the cleanup below never refers to an
    # unbound name if mkdtemp() itself fails.
    host_dir = tempfile.mkdtemp()
    try:
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)

        for temp_file in temp_files:
            # The expected path includes the device directory's base name
            # under host_dir.
            host_path = os.path.join(
                host_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
                temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
910
def test_pull_dir_symlink(self):
    """Pull a directory into a symlink to a directory.

    Bug: http://b/27362811
    """
    if os.name != 'posix':
        raise unittest.SkipTest('requires POSIX')

    # Create the scratch directory before entering the try block: if
    # mkdtemp() itself fails there is nothing to clean up, and the
    # finally clause must not reference an unbound name.
    host_dir = tempfile.mkdtemp()
    try:
        real_dir = os.path.join(host_dir, 'dir')
        symlink = os.path.join(host_dir, 'symlink')
        os.mkdir(real_dir)
        os.symlink(real_dir, symlink)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)

        # The pull targets the symlink, so the files are expected to
        # show up under the directory the symlink points at.
        for temp_file in temp_files:
            host_path = os.path.join(
                real_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
                temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
945
def test_pull_dir_symlink_collision(self):
    """Pull a directory into a colliding symlink to directory."""
    if os.name != 'posix':
        raise unittest.SkipTest('requires POSIX')

    # Create the scratch directory before entering the try block: if
    # mkdtemp() itself fails there is nothing to clean up, and the
    # finally clause must not reference an unbound name.
    host_dir = tempfile.mkdtemp()
    try:
        real_dir = os.path.join(host_dir, 'real')
        # Name the symlink after the device directory so the pulled
        # directory collides with it.  DEVICE_TEMP_DIR is a device path,
        # hence posixpath as in the other pull tests.
        tmp_dirname = posixpath.basename(self.DEVICE_TEMP_DIR)
        symlink = os.path.join(host_dir, tmp_dirname)
        os.mkdir(real_dir)
        os.symlink(real_dir, symlink)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)

        # The files are expected directly inside the symlink's target.
        for temp_file in temp_files:
            host_path = os.path.join(real_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
976
def test_pull_dir_nonexistent(self):
    """Pull a directory of files from the device to a nonexistent path."""
    # Create the scratch directory before entering the try block: if
    # mkdtemp() itself fails there is nothing to clean up, and the
    # finally clause must not reference an unbound name.
    host_dir = tempfile.mkdtemp()
    try:
        # Deliberately never created on the host before the pull.
        dest_dir = os.path.join(host_dir, 'dest')

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)

        # With a nonexistent destination the files are expected directly
        # inside dest_dir, without an extra directory level.
        for temp_file in temp_files:
            host_path = os.path.join(dest_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
1000
def test_pull_symlink_dir(self):
    """Pull a symlink to a directory of symlinks to files."""
    # Create the scratch directory before entering the try block: if
    # mkdtemp() itself fails there is nothing to clean up, and the
    # finally clause must not reference an unbound name.
    host_dir = tempfile.mkdtemp()
    try:
        # Device layout:
        #   contents/  the real files
        #   links/     one relative symlink per file in contents/
        #   symlink    symlink to links/
        remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
        remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
        remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', remote_dir, remote_links])
        self.device.shell(['ln', '-s', remote_links, remote_symlink])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=remote_dir, num_files=32)

        for temp_file in temp_files:
            self.device.shell(
                ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
                 posixpath.join(remote_links, temp_file.base_name)])

        self.device.pull(remote=remote_symlink, local=host_dir)

        # The pulled directory takes the symlink's name and is expected
        # to hold the real file contents.
        for temp_file in temp_files:
            host_path = os.path.join(
                host_dir, 'symlink', temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
1034
def test_pull_empty(self):
    """Pull a directory containing an empty directory from the device."""
    # Create the scratch directory before entering the try block: if
    # mkdtemp() itself fails there is nothing to clean up, and the
    # finally clause must not reference an unbound name.
    host_dir = tempfile.mkdtemp()
    try:
        remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', remote_empty_path])

        self.device.pull(remote=remote_empty_path, local=host_dir)
        self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))

        # Remove the device directory again, as the other pull tests do.
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
1049
def test_multiple_pull(self):
    """Pull several files and a subdirectory in a single pull command."""
    # Create the scratch directory before entering the try block: if
    # mkdtemp() itself fails there is nothing to clean up, and the
    # finally clause must not reference an unbound name.
    host_dir = tempfile.mkdtemp()
    try:
        subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', subdir])

        # Create some random files and a subdirectory containing more
        # files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)

        subdir_temp_files = make_random_device_files(
            self.device, in_dir=subdir, num_files=4, prefix='subdir_')

        # Build a real list: on Python 3 map() returns an iterator, which
        # supports neither append() nor list concatenation.
        paths = [temp_file.full_path for temp_file in temp_files]
        paths.append(subdir)
        self.device._simple_call(['pull'] + paths + [host_dir])

        for temp_file in temp_files:
            local_path = os.path.join(host_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, local_path)

        for subdir_temp_file in subdir_temp_files:
            local_path = os.path.join(host_dir,
                                      'subdir',
                                      subdir_temp_file.base_name)
            self._verify_local(subdir_temp_file.checksum, local_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
1085
def test_sync(self):
    """Sync a randomly generated directory of files to specified device."""
    # Create the scratch directory before entering the try block: if
    # mkdtemp() itself fails there is nothing to clean up, and the
    # finally clause must not reference an unbound name.
    base_dir = tempfile.mkdtemp()
    try:
        # Create mirror device directory hierarchy within base_dir.
        # Plain concatenation is intentional: DEVICE_TEMP_DIR is
        # presumably an absolute device path, and os.path.join() would
        # discard base_dir when given an absolute second component.
        full_dir_path = base_dir + self.DEVICE_TEMP_DIR
        os.makedirs(full_dir_path)

        # Create 32 random files within the host mirror.
        temp_files = make_random_host_files(
            in_dir=full_dir_path, num_files=32)

        # Clean up any trash on the device.
        device = adb.get_device(product=base_dir)
        device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])

        device.sync('data')

        # Confirm that every file on the device mirrors that on the host.
        for temp_file in temp_files:
            device_full_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                              temp_file.base_name)
            dev_md5, _ = device.shell(
                [get_md5_prog(self.device), device_full_path])[0].split()
            self.assertEqual(temp_file.checksum, dev_md5)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(base_dir)
1117
def test_unicode_paths(self):
    """Round-trip a file with a non-ASCII name through push and pull.

    Non-ASCII paths are expected to work on every host, even on Windows.
    """
    device_glob = '/data/local/tmp/adb-test-*'
    unicode_name = u'로보카 폴리'
    remote_path = u'/data/local/tmp/adb-test-{}'.format(unicode_name)

    # Start with no matching files on the device.
    self.device.shell(['rm', '-f', device_glob])

    # Push: create an empty host file whose name ends in the non-ASCII
    # suffix, send it to the device, then delete the host copy.
    host_file = tempfile.NamedTemporaryFile(
        'wb', suffix=unicode_name, delete=False)
    host_file.close()
    host_path = host_file.name
    self.device.push(host_path, remote_path)
    os.remove(host_path)
    self.assertFalse(os.path.exists(host_path))

    # Verify that the device ended up with the expected UTF-8 path.
    listing = self.device.shell(['ls', device_glob])[0].strip()
    self.assertEqual(remote_path.encode('utf-8'), listing)

    # Pull: fetch the file back to the same host path.
    self.device.pull(remote_path, host_path)
    self.assertTrue(os.path.exists(host_path))
    os.remove(host_path)
    self.device.shell(['rm', '-f', device_glob])
1142
1143
def main():
    """Run every test in this module against the attached device(s).

    Returns:
        0 when the whole suite passed; 1 when any test failed or errored,
        or when no device is attached.
    """
    # Fixed seed, presumably so the randomly generated test files are
    # reproducible from run to run.
    random.seed(0)

    if not adb.get_devices():
        print('Test suite must be run with attached devices')
        return 1

    suite = unittest.TestLoader().loadTestsFromName(__name__)
    result = unittest.TextTestRunner(verbosity=3).run(suite)
    return 0 if result.wasSuccessful() else 1


if __name__ == '__main__':
    # Propagate the outcome so callers can detect failures from the
    # process exit status.
    sys.exit(main())