Mike Frysinger | d03e6b5 | 2019-08-03 12:49:01 -0400 | [diff] [blame] | 1 | #!/usr/bin/env python2 |
Chris Ching | df9a8ae | 2017-05-10 00:46:01 -0600 | [diff] [blame] | 2 | # Copyright 2017 The Chromium OS Authors. All rights reserved. |
| 3 | # Use of this source code is governed by a BSD-style license that can be |
| 4 | # found in the LICENSE file. |
| 5 | |
| 6 | """Standalone service to monitor AFE servers and report to ts_mon""" |
Mike Frysinger | 990f4d9 | 2020-02-06 15:21:32 -0500 | [diff] [blame^] | 7 | |
| 8 | import argparse |
Chris Ching | df9a8ae | 2017-05-10 00:46:01 -0600 | [diff] [blame] | 9 | import sys |
| 10 | import time |
Aviv Keshet | ec44c10 | 2018-04-02 10:27:38 -0700 | [diff] [blame] | 11 | import logging |
Chris Ching | df9a8ae | 2017-05-10 00:46:01 -0600 | [diff] [blame] | 12 | import multiprocessing |
| 13 | import urllib2 |
| 14 | |
| 15 | import common |
| 16 | from autotest_lib.client.common_lib import global_config |
| 17 | from autotest_lib.frontend.afe.json_rpc import proxy |
| 18 | from autotest_lib.server import frontend |
Chris Ching | b311a42 | 2017-06-13 11:21:11 -0600 | [diff] [blame] | 19 | # import needed to setup host_attributes |
| 20 | # pylint: disable=unused-import |
| 21 | from autotest_lib.server import site_host_attributes |
| 22 | from autotest_lib.site_utils import server_manager_utils |
Chris Ching | df9a8ae | 2017-05-10 00:46:01 -0600 | [diff] [blame] | 23 | from chromite.lib import metrics |
| 24 | from chromite.lib import ts_mon_config |
| 25 | |
# Common prefix for every ts_mon metric emitted by this service.
METRIC_ROOT = 'chromeos/autotest/blackbox/afe_rpc'
# Timer metric for individual rpc calls (see AfeMonitor.run_cmd).
METRIC_RPC_CALL_DURATIONS = '/'.join((METRIC_ROOT, 'rpc_call_durations'))
# Counter bumped once per completed polling pass over all servers.
METRIC_TICK = '/'.join((METRIC_ROOT, 'tick'))
# Counter of exceptions that escaped a per-host monitoring run.
METRIC_MONITOR_ERROR = '/'.join((METRIC_ROOT, 'afe_monitor_error'))
| 30 | |
# Maps exception types that AfeMonitor.run_cmd treats as known rpc failures
# to the failure_reason string recorded for them. Exception types not listed
# here are recorded as 'Unknown' and re-raised.
FAILURE_REASONS = {}
FAILURE_REASONS[proxy.JSONRPCException] = 'JSONRPCException'
| 34 | |
def afe_rpc_call(hostname):
    """Perform one rpc call set on a single server.

    Runs as a pool.map worker, so it never lets an exception escape. Any
    failure is counted in METRIC_MONITOR_ERROR and logged, so one bad host
    cannot abort the polling pass for the others. This includes a failure
    while constructing the AfeMonitor.

    @param hostname: server's hostname to poll
    """
    try:
        # Construct inside the try: a failure here used to propagate out of
        # the worker and make pool.map raise in the polling process.
        AfeMonitor(hostname).run()
    except Exception:
        metrics.Counter(METRIC_MONITOR_ERROR).increment(
                fields={'target_hostname': hostname})
        logging.exception('Exception when running against host %s', hostname)
Chris Ching | df9a8ae | 2017-05-10 00:46:01 -0600 | [diff] [blame] | 47 | |
| 48 | |
def update_shards(shards, shards_lock, period=600, stop_event=None):
    """Periodically sync a shared list of shards with the production set.

    Each pass fetches the current shards from server_manager_utils. While
    holding shards_lock, it removes entries that are no longer returned and
    appends newly returned ones. If fetching fails, the error is logged and
    the current list is kept until the next pass. A transient error therefore
    no longer ends shard updating for the life of the process.

    @param shards: list of shards to be updated in place (shared with the
                   polling process)
    @param shards_lock: shared lock for accessing shards
    @param period: seconds between polls
    @param stop_event: Event that can be set to stop polling; if not given,
                       poll forever
    """
    while not stop_event or not stop_event.is_set():
        start_time = time.time()

        logging.debug('Updating Shards')
        try:
            new_shards = set(server_manager_utils.get_shards())
        except Exception:
            # Best effort: keep the last known shard list and retry next
            # period rather than letting this long-running loop die.
            logging.exception('Failed to fetch shards; keeping current list')
        else:
            with shards_lock:
                current_shards = set(shards)
                rm_shards = current_shards - new_shards
                add_shards = new_shards - current_shards

                for s in rm_shards:
                    shards.remove(s)

                if add_shards:
                    shards.extend(add_shards)

            if rm_shards:
                logging.info('Servers left production: %s', str(rm_shards))

            if add_shards:
                logging.info('Servers entered production: %s',
                             str(add_shards))

        wait_time = (start_time + period) - time.time()
        if wait_time > 0:
            time.sleep(wait_time)
