blob: b12bf88b5356ab0067f99b33be3938b349dd02c5 [file] [log] [blame]
Josh Gao191c1542015-12-09 11:26:11 -08001#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2015 The Android Open Source Project
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from __future__ import print_function
19
20import contextlib
21import hashlib
22import os
23import posixpath
24import random
25import re
26import shlex
27import shutil
28import signal
29import socket
30import string
31import subprocess
32import sys
33import tempfile
Josh Gaofe50bb72016-06-22 18:27:22 -070034import time
Josh Gao191c1542015-12-09 11:26:11 -080035import unittest
36
37import mock
38
39import adb
40
41
def requires_root(func):
    """Decorate a device test so that it runs while the device is rooted.

    The wrapped test is skipped when the device's ``ro.debuggable`` property
    is not ``'1'``.  If ``id -un`` on the device does not report ``root``,
    the device is rooted before the test runs and unrooted again afterwards,
    even when the test raises.

    Args:
        func: Test method; its ``self`` must expose a ``device`` attribute.

    Returns:
        A wrapper that keeps ``func``'s name and docstring, forwards
        positional and keyword arguments, and returns ``func``'s result.
    """
    # Imported locally so this decorator does not rely on the file-level
    # import block.
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if not was_root:
            self.device.root()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore the original privilege level only if we changed it.
            if not was_root:
                self.device.unroot()
                self.device.wait()

    return wrapper
60
61
def requires_non_root(func):
    """Decorate a device test so that it runs while the device is unrooted.

    If ``id -un`` on the device reports ``root``, the device is unrooted
    before the test runs and rooted again afterwards, even when the test
    raises.

    Args:
        func: Test method; its ``self`` must expose a ``device`` attribute.

    Returns:
        A wrapper that keeps ``func``'s name and docstring, forwards
        positional and keyword arguments, and returns ``func``'s result.
    """
    # Imported locally so this decorator does not rely on the file-level
    # import block.
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if was_root:
            self.device.unroot()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Restore the original privilege level only if we changed it.
            if was_root:
                self.device.root()
                self.device.wait()

    return wrapper
77
78
class GetDeviceTest(unittest.TestCase):
    """Tests for adb.get_device() with the device list mocked out."""

    def setUp(self):
        # Remember and clear ANDROID_SERIAL so every test starts without it.
        self.android_serial = os.environ.pop('ANDROID_SERIAL', None)

    def tearDown(self):
        # Put the environment back exactly as setUp found it.
        if self.android_serial is None:
            os.environ.pop('ANDROID_SERIAL', None)
        else:
            os.environ['ANDROID_SERIAL'] = self.android_serial

    @mock.patch('adb.device.get_devices')
    def test_explicit(self, mock_get_devices):
        """A serial passed as an argument selects that device."""
        mock_get_devices.return_value = ['foo', 'bar']
        device = adb.get_device('foo')
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_from_env(self, mock_get_devices):
        """ANDROID_SERIAL selects the device when no argument is given."""
        mock_get_devices.return_value = ['foo', 'bar']
        os.environ['ANDROID_SERIAL'] = 'foo'
        device = adb.get_device()
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_arg_beats_env(self, mock_get_devices):
        """An explicit serial takes precedence over ANDROID_SERIAL."""
        mock_get_devices.return_value = ['foo', 'bar']
        os.environ['ANDROID_SERIAL'] = 'bar'
        device = adb.get_device('foo')
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_no_such_device(self, mock_get_devices):
        """An unknown serial, from argument or environment, raises."""
        mock_get_devices.return_value = ['foo', 'bar']
        # Pass the serial as a string, like the other tests do; a list could
        # never match a device and would make this check vacuous.
        self.assertRaises(adb.DeviceNotFoundError, adb.get_device, 'baz')

        os.environ['ANDROID_SERIAL'] = 'baz'
        self.assertRaises(adb.DeviceNotFoundError, adb.get_device)

    @mock.patch('adb.device.get_devices')
    def test_unique_device(self, mock_get_devices):
        """With a single device attached, it is chosen without a serial."""
        mock_get_devices.return_value = ['foo']
        device = adb.get_device()
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_no_unique_device(self, mock_get_devices):
        """With several devices and no serial, selection is an error."""
        mock_get_devices.return_value = ['foo', 'bar']
        self.assertRaises(adb.NoUniqueDeviceError, adb.get_device)
130
131
class DeviceTest(unittest.TestCase):
    """Base class for tests that operate on a device.

    Each test gets the result of ``adb.get_device()`` as ``self.device``.
    """

    def setUp(self):
        # No serial is passed, so adb.get_device() decides which device to use.
        self.device = adb.get_device()
