blob: 4d71d696c0765bdd8c7fa2671b21ae04c48d7a09 [file] [log] [blame]
Mike Frysingerd03e6b52019-08-03 12:49:01 -04001#!/usr/bin/python2
Allen Li5ed7e632017-02-03 16:31:33 -08002
Dean Liao34be76d2018-07-19 00:48:47 +08003# pylint: disable=missing-docstring
4
Allen Li5ed7e632017-02-03 16:31:33 -08005import StringIO
6import errno
Dean Liao7d51db02018-07-16 17:21:42 +08007import itertools
Allen Li5ed7e632017-02-03 16:31:33 -08008import logging
9import os
10import select
Allen Li5ed7e632017-02-03 16:31:33 -080011import socket
12import subprocess
13import time
14import unittest
15import urllib2
16
17import common
18from autotest_lib.client.common_lib import autotemp
Allen Li5ed7e632017-02-03 16:31:33 -080019from autotest_lib.client.common_lib import utils
20from autotest_lib.client.common_lib.test_utils import mock
21
Dean Liao7d51db02018-07-16 17:21:42 +080022# mock 1.0.0 (in site-packages/chromite/third_party/mock.py)
23# which is an ancestor of Python's default library starting from Python 3.3.
24# See https://docs.python.org/3/library/unittest.mock.html
25import mock as pymock
26
# Module-level alias for utils.metrics_mock.
# NOTE(review): not referenced in the visible part of this file; presumably
# used further down or by importers -- confirm before removing.
metrics = utils.metrics_mock
28
29
class test_read_one_line(unittest.TestCase):
    """Tests for utils.read_one_line() and the IP/subnet helper functions.

    utils.open is stubbed in setUp(), so read_one_line() is fed from an
    in-memory StringIO buffer supplied by create_test_file().
    """

    def setUp(self):
        self.god = mock.mock_god(ut=self)
        self.god.stub_function(utils, "open")


    def tearDown(self):
        self.god.unstub_all()


    def test_ip_to_long(self):
        self.assertEqual(utils.ip_to_long('0.0.0.0'), 0)
        self.assertEqual(utils.ip_to_long('255.255.255.255'), 4294967295)
        self.assertEqual(utils.ip_to_long('192.168.0.1'), 3232235521)
        self.assertEqual(utils.ip_to_long('1.2.4.8'), 16909320)


    def test_long_to_ip(self):
        self.assertEqual(utils.long_to_ip(0), '0.0.0.0')
        self.assertEqual(utils.long_to_ip(4294967295), '255.255.255.255')
        self.assertEqual(utils.long_to_ip(3232235521), '192.168.0.1')
        self.assertEqual(utils.long_to_ip(16909320), '1.2.4.8')


    def test_create_subnet_mask(self):
        self.assertEqual(utils.create_subnet_mask(0), 0)
        self.assertEqual(utils.create_subnet_mask(32), 4294967295)
        self.assertEqual(utils.create_subnet_mask(25), 4294967168)


    def test_format_ip_with_mask(self):
        self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 0),
                         '0.0.0.0/0')
        self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 32),
                         '192.168.0.1/32')
        self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 26),
                         '192.168.0.0/26')
        self.assertEqual(utils.format_ip_with_mask('192.168.0.255', 26),
                         '192.168.0.192/26')


    def create_test_file(self, contents):
        """Expect one open("filename", "r") call and answer it with contents.

        @param contents: String the fake file will yield when read.
        """
        test_file = StringIO.StringIO(contents)
        utils.open.expect_call("filename", "r").and_return(test_file)


    def test_reads_one_line_file(self):
        self.create_test_file("abc\n")
        self.assertEqual("abc", utils.read_one_line("filename"))
        self.god.check_playback()


    def test_strips_read_lines(self):
        self.create_test_file("abc \n")
        self.assertEqual("abc ", utils.read_one_line("filename"))
        self.god.check_playback()


    def test_drops_extra_lines(self):
        self.create_test_file("line 1\nline 2\nline 3\n")
        self.assertEqual("line 1", utils.read_one_line("filename"))
        self.god.check_playback()


    def test_works_on_empty_file(self):
        self.create_test_file("")
        self.assertEqual("", utils.read_one_line("filename"))
        self.god.check_playback()


    def test_works_on_file_with_no_newlines(self):
        self.create_test_file("line but no newline")
        self.assertEqual("line but no newline",
                         utils.read_one_line("filename"))
        self.god.check_playback()


    def test_preserves_leading_whitespace(self):
        self.create_test_file(" has leading whitespace")
        self.assertEqual(" has leading whitespace",
                         utils.read_one_line("filename"))
        # Verify the recorded open() expectation like every sibling test does.
        self.god.check_playback()
111
112
class test_write_one_line(unittest.TestCase):
    """Tests for utils.write_one_line() against a stubbed utils.open."""

    def setUp(self):
        self.god = mock.mock_god(ut=self)
        self.god.stub_function(utils, "open")


    def tearDown(self):
        self.god.unstub_all()


    def get_write_one_line_output(self, content):
        """Run write_one_line("filename", content) and return what it wrote.

        @param content: String handed to utils.write_one_line().
        @return: Data held by the fake file once it has been closed.
        """
        captured = mock.SaveDataAfterCloseStringIO()
        utils.open.expect_call("filename", "w").and_return(captured)
        utils.write_one_line("filename", content)
        self.god.check_playback()
        return captured.final_data


    def test_writes_one_line_file(self):
        written = self.get_write_one_line_output("abc")
        self.assertEqual("abc\n", written)


    def test_preserves_existing_newline(self):
        written = self.get_write_one_line_output("abc\n")
        self.assertEqual("abc\n", written)


    def test_preserves_leading_whitespace(self):
        written = self.get_write_one_line_output(" abc")
        self.assertEqual(" abc\n", written)


    def test_preserves_trailing_whitespace(self):
        written = self.get_write_one_line_output("abc ")
        self.assertEqual("abc \n", written)


    def test_handles_empty_input(self):
        written = self.get_write_one_line_output("")
        self.assertEqual("\n", written)
149
150
class test_open_write_close(unittest.TestCase):
    """Tests for utils.open_write_close() against a stubbed utils.open."""

    def setUp(self):
        self.god = mock.mock_god(ut=self)
        self.god.stub_function(utils, "open")


    def tearDown(self):
        self.god.unstub_all()


    def test_simple_functionality(self):
        """The payload must reach the file unchanged, blank lines included."""
        payload = "\n\nwhee\n"
        sink = mock.SaveDataAfterCloseStringIO()
        utils.open.expect_call("filename", "w").and_return(sink)

        utils.open_write_close("filename", payload)

        self.god.check_playback()
        self.assertEqual(payload, sink.final_data)
