blob: b7a19d7aa4d8bd95ab2640571685e290edf0dfbe [file] [log] [blame]
mbligh7c8ea992009-06-22 19:03:08 +00001#!/usr/bin/python
mblighe48bcfb2008-11-11 17:09:44 +00002
3import fcntl, os, signal, subprocess, StringIO
4import tempfile, textwrap, time, unittest
5import monitors_util
6
7
def InlineStringIO(text):
    """Wrap an indented multi-line literal in a file-like StringIO object.

    The common leading whitespace of `text` is removed with
    textwrap.dedent(), then leading and trailing whitespace (including the
    newlines around a triple-quoted literal) is stripped before the result
    is handed to StringIO.
    """
    dedented = textwrap.dedent(text)
    return StringIO.StringIO(dedented.strip())
10
11
class WriteLoglineTestCase(unittest.TestCase):
    """Tests for monitors_util.prepend_timestamp() and write_logline().

    time.localtime is replaced by a stub returning a fixed time tuple while
    each test runs, so the expected timestamp text is known in advance.
    """

    def setUp(self):
        self.time_tuple = (2008, 10, 31, 18, 58, 17, 4, 305, 1)
        self.format = '[%Y-%m-%d %H:%M:%S]'
        # self.time_tuple rendered with self.format.
        self.formatted_time_tuple = '[2008-10-31 18:58:17]'
        self.msg = 'testing testing'

        # Stub out time.localtime(); tearDown() puts the real one back.
        self.orig_localtime = time.localtime
        time.localtime = lambda: self.time_tuple


    def tearDown(self):
        time.localtime = self.orig_localtime


    def test_prepend_timestamp(self):
        """The formatted timestamp and the message are joined by a tab."""
        timestamped = monitors_util.prepend_timestamp(
            self.msg, self.format)
        self.assertEqual(
            '%s\t%s' % (self.formatted_time_tuple, self.msg), timestamped)


    def test_write_logline_with_timestamp(self):
        """With a format, the written line is timestamp, tab, msg, newline."""
        logfile = StringIO.StringIO()
        monitors_util.write_logline(logfile, self.msg, self.format)
        self.assertEqual(
            '%s\t%s\n' % (self.formatted_time_tuple, self.msg),
            logfile.getvalue())


    def test_write_logline_without_timestamp(self):
        """Without a format, only the message and a newline are written."""
        logfile = StringIO.StringIO()
        monitors_util.write_logline(logfile, self.msg)
        self.assertEqual('%s\n' % self.msg, logfile.getvalue())
51
52
class AlertHooksTestCase(unittest.TestCase):
    """Tests for monitors_util.make_alert() and build_alert_hooks().

    time.time is replaced by a stub returning a fixed epoch value while each
    test runs, so the timestamp written with an alert is predictable.
    """

    def setUp(self):
        self.msg_template = 'alert yay %s haha %s'
        self.params = ('foo', 'bar')
        self.epoch_seconds = 1225501829.9300611
        # Stub out time.time; tearDown() puts the real one back.
        self.orig_time = time.time
        time.time = lambda: self.epoch_seconds


    def tearDown(self):
        time.time = self.orig_time


    def test_make_alert(self):
        """Calling the alert writes '<int epoch>\\tMSGTYPE\\t<message>\\n'."""
        warnfile = StringIO.StringIO()
        alert = monitors_util.make_alert(warnfile, "MSGTYPE",
                                         self.msg_template)
        alert(*self.params)
        # The expected timestamp is the stubbed time truncated to an integer.
        ts = str(int(self.epoch_seconds))
        expected = '%s\tMSGTYPE\t%s\n' % (ts, self.msg_template % self.params)
        self.assertEqual(expected, warnfile.getvalue())


    def test_build_alert_hooks(self):
        """Two blank-line-separated pattern entries yield two hooks."""
        warnfile = StringIO.StringIO()
        patterns_file = InlineStringIO("""
            BUG
            ^.*Kernel panic ?(.*)
            machine panic'd (%s)

            BUG
            ^.*Oops ?(.*)
            machine Oops'd (%s)
            """)
        hooks = monitors_util.build_alert_hooks(patterns_file, warnfile)
        self.assertEqual(len(hooks), 2)
92
93
class ProcessInputTestCase(unittest.TestCase):
    """Tests for monitors_util.process_input()."""

    def test_process_input_simple(self):
        """The log gets the input text, a newline, TERM_MSG and a newline."""
        # Named `source` rather than `input` so the builtin is not shadowed.
        source = InlineStringIO("""
            woo yay
            this is a line
            booya
            """)
        logfile = StringIO.StringIO()
        monitors_util.process_input(source, logfile)

        # getvalue() returns the whole buffer regardless of how far
        # process_input() advanced the read position.
        expected = '%s\n%s\n' % (source.getvalue(), monitors_util.TERM_MSG)
        self.assertEqual(expected, logfile.getvalue())