135
136
class ForwardReverseTest(DeviceTest):
    """Tests for the device's forward/reverse port binding operations.

    Every test requires the relevant binding list to be empty on entry and
    removes the bindings it created on exit, including when an assertion
    fails part-way through.
    """

    def _test_no_rebind(self, description, direction_list, direction,
                        direction_no_rebind, direction_remove_all):
        """Check that the no-rebind variant refuses to replace a binding.

        Args:
            description: Word used in the precondition failure message.
            direction_list: Callable returning the current binding list.
            direction: Callable creating (or replacing) a binding.
            direction_no_rebind: Callable creating a binding without
                replacing an existing one.
            direction_remove_all: Callable removing every binding.
        """
        msg = direction_list()
        self.assertEqual('', msg.strip(),
                         description + ' list must be empty to run this test.')

        try:
            # Use --no-rebind with no existing binding
            direction_no_rebind('tcp:5566', 'tcp:6655')
            msg = direction_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use --no-rebind with existing binding
            with self.assertRaises(subprocess.CalledProcessError):
                direction_no_rebind('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use the absence of --no-rebind with existing binding
            direction('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))

            direction_remove_all()
            msg = direction_list()
            self.assertEqual('', msg.strip())
        finally:
            # Do not leave bindings behind for the next test if we failed.
            direction_remove_all()

    def _test_bind_list_remove(self, empty_msg, bind, list_bindings, remove,
                               remove_all):
        """Create two bindings, remove one, then remove all, checking the list.

        Args:
            empty_msg: Failure message for the empty-list precondition.
            bind: Callable creating a binding from two specs.
            list_bindings: Callable returning the current binding list.
            remove: Callable removing the binding for one local spec.
            remove_all: Callable removing every binding.
        """
        msg = list_bindings()
        self.assertEqual('', msg.strip(), empty_msg)

        try:
            bind('tcp:5566', 'tcp:6655')
            msg = list_bindings()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            bind('tcp:7788', 'tcp:8877')
            msg = list_bindings()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))

            remove('tcp:5566')
            msg = list_bindings()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))

            remove_all()
            msg = list_bindings()
            self.assertEqual('', msg.strip())
        finally:
            # Do not leave bindings behind for the next test if we failed.
            remove_all()

    def test_forward_no_rebind(self):
        self._test_no_rebind('forward', self.device.forward_list,
                             self.device.forward, self.device.forward_no_rebind,
                             self.device.forward_remove_all)

    def test_reverse_no_rebind(self):
        self._test_no_rebind('reverse', self.device.reverse_list,
                             self.device.reverse, self.device.reverse_no_rebind,
                             self.device.reverse_remove_all)

    def test_forward(self):
        self._test_bind_list_remove(
            'Forwarding list must be empty to run this test.',
            self.device.forward, self.device.forward_list,
            self.device.forward_remove, self.device.forward_remove_all)

    def test_forward_tcp_port_0(self):
        self.assertEqual('', self.device.forward_list().strip(),
                         'Forwarding list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb forward` will print
            # the actual port number.
            port = self.device.forward('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Forwarding tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.forward_list()))
        finally:
            self.device.forward_remove_all()

    def test_reverse(self):
        self._test_bind_list_remove(
            'Reverse forwarding list must be empty to run this test.',
            self.device.reverse, self.device.reverse_list,
            self.device.reverse_remove, self.device.reverse_remove_all)

    def test_reverse_tcp_port_0(self):
        self.assertEqual('', self.device.reverse_list().strip(),
                         'Reverse list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb reverse` will print
            # the actual port number.
            port = self.device.reverse('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Reversing tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.reverse_list()))
        finally:
            self.device.reverse_remove_all()

    # Note: If you run this test when adb connect'd to a physical device over
    # TCP, it will fail in adb reverse due to https://code.google.com/p/android/issues/detail?id=189821
    def test_forward_reverse_echo(self):
        """Send data through adb forward and read it back via adb reverse"""
        forward_port = 12345
        reverse_port = forward_port + 1
        forward_spec = 'tcp:' + str(forward_port)
        reverse_spec = 'tcp:' + str(reverse_port)
        forward_setup = False
        reverse_setup = False

        try:
            # listen on localhost:forward_port, connect to remote:forward_port
            self.device.forward(forward_spec, forward_spec)
            forward_setup = True
            # listen on remote:forward_port, connect to localhost:reverse_port
            self.device.reverse(forward_spec, reverse_spec)
            reverse_setup = True

            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            with contextlib.closing(listener):
                # Use SO_REUSEADDR so that subsequent runs of the test can grab
                # the port even if it is in TIME_WAIT.
                listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                # Listen on localhost:reverse_port before connecting to
                # localhost:forward_port because that will cause adb to connect
                # back to localhost:reverse_port.
                listener.bind(('127.0.0.1', reverse_port))
                listener.listen(4)

                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                with contextlib.closing(client):
                    # Connect to the listener.
                    client.connect(('127.0.0.1', forward_port))

                    # Accept the client connection.
                    accepted_connection, addr = listener.accept()
                    with contextlib.closing(accepted_connection) as server:
                        data = 'hello'

                        # Send data into the port setup by adb forward.
                        client.sendall(data)
                        # Explicitly close() so that server gets EOF.
                        client.close()

                        # Verify that the data came back via adb reverse.
                        self.assertEqual(data, server.makefile().read())
        finally:
            # Only tear down what was actually set up.
            if reverse_setup:
                self.device.reverse_remove(forward_spec)
            if forward_setup:
                self.device.forward_remove(forward_spec)
299
300
class ShellTest(DeviceTest):
    """Tests for running shell commands on the device."""

    def _interactive_shell(self, shell_args, input):
        """Runs an interactive adb shell.

        Args:
          shell_args: List of string arguments to `adb shell`.
          input: String input to send to the interactive shell.

        Returns:
          The remote exit code.

        Raises:
          unittest.SkipTest: The device doesn't support exit codes.
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('exit codes are unavailable on this device')

        proc = subprocess.Popen(
            self.device.adb_cmd + ['shell'] + shell_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
        # to explicitly add an exit command to close the session from the device
        # side, plus the necessary newline to complete the interactive command.
        proc.communicate(input + '; exit\n')
        return proc.returncode

    def test_cat(self):
        """Check that we can at least cat a file."""
        out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
        elements = out.split()
        self.assertEqual(len(elements), 2)

        uptime, idle = elements
        self.assertGreater(float(uptime), 0.0)
        self.assertGreater(float(idle), 0.0)

    def test_throws_on_failure(self):
        self.assertRaises(adb.ShellError, self.device.shell, ['false'])

    def test_output_not_stripped(self):
        out = self.device.shell(['echo', 'foo'])[0]
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_shell_nocheck_failure(self):
        rc, out, _ = self.device.shell_nocheck(['false'])
        self.assertNotEqual(rc, 0)
        self.assertEqual(out, '')

    def test_shell_nocheck_output_not_stripped(self):
        rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_can_distinguish_tricky_results(self):
        # If result checking on ADB shell is naively implemented as
        # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
        # output from the result for a cmd of `echo -n 1`.
        rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, '1')

    def test_line_endings(self):
        """Ensure that line ending translation is not happening in the pty.

        Bug: http://b/19735063
        """
        output = self.device.shell(['uname'])[0]
        self.assertEqual(output, 'Linux' + self.device.linesep)

    def test_pty_logic(self):
        """Tests that a PTY is allocated when it should be.

        PTY allocation behavior should match ssh; some behavior requires
        a terminal stdin to test so this test will be skipped if stdin
        is not a terminal.
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('PTY arguments unsupported on this device')
        if not os.isatty(sys.stdin.fileno()):
            raise unittest.SkipTest('PTY tests require stdin terminal')

        def check_pty(args):
            """Checks adb shell PTY allocation.

            Tests |args| for terminal and non-terminal stdin.

            Args:
                args: -Tt args in a list (e.g. ['-t', '-t']).

            Returns:
                A tuple (<terminal>, <non-terminal>). True indicates
                the corresponding shell allocated a remote PTY.
            """
            test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']

            # stdin=None inherits this process's stdin (checked above to be
            # a terminal).
            terminal = subprocess.Popen(
                test_cmd, stdin=None,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            terminal.communicate()

            non_terminal = subprocess.Popen(
                test_cmd, stdin=subprocess.PIPE,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            non_terminal.communicate()

            return (terminal.returncode == 0, non_terminal.returncode == 0)

        # -T: never allocate PTY.
        self.assertEqual((False, False), check_pty(['-T']))

        # No args: PTY only if stdin is a terminal and shell is interactive,
        # which is difficult to reliably test from a script.
        self.assertEqual((False, False), check_pty([]))

        # -t: PTY if stdin is a terminal.
        self.assertEqual((True, False), check_pty(['-t']))

        # -t -t: always allocate PTY.
        self.assertEqual((True, True), check_pty(['-t', '-t']))

    def test_shell_protocol(self):
        """Tests the shell protocol on the device.

        If the device supports shell protocol, this gives us the ability
        to separate stdout/stderr and return the exit code directly.

        Bug: http://b/19734861
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Shell protocol should be used by default.
        result = self.device.shell_nocheck(
            shlex.split('echo foo; echo bar >&2; exit 17'))
        self.assertEqual(17, result[0])
        self.assertEqual('foo' + self.device.linesep, result[1])
        self.assertEqual('bar' + self.device.linesep, result[2])

        self.assertEqual(17, self._interactive_shell([], 'exit 17'))

        # -x flag should disable shell protocol.
        result = self.device.shell_nocheck(
            shlex.split('-x echo foo; echo bar >&2; exit 17'))
        self.assertEqual(0, result[0])
        self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
        self.assertEqual('', result[2])

        self.assertEqual(0, self._interactive_shell(['-x'], 'exit 17'))

    def test_non_interactive_sigint(self):
        """Tests that SIGINT in a non-interactive shell kills the process.

        This requires the shell protocol in order to detect the broken
        pipe; raw data transfer mode will only see the break once the
        subprocess tries to read or write.

        Bug: http://b/23825725
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Start a long-running process.
        sleep_proc = subprocess.Popen(
            self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT)
        try:
            remote_pid = sleep_proc.stdout.readline().strip()
            # Popen.returncode stays None until poll()/wait() is called, so
            # poll() explicitly to really detect an early exit.
            self.assertIsNone(sleep_proc.poll(), 'subprocess terminated early')
            proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))

            # Verify that the process is running, send signal, verify it
            # stopped.
            self.device.shell(proc_query)
            os.kill(sleep_proc.pid, signal.SIGINT)
            sleep_proc.communicate()
            self.assertEqual(1, self.device.shell_nocheck(proc_query)[0],
                             'subprocess failed to terminate')
        finally:
            # Don't leave the host-side adb process running if we failed
            # before it was interrupted.
            if sleep_proc.poll() is None:
                sleep_proc.kill()
                sleep_proc.wait()

    def test_non_interactive_stdin(self):
        """Tests that non-interactive shells send stdin."""
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('non-interactive stdin unsupported '
                                    'on this device')

        # Test both small and large inputs.
        small_input = 'foo'
        large_input = '\n'.join(c * 100 for c in (string.ascii_letters +
                                                  string.digits))

        for stdin_data in (small_input, large_input):
            proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            stdout, stderr = proc.communicate(stdin_data)
            self.assertEqual(stdin_data.splitlines(), stdout.splitlines())
            self.assertEqual('', stderr)

    def test_sighup(self):
        """Ensure that SIGHUP gets sent upon non-interactive ctrl-c"""
        log_path = "/data/local/tmp/adb_signal_test.log"

        # Clear the output file.
        self.device.shell_nocheck(["echo", ">", log_path])

        script = """
            trap "echo SIGINT > {path}; exit 0" SIGINT
            trap "echo SIGHUP > {path}; exit 0" SIGHUP
            echo Waiting
            while true; do sleep 100; done
        """.format(path=log_path)

        # Collapse the script onto one line, one statement per source line.
        script = ";".join([x.strip() for x in script.strip().splitlines()])

        process = self.device.shell_popen(
            ["sh", "-c", "'{}'".format(script)], kill_atexit=False,
            stdout=subprocess.PIPE)

        self.assertEqual("Waiting\n", process.stdout.readline())
        process.send_signal(signal.SIGINT)
        process.wait()

        # Waiting for the local adb to finish is insufficient, since it hangs
        # up immediately.
        time.sleep(0.25)

        stdout, _ = self.device.shell(["cat", log_path])
        self.assertEqual(stdout.strip(), "SIGHUP")
528
Josh Gao191c1542015-12-09 11:26:11 -0800529
class ArgumentEscapingTest(DeviceTest):
    """Tests for how command arguments are escaped on their way to the device."""

    def test_shell_escaping(self):
        """Make sure that argument escaping is somewhat sane."""

        # http://b/19734868
        # Note that this actually matches ssh(1)'s behavior --- it's
        # converted to `sh -c echo hello; echo world` which sh interprets
        # as `sh -c echo` (with an argument to that shell of "hello"),
        # and then `echo world` back in the first shell.
        result = self.device.shell(
            shlex.split("sh -c 'echo hello; echo world'"))[0]
        result = result.splitlines()
        self.assertEqual(['', 'world'], result)
        # If you really wanted "hello" and "world", here's what you'd do:
        result = self.device.shell(
            shlex.split(r'echo hello\;echo world'))[0].splitlines()
        self.assertEqual(['hello', 'world'], result)

        # http://b/15479704
        result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split("sh -c 'true && echo t'"))[0].strip()
        self.assertEqual('t', result)

        # http://b/20564385
        result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split(r'echo -n 123\;uname'))[0].strip()
        self.assertEqual('123Linux', result)

    def test_install_argument_escaping(self):
        """Make sure that install argument escaping works."""
        # http://b/20323053, http://b/3090932.
        for file_suffix in ('-text;ls;1.apk', "-Live Hold'em.apk"):
            tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
                                             delete=False)
            tf.close()

            try:
                # Installing bogus .apks fails if the device supports exit
                # codes.
                try:
                    output = self.device.install(tf.name)
                except subprocess.CalledProcessError as e:
                    output = e.output

                self.assertIn(file_suffix, output)
            finally:
                # Remove the host file even when the assertion fails.
                os.remove(tf.name)
578
579
class RootUnrootTest(DeviceTest):
    """Tests for switching the device between root and shell users."""

    def _test_root(self):
        """Root the device and check that `id -un` reports root.

        Returns early, without checking, when the root request reports
        that adbd cannot run as root in production builds.
        """
        message = self.device.root()
        if 'adbd cannot run as root in production builds' in message:
            return
        self.device.wait()
        current_user = self.device.shell(['id', '-un'])[0].strip()
        self.assertEqual('root', current_user)

    def _test_unroot(self):
        """Unroot the device and check that `id -un` reports shell."""
        self.device.unroot()
        self.device.wait()
        current_user = self.device.shell(['id', '-un'])[0].strip()
        self.assertEqual('shell', current_user)

    def test_root_unroot(self):
        """Make sure that adb root and adb unroot work, using id(1)."""
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        original_user = self.device.shell(['id', '-un'])[0].strip()

        # Exercise both transitions, starting with the one that leaves the
        # current state; any other starting user runs no steps.
        if original_user == 'root':
            steps = (self._test_unroot, self._test_root)
        elif original_user == 'shell':
            steps = (self._test_root, self._test_unroot)
        else:
            steps = ()

        try:
            for step in steps:
                step()
        finally:
            # Put the device back the way we found it.
            if original_user == 'root':
                self.device.root()
            else:
                self.device.unroot()
            self.device.wait()
612
613
class TcpIpTest(DeviceTest):
    """Tests for the device's tcpip operation."""

    def test_tcpip_failure_raises(self):
        """adb tcpip requires a port.

        Bug: http://b/22636927
        """
        # Neither an empty string nor a non-numeric string is accepted.
        for bad_port in ('', 'foo'):
            with self.assertRaises(subprocess.CalledProcessError):
                self.device.tcpip(bad_port)
624
625
class SystemPropertiesTest(DeviceTest):
    """Tests for reading and writing device system properties."""

    def test_get_prop(self):
        """The init.svc.adbd property reads back as 'running'."""
        adbd_state = self.device.get_prop('init.svc.adbd')
        self.assertEqual(adbd_state, 'running')

    @requires_root
    def test_set_prop(self):
        """A value written with set_prop is visible through getprop."""
        prop_name = 'foo.bar'
        # Reset the property with setprop before writing the real value.
        self.device.shell(['setprop', prop_name, '""'])

        self.device.set_prop(prop_name, 'qux')
        read_back = self.device.shell(['getprop', prop_name])[0].strip()
        self.assertEqual(read_back, 'qux')
638
639
def compute_md5(string):
    """Return the hexadecimal MD5 digest of ``string``."""
    return hashlib.md5(string).hexdigest()
644
645
def get_md5_prog(device):
    """Older platforms (pre-L) had the name md5 rather than md5sum.

    Probes the device by running md5sum on /proc/uptime; returns 'md5sum'
    if that succeeds and 'md5' if it raises adb.ShellError.
    """
    prog = 'md5sum'
    try:
        device.shell([prog, '/proc/uptime'])
    except adb.ShellError:
        prog = 'md5'
    return prog
653
654
class HostFile(object):
    """Record describing a file on the host: handle, checksum and paths."""

    def __init__(self, handle, checksum):
        # The full path comes from the handle's ``name`` attribute.
        path = handle.name
        self.handle = handle
        self.checksum = checksum
        self.full_path = path
        # Last component of the path, using the host's path rules.
        self.base_name = os.path.split(path)[1]
661
662
class DeviceFile(object):
    """Record describing a file on the device: checksum and paths."""

    def __init__(self, checksum, full_path):
        self.checksum = checksum
        self.full_path = full_path
        # Paths here are always split with POSIX rules, whatever the host OS.
        self.base_name = posixpath.split(full_path)[1]
668
669
def make_random_host_files(in_dir, num_files):
    """Create files with random contents in a host directory.

    Each file holds a random multiple of 1 KiB, from 1 KiB up to (but not
    including) 16 KiB, of data from os.urandom.  The files are not deleted
    when closed; the caller owns the cleanup.

    Args:
        in_dir: Host directory in which to create the files.
        num_files: Number of files to create.

    Returns:
        A list of HostFile objects, one per created file, each carrying the
        (closed) file handle and the MD5 checksum of the file's contents.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    files = []
    # range() instead of xrange() so this runs on both Python 2 and 3.
    for _ in range(num_files):
        size = random.randrange(min_size, max_size, 1024)
        rand_str = os.urandom(size)

        # The with-block flushes and closes the handle even if write() fails.
        with tempfile.NamedTemporaryFile(dir=in_dir,
                                         delete=False) as file_handle:
            file_handle.write(rand_str)

        files.append(HostFile(file_handle, compute_md5(rand_str)))
    return files
687
688
def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
    """Create files with random contents in a device directory.

    Each file is written on the device with dd from /dev/urandom as a single
    block whose size is a random multiple of 1 KiB, from 1 KiB up to (but
    not including) 16 KiB.  Files are named ``prefix`` followed by their
    index.

    Args:
        device: Device object providing ``shell()``.
        in_dir: Device directory in which to create the files.
        num_files: Number of files to create.
        prefix: Base name shared by the created files.

    Returns:
        A list of DeviceFile objects, one per created file, each carrying
        the checksum reported by the device and the file's full path.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    # Probing for the checksum tool runs a command on the device; do it once
    # rather than once per file.
    md5_prog = get_md5_prog(device)

    files = []
    # range() instead of xrange() so this runs on both Python 2 and 3.
    for file_num in range(num_files):
        size = random.randrange(min_size, max_size, 1024)

        base_name = prefix + str(file_num)
        full_path = posixpath.join(in_dir, base_name)

        device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
                      'bs={}'.format(size), 'count=1'])
        # The tool prints "<checksum> <path>"; keep only the checksum.
        dev_md5, _ = device.shell([md5_prog, full_path])[0].split()

        files.append(DeviceFile(dev_md5, full_path))
    return files