168
169
class test_read_keyval(unittest.TestCase):
    """Tests for utils.read_keyval().

    utils.open, os.path.isdir and os.path.exists are stubbed, so every test
    feeds read_keyval() from an in-memory buffer.
    """

    def setUp(self):
        self.god = mock.mock_god(ut=self)
        self.god.stub_function(utils, "open")
        self.god.stub_function(os.path, "isdir")
        self.god.stub_function(os.path, "exists")


    def tearDown(self):
        self.god.unstub_all()


    def create_test_file(self, filename, contents):
        """Expect an exists() check and an open() of filename.

        @param filename: Path read_keyval() is expected to open.
        @param contents: String the fake file will yield.
        """
        test_file = StringIO.StringIO(contents)
        os.path.exists.expect_call(filename).and_return(True)
        utils.open.expect_call(filename).and_return(test_file)


    def read_keyval(self, contents):
        """Parse contents through utils.read_keyval("file").

        @param contents: Raw keyval text.
        @return: Whatever utils.read_keyval() returns for that text.
        """
        os.path.isdir.expect_call("file").and_return(False)
        self.create_test_file("file", contents)
        keyval = utils.read_keyval("file")
        self.god.check_playback()
        return keyval


    def test_returns_empty_when_file_doesnt_exist(self):
        os.path.isdir.expect_call("file").and_return(False)
        os.path.exists.expect_call("file").and_return(False)
        self.assertEqual({}, utils.read_keyval("file"))
        self.god.check_playback()


    def test_accesses_files_directly(self):
        os.path.isdir.expect_call("file").and_return(False)
        self.create_test_file("file", "")
        utils.read_keyval("file")
        self.god.check_playback()


    def test_accesses_directories_through_keyval_file(self):
        os.path.isdir.expect_call("dir").and_return(True)
        self.create_test_file("dir/keyval", "")
        utils.read_keyval("dir")
        self.god.check_playback()


    def test_values_are_rstripped(self):
        keyval = self.read_keyval("a=b \n")
        self.assertEqual(keyval, {"a": "b"})


    def test_comments_are_ignored(self):
        keyval = self.read_keyval("a=b # a comment\n")
        self.assertEqual(keyval, {"a": "b"})


    def test_integers_become_ints(self):
        keyval = self.read_keyval("a=1\n")
        self.assertEqual(keyval, {"a": 1})
        self.assertEqual(int, type(keyval["a"]))


    def test_float_values_become_floats(self):
        keyval = self.read_keyval("a=1.5\n")
        self.assertEqual(keyval, {"a": 1.5})
        self.assertEqual(float, type(keyval["a"]))


    def test_multiple_lines(self):
        keyval = self.read_keyval("a=one\nb=two\n")
        self.assertEqual(keyval, {"a": "one", "b": "two"})


    def test_the_last_duplicate_line_is_used(self):
        keyval = self.read_keyval("a=one\nb=two\na=three\n")
        self.assertEqual(keyval, {"a": "three", "b": "two"})


    def test_extra_equals_are_included_in_values(self):
        keyval = self.read_keyval("a=b=c\n")
        self.assertEqual(keyval, {"a": "b=c"})


    def test_non_alphanumeric_keynames_are_rejected(self):
        self.assertRaises(ValueError, self.read_keyval, "a$=one\n")


    def test_underscores_are_allowed_in_key_names(self):
        keyval = self.read_keyval("a_b=value\n")
        self.assertEqual(keyval, {"a_b": "value"})


    def test_dashes_are_allowed_in_key_names(self):
        keyval = self.read_keyval("a-b=value\n")
        self.assertEqual(keyval, {"a-b": "value"})


    def test_empty_value_is_allowed(self):
        keyval = self.read_keyval("a=\n")
        self.assertEqual(keyval, {"a": ""})
270
271
class test_write_keyval(unittest.TestCase):
    """Tests for utils.write_keyval() against stubbed open() and isdir()."""

    def setUp(self):
        self.god = mock.mock_god(ut=self)
        self.god.stub_function(utils, "open")
        self.god.stub_function(os.path, "isdir")


    def tearDown(self):
        self.god.unstub_all()


    def assertHasLines(self, value, lines):
        """Assert that value consists of exactly lines, in any order.

        @param value: Multi-line string to check.
        @param lines: Iterable of expected lines, without line terminators.
        """
        self.assertEqual(sorted(value.splitlines()), sorted(lines))


    def write_keyval(self, filename, dictionary, expected_filename=None,
                     type_tag=None):
        """Run utils.write_keyval() and return the text it wrote.

        @param filename: Path passed to utils.write_keyval().
        @param dictionary: Keyvals passed to utils.write_keyval().
        @param expected_filename: Path expected to be opened in append mode;
                defaults to filename.
        @param type_tag: Optional type tag; passed on only when not None.
        @return: Everything written to the fake file.
        """
        if expected_filename is None:
            expected_filename = filename
        test_file = StringIO.StringIO()
        # close() is stubbed so the buffer stays readable after the call.
        self.god.stub_function(test_file, "close")
        utils.open.expect_call(expected_filename, "a").and_return(test_file)
        test_file.close.expect_call()
        if type_tag is None:
            utils.write_keyval(filename, dictionary)
        else:
            utils.write_keyval(filename, dictionary, type_tag)
        # Without this the recorded isdir/open/close expectations are never
        # verified.
        self.god.check_playback()
        return test_file.getvalue()


    def write_keyval_file(self, dictionary, type_tag=None):
        """Write dictionary to the plain (non-directory) path "file"."""
        os.path.isdir.expect_call("file").and_return(False)
        return self.write_keyval("file", dictionary, type_tag=type_tag)


    def test_accesses_files_directly(self):
        os.path.isdir.expect_call("file").and_return(False)
        result = self.write_keyval("file", {"a": "1"})
        self.assertEqual(result, "a=1\n")


    def test_accesses_directories_through_keyval_file(self):
        os.path.isdir.expect_call("dir").and_return(True)
        result = self.write_keyval("dir", {"b": "2"}, "dir/keyval")
        self.assertEqual(result, "b=2\n")


    def test_numbers_are_stringified(self):
        result = self.write_keyval_file({"c": 3})
        self.assertEqual(result, "c=3\n")


    def test_type_tags_are_excluded_by_default(self):
        result = self.write_keyval_file({"d": "a string"})
        self.assertEqual(result, "d=a string\n")
        self.assertRaises(ValueError, self.write_keyval_file,
                          {"d{perf}": "a string"})


    def test_perf_tags_are_allowed(self):
        result = self.write_keyval_file({"a{perf}": 1, "b{perf}": 2},
                                        type_tag="perf")
        self.assertHasLines(result, ["a{perf}=1", "b{perf}=2"])
        self.assertRaises(ValueError, self.write_keyval_file,
                          {"a": 1, "b": 2}, type_tag="perf")


    def test_non_alphanumeric_keynames_are_rejected(self):
        self.assertRaises(ValueError, self.write_keyval_file, {"x$": 0})


    def test_underscores_are_allowed_in_key_names(self):
        result = self.write_keyval_file({"a_b": "value"})
        self.assertEqual(result, "a_b=value\n")


    def test_dashes_are_allowed_in_key_names(self):
        result = self.write_keyval_file({"a-b": "value"})
        self.assertEqual(result, "a-b=value\n")
353
354
class test_is_url(unittest.TestCase):
    """Tests for utils.is_url(): two URLs accepted, local paths rejected."""

    def test_accepts_http(self):
        candidate = "http://example.com"
        self.assertTrue(utils.is_url(candidate))


    def test_accepts_ftp(self):
        candidate = "ftp://ftp.example.com"
        self.assertTrue(utils.is_url(candidate))


    def test_rejects_local_path(self):
        candidate = "/home/username/file"
        self.assertFalse(utils.is_url(candidate))


    def test_rejects_local_filename(self):
        candidate = "filename"
        self.assertFalse(utils.is_url(candidate))


    def test_rejects_relative_local_path(self):
        candidate = "somedir/somesubdir/file"
        self.assertFalse(utils.is_url(candidate))


    def test_rejects_local_path_containing_url(self):
        candidate = "somedir/http://path/file"
        self.assertFalse(utils.is_url(candidate))
