blob: 72e41d62163f54fd311321fa61473438192c75b6 [file] [log] [blame]
Chris Chingdf9a8ae2017-05-10 00:46:01 -06001#!/usr/bin/env python
2# Copyright 2017 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Standalone service to monitor AFE servers and report to ts_mon"""
7import sys
8import time
Aviv Keshetec44c102018-04-02 10:27:38 -07009import logging
Chris Chingdf9a8ae2017-05-10 00:46:01 -060010import multiprocessing
11import urllib2
12
13import common
14from autotest_lib.client.common_lib import global_config
15from autotest_lib.frontend.afe.json_rpc import proxy
16from autotest_lib.server import frontend
Chris Chingb311a422017-06-13 11:21:11 -060017# import needed to setup host_attributes
18# pylint: disable=unused-import
19from autotest_lib.server import site_host_attributes
20from autotest_lib.site_utils import server_manager_utils
Chris Chingdf9a8ae2017-05-10 00:46:01 -060021from chromite.lib import commandline
Chris Chingdf9a8ae2017-05-10 00:46:01 -060022from chromite.lib import metrics
23from chromite.lib import ts_mon_config
24
# Common prefix shared by every metric name defined below.
METRIC_ROOT = 'chromeos/autotest/blackbox/afe_rpc'
# Timer metric used by AfeMonitor.run_cmd around each rpc call.
METRIC_RPC_CALL_DURATIONS = METRIC_ROOT + '/rpc_call_durations'
# Counter incremented by poll_rpc_servers after each polling pass.
METRIC_TICK = METRIC_ROOT + '/tick'
# Counter incremented by afe_rpc_call when monitoring a host raises.
METRIC_MONITOR_ERROR = METRIC_ROOT + '/afe_monitor_error'

# Maps exception types to the 'failure_reason' field value recorded for them.
# AfeMonitor.run_cmd looks exceptions up by their exact type: listed types are
# recorded and swallowed, any other type is recorded as 'Unknown' and re-raised.
FAILURE_REASONS = {
        proxy.JSONRPCException: 'JSONRPCException',
        }
33
def afe_rpc_call(hostname):
    """Perform one rpc call set on server

    Any exception, including one raised while constructing the monitor, is
    counted on METRIC_MONITOR_ERROR and logged with its traceback instead of
    being propagated to the caller.

    @param hostname: server's hostname to poll
    """
    try:
        # Build the monitor inside the try block so that a failure to create
        # it is reported exactly like a failure while running it.
        AfeMonitor(hostname).run()
    except Exception:
        metrics.Counter(METRIC_MONITOR_ERROR).increment(
                fields={'target_hostname': hostname})
        logging.exception('Exception when running against host %s', hostname)
Chris Chingdf9a8ae2017-05-10 00:46:01 -060046
47
def update_shards(shards, shards_lock, period=600, stop_event=None):
    """Keeps a shared list of shards in sync with the known shards

    Loops until stop_event is set (forever when no stop_event is given).
    Every pass fetches the shards from server_manager_utils.get_shards(),
    then removes stale entries from and appends new entries to the list in
    place while holding shards_lock.

    @param shards: list of shards to be updated in place
    @param shards_lock: shared lock for accessing shards
    @param period: time between polls, in seconds
    @param stop_event: Event that can be set to stop polling
    """
    while not (stop_event and stop_event.is_set()):
        start_time = time.time()

        logging.debug('Updating Shards')
        new_shards = set(server_manager_utils.get_shards())

        with shards_lock:
            current_shards = set(shards)
            rm_shards = current_shards - new_shards
            add_shards = new_shards - current_shards

            for s in rm_shards:
                shards.remove(s)

            if add_shards:
                shards.extend(add_shards)

        # Log outside of the lock so other users of the list are not held up.
        if rm_shards:
            logging.info('Servers left production: %s', str(rm_shards))

        if add_shards:
            logging.info('Servers entered production: %s',
                         str(add_shards))

        wait_time = (start_time + period) - time.time()
        if wait_time > 0:
            if stop_event:
                # Wait on the event rather than sleeping so that a stop
                # request ends the pause immediately instead of after up to
                # a full period.
                stop_event.wait(wait_time)
            else:
                time.sleep(wait_time)
