blob: 9dab3ae62862ae4c61d8e68cfe1d1f5fafa444a5 [file] [log] [blame]
Josh Gao191c1542015-12-09 11:26:11 -08001#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2015 The Android Open Source Project
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from __future__ import print_function
19
20import contextlib
21import hashlib
22import os
23import posixpath
24import random
25import re
26import shlex
27import shutil
28import signal
29import socket
30import string
31import subprocess
32import sys
33import tempfile
34import unittest
35
36import mock
37
38import adb
39
40
41def requires_root(func):
42 def wrapper(self, *args):
43 if self.device.get_prop('ro.debuggable') != '1':
44 raise unittest.SkipTest('requires rootable build')
45
46 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
47 if not was_root:
48 self.device.root()
49 self.device.wait()
50
51 try:
52 func(self, *args)
53 finally:
54 if not was_root:
55 self.device.unroot()
56 self.device.wait()
57
58 return wrapper
59
60
61def requires_non_root(func):
62 def wrapper(self, *args):
63 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
64 if was_root:
65 self.device.unroot()
66 self.device.wait()
67
68 try:
69 func(self, *args)
70 finally:
71 if was_root:
72 self.device.root()
73 self.device.wait()
74
75 return wrapper
76
77
class GetDeviceTest(unittest.TestCase):
    """Tests for adb.get_device() with device enumeration mocked out.

    Every test patches ``adb.device.get_devices`` so no real device is
    needed; ANDROID_SERIAL is cleared for the duration of each test and
    restored afterwards.
    """

    def setUp(self):
        # Remember the caller's ANDROID_SERIAL and remove it so that it
        # cannot influence device selection inside the tests.
        self.android_serial = os.environ.pop('ANDROID_SERIAL', None)

    def tearDown(self):
        if self.android_serial is not None:
            os.environ['ANDROID_SERIAL'] = self.android_serial
        else:
            # A test may have set the variable; make sure it does not leak.
            os.environ.pop('ANDROID_SERIAL', None)

    @mock.patch('adb.device.get_devices')
    def test_explicit(self, mock_get_devices):
        """An explicitly requested serial is returned."""
        mock_get_devices.return_value = ['foo', 'bar']
        device = adb.get_device('foo')
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_from_env(self, mock_get_devices):
        """ANDROID_SERIAL selects the device when no serial is passed."""
        mock_get_devices.return_value = ['foo', 'bar']
        os.environ['ANDROID_SERIAL'] = 'foo'
        device = adb.get_device()
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_arg_beats_env(self, mock_get_devices):
        """An explicit serial takes precedence over ANDROID_SERIAL."""
        mock_get_devices.return_value = ['foo', 'bar']
        os.environ['ANDROID_SERIAL'] = 'bar'
        device = adb.get_device('foo')
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_no_such_device(self, mock_get_devices):
        """An unknown serial raises DeviceNotFoundError (arg and env)."""
        mock_get_devices.return_value = ['foo', 'bar']
        # Pass the serial as a string, like every other call in this class;
        # the extra arguments of assertRaises are forwarded positionally.
        self.assertRaises(adb.DeviceNotFoundError, adb.get_device, 'baz')

        os.environ['ANDROID_SERIAL'] = 'baz'
        self.assertRaises(adb.DeviceNotFoundError, adb.get_device)

    @mock.patch('adb.device.get_devices')
    def test_unique_device(self, mock_get_devices):
        """With a single device attached, it is chosen implicitly."""
        mock_get_devices.return_value = ['foo']
        device = adb.get_device()
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_no_unique_device(self, mock_get_devices):
        """With several devices and no serial, NoUniqueDeviceError is raised."""
        mock_get_devices.return_value = ['foo', 'bar']
        self.assertRaises(adb.NoUniqueDeviceError, adb.get_device)
129
130
class DeviceTest(unittest.TestCase):
    """Base class for tests that talk to a device via ``self.device``."""

    def setUp(self):
        # Obtained through adb.get_device() with no explicit serial.
        device = adb.get_device()
        self.device = device
