blob: d0ff92845a41fd4ba53eb0f33112a473d1d04bbb [file] [log] [blame]
Mike Frysingerd03e6b52019-08-03 12:49:01 -04001#!/usr/bin/env python2
Chris Chingdf9a8ae2017-05-10 00:46:01 -06002# Copyright 2017 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Standalone service to monitor AFE servers and report to ts_mon"""
Mike Frysinger990f4d92020-02-06 15:21:32 -05007
8import argparse
Chris Chingdf9a8ae2017-05-10 00:46:01 -06009import sys
10import time
Aviv Keshetec44c102018-04-02 10:27:38 -070011import logging
Chris Chingdf9a8ae2017-05-10 00:46:01 -060012import multiprocessing
13import urllib2
14
15import common
16from autotest_lib.client.common_lib import global_config
17from autotest_lib.frontend.afe.json_rpc import proxy
18from autotest_lib.server import frontend
Chris Chingb311a422017-06-13 11:21:11 -060019# import needed to setup host_attributes
20# pylint: disable=unused-import
21from autotest_lib.server import site_host_attributes
22from autotest_lib.site_utils import server_manager_utils
Chris Chingdf9a8ae2017-05-10 00:46:01 -060023from chromite.lib import metrics
24from chromite.lib import ts_mon_config
25
# Common prefix for every ts_mon metric emitted by this service.
METRIC_ROOT = 'chromeos/autotest/blackbox/afe_rpc'
# Duration of each individual RPC issued against a monitored server.
METRIC_RPC_CALL_DURATIONS = '%s/rpc_call_durations' % METRIC_ROOT
# Incremented once per completed polling cycle.
METRIC_TICK = '%s/tick' % METRIC_ROOT
# Incremented when monitoring a host raises an unexpected exception.
METRIC_MONITOR_ERROR = '%s/afe_monitor_error' % METRIC_ROOT
30
# Exception types treated as an "expected" RPC failure, mapped to the
# failure_reason label recorded for them. Exceptions whose exact type is not
# listed here are recorded as 'Unknown' by the RPC runner and re-raised.
FAILURE_REASONS = {proxy.JSONRPCException: 'JSONRPCException'}
34
def afe_rpc_call(hostname):
    """Perform one set of RPC health checks against a server.

    Runs in a multiprocessing.Pool worker. Every exception, including one
    raised while constructing the AFE client, is counted in
    METRIC_MONITOR_ERROR and logged instead of being propagated. This way a
    single misbehaving host cannot abort the pool.map over all servers.

    @param hostname: server's hostname to poll
    """
    try:
        # Construction is inside the try so client-setup failures are
        # reported per host rather than escaping the worker.
        AfeMonitor(hostname).run()
    except Exception:
        metrics.Counter(METRIC_MONITOR_ERROR).increment(
                fields={'target_hostname': hostname})
        logging.exception('Exception when running against host %s', hostname)
Chris Chingdf9a8ae2017-05-10 00:46:01 -060047
48
def update_shards(shards, shards_lock, period=600, stop_event=None):
    """Keep a shared list of shard hostnames in sync with the server database.

    Loops until stop_event is set (forever when stop_event is None). Each
    cycle fetches the current shard set from server_manager_utils.get_shards()
    and adds/removes entries so that |shards| matches it.

    @param shards: list of shard hostnames, updated in place
    @param shards_lock: shared lock held while mutating shards
    @param period: seconds between polls
    @param stop_event: Event that can be set to stop polling
    """
    while not stop_event or not stop_event.is_set():
        start_time = time.time()

        logging.debug('Updating Shards')
        try:
            new_shards = set(server_manager_utils.get_shards())
        except Exception:
            # Keep the previous shard list and retry on the next cycle rather
            # than letting one failed lookup kill this process for good.
            logging.exception('Failed to fetch the list of shards')
            new_shards = None

        if new_shards is not None:
            with shards_lock:
                current_shards = set(shards)
                rm_shards = current_shards - new_shards
                add_shards = new_shards - current_shards

                for s in rm_shards:
                    shards.remove(s)

                if add_shards:
                    shards.extend(add_shards)

            if rm_shards:
                logging.info('Servers left production: %s', rm_shards)

            if add_shards:
                logging.info('Servers entered production: %s', add_shards)

        wait_time = (start_time + period) - time.time()
        if wait_time > 0:
            # Waiting on the event lets close() stop us promptly instead of
            # after up to a full period.
            if stop_event is not None:
                stop_event.wait(wait_time)
            else:
                time.sleep(wait_time)
