blob: 79c1c6dc43e42bc6312e4142a4e10d04e95611ee [file] [log] [blame]
Fang Deng69498c32017-03-02 14:29:30 -08001#!/usr/bin/env python
2#
3# Copyright 2016 - The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
Fang Deng69498c32017-03-02 14:29:30 -080016"""Tests for acloud.internal.lib.utils."""
17
Fang Deng26e4dc12018-03-04 19:01:59 -080018import errno
Fang Deng69498c32017-03-02 14:29:30 -080019import getpass
herbertxue07293a32018-11-05 20:40:11 +080020import grp
Fang Deng69498c32017-03-02 14:29:30 -080021import os
Fang Deng26e4dc12018-03-04 19:01:59 -080022import shutil
Fang Deng69498c32017-03-02 14:29:30 -080023import subprocess
Fang Deng26e4dc12018-03-04 19:01:59 -080024import tempfile
Fang Dengf24be082018-02-10 10:09:55 -080025import time
Fang Deng69498c32017-03-02 14:29:30 -080026
cylan0d77ae12018-05-18 08:36:48 +000027import unittest
Fang Deng69498c32017-03-02 14:29:30 -080028import mock
29
Sam Chiu7de3b232018-12-06 19:45:52 +080030from acloud import errors
Fang Deng69498c32017-03-02 14:29:30 -080031from acloud.internal.lib import driver_test_lib
32from acloud.internal.lib import utils
33
Kevin Cheng53aa5a52018-12-03 01:33:55 -080034# Tkinter may not be supported so mock it out.
35try:
36 import Tkinter
37except ImportError:
38 Tkinter = mock.Mock()
39
Kevin Cheng358fb3e2018-11-13 14:05:54 -080040class FakeTkinter(object):
41 """Fake implementation of Tkinter.Tk()"""
42
43 def __init__(self, width=None, height=None):
44 self.width = width
45 self.height = height
46
47 # pylint: disable=invalid-name
48 def winfo_screenheight(self):
49 """Return the screen height."""
50 return self.height
51
52 # pylint: disable=invalid-name
53 def winfo_screenwidth(self):
54 """Return the screen width."""
55 return self.width
56
Fang Deng69498c32017-03-02 14:29:30 -080057
58class UtilsTest(driver_test_lib.BaseDriverTest):
cylan0d77ae12018-05-18 08:36:48 +000059 """Test Utils."""
Fang Deng69498c32017-03-02 14:29:30 -080060
cylan0d77ae12018-05-18 08:36:48 +000061 def TestTempDirSuccess(self):
62 """Test create a temp dir."""
63 self.Patch(os, "chmod")
64 self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
65 self.Patch(shutil, "rmtree")
66 with utils.TempDir():
67 pass
68 # Verify.
69 tempfile.mkdtemp.assert_called_once() # pylint: disable=no-member
70 shutil.rmtree.assert_called_with("/tmp/tempdir") # pylint: disable=no-member
Fang Deng26e4dc12018-03-04 19:01:59 -080071
cylan0d77ae12018-05-18 08:36:48 +000072 def TestTempDirExceptionRaised(self):
73 """Test create a temp dir and exception is raised within with-clause."""
74 self.Patch(os, "chmod")
75 self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
76 self.Patch(shutil, "rmtree")
Fang Deng26e4dc12018-03-04 19:01:59 -080077
cylan0d77ae12018-05-18 08:36:48 +000078 class ExpectedException(Exception):
79 """Expected exception."""
80 pass
Fang Deng26e4dc12018-03-04 19:01:59 -080081
cylan0d77ae12018-05-18 08:36:48 +000082 def _Call():
83 with utils.TempDir():
84 raise ExpectedException("Expected exception.")
Fang Deng26e4dc12018-03-04 19:01:59 -080085
cylan0d77ae12018-05-18 08:36:48 +000086 # Verify. ExpectedException should be raised.
87 self.assertRaises(ExpectedException, _Call)
88 tempfile.mkdtemp.assert_called_once() # pylint: disable=no-member
89 shutil.rmtree.assert_called_with("/tmp/tempdir") #pylint: disable=no-member
Fang Deng26e4dc12018-03-04 19:01:59 -080090
cylan0d77ae12018-05-18 08:36:48 +000091 def testTempDirWhenDeleteTempDirNoLongerExist(self): # pylint: disable=invalid-name
92 """Test create a temp dir and dir no longer exists during deletion."""
93 self.Patch(os, "chmod")
94 self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
95 expected_error = EnvironmentError()
96 expected_error.errno = errno.ENOENT
97 self.Patch(shutil, "rmtree", side_effect=expected_error)
Fang Deng26e4dc12018-03-04 19:01:59 -080098
cylan0d77ae12018-05-18 08:36:48 +000099 def _Call():
100 with utils.TempDir():
101 pass
Fang Deng26e4dc12018-03-04 19:01:59 -0800102
cylan0d77ae12018-05-18 08:36:48 +0000103 # Verify no exception should be raised when rmtree raises
104 # EnvironmentError with errno.ENOENT, i.e.
105 # directory no longer exists.
106 _Call()
107 tempfile.mkdtemp.assert_called_once() #pylint: disable=no-member
108 shutil.rmtree.assert_called_with("/tmp/tempdir") #pylint: disable=no-member
Fang Deng26e4dc12018-03-04 19:01:59 -0800109
cylan0d77ae12018-05-18 08:36:48 +0000110 def testTempDirWhenDeleteEncounterError(self):
111 """Test create a temp dir and encoutered error during deletion."""
112 self.Patch(os, "chmod")
113 self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
114 expected_error = OSError("Expected OS Error")
115 self.Patch(shutil, "rmtree", side_effect=expected_error)
Fang Deng26e4dc12018-03-04 19:01:59 -0800116
cylan0d77ae12018-05-18 08:36:48 +0000117 def _Call():
118 with utils.TempDir():
119 pass
Fang Deng26e4dc12018-03-04 19:01:59 -0800120
cylan0d77ae12018-05-18 08:36:48 +0000121 # Verify OSError should be raised.
122 self.assertRaises(OSError, _Call)
123 tempfile.mkdtemp.assert_called_once() #pylint: disable=no-member
124 shutil.rmtree.assert_called_with("/tmp/tempdir") #pylint: disable=no-member
Fang Deng26e4dc12018-03-04 19:01:59 -0800125
cylan0d77ae12018-05-18 08:36:48 +0000126 def testTempDirOrininalErrorRaised(self):
127 """Test original error is raised even if tmp dir deletion failed."""
128 self.Patch(os, "chmod")
129 self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
130 expected_error = OSError("Expected OS Error")
131 self.Patch(shutil, "rmtree", side_effect=expected_error)
Fang Deng69498c32017-03-02 14:29:30 -0800132
cylan0d77ae12018-05-18 08:36:48 +0000133 class ExpectedException(Exception):
134 """Expected exception."""
135 pass
Fang Deng69498c32017-03-02 14:29:30 -0800136
cylan0d77ae12018-05-18 08:36:48 +0000137 def _Call():
138 with utils.TempDir():
139 raise ExpectedException("Expected Exception")
Fang Dengf24be082018-02-10 10:09:55 -0800140
cylan0d77ae12018-05-18 08:36:48 +0000141 # Verify.
142 # ExpectedException should be raised, and OSError
143 # should not be raised.
144 self.assertRaises(ExpectedException, _Call)
145 tempfile.mkdtemp.assert_called_once() #pylint: disable=no-member
146 shutil.rmtree.assert_called_with("/tmp/tempdir") #pylint: disable=no-member
Fang Dengf24be082018-02-10 10:09:55 -0800147
cylan0d77ae12018-05-18 08:36:48 +0000148 def testCreateSshKeyPairKeyAlreadyExists(self): #pylint: disable=invalid-name
149 """Test when the key pair already exists."""
150 public_key = "/fake/public_key"
151 private_key = "/fake/private_key"
cylan4f73c1f2018-07-19 16:40:31 +0800152 self.Patch(os.path, "exists", side_effect=[True, True])
cylan0d77ae12018-05-18 08:36:48 +0000153 self.Patch(subprocess, "check_call")
cylan4f73c1f2018-07-19 16:40:31 +0800154 self.Patch(os, "makedirs", return_value=True)
cylan0d77ae12018-05-18 08:36:48 +0000155 utils.CreateSshKeyPairIfNotExist(private_key, public_key)
156 self.assertEqual(subprocess.check_call.call_count, 0) #pylint: disable=no-member
Fang Dengf24be082018-02-10 10:09:55 -0800157
cylan0d77ae12018-05-18 08:36:48 +0000158 def testCreateSshKeyPairKeyAreCreated(self):
159 """Test when the key pair created."""
160 public_key = "/fake/public_key"
161 private_key = "/fake/private_key"
162 self.Patch(os.path, "exists", return_value=False)
cylan4f73c1f2018-07-19 16:40:31 +0800163 self.Patch(os, "makedirs", return_value=True)
cylan0d77ae12018-05-18 08:36:48 +0000164 self.Patch(subprocess, "check_call")
165 self.Patch(os, "rename")
166 utils.CreateSshKeyPairIfNotExist(private_key, public_key)
167 self.assertEqual(subprocess.check_call.call_count, 1) #pylint: disable=no-member
168 subprocess.check_call.assert_called_with( #pylint: disable=no-member
169 utils.SSH_KEYGEN_CMD +
170 ["-C", getpass.getuser(), "-f", private_key],
171 stdout=mock.ANY,
172 stderr=mock.ANY)
Fang Dengf24be082018-02-10 10:09:55 -0800173
cylan4f73c1f2018-07-19 16:40:31 +0800174 def testCreatePublicKeyAreCreated(self):
175 """Test when the PublicKey created."""
176 public_key = "/fake/public_key"
177 private_key = "/fake/private_key"
178 self.Patch(os.path, "exists", side_effect=[False, True, True])
179 self.Patch(os, "makedirs", return_value=True)
180 mock_open = mock.mock_open(read_data=public_key)
cylan4f73c1f2018-07-19 16:40:31 +0800181 self.Patch(subprocess, "check_output")
182 self.Patch(os, "rename")
Kevin Chengda4f07a2018-06-26 10:25:05 -0700183 with mock.patch("__builtin__.open", mock_open):
184 utils.CreateSshKeyPairIfNotExist(private_key, public_key)
cylan4f73c1f2018-07-19 16:40:31 +0800185 self.assertEqual(subprocess.check_output.call_count, 1) #pylint: disable=no-member
186 subprocess.check_output.assert_called_with( #pylint: disable=no-member
187 utils.SSH_KEYGEN_PUB_CMD +["-f", private_key])
188
cylan0d77ae12018-05-18 08:36:48 +0000189 def TestRetryOnException(self):
190 """Test Retry."""
Fang Dengf24be082018-02-10 10:09:55 -0800191
cylan0d77ae12018-05-18 08:36:48 +0000192 def _IsValueError(exc):
193 return isinstance(exc, ValueError)
Fang Dengf24be082018-02-10 10:09:55 -0800194
cylan0d77ae12018-05-18 08:36:48 +0000195 num_retry = 5
Fang Dengf24be082018-02-10 10:09:55 -0800196
cylan0d77ae12018-05-18 08:36:48 +0000197 @utils.RetryOnException(_IsValueError, num_retry)
198 def _RaiseAndRetry(sentinel):
199 sentinel.alert()
200 raise ValueError("Fake error.")
201
202 sentinel = mock.MagicMock()
203 self.assertRaises(ValueError, _RaiseAndRetry, sentinel)
204 self.assertEqual(1 + num_retry, sentinel.alert.call_count)
205
206 def testRetryExceptionType(self):
207 """Test RetryExceptionType function."""
208
209 def _RaiseAndRetry(sentinel):
210 sentinel.alert()
211 raise ValueError("Fake error.")
212
213 num_retry = 5
214 sentinel = mock.MagicMock()
215 self.assertRaises(
216 ValueError,
217 utils.RetryExceptionType, (KeyError, ValueError),
218 num_retry,
219 _RaiseAndRetry,
Kevin Chengd25feee2018-05-24 10:15:20 -0700220 0, # sleep_multiplier
221 1, # retry_backoff_factor
cylan0d77ae12018-05-18 08:36:48 +0000222 sentinel=sentinel)
223 self.assertEqual(1 + num_retry, sentinel.alert.call_count)
224
225 def testRetry(self):
226 """Test Retry."""
Kevin Chengd25feee2018-05-24 10:15:20 -0700227 mock_sleep = self.Patch(time, "sleep")
cylan0d77ae12018-05-18 08:36:48 +0000228
229 def _RaiseAndRetry(sentinel):
230 sentinel.alert()
231 raise ValueError("Fake error.")
232
233 num_retry = 5
234 sentinel = mock.MagicMock()
235 self.assertRaises(
236 ValueError,
237 utils.RetryExceptionType, (ValueError, KeyError),
238 num_retry,
239 _RaiseAndRetry,
Kevin Chengd25feee2018-05-24 10:15:20 -0700240 1, # sleep_multiplier
241 2, # retry_backoff_factor
cylan0d77ae12018-05-18 08:36:48 +0000242 sentinel=sentinel)
243
244 self.assertEqual(1 + num_retry, sentinel.alert.call_count)
Kevin Chengd25feee2018-05-24 10:15:20 -0700245 mock_sleep.assert_has_calls(
cylan0d77ae12018-05-18 08:36:48 +0000246 [
247 mock.call(1),
248 mock.call(2),
249 mock.call(4),
250 mock.call(8),
251 mock.call(16)
252 ])
Fang Dengf24be082018-02-10 10:09:55 -0800253
Kevin Chengeb85e862018-10-09 15:35:13 -0700254 @mock.patch("__builtin__.raw_input")
255 def testGetAnswerFromList(self, mock_raw_input):
256 """Test GetAnswerFromList."""
257 answer_list = ["image1.zip", "image2.zip", "image3.zip"]
258 mock_raw_input.return_value = 0
259 with self.assertRaises(SystemExit):
260 utils.GetAnswerFromList(answer_list)
261 mock_raw_input.side_effect = [1, 2, 3, 1]
262 self.assertEqual(utils.GetAnswerFromList(answer_list),
263 ["image1.zip"])
264 self.assertEqual(utils.GetAnswerFromList(answer_list),
265 ["image2.zip"])
266 self.assertEqual(utils.GetAnswerFromList(answer_list),
267 ["image3.zip"])
268 self.assertEqual(utils.GetAnswerFromList(answer_list,
269 enable_choose_all=True),
270 answer_list)
271
Kevin Cheng53aa5a52018-12-03 01:33:55 -0800272 @unittest.skipIf(isinstance(Tkinter, mock.Mock), "Tkinter mocked out, test case not needed.")
Kevin Cheng358fb3e2018-11-13 14:05:54 -0800273 @mock.patch.object(Tkinter, "Tk")
274 def testCalculateVNCScreenRatio(self, mock_tk):
Sam Chiu7a477f52018-10-22 11:20:36 +0800275 """Test Calculating the scale ratio of VNC display."""
Sam Chiu7a477f52018-10-22 11:20:36 +0800276 # Get scale-down ratio if screen height is smaller than AVD height.
Kevin Cheng358fb3e2018-11-13 14:05:54 -0800277 mock_tk.return_value = FakeTkinter(height=800, width=1200)
Sam Chiu7a477f52018-10-22 11:20:36 +0800278 avd_h = 1920
279 avd_w = 1080
280 self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.4)
281
282 # Get scale-down ratio if screen width is smaller than AVD width.
Kevin Cheng358fb3e2018-11-13 14:05:54 -0800283 mock_tk.return_value = FakeTkinter(height=800, width=1200)
Sam Chiu7a477f52018-10-22 11:20:36 +0800284 avd_h = 900
285 avd_w = 1920
286 self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.6)
287
288 # Scale ratio = 1 if screen is larger than AVD.
Kevin Cheng358fb3e2018-11-13 14:05:54 -0800289 mock_tk.return_value = FakeTkinter(height=1080, width=1920)
Sam Chiu7a477f52018-10-22 11:20:36 +0800290 avd_h = 800
291 avd_w = 1280
292 self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 1)
293
294 # Get the scale if ratio of width is smaller than the
295 # ratio of height.
Kevin Cheng358fb3e2018-11-13 14:05:54 -0800296 mock_tk.return_value = FakeTkinter(height=1200, width=800)
Sam Chiu7a477f52018-10-22 11:20:36 +0800297 avd_h = 1920
298 avd_w = 1080
299 self.assertEqual(utils.CalculateVNCScreenRatio(avd_w, avd_h), 0.6)
300
herbertxue07293a32018-11-05 20:40:11 +0800301 # pylint: disable=protected-access
302 def testCheckUserInGroups(self):
303 """Test CheckUserInGroups."""
304 self.Patch(os, "getgroups", return_value=[1, 2, 3])
305 gr1 = mock.MagicMock()
306 gr1.gr_name = "fake_gr_1"
307 gr2 = mock.MagicMock()
308 gr2.gr_name = "fake_gr_2"
309 gr3 = mock.MagicMock()
310 gr3.gr_name = "fake_gr_3"
311 self.Patch(grp, "getgrgid", side_effect=[gr1, gr2, gr3])
312
313 # User in all required groups should return true.
314 self.assertTrue(
315 utils.CheckUserInGroups(
316 ["fake_gr_1", "fake_gr_2"]))
317
318 # User not in all required groups should return False.
319 self.Patch(grp, "getgrgid", side_effect=[gr1, gr2, gr3])
320 self.assertFalse(
321 utils.CheckUserInGroups(
322 ["fake_gr_1", "fake_gr_4"]))
323
324 @mock.patch.object(utils, "CheckUserInGroups")
325 def testAddUserGroupsToCmd(self, mock_user_group):
326 """Test AddUserGroupsToCmd."""
327 command = "test_command"
328 groups = ["group1", "group2"]
329 # Don't add user group in command
330 mock_user_group.return_value = True
331 expected_value = "test_command"
332 self.assertEqual(expected_value, utils.AddUserGroupsToCmd(command,
333 groups))
334
335 # Add user group in command
336 mock_user_group.return_value = False
337 expected_value = "sg group1 <<EOF\nsg group2\ntest_command\nEOF"
338 self.assertEqual(expected_value, utils.AddUserGroupsToCmd(command,
339 groups))
340
Kevin Chengce6cfb02018-12-04 13:21:31 -0800341 @staticmethod
342 def testScpPullFileSuccess():
343 """Test scp pull file successfully."""
344 subprocess.check_call = mock.MagicMock()
345 utils.ScpPullFile("/tmp/test", "/tmp/test_1.log", "192.168.0.1")
346 subprocess.check_call.assert_called_with(utils.SCP_CMD + [
347 "192.168.0.1:/tmp/test", "/tmp/test_1.log"])
348
349 @staticmethod
350 def testScpPullFileWithUserNameSuccess():
351 """Test scp pull file successfully."""
352 subprocess.check_call = mock.MagicMock()
353 utils.ScpPullFile("/tmp/test", "/tmp/test_1.log", "192.168.0.1",
354 user_name="abc")
355 subprocess.check_call.assert_called_with(utils.SCP_CMD + [
356 "abc@192.168.0.1:/tmp/test", "/tmp/test_1.log"])
357
358 # pylint: disable=invalid-name
359 @staticmethod
360 def testScpPullFileWithUserNameWithRsaKeySuccess():
361 """Test scp pull file successfully."""
362 subprocess.check_call = mock.MagicMock()
363 utils.ScpPullFile("/tmp/test", "/tmp/test_1.log", "192.168.0.1",
364 user_name="abc", rsa_key_file="/tmp/my_key")
365 subprocess.check_call.assert_called_with(utils.SCP_CMD + [
366 "-i", "/tmp/my_key", "abc@192.168.0.1:/tmp/test",
367 "/tmp/test_1.log"])
368
369 def testScpPullFileScpFailure(self):
370 """Test scp pull file failure."""
371 subprocess.check_call = mock.MagicMock(
372 side_effect=subprocess.CalledProcessError(123, "fake",
373 "fake error"))
374 self.assertRaises(
375 errors.DeviceConnectionError,
376 utils.ScpPullFile, "/tmp/test", "/tmp/test_1.log", "192.168.0.1")
377
Fang Deng69498c32017-03-02 14:29:30 -0800378
379if __name__ == "__main__":
380 unittest.main()