134
135
class ForwardReverseTest(DeviceTest):
    """Tests for `adb forward` / `adb reverse` binding management.

    Each test first asserts that the relevant binding list is empty, then
    creates bindings; the bindings it created are removed again even when
    an assertion fails, so one failure does not cascade into later tests.
    """

    def _test_no_rebind(self, description, direction_list, direction,
                        direction_no_rebind, direction_remove_all):
        """Exercise --no-rebind semantics for one direction.

        Args:
            description: Name used in the precondition failure message.
            direction_list: Callable returning the binding list as text.
            direction: Callable(local, remote) that binds, rebinding allowed.
            direction_no_rebind: Callable(local, remote) that binds with
                --no-rebind.
            direction_remove_all: Callable removing every binding.
        """
        msg = direction_list()
        self.assertEqual('', msg.strip(),
                         description + ' list must be empty to run this test.')

        # Cleanup only starts after the precondition check, so bindings
        # that existed before the test are never removed by it.
        try:
            # Use --no-rebind with no existing binding
            direction_no_rebind('tcp:5566', 'tcp:6655')
            msg = direction_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use --no-rebind with existing binding
            with self.assertRaises(subprocess.CalledProcessError):
                direction_no_rebind('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

            # Use the absence of --no-rebind with existing binding
            direction('tcp:5566', 'tcp:6677')
            msg = direction_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))
        finally:
            direction_remove_all()

        msg = direction_list()
        self.assertEqual('', msg.strip())

    def test_forward_no_rebind(self):
        self._test_no_rebind('forward', self.device.forward_list,
                             self.device.forward, self.device.forward_no_rebind,
                             self.device.forward_remove_all)

    def test_reverse_no_rebind(self):
        self._test_no_rebind('reverse', self.device.reverse_list,
                             self.device.reverse, self.device.reverse_no_rebind,
                             self.device.reverse_remove_all)

    def test_forward(self):
        """Add, list, remove one and remove all forward bindings."""
        msg = self.device.forward_list()
        self.assertEqual('', msg.strip(),
                         'Forwarding list must be empty to run this test.')
        try:
            self.device.forward('tcp:5566', 'tcp:6655')
            msg = self.device.forward_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.device.forward('tcp:7788', 'tcp:8877')
            msg = self.device.forward_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.forward_remove('tcp:5566')
            msg = self.device.forward_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        finally:
            self.device.forward_remove_all()
        msg = self.device.forward_list()
        self.assertEqual('', msg.strip())

    def test_reverse(self):
        """Add, list, remove one and remove all reverse bindings."""
        msg = self.device.reverse_list()
        self.assertEqual('', msg.strip(),
                         'Reverse forwarding list must be empty to run this test.')
        try:
            self.device.reverse('tcp:5566', 'tcp:6655')
            msg = self.device.reverse_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.device.reverse('tcp:7788', 'tcp:8877')
            msg = self.device.reverse_list()
            self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
            self.device.reverse_remove('tcp:5566')
            msg = self.device.reverse_list()
            self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
            self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        finally:
            self.device.reverse_remove_all()
        msg = self.device.reverse_list()
        self.assertEqual('', msg.strip())

    # Note: If you run this test when adb connect'd to a physical device over
    # TCP, it will fail in adb reverse due to
    # https://code.google.com/p/android/issues/detail?id=189821
    def test_forward_reverse_echo(self):
        """Send data through adb forward and read it back via adb reverse"""
        forward_port = 12345
        reverse_port = forward_port + 1
        forward_spec = 'tcp:' + str(forward_port)
        reverse_spec = 'tcp:' + str(reverse_port)
        forward_setup = False
        reverse_setup = False

        try:
            # listen on localhost:forward_port, connect to remote:forward_port
            self.device.forward(forward_spec, forward_spec)
            forward_setup = True
            # listen on remote:forward_port, connect to localhost:reverse_port
            self.device.reverse(forward_spec, reverse_spec)
            reverse_setup = True

            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            with contextlib.closing(listener):
                # Use SO_REUSEADDR so that subsequent runs of the test can grab
                # the port even if it is in TIME_WAIT.
                listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                # Listen on localhost:reverse_port before connecting to
                # localhost:forward_port because that will cause adb to connect
                # back to localhost:reverse_port.
                listener.bind(('127.0.0.1', reverse_port))
                listener.listen(4)

                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                with contextlib.closing(client):
                    # Connect to the listener.
                    client.connect(('127.0.0.1', forward_port))

                    # Accept the client connection.
                    accepted_connection, addr = listener.accept()
                    with contextlib.closing(accepted_connection) as server:
                        # Bytes payload and binary read: sockets carry bytes,
                        # and this spelling is identical on Python 2.
                        data = b'hello'

                        # Send data into the port setup by adb forward.
                        client.sendall(data)
                        # Explicitly close() so that server gets EOF.
                        client.close()

                        # Verify that the data came back via adb reverse.
                        self.assertEqual(data, server.makefile('rb').read())
        finally:
            # Only tear down what was actually set up.
            if reverse_setup:
                self.device.reverse_remove(forward_spec)
            if forward_setup:
                self.device.forward_remove(forward_spec)