706
707
708class FileOperationsTest(DeviceTest):
709 SCRATCH_DIR = '/data/local/tmp'
710 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
711 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
712
def _verify_remote(self, checksum, remote_path):
    """Assert that the device reports ``checksum`` for ``remote_path``."""
    md5_cmd = [get_md5_prog(self.device), remote_path]
    md5_output = self.device.shell(md5_cmd)[0]
    # Output is "<checksum> <path>"; only the checksum is compared.
    dev_md5, _ = md5_output.split()
    self.assertEqual(checksum, dev_md5)
717
def _verify_local(self, checksum, local_path):
    """Assert that the host file at ``local_path`` has MD5 ``checksum``."""
    with open(local_path, 'rb') as host_file:
        contents = host_file.read()
    host_md5 = compute_md5(contents)
    self.assertEqual(host_md5, checksum)
722
def test_push(self):
    """Push a randomly generated file to specified device."""
    kbytes = 512
    rand_str = os.urandom(1024 * kbytes)
    tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
    try:
        tmp.write(rand_str)
        tmp.close()

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
        self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)

        self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
        self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
    finally:
        # Remove the host file even when the push or the check fails.
        # close() is a no-op if the file was already closed above.
        tmp.close()
        os.remove(tmp.name)
738
def test_push_dir(self):
    """Push a randomly generated directory of files to the device."""
    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])

    # Bound before the try so the finally clause never hits an unbound
    # name if mkdtemp() itself raises.
    host_dir = None
    try:
        host_dir = tempfile.mkdtemp()

        # Make sure the temp directory isn't setuid, or else adb will complain.
        os.chmod(host_dir, 0o700)

        # Create 32 random files.
        temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
        self.device.push(host_dir, self.DEVICE_TEMP_DIR)

        # The files are expected under a directory named after host_dir.
        for temp_file in temp_files:
            remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                         os.path.basename(host_dir),
                                         temp_file.base_name)
            self._verify_remote(temp_file.checksum, remote_path)
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        if host_dir is not None:
            shutil.rmtree(host_dir)