| 85 | |
| 86 | |
def poll_rpc_servers(servers, servers_lock, shards=None, period=60,
                     stop_event=None):
    """Blocking function that polls all servers and shards.

    Each pass runs afe_rpc_call on every distinct server and shard through a
    process pool, bumps METRIC_TICK, then sleeps for the rest of the period.
    The pool is terminated and joined when polling stops or an exception
    escapes the loop.

    @param servers: list of servers to poll
    @param servers_lock: lock to be used when accessing servers or shards
    @param shards: optional list of shards to poll; None means no shards
    @param period: seconds between the starts of consecutive polls
    @param stop_event: Event that can be set to stop polling; if not given,
                       poll forever
    """
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() * 4)

    try:
        while not stop_event or not stop_event.is_set():
            start_time = time.time()
            with servers_lock:
                all_servers = set(servers)
                # shards is optional: set.union(None) would raise TypeError.
                if shards is not None:
                    all_servers.update(shards)

            logging.debug('Starting Server Polling: %s',
                          ', '.join(all_servers))
            pool.map(afe_rpc_call, all_servers)

            logging.debug('Finished Server Polling')

            metrics.Counter(METRIC_TICK).increment()

            wait_time = (start_time + period) - time.time()
            if wait_time > 0:
                time.sleep(wait_time)
    finally:
        # Release the worker processes as soon as polling stops instead of
        # relying on interpreter-exit cleanup.
        pool.terminate()
        pool.join()
| 114 | |
| 115 | |
class RpcFlightRecorder(object):
    """Monitors a list of AFE servers and, optionally, their shards.

    Shard discovery and rpc polling each run in a separate child process.
    The two processes share the server and shard lists through a
    multiprocessing.Manager.
    """
    def __init__(self, servers, with_shards=True, poll_period=60):
        """
        @param servers: list of afe services to monitor
        @param with_shards: also record status on shards
        @param poll_period: seconds between polls of all services
        """
        self._manager = multiprocessing.Manager()

        self._poll_period = poll_period

        self._servers = self._manager.list(servers)
        self._servers_lock = self._manager.RLock()

        self._with_shards = with_shards
        self._shards = self._manager.list()
        self._update_shards_ps = None
        self._poll_rpc_server_ps = None

        self._stop_event = multiprocessing.Event()

    def start(self):
        """Call to start recorder.

        Starts the shard updater process first (if enabled), then the rpc
        polling process.
        """
        if self._with_shards:
            # The shard list is guarded by the same lock the poller uses to
            # read servers + shards.
            self._update_shards_ps = multiprocessing.Process(
                    name='update_shards',
                    target=update_shards,
                    args=[self._shards, self._servers_lock],
                    kwargs={'stop_event': self._stop_event})

            self._update_shards_ps.start()

        self._poll_rpc_server_ps = multiprocessing.Process(
                name='poll_rpc_servers',
                target=poll_rpc_servers,
                args=[self._servers, self._servers_lock],
                kwargs={'shards': self._shards,
                        'period': self._poll_period,
                        'stop_event': self._stop_event})

        self._poll_rpc_server_ps.start()

    def close(self):
        """Send close event to all sub processes."""
        self._stop_event.set()

    def terminate(self):
        """Terminate processes and shut down the shared-state manager.

        Sets the stop event, then terminates and reaps each child process
        that was started (poller first, then shard updater), then shuts
        down the manager.
        """
        self.close()
        for process in (self._poll_rpc_server_ps, self._update_shards_ps):
            if process is not None:
                process.terminate()
                # Reap the terminated child so it does not linger.
                process.join()

        if self._manager is not None:
            self._manager.shutdown()

    def termitate(self):
        """Deprecated, misspelled alias kept for existing callers."""
        return self.terminate()

    def join(self, timeout=None):
        """Blocking call until closed and processes complete.

        @param timeout: passed to each process's join, so the total wait can
                        exceed timeout
        """
        if self._poll_rpc_server_ps:
            self._poll_rpc_server_ps.join(timeout)

        if self._update_shards_ps:
            self._update_shards_ps.join(timeout)