266
267
class ShellTest(DeviceTest):
    """Tests for `adb shell`: output, exit codes, PTY allocation, stdin."""

    def _interactive_shell(self, shell_args, input):
        """Runs an interactive adb shell.

        Args:
          shell_args: List of string arguments to `adb shell`.
          input: String input to send to the interactive shell.

        Returns:
          The remote exit code.

        Raises:
          unittest.SkipTest: The device doesn't support exit codes.
        """
        if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
            raise unittest.SkipTest('exit codes are unavailable on this device')

        proc = subprocess.Popen(
            self.device.adb_cmd + ['shell'] + shell_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
        # to explicitly add an exit command to close the session from the device
        # side, plus the necessary newline to complete the interactive command.
        proc.communicate(input + '; exit\n')
        return proc.returncode

    def test_cat(self):
        """Check that we can at least cat a file."""
        out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
        elements = out.split()
        self.assertEqual(len(elements), 2)

        uptime, idle = elements
        self.assertGreater(float(uptime), 0.0)
        self.assertGreater(float(idle), 0.0)

    def test_throws_on_failure(self):
        self.assertRaises(adb.ShellError, self.device.shell, ['false'])

    def test_output_not_stripped(self):
        out = self.device.shell(['echo', 'foo'])[0]
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_shell_nocheck_failure(self):
        rc, out, _ = self.device.shell_nocheck(['false'])
        self.assertNotEqual(rc, 0)
        self.assertEqual(out, '')

    def test_shell_nocheck_output_not_stripped(self):
        rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_can_distinguish_tricky_results(self):
        # If result checking on ADB shell is naively implemented as
        # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
        # output from the result for a cmd of `echo -n 1`.
        rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, '1')

    def test_line_endings(self):
        """Ensure that line ending translation is not happening in the pty.

        Bug: http://b/19735063
        """
        output = self.device.shell(['uname'])[0]
        self.assertEqual(output, 'Linux' + self.device.linesep)

    def test_pty_logic(self):
        """Tests that a PTY is allocated when it should be.

        PTY allocation behavior should match ssh; some behavior requires
        a terminal stdin to test so this test will be skipped if stdin
        is not a terminal.
        """
        if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
            raise unittest.SkipTest('PTY arguments unsupported on this device')
        if not os.isatty(sys.stdin.fileno()):
            raise unittest.SkipTest('PTY tests require stdin terminal')

        def check_pty(args):
            """Checks adb shell PTY allocation.

            Tests |args| for terminal and non-terminal stdin.

            Args:
                args: -Tt args in a list (e.g. ['-t', '-t']).

            Returns:
                A tuple (<terminal>, <non-terminal>). True indicates
                the corresponding shell allocated a remote PTY.
            """
            test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']

            # stdin=None: the child inherits this process's (terminal) stdin.
            terminal = subprocess.Popen(
                test_cmd, stdin=None,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            terminal.communicate()

            non_terminal = subprocess.Popen(
                test_cmd, stdin=subprocess.PIPE,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            non_terminal.communicate()

            return (terminal.returncode == 0, non_terminal.returncode == 0)

        # -T: never allocate PTY.
        self.assertEqual((False, False), check_pty(['-T']))

        # No args: PTY only if stdin is a terminal and shell is interactive,
        # which is difficult to reliably test from a script.
        self.assertEqual((False, False), check_pty([]))

        # -t: PTY if stdin is a terminal.
        self.assertEqual((True, False), check_pty(['-t']))

        # -t -t: always allocate PTY.
        self.assertEqual((True, True), check_pty(['-t', '-t']))

    def test_shell_protocol(self):
        """Tests the shell protocol on the device.

        If the device supports shell protocol, this gives us the ability
        to separate stdout/stderr and return the exit code directly.

        Bug: http://b/19734861
        """
        if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Shell protocol should be used by default.
        result = self.device.shell_nocheck(
            shlex.split('echo foo; echo bar >&2; exit 17'))
        self.assertEqual(17, result[0])
        self.assertEqual('foo' + self.device.linesep, result[1])
        self.assertEqual('bar' + self.device.linesep, result[2])

        self.assertEqual(17, self._interactive_shell([], 'exit 17'))

        # -x flag should disable shell protocol.
        result = self.device.shell_nocheck(
            shlex.split('-x echo foo; echo bar >&2; exit 17'))
        self.assertEqual(0, result[0])
        self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
        self.assertEqual('', result[2])

        self.assertEqual(0, self._interactive_shell(['-x'], 'exit 17'))

    def test_non_interactive_sigint(self):
        """Tests that SIGINT in a non-interactive shell kills the process.

        This requires the shell protocol in order to detect the broken
        pipe; raw data transfer mode will only see the break once the
        subprocess tries to read or write.

        Bug: http://b/23825725
        """
        if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Start a long-running process.
        sleep_proc = subprocess.Popen(
            self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT)
        try:
            remote_pid = sleep_proc.stdout.readline().strip()
            # Popen.returncode stays None until poll()/wait()/communicate()
            # has run, so poll() is required to actually notice an early
            # exit.
            self.assertIsNone(sleep_proc.poll(), 'subprocess terminated early')
            proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))

            # Verify that the process is running, send signal, verify it
            # stopped.
            self.device.shell(proc_query)
            os.kill(sleep_proc.pid, signal.SIGINT)
            sleep_proc.communicate()
            self.assertEqual(1, self.device.shell_nocheck(proc_query)[0],
                             'subprocess failed to terminate')
        finally:
            # Don't leave the host-side adb process running for up to a
            # minute if a check above failed before it was signalled.
            if sleep_proc.poll() is None:
                sleep_proc.kill()
                sleep_proc.communicate()

    def test_non_interactive_stdin(self):
        """Tests that non-interactive shells send stdin."""
        if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
            raise unittest.SkipTest('non-interactive stdin unsupported '
                                    'on this device')

        # Test both small and large inputs.
        small_input = 'foo'
        large_input = '\n'.join(c * 100 for c in (string.ascii_letters +
                                                  string.digits))

        # 'stdin_data' rather than 'input' to avoid shadowing the builtin.
        for stdin_data in (small_input, large_input):
            proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            stdout, stderr = proc.communicate(stdin_data)
            self.assertEqual(stdin_data.splitlines(), stdout.splitlines())
            self.assertEqual('', stderr)