378
379
class test_urlopen(unittest.TestCase):
    """Tests for the socket-timeout handling of utils.urlopen()."""

    def setUp(self):
        self.god = mock.mock_god(ut=self)


    def tearDown(self):
        self.god.unstub_all()


    def stub_urlopen_with_timeout_comparison(self, test_func, expected_return,
                                             *expected_args):
        """Replace urllib2.urlopen with a fake that inspects the timeout.

        @param test_func: Callable invoked with socket.getdefaulttimeout()
                as seen from inside the fake urlopen.
        @param expected_return: Value the fake urlopen returns.
        @param expected_args: Expected (url, data) arguments; missing
                trailing entries are padded with None.
        """
        expected_args += (None,) * (2 - len(expected_args))
        def urlopen(url, data=None):
            self.assertEqual(expected_args, (url, data))
            test_func(socket.getdefaulttimeout())
            return expected_return
        self.god.stub_with(urllib2, "urlopen", urlopen)


    def stub_urlopen_with_timeout_check(self, expected_timeout,
                                        expected_return, *expected_args):
        """Stub urlopen and require the default timeout to equal a value.

        @param expected_timeout: Default socket timeout that must be in
                effect while the fake urlopen runs.
        @param expected_return: Value the fake urlopen returns.
        @param expected_args: Expected (url, data) arguments.
        """
        def test_func(timeout):
            self.assertEqual(timeout, expected_timeout)
        self.stub_urlopen_with_timeout_comparison(test_func, expected_return,
                                                  *expected_args)


    def test_timeout_set_during_call(self):
        self.stub_urlopen_with_timeout_check(30, "retval", "url")
        retval = utils.urlopen("url", timeout=30)
        self.assertEqual(retval, "retval")


    def test_timeout_reset_after_call(self):
        old_timeout = socket.getdefaulttimeout()
        self.stub_urlopen_with_timeout_check(30, None, "url")
        try:
            socket.setdefaulttimeout(1234)
            utils.urlopen("url", timeout=30)
            self.assertEqual(1234, socket.getdefaulttimeout())
        finally:
            # Restore the process-wide default whatever the outcome.
            socket.setdefaulttimeout(old_timeout)


    def test_timeout_set_by_default(self):
        def test_func(timeout):
            self.assertIsNotNone(timeout)
        self.stub_urlopen_with_timeout_comparison(test_func, None, "url")
        utils.urlopen("url")


    def test_args_are_untouched(self):
        self.stub_urlopen_with_timeout_check(30, None, "http://url",
                                             "POST data")
        utils.urlopen("http://url", timeout=30, data="POST data")
435
436
class test_urlretrieve(unittest.TestCase):
    """Tests for utils.urlretrieve() with all I/O collaborators stubbed."""

    def setUp(self):
        self.god = mock.mock_god(ut=self)


    def tearDown(self):
        self.god.unstub_all()


    def test_urlopen_passed_arguments(self):
        """url, data and timeout must be forwarded to utils.urlopen()."""
        for owner, attr in ((utils, "urlopen"),
                            (utils.shutil, "copyfileobj"),
                            (utils, "open")):
            self.god.stub_function(owner, attr)

        source_url, target_path = "url", "somefile"
        post_data = object()
        wait_secs = 10

        remote = self.god.create_mock_class(file, "file")
        local = self.god.create_mock_class(file, "file")

        # Expected call sequence, in playback order.
        opened = utils.urlopen.expect_call(source_url, data=post_data,
                                           timeout=wait_secs)
        opened.and_return(remote)
        utils.open.expect_call(target_path, "wb").and_return(local)
        utils.shutil.copyfileobj.expect_call(remote, local)
        local.close.expect_call()
        remote.close.expect_call()

        utils.urlretrieve(source_url, target_path, data=post_data,
                          timeout=wait_secs)
        self.god.check_playback()
468
469
class test_merge_trees(unittest.TestCase):
    """Tests for utils.merge_trees() on two real temporary directories."""

    # Some path-handling helper functions.
    def src(self, *path_segments):
        """Return a path below the source tree."""
        return os.path.join(self.src_tree.name, *path_segments)


    def dest(self, *path_segments):
        """Return a path below the destination tree."""
        return os.path.join(self.dest_tree.name, *path_segments)


    def paths(self, *path_segments):
        """Return the (source, destination) pair for the same relative path."""
        return self.src(*path_segments), self.dest(*path_segments)


    @staticmethod
    def _write_line(path, line):
        """Create or truncate path and write line plus a trailing newline.

        The file is closed before returning, so the data is on disk by the
        time merge_trees() runs.
        """
        with open(path, "w") as out:
            out.write(line + "\n")


    @staticmethod
    def _read_file(path):
        """Return the full contents of path, closing the file afterwards."""
        with open(path) as source:
            return source.read()


    def assertFileEqual(self, *path_segments):
        """Assert the file exists in both trees with identical contents."""
        src, dest = self.paths(*path_segments)
        self.assertEqual(True, os.path.isfile(src))
        self.assertEqual(True, os.path.isfile(dest))
        self.assertEqual(os.path.getsize(src), os.path.getsize(dest))
        self.assertEqual(self._read_file(src), self._read_file(dest))


    def assertFileContents(self, contents, *path_segments):
        """Assert the destination file holds exactly contents."""
        dest = self.dest(*path_segments)
        self.assertEqual(True, os.path.isfile(dest))
        self.assertEqual(os.path.getsize(dest), len(contents))
        self.assertEqual(contents, self._read_file(dest))


    def setUp(self):
        self.src_tree = autotemp.tempdir(unique_id='utilsrc')
        self.dest_tree = autotemp.tempdir(unique_id='utilsdest')

        # empty subdirs
        os.mkdir(self.src("empty"))
        os.mkdir(self.dest("empty"))


    def tearDown(self):
        # Clean the destination tree even if cleaning the source tree fails.
        try:
            self.src_tree.clean()
        finally:
            self.dest_tree.clean()


    def test_both_dont_exist(self):
        utils.merge_trees(*self.paths("empty"))


    def test_file_only_at_src(self):
        self._write_line(self.src("src_only"), "line 1")
        utils.merge_trees(*self.paths("src_only"))
        self.assertFileEqual("src_only")


    def test_file_only_at_dest(self):
        self._write_line(self.dest("dest_only"), "line 1")
        utils.merge_trees(*self.paths("dest_only"))
        self.assertEqual(False, os.path.exists(self.src("dest_only")))
        self.assertFileContents("line 1\n", "dest_only")


    def test_file_at_both(self):
        self._write_line(self.dest("in_both"), "line 1")
        self._write_line(self.src("in_both"), "line 2")
        utils.merge_trees(*self.paths("in_both"))
        self.assertFileContents("line 1\nline 2\n", "in_both")


    def test_directory_with_files_in_both(self):
        self._write_line(self.dest("in_both"), "line 1")
        self._write_line(self.src("in_both"), "line 3")
        utils.merge_trees(*self.paths())
        self.assertFileContents("line 1\nline 3\n", "in_both")


    def test_directory_with_mix_of_files(self):
        self._write_line(self.dest("in_dest"), "dest line")
        self._write_line(self.src("in_src"), "src line")
        utils.merge_trees(*self.paths())
        self.assertFileContents("dest line\n", "in_dest")
        self.assertFileContents("src line\n", "in_src")


    def test_directory_with_subdirectories(self):
        os.mkdir(self.src("src_subdir"))
        self._write_line(self.src("src_subdir", "subfile"), "subdir line")
        os.mkdir(self.src("both_subdir"))
        os.mkdir(self.dest("both_subdir"))
        self._write_line(self.src("both_subdir", "subfile"), "src line")
        self._write_line(self.dest("both_subdir", "subfile"), "dest line")
        utils.merge_trees(*self.paths())
        self.assertFileContents("subdir line\n", "src_subdir", "subfile")
        self.assertFileContents("dest line\nsrc line\n", "both_subdir",
                                "subfile")
