blob: a5e845c8c0881b7ef9b132bafc983a2995ff0e6d [file] [log] [blame]
#!/usr/bin/env python
#
# Copyright 2016 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for acloud.internal.lib.utils."""
import errno
import getpass
import os
import shutil
import subprocess
import tempfile
import time
import unittest
import mock
from acloud.internal.lib import driver_test_lib
from acloud.internal.lib import utils
class UtilsTest(driver_test_lib.BaseDriverTest):
"""Test Utils."""
def TestTempDirSuccess(self):
"""Test create a temp dir."""
self.Patch(os, "chmod")
self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
self.Patch(shutil, "rmtree")
with utils.TempDir():
pass
# Verify.
tempfile.mkdtemp.assert_called_once() # pylint: disable=no-member
shutil.rmtree.assert_called_with("/tmp/tempdir") # pylint: disable=no-member
def TestTempDirExceptionRaised(self):
"""Test create a temp dir and exception is raised within with-clause."""
self.Patch(os, "chmod")
self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
self.Patch(shutil, "rmtree")
class ExpectedException(Exception):
"""Expected exception."""
pass
def _Call():
with utils.TempDir():
raise ExpectedException("Expected exception.")
# Verify. ExpectedException should be raised.
self.assertRaises(ExpectedException, _Call)
tempfile.mkdtemp.assert_called_once() # pylint: disable=no-member
shutil.rmtree.assert_called_with("/tmp/tempdir") #pylint: disable=no-member
def testTempDirWhenDeleteTempDirNoLongerExist(self): # pylint: disable=invalid-name
"""Test create a temp dir and dir no longer exists during deletion."""
self.Patch(os, "chmod")
self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
expected_error = EnvironmentError()
expected_error.errno = errno.ENOENT
self.Patch(shutil, "rmtree", side_effect=expected_error)
def _Call():
with utils.TempDir():
pass
# Verify no exception should be raised when rmtree raises
# EnvironmentError with errno.ENOENT, i.e.
# directory no longer exists.
_Call()
tempfile.mkdtemp.assert_called_once() #pylint: disable=no-member
shutil.rmtree.assert_called_with("/tmp/tempdir") #pylint: disable=no-member
def testTempDirWhenDeleteEncounterError(self):
"""Test create a temp dir and encoutered error during deletion."""
self.Patch(os, "chmod")
self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
expected_error = OSError("Expected OS Error")
self.Patch(shutil, "rmtree", side_effect=expected_error)
def _Call():
with utils.TempDir():
pass
# Verify OSError should be raised.
self.assertRaises(OSError, _Call)
tempfile.mkdtemp.assert_called_once() #pylint: disable=no-member
shutil.rmtree.assert_called_with("/tmp/tempdir") #pylint: disable=no-member
def testTempDirOrininalErrorRaised(self):
"""Test original error is raised even if tmp dir deletion failed."""
self.Patch(os, "chmod")
self.Patch(tempfile, "mkdtemp", return_value="/tmp/tempdir")
expected_error = OSError("Expected OS Error")
self.Patch(shutil, "rmtree", side_effect=expected_error)
class ExpectedException(Exception):
"""Expected exception."""
pass
def _Call():
with utils.TempDir():
raise ExpectedException("Expected Exception")
# Verify.
# ExpectedException should be raised, and OSError
# should not be raised.
self.assertRaises(ExpectedException, _Call)
tempfile.mkdtemp.assert_called_once() #pylint: disable=no-member
shutil.rmtree.assert_called_with("/tmp/tempdir") #pylint: disable=no-member
def testCreateSshKeyPairKeyAlreadyExists(self): #pylint: disable=invalid-name
"""Test when the key pair already exists."""
public_key = "/fake/public_key"
private_key = "/fake/private_key"
self.Patch(
os.path, "exists", side_effect=lambda path: path == public_key)
self.Patch(subprocess, "check_call")
utils.CreateSshKeyPairIfNotExist(private_key, public_key)
self.assertEqual(subprocess.check_call.call_count, 0) #pylint: disable=no-member
def testCreateSshKeyPairKeyAreCreated(self):
"""Test when the key pair created."""
public_key = "/fake/public_key"
private_key = "/fake/private_key"
self.Patch(os.path, "exists", return_value=False)
self.Patch(subprocess, "check_call")
self.Patch(os, "rename")
utils.CreateSshKeyPairIfNotExist(private_key, public_key)
self.assertEqual(subprocess.check_call.call_count, 1) #pylint: disable=no-member
subprocess.check_call.assert_called_with( #pylint: disable=no-member
utils.SSH_KEYGEN_CMD +
["-C", getpass.getuser(), "-f", private_key],
stdout=mock.ANY,
stderr=mock.ANY)
def TestRetryOnException(self):
"""Test Retry."""
def _IsValueError(exc):
return isinstance(exc, ValueError)
num_retry = 5
@utils.RetryOnException(_IsValueError, num_retry)
def _RaiseAndRetry(sentinel):
sentinel.alert()
raise ValueError("Fake error.")
sentinel = mock.MagicMock()
self.assertRaises(ValueError, _RaiseAndRetry, sentinel)
self.assertEqual(1 + num_retry, sentinel.alert.call_count)
def testRetryExceptionType(self):
"""Test RetryExceptionType function."""
def _RaiseAndRetry(sentinel):
sentinel.alert()
raise ValueError("Fake error.")
num_retry = 5
sentinel = mock.MagicMock()
self.assertRaises(
ValueError,
utils.RetryExceptionType, (KeyError, ValueError),
num_retry,
_RaiseAndRetry,
sentinel=sentinel)
self.assertEqual(1 + num_retry, sentinel.alert.call_count)
def testRetry(self):
"""Test Retry."""
self.Patch(time, "sleep")
def _RaiseAndRetry(sentinel):
sentinel.alert()
raise ValueError("Fake error.")
num_retry = 5
sentinel = mock.MagicMock()
self.assertRaises(
ValueError,
utils.RetryExceptionType, (ValueError, KeyError),
num_retry,
_RaiseAndRetry,
sleep_multiplier=1,
retry_backoff_factor=2,
sentinel=sentinel)
self.assertEqual(1 + num_retry, sentinel.alert.call_count)
time.sleep.assert_has_calls( #pylint: disable=no-member
[
mock.call(1),
mock.call(2),
mock.call(4),
mock.call(8),
mock.call(16)
])
if __name__ == "__main__":
unittest.main()