465
466
class ArgumentEscapingTest(DeviceTest):
    """Tests for how adb escapes arguments to `shell` and `install`."""

    def test_shell_escaping(self):
        """Make sure that argument escaping is somewhat sane."""

        # http://b/19734868
        # Note that this actually matches ssh(1)'s behavior --- it's
        # converted to `sh -c echo hello; echo world` which sh interprets
        # as `sh -c echo` (with an argument to that shell of "hello"),
        # and then `echo world` back in the first shell.
        result = self.device.shell(
            shlex.split("sh -c 'echo hello; echo world'"))[0]
        result = result.splitlines()
        self.assertEqual(['', 'world'], result)
        # If you really wanted "hello" and "world", here's what you'd do:
        result = self.device.shell(
            shlex.split(r'echo hello\;echo world'))[0].splitlines()
        self.assertEqual(['hello', 'world'], result)

        # http://b/15479704
        result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split("sh -c 'true && echo t'"))[0].strip()
        self.assertEqual('t', result)

        # http://b/20564385
        result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split(r'echo -n 123\;uname'))[0].strip()
        self.assertEqual('123Linux', result)

    def test_install_argument_escaping(self):
        """Make sure that install argument escaping works."""
        # http://b/20323053, http://b/3090932.
        for file_suffix in ('-text;ls;1.apk', "-Live Hold'em.apk"):
            tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
                                             delete=False)
            tf.close()

            try:
                # Installing bogus .apks fails if the device supports exit
                # codes.
                try:
                    output = self.device.install(tf.name)
                except subprocess.CalledProcessError as e:
                    output = e.output

                self.assertIn(file_suffix, output)
            finally:
                # Remove the host file even when the assertion fails.
                os.remove(tf.name)
515
516
class RootUnrootTest(DeviceTest):
    """Tests that `adb root` and `adb unroot` switch the adbd user."""

    def _current_user(self):
        """Return the output of `id -un` on the device, stripped."""
        return self.device.shell(['id', '-un'])[0].strip()

    def _test_root(self):
        """Root the device and check the user, unless the build refuses."""
        message = self.device.root()
        if 'adbd cannot run as root in production builds' not in message:
            self.device.wait()
            self.assertEqual('root', self._current_user())

    def _test_unroot(self):
        """Unroot the device and check that the user is 'shell'."""
        self.device.unroot()
        self.device.wait()
        self.assertEqual('shell', self._current_user())

    def test_root_unroot(self):
        """Make sure that adb root and adb unroot work, using id(1)."""
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        original_user = self._current_user()
        started_as_root = original_user == 'root'

        # Toggle away from the starting state first, then back again.
        if started_as_root:
            steps = (self._test_unroot, self._test_root)
        elif original_user == 'shell':
            steps = (self._test_root, self._test_unroot)
        else:
            steps = ()

        try:
            for step in steps:
                step()
        finally:
            # Put the device back the way it was found.
            restore = self.device.root if started_as_root else self.device.unroot
            restore()
            self.device.wait()
549
550
class TcpIpTest(DeviceTest):
    """Tests for `adb tcpip` argument validation."""

    def test_tcpip_failure_raises(self):
        """adb tcpip requires a port.

        Bug: http://b/22636927
        """
        # An empty port and a non-numeric port must both be rejected.
        for bad_port in ('', 'foo'):
            with self.assertRaises(subprocess.CalledProcessError):
                self.device.tcpip(bad_port)
561
562
class SystemPropertiesTest(DeviceTest):
    """Tests for reading and writing system properties through adb."""

    def test_get_prop(self):
        """The adbd service property reads back as 'running'."""
        adbd_state = self.device.get_prop('init.svc.adbd')
        self.assertEqual(adbd_state, 'running')

    @requires_root
    def test_set_prop(self):
        """A property written with set_prop is visible through getprop."""
        prop_name = 'foo.bar'
        # Reset the property first so a stale value cannot satisfy the check.
        self.device.shell(['setprop', prop_name, '""'])

        self.device.set_prop(prop_name, 'qux')
        stdout = self.device.shell(['getprop', prop_name])[0]
        self.assertEqual(stdout.strip(), 'qux')