563
564
class test_get_relative_path(unittest.TestCase):
    """Tests for utils.get_relative_path(path, reference)."""

    def test_not_absolute(self):
        with self.assertRaises(AssertionError):
            utils.get_relative_path("a", "b")

    def test_same_dir(self):
        relative = utils.get_relative_path("/a/b/c", "/a/b")
        self.assertEqual(relative, "c")

    def test_forward_dir(self):
        relative = utils.get_relative_path("/a/b/c/d", "/a/b")
        self.assertEqual(relative, "c/d")

    def test_previous_dir(self):
        relative = utils.get_relative_path("/a/b", "/a/b/c/d")
        self.assertEqual(relative, "../..")

    def test_parallel_dir(self):
        relative = utils.get_relative_path("/a/c/d", "/a/b/c/d")
        self.assertEqual(relative, "../../../c/d")
581
582
class test_sh_escape(unittest.TestCase):
    """Round-trip strings through a real shell to test utils.sh_escape()."""

    def _test_in_shell(self, text):
        """Echo the escaped text inside double quotes and compare the output.

        @param text: Raw string that must come back unchanged from the shell.
        """
        escaped_text = utils.sh_escape(text)
        # Close both /dev/null handles once the child has exited; the
        # previous code leaked two descriptors per call.
        with open(os.devnull, 'r') as null_in, \
                open(os.devnull, 'w') as null_out:
            proc = subprocess.Popen('echo "%s"' % escaped_text, shell=True,
                                    stdin=null_in,
                                    stdout=subprocess.PIPE,
                                    stderr=null_out)
            stdout, _ = proc.communicate()
        self.assertEqual(proc.returncode, 0)
        # echo appends one newline; drop it before comparing.
        self.assertEqual(stdout[:-1], text)


    def test_normal_string(self):
        self._test_in_shell('abcd')


    def test_spaced_string(self):
        self._test_in_shell('abcd efgh')


    def test_dollar(self):
        self._test_in_shell('$')


    def test_single_quote(self):
        self._test_in_shell('\'')


    def test_single_quoted_string(self):
        self._test_in_shell('\'efgh\'')


    def test_string_with_single_quote(self):
        self._test_in_shell("a'b")


    def test_string_with_escaped_single_quote(self):
        self._test_in_shell(r"a\'b")


    def test_double_quote(self):
        self._test_in_shell('"')


    def test_double_quoted_string(self):
        self._test_in_shell('"abcd"')


    def test_backtick(self):
        self._test_in_shell('`')


    def test_backticked_string(self):
        self._test_in_shell('`jklm`')


    def test_backslash(self):
        self._test_in_shell('\\')


    def test_backslashed_special_characters(self):
        self._test_in_shell('\\$')
        self._test_in_shell('\\"')
        self._test_in_shell('\\\'')
        self._test_in_shell('\\`')


    def test_backslash_codes(self):
        self._test_in_shell('\\n')
        self._test_in_shell('\\r')
        self._test_in_shell('\\t')
        self._test_in_shell('\\v')
        self._test_in_shell('\\b')
        self._test_in_shell('\\a')
        self._test_in_shell('\\000')

    def test_real_newline(self):
        self._test_in_shell('\n')
        self._test_in_shell('\\\n')
662
663
class test_sh_quote_word(test_sh_escape):
    """Run tests on sh_quote_word.

    Inherit from test_sh_escape to get the same tests to run on both.
    """

    def _test_in_shell(self, text):
        """Echo the quoted word through a shell and expect text back."""
        shell_command = 'echo %s' % utils.sh_quote_word(text)
        actual = subprocess.check_output(shell_command, shell=True)
        expected = text + '\n'
        self.assertEqual(actual, expected)
675
676
class test_nested_sh_quote_word(test_sh_quote_word):
    """Run nested tests on sh_quote_word.

    Inherit from test_sh_quote_word to get the same tests to run on both.
    """

    def _test_in_shell(self, text):
        """Quote twice: the outer echo must print a runnable inner echo."""
        inner = 'echo ' + utils.sh_quote_word(text)
        outer = 'echo ' + utils.sh_quote_word(inner)
        # First shell pass yields the inner command line.
        regenerated = subprocess.check_output(outer, shell=True)
        # Second pass runs that command line and must print text.
        actual = subprocess.check_output(regenerated, shell=True)
        self.assertEqual(actual, text + '\n')