763
@unittest.expectedFailure # b/25566053
def test_push_empty(self):
    """Push a directory containing an empty directory to the device."""
    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])

    # Bound before the try so the finally clause never hits an unbound
    # name if mkdtemp() itself raises.
    host_dir = None
    try:
        host_dir = tempfile.mkdtemp()

        # Make sure the temp directory isn't setuid, or else adb will complain.
        os.chmod(host_dir, 0o700)

        # Create an empty directory.
        os.mkdir(os.path.join(host_dir, 'empty'))

        self.device.push(host_dir, self.DEVICE_TEMP_DIR)

        # This is a device path, so join it with POSIX rules regardless of
        # the host OS.  The `[` command needs its closing `]`; without it
        # the check fails for a reason unrelated to the push.
        remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
        test_empty_cmd = ['[', '-d', remote_path, ']']
        rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
        self.assertEqual(rc, 0)
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        if host_dir is not None:
            shutil.rmtree(host_dir)
789
@unittest.skipIf(sys.platform == "win32", "symlinks require elevated privileges on windows")
def test_push_symlink(self):
    """Push a symlink.

    Bug: http://b/31491920
    """
    # Bound before the try so the finally clause never hits an unbound
    # name if mkdtemp() itself raises.
    host_dir = None
    try:
        host_dir = tempfile.mkdtemp()

        # Make sure the temp directory isn't setuid, or else adb will
        # complain.
        os.chmod(host_dir, 0o700)

        with open(os.path.join(host_dir, 'foo'), 'w') as f:
            f.write('foo')

        # Relative link: 'symlink' points at the sibling file 'foo'.
        symlink_path = os.path.join(host_dir, 'symlink')
        os.symlink('foo', symlink_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
        self.device.push(symlink_path, self.DEVICE_TEMP_DIR)
        # Reading the pushed path must yield the link target's contents.
        rc, out, _ = self.device.shell_nocheck(
            ['cat', posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')])
        self.assertEqual(0, rc)
        self.assertEqual(out.strip(), 'foo')
    finally:
        if host_dir is not None:
            shutil.rmtree(host_dir)
819
def test_multiple_push(self):
    """Push multiple files to the device in one adb push command.

    Bug: http://b/25324823
    """

    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])

    # Bound before the try so the finally clause never hits an unbound
    # name if mkdtemp() itself raises.
    host_dir = None
    try:
        host_dir = tempfile.mkdtemp()

        # Create some random files and a subdirectory containing more files.
        temp_files = make_random_host_files(in_dir=host_dir, num_files=4)

        subdir = os.path.join(host_dir, 'subdir')
        os.mkdir(subdir)
        subdir_temp_files = make_random_host_files(in_dir=subdir,
                                                   num_files=4)

        # Build a real list (map() returns an iterator on Python 3, which
        # has no append()).
        paths = [host_file.full_path for host_file in temp_files]
        paths.append(subdir)
        self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])

        for temp_file in temp_files:
            remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                         temp_file.base_name)
            self._verify_remote(temp_file.checksum, remote_path)

        # NOTE(review): this loop re-verifies the last top-level temp_file
        # on every iteration and never looks at subdir_temp_file.  It is
        # left as-is because the expected remote location of the subdir
        # files depends on http://b/25394682 -- confirm before changing.
        for subdir_temp_file in subdir_temp_files:
            remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                         # BROKEN: http://b/25394682
                                         # 'subdir';
                                         temp_file.base_name)
            self._verify_remote(temp_file.checksum, remote_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        if host_dir is not None:
            shutil.rmtree(host_dir)
861
@requires_non_root
def test_push_error_reporting(self):
    """Make sure that errors that occur while pushing a file get reported

    Bug: http://b/26816782
    """
    payload = '\0' * 1024 * 1024
    with tempfile.NamedTemporaryFile() as tmp_file:
        tmp_file.write(payload)
        tmp_file.flush()

        # Pushing into /system/ must fail when not running as root.
        try:
            self.device.push(local=tmp_file.name, remote='/system/')
        except subprocess.CalledProcessError as e:
            output = e.output
        else:
            self.fail('push should not have succeeded')

        self.assertIn('Permission denied', output)
Josh Gao191c1542015-12-09 11:26:11 -0800878
def _test_pull(self, remote_file, checksum):
    """Pull ``remote_file`` to a host temp file and check its MD5.

    Args:
        remote_file: Path of the file on the device.
        checksum: Expected MD5 hex digest of the pulled contents.
    """
    # Create the destination and close it so the pull can write to it.
    tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
    tmp_write.close()
    try:
        self.device.pull(remote=remote_file, local=tmp_write.name)
        with open(tmp_write.name, 'rb') as tmp_read:
            host_contents = tmp_read.read()
        host_md5 = compute_md5(host_contents)
        self.assertEqual(checksum, host_md5)
    finally:
        # Remove the host file even when the pull or the check fails.
        os.remove(tmp_write.name)
888
@requires_non_root
def test_pull_error_reporting(self):
    """Pulling a file with all permissions removed reports 'Permission denied'."""
    remote = self.DEVICE_TEMP_FILE

    # Create the file, then strip every permission bit from it.
    for setup_cmd in (['touch', remote], ['chmod', 'a-rwx', remote]):
        self.device.shell(setup_cmd)

    # The message is accepted from either a normal return or the raised
    # error's captured output.
    try:
        output = self.device.pull(remote=remote, local='x')
    except subprocess.CalledProcessError as e:
        output = e.output

    self.assertIn('Permission denied', output)

    self.device.shell(['rm', '-f', remote])
902
def test_pull(self):
    """Pull a randomly generated file from specified device."""
    kbytes = 512
    remote = self.DEVICE_TEMP_FILE

    # Fill the device file with kbytes blocks of 1024 random bytes.
    self.device.shell(['rm', '-rf', remote])
    dd_cmd = [
        'dd',
        'if=/dev/urandom',
        'of={}'.format(remote),
        'bs=1024',
        'count={}'.format(kbytes),
    ]
    self.device.shell(dd_cmd)

    # Output is "<checksum> <path>"; only the checksum is needed.
    md5_output = self.device.shell([get_md5_prog(self.device), remote])[0]
    dev_md5, _ = md5_output.split()

    self._test_pull(remote, dev_md5)
    self.device.shell_nocheck(['rm', remote])
915
def test_pull_dir(self):
    """Pull a randomly generated directory of files from the device."""
    # Create the directory before entering the try block: if mkdtemp()
    # itself fails there is nothing to clean up, and the finally clause
    # must not trip over an unbound name and hide the real error.
    host_dir = tempfile.mkdtemp()
    try:
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)

        # The files are expected inside host_dir, in a subdirectory named
        # after the device directory.
        pulled_dir = os.path.join(
            host_dir, posixpath.basename(self.DEVICE_TEMP_DIR))
        for temp_file in temp_files:
            host_path = os.path.join(pulled_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
940
def test_pull_dir_symlink(self):
    """Pull a directory into a symlink to a directory.

    The pull destination is a host symlink; the files must end up beneath
    the directory the symlink points to.

    Bug: http://b/27362811
    """
    if os.name != 'posix':
        raise unittest.SkipTest('requires POSIX')

    # Created outside the try block so that a failing mkdtemp() is reported
    # as such instead of as an unbound name in the finally clause.
    host_dir = tempfile.mkdtemp()
    try:
        real_dir = os.path.join(host_dir, 'dir')
        symlink = os.path.join(host_dir, 'symlink')
        os.mkdir(real_dir)
        os.symlink(real_dir, symlink)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)

        # Verify through the real directory, not through the symlink.
        pulled_dir = os.path.join(
            real_dir, posixpath.basename(self.DEVICE_TEMP_DIR))
        for temp_file in temp_files:
            host_path = os.path.join(pulled_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
975
def test_pull_dir_symlink_collision(self):
    """Pull a directory into a colliding symlink to directory.

    host_dir already holds a symlink named like the device directory that
    points at real_dir, so the pulled files must land directly in real_dir.
    """
    if os.name != 'posix':
        raise unittest.SkipTest('requires POSIX')

    # Created outside the try block so that a failing mkdtemp() is reported
    # as such instead of as an unbound name in the finally clause.
    host_dir = tempfile.mkdtemp()
    try:
        real_dir = os.path.join(host_dir, 'real')
        # DEVICE_TEMP_DIR is a device path, hence posixpath (as in the
        # sibling pull tests); on POSIX hosts this equals os.path.
        tmp_dirname = posixpath.basename(self.DEVICE_TEMP_DIR)
        symlink = os.path.join(host_dir, tmp_dirname)
        os.mkdir(real_dir)
        os.symlink(real_dir, symlink)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)

        for temp_file in temp_files:
            host_path = os.path.join(real_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
1006
def test_pull_dir_nonexistent(self):
    """Pull a directory of files from the device to a nonexistent path.

    The destination does not exist beforehand, so the files are expected
    directly inside it rather than in a subdirectory named after the
    device directory.
    """
    # Created outside the try block so that a failing mkdtemp() is reported
    # as such instead of as an unbound name in the finally clause.
    host_dir = tempfile.mkdtemp()
    try:
        dest_dir = os.path.join(host_dir, 'dest')

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)

        for temp_file in temp_files:
            host_path = os.path.join(dest_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
1030
def test_pull_symlink_dir(self):
    """Pull a symlink to a directory of symlinks to files.

    Device layout under DEVICE_TEMP_DIR:
        contents/  random files
        links/     one relative symlink per file in contents/
        symlink -> links/
    """
    # Created outside the try block so that a failing mkdtemp() is reported
    # as such instead of as an unbound name in the finally clause.
    host_dir = tempfile.mkdtemp()
    try:
        remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
        remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
        remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', remote_dir, remote_links])
        self.device.shell(['ln', '-s', remote_links, remote_symlink])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=remote_dir, num_files=32)

        for temp_file in temp_files:
            self.device.shell(
                ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
                 posixpath.join(remote_links, temp_file.base_name)])

        self.device.pull(remote=remote_symlink, local=host_dir)

        # The pulled tree is named after the symlink itself, and every
        # entry must carry the contents of the file it links to.
        for temp_file in temp_files:
            host_path = os.path.join(
                host_dir, 'symlink', temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
1064
def test_pull_empty(self):
    """Pull a directory containing an empty directory from the device."""
    # Created outside the try block so that a failing mkdtemp() is reported
    # as such instead of as an unbound name in the finally clause.
    host_dir = tempfile.mkdtemp()
    try:
        remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', remote_empty_path])

        self.device.pull(remote=remote_empty_path, local=host_dir)
        self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))

        # Leave the device clean, as the other pull tests do.
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
1079
def test_multiple_pull(self):
    """Pull several files and a subdirectory with a single pull command."""
    # Created outside the try block so that a failing mkdtemp() is reported
    # as such instead of as an unbound name in the finally clause.
    host_dir = tempfile.mkdtemp()
    try:
        subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', subdir])

        # Create some random files and a subdirectory containing more files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)

        subdir_temp_files = make_random_device_files(
            self.device, in_dir=subdir, num_files=4, prefix='subdir_')

        # A real list is required here: map() returns an iterator on
        # Python 3, which can neither be appended to nor concatenated.
        paths = [temp_file.full_path for temp_file in temp_files]
        paths.append(subdir)
        self.device._simple_call(['pull'] + paths + [host_dir])

        for temp_file in temp_files:
            local_path = os.path.join(host_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, local_path)

        for subdir_temp_file in subdir_temp_files:
            local_path = os.path.join(host_dir,
                                      'subdir',
                                      subdir_temp_file.base_name)
            self._verify_local(subdir_temp_file.checksum, local_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
1115
def test_sync(self):
    """Sync a randomly generated directory of files to specified device."""
    # Created outside the try block so that a failing mkdtemp() is reported
    # as such instead of as an unbound name in the finally clause.
    base_dir = tempfile.mkdtemp()
    try:
        # Create mirror device directory hierarchy within base_dir.
        # NOTE(review): plain concatenation looks deliberate - presumably
        # DEVICE_TEMP_DIR is an absolute device path, for which
        # os.path.join() would discard base_dir. Confirm.
        full_dir_path = base_dir + self.DEVICE_TEMP_DIR
        os.makedirs(full_dir_path)

        # Create 32 random files within the host mirror.
        temp_files = make_random_host_files(
            in_dir=full_dir_path, num_files=32)

        # Clean up any trash on the device.
        device = adb.get_device(product=base_dir)
        device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])

        device.sync('data')

        # Confirm that every file on the device mirrors that on the host.
        for temp_file in temp_files:
            device_full_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                              temp_file.base_name)
            dev_md5, _ = device.shell(
                [get_md5_prog(self.device), device_full_path])[0].split()
            self.assertEqual(temp_file.checksum, dev_md5)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(base_dir)