84
85
def poll_rpc_servers(servers, servers_lock, shards=None, period=60,
                     stop_event=None):
    """Blocking function that polls all servers and shards

    Loops until stop_event is set (forever when no stop_event is given).
    Every pass runs afe_rpc_call for each distinct server and shard in a
    process pool and then increments the METRIC_TICK counter.

    @param servers: list of servers to poll
    @param servers_lock: lock to be used when accessing servers or shards
    @param shards: list of shards to poll, or None to poll only servers
    @param period: time between polls, in seconds
    @param stop_event: Event that can be set to stop polling
    """
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() * 4)

    try:
        while not (stop_event and stop_event.is_set()):
            start_time = time.time()
            with servers_lock:
                all_servers = set(servers)
                # shards defaults to None, and set.union(None) raises
                # TypeError, so only merge when a list was supplied.
                if shards is not None:
                    all_servers.update(shards)

            logging.debug('Starting Server Polling: %s',
                          ', '.join(all_servers))
            pool.map(afe_rpc_call, all_servers)

            logging.debug('Finished Server Polling')

            metrics.Counter(METRIC_TICK).increment()

            wait_time = (start_time + period) - time.time()
            if wait_time > 0:
                if stop_event:
                    # Wait on the event rather than sleeping so that a stop
                    # request ends the pause immediately.
                    stop_event.wait(wait_time)
                else:
                    time.sleep(wait_time)
    finally:
        # Release the worker processes whether the loop ended normally or
        # with an exception; pool.map is blocking, so no work is pending.
        pool.terminate()
        pool.join()
113
114
class RpcFlightRecorder(object):
    """Monitors a list of AFE

    Owns up to two child processes: one running poll_rpc_servers and,
    when with_shards is set, one running update_shards. The server and shard
    lists live in a multiprocessing.Manager so both processes can share them,
    and a single stop event is handed to both loops.
    """

    def __init__(self, servers, with_shards=True, poll_period=60):
        """
        @param servers: list of afe services to monitor
        @param with_shards: also record status on shards
        @param poll_period: frequency to poll all services, in seconds
        """
        self._manager = multiprocessing.Manager()

        self._poll_period = poll_period

        self._servers = self._manager.list(servers)
        # The same lock guards both the servers and the shards list.
        self._servers_lock = self._manager.RLock()

        self._with_shards = with_shards
        self._shards = self._manager.list()
        # Child processes; they stay None until start() creates them.
        self._update_shards_ps = None
        self._poll_rpc_server_ps = None

        self._stop_event = multiprocessing.Event()

    def start(self):
        """Call to start recorder"""
        if self._with_shards:
            shard_args = [self._shards, self._servers_lock]
            shard_kwargs = {'stop_event': self._stop_event}
            self._update_shards_ps = multiprocessing.Process(
                    name='update_shards',
                    target=update_shards,
                    args=shard_args,
                    kwargs=shard_kwargs)

            self._update_shards_ps.start()

        poll_args = [self._servers, self._servers_lock]
        poll_kwargs = {'shards': self._shards,
                       'period': self._poll_period,
                       'stop_event': self._stop_event}
        self._poll_rpc_server_ps = multiprocessing.Process(
                name='poll_rpc_servers',
                target=poll_rpc_servers,
                args=poll_args,
                kwargs=poll_kwargs)

        self._poll_rpc_server_ps.start()

    def close(self):
        """Send close event to all sub processes"""
        self._stop_event.set()

    def terminate(self):
        """Terminate processes

        Sets the stop event, terminates whichever child processes were
        started and shuts the manager down.
        """
        self.close()
        if self._poll_rpc_server_ps:
            self._poll_rpc_server_ps.terminate()

        if self._update_shards_ps:
            self._update_shards_ps.terminate()

        if self._manager:
            self._manager.shutdown()

    # Backward-compatible alias: this method was originally published under
    # the misspelled name, so existing callers must keep working.
    termitate = terminate

    def join(self, timeout=None):
        """Blocking call until closed and processes complete

        @param timeout: passed to each process, so could be >timeout
        """
        if self._poll_rpc_server_ps:
            self._poll_rpc_server_ps.join(timeout)

        if self._update_shards_ps:
            self._update_shards_ps.join(timeout)
Chris Chingdf9a8ae2017-05-10 00:46:01 -0600189
def _failed(fields, msg_str, reason, err=None):
    """Flag the current run as failed and log why

    @param fields: ts_mon fields to mark as failed, modified in place
    @param msg_str: prefix used for the log messages
    @param reason: why it failed, stored as the 'failure_reason' field
    @param err: optional error to log more debug info
    """
    for key, value in (('success', False), ('failure_reason', reason)):
        fields[key] = value

    logging.warning("%s failed - %s", msg_str, reason)

    # Nothing more to report when no (truthy) error object was supplied.
    if not err:
        return
    logging.debug("%s fail_err - %s", msg_str, str(err))