575
576
def compute_md5(string):
    """Return the hexadecimal MD5 digest of the given data."""
    return hashlib.md5(string).hexdigest()
581
582
def get_md5_prog(device):
    """Older platforms (pre-L) had the name md5 rather than md5sum."""
    # Probe by running md5sum on a file; a ShellError selects the old name.
    try:
        device.shell(['md5sum', '/proc/uptime'])
    except adb.ShellError:
        prog = 'md5'
    else:
        prog = 'md5sum'
    return prog
590
591
class HostFile(object):
    """A file on the host, described by its handle and checksum."""

    def __init__(self, handle, checksum):
        path = handle.name
        self.handle = handle
        self.checksum = checksum
        # Path as reported by the handle, and its final component.
        self.full_path = path
        self.base_name = os.path.basename(path)
598
599
class DeviceFile(object):
    """A file on the device, described by its checksum and POSIX path."""

    def __init__(self, checksum, full_path):
        # Device paths always use '/' separators, hence posixpath.
        self.base_name = posixpath.basename(full_path)
        self.full_path = full_path
        self.checksum = checksum
605
606
def make_random_host_files(in_dir, num_files):
    """Create files filled with random bytes in a host directory.

    Args:
        in_dir: Host directory in which the files are created.
        num_files: Number of files to create.

    Returns:
        A list of HostFile objects, one per created file, each carrying
        the MD5 checksum of the data written. The files are left on disk
        (delete=False); the caller is responsible for removing them.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    files = []
    # range() rather than xrange() so this also runs on Python 3.
    for _ in range(num_files):
        # Sizes are multiples of 1 KiB in [min_size, max_size).
        size = random.randrange(min_size, max_size, 1024)
        rand_str = os.urandom(size)

        # The context manager closes (and thereby flushes) the handle even
        # if the write fails.
        with tempfile.NamedTemporaryFile(dir=in_dir,
                                         delete=False) as file_handle:
            file_handle.write(rand_str)

        md5 = compute_md5(rand_str)
        files.append(HostFile(file_handle, md5))
    return files
624
625
def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
    """Create files filled with random bytes in a device directory.

    Each file is written on the device with dd from /dev/urandom and then
    checksummed on the device.

    Args:
        device: Device object providing shell().
        in_dir: Device directory in which the files are created.
        num_files: Number of files to create.
        prefix: File name prefix; the file index is appended to it.

    Returns:
        A list of DeviceFile objects, one per created file.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    # Probing for the md5 tool costs a device shell invocation and its
    # answer does not change between files, so do it once up front.
    md5_prog = get_md5_prog(device)

    files = []
    # range() rather than xrange() so this also runs on Python 3.
    for file_num in range(num_files):
        # Sizes are multiples of 1 KiB in [min_size, max_size).
        size = random.randrange(min_size, max_size, 1024)

        base_name = prefix + str(file_num)
        full_path = posixpath.join(in_dir, base_name)

        device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
                      'bs={}'.format(size), 'count=1'])
        dev_md5, _ = device.shell([md5_prog, full_path])[0].split()

        files.append(DeviceFile(dev_md5, full_path))
    return files
643
644
645class FileOperationsTest(DeviceTest):
646 SCRATCH_DIR = '/data/local/tmp'
647 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
648 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
649
def _verify_remote(self, checksum, remote_path):
    """Assert that the device-side MD5 of remote_path equals checksum."""
    md5_prog = get_md5_prog(self.device)
    stdout = self.device.shell([md5_prog, remote_path])[0]
    # The tool's output must split into exactly two fields; the first is
    # taken as the digest.
    dev_md5, _ = stdout.split()
    self.assertEqual(checksum, dev_md5)
654
def _verify_local(self, checksum, local_path):
    """Assert that the MD5 of the host file local_path equals checksum."""
    with open(local_path, 'rb') as host_file:
        contents = host_file.read()
    self.assertEqual(compute_md5(contents), checksum)
659
def test_push(self):
    """Push a randomly generated file to specified device."""
    kbytes = 512
    rand_str = os.urandom(1024 * kbytes)
    tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
    try:
        tmp.write(rand_str)
        tmp.close()

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
        self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)

        self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
        self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
    finally:
        # Remove the host file even when the push or verification fails.
        # close() is a no-op if the handle was already closed above.
        tmp.close()
        os.remove(tmp.name)