85
86
def poll_rpc_servers(servers, servers_lock, shards=None, period=60,
                     stop_event=None):
    """Blocking function that polls all servers and shards.

    Each cycle runs afe_rpc_call on every server and shard through a worker
    pool, then increments METRIC_TICK. Loops until stop_event is set (forever
    when stop_event is None). The worker pool is always closed and joined on
    exit.

    @param servers: list of servers to poll
    @param servers_lock: lock to be used when accessing servers or shards
    @param shards: list of shards to poll; None means no shards
    @param period: seconds between polls
    @param stop_event: Event that can be set to stop polling
    """
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() * 4)
    try:
        while not stop_event or not stop_event.is_set():
            start_time = time.time()
            with servers_lock:
                # set.union(None) raises TypeError, so treat the default
                # shards=None as "no shards".
                all_servers = set(servers).union(
                        shards if shards is not None else [])

            logging.debug('Starting Server Polling: %s', ', '.join(all_servers))
            pool.map(afe_rpc_call, all_servers)

            logging.debug('Finished Server Polling')

            metrics.Counter(METRIC_TICK).increment()

            wait_time = (start_time + period) - time.time()
            if wait_time > 0:
                # Waiting on the event lets close() stop us promptly.
                if stop_event is not None:
                    stop_event.wait(wait_time)
                else:
                    time.sleep(wait_time)
    finally:
        # Release the worker processes instead of leaking them.
        pool.close()
        pool.join()
114
115
class RpcFlightRecorder(object):
    """Monitors a list of AFE servers and, optionally, their shards.

    Shared state (server list, shard list, lock) lives in a
    multiprocessing.Manager. Polling runs in one child process; shard
    discovery runs in another when with_shards is set.
    """

    def __init__(self, servers, with_shards=True, poll_period=60):
        """
        @param servers: list of afe services to monitor
        @param with_shards: also record status on shards
        @param poll_period: seconds between polls of all services
        """
        self._manager = multiprocessing.Manager()

        self._poll_period = poll_period

        self._servers = self._manager.list(servers)
        self._servers_lock = self._manager.RLock()

        self._with_shards = with_shards
        self._shards = self._manager.list()
        self._update_shards_ps = None
        self._poll_rpc_server_ps = None

        self._stop_event = multiprocessing.Event()

    def start(self):
        """Start the shard-update (if enabled) and polling sub-processes."""
        if self._with_shards:
            self._update_shards_ps = multiprocessing.Process(
                    name='update_shards',
                    target=update_shards,
                    args=[self._shards, self._servers_lock],
                    kwargs={'stop_event': self._stop_event})
            self._update_shards_ps.start()

        self._poll_rpc_server_ps = multiprocessing.Process(
                name='poll_rpc_servers',
                target=poll_rpc_servers,
                args=[self._servers, self._servers_lock],
                kwargs={'shards': self._shards,
                        'period': self._poll_period,
                        'stop_event': self._stop_event})
        self._poll_rpc_server_ps.start()

    def close(self):
        """Send close event to all sub processes."""
        self._stop_event.set()

    def terminate(self):
        """Forcefully stop sub-processes and shut down the shared manager."""
        self.close()
        for ps in (self._poll_rpc_server_ps, self._update_shards_ps):
            if ps is not None:
                ps.terminate()
                # Reap the terminated child so it does not linger as a zombie.
                ps.join()

        if self._manager:
            self._manager.shutdown()

    # Backward-compatible alias for the original, misspelled method name.
    termitate = terminate

    def join(self, timeout=None):
        """Blocking call until closed and processes complete.

        @param timeout: passed to each process, so total wait could be
                more than timeout
        """
        if self._poll_rpc_server_ps:
            self._poll_rpc_server_ps.join(timeout)

        if self._update_shards_ps:
            self._update_shards_ps.join(timeout)