689
690
class test_run(unittest.TestCase):
    """
    Test the utils.run() function.

    Note: This test runs simple external commands to test the utils.run()
    API without assuming implementation details.
    """

    # Log levels in ascending severity.
    LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
                  logging.CRITICAL]


    def setUp(self):
        self.god = mock.mock_god(ut=self)
        self.god.stub_function(utils.logging, 'warning')
        self.god.stub_function(utils.logging, 'debug')

        # Log level -> StringIO.StringIO.
        self.logs = {}
        for level in self.LOG_LEVELS:
            self.logs[level] = StringIO.StringIO()

        # Override logging_manager.LoggingFile to return buffers.
        def logging_file(level=None, prefix=None):
            return self.logs[level]
        self.god.stub_with(utils.logging_manager, 'LoggingFile', logging_file)


    def tearDown(self):
        self.god.unstub_all()


    def __check_result(self, result, command, exit_status=0, stdout='',
                       stderr=''):
        """Assert the four fields of a command result object.

        @param result: Object exposing command, exit_status, stdout, stderr.
        @param command: Expected command string.
        @param exit_status: Expected exit status.
        @param stdout: Expected captured stdout.
        @param stderr: Expected captured stderr.
        """
        self.assertEqual(result.command, command)
        self.assertEqual(result.exit_status, exit_status)
        self.assertEqual(result.stdout, stdout)
        self.assertEqual(result.stderr, stderr)


    def __get_logs(self):
        """Returns contents of log buffers at all levels.

        @return: 5-element list of strings corresponding to logged messages
                at the levels in self.LOG_LEVELS.
        """
        return [self.logs[level].getvalue() for level in self.LOG_LEVELS]


    def test_default_simple(self):
        cmd = 'echo "hello world"'
        # expect some kind of logging.debug() call but don't care about args
        utils.logging.debug.expect_any_call()
        self.__check_result(utils.run(cmd), cmd, stdout='hello world\n')


    def test_default_failure(self):
        cmd = 'exit 11'
        try:
            utils.run(cmd, verbose=False)
        except utils.error.CmdError as err:
            self.__check_result(err.result_obj, cmd, exit_status=11)
        else:
            # Without this the test passed silently when nothing was raised.
            self.fail('utils.run() did not raise CmdError for %r' % cmd)


    def test_ignore_status(self):
        cmd = 'echo error >&2 && exit 11'
        self.__check_result(utils.run(cmd, ignore_status=True, verbose=False),
                            cmd, exit_status=11, stderr='error\n')


    def test_timeout(self):
        # we expect a logging.warning() message, don't care about the contents
        utils.logging.warning.expect_any_call()
        try:
            utils.run('echo -n output && sleep 10', timeout=1, verbose=False)
        except utils.error.CmdError as err:
            self.assertEqual(err.result_obj.stdout, 'output')
        else:
            # Without this the test passed silently when nothing was raised.
            self.fail('utils.run() did not raise CmdError on timeout')


    def test_stdout_stderr_tee(self):
        cmd = 'echo output && echo error >&2'
        stdout_tee = StringIO.StringIO()
        stderr_tee = StringIO.StringIO()

        self.__check_result(utils.run(
                cmd, stdout_tee=stdout_tee, stderr_tee=stderr_tee,
                verbose=False), cmd, stdout='output\n', stderr='error\n')
        self.assertEqual(stdout_tee.getvalue(), 'output\n')
        self.assertEqual(stderr_tee.getvalue(), 'error\n')


    def test_stdin_string(self):
        cmd = 'cat'
        self.__check_result(utils.run(cmd, verbose=False, stdin='hi!\n'),
                            cmd, stdout='hi!\n')


    def test_stdout_tee_to_logs_info(self):
        """Test logging stdout at the info level."""
        utils.run('echo output', stdout_tee=utils.TEE_TO_LOGS,
                  stdout_level=logging.INFO, verbose=False)
        self.assertEqual(self.__get_logs(), ['', 'output\n', '', '', ''])


    def test_stdout_tee_to_logs_warning(self):
        """Test logging stdout at the warning level."""
        utils.run('echo output', stdout_tee=utils.TEE_TO_LOGS,
                  stdout_level=logging.WARNING, verbose=False)
        self.assertEqual(self.__get_logs(), ['', '', 'output\n', '', ''])


    def test_stdout_and_stderr_tee_to_logs(self):
        """Test simultaneous stdout and stderr log levels."""
        utils.run('echo output && echo error >&2', stdout_tee=utils.TEE_TO_LOGS,
                  stderr_tee=utils.TEE_TO_LOGS, stdout_level=logging.INFO,
                  stderr_level=logging.ERROR, verbose=False)
        self.assertEqual(self.__get_logs(), ['', 'output\n', '', 'error\n', ''])


    def test_default_expected_stderr_log_level(self):
        """Test default expected stderr log level.

        stderr should be logged at the same level as stdout when
        stderr_is_expected is true and stderr_level isn't passed.
        """
        utils.run('echo output && echo error >&2', stdout_tee=utils.TEE_TO_LOGS,
                  stderr_tee=utils.TEE_TO_LOGS, stdout_level=logging.INFO,
                  stderr_is_expected=True, verbose=False)
        self.assertEqual(self.__get_logs(), ['', 'output\nerror\n', '', '', ''])


    def test_safe_args(self):
        # NOTE: The string in expected_quoted_cmd depends on the internal
        # implementation of shell quoting which is used by utils.run(),
        # in this case, sh_quote_word().
        expected_quoted_cmd = "echo 'hello \"world' again"
        self.__check_result(utils.run(
                'echo', verbose=False, args=('hello "world', 'again')),
                expected_quoted_cmd, stdout='hello "world again\n')


    def test_safe_args_given_string(self):
        self.assertRaises(TypeError, utils.run, 'echo', args='hello')


    def test_wait_interrupt(self):
        """Test that we actually select twice if the first one returns EINTR."""
        utils.logging.debug.expect_any_call()

        bg_job = utils.BgJob('echo "hello world"')
        bg_job.result.exit_status = 0
        self.god.stub_function(utils.select, 'select')

        # First select() is interrupted ...
        utils.select.select.expect_any_call().and_raises(
                select.error(errno.EINTR, 'Select interrupted'))
        utils.logging.warning.expect_any_call()

        # ... and the second one reports both pipes as readable.
        utils.select.select.expect_any_call().and_return(
                ([bg_job.sp.stdout, bg_job.sp.stderr], [], None))
        utils.logging.warning.expect_any_call()

        self.assertFalse(
                utils._wait_for_commands([bg_job], time.time(), None))
854
855
class test_compare_versions(unittest.TestCase):
    """Tests for utils.compare_versions(), which yields -1, 0 or 1 here."""

    def _check(self, cases):
        """Assert compare_versions(left, right) == expected for each case.

        @param cases: Sequence of (left, right, expected) tuples, checked in
                order.
        """
        for left, right, expected in cases:
            self.assertEqual(utils.compare_versions(left, right), expected)


    def test_zerofill(self):
        self._check([('1.7', '1.10', -1),
                     ('1.222', '1.3', 1),
                     ('1.03', '1.3', 0)])


    def test_unequal_len(self):
        self._check([('1.3', '1.3.4', -1),
                     ('1.3.1', '1.3', 1)])


    def test_dash_delimited(self):
        self._check([('1-2-3', '1-5-1', -1),
                     ('1-2-1', '1-1-1', 1),
                     ('1-2-4', '1-2-4', 0)])


    def test_alphabets(self):
        self._check([('m.l.b', 'n.b.a', -1),
                     ('n.b.a', 'm.l.b', 1),
                     ('abc.e', 'abc.e', 0)])


    def test_mix_symbols(self):
        self._check([('k-320.1', 'k-320.3', -1),
                     ('k-231.5', 'k-231.1', 1),
                     ('k-231.1', 'k-231.1', 0)])

        self._check([('k.320-1', 'k.320-3', -1),
                     ('k.231-5', 'k.231-1', 1),
                     ('k.231-1', 'k.231-1', 0)])
888
889
class test_args_to_dict(unittest.TestCase):
    """Tests for utils.args_to_dict()."""

    def test_no_args(self):
        parsed = utils.args_to_dict([])
        self.assertEqual({}, parsed)


    def test_matches(self):
        arguments = ['aBc:DeF', 'SyS=DEf', 'XY_Z:',
                     'F__o0O=', 'B8r:=:=', '_bAZ_=:=:']
        expected = {'abc':'DeF', 'sys':'DEf', 'xy_z':'',
                    'f__o0o':'', 'b8r':'=:=', '_baz_':':=:'}
        parsed = utils.args_to_dict(arguments)
        self.assertEqual(parsed, expected)


    def test_unmatches(self):
        # Temporarily shut warning messages from args_to_dict() when an argument
        # doesn't match its pattern.
        root_logger = logging.getLogger()
        previous_level = root_logger.level
        root_logger.setLevel(logging.ERROR)

        arguments = ['ab-c:DeF', '--SyS=DEf', 'a*=b', 'a*b',
                     ':VAL', '=VVV', 'WORD']
        try:
            parsed = utils.args_to_dict(arguments)
            self.assertEqual({}, parsed)
        finally:
            # Restore level.
            root_logger.setLevel(previous_level)
917
918
class test_get_random_port(unittest.TestCase):
    """Check that ports returned by utils.get_unused_port() can be bound."""

    def do_bind(self, port, socket_type, socket_proto):
        """Create an AF_INET socket and bind it to port on all interfaces.

        @param port: Port number to bind.
        @param socket_type: socket.SOCK_STREAM or socket.SOCK_DGRAM.
        @param socket_proto: Matching socket.IPPROTO_* constant.
        @return: The bound socket; the caller is responsible for closing it.
        """
        s = socket.socket(socket.AF_INET, socket_type, socket_proto)
        try:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind(('', port))
        except Exception:
            # Do not leak the descriptor when setup fails.
            s.close()
            raise
        return s


    def test_get_port(self):
        bind_kinds = ((socket.SOCK_STREAM, socket.IPPROTO_TCP),
                      (socket.SOCK_DGRAM, socket.IPPROTO_UDP))
        for _ in range(100):
            p = utils.get_unused_port()
            for socket_type, socket_proto in bind_kinds:
                s = self.do_bind(p, socket_type, socket_proto)
                try:
                    self.assertTrue(s.getsockname())
                finally:
                    # Close explicitly instead of relying on garbage
                    # collection to release the 200 sockets bound here.
                    s.close()