675
def test_push_dir(self):
    """Push a randomly generated directory of files to the device."""
    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])

    # Create the directory before entering the try block so that the
    # cleanup below never refers to an unbound name if mkdtemp() fails.
    host_dir = tempfile.mkdtemp()
    try:
        # Make sure the temp directory isn't setuid, or else adb will complain.
        os.chmod(host_dir, 0o700)

        # Create 32 random files.
        temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
        self.device.push(host_dir, self.DEVICE_TEMP_DIR)

        # The files are expected under a subdirectory named after host_dir.
        for temp_file in temp_files:
            remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                         os.path.basename(host_dir),
                                         temp_file.base_name)
            self._verify_remote(temp_file.checksum, remote_path)
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
700
@unittest.expectedFailure  # b/25566053
def test_push_empty(self):
    """Push a directory containing an empty directory to the device."""
    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])

    # Create the directory before entering the try block so that the
    # cleanup below never refers to an unbound name if mkdtemp() fails.
    host_dir = tempfile.mkdtemp()
    try:
        # Make sure the temp directory isn't setuid, or else adb will complain.
        os.chmod(host_dir, 0o700)

        # Create an empty directory.
        os.mkdir(os.path.join(host_dir, 'empty'))

        self.device.push(host_dir, self.DEVICE_TEMP_DIR)

        # Device paths are POSIX paths regardless of the host OS.
        # NOTE(review): test_push_dir looks for pushed content under
        # DEVICE_TEMP_DIR/<basename(host_dir)>; confirm whether 'empty'
        # should be looked up there as well once b/25566053 is resolved.
        remote_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
        # `[` requires a closing `]` argument; without it the command
        # fails no matter whether the directory exists.
        test_empty_cmd = ['[', '-d', remote_path, ']']
        rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
        self.assertEqual(rc, 0)
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
726
def test_multiple_push(self):
    """Push multiple files to the device in one adb push command.

    Bug: http://b/25324823
    """

    self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])

    # Create the directory before entering the try block so that the
    # cleanup below never refers to an unbound name if mkdtemp() fails.
    host_dir = tempfile.mkdtemp()
    try:
        # Create some random files and a subdirectory containing more files.
        temp_files = make_random_host_files(in_dir=host_dir, num_files=4)

        subdir = os.path.join(host_dir, 'subdir')
        os.mkdir(subdir)
        subdir_temp_files = make_random_host_files(in_dir=subdir,
                                                   num_files=4)

        # A real list (not a lazy map object) because the subdirectory is
        # appended to it.
        paths = [temp_file.full_path for temp_file in temp_files]
        paths.append(subdir)
        self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])

        for temp_file in temp_files:
            remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                         temp_file.base_name)
            self._verify_remote(temp_file.checksum, remote_path)

        # NOTE(review): this loop re-verifies the last `temp_file` instead
        # of `subdir_temp_file`, so the subdirectory contents are never
        # checked. Left unchanged because the correct remote location
        # depends on the bug referenced below -- confirm and switch to
        # `subdir_temp_file` (adding 'subdir' to the path if appropriate).
        for subdir_temp_file in subdir_temp_files:
            remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                         # BROKEN: http://b/25394682
                                         # 'subdir';
                                         temp_file.base_name)
            self._verify_remote(temp_file.checksum, remote_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
768
@requires_non_root
def test_push_error_reporting(self):
    """Make sure that errors that occur while pushing a file get reported

    Bug: http://b/26816782
    """
    with tempfile.NamedTemporaryFile() as tmp_file:
        # The temp file is opened in binary mode, so write bytes; the b''
        # literal is the same object type as '' on Python 2.
        tmp_file.write(b'\0' * 1024 * 1024)
        tmp_file.flush()
        try:
            # Pushing into /system/ as a non-root user is expected to fail.
            self.device.push(local=tmp_file.name, remote='/system/')
            self.fail('push should not have succeeded')
        except subprocess.CalledProcessError as e:
            output = e.output

        self.assertIn('Permission denied', output)
Josh Gao191c1542015-12-09 11:26:11 -0800785
def _test_pull(self, remote_file, checksum):
    """Pull remote_file to a host temp file and check its MD5.

    Args:
        remote_file: Device path of the file to pull.
        checksum: Expected hexadecimal MD5 digest of the file contents.
    """
    tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
    tmp_write.close()
    try:
        self.device.pull(remote=remote_file, local=tmp_write.name)
        with open(tmp_write.name, 'rb') as tmp_read:
            host_contents = tmp_read.read()
            host_md5 = compute_md5(host_contents)
        self.assertEqual(checksum, host_md5)
    finally:
        # Remove the host file even when the pull or the comparison fails.
        os.remove(tmp_write.name)
795
@requires_non_root
def test_pull_error_reporting(self):
    """Make sure that errors that occur while pulling a file get reported."""
    self.device.shell(['touch', self.DEVICE_TEMP_FILE])
    # Pull into a private scratch directory instead of 'x' in the current
    # working directory, so nothing is created or overwritten there.
    host_dir = tempfile.mkdtemp()
    try:
        # Strip all permissions so the non-root pull cannot read the file.
        self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])

        try:
            output = self.device.pull(remote=self.DEVICE_TEMP_FILE,
                                      local=os.path.join(host_dir, 'x'))
        except subprocess.CalledProcessError as e:
            output = e.output

        self.assertIn('Permission denied', output)
    finally:
        # Clean up on both sides even when the assertion fails.
        shutil.rmtree(host_dir, ignore_errors=True)
        self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
