blob: 8d754687d5778e6454228f4a045dd2232b88610e [file] [log] [blame]
Josh Gao191c1542015-12-09 11:26:11 -08001#!/usr/bin/env python
2# -*- coding: utf-8 -*-
3#
4# Copyright (C) 2015 The Android Open Source Project
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18from __future__ import print_function
19
20import contextlib
21import hashlib
22import os
23import posixpath
24import random
25import re
26import shlex
27import shutil
28import signal
29import socket
30import string
31import subprocess
32import sys
33import tempfile
34import unittest
35
36import mock
37
38import adb
39
40
41def requires_root(func):
42 def wrapper(self, *args):
43 if self.device.get_prop('ro.debuggable') != '1':
44 raise unittest.SkipTest('requires rootable build')
45
46 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
47 if not was_root:
48 self.device.root()
49 self.device.wait()
50
51 try:
52 func(self, *args)
53 finally:
54 if not was_root:
55 self.device.unroot()
56 self.device.wait()
57
58 return wrapper
59
60
61def requires_non_root(func):
62 def wrapper(self, *args):
63 was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
64 if was_root:
65 self.device.unroot()
66 self.device.wait()
67
68 try:
69 func(self, *args)
70 finally:
71 if was_root:
72 self.device.root()
73 self.device.wait()
74
75 return wrapper
76
77
78class GetDeviceTest(unittest.TestCase):
79 def setUp(self):
80 self.android_serial = os.getenv('ANDROID_SERIAL')
81 if 'ANDROID_SERIAL' in os.environ:
82 del os.environ['ANDROID_SERIAL']
83
84 def tearDown(self):
85 if self.android_serial is not None:
86 os.environ['ANDROID_SERIAL'] = self.android_serial
87 else:
88 if 'ANDROID_SERIAL' in os.environ:
89 del os.environ['ANDROID_SERIAL']
90
91 @mock.patch('adb.device.get_devices')
92 def test_explicit(self, mock_get_devices):
93 mock_get_devices.return_value = ['foo', 'bar']
94 device = adb.get_device('foo')
95 self.assertEqual(device.serial, 'foo')
96
97 @mock.patch('adb.device.get_devices')
98 def test_from_env(self, mock_get_devices):
99 mock_get_devices.return_value = ['foo', 'bar']
100 os.environ['ANDROID_SERIAL'] = 'foo'
101 device = adb.get_device()
102 self.assertEqual(device.serial, 'foo')
103
104 @mock.patch('adb.device.get_devices')
105 def test_arg_beats_env(self, mock_get_devices):
106 mock_get_devices.return_value = ['foo', 'bar']
107 os.environ['ANDROID_SERIAL'] = 'bar'
108 device = adb.get_device('foo')
109 self.assertEqual(device.serial, 'foo')
110
111 @mock.patch('adb.device.get_devices')
112 def test_no_such_device(self, mock_get_devices):
113 mock_get_devices.return_value = ['foo', 'bar']
114 self.assertRaises(adb.DeviceNotFoundError, adb.get_device, ['baz'])
115
116 os.environ['ANDROID_SERIAL'] = 'baz'
117 self.assertRaises(adb.DeviceNotFoundError, adb.get_device)
118
119 @mock.patch('adb.device.get_devices')
120 def test_unique_device(self, mock_get_devices):
121 mock_get_devices.return_value = ['foo']
122 device = adb.get_device()
123 self.assertEqual(device.serial, 'foo')
124
125 @mock.patch('adb.device.get_devices')
126 def test_no_unique_device(self, mock_get_devices):
127 mock_get_devices.return_value = ['foo', 'bar']
128 self.assertRaises(adb.NoUniqueDeviceError, adb.get_device)
129
130
131class DeviceTest(unittest.TestCase):
132 def setUp(self):
133 self.device = adb.get_device()
134
135
136class ForwardReverseTest(DeviceTest):
137 def _test_no_rebind(self, description, direction_list, direction,
138 direction_no_rebind, direction_remove_all):
139 msg = direction_list()
140 self.assertEqual('', msg.strip(),
141 description + ' list must be empty to run this test.')
142
143 # Use --no-rebind with no existing binding
144 direction_no_rebind('tcp:5566', 'tcp:6655')
145 msg = direction_list()
146 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
147
148 # Use --no-rebind with existing binding
149 with self.assertRaises(subprocess.CalledProcessError):
150 direction_no_rebind('tcp:5566', 'tcp:6677')
151 msg = direction_list()
152 self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
153 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
154
155 # Use the absence of --no-rebind with existing binding
156 direction('tcp:5566', 'tcp:6677')
157 msg = direction_list()
158 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
159 self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))
160
161 direction_remove_all()
162 msg = direction_list()
163 self.assertEqual('', msg.strip())
164
165 def test_forward_no_rebind(self):
166 self._test_no_rebind('forward', self.device.forward_list,
167 self.device.forward, self.device.forward_no_rebind,
168 self.device.forward_remove_all)
169
170 def test_reverse_no_rebind(self):
171 self._test_no_rebind('reverse', self.device.reverse_list,
172 self.device.reverse, self.device.reverse_no_rebind,
173 self.device.reverse_remove_all)
174
175 def test_forward(self):
176 msg = self.device.forward_list()
177 self.assertEqual('', msg.strip(),
178 'Forwarding list must be empty to run this test.')
179 self.device.forward('tcp:5566', 'tcp:6655')
180 msg = self.device.forward_list()
181 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
182 self.device.forward('tcp:7788', 'tcp:8877')
183 msg = self.device.forward_list()
184 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
185 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
186 self.device.forward_remove('tcp:5566')
187 msg = self.device.forward_list()
188 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
189 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
190 self.device.forward_remove_all()
191 msg = self.device.forward_list()
192 self.assertEqual('', msg.strip())
193
194 def test_reverse(self):
195 msg = self.device.reverse_list()
196 self.assertEqual('', msg.strip(),
197 'Reverse forwarding list must be empty to run this test.')
198 self.device.reverse('tcp:5566', 'tcp:6655')
199 msg = self.device.reverse_list()
200 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
201 self.device.reverse('tcp:7788', 'tcp:8877')
202 msg = self.device.reverse_list()
203 self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
204 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
205 self.device.reverse_remove('tcp:5566')
206 msg = self.device.reverse_list()
207 self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
208 self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
209 self.device.reverse_remove_all()
210 msg = self.device.reverse_list()
211 self.assertEqual('', msg.strip())
212
213 # Note: If you run this test when adb connect'd to a physical device over
214 # TCP, it will fail in adb reverse due to https://code.google.com/p/android/issues/detail?id=189821
215 def test_forward_reverse_echo(self):
216 """Send data through adb forward and read it back via adb reverse"""
217 forward_port = 12345
218 reverse_port = forward_port + 1
219 forward_spec = "tcp:" + str(forward_port)
220 reverse_spec = "tcp:" + str(reverse_port)
221 forward_setup = False
222 reverse_setup = False
223
224 try:
225 # listen on localhost:forward_port, connect to remote:forward_port
226 self.device.forward(forward_spec, forward_spec)
227 forward_setup = True
228 # listen on remote:forward_port, connect to localhost:reverse_port
229 self.device.reverse(forward_spec, reverse_spec)
230 reverse_setup = True
231
232 listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
233 with contextlib.closing(listener):
234 # Use SO_REUSEADDR so that subsequent runs of the test can grab
235 # the port even if it is in TIME_WAIT.
236 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
237
238 # Listen on localhost:reverse_port before connecting to
239 # localhost:forward_port because that will cause adb to connect
240 # back to localhost:reverse_port.
241 listener.bind(('127.0.0.1', reverse_port))
242 listener.listen(4)
243
244 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
245 with contextlib.closing(client):
246 # Connect to the listener.
247 client.connect(('127.0.0.1', forward_port))
248
249 # Accept the client connection.
250 accepted_connection, addr = listener.accept()
251 with contextlib.closing(accepted_connection) as server:
252 data = 'hello'
253
254 # Send data into the port setup by adb forward.
255 client.sendall(data)
256 # Explicitly close() so that server gets EOF.
257 client.close()
258
259 # Verify that the data came back via adb reverse.
260 self.assertEqual(data, server.makefile().read())
261 finally:
262 if reverse_setup:
263 self.device.reverse_remove(forward_spec)
264 if forward_setup:
265 self.device.forward_remove(forward_spec)
266
267
268class ShellTest(DeviceTest):
269 def _interactive_shell(self, shell_args, input):
270 """Runs an interactive adb shell.
271
272 Args:
273 shell_args: List of string arguments to `adb shell`.
274 input: String input to send to the interactive shell.
275
276 Returns:
277 The remote exit code.
278
279 Raises:
280 unittest.SkipTest: The device doesn't support exit codes.
281 """
282 if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
283 raise unittest.SkipTest('exit codes are unavailable on this device')
284
285 proc = subprocess.Popen(
286 self.device.adb_cmd + ['shell'] + shell_args,
287 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
288 stderr=subprocess.PIPE)
289 # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
290 # to explicitly add an exit command to close the session from the device
291 # side, plus the necessary newline to complete the interactive command.
292 proc.communicate(input + '; exit\n')
293 return proc.returncode
294
295 def test_cat(self):
296 """Check that we can at least cat a file."""
297 out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
298 elements = out.split()
299 self.assertEqual(len(elements), 2)
300
301 uptime, idle = elements
302 self.assertGreater(float(uptime), 0.0)
303 self.assertGreater(float(idle), 0.0)
304
305 def test_throws_on_failure(self):
306 self.assertRaises(adb.ShellError, self.device.shell, ['false'])
307
308 def test_output_not_stripped(self):
309 out = self.device.shell(['echo', 'foo'])[0]
310 self.assertEqual(out, 'foo' + self.device.linesep)
311
312 def test_shell_nocheck_failure(self):
313 rc, out, _ = self.device.shell_nocheck(['false'])
314 self.assertNotEqual(rc, 0)
315 self.assertEqual(out, '')
316
317 def test_shell_nocheck_output_not_stripped(self):
318 rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
319 self.assertEqual(rc, 0)
320 self.assertEqual(out, 'foo' + self.device.linesep)
321
322 def test_can_distinguish_tricky_results(self):
323 # If result checking on ADB shell is naively implemented as
324 # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
325 # output from the result for a cmd of `echo -n 1`.
326 rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
327 self.assertEqual(rc, 0)
328 self.assertEqual(out, '1')
329
330 def test_line_endings(self):
331 """Ensure that line ending translation is not happening in the pty.
332
333 Bug: http://b/19735063
334 """
335 output = self.device.shell(['uname'])[0]
336 self.assertEqual(output, 'Linux' + self.device.linesep)
337
338 def test_pty_logic(self):
339 """Tests that a PTY is allocated when it should be.
340
341 PTY allocation behavior should match ssh; some behavior requires
342 a terminal stdin to test so this test will be skipped if stdin
343 is not a terminal.
344 """
345 if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
346 raise unittest.SkipTest('PTY arguments unsupported on this device')
347 if not os.isatty(sys.stdin.fileno()):
348 raise unittest.SkipTest('PTY tests require stdin terminal')
349
350 def check_pty(args):
351 """Checks adb shell PTY allocation.
352
353 Tests |args| for terminal and non-terminal stdin.
354
355 Args:
356 args: -Tt args in a list (e.g. ['-t', '-t']).
357
358 Returns:
359 A tuple (<terminal>, <non-terminal>). True indicates
360 the corresponding shell allocated a remote PTY.
361 """
362 test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']
363
364 terminal = subprocess.Popen(
365 test_cmd, stdin=None,
366 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
367 terminal.communicate()
368
369 non_terminal = subprocess.Popen(
370 test_cmd, stdin=subprocess.PIPE,
371 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
372 non_terminal.communicate()
373
374 return (terminal.returncode == 0, non_terminal.returncode == 0)
375
376 # -T: never allocate PTY.
377 self.assertEqual((False, False), check_pty(['-T']))
378
379 # No args: PTY only if stdin is a terminal and shell is interactive,
380 # which is difficult to reliably test from a script.
381 self.assertEqual((False, False), check_pty([]))
382
383 # -t: PTY if stdin is a terminal.
384 self.assertEqual((True, False), check_pty(['-t']))
385
386 # -t -t: always allocate PTY.
387 self.assertEqual((True, True), check_pty(['-t', '-t']))
388
389 def test_shell_protocol(self):
390 """Tests the shell protocol on the device.
391
392 If the device supports shell protocol, this gives us the ability
393 to separate stdout/stderr and return the exit code directly.
394
395 Bug: http://b/19734861
396 """
397 if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
398 raise unittest.SkipTest('shell protocol unsupported on this device')
399
400 # Shell protocol should be used by default.
401 result = self.device.shell_nocheck(
402 shlex.split('echo foo; echo bar >&2; exit 17'))
403 self.assertEqual(17, result[0])
404 self.assertEqual('foo' + self.device.linesep, result[1])
405 self.assertEqual('bar' + self.device.linesep, result[2])
406
407 self.assertEqual(17, self._interactive_shell([], 'exit 17'))
408
409 # -x flag should disable shell protocol.
410 result = self.device.shell_nocheck(
411 shlex.split('-x echo foo; echo bar >&2; exit 17'))
412 self.assertEqual(0, result[0])
413 self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
414 self.assertEqual('', result[2])
415
416 self.assertEqual(0, self._interactive_shell(['-x'], 'exit 17'))
417
418 def test_non_interactive_sigint(self):
419 """Tests that SIGINT in a non-interactive shell kills the process.
420
421 This requires the shell protocol in order to detect the broken
422 pipe; raw data transfer mode will only see the break once the
423 subprocess tries to read or write.
424
425 Bug: http://b/23825725
426 """
427 if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
428 raise unittest.SkipTest('shell protocol unsupported on this device')
429
430 # Start a long-running process.
431 sleep_proc = subprocess.Popen(
432 self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
433 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
434 stderr=subprocess.STDOUT)
435 remote_pid = sleep_proc.stdout.readline().strip()
436 self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early')
437 proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))
438
439 # Verify that the process is running, send signal, verify it stopped.
440 self.device.shell(proc_query)
441 os.kill(sleep_proc.pid, signal.SIGINT)
442 sleep_proc.communicate()
443 self.assertEqual(1, self.device.shell_nocheck(proc_query)[0],
444 'subprocess failed to terminate')
445
446 def test_non_interactive_stdin(self):
447 """Tests that non-interactive shells send stdin."""
448 if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
449 raise unittest.SkipTest('non-interactive stdin unsupported '
450 'on this device')
451
452 # Test both small and large inputs.
453 small_input = 'foo'
454 large_input = '\n'.join(c * 100 for c in (string.ascii_letters +
455 string.digits))
456
457 for input in (small_input, large_input):
458 proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
459 stdin=subprocess.PIPE,
460 stdout=subprocess.PIPE,
461 stderr=subprocess.PIPE)
462 stdout, stderr = proc.communicate(input)
463 self.assertEqual(input.splitlines(), stdout.splitlines())
464 self.assertEqual('', stderr)
465
466
467class ArgumentEscapingTest(DeviceTest):
468 def test_shell_escaping(self):
469 """Make sure that argument escaping is somewhat sane."""
470
471 # http://b/19734868
472 # Note that this actually matches ssh(1)'s behavior --- it's
473 # converted to `sh -c echo hello; echo world` which sh interprets
474 # as `sh -c echo` (with an argument to that shell of "hello"),
475 # and then `echo world` back in the first shell.
476 result = self.device.shell(
477 shlex.split("sh -c 'echo hello; echo world'"))[0]
478 result = result.splitlines()
479 self.assertEqual(['', 'world'], result)
480 # If you really wanted "hello" and "world", here's what you'd do:
481 result = self.device.shell(
482 shlex.split(r'echo hello\;echo world'))[0].splitlines()
483 self.assertEqual(['hello', 'world'], result)
484
485 # http://b/15479704
486 result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
487 self.assertEqual('t', result)
488 result = self.device.shell(
489 shlex.split("sh -c 'true && echo t'"))[0].strip()
490 self.assertEqual('t', result)
491
492 # http://b/20564385
493 result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
494 self.assertEqual('t', result)
495 result = self.device.shell(
496 shlex.split(r'echo -n 123\;uname'))[0].strip()
497 self.assertEqual('123Linux', result)
498
499 def test_install_argument_escaping(self):
500 """Make sure that install argument escaping works."""
501 # http://b/20323053, http://b/3090932.
502 for file_suffix in ('-text;ls;1.apk', "-Live Hold'em.apk"):
503 tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
504 delete=False)
505 tf.close()
506
507 # Installing bogus .apks fails if the device supports exit codes.
508 try:
509 output = self.device.install(tf.name)
510 except subprocess.CalledProcessError as e:
511 output = e.output
512
513 self.assertIn(file_suffix, output)
514 os.remove(tf.name)
515
516
517class RootUnrootTest(DeviceTest):
518 def _test_root(self):
519 message = self.device.root()
520 if 'adbd cannot run as root in production builds' in message:
521 return
522 self.device.wait()
523 self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip())
524
525 def _test_unroot(self):
526 self.device.unroot()
527 self.device.wait()
528 self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip())
529
530 def test_root_unroot(self):
531 """Make sure that adb root and adb unroot work, using id(1)."""
532 if self.device.get_prop('ro.debuggable') != '1':
533 raise unittest.SkipTest('requires rootable build')
534
535 original_user = self.device.shell(['id', '-un'])[0].strip()
536 try:
537 if original_user == 'root':
538 self._test_unroot()
539 self._test_root()
540 elif original_user == 'shell':
541 self._test_root()
542 self._test_unroot()
543 finally:
544 if original_user == 'root':
545 self.device.root()
546 else:
547 self.device.unroot()
548 self.device.wait()
549
550
551class TcpIpTest(DeviceTest):
552 def test_tcpip_failure_raises(self):
553 """adb tcpip requires a port.
554
555 Bug: http://b/22636927
556 """
557 self.assertRaises(
558 subprocess.CalledProcessError, self.device.tcpip, '')
559 self.assertRaises(
560 subprocess.CalledProcessError, self.device.tcpip, 'foo')
561
562
563class SystemPropertiesTest(DeviceTest):
564 def test_get_prop(self):
565 self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running')
566
567 @requires_root
568 def test_set_prop(self):
569 prop_name = 'foo.bar'
570 self.device.shell(['setprop', prop_name, '""'])
571
572 self.device.set_prop(prop_name, 'qux')
573 self.assertEqual(
574 self.device.shell(['getprop', prop_name])[0].strip(), 'qux')
575
576
577def compute_md5(string):
578 hsh = hashlib.md5()
579 hsh.update(string)
580 return hsh.hexdigest()
581
582
583def get_md5_prog(device):
584 """Older platforms (pre-L) had the name md5 rather than md5sum."""
585 try:
586 device.shell(['md5sum', '/proc/uptime'])
587 return 'md5sum'
588 except adb.ShellError:
589 return 'md5'
590
591
592class HostFile(object):
593 def __init__(self, handle, checksum):
594 self.handle = handle
595 self.checksum = checksum
596 self.full_path = handle.name
597 self.base_name = os.path.basename(self.full_path)
598
599
600class DeviceFile(object):
601 def __init__(self, checksum, full_path):
602 self.checksum = checksum
603 self.full_path = full_path
604 self.base_name = posixpath.basename(self.full_path)
605
606
607def make_random_host_files(in_dir, num_files):
608 min_size = 1 * (1 << 10)
609 max_size = 16 * (1 << 10)
610
611 files = []
612 for _ in xrange(num_files):
613 file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False)
614
615 size = random.randrange(min_size, max_size, 1024)
616 rand_str = os.urandom(size)
617 file_handle.write(rand_str)
618 file_handle.flush()
619 file_handle.close()
620
621 md5 = compute_md5(rand_str)
622 files.append(HostFile(file_handle, md5))
623 return files
624
625
626def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
627 min_size = 1 * (1 << 10)
628 max_size = 16 * (1 << 10)
629
630 files = []
631 for file_num in xrange(num_files):
632 size = random.randrange(min_size, max_size, 1024)
633
634 base_name = prefix + str(file_num)
635 full_path = posixpath.join(in_dir, base_name)
636
637 device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
638 'bs={}'.format(size), 'count=1'])
639 dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split()
640
641 files.append(DeviceFile(dev_md5, full_path))
642 return files
643
644
645class FileOperationsTest(DeviceTest):
646 SCRATCH_DIR = '/data/local/tmp'
647 DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
648 DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
649
650 def _verify_remote(self, checksum, remote_path):
651 dev_md5, _ = self.device.shell([get_md5_prog(self.device),
652 remote_path])[0].split()
653 self.assertEqual(checksum, dev_md5)
654
655 def _verify_local(self, checksum, local_path):
656 with open(local_path, 'rb') as host_file:
657 host_md5 = compute_md5(host_file.read())
658 self.assertEqual(host_md5, checksum)
659
660 def test_push(self):
661 """Push a randomly generated file to specified device."""
662 kbytes = 512
663 tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
664 rand_str = os.urandom(1024 * kbytes)
665 tmp.write(rand_str)
666 tmp.close()
667
668 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
669 self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)
670
671 self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
672 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
673
674 os.remove(tmp.name)
675
676 def test_push_dir(self):
677 """Push a randomly generated directory of files to the device."""
678 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
679 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
680
681 try:
682 host_dir = tempfile.mkdtemp()
683
684 # Make sure the temp directory isn't setuid, or else adb will complain.
685 os.chmod(host_dir, 0o700)
686
687 # Create 32 random files.
688 temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
689 self.device.push(host_dir, self.DEVICE_TEMP_DIR)
690
691 for temp_file in temp_files:
692 remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
693 os.path.basename(host_dir),
694 temp_file.base_name)
695 self._verify_remote(temp_file.checksum, remote_path)
696 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
697 finally:
698 if host_dir is not None:
699 shutil.rmtree(host_dir)
700
701 @unittest.expectedFailure # b/25566053
702 def test_push_empty(self):
703 """Push a directory containing an empty directory to the device."""
704 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
705 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
706
707 try:
708 host_dir = tempfile.mkdtemp()
709
710 # Make sure the temp directory isn't setuid, or else adb will complain.
711 os.chmod(host_dir, 0o700)
712
713 # Create an empty directory.
714 os.mkdir(os.path.join(host_dir, 'empty'))
715
716 self.device.push(host_dir, self.DEVICE_TEMP_DIR)
717
718 test_empty_cmd = ['[', '-d',
719 os.path.join(self.DEVICE_TEMP_DIR, 'empty')]
720 rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
721 self.assertEqual(rc, 0)
722 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
723 finally:
724 if host_dir is not None:
725 shutil.rmtree(host_dir)
726
727 def test_multiple_push(self):
728 """Push multiple files to the device in one adb push command.
729
730 Bug: http://b/25324823
731 """
732
733 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
734 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
735
736 try:
737 host_dir = tempfile.mkdtemp()
738
739 # Create some random files and a subdirectory containing more files.
740 temp_files = make_random_host_files(in_dir=host_dir, num_files=4)
741
742 subdir = os.path.join(host_dir, "subdir")
743 os.mkdir(subdir)
744 subdir_temp_files = make_random_host_files(in_dir=subdir,
745 num_files=4)
746
747 paths = map(lambda temp_file: temp_file.full_path, temp_files)
748 paths.append(subdir)
749 self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])
750
751 for temp_file in temp_files:
752 remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
753 temp_file.base_name)
754 self._verify_remote(temp_file.checksum, remote_path)
755
756 for subdir_temp_file in subdir_temp_files:
757 remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
758 # BROKEN: http://b/25394682
759 # "subdir",
760 temp_file.base_name)
761 self._verify_remote(temp_file.checksum, remote_path)
762
763
764 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
765 finally:
766 if host_dir is not None:
767 shutil.rmtree(host_dir)
768
769
770 def _test_pull(self, remote_file, checksum):
771 tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
772 tmp_write.close()
773 self.device.pull(remote=remote_file, local=tmp_write.name)
774 with open(tmp_write.name, 'rb') as tmp_read:
775 host_contents = tmp_read.read()
776 host_md5 = compute_md5(host_contents)
777 self.assertEqual(checksum, host_md5)
778 os.remove(tmp_write.name)
779
780 @requires_non_root
781 def test_pull_error_reporting(self):
782 self.device.shell(['touch', self.DEVICE_TEMP_FILE])
783 self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])
784
785 try:
786 output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x')
787 except subprocess.CalledProcessError as e:
788 output = e.output
789
790 self.assertIn('Permission denied', output)
791
792 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
793
794 def test_pull(self):
795 """Pull a randomly generated file from specified device."""
796 kbytes = 512
797 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
798 cmd = ['dd', 'if=/dev/urandom',
799 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024',
800 'count={}'.format(kbytes)]
801 self.device.shell(cmd)
802 dev_md5, _ = self.device.shell(
803 [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split()
804 self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
805 self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])
806
807 def test_pull_dir(self):
808 """Pull a randomly generated directory of files from the device."""
809 try:
810 host_dir = tempfile.mkdtemp()
811
812 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
813 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
814
815 # Populate device directory with random files.
816 temp_files = make_random_device_files(
817 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
818
819 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
820
821 for temp_file in temp_files:
822 host_path = os.path.join(host_dir, temp_file.base_name)
823
824 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
825 finally:
826 if host_dir is not None:
827 shutil.rmtree(host_dir)
828
829 def test_pull_empty(self):
830 """Pull a directory containing an empty directory from the device."""
831 try:
832 host_dir = tempfile.mkdtemp()
833
834 remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
835 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
836 self.device.shell(['mkdir', '-p', remote_empty_path])
837
838 self.device.pull(remote=remote_empty_path, local=host_dir)
839 self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))
840 finally:
841 if host_dir is not None:
842 shutil.rmtree(host_dir)
843
844 def test_multiple_pull(self):
845 """Pull a randomly generated directory of files from the device."""
846
847 try:
848 host_dir = tempfile.mkdtemp()
849
850 subdir = posixpath.join(self.DEVICE_TEMP_DIR, "subdir")
851 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
852 self.device.shell(['mkdir', '-p', subdir])
853
854 # Create some random files and a subdirectory containing more files.
855 temp_files = make_random_device_files(
856 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)
857
858 subdir_temp_files = make_random_device_files(
859 self.device, in_dir=subdir, num_files=4, prefix='subdir_')
860
861 paths = map(lambda temp_file: temp_file.full_path, temp_files)
862 paths.append(subdir)
863 self.device._simple_call(['pull'] + paths + [host_dir])
864
865 for temp_file in temp_files:
866 local_path = os.path.join(host_dir, temp_file.base_name)
867 self._verify_local(temp_file.checksum, local_path)
868
869 for subdir_temp_file in subdir_temp_files:
870 local_path = os.path.join(host_dir,
871 "subdir",
872 subdir_temp_file.base_name)
873 self._verify_local(subdir_temp_file.checksum, local_path)
874
875 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
876 finally:
877 if host_dir is not None:
878 shutil.rmtree(host_dir)
879
880 def test_sync(self):
881 """Sync a randomly generated directory of files to specified device."""
882
883 try:
884 base_dir = tempfile.mkdtemp()
885
886 # Create mirror device directory hierarchy within base_dir.
887 full_dir_path = base_dir + self.DEVICE_TEMP_DIR
888 os.makedirs(full_dir_path)
889
890 # Create 32 random files within the host mirror.
891 temp_files = make_random_host_files(in_dir=full_dir_path, num_files=32)
892
893 # Clean up any trash on the device.
894 device = adb.get_device(product=base_dir)
895 device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
896
897 device.sync('data')
898
899 # Confirm that every file on the device mirrors that on the host.
900 for temp_file in temp_files:
901 device_full_path = posixpath.join(self.DEVICE_TEMP_DIR,
902 temp_file.base_name)
903 dev_md5, _ = device.shell(
904 [get_md5_prog(self.device), device_full_path])[0].split()
905 self.assertEqual(temp_file.checksum, dev_md5)
906
907 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
908 finally:
909 if base_dir is not None:
910 shutil.rmtree(base_dir)
911
912 def test_unicode_paths(self):
913 """Ensure that we can support non-ASCII paths, even on Windows."""
914 name = u'로보카 폴리'
915
916 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
917 remote_path = u'/data/local/tmp/adb-test-{}'.format(name)
918
919 ## push.
920 tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
921 tf.close()
922 self.device.push(tf.name, remote_path)
923 os.remove(tf.name)
924 self.assertFalse(os.path.exists(tf.name))
925
926 # Verify that the device ended up with the expected UTF-8 path
927 output = self.device.shell(
928 ['ls', '/data/local/tmp/adb-test-*'])[0].strip()
929 self.assertEqual(remote_path.encode('utf-8'), output)
930
931 # pull.
932 self.device.pull(remote_path, tf.name)
933 self.assertTrue(os.path.exists(tf.name))
934 os.remove(tf.name)
935 self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
936
937
938def main():
939 random.seed(0)
940 if len(adb.get_devices()) > 0:
941 suite = unittest.TestLoader().loadTestsFromName(__name__)
942 unittest.TextTestRunner(verbosity=3).run(suite)
943 else:
944 print('Test suite must be run with attached devices')
945
946
947if __name__ == '__main__':
948 main()