1147
def test_unicode_paths(self):
    """Ensure that we can support non-ASCII paths, even on Windows.

    An empty host file with a non-ASCII suffix is pushed to a non-ASCII
    device path, the device listing is compared against the UTF-8 encoded
    path, and the file is pulled back to the same host path.
    """
    unicode_name = u'로보카 폴리'
    remote_glob = '/data/local/tmp/adb-test-*'
    remote_path = u'/data/local/tmp/adb-test-{}'.format(unicode_name)

    # Start without leftovers from earlier runs.
    self.device.shell(['rm', '-f', remote_glob])

    # Push: the host copy is deleted right afterwards so that the pull
    # below has to recreate it.
    host_file = tempfile.NamedTemporaryFile(
        mode='wb', suffix=unicode_name, delete=False)
    host_file.close()
    host_path = host_file.name
    self.device.push(host_path, remote_path)
    os.remove(host_path)
    self.assertFalse(os.path.exists(host_path))

    # Verify that the device ended up with the expected UTF-8 path.
    listing = self.device.shell(['ls', remote_glob])[0]
    self.assertEqual(remote_path.encode('utf-8'), listing.strip())

    # Pull.
    self.device.pull(remote_path, host_path)
    self.assertTrue(os.path.exists(host_path))
    os.remove(host_path)
    self.device.shell(['rm', '-f', remote_glob])
1172
1173
def main():
    """Run every test in this module, provided a device is attached.

    Prints a notice and runs nothing when adb reports no devices.
    """
    # Fixed seed, so that the random module yields the same sequence on
    # every run.
    random.seed(0)
    if len(adb.get_devices()) == 0:
        print('Test suite must be run with attached devices')
        return

    suite = unittest.TestLoader().loadTestsFromName(__name__)
    unittest.TextTestRunner(verbosity=3).run(suite)


if __name__ == '__main__':
    main()