809
def test_pull(self):
    """Pull a randomly generated file from specified device."""
    remote = self.DEVICE_TEMP_FILE
    kbytes = 512

    # Start from a clean slate, then fill the file from /dev/urandom.
    self.device.shell(['rm', '-rf', remote])
    self.device.shell(['dd', 'if=/dev/urandom',
                       'of={}'.format(remote), 'bs=1024',
                       'count={}'.format(kbytes)])

    # Checksum on the device, then compare against the pulled copy.
    md5_prog = get_md5_prog(self.device)
    md5_output = self.device.shell([md5_prog, remote])[0]
    dev_md5, _ = md5_output.split()
    self._test_pull(remote, dev_md5)

    self.device.shell_nocheck(['rm', remote])
822
def test_pull_dir(self):
    """Pull a randomly generated directory of files from the device."""
    # Create the directory before entering the try block so that the
    # cleanup below never refers to an unbound name if mkdtemp() fails.
    host_dir = tempfile.mkdtemp()
    try:
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)

        # The files are expected under a subdirectory named after the
        # device directory.
        for temp_file in temp_files:
            host_path = os.path.join(
                host_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
                temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
847
def test_pull_dir_symlink(self):
    """Pull a directory into a symlink to a directory.

    Bug: http://b/27362811
    """
    if os.name != 'posix':
        raise unittest.SkipTest('requires POSIX')

    # Create the directory before entering the try block so that the
    # cleanup below never refers to an unbound name if mkdtemp() fails.
    host_dir = tempfile.mkdtemp()
    try:
        real_dir = os.path.join(host_dir, 'dir')
        symlink = os.path.join(host_dir, 'symlink')
        os.mkdir(real_dir)
        os.symlink(real_dir, symlink)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)

        # The pull targeted the symlink; the files are checked through the
        # real directory it points to.
        for temp_file in temp_files:
            host_path = os.path.join(
                real_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
                temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
882
def test_pull_dir_symlink_collision(self):
    """Pull a directory into a colliding symlink to directory."""
    if os.name != 'posix':
        raise unittest.SkipTest('requires POSIX')

    # Create the directory before entering the try block so that the
    # cleanup below never refers to an unbound name if mkdtemp() fails.
    host_dir = tempfile.mkdtemp()
    try:
        real_dir = os.path.join(host_dir, 'real')
        # DEVICE_TEMP_DIR is a device (POSIX) path, so take its last
        # component with posixpath rather than the host's os.path.
        tmp_dirname = posixpath.basename(self.DEVICE_TEMP_DIR)
        # The symlink has the same name the pulled directory would get.
        symlink = os.path.join(host_dir, tmp_dirname)
        os.mkdir(real_dir)
        os.symlink(real_dir, symlink)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)

        # The files are expected directly inside the symlink's target.
        for temp_file in temp_files:
            host_path = os.path.join(real_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
913
def test_pull_dir_nonexistent(self):
    """Pull a directory of files from the device to a nonexistent path."""
    # Create the directory before entering the try block so that the
    # cleanup below never refers to an unbound name if mkdtemp() fails.
    host_dir = tempfile.mkdtemp()
    try:
        # dest_dir is deliberately not created before the pull.
        dest_dir = os.path.join(host_dir, 'dest')

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)

        # The files are expected directly inside dest_dir.
        for temp_file in temp_files:
            host_path = os.path.join(dest_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
937
Josh Gaof2642242015-12-09 14:03:30 -0800938 def test_pull_symlink_dir(self):
939 """Pull a symlink to a directory of symlinks to files."""
940 try:
941 host_dir = tempfile.mkdtemp()
942
943 remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
944 remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
945 remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')
946
947 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
948 self.device.shell(['mkdir', '-p', remote_dir, remote_links])
949 self.device.shell(['ln', '-s', remote_links, remote_symlink])
950
951 # Populate device directory with random files.
952 temp_files = make_random_device_files(
953 self.device, in_dir=remote_dir, num_files=32)
954
955 for temp_file in temp_files:
956 self.device.shell(
957 ['ln', '-s', '../contents/{}'.format(temp_file.base_name),
958 posixpath.join(remote_links, temp_file.base_name)])
959
960 self.device.pull(remote=remote_symlink, local=host_dir)
961
962 for temp_file in temp_files:
963 host_path = os.path.join(
964 host_dir, 'symlink', temp_file.base_name)
965 self._verify_local(temp_file.checksum, host_path)
966
967 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
968 finally:
969 if host_dir is not None:
970 shutil.rmtree(host_dir)
971
Josh Gao191c1542015-12-09 11:26:11 -0800972 def test_pull_empty(self):
973 """Pull a directory containing an empty directory from the device."""
974 try:
975 host_dir = tempfile.mkdtemp()
976
977 remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
978 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
979 self.device.shell(['mkdir', '-p', remote_empty_path])
980
981 self.device.pull(remote=remote_empty_path, local=host_dir)
982 self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))
983 finally:
984 if host_dir is not None:
985 shutil.rmtree(host_dir)
986
987 def test_multiple_pull(self):
988 """Pull a randomly generated directory of files from the device."""
989
990 try:
991 host_dir = tempfile.mkdtemp()
992
Josh Gaoa996c292016-03-03 14:49:02 -0800993 subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
Josh Gao191c1542015-12-09 11:26:11 -0800994 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
995 self.device.shell(['mkdir', '-p', subdir])
996
997 # Create some random files and a subdirectory containing more files.
998 temp_files = make_random_device_files(
999 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)
1000
1001 subdir_temp_files = make_random_device_files(
1002 self.device, in_dir=subdir, num_files=4, prefix='subdir_')
1003
1004 paths = map(lambda temp_file: temp_file.full_path, temp_files)
1005 paths.append(subdir)
1006 self.device._simple_call(['pull'] + paths + [host_dir])
1007
1008 for temp_file in temp_files:
1009 local_path = os.path.join(host_dir, temp_file.base_name)
1010 self._verify_local(temp_file.checksum, local_path)
1011
1012 for subdir_temp_file in subdir_temp_files:
1013 local_path = os.path.join(host_dir,
Josh Gaoa996c292016-03-03 14:49:02 -08001014 'subdir',
Josh Gao191c1542015-12-09 11:26:11 -08001015 subdir_temp_file.base_name)
1016 self._verify_local(subdir_temp_file.checksum, local_path)
1017
1018 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1019 finally:
1020 if host_dir is not None:
1021 shutil.rmtree(host_dir)
1022
1023 def test_sync(self):
1024 """Sync a randomly generated directory of files to specified device."""
1025
1026 try:
1027 base_dir = tempfile.mkdtemp()
1028
1029 # Create mirror device directory hierarchy within base_dir.
1030 full_dir_path = base_dir + self.DEVICE_TEMP_DIR
1031 os.makedirs(full_dir_path)
1032
1033 # Create 32 random files within the host mirror.
1034 temp_files = make_random_host_files(in_dir=full_dir_path, num_files=32)
1035
1036 # Clean up any trash on the device.
1037 device = adb.get_device(product=base_dir)
1038 device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1039
1040 device.sync('data')
1041
1042 # Confirm that every file on the device mirrors that on the host.
1043 for temp_file in temp_files:
1044 device_full_path = posixpath.join(self.DEVICE_TEMP_DIR,
1045 temp_file.base_name)
1046 dev_md5, _ = device.shell(
1047 [get_md5_prog(self.device), device_full_path])[0].split()
1048 self.assertEqual(temp_file.checksum, dev_md5)
1049
1050 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
1051 finally:
1052 if base_dir is not None:
1053 shutil.rmtree(base_dir)
1054
def test_unicode_paths(self):
    """Ensure that we can support non-ASCII paths, even on Windows.

    An empty host file whose name ends in a non-ASCII suffix is pushed to
    a non-ASCII device path, the device listing is compared with the UTF-8
    encoding of that path, and the file is pulled back to the host.
    """
    name = u'로보카 폴리'
    remote_glob = '/data/local/tmp/adb-test-*'
    remote_path = u'/data/local/tmp/adb-test-{}'.format(name)

    self.device.shell(['rm', '-f', remote_glob])

    # delete=False keeps the file on disk after close() so that it can be
    # pushed by name; it is removed explicitly below.
    tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
    try:
        tf.close()

        # push.
        self.device.push(tf.name, remote_path)
        # Remove the host copy so the later pull has to recreate it.
        os.remove(tf.name)
        self.assertFalse(os.path.exists(tf.name))

        # Verify that the device ended up with the expected UTF-8 path
        output = self.device.shell(['ls', remote_glob])[0].strip()
        self.assertEqual(remote_path.encode('utf-8'), output)

        # pull.
        self.device.pull(remote_path, tf.name)
        self.assertTrue(os.path.exists(tf.name))
    finally:
        # Clean up on failure as well as success, so a failed push, pull
        # or assertion does not leak the host temp file or the device file.
        if os.path.exists(tf.name):
            os.remove(tf.name)
        self.device.shell(['rm', '-f', remote_glob])
1079
1080
def main():
    """Run every test in this module, provided a device is attached.

    The random module is seeded with 0 before anything else. If adb
    reports no devices, a message is printed and no tests are run.
    """
    random.seed(0)

    devices = adb.get_devices()
    if not devices:
        print('Test suite must be run with attached devices')
        return

    loader = unittest.TestLoader()
    suite = loader.loadTestsFromName(__name__)
    runner = unittest.TextTestRunner(verbosity=3)
    runner.run(suite)


# Run the suite only when this file is executed as a script.
if __name__ == '__main__':
    main()