blob: 480b4d2d92e2d55dd7d4a3fd9a7b13714880cd86 [file] [log] [blame]
Fang Deng69498c32017-03-02 14:29:30 -08001#!/usr/bin/env python
2#
3# Copyright 2016 - The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
Fang Deng69498c32017-03-02 14:29:30 -080016"""Tests for acloud.internal.lib.utils."""
17
Fang Deng26e4dc12018-03-04 19:01:59 -080018import errno
Fang Deng69498c32017-03-02 14:29:30 -080019import getpass
herbertxue07293a32018-11-05 20:40:11 +080020import grp
Fang Deng69498c32017-03-02 14:29:30 -080021import os
Fang Deng26e4dc12018-03-04 19:01:59 -080022import shutil
Fang Deng69498c32017-03-02 14:29:30 -080023import subprocess
Fang Deng26e4dc12018-03-04 19:01:59 -080024import tempfile
Fang Dengf24be082018-02-10 10:09:55 -080025import time
Fang Deng69498c32017-03-02 14:29:30 -080026
cylan0d77ae12018-05-18 08:36:48 +000027import unittest
Fang Deng69498c32017-03-02 14:29:30 -080028import mock
chojoyce7479ce32019-10-29 12:05:44 +080029import six
Fang Deng69498c32017-03-02 14:29:30 -080030
Sam Chiu7de3b232018-12-06 19:45:52 +080031from acloud import errors
Fang Deng69498c32017-03-02 14:29:30 -080032from acloud.internal.lib import driver_test_lib
33from acloud.internal.lib import utils
34
herbertxue1512f8a2019-06-27 13:56:23 +080035
Kevin Cheng53aa5a52018-12-03 01:33:55 -080036# Tkinter may not be supported so mock it out.
37try:
38 import Tkinter
39except ImportError:
40 Tkinter = mock.Mock()
41
herbertxue1512f8a2019-06-27 13:56:23 +080042
Kevin Cheng358fb3e2018-11-13 14:05:54 -080043class FakeTkinter(object):
44 """Fake implementation of Tkinter.Tk()"""
45
46 def __init__(self, width=None, height=None):
47 self.width = width
48 self.height = height
49
50 # pylint: disable=invalid-name
51 def winfo_screenheight(self):
52 """Return the screen height."""
53 return self.height
54
55 # pylint: disable=invalid-name
56 def winfo_screenwidth(self):
57 """Return the screen width."""
58 return self.width
59
Fang Deng69498c32017-03-02 14:29:30 -080060
chojoyceefafc022018-11-08 17:22:16 +080061# pylint: disable=too-many-public-methods
Fang Deng69498c32017-03-02 14:29:30 -080062class UtilsTest(driver_test_lib.BaseDriverTest):
cylan0d77ae12018-05-18 08:36:48 +000063 """Test Utils."""
Fang Deng69498c32017-03-02 14:29:30 -080064
cylan0d77ae12018-05-18 08:36:48 +000065 def TestTempDirSuccess(self):
66 """Test create a temp dir."""
67 self.Patch(os, "chmod")
68 self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
69 self.Patch(shutil, "rmtree")
70 with utils.TempDir():
71 pass
72 # Verify.
73 tempfile.mkdtemp.assert_called_once() # pylint: disable=no-member
74 shutil.rmtree.assert_called_with("/tmp/tempdir") # pylint: disable=no-member
Fang Deng26e4dc12018-03-04 19:01:59 -080075
cylan0d77ae12018-05-18 08:36:48 +000076 def TestTempDirExceptionRaised(self):
77 """Test create a temp dir and exception is raised within with-clause."""
78 self.Patch(os, "chmod")
79 self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
80 self.Patch(shutil, "rmtree")
Fang Deng26e4dc12018-03-04 19:01:59 -080081
cylan0d77ae12018-05-18 08:36:48 +000082 class ExpectedException(Exception):
83 """Expected exception."""
Fang Deng26e4dc12018-03-04 19:01:59 -080084
cylan0d77ae12018-05-18 08:36:48 +000085 def _Call():
86 with utils.TempDir():
87 raise ExpectedException("Expected exception.")
Fang Deng26e4dc12018-03-04 19:01:59 -080088
cylan0d77ae12018-05-18 08:36:48 +000089 # Verify. ExpectedException should be raised.
90 self.assertRaises(ExpectedException, _Call)
91 tempfile.mkdtemp.assert_called_once() # pylint: disable=no-member
92 shutil.rmtree.assert_called_with("/tmp/tempdir") #pylint: disable=no-member
Fang Deng26e4dc12018-03-04 19:01:59 -080093
cylan0d77ae12018-05-18 08:36:48 +000094 def testTempDirWhenDeleteTempDirNoLongerExist(self): # pylint: disable=invalid-name
95 """Test create a temp dir and dir no longer exists during deletion."""
96 self.Patch(os, "chmod")
97 self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
98 expected_error = EnvironmentError()
99 expected_error.errno = errno.ENOENT
100 self.Patch(shutil, "rmtree", side_effect=expected_error)
Fang Deng26e4dc12018-03-04 19:01:59 -0800101
cylan0d77ae12018-05-18 08:36:48 +0000102 def _Call():
103 with utils.TempDir():
104 pass
Fang Deng26e4dc12018-03-04 19:01:59 -0800105
cylan0d77ae12018-05-18 08:36:48 +0000106 # Verify no exception should be raised when rmtree raises
107 # EnvironmentError with errno.ENOENT, i.e.
108 # directory no longer exists.
109 _Call()
110 tempfile.mkdtemp.assert_called_once() #pylint: disable=no-member
111 shutil.rmtree.assert_called_with("/tmp/tempdir") #pylint: disable=no-member
Fang Deng26e4dc12018-03-04 19:01:59 -0800112
cylan0d77ae12018-05-18 08:36:48 +0000113 def testTempDirWhenDeleteEncounterError(self):
114 """Test create a temp dir and encoutered error during deletion."""
115 self.Patch(os, "chmod")
116 self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
117 expected_error = OSError("Expected OS Error")
118 self.Patch(shutil, "rmtree", side_effect=expected_error)
Fang Deng26e4dc12018-03-04 19:01:59 -0800119
cylan0d77ae12018-05-18 08:36:48 +0000120 def _Call():
121 with utils.TempDir():
122 pass
Fang Deng26e4dc12018-03-04 19:01:59 -0800123
cylan0d77ae12018-05-18 08:36:48 +0000124 # Verify OSError should be raised.
125 self.assertRaises(OSError, _Call)
126 tempfile.mkdtemp.assert_called_once() #pylint: disable=no-member
127 shutil.rmtree.assert_called_with("/tmp/tempdir") #pylint: disable=no-member
Fang Deng26e4dc12018-03-04 19:01:59 -0800128
cylan0d77ae12018-05-18 08:36:48 +0000129 def testTempDirOrininalErrorRaised(self):
130 """Test original error is raised even if tmp dir deletion failed."""
131 self.Patch(os, "chmod")
132 self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
133 expected_error = OSError("Expected OS Error")
134 self.Patch(shutil, "rmtree", side_effect=expected_error)
Fang Deng69498c32017-03-02 14:29:30 -0800135
cylan0d77ae12018-05-18 08:36:48 +0000136 class ExpectedException(Exception):
137 """Expected exception."""
Fang Deng69498c32017-03-02 14:29:30 -0800138
cylan0d77ae12018-05-18 08:36:48 +0000139 def _Call():
140 with utils.TempDir():
141 raise ExpectedException("Expected Exception")
Fang Dengf24be082018-02-10 10:09:55 -0800142
cylan0d77ae12018-05-18 08:36:48 +0000143 # Verify.
144 # ExpectedException should be raised, and OSError
145 # should not be raised.
146 self.assertRaises(ExpectedException, _Call)
147 tempfile.mkdtemp.assert_called_once() #pylint: disable=no-member
148 shutil.rmtree.assert_called_with("/tmp/tempdir") #pylint: disable=no-member
Fang Dengf24be082018-02-10 10:09:55 -0800149
cylan0d77ae12018-05-18 08:36:48 +0000150 def testCreateSshKeyPairKeyAlreadyExists(self): #pylint: disable=invalid-name
151 """Test when the key pair already exists."""
152 public_key = "/fake/public_key"
153 private_key = "/fake/private_key"
cylan4f73c1f2018-07-19 16:40:31 +0800154 self.Patch(os.path, "exists", side_effect=[True, True])
cylan0d77ae12018-05-18 08:36:48 +0000155 self.Patch(subprocess, "check_call")
cylan4f73c1f2018-07-19 16:40:31 +0800156 self.Patch(os, "makedirs", return_value=True)
cylan0d77ae12018-05-18 08:36:48 +0000157 utils.CreateSshKeyPairIfNotExist(private_key, public_key)
158 self.assertEqual(subprocess.check_call.call_count, 0) #pylint: disable=no-member
Fang Dengf24be082018-02-10 10:09:55 -0800159
cylan0d77ae12018-05-18 08:36:48 +0000160 def testCreateSshKeyPairKeyAreCreated(self):
161 """Test when the key pair created."""
162 public_key = "/fake/public_key"
163 private_key = "/fake/private_key"
164 self.Patch(os.path, "exists", return_value=False)
cylan4f73c1f2018-07-19 16:40:31 +0800165 self.Patch(os, "makedirs", return_value=True)
cylan0d77ae12018-05-18 08:36:48 +0000166 self.Patch(subprocess, "check_call")
167 self.Patch(os, "rename")
168 utils.CreateSshKeyPairIfNotExist(private_key, public_key)
169 self.assertEqual(subprocess.check_call.call_count, 1) #pylint: disable=no-member
170 subprocess.check_call.assert_called_with( #pylint: disable=no-member
171 utils.SSH_KEYGEN_CMD +
172 ["-C", getpass.getuser(), "-f", private_key],
173 stdout=mock.ANY,
174 stderr=mock.ANY)
Fang Dengf24be082018-02-10 10:09:55 -0800175
cylan4f73c1f2018-07-19 16:40:31 +0800176 def testCreatePublicKeyAreCreated(self):
177 """Test when the PublicKey created."""
178 public_key = "/fake/public_key"
179 private_key = "/fake/private_key"
180 self.Patch(os.path, "exists", side_effect=[False, True, True])
181 self.Patch(os, "makedirs", return_value=True)
182 mock_open = mock.mock_open(read_data=public_key)
cylan4f73c1f2018-07-19 16:40:31 +0800183 self.Patch(subprocess, "check_output")
184 self.Patch(os, "rename")
chojoyce7479ce32019-10-29 12:05:44 +0800185 with mock.patch.object(six.moves.builtins, "open", mock_open):
Kevin Chengda4f07a2018-06-26 10:25:05 -0700186 utils.CreateSshKeyPairIfNotExist(private_key, public_key)
cylan4f73c1f2018-07-19 16:40:31 +0800187 self.assertEqual(subprocess.check_output.call_count, 1) #pylint: disable=no-member
188 subprocess.check_output.assert_called_with( #pylint: disable=no-member
189 utils.SSH_KEYGEN_PUB_CMD +["-f", private_key])
190
cylan0d77ae12018-05-18 08:36:48 +0000191 def TestRetryOnException(self):
192 """Test Retry."""
Fang Dengf24be082018-02-10 10:09:55 -0800193
cylan0d77ae12018-05-18 08:36:48 +0000194 def _IsValueError(exc):
195 return isinstance(exc, ValueError)
Fang Dengf24be082018-02-10 10:09:55 -0800196
cylan0d77ae12018-05-18 08:36:48 +0000197 num_retry = 5
Fang Dengf24be082018-02-10 10:09:55 -0800198
cylan0d77ae12018-05-18 08:36:48 +0000199 @utils.RetryOnException(_IsValueError, num_retry)
200 def _RaiseAndRetry(sentinel):
201 sentinel.alert()
202 raise ValueError("Fake error.")
203
204 sentinel = mock.MagicMock()
205 self.assertRaises(ValueError, _RaiseAndRetry, sentinel)
206 self.assertEqual(1 + num_retry, sentinel.alert.call_count)
207
208 def testRetryExceptionType(self):
209 """Test RetryExceptionType function."""
210
211 def _RaiseAndRetry(sentinel):
212 sentinel.alert()
213 raise ValueError("Fake error.")
214
215 num_retry = 5
216 sentinel = mock.MagicMock()
217 self.assertRaises(
218 ValueError,
219 utils.RetryExceptionType, (KeyError, ValueError),
220 num_retry,
221 _RaiseAndRetry,
Kevin Chengd25feee2018-05-24 10:15:20 -0700222 0, # sleep_multiplier
223 1, # retry_backoff_factor
cylan0d77ae12018-05-18 08:36:48 +0000224 sentinel=sentinel)
225 self.assertEqual(1 + num_retry, sentinel.alert.call_count)
226
227 def testRetry(self):
228 """Test Retry."""
Kevin Chengd25feee2018-05-24 10:15:20 -0700229 mock_sleep = self.Patch(time, "sleep")
cylan0d77ae12018-05-18 08:36:48 +0000230
231 def _RaiseAndRetry(sentinel):
232 sentinel.alert()
233 raise ValueError("Fake error.")
234
235 num_retry = 5
236 sentinel = mock.MagicMock()
237 self.assertRaises(
238 ValueError,
239 utils.RetryExceptionType, (ValueError, KeyError),
240 num_retry,
241 _RaiseAndRetry,
Kevin Chengd25feee2018-05-24 10:15:20 -0700242 1, # sleep_multiplier
243 2, # retry_backoff_factor
cylan0d77ae12018-05-18 08:36:48 +0000244 sentinel=sentinel)
245
246 self.assertEqual(1 + num_retry, sentinel.alert.call_count)
Kevin Chengd25feee2018-05-24 10:15:20 -0700247 mock_sleep.assert_has_calls(
cylan0d77ae12018-05-18 08:36:48 +0000248 [
249 mock.call(1),
250 mock.call(2),
251 mock.call(4),
252 mock.call(8),
253 mock.call(16)
254 ])
Fang Dengf24be082018-02-10 10:09:55 -0800255
chojoyce7479ce32019-10-29 12:05:44 +0800256 @mock.patch.object(six.moves, "input")
Kevin Chengeb85e862018-10-09 15:35:13 -0700257 def testGetAnswerFromList(self, mock_raw_input):
258 """Test GetAnswerFromList."""
259 answer_list = ["image1.zip", "image2.zip", "image3.zip"]
260 mock_raw_input.return_value = 0
261 with self.assertRaises(SystemExit):
262 utils.GetAnswerFromList(answer_list)
Sam Chiu71691342019-03-27 17:30:12 +0800263 mock_raw_input.side_effect = [1, 2, 3, 4]
Kevin Chengeb85e862018-10-09 15:35:13 -0700264 self.assertEqual(utils.GetAnswerFromList(answer_list),
265 ["image1.zip"])
266 self.assertEqual(utils.GetAnswerFromList(answer_list),
267 ["image2.zip"])
268 self.assertEqual(utils.GetAnswerFromList(answer_list),
269 ["image3.zip"])
270 self.assertEqual(utils.GetAnswerFromList(answer_list,
271 enable_choose_all=True),
272 answer_list)
273
Kevin Cheng53aa5a52018-12-03 01:33:55 -0800274 @unittest.skipIf(isinstance(Tkinter, mock.Mock), "Tkinter mocked out, test case not needed.")
Kevin Cheng358fb3e2018-11-13 14:05:54 -0800275 @mock.patch.object(Tkinter, "Tk")
276 def testCalculateVNCScreenRatio(self, mock_tk):
Sam Chiu7a477f52018-10-22 11:20:36 +0800277 """Test Calculating the scale ratio of VNC display."""
Sam Chiu7a477f52018-10-22 11:20:36 +0800278 # Get scale-down ratio if screen height is smaller than AVD height.
Kevin Cheng358fb3e2018-11-13 14:05:54 -0800279 mock_tk.return_value = FakeTkinter(height=800, width=1200)
Sam Chiu7a477f52018-10-22 11:20:36 +0800280 avd_h = 1920
281 avd_w = 1080
282 self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.4)
283
284 # Get scale-down ratio if screen width is smaller than AVD width.
Kevin Cheng358fb3e2018-11-13 14:05:54 -0800285 mock_tk.return_value = FakeTkinter(height=800, width=1200)
Sam Chiu7a477f52018-10-22 11:20:36 +0800286 avd_h = 900
287 avd_w = 1920
288 self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.6)
289
290 # Scale ratio = 1 if screen is larger than AVD.
Kevin Cheng358fb3e2018-11-13 14:05:54 -0800291 mock_tk.return_value = FakeTkinter(height=1080, width=1920)
Sam Chiu7a477f52018-10-22 11:20:36 +0800292 avd_h = 800
293 avd_w = 1280
294 self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 1)
295
296 # Get the scale if ratio of width is smaller than the
297 # ratio of height.
Kevin Cheng358fb3e2018-11-13 14:05:54 -0800298 mock_tk.return_value = FakeTkinter(height=1200, width=800)
Sam Chiu7a477f52018-10-22 11:20:36 +0800299 avd_h = 1920
300 avd_w = 1080
301 self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.6)
302
herbertxue07293a32018-11-05 20:40:11 +0800303 # pylint: disable=protected-access
304 def testCheckUserInGroups(self):
305 """Test CheckUserInGroups."""
306 self.Patch(os, "getgroups", return_value=[1, 2, 3])
307 gr1 = mock.MagicMock()
308 gr1.gr_name = "fake_gr_1"
309 gr2 = mock.MagicMock()
310 gr2.gr_name = "fake_gr_2"
311 gr3 = mock.MagicMock()
312 gr3.gr_name = "fake_gr_3"
313 self.Patch(grp, "getgrgid", side_effect=[gr1, gr2, gr3])
314
315 # User in all required groups should return true.
316 self.assertTrue(
317 utils.CheckUserInGroups(
318 ["fake_gr_1", "fake_gr_2"]))
319
320 # User not in all required groups should return False.
321 self.Patch(grp, "getgrgid", side_effect=[gr1, gr2, gr3])
322 self.assertFalse(
323 utils.CheckUserInGroups(
324 ["fake_gr_1", "fake_gr_4"]))
325
326 @mock.patch.object(utils, "CheckUserInGroups")
327 def testAddUserGroupsToCmd(self, mock_user_group):
328 """Test AddUserGroupsToCmd."""
329 command = "test_command"
330 groups = ["group1", "group2"]
331 # Don't add user group in command
332 mock_user_group.return_value = True
333 expected_value = "test_command"
334 self.assertEqual(expected_value, utils.AddUserGroupsToCmd(command,
335 groups))
336
337 # Add user group in command
338 mock_user_group.return_value = False
339 expected_value = "sg group1 <<EOF\nsg group2\ntest_command\nEOF"
340 self.assertEqual(expected_value, utils.AddUserGroupsToCmd(command,
341 groups))
342
Kevin Chengce6cfb02018-12-04 13:21:31 -0800343 # pylint: disable=invalid-name
chojoyceefafc022018-11-08 17:22:16 +0800344 def testTimeoutException(self):
345 """Test TimeoutException."""
346 @utils.TimeoutException(1, "should time out")
347 def functionThatWillTimeOut():
348 """Test decorator of @utils.TimeoutException should timeout."""
349 time.sleep(5)
350
351 self.assertRaises(errors.FunctionTimeoutError,
352 functionThatWillTimeOut)
353
354
355 def testTimeoutExceptionNoTimeout(self):
356 """Test No TimeoutException."""
357 @utils.TimeoutException(5, "shouldn't time out")
358 def functionThatShouldNotTimeout():
359 """Test decorator of @utils.TimeoutException shouldn't timeout."""
360 return None
361 try:
362 functionThatShouldNotTimeout()
363 except errors.FunctionTimeoutError:
364 self.fail("shouldn't timeout")
365
cylan4b4bbd02019-01-16 14:49:17 +0800366 def testAutoConnectCreateSSHTunnelFail(self):
cyland370db22019-07-17 16:04:00 +0800367 """Test auto connect."""
cylan4b4bbd02019-01-16 14:49:17 +0800368 fake_ip_addr = "1.1.1.1"
369 fake_rsa_key_file = "/tmp/rsa_file"
370 fake_target_vnc_port = 8888
371 target_adb_port = 9999
372 ssh_user = "fake_user"
373 call_side_effect = subprocess.CalledProcessError(123, "fake",
374 "fake error")
375 result = utils.ForwardedPorts(vnc_port=None, adb_port=None)
376 self.Patch(subprocess, "check_call", side_effect=call_side_effect)
377 self.assertEqual(result, utils.AutoConnect(fake_ip_addr,
378 fake_rsa_key_file,
379 fake_target_vnc_port,
380 target_adb_port,
381 ssh_user))
382
cyland370db22019-07-17 16:04:00 +0800383 # pylint: disable=protected-access,no-member
384 def testExtraArgsSSHTunnel(self):
385 """Tesg extra args will be the same with expanded args."""
386 fake_ip_addr = "1.1.1.1"
387 fake_rsa_key_file = "/tmp/rsa_file"
388 fake_target_vnc_port = 8888
389 target_adb_port = 9999
390 ssh_user = "fake_user"
391 fake_port = 12345
392 self.Patch(utils, "PickFreePort", return_value=fake_port)
393 self.Patch(utils, "_ExecuteCommand")
394 self.Patch(subprocess, "check_call", return_value=True)
395 extra_args_ssh_tunnel = "-o command='shell %s %h' -o command1='ls -la'"
396 utils.AutoConnect(ip_addr=fake_ip_addr,
397 rsa_key_file=fake_rsa_key_file,
398 target_vnc_port=fake_target_vnc_port,
399 target_adb_port=target_adb_port,
400 ssh_user=ssh_user,
401 client_adb_port=fake_port,
402 extra_args_ssh_tunnel=extra_args_ssh_tunnel)
403 args_list = ["-i", "/tmp/rsa_file",
404 "-o", "UserKnownHostsFile=/dev/null",
405 "-o", "StrictHostKeyChecking=no",
406 "-L", "12345:127.0.0.1:8888",
407 "-L", "12345:127.0.0.1:9999",
408 "-N", "-f", "-l", "fake_user", "1.1.1.1",
409 "-o", "command=shell %s %h",
410 "-o", "command1=ls -la"]
411 first_call_args = utils._ExecuteCommand.call_args_list[0][0]
412 self.assertEqual(first_call_args[1], args_list)
413
chojoyceefafc022018-11-08 17:22:16 +0800414
Fang Deng69498c32017-03-02 14:29:30 -0800415if __name__ == "__main__":
416 unittest.main()