#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import print_function

import contextlib
import functools
import hashlib
import os
import posixpath
import random
import re
import shlex
import shutil
import signal
import socket
import string
import subprocess
import sys
import tempfile
import unittest

import mock

import adb
39
40
def requires_root(func):
    """Decorate a device test so that it runs with a root adb shell.

    The wrapped test is skipped when the device's `ro.debuggable` property is
    not '1'. If the shell user is not already root, the device is switched to
    root before the test and switched back afterwards, even if the test
    raises.

    Args:
        func: Test method whose first argument exposes a `device` attribute.

    Returns:
        The wrapping function, carrying func's name and docstring.

    Raises:
        unittest.SkipTest: (from the wrapper) the build is not debuggable.
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if not was_root:
            self.device.root()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Leave the device in the state we found it.
            if not was_root:
                self.device.unroot()
                self.device.wait()

    return wrapper
59
60
def requires_non_root(func):
    """Decorate a device test so that it runs with a non-root adb shell.

    If the shell user is root, the device is switched to non-root before the
    test and switched back to root afterwards, even if the test raises.

    Args:
        func: Test method whose first argument exposes a `device` attribute.

    Returns:
        The wrapping function, carrying func's name and docstring.
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
        if was_root:
            self.device.unroot()
            self.device.wait()

        try:
            return func(self, *args, **kwargs)
        finally:
            # Leave the device in the state we found it.
            if was_root:
                self.device.root()
                self.device.wait()

    return wrapper
76
77
class GetDeviceTest(unittest.TestCase):
    """Tests adb.get_device() serial selection with device listing mocked."""

    def setUp(self):
        # Remember and clear ANDROID_SERIAL so the host environment cannot
        # influence which serial get_device() resolves.
        self.android_serial = os.environ.pop('ANDROID_SERIAL', None)

    def tearDown(self):
        # Put the variable back exactly as it was before the test ran.
        if self.android_serial is not None:
            os.environ['ANDROID_SERIAL'] = self.android_serial
        else:
            os.environ.pop('ANDROID_SERIAL', None)

    @mock.patch('adb.device.get_devices')
    def test_explicit(self, mock_get_devices):
        """An explicitly passed serial is used."""
        mock_get_devices.return_value = ['foo', 'bar']
        device = adb.get_device('foo')
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_from_env(self, mock_get_devices):
        """ANDROID_SERIAL is used when no serial is passed."""
        mock_get_devices.return_value = ['foo', 'bar']
        os.environ['ANDROID_SERIAL'] = 'foo'
        device = adb.get_device()
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_arg_beats_env(self, mock_get_devices):
        """An explicit serial takes precedence over ANDROID_SERIAL."""
        mock_get_devices.return_value = ['foo', 'bar']
        os.environ['ANDROID_SERIAL'] = 'bar'
        device = adb.get_device('foo')
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_no_such_device(self, mock_get_devices):
        """An unknown serial (argument or environment) raises."""
        mock_get_devices.return_value = ['foo', 'bar']
        # Pass the serial as a string; a list would only "fail" by accident.
        self.assertRaises(adb.DeviceNotFoundError, adb.get_device, 'baz')

        os.environ['ANDROID_SERIAL'] = 'baz'
        self.assertRaises(adb.DeviceNotFoundError, adb.get_device)

    @mock.patch('adb.device.get_devices')
    def test_unique_device(self, mock_get_devices):
        """With one device listed and no serial given, that device is used."""
        mock_get_devices.return_value = ['foo']
        device = adb.get_device()
        self.assertEqual(device.serial, 'foo')

    @mock.patch('adb.device.get_devices')
    def test_no_unique_device(self, mock_get_devices):
        """With several devices listed and no serial given, it raises."""
        mock_get_devices.return_value = ['foo', 'bar']
        self.assertRaises(adb.NoUniqueDeviceError, adb.get_device)
129
130
class DeviceTest(unittest.TestCase):
    """Base class for tests that talk to a device via adb.get_device()."""

    def setUp(self):
        # Called without a serial; adb.get_device() chooses the device.
        self.device = adb.get_device()