934
935
def test_function(arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
    """Test global function.

    Empty on purpose: GetFunctionArgUnittest passes this function to
    utils.get_function_arg_value() and only its argument list matters.
    The defaults of arg4..arg6 equal their position in the argument list.
    """
939
940
class TestClass(object):
    """Test class.

    Holds an instance method, a class method and a static method with the
    same arg1..arg6 argument list.  GetFunctionArgUnittest passes each of
    them to utils.get_function_arg_value(); all three bodies are empty.
    """

    def test_instance_function(self, arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
        """Test instance function.

        Empty on purpose; only the argument list is inspected.
        """


    @classmethod
    def test_class_function(cls, arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
        """Test class function.

        Empty on purpose; only the argument list is inspected.
        """


    @staticmethod
    def test_static_function(arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
        """Test static function.

        Empty on purpose; only the argument list is inspected.
        """
960
961
class GetFunctionArgUnittest(unittest.TestCase):
    """Tests for method get_function_arg_value."""

    def run_test(self, func, insert_arg):
        """Run test.

        Checks that arg1..arg6 resolve to 1..6 (positional values or
        defaults), that a keyword-only 'arg7' is read from the kwargs dict,
        and that a missing positional argument raises KeyError.

        @param func: Function being called with given arguments.
        @param insert_arg: Set to True to insert an object in the argument list.
                           This is to mock instance/class object.
        """
        if insert_arg:
            args = (None, 1, 2, 3)
        else:
            args = (1, 2, 3)
        for i in range(1, 7):
            self.assertEqual(utils.get_function_arg_value(
                    func, 'arg%d'%i, args, {}), i)

        self.assertEqual(utils.get_function_arg_value(
                func, 'arg7', args, {'arg7': 7}), 7)
        # With the last positional value dropped, arg3 has neither a value
        # nor a default.
        self.assertRaises(
                KeyError, utils.get_function_arg_value,
                func, 'arg3', args[:-1], {})


    def test_global_function(self):
        """Test global function.
        """
        self.run_test(test_function, False)


    def test_instance_function(self):
        """Test instance function.
        """
        self.run_test(TestClass().test_instance_function, True)


    def test_class_function(self):
        """Test class function.
        """
        self.run_test(TestClass.test_class_function, True)


    def test_static_function(self):
        """Test static function.
        """
        self.run_test(TestClass.test_static_function, False)
1008 self.run_test(TestClass.test_static_function, False)
1009
1010
class IsInSameSubnetUnittest(unittest.TestCase):
    """Test is_in_same_subnet function."""

    def test_is_in_same_subnet(self):
        """Compare address pairs under 23-bit and 24-bit masks."""
        in_same_subnet = utils.is_in_same_subnet

        # Same pair of addresses, two mask lengths.
        self.assertTrue(in_same_subnet('192.168.0.0', '192.168.1.2', 23))
        self.assertFalse(in_same_subnet('192.168.0.0', '192.168.1.2', 24))
        # Addresses differing only in the last octet.
        self.assertTrue(in_same_subnet('192.168.0.0', '192.168.0.255', 24))
        # Addresses differing in the first octet.
        self.assertFalse(in_same_subnet('191.168.0.0', '192.168.0.0', 24))
1024
1025
class GetWirelessSsidUnittest(unittest.TestCase):
    """Test get_wireless_ssid function."""

    DEFAULT_SSID = 'default'
    SSID_1 = 'ssid_1'
    SSID_2 = 'ssid_2'
    SSID_3 = 'ssid_3'

    def test_get_wireless_ssid(self):
        """Test get_wireless_ssid function."""
        god = mock.mock_god()
        # Remove the stubs from utils.CONFIG once the test ends, pass or fail,
        # so they do not leak into tests that run afterwards.
        self.addCleanup(god.unstub_all)
        god.stub_function_to_return(utils.CONFIG, 'get_config_value',
                                    self.DEFAULT_SSID)
        god.stub_function_to_return(utils.CONFIG, 'get_config_value_regex',
                                    {'wireless_ssid_1.2.3.4/24': self.SSID_1,
                                     'wireless_ssid_4.3.2.1/16': self.SSID_2,
                                     'wireless_ssid_4.3.2.111/32': self.SSID_3})
        self.assertEqual(self.SSID_1, utils.get_wireless_ssid('1.2.3.100'))
        self.assertEqual(self.SSID_2, utils.get_wireless_ssid('4.3.2.100'))
        # 4.3.2.111 is covered by both the /16 and the /32 entries; the /32
        # one is the expected result.
        self.assertEqual(self.SSID_3, utils.get_wireless_ssid('4.3.2.111'))
        # No configured subnet covers this address.
        self.assertEqual(self.DEFAULT_SSID,
                         utils.get_wireless_ssid('100.0.0.100'))
1048
1049
class LaunchControlBuildParseUnittest(unittest.TestCase):
    """Test various parsing functions related to Launch Control builds and
    devices.
    """

    def test_parse_launch_control_target(self):
        """Test parse_launch_control_target function."""
        # (target, expected result) pairs. This is a list rather than a dict
        # keyed by result: both unparsable targets expect (None, None), and a
        # dict would silently keep only one of them.
        target_tests = [
                ('shamu-userdebug', ('shamu', 'userdebug')),
                ('shamu-eng', ('shamu', 'eng')),
                ('shamu-board-eng', ('shamu-board', 'eng')),
                ('bad_target', (None, None)),
                ('target', (None, None)),
        ]
        for target, result in target_tests:
            self.assertEqual(result, utils.parse_launch_control_target(target))
1065
1066
class GetOffloaderUriTest(unittest.TestCase):
    """Test get_offload_gsuri function."""
    _IMAGE_STORAGE_SERVER = 'gs://test_image_bucket'

    def setUp(self):
        self.god = mock.mock_god()

    def tearDown(self):
        self.god.unstub_all()

    def test_get_default_lab_offload_gsuri(self):
        """Test default lab offload gsuri ."""
        self.god.mock_up(utils.CONFIG, 'CONFIG')
        self.god.stub_function_to_return(utils, 'is_moblab', False)
        self.assertEqual(utils.DEFAULT_OFFLOAD_GSURI,
                         utils.get_offload_gsuri())

        self.god.check_playback()

    def test_get_default_moblab_offload_gsuri(self):
        """Test moblab offload gsuri when no default gsuri is configured.

        With DEFAULT_OFFLOAD_GSURI cleared, the result is expected to be
        built from the CROS image_storage_server config value.
        """
        self.god.mock_up(utils.CONFIG, 'CONFIG')
        self.god.stub_function_to_return(utils, 'is_moblab', True)
        utils.CONFIG.get_config_value.expect_call(
                'CROS', 'image_storage_server').and_return(
                        self._IMAGE_STORAGE_SERVER)
        self.god.stub_function_to_return(utils,
                'get_moblab_serial_number', 'test_serial_number')
        self.god.stub_function_to_return(utils, 'get_moblab_id', 'test_id')
        expected_gsuri = '%sresults/%s/%s/' % (
                self._IMAGE_STORAGE_SERVER, 'test_serial_number', 'test_id')
        cached_gsuri = utils.DEFAULT_OFFLOAD_GSURI
        utils.DEFAULT_OFFLOAD_GSURI = None
        try:
            gsuri = utils.get_offload_gsuri()
        finally:
            # Restore the module-level value even if the call raises, so the
            # temporary None never leaks into other tests.
            utils.DEFAULT_OFFLOAD_GSURI = cached_gsuri
        self.assertEqual(expected_gsuri, gsuri)

        self.god.check_playback()

    def test_get_moblab_offload_gsuri(self):
        """Test moblab offload gsuri built on top of the default gsuri."""
        self.god.mock_up(utils.CONFIG, 'CONFIG')
        self.god.stub_function_to_return(utils, 'is_moblab', True)
        self.god.stub_function_to_return(utils,
                'get_moblab_serial_number', 'test_serial_number')
        self.god.stub_function_to_return(utils, 'get_moblab_id', 'test_id')
        gsuri = '%s%s/%s/' % (
                utils.DEFAULT_OFFLOAD_GSURI, 'test_serial_number', 'test_id')
        self.assertEqual(gsuri, utils.get_offload_gsuri())

        self.god.check_playback()
1117
1118
Allen Li5ed7e632017-02-03 16:31:33 -08001119
class MockMetricsTest(unittest.TestCase):
    """Test metrics mock class can handle various metrics calls."""

    def test_Counter(self):
        """A Counter from the metrics mock can be created and incremented.
        """
        counter = metrics.Counter('counter')
        counter.increment(fields={'key': 1})


    def test_Context(self):
        """A SecondsTimer from the metrics mock works as a context manager.
        """
        marker = None
        with metrics.SecondsTimer('context') as timer:
            marker = 'called_in_context'
            timer['random_key'] = 'pass'
        self.assertEqual('called_in_context', marker)


    def test_decorator(self):
        """A function wrapped by SecondsTimerDecorator still gets executed.
        """
        class Holder(object):
            """Carries the value written by the decorated function."""

            def __init__(self):
                self.value = None

        holder = Holder()

        @metrics.SecondsTimerDecorator('decorator')
        def record(target):
            target.value = 'called_in_decorator'

        record(holder)
        self.assertEqual('called_in_decorator', holder.value)


    def test_setitem(self):
        """A SecondsTimer from the metrics mock accepts item assignment.
        """
        seconds_timer = metrics.SecondsTimer('name')
        seconds_timer['random_key'] = 'pass'
1163
1164
class test_background_sample(unittest.TestCase):
    """Test that the background sample can sample as desired.
    """

    def test_can_sample(self):
        """Start a sampler, finish it at once and look for the sampled value.
        """
        expected = 'name'
        keep_sampling = True

        def read_value():
            """Return the value currently bound in the enclosing test."""
            return expected

        sampler = utils.background_sample_until_condition(
                function=read_value,
                condition=lambda: keep_sampling,
                timeout=5,
                sleep_interval=0.1)
        samples = sampler.finish()
        self.assertIn(expected, samples)


    def test_samples_multiple_values(self):
        """Change the sampled value mid-run and expect both values back.
        """
        current = 'name'
        keep_sampling = True

        def read_value():
            """Return the value currently bound in the enclosing test."""
            return current

        sampler = utils.background_sample_until_condition(
                function=read_value,
                condition=lambda: keep_sampling,
                timeout=5,
                sleep_interval=0.1)
        # Give the sampler time to record the initial value.
        time.sleep(2.5)
        # Rebinding the closed-over name changes what read_value returns
        # from now on.
        current = 'noname'
        samples = sampler.finish()
        self.assertIn('name', samples)
        self.assertIn('noname', samples)
1211
Allen Li5ed7e632017-02-03 16:31:33 -08001212
class FakeTime(object):
    """Manually advanced clock offering time() and sleep().

    sleep() moves the clock forward instead of blocking, so code driven by
    it runs without real waiting.
    """

    def __init__(self, start_time):
        """Start the fake clock at start_time."""
        self._now = start_time


    def time(self):
        """Return the current value of the fake clock."""
        return self._now


    def sleep(self, interval):
        """Advance the fake clock by interval without blocking."""
        self._now = self._now + interval
1227
1228
class TimeModuleMockTestCase(unittest.TestCase):
    """Base test case that patches utils.time with a mock.

    The mock's time() and sleep() are routed to FakeTime.time() and
    FakeTime.sleep(), respectively.
    """

    def setUp(self):
        begin = 10
        clock = FakeTime(begin)

        patcher = pymock.patch('autotest_lib.client.common_lib.utils.time')
        time_mock = patcher.start()
        # Undo the patch when the test finishes.
        self.addCleanup(patcher.stop)
        time_mock.time.side_effect = clock.time
        time_mock.sleep.side_effect = clock.sleep

        self.fake_time_begin = begin
        self.fake_time = clock
        self.patcher = patcher
        self.time_mock = time_mock
1245
1246
def always_raise():
    """Condition stub that fails with Exception('always raise') every call."""
    failure = Exception('always raise')
    raise failure
1250
1251
def fail_n_times(count):
    """Build a condition that fails count times before succeeding.

    @param count: Number of initial calls that return False.

    @return a function returning False for its first count calls and True
            on every call after that.
    """
    # Counts count, count - 1, ...; the condition holds once it reaches 0.
    countdown = itertools.count(count, -1)

    def condition():
        return next(countdown) <= 0

    return condition
1260
1261
class test_poll_for_condition(TimeModuleMockTestCase):
    """Test poll_for_condition.
    """

    def test_ok(self):
        """Test polling condition that returns True.
        """
        self.assertTrue(utils.poll_for_condition(lambda: True))


    def test_ok_evaluated_as_true(self):
        """Test polling condition whose return value is evaluated as True.
        """
        self.assertEqual(1, utils.poll_for_condition(lambda: 1))

        self.assertEqual('something',
                         utils.poll_for_condition(lambda: 'something'))


    def test_fail(self):
        """Test polling condition that returns False.

        Expect TimeoutError exception as neither customized exception nor
        exception raised from condition().
        """
        with self.assertRaises(utils.TimeoutError):
            utils.poll_for_condition(lambda: False, timeout=3, sleep_interval=1)
        self.assertEqual(3, self.time_mock.sleep.call_count)


    def test_fail_evaluated_as_false(self):
        """Test polling condition whose return value is evaluated as False.

        Expect TimeoutError exception as neither customized exception nor
        exception raised from condition().
        """
        with self.assertRaises(utils.TimeoutError):
            utils.poll_for_condition(lambda: 0, timeout=3, sleep_interval=1)
        self.assertEqual(3, self.time_mock.sleep.call_count)

        with self.assertRaises(utils.TimeoutError):
            utils.poll_for_condition(lambda: None, timeout=3, sleep_interval=1)


    def test_exception_arg(self):
        """Test polling condition always fails.

        Expect exception raised by 'exception' args.
        """
        with self.assertRaisesRegexp(Exception, 'from args'):
            utils.poll_for_condition(lambda: False,
                                     exception=Exception('from args'),
                                     timeout=3, sleep_interval=1)
        self.assertEqual(3, self.time_mock.sleep.call_count)


    def test_exception_from_condition(self):
        """Test polling condition always fails.

        Expect exception raised by condition().
        """
        with self.assertRaisesRegexp(Exception, 'always raise'):
            utils.poll_for_condition(always_raise,
                                     exception=Exception('from args'),
                                     timeout=3, sleep_interval=1)
        # For poll_for_condition, if condition() raises exception, it raises
        # immediately without retry. So sleep() should not be called.
        # Checked through call_count instead of assert_not_called(): mock
        # 1.0.0 has no such method, so the call would be auto-created as a
        # child mock and pass without verifying anything.
        self.assertEqual(0, self.time_mock.sleep.call_count)


    def test_ok_after_retry(self):
        """Test polling a condition which is success after retry twice.
        """
        self.assertTrue(utils.poll_for_condition(fail_n_times(2), timeout=3,
                                                 sleep_interval=1))


    def test_cannot_wait(self):
        """Test polling a condition which fails till timeout.
        """
        with self.assertRaisesRegexp(
                utils.TimeoutError,
                'Timed out waiting for unnamed condition'):
            utils.poll_for_condition(fail_n_times(4), timeout=3,
                                     sleep_interval=1)
        self.assertEqual(3, self.time_mock.sleep.call_count)
1348
1349
class test_poll_for_condition_ex(TimeModuleMockTestCase):
    """Test poll_for_condition_ex.
    """

    def test_ok(self):
        """Test polling condition that returns True.
        """
        self.assertTrue(utils.poll_for_condition_ex(lambda: True))


    def test_ok_evaluated_as_true(self):
        """Test polling condition whose return value is evaluated as True.
        """
        self.assertEqual(1, utils.poll_for_condition_ex(lambda: 1))

        self.assertEqual('something',
                         utils.poll_for_condition_ex(lambda: 'something'))


    def test_fail(self):
        """Test polling condition that returns False.

        Expect TimeoutError raised.
        """
        with self.assertRaisesRegexp(
                utils.TimeoutError,
                'Timed out waiting for unamed condition'):
            utils.poll_for_condition_ex(lambda: False, timeout=3,
                                        sleep_interval=1)
        self.assertEqual(2, self.time_mock.sleep.call_count)


    def test_fail_evaluated_as_false(self):
        """Test polling condition whose return value is evaluated as False.

        Expect TimeoutError raised.
        """
        with self.assertRaisesRegexp(
                utils.TimeoutError,
                'Timed out waiting for unamed condition'):
            utils.poll_for_condition_ex(lambda: 0, timeout=3,
                                        sleep_interval=1)
        self.assertEqual(2, self.time_mock.sleep.call_count)

        with self.assertRaisesRegexp(
                utils.TimeoutError,
                'Timed out waiting for unamed condition'):
            utils.poll_for_condition_ex(lambda: None, timeout=3,
                                        sleep_interval=1)


    def test_desc_arg(self):
        """Test polling condition always fails with desc.

        Expect TimeoutError with condition description embedded.
        """
        with self.assertRaisesRegexp(
                utils.TimeoutError,
                'Timed out waiting for always false condition'):
            utils.poll_for_condition_ex(lambda: False,
                                        desc='always false condition',
                                        timeout=3, sleep_interval=1)
        self.assertEqual(2, self.time_mock.sleep.call_count)


    def test_exception(self):
        """Test polling condition that raises.

        Expect TimeoutError with condition raised exception embedded.
        """
        # Raw string: the backslashes escape the parentheses for the regex
        # engine and are not Python string escapes.
        with self.assertRaisesRegexp(
                utils.TimeoutError,
                r"Reason: Exception\('always raise',\)"):
            utils.poll_for_condition_ex(always_raise, timeout=3,
                                        sleep_interval=1)
        self.assertEqual(2, self.time_mock.sleep.call_count)


    def test_ok_after_retry(self):
        """Test polling a condition which is success after retry twice.
        """
        self.assertTrue(utils.poll_for_condition_ex(fail_n_times(2), timeout=3,
                                                    sleep_interval=1))


    def test_cannot_wait(self):
        """Test polling a condition which fails till timeout.
        """
        with self.assertRaisesRegexp(
                utils.TimeoutError,
                'condition evaluted as false'):
            utils.poll_for_condition_ex(fail_n_times(3), timeout=3,
                                        sleep_interval=1)
        self.assertEqual(2, self.time_mock.sleep.call_count)
1444
1445
class test_timer(TimeModuleMockTestCase):
    """Test Timer.
    """

    def test_zero_timeout(self):
        """Test Timer with zero timeout.

        Only the first timer.sleep(0) is True.
        """
        timer = utils.Timer(0)
        self.assertTrue(timer.sleep(0))
        self.assertFalse(timer.sleep(0))
        # Checked through call_count instead of assert_not_called(): mock
        # 1.0.0 has no such method, so the call would be auto-created as a
        # child mock and pass without verifying anything.
        self.assertEqual(0, self.time_mock.sleep.call_count)


    def test_sleep(self):
        """Test Timer.sleep()
        """
        timeout = 3
        sleep_interval = 2
        timer = utils.Timer(timeout)

        # Kicks off timer.
        self.assertTrue(timer.sleep(sleep_interval))
        self.assertEqual(self.fake_time_begin + timeout, timer.deadline)
        self.assertTrue(timer.sleep(sleep_interval))
        # now: 12. 12 + 2 > 13, unable to sleep
        self.assertFalse(timer.sleep(sleep_interval))

        self.time_mock.sleep.assert_has_calls([pymock.call(sleep_interval)])
1476
1477
class test_timeout_error(unittest.TestCase):
    """Test TimeoutError.

    Test TimeoutError with three invocations format.
    """

    def test_no_args(self):
        """Create TimeoutError without arguments.
        """
        e = utils.TimeoutError()
        self.assertEqual('', str(e))
        self.assertEqual('TimeoutError()', repr(e))


    def test_with_message(self):
        """Create TimeoutError with text message.
        """
        e = utils.TimeoutError(message='Waiting for condition')
        self.assertEqual('Waiting for condition', str(e))
        self.assertEqual("TimeoutError('Waiting for condition',)", repr(e))

        # Positional message argument for backward compatibility.
        e = utils.TimeoutError('Waiting for condition')
        self.assertEqual('Waiting for condition', str(e))
        self.assertEqual("TimeoutError('Waiting for condition',)", repr(e))


    def test_with_reason(self):
        """Create TimeoutError with reason only.
        """
        e = utils.TimeoutError(reason='illegal input')
        self.assertEqual("Reason: 'illegal input'", str(e))
        self.assertEqual("TimeoutError(\"Reason: 'illegal input'\",)", repr(e))
        self.assertEqual('illegal input', e.reason)


    def test_with_message_reason(self):
        """Create TimeoutError with text message and reason.
        """
        e = utils.TimeoutError(message='Waiting for condition',
                               reason='illegal input')
        self.assertEqual("Waiting for condition. Reason: 'illegal input'",
                         str(e))
        self.assertEqual('illegal input', e.reason)

        # Positional message argument for backward compatibility.
        e = utils.TimeoutError('Waiting for condition', reason='illegal input')
        self.assertEqual("Waiting for condition. Reason: 'illegal input'",
                         str(e))
        self.assertEqual('illegal input', e.reason)


    def test_with_message_reason_object(self):
        """Create TimeoutError with text message and reason as exception object.
        """
        e = utils.TimeoutError(message='Waiting for condition',
                               reason=Exception('illegal input'))
        self.assertEqual(
                "Waiting for condition. Reason: Exception('illegal input',)",
                str(e))
        self.assertIsInstance(e.reason, Exception)
        # str() rather than the deprecated BaseException.message attribute.
        self.assertEqual('illegal input', str(e.reason))

        # Positional message argument for backward compatibility.
        e = utils.TimeoutError('Waiting for condition',
                               reason=Exception('illegal input'))
        self.assertEqual(
                "Waiting for condition. Reason: Exception('illegal input',)",
                str(e))
        self.assertIsInstance(e.reason, Exception)
        self.assertEqual('illegal input', str(e.reason))
1550
1551
1552
# Run the unittest runner when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()