Chris Chingdf9a8ae2017-05-10 00:46:01 -0600190
Chris Ching43fdebc2017-06-15 11:05:38 -0600191def _failed(fields, msg_str, reason, err=None):
192 """Mark current run failed
193
194 @param fields, ts_mon fields to mark as failed
195 @param msg_str, message string to be filled
196 @param reason: why it failed
197 @param err: optional error to log more debug info
198 """
199 fields['success'] = False
200 fields['failure_reason'] = reason
201 logging.warning("%s failed - %s", msg_str, reason)
202 if err:
203 logging.debug("%s fail_err - %s", msg_str, str(err))
Chris Chingdf9a8ae2017-05-10 00:46:01 -0600204
class AfeMonitor(object):
    """Issues health-check RPCs against one AFE frontend, recording metrics."""

    def __init__(self, hostname):
        """
        @param hostname: hostname of server to monitor, string
        """
        self._hostname = hostname
        self._afe = frontend.AFE(server=hostname)
        self._metric_fields = {'target_hostname': hostname}

    def run_cmd(self, cmd, expected=None):
        """Send one RPC, timing it and recording its outcome in ts_mon.

        @param cmd: name of the rpc command to send
        @param expected: when not None, the result the RPC must return for
                the call to be counted as a success

        @raises: any exception whose exact type is not a FAILURE_REASONS key,
                after it has been recorded with failure_reason 'Unknown'.
        """
        timer_fields = dict(self._metric_fields)
        timer_fields['command'] = cmd
        timer_fields['success'] = True
        timer_fields['failure_reason'] = ''

        with metrics.SecondsTimer(METRIC_RPC_CALL_DURATIONS,
                                  fields=dict(timer_fields),
                                  scale=0.001) as outcome:
            label = "%s:%s" % (self._hostname, cmd)

            try:
                response = self._afe.run(cmd)
                logging.debug("%s result = %s", label, response)
                if expected is not None and expected != response:
                    _failed(outcome, label, 'IncorrectResponse')
            except urllib2.HTTPError as http_err:
                _failed(outcome, label, 'HTTPError:%d' % http_err.code)
            except Exception as err:
                is_known = type(err) in FAILURE_REASONS
                if is_known:
                    reason = FAILURE_REASONS[type(err)]
                else:
                    reason = 'Unknown'
                _failed(outcome, label, reason, err=err)
                if not is_known:
                    raise

            if outcome['success']:
                logging.info("%s success", label)

    def run(self):
        """Run every health-check RPC against the server; returns nothing."""
        checks = (('get_server_time', None), ('ping_db', [True]))
        for cmd, expected in checks:
            self.run_cmd(cmd, expected)
Chris Chingdf9a8ae2017-05-10 00:46:01 -0600258
259
def get_parser():
    """Returns the argparse parser for this service's command line."""
    parser = argparse.ArgumentParser(description=__doc__)

    parser.add_argument('-a', '--afe', action='append', default=[],
                        help='Autotest FrontEnd server to monitor; may be '
                             'given more than once')

    # The value is a period in seconds (time between polls), not a frequency.
    parser.add_argument('-p', '--poll-period', type=int, default=60,
                        help='Seconds between polls of the AFE servers')

    parser.add_argument('--no-shards', action='store_false', dest='with_shards',
                        help='Disable shard updating')

    return parser
274
275
def main(argv):
    """Main function: parse flags and run the flight recorder.

    @param argv: commandline arguments passed (argv[0] is the program name)
    """
    parser = get_parser()
    options = parser.parse_args(argv[1:])

    if not options.afe:
        options.afe = [global_config.global_config.get_config_value(
                'SERVER', 'global_afe_hostname', default='cautotest')]

    with ts_mon_config.SetupTsMonGlobalState('rpc_flight_recorder',
                                             indirect=True):
        flight_recorder = RpcFlightRecorder(options.afe,
                                            with_shards=options.with_shards,
                                            poll_period=options.poll_period)
        try:
            flight_recorder.start()
            flight_recorder.join()
        finally:
            # Tear down the child processes and the shared Manager even if
            # start()/join() is interrupted (e.g. KeyboardInterrupt). The
            # children are non-daemon, so interpreter exit could otherwise
            # wait on them. termitate() is the recorder's existing method name.
            flight_recorder.termitate()
Chris Chingdf9a8ae2017-05-10 00:46:01 -0600297
298
if __name__ == '__main__':
    # Script entry point: hand the full argv (program name included) to main.
    main(argv=sys.argv)