134
135
class ForwardReverseTest(DeviceTest):
    """Tests for the device's forward/reverse port binding operations."""

    def _test_no_rebind(self, description, direction_list, direction,
                        direction_no_rebind, direction_remove_all):
        """Shared body of the forward/reverse --no-rebind tests.

        Args:
            description: Label used in the precondition failure message.
            direction_list: Callable returning the current binding list text.
            direction: Callable (local, remote) creating a binding.
            direction_no_rebind: Callable (local, remote) creating a binding
                with --no-rebind.
            direction_remove_all: Callable removing all bindings.
        """
        msg = direction_list()
        self.assertEqual('', msg.strip(),
                         description + ' list must be empty to run this test.')
        # From here on we create bindings; make sure a failing assertion does
        # not leave them behind and break the next test's precondition.
        self.addCleanup(direction_remove_all)

        # Use --no-rebind with no existing binding
        direction_no_rebind('tcp:5566', 'tcp:6655')
        msg = direction_list()
        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

        # Use --no-rebind with existing binding
        with self.assertRaises(subprocess.CalledProcessError):
            direction_no_rebind('tcp:5566', 'tcp:6677')
        msg = direction_list()
        self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))

        # Use the absence of --no-rebind with existing binding
        direction('tcp:5566', 'tcp:6677')
        msg = direction_list()
        self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
        self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))

        direction_remove_all()
        msg = direction_list()
        self.assertEqual('', msg.strip())

    def test_forward_no_rebind(self):
        """Run the --no-rebind scenario against the forward operations."""
        self._test_no_rebind('forward', self.device.forward_list,
                             self.device.forward, self.device.forward_no_rebind,
                             self.device.forward_remove_all)

    def test_reverse_no_rebind(self):
        """Run the --no-rebind scenario against the reverse operations."""
        self._test_no_rebind('reverse', self.device.reverse_list,
                             self.device.reverse, self.device.reverse_no_rebind,
                             self.device.reverse_remove_all)

    def test_forward(self):
        """Add, list, remove one and remove all forward bindings."""
        msg = self.device.forward_list()
        self.assertEqual('', msg.strip(),
                         'Forwarding list must be empty to run this test.')
        # Do not leak bindings into later tests if an assertion fails.
        self.addCleanup(self.device.forward_remove_all)

        self.device.forward('tcp:5566', 'tcp:6655')
        msg = self.device.forward_list()
        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
        self.device.forward('tcp:7788', 'tcp:8877')
        msg = self.device.forward_list()
        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        self.device.forward_remove('tcp:5566')
        msg = self.device.forward_list()
        self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        self.device.forward_remove_all()
        msg = self.device.forward_list()
        self.assertEqual('', msg.strip())

    def test_forward_tcp_port_0(self):
        """Forward tcp:0 and check the reported port shows up in the list."""
        self.assertEqual('', self.device.forward_list().strip(),
                         'Forwarding list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb forward` will print
            # the actual port number.
            port = self.device.forward('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Forwarding tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.forward_list()))
        finally:
            self.device.forward_remove_all()

    def test_reverse(self):
        """Add, list, remove one and remove all reverse bindings."""
        msg = self.device.reverse_list()
        self.assertEqual('', msg.strip(),
                         'Reverse forwarding list must be empty to run this test.')
        # Do not leak bindings into later tests if an assertion fails.
        self.addCleanup(self.device.reverse_remove_all)

        self.device.reverse('tcp:5566', 'tcp:6655')
        msg = self.device.reverse_list()
        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
        self.device.reverse('tcp:7788', 'tcp:8877')
        msg = self.device.reverse_list()
        self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        self.device.reverse_remove('tcp:5566')
        msg = self.device.reverse_list()
        self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
        self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
        self.device.reverse_remove_all()
        msg = self.device.reverse_list()
        self.assertEqual('', msg.strip())

    def test_reverse_tcp_port_0(self):
        """Reverse tcp:0 and check the reported port shows up in the list."""
        self.assertEqual('', self.device.reverse_list().strip(),
                         'Reverse list must be empty to run this test.')

        try:
            # If resolving TCP port 0 is supported, `adb reverse` will print
            # the actual port number.
            port = self.device.reverse('tcp:0', 'tcp:8888').strip()
            if not port:
                raise unittest.SkipTest('Reversing tcp:0 is not available.')

            self.assertTrue(re.search(r'tcp:{}.+tcp:8888'.format(port),
                                      self.device.reverse_list()))
        finally:
            self.device.reverse_remove_all()

    # Note: If you run this test when adb connect'd to a physical device over
    # TCP, it will fail in adb reverse due to
    # https://code.google.com/p/android/issues/detail?id=189821
    def test_forward_reverse_echo(self):
        """Send data through adb forward and read it back via adb reverse"""
        forward_port = 12345
        reverse_port = forward_port + 1
        forward_spec = 'tcp:' + str(forward_port)
        reverse_spec = 'tcp:' + str(reverse_port)
        forward_setup = False
        reverse_setup = False

        try:
            # listen on localhost:forward_port, connect to remote:forward_port
            self.device.forward(forward_spec, forward_spec)
            forward_setup = True
            # listen on remote:forward_port, connect to localhost:reverse_port
            self.device.reverse(forward_spec, reverse_spec)
            reverse_setup = True

            listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            with contextlib.closing(listener):
                # Use SO_REUSEADDR so that subsequent runs of the test can grab
                # the port even if it is in TIME_WAIT.
                listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                # Listen on localhost:reverse_port before connecting to
                # localhost:forward_port because that will cause adb to connect
                # back to localhost:reverse_port.
                listener.bind(('127.0.0.1', reverse_port))
                listener.listen(4)

                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                with contextlib.closing(client):
                    # Connect to the listener.
                    client.connect(('127.0.0.1', forward_port))

                    # Accept the client connection.
                    accepted_connection, addr = listener.accept()
                    with contextlib.closing(accepted_connection) as server:
                        # Sockets carry bytes; a bytes literal works on both
                        # Python 2 and Python 3.
                        data = b'hello'

                        # Send data into the port setup by adb forward.
                        client.sendall(data)
                        # Explicitly close() so that server gets EOF.
                        client.close()

                        # Verify that the data came back via adb reverse.
                        with contextlib.closing(
                                server.makefile('rb')) as reader:
                            self.assertEqual(data, reader.read())
        finally:
            if reverse_setup:
                self.device.reverse_remove(forward_spec)
            if forward_setup:
                self.device.forward_remove(forward_spec)
298
299
class ShellTest(DeviceTest):
    """Tests for running shell commands on the device through adb."""

    def _interactive_shell(self, shell_args, input):
        """Runs an interactive adb shell.

        Args:
          shell_args: List of string arguments to `adb shell`.
          input: String input to send to the interactive shell.

        Returns:
          The remote exit code.

        Raises:
          unittest.SkipTest: The device doesn't support exit codes.
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('exit codes are unavailable on this device')

        proc = subprocess.Popen(
            self.device.adb_cmd + ['shell'] + shell_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
        # to explicitly add an exit command to close the session from the device
        # side, plus the necessary newline to complete the interactive command.
        proc.communicate(input + '; exit\n')
        return proc.returncode

    def test_cat(self):
        """Check that we can at least cat a file."""
        out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
        elements = out.split()
        self.assertEqual(len(elements), 2)

        uptime, idle = elements
        self.assertGreater(float(uptime), 0.0)
        self.assertGreater(float(idle), 0.0)

    def test_throws_on_failure(self):
        """shell() raises adb.ShellError when the command fails."""
        self.assertRaises(adb.ShellError, self.device.shell, ['false'])

    def test_output_not_stripped(self):
        """shell() keeps the trailing line separator in its output."""
        out = self.device.shell(['echo', 'foo'])[0]
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_shell_nocheck_failure(self):
        """shell_nocheck() reports a non-zero code instead of raising."""
        rc, out, _ = self.device.shell_nocheck(['false'])
        self.assertNotEqual(rc, 0)
        self.assertEqual(out, '')

    def test_shell_nocheck_output_not_stripped(self):
        """shell_nocheck() keeps the trailing line separator in its output."""
        rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, 'foo' + self.device.linesep)

    def test_can_distinguish_tricky_results(self):
        """The exit code is not confused with command output."""
        # If result checking on ADB shell is naively implemented as
        # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
        # output from the result for a cmd of `echo -n 1`.
        rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
        self.assertEqual(rc, 0)
        self.assertEqual(out, '1')

    def test_line_endings(self):
        """Ensure that line ending translation is not happening in the pty.

        Bug: http://b/19735063
        """
        output = self.device.shell(['uname'])[0]
        self.assertEqual(output, 'Linux' + self.device.linesep)

    def test_pty_logic(self):
        """Tests that a PTY is allocated when it should be.

        PTY allocation behavior should match ssh; some behavior requires
        a terminal stdin to test so this test will be skipped if stdin
        is not a terminal.
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('PTY arguments unsupported on this device')
        if not os.isatty(sys.stdin.fileno()):
            raise unittest.SkipTest('PTY tests require stdin terminal')

        def check_pty(args):
            """Checks adb shell PTY allocation.

            Tests |args| for terminal and non-terminal stdin.

            Args:
                args: -Tt args in a list (e.g. ['-t', '-t']).

            Returns:
                A tuple (<terminal>, <non-terminal>). True indicates
                the corresponding shell allocated a remote PTY.
            """
            test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']

            # stdin=None inherits this process's stdin, which was checked
            # above to be a terminal.
            terminal = subprocess.Popen(
                test_cmd, stdin=None,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            terminal.communicate()

            non_terminal = subprocess.Popen(
                test_cmd, stdin=subprocess.PIPE,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            non_terminal.communicate()

            return (terminal.returncode == 0, non_terminal.returncode == 0)

        # -T: never allocate PTY.
        self.assertEqual((False, False), check_pty(['-T']))

        # No args: PTY only if stdin is a terminal and shell is interactive,
        # which is difficult to reliably test from a script.
        self.assertEqual((False, False), check_pty([]))

        # -t: PTY if stdin is a terminal.
        self.assertEqual((True, False), check_pty(['-t']))

        # -t -t: always allocate PTY.
        self.assertEqual((True, True), check_pty(['-t', '-t']))

    def test_shell_protocol(self):
        """Tests the shell protocol on the device.

        If the device supports shell protocol, this gives us the ability
        to separate stdout/stderr and return the exit code directly.

        Bug: http://b/19734861
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Shell protocol should be used by default.
        result = self.device.shell_nocheck(
            shlex.split('echo foo; echo bar >&2; exit 17'))
        self.assertEqual(17, result[0])
        self.assertEqual('foo' + self.device.linesep, result[1])
        self.assertEqual('bar' + self.device.linesep, result[2])

        self.assertEqual(17, self._interactive_shell([], 'exit 17'))

        # -x flag should disable shell protocol.
        result = self.device.shell_nocheck(
            shlex.split('-x echo foo; echo bar >&2; exit 17'))
        self.assertEqual(0, result[0])
        self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
        self.assertEqual('', result[2])

        self.assertEqual(0, self._interactive_shell(['-x'], 'exit 17'))

    def test_non_interactive_sigint(self):
        """Tests that SIGINT in a non-interactive shell kills the process.

        This requires the shell protocol in order to detect the broken
        pipe; raw data transfer mode will only see the break once the
        subprocess tries to read or write.

        Bug: http://b/23825725
        """
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('shell protocol unsupported on this device')

        # Start a long-running process.
        sleep_proc = subprocess.Popen(
            self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT)
        try:
            remote_pid = sleep_proc.stdout.readline().strip()
            # returncode is only refreshed by poll()/wait(); poll() so the
            # early-termination check actually observes the child.
            self.assertIsNone(sleep_proc.poll(), 'subprocess terminated early')
            proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))

            # Verify that the process is running, send signal, verify it
            # stopped.
            self.device.shell(proc_query)
            os.kill(sleep_proc.pid, signal.SIGINT)
            sleep_proc.communicate()
        finally:
            # Do not leave the host-side adb process running for a minute if
            # anything above failed before it was signalled.
            if sleep_proc.poll() is None:
                sleep_proc.kill()
                sleep_proc.wait()

        self.assertEqual(1, self.device.shell_nocheck(proc_query)[0],
                         'subprocess failed to terminate')

    def test_non_interactive_stdin(self):
        """Tests that non-interactive shells send stdin."""
        if not self.device.has_shell_protocol():
            raise unittest.SkipTest('non-interactive stdin unsupported '
                                    'on this device')

        # Test both small and large inputs.
        small_input = 'foo'
        large_input = '\n'.join(c * 100 for c in (string.ascii_letters +
                                                  string.digits))

        for stdin_data in (small_input, large_input):
            proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            stdout, stderr = proc.communicate(stdin_data)
            self.assertEqual(stdin_data.splitlines(), stdout.splitlines())
            self.assertEqual('', stderr)
497
498
class ArgumentEscapingTest(DeviceTest):
    """Tests how shell and install arguments with special characters behave."""

    def test_shell_escaping(self):
        """Make sure that argument escaping is somewhat sane."""

        # http://b/19734868
        # Note that this actually matches ssh(1)'s behavior --- it's
        # converted to `sh -c echo hello; echo world` which sh interprets
        # as `sh -c echo` (with an argument to that shell of "hello"),
        # and then `echo world` back in the first shell.
        result = self.device.shell(
            shlex.split("sh -c 'echo hello; echo world'"))[0]
        result = result.splitlines()
        self.assertEqual(['', 'world'], result)
        # If you really wanted "hello" and "world", here's what you'd do:
        result = self.device.shell(
            shlex.split(r'echo hello\;echo world'))[0].splitlines()
        self.assertEqual(['hello', 'world'], result)

        # http://b/15479704
        result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split("sh -c 'true && echo t'"))[0].strip()
        self.assertEqual('t', result)

        # http://b/20564385
        result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
        self.assertEqual('t', result)
        result = self.device.shell(
            shlex.split(r'echo -n 123\;uname'))[0].strip()
        self.assertEqual('123Linux', result)

    def test_install_argument_escaping(self):
        """Make sure that install argument escaping works."""
        # http://b/20323053, http://b/3090932.
        for file_suffix in ('-text;ls;1.apk', "-Live Hold'em.apk"):
            tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
                                             delete=False)
            tf.close()

            try:
                # Installing bogus .apks fails if the device supports exit
                # codes.
                try:
                    output = self.device.install(tf.name)
                except subprocess.CalledProcessError as e:
                    output = e.output

                self.assertIn(file_suffix, output)
            finally:
                # Remove the host file even when the assertion fails.
                os.remove(tf.name)
547
548
class RootUnrootTest(DeviceTest):
    """Tests switching the device between root and non-root."""

    def _test_root(self):
        """Switch to root and check that `id -un` reports 'root'."""
        message = self.device.root()
        # Nothing to verify when the device reports it cannot run as root.
        if 'adbd cannot run as root in production builds' in message:
            return
        self.device.wait()
        self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip())

    def _test_unroot(self):
        """Switch to non-root and check that `id -un` reports 'shell'."""
        self.device.unroot()
        self.device.wait()
        self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip())

    def test_root_unroot(self):
        """Make sure that adb root and adb unroot work, using id(1)."""
        if self.device.get_prop('ro.debuggable') != '1':
            raise unittest.SkipTest('requires rootable build')

        original_user = self.device.shell(['id', '-un'])[0].strip()
        try:
            # Exercise the transition away from the current user first, then
            # the transition back.
            if original_user == 'root':
                self._test_unroot()
                self._test_root()
            elif original_user == 'shell':
                self._test_root()
                self._test_unroot()
        finally:
            # Restore the user the device started with.
            if original_user == 'root':
                self.device.root()
            else:
                self.device.unroot()
            self.device.wait()
581
582
class TcpIpTest(DeviceTest):
    """Tests for the device's tcpip operation."""

    def test_tcpip_failure_raises(self):
        """adb tcpip requires a port.

        Bug: http://b/22636927
        """
        # Both an empty and a non-numeric port must be rejected.
        self.assertRaises(
            subprocess.CalledProcessError, self.device.tcpip, '')
        self.assertRaises(
            subprocess.CalledProcessError, self.device.tcpip, 'foo')
593
594
class SystemPropertiesTest(DeviceTest):
    """Tests for reading and writing device system properties."""

    def test_get_prop(self):
        """get_prop() reports init.svc.adbd as 'running'."""
        self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running')

    @requires_root
    def test_set_prop(self):
        """set_prop() stores a value that getprop then reports."""
        prop_name = 'foo.bar'
        # Reset the property first so a value left by an earlier run cannot
        # satisfy the assertion below.
        self.device.shell(['setprop', prop_name, '""'])

        self.device.set_prop(prop_name, 'qux')
        self.assertEqual(
            self.device.shell(['getprop', prop_name])[0].strip(), 'qux')
607
608
def compute_md5(string):
    """Return the hexadecimal MD5 digest of a byte string.

    Args:
        string: The bytes to hash.

    Returns:
        The digest as a 32-character lowercase hex string.
    """
    return hashlib.md5(string).hexdigest()
613
614
def get_md5_prog(device):
    """Older platforms (pre-L) had the name md5 rather than md5sum.

    Args:
        device: Object whose shell() runs a command and raises adb.ShellError
            when the command fails.

    Returns:
        'md5sum' if running it on /proc/uptime succeeds, otherwise 'md5'.
    """
    try:
        device.shell(['md5sum', '/proc/uptime'])
        return 'md5sum'
    except adb.ShellError:
        return 'md5'
622
623
class HostFile(object):
    """Record describing a file on the host and its expected checksum."""

    def __init__(self, handle, checksum):
        """Store the handle and checksum and derive the path fields.

        Args:
            handle: File object exposing a `name` attribute (its path).
            checksum: Checksum associated with the file's contents.
        """
        self.handle = handle
        self.checksum = checksum
        self.full_path = handle.name
        # Host paths use the host's separator, hence os.path.
        self.base_name = os.path.basename(self.full_path)
630
631
class DeviceFile(object):
    """Record describing a file on the device and its expected checksum."""

    def __init__(self, checksum, full_path):
        """Store the checksum and path and derive the base name.

        Args:
            checksum: Checksum associated with the file's contents.
            full_path: Path of the file on the device.
        """
        self.checksum = checksum
        self.full_path = full_path
        # Device paths are always '/'-separated, hence posixpath.
        self.base_name = posixpath.basename(self.full_path)
637
638
def make_random_host_files(in_dir, num_files):
    """Create files of random bytes on the host.

    Each file holds between 1 KiB and 15 KiB of os.urandom() data, in 1 KiB
    steps, and is left on disk (delete=False) for the caller to clean up.

    Args:
        in_dir: Host directory in which the files are created.
        num_files: Number of files to create.

    Returns:
        A list of HostFile records, one per created file, each carrying the
        MD5 of the data written.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    files = []
    # range() rather than xrange() so this runs on Python 2 and 3.
    for _ in range(num_files):
        size = random.randrange(min_size, max_size, 1024)
        rand_str = os.urandom(size)

        file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False)
        # The with block closes the handle even if the write fails.
        with file_handle:
            file_handle.write(rand_str)

        files.append(HostFile(file_handle, compute_md5(rand_str)))
    return files
656
657
def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
    """Create files of random bytes on the device.

    Each file is written with `dd` from /dev/urandom as a single block of
    between 1 KiB and 15 KiB (1 KiB steps), then hashed on the device.

    Args:
        device: Device object whose shell() runs commands on the device.
        in_dir: Device directory in which the files are created.
        num_files: Number of files to create.
        prefix: File name prefix; the file index is appended to it.

    Returns:
        A list of DeviceFile records, one per created file, each carrying the
        checksum reported by the device.
    """
    min_size = 1 * (1 << 10)
    max_size = 16 * (1 << 10)

    # Probe for the md5 tool once instead of once per file.
    md5_prog = get_md5_prog(device)

    files = []
    # range() rather than xrange() so this runs on Python 2 and 3.
    for file_num in range(num_files):
        size = random.randrange(min_size, max_size, 1024)

        base_name = prefix + str(file_num)
        full_path = posixpath.join(in_dir, base_name)

        device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
                      'bs={}'.format(size), 'count=1'])
        # Output is "<digest> <path>"; only the digest is needed.
        dev_md5, _ = device.shell([md5_prog, full_path])[0].split()

        files.append(DeviceFile(dev_md5, full_path))
    return files
675
676
class FileOperationsTest(DeviceTest):
    """Tests for pushing files to and pulling files from the device."""

    # Device-side scratch locations used by the tests in this class.
    SCRATCH_DIR = '/data/local/tmp'
    DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
    DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
681
682 def _verify_remote(self, checksum, remote_path):
683 dev_md5, _ = self.device.shell([get_md5_prog(self.device),
684 remote_path])[0].split()
685 self.assertEqual(checksum, dev_md5)
686
687 def _verify_local(self, checksum, local_path):
688 with open(local_path, 'rb') as host_file:
689 host_md5 = compute_md5(host_file.read())
690 self.assertEqual(host_md5, checksum)
691
692 def test_push(self):
693 """Push a randomly generated file to specified device."""
694 kbytes = 512
695 tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
696 rand_str = os.urandom(1024 * kbytes)
697 tmp.write(rand_str)
698 tmp.close()
699
700 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
701 self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)
702
703 self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
704 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
705
706 os.remove(tmp.name)
707
708 def test_push_dir(self):
709 """Push a randomly generated directory of files to the device."""
710 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
711 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
712
713 try:
714 host_dir = tempfile.mkdtemp()
715
716 # Make sure the temp directory isn't setuid, or else adb will complain.
717 os.chmod(host_dir, 0o700)
718
719 # Create 32 random files.
720 temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
721 self.device.push(host_dir, self.DEVICE_TEMP_DIR)
722
723 for temp_file in temp_files:
724 remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
725 os.path.basename(host_dir),
726 temp_file.base_name)
727 self._verify_remote(temp_file.checksum, remote_path)
728 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
729 finally:
730 if host_dir is not None:
731 shutil.rmtree(host_dir)
732
733 @unittest.expectedFailure # b/25566053
734 def test_push_empty(self):
735 """Push a directory containing an empty directory to the device."""
736 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
737 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
738
739 try:
740 host_dir = tempfile.mkdtemp()
741
742 # Make sure the temp directory isn't setuid, or else adb will complain.
743 os.chmod(host_dir, 0o700)
744
745 # Create an empty directory.
746 os.mkdir(os.path.join(host_dir, 'empty'))
747
748 self.device.push(host_dir, self.DEVICE_TEMP_DIR)
749
750 test_empty_cmd = ['[', '-d',
751 os.path.join(self.DEVICE_TEMP_DIR, 'empty')]
752 rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
753 self.assertEqual(rc, 0)
754 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
755 finally:
756 if host_dir is not None:
757 shutil.rmtree(host_dir)
758
759 def test_multiple_push(self):
760 """Push multiple files to the device in one adb push command.
761
762 Bug: http://b/25324823
763 """
764
765 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
766 self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
767
768 try:
769 host_dir = tempfile.mkdtemp()
770
771 # Create some random files and a subdirectory containing more files.
772 temp_files = make_random_host_files(in_dir=host_dir, num_files=4)
773
Josh Gao255c5c82016-03-03 14:49:02 -0800774 subdir = os.path.join(host_dir, 'subdir')
Josh Gao191c1542015-12-09 11:26:11 -0800775 os.mkdir(subdir)
776 subdir_temp_files = make_random_host_files(in_dir=subdir,
777 num_files=4)
778
779 paths = map(lambda temp_file: temp_file.full_path, temp_files)
780 paths.append(subdir)
781 self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])
782
783 for temp_file in temp_files:
784 remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
785 temp_file.base_name)
786 self._verify_remote(temp_file.checksum, remote_path)
787
788 for subdir_temp_file in subdir_temp_files:
789 remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
790 # BROKEN: http://b/25394682
Josh Gao255c5c82016-03-03 14:49:02 -0800791 # 'subdir';
Josh Gao191c1542015-12-09 11:26:11 -0800792 temp_file.base_name)
793 self._verify_remote(temp_file.checksum, remote_path)
794
795
796 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
797 finally:
798 if host_dir is not None:
799 shutil.rmtree(host_dir)
800
Josh Gaoafcdcd72016-02-19 15:55:55 -0800801 @requires_non_root
802 def test_push_error_reporting(self):
803 """Make sure that errors that occur while pushing a file get reported
804
805 Bug: http://b/26816782
806 """
807 with tempfile.NamedTemporaryFile() as tmp_file:
808 tmp_file.write('\0' * 1024 * 1024)
809 tmp_file.flush()
810 try:
811 self.device.push(local=tmp_file.name, remote='/system/')
Josh Gao255c5c82016-03-03 14:49:02 -0800812 self.fail('push should not have succeeded')
Josh Gaoafcdcd72016-02-19 15:55:55 -0800813 except subprocess.CalledProcessError as e:
814 output = e.output
815
Josh Gao255c5c82016-03-03 14:49:02 -0800816 self.assertIn('Permission denied', output)
Josh Gao191c1542015-12-09 11:26:11 -0800817
818 def _test_pull(self, remote_file, checksum):
819 tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
820 tmp_write.close()
821 self.device.pull(remote=remote_file, local=tmp_write.name)
822 with open(tmp_write.name, 'rb') as tmp_read:
823 host_contents = tmp_read.read()
824 host_md5 = compute_md5(host_contents)
825 self.assertEqual(checksum, host_md5)
826 os.remove(tmp_write.name)
827
828 @requires_non_root
829 def test_pull_error_reporting(self):
830 self.device.shell(['touch', self.DEVICE_TEMP_FILE])
831 self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])
832
833 try:
834 output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x')
835 except subprocess.CalledProcessError as e:
836 output = e.output
837
838 self.assertIn('Permission denied', output)
839
840 self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
841
842 def test_pull(self):
843 """Pull a randomly generated file from specified device."""
844 kbytes = 512
845 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
846 cmd = ['dd', 'if=/dev/urandom',
847 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024',
848 'count={}'.format(kbytes)]
849 self.device.shell(cmd)
850 dev_md5, _ = self.device.shell(
851 [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split()
852 self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
853 self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])
854
855 def test_pull_dir(self):
856 """Pull a randomly generated directory of files from the device."""
857 try:
858 host_dir = tempfile.mkdtemp()
859
860 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
861 self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
862
863 # Populate device directory with random files.
864 temp_files = make_random_device_files(
865 self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
866
867 self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
868
869 for temp_file in temp_files:
Josh Gaoce8f2cd2015-12-09 14:20:23 -0800870 host_path = os.path.join(
871 host_dir, posixpath.basename(self.DEVICE_TEMP_DIR),
872 temp_file.base_name)
873 self._verify_local(temp_file.checksum, host_path)
Josh Gao191c1542015-12-09 11:26:11 -0800874
875 self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
876 finally:
877 if host_dir is not None:
878 shutil.rmtree(host_dir)
879
def test_pull_dir_symlink(self):
    """Pull a directory into a symlink to a directory.

    The pull destination is a host-side symlink pointing at a real
    directory; the pulled files are expected beneath the symlink's
    target.

    Bug: http://b/27362811
    """
    if os.name != 'posix':
        raise unittest.SkipTest('requires POSIX')

    # Create the host directory before entering the try block so a
    # mkdtemp() failure is not masked by a NameError during cleanup.
    host_dir = tempfile.mkdtemp()
    try:
        real_dir = os.path.join(host_dir, 'dir')
        symlink = os.path.join(host_dir, 'symlink')
        os.mkdir(real_dir)
        os.symlink(real_dir, symlink)

        # Start from a clean, empty directory on the device.
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=symlink)

        # Verify through the real directory, not the symlink, to prove
        # the link was followed rather than replaced.
        pulled_dir = os.path.join(
            real_dir, posixpath.basename(self.DEVICE_TEMP_DIR))
        for temp_file in temp_files:
            host_path = os.path.join(pulled_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
914
def test_pull_dir_symlink_collision(self):
    """Pull a directory into a colliding symlink to directory.

    The host destination already contains a symlink whose name equals
    the basename of DEVICE_TEMP_DIR, i.e. the name the pull would give
    its output subdirectory. The pulled files are expected inside the
    directory that symlink points at.
    """
    if os.name != 'posix':
        raise unittest.SkipTest('requires POSIX')

    # Create the host directory before entering the try block so a
    # mkdtemp() failure is not masked by a NameError during cleanup.
    host_dir = tempfile.mkdtemp()
    try:
        real_dir = os.path.join(host_dir, 'real')
        # DEVICE_TEMP_DIR is a device path, so split it with posixpath
        # like the sibling pull tests do (same result on POSIX hosts,
        # which is all this test runs on).
        tmp_dirname = posixpath.basename(self.DEVICE_TEMP_DIR)
        symlink = os.path.join(host_dir, tmp_dirname)
        os.mkdir(real_dir)
        os.symlink(real_dir, symlink)

        # Start from a clean, empty directory on the device.
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)

        # Files must show up directly in the symlink's target.
        for temp_file in temp_files:
            host_path = os.path.join(real_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
945
def test_pull_dir_nonexistent(self):
    """Pull a directory of files from the device to a nonexistent path.

    The destination ('dest' inside a fresh host temp directory) does not
    exist before the pull; the files are expected directly inside it.
    """
    # Create the host directory before entering the try block so a
    # mkdtemp() failure is not masked by a NameError during cleanup.
    host_dir = tempfile.mkdtemp()
    try:
        # Deliberately not created: the pull is expected to create it.
        dest_dir = os.path.join(host_dir, 'dest')

        # Start from a clean, empty directory on the device.
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)

        self.device.pull(remote=self.DEVICE_TEMP_DIR, local=dest_dir)

        for temp_file in temp_files:
            host_path = os.path.join(dest_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
969
def test_pull_symlink_dir(self):
    """Pull a symlink to a directory of symlinks to files.

    Device layout under DEVICE_TEMP_DIR:
        contents/  - the real files
        links/     - one relative symlink per file in contents/
        symlink    - symlink to links/

    Pulling 'symlink' is expected to produce <host_dir>/symlink/<file>
    with the contents of the real files.
    """
    # Create the host directory before entering the try block so a
    # mkdtemp() failure is not masked by a NameError during cleanup.
    host_dir = tempfile.mkdtemp()
    try:
        remote_dir = posixpath.join(self.DEVICE_TEMP_DIR, 'contents')
        remote_links = posixpath.join(self.DEVICE_TEMP_DIR, 'links')
        remote_symlink = posixpath.join(self.DEVICE_TEMP_DIR, 'symlink')

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', remote_dir, remote_links])
        self.device.shell(['ln', '-s', remote_links, remote_symlink])

        # Populate device directory with random files.
        temp_files = make_random_device_files(
            self.device, in_dir=remote_dir, num_files=32)

        # Mirror every real file with a relative symlink in links/.
        for temp_file in temp_files:
            link_target = '../contents/{}'.format(temp_file.base_name)
            link_path = posixpath.join(remote_links, temp_file.base_name)
            self.device.shell(['ln', '-s', link_target, link_path])

        self.device.pull(remote=remote_symlink, local=host_dir)

        for temp_file in temp_files:
            host_path = os.path.join(
                host_dir, 'symlink', temp_file.base_name)
            self._verify_local(temp_file.checksum, host_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
1003
def test_pull_empty(self):
    """Pull a directory containing an empty directory from the device.

    Creates DEVICE_TEMP_DIR/empty on the device, pulls it into a fresh
    host temp directory, and asserts that <host_dir>/empty exists as a
    directory.
    """
    # Create the host directory before entering the try block so a
    # mkdtemp() failure is not masked by a NameError during cleanup.
    host_dir = tempfile.mkdtemp()
    try:
        remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', remote_empty_path])

        self.device.pull(remote=remote_empty_path, local=host_dir)
        self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))

        # Remove the device-side directory on success, like the other
        # pull tests do.
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
1018
def test_multiple_pull(self):
    """Pull several files and a directory in a single pull invocation.

    Creates 4 files in DEVICE_TEMP_DIR plus 4 more in its 'subdir'
    subdirectory, then issues one 'pull' with the 4 top-level file paths
    and the subdirectory as sources. The top-level files are expected
    directly in the host directory and the subdirectory's files under
    <host_dir>/subdir.
    """
    # Create the host directory before entering the try block so a
    # mkdtemp() failure is not masked by a NameError during cleanup.
    host_dir = tempfile.mkdtemp()
    try:
        subdir = posixpath.join(self.DEVICE_TEMP_DIR, 'subdir')
        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
        self.device.shell(['mkdir', '-p', subdir])

        # Create some random files and a subdirectory containing more files.
        temp_files = make_random_device_files(
            self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)

        subdir_temp_files = make_random_device_files(
            self.device, in_dir=subdir, num_files=4, prefix='subdir_')

        # Build a real list: map() returns an iterator on Python 3, which
        # has no append() and cannot be concatenated to a list.
        paths = [temp_file.full_path for temp_file in temp_files]
        paths.append(subdir)
        self.device._simple_call(['pull'] + paths + [host_dir])

        for temp_file in temp_files:
            local_path = os.path.join(host_dir, temp_file.base_name)
            self._verify_local(temp_file.checksum, local_path)

        for subdir_temp_file in subdir_temp_files:
            local_path = os.path.join(host_dir,
                                      'subdir',
                                      subdir_temp_file.base_name)
            self._verify_local(subdir_temp_file.checksum, local_path)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(host_dir)
1054
def test_sync(self):
    """Sync a randomly generated directory of files to specified device.

    Builds a host-side mirror of DEVICE_TEMP_DIR inside a temp directory,
    fills it with 32 files, runs sync('data') through a device handle
    obtained with product=<temp directory>, and compares the checksum
    reported by the device for every file with the host-side checksum.
    """
    # Create the host directory before entering the try block so a
    # mkdtemp() failure is not masked by a NameError during cleanup.
    base_dir = tempfile.mkdtemp()
    try:
        # Create mirror device directory hierarchy within base_dir.
        # Plain concatenation is used rather than os.path.join;
        # NOTE(review): presumably because DEVICE_TEMP_DIR is absolute
        # and join() would then discard base_dir - confirm.
        full_dir_path = base_dir + self.DEVICE_TEMP_DIR
        os.makedirs(full_dir_path)

        # Create 32 random files within the host mirror.
        temp_files = make_random_host_files(in_dir=full_dir_path, num_files=32)

        # Clean up any trash on the device.
        # NOTE(review): product=base_dir presumably makes sync() treat
        # base_dir as the product output tree - verify against adb.
        device = adb.get_device(product=base_dir)
        device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])

        device.sync('data')

        # Confirm that every file on the device mirrors that on the host.
        for temp_file in temp_files:
            device_full_path = posixpath.join(self.DEVICE_TEMP_DIR,
                                              temp_file.base_name)
            # The checksum tool prints two whitespace-separated fields;
            # the first one is the digest.
            dev_md5, _ = device.shell(
                [get_md5_prog(self.device), device_full_path])[0].split()
            self.assertEqual(temp_file.checksum, dev_md5)

        self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
    finally:
        shutil.rmtree(base_dir)
1086
def test_unicode_paths(self):
    """Ensure that we can support non-ASCII paths, even on Windows.

    Pushes a host file whose name ends in a non-ASCII suffix to a
    non-ASCII device path, checks the path listed by the device, then
    pulls it back to the same host path.
    """
    name = u'로보카 폴리'

    self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
    remote_path = u'/data/local/tmp/adb-test-{}'.format(name)

    # delete=False: the file is closed right away and removed by hand,
    # because only its (non-ASCII) path is needed for push and pull.
    tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
    tf.close()
    try:
        # push.
        self.device.push(tf.name, remote_path)
        os.remove(tf.name)
        self.assertFalse(os.path.exists(tf.name))

        # Verify that the device ended up with the expected UTF-8 path
        output = self.device.shell(
            ['ls', '/data/local/tmp/adb-test-*'])[0].strip()
        self.assertEqual(remote_path.encode('utf-8'), output)

        # pull.
        self.device.pull(remote_path, tf.name)
        self.assertTrue(os.path.exists(tf.name))
    finally:
        # Never leave the host temp file behind, whether it is the
        # original (push failed) or the pulled copy.
        if os.path.exists(tf.name):
            os.remove(tf.name)
    self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
1111
1112
def main():
    """Run every test in this module against the attached device(s).

    The random module is seeded with a fixed value first.

    Returns:
        0 if the suite ran and every test passed; 1 if any test failed
        or errored, or if no device is attached.
    """
    random.seed(0)
    if not adb.get_devices():
        print('Test suite must be run with attached devices')
        return 1

    suite = unittest.TestLoader().loadTestsFromName(__name__)
    result = unittest.TextTestRunner(verbosity=3).run(suite)
    # Report the outcome to the caller instead of discarding it, so the
    # script's exit status reflects test failures.
    return 0 if result.wasSuccessful() else 1


if __name__ == '__main__':
    sys.exit(main())