Chris Ching | df9a8ae | 2017-05-10 00:46:01 -0600 | [diff] [blame] | 190 | |
Chris Ching | 43fdebc | 2017-06-15 11:05:38 -0600 | [diff] [blame] | 191 | def _failed(fields, msg_str, reason, err=None): |
| 192 | """Mark current run failed |
| 193 | |
| 194 | @param fields, ts_mon fields to mark as failed |
| 195 | @param msg_str, message string to be filled |
| 196 | @param reason: why it failed |
| 197 | @param err: optional error to log more debug info |
| 198 | """ |
| 199 | fields['success'] = False |
| 200 | fields['failure_reason'] = reason |
| 201 | logging.warning("%s failed - %s", msg_str, reason) |
| 202 | if err: |
| 203 | logging.debug("%s fail_err - %s", msg_str, str(err)) |
Chris Ching | df9a8ae | 2017-05-10 00:46:01 -0600 | [diff] [blame] | 204 | |
class AfeMonitor(object):
    """Probes one AFE frontend with rpc calls and records per-call metrics."""

    def __init__(self, hostname):
        """
        @param hostname: hostname of the server to probe, string
        """
        self._hostname = hostname
        self._afe = frontend.AFE(server=hostname)
        self._metric_fields = dict(target_hostname=hostname)

    def run_cmd(self, cmd, expected=None):
        """Send one rpc and record its duration and outcome.

        The METRIC_RPC_CALL_DURATIONS timer is tagged with target_hostname,
        command, success and failure_reason. Outcomes by exception type:
          - urllib2.HTTPError: marked 'HTTPError:<code>' and swallowed.
          - types listed in FAILURE_REASONS: marked with the mapped reason
            and swallowed.
          - any other exception: marked 'Unknown' and re-raised.

        @param cmd: name of the rpc command to send, string
        @param expected: if not None, the value the rpc must return; any
                         other result is marked 'IncorrectResponse'
        """
        base_fields = dict(self._metric_fields)
        base_fields.update(
                command=cmd,
                success=True,
                failure_reason='')

        timer = metrics.SecondsTimer(METRIC_RPC_CALL_DURATIONS,
                                     fields=dict(base_fields), scale=0.001)
        with timer as fields:
            label = "%s:%s" % (self._hostname, cmd)

            try:
                response = self._afe.run(cmd)
                logging.debug("%s result = %s", label, response)
                if expected is not None and expected != response:
                    _failed(fields, label, 'IncorrectResponse')
            except urllib2.HTTPError as http_error:
                _failed(fields, label, 'HTTPError:%d' % http_error.code)
            except Exception as error:
                error_type = type(error)
                reason = FAILURE_REASONS.get(error_type, 'Unknown')
                _failed(fields, label, reason, err=error)
                # Only known failure types are expected here; anything else
                # is surfaced to the caller after being marked.
                if error_type not in FAILURE_REASONS:
                    raise

            if fields['success']:
                logging.info("%s success", label)

    def run(self):
        """Probe the server with each rpc in turn; returns None.

        Stops early if a probe re-raises an unexpected exception.
        """
        probes = (
                ('get_server_time', None),
                ('ping_db', [True]),
        )
        for cmd, expected in probes:
            self.run_cmd(cmd, expected)
Chris Ching | df9a8ae | 2017-05-10 00:46:01 -0600 | [diff] [blame] | 258 | |
| 259 | |
def get_parser():
    """Returns the argparse parser for this service.

    Options: -a/--afe (repeatable server name), -p/--poll-period (int
    seconds, default 60) and --no-shards (sets with_shards to False).
    """
    parser = argparse.ArgumentParser(description=__doc__)

    parser.add_argument('-a', '--afe', action='append', default=[],
                        help='Autotest FrontEnd server to monitor')

    # The value is a period (seconds between polling passes), not a
    # frequency; main() passes it on as RpcFlightRecorder's poll_period.
    parser.add_argument('-p', '--poll-period', type=int, default=60,
                        help='Seconds between polls of AFE servers')

    parser.add_argument('--no-shards', action='store_false', dest='with_shards',
                        help='Disable shard updating')

    return parser
| 274 | |
| 275 | |
def main(argv):
    """Main function: run the flight recorder until its processes exit.

    @param argv: commandline arguments passed (argv[0] is the program name)
    """
    options = get_parser().parse_args(argv[1:])

    if not options.afe:
        # No -a/--afe given: fall back to the configured global AFE.
        options.afe = [global_config.global_config.get_config_value(
                'SERVER', 'global_afe_hostname', default='cautotest')]

    with ts_mon_config.SetupTsMonGlobalState('rpc_flight_recorder',
                                             indirect=True):
        flight_recorder = RpcFlightRecorder(options.afe,
                                            with_shards=options.with_shards,
                                            poll_period=options.poll_period)
        try:
            flight_recorder.start()
            flight_recorder.join()
        finally:
            # The child processes are non-daemonic and loop until stopped. If
            # start()/join() is interrupted (e.g. KeyboardInterrupt), tear
            # them and the manager down instead of waiting on them at exit.
            flight_recorder.termitate()
Chris Ching | df9a8ae | 2017-05-10 00:46:01 -0600 | [diff] [blame] | 297 | |
| 298 | |
if __name__ == '__main__':
    # Script entry point: hand the full argv (program name included) to main.
    main(argv=sys.argv)