109
110
class FollowFilesTestCase(unittest.TestCase):
    """Tests for the file-following helpers in monitors_util.

    setUp() writes a five-line log file into a fresh temporary directory and
    records `lastline_seen` for it in a second temporary "lastlines"
    directory via monitors_util.write_lastlines_file(). Every temporary
    directory made through _make_temp_dir() is deleted again in tearDown().
    """

    def _make_temp_dir(self):
        """Create a temporary directory that tearDown() will remove."""
        dirpath = tempfile.mkdtemp()
        self.temp_dirpaths.append(dirpath)
        return dirpath


    def setUp(self):
        self.temp_dirpaths = []

        self.logfile_dirpath = self._make_temp_dir()
        self.logfile_path = os.path.join(self.logfile_dirpath, 'messages')
        self.firstline = 'bip\n'
        self.lastline_seen = 'wooo\n'
        self.line_after_lastline_seen = 'yeah\n'
        self.lastline = 'pow\n'

        # The trailing numbers count lines from the end of the file;
        # test_lookup_lastlines expects 3 for the line after lastline_seen.
        self.logfile = open(self.logfile_path, 'w')
        try:
            self.logfile.write(self.firstline)
            self.logfile.write(self.lastline_seen)
            self.logfile.write(self.line_after_lastline_seen)  # 3
            self.logfile.write('man\n')  # 2
            self.logfile.write(self.lastline)  # 1
        finally:
            # Close even if a write fails so the descriptor is not leaked.
            self.logfile.close()

        self.lastlines_dirpath = self._make_temp_dir()
        monitors_util.write_lastlines_file(
            self.lastlines_dirpath, self.logfile_path, self.lastline_seen)


    def tearDown(self):
        # Imported locally: shutil is not among this module's imports.
        import shutil
        for dirpath in self.temp_dirpaths:
            # Best-effort cleanup; a failed removal must not mask the
            # result of the test itself.
            shutil.rmtree(dirpath, ignore_errors=True)


    def test_lookup_lastlines(self):
        """The recorded last line resolves to reverse line number 3."""
        reverse_lineno = monitors_util.lookup_lastlines(
            self.lastlines_dirpath, self.logfile_path)
        self.assertEqual(reverse_lineno, 3)


    def test_nonblocking(self):
        """nonblocking() turns on O_NONBLOCK for a pipe's read end."""
        po = subprocess.Popen('echo', stdout=subprocess.PIPE)
        try:
            flags = fcntl.fcntl(po.stdout, fcntl.F_GETFL)
            self.assertEqual(flags, 0)
            monitors_util.nonblocking(po.stdout)
            flags = fcntl.fcntl(po.stdout, fcntl.F_GETFL)
            # os.O_NONBLOCK is 2048 on Linux/x86; the named constant keeps
            # the check valid where the flag has another value.
            self.assertEqual(flags, os.O_NONBLOCK)
        finally:
            # Reap the child before closing the pipe so it never writes to
            # a closed read end, then release the descriptor.
            po.wait()
            po.stdout.close()


    def test_follow_files_nostate(self):
        """With an empty lastlines directory the first line read is the
        log's last line."""
        follow_paths = [self.logfile_path]
        lastlines_dirpath = self._make_temp_dir()
        procs, pipes = monitors_util.launch_tails(
            follow_paths, lastlines_dirpath)
        try:
            lines, bad_pipes = monitors_util.poll_tail_pipes(
                pipes, lastlines_dirpath)
            first_shouldmatch = '[%s]\t%s' % (
                self.logfile_path, self.lastline)
            self.assertEqual(lines[0], first_shouldmatch)
        finally:
            # Stop the launched processes even when an assertion fails.
            monitors_util.snuff(procs.values())


    def test_follow_files(self):
        """With recorded state, reading resumes on the line after
        lastline_seen and runs through the log's last line."""
        follow_paths = [self.logfile_path]
        procs, pipes = monitors_util.launch_tails(
            follow_paths, self.lastlines_dirpath)
        try:
            lines, bad_pipes = monitors_util.poll_tail_pipes(
                pipes, self.lastlines_dirpath)
            first_shouldmatch = '[%s]\t%s' % (
                self.logfile_path, self.line_after_lastline_seen)
            self.assertEqual(lines[0], first_shouldmatch)
            last_shouldmatch = '[%s]\t%s' % (self.logfile_path, self.lastline)
            self.assertEqual(lines[-1], last_shouldmatch)
        finally:
            # Stop the launched processes even when an assertion fails.
            monitors_util.snuff(procs.values())
174
175
if __name__ == '__main__':
    # Executed directly (not imported): discover and run this module's tests.
    unittest.main()