Chris Chingdf9a8ae2017-05-10 00:46:01 -0600203
class AfeMonitor(object):
    """Issues rpc calls to one afe frontend and records how they went"""

    def __init__(self, hostname):
        """
        @param hostname: hostname of server to monitor, string
        """
        self._hostname = hostname
        self._afe = frontend.AFE(server=hostname)
        self._metric_fields = {'target_hostname': hostname}

    def run_cmd(self, cmd, expected=None):
        """Sends one rpc command and reports its outcome to the timer metric

        Exceptions whose exact type is listed in FAILURE_REASONS, as well as
        urllib2.HTTPError, are recorded as failures and swallowed; any other
        exception is recorded with reason 'Unknown' and re-raised.

        @param cmd: string of rpc command to send
        @param expected: expected result of rpc; not checked when None
        """
        call_fields = dict(self._metric_fields,
                           command=cmd,
                           success=True,
                           failure_reason='')

        with metrics.SecondsTimer(METRIC_RPC_CALL_DURATIONS,
                                  fields=dict(call_fields),
                                  scale=0.001) as timer_fields:
            call_tag = "%s:%s" % (self._hostname, cmd)

            try:
                response = self._afe.run(cmd)
                logging.debug("%s result = %s", call_tag, response)
                if expected is not None:
                    if expected != response:
                        _failed(timer_fields, call_tag, 'IncorrectResponse')

            except urllib2.HTTPError as http_err:
                _failed(timer_fields, call_tag,
                        'HTTPError:%d' % http_err.code)

            except Exception as exc:
                exc_type = type(exc)
                _failed(timer_fields, call_tag,
                        FAILURE_REASONS.get(exc_type, 'Unknown'), err=exc)

                # Only exception types with a registered reason are absorbed.
                if exc_type not in FAILURE_REASONS:
                    raise

            if timer_fields['success']:
                logging.info("%s success", call_tag)

    def run(self):
        """Runs the fixed set of rpc checks against the server"""
        checks = (
            ('get_server_time', None),
            ('ping_db', [True]),
        )
        for rpc_name, wanted in checks:
            self.run_cmd(rpc_name, wanted)
Chris Chingdf9a8ae2017-05-10 00:46:01 -0600257
258
def get_parser():
    """Builds the commandline argument parser for this service"""
    arg_parser = commandline.ArgumentParser(description=__doc__)

    # Each entry is (option flags, add_argument keyword settings).
    option_specs = (
        (('-a', '--afe'),
         dict(action='append', default=[],
              help='Autotest FrontEnd server to monitor')),
        (('-p', '--poll-period'),
         dict(type=int, default=60,
              help='Frequency to poll AFE servers')),
        (('--no-shards',),
         dict(action='store_false', dest='with_shards',
              help='Disable shard updating')),
    )
    for flags, settings in option_specs:
        arg_parser.add_argument(*flags, **settings)

    return arg_parser
273
274
def main(argv):
    """Main function

    Parses the commandline, falls back to the 'global_afe_hostname' value of
    the SERVER config section when no afe was given, and then runs a
    RpcFlightRecorder until its processes complete.

    @param argv: commandline arguments passed
    """
    parser = get_parser()
    options = parser.parse_args(argv[1:])

    if not options.afe:
        options.afe = [global_config.global_config.get_config_value(
                'SERVER', 'global_afe_hostname', default='cautotest')]

    with ts_mon_config.SetupTsMonGlobalState('rpc_flight_recorder',
                                             indirect=True):
        flight_recorder = RpcFlightRecorder(options.afe,
                                            with_shards=options.with_shards,
                                            poll_period=options.poll_period)

        try:
            flight_recorder.start()
            flight_recorder.join()
        finally:
            # Always stop the child processes and shut the manager down,
            # including when start/join is interrupted by an exception.
            flight_recorder.termitate()
Chris Chingdf9a8ae2017-05-10 00:46:01 -0600296
297
# Run the service only when executed as a script. The complete argv is passed
# on because main() itself skips the program name (it parses argv[1:]).
if __name__ == '__main__':
    main(sys.argv)