Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 1 | #!/usr/bin/python |
| 2 | #pylint: disable-msg=C0111 |
| 3 | |
| 4 | # Copyright (c) 2014 The Chromium OS Authors. All rights reserved. |
| 5 | # Use of this source code is governed by a BSD-style license that can be |
| 6 | # found in the LICENSE file. |
| 7 | |
| 8 | import argparse |
Paul Hobbs | f536138 | 2018-03-05 11:56:56 -0800 | [diff] [blame] | 9 | import datetime |
MK Ryu | 8a43715 | 2015-07-20 14:25:39 -0700 | [diff] [blame] | 10 | import httplib |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 11 | import logging |
| 12 | import os |
Aviv Keshet | dfd1f52 | 2017-03-22 20:14:09 -0700 | [diff] [blame] | 13 | import random |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 14 | import signal |
| 15 | import time |
MK Ryu | 8a43715 | 2015-07-20 14:25:39 -0700 | [diff] [blame] | 16 | import urllib2 |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 17 | |
| 18 | import common |
Paul Hobbs | eedcb8b | 2016-10-05 16:44:27 -0700 | [diff] [blame] | 19 | |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 20 | from autotest_lib.frontend import setup_django_environment |
Aviv Keshet | 8a93beb | 2017-04-21 12:55:28 -0700 | [diff] [blame] | 21 | from autotest_lib.frontend.afe.json_rpc import proxy |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 22 | from autotest_lib.client.common_lib import error |
| 23 | from autotest_lib.client.common_lib import global_config |
Prashanth Balasubramanian | 22dd226 | 2014-11-28 18:19:18 -0800 | [diff] [blame] | 24 | from autotest_lib.frontend.afe import models |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 25 | from autotest_lib.scheduler import email_manager |
Prashanth Balasubramanian | 5949b4a | 2014-11-23 12:58:30 -0800 | [diff] [blame] | 26 | from autotest_lib.scheduler import scheduler_lib |
Jakob Juelich | 8421d59 | 2014-09-17 15:27:06 -0700 | [diff] [blame] | 27 | from autotest_lib.server.cros.dynamic_suite import frontend_wrappers |
Fang Deng | 0cb2a3b | 2015-12-10 17:59:00 -0800 | [diff] [blame] | 28 | from autotest_lib.server import utils as server_utils |
MK Ryu | 89cca5d | 2015-09-18 13:07:22 -0700 | [diff] [blame] | 29 | from chromite.lib import timeout_util |
Ningning Xia | cdc2310 | 2018-03-29 14:18:05 -0700 | [diff] [blame] | 30 | from django.core.exceptions import MultipleObjectsReturned |
Prashanth Balasubramanian | 75be1d3 | 2014-11-25 18:03:09 -0800 | [diff] [blame] | 31 | from django.db import transaction |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 32 | |
try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
    from infra_libs import ts_mon
except ImportError:
    # Fall back to mocks so the shard client can run without chromite /
    # infra_libs installed. ts_mon must be mocked too:
    # ShardClient._report_job_time_distribution references
    # ts_mon.Distribution and ts_mon.GeometricBucketer, which would raise
    # NameError if only metrics and ts_mon_config were substituted here.
    metrics = server_utils.metrics_mock
    ts_mon_config = server_utils.metrics_mock
    ts_mon = server_utils.metrics_mock
| 40 | |
| 41 | |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 42 | """ |
| 43 | Autotest shard client |
| 44 | |
| 45 | The shard client can be run as standalone service. It periodically polls the |
| 46 | master in a heartbeat, retrieves new jobs and hosts and inserts them into the |
| 47 | local database. |
| 48 | |
| 49 | A shard is set up (by a human) and pointed to the global AFE (cautotest). |
On the shard, this script periodically makes so-called heartbeat requests to the
| 51 | global AFE, which will then complete the following actions: |
| 52 | |
| 53 | 1. Find the previously created (with atest) record for the shard. Shards are |
| 54 | identified by their hostnames, specified in the shadow_config. |
| 55 | 2. Take the records that were sent in the heartbeat and insert them into the |
| 56 | global database. |
| 57 | - This is to set the status of jobs to completed in the master database after |
| 58 | they were run by a slave. This is necessary so one can just look at the |
| 59 | master's afe to see the statuses of all jobs. Otherwise one would have to |
| 60 | check the tko tables or the individual slave AFEs. |
| 61 | 3. Find labels that have been assigned to this shard. |
Jakob Juelich | 1b52574 | 2014-09-30 13:08:07 -0700 | [diff] [blame] | 62 | 4. Assign hosts that: |
| 63 | - have the specified label |
| 64 | - aren't leased |
| 65 | - have an id which is not in the known_host_ids which were sent in the |
| 66 | heartbeat request. |
| 67 | 5. Assign jobs that: |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 68 | - depend on the specified label |
| 69 | - haven't been assigned before |
| 70 | - aren't started yet |
| 71 | - aren't completed yet |
Jakob Juelich | 1b52574 | 2014-09-30 13:08:07 -0700 | [diff] [blame] | 72 | - have an id which is not in the jobs_known_ids which were sent in the |
| 73 | heartbeat request. |
| 74 | 6. Serialize the chosen jobs and hosts. |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 75 | - Find objects that the Host/Job objects depend on: Labels, AclGroups, Users, |
| 76 | and many more. Details about this can be found around |
| 77 | model_logic.serialize() |
| 78 | 7. Send these objects to the slave. |
| 79 | |
| 80 | |
| 81 | On the client side, this will happen: |
| 82 | 1. Deserialize the objects sent from the master and persist them to the local |
| 83 | database. |
| 84 | 2. monitor_db on the shard will pick up these jobs and schedule them on the |
| 85 | available hosts (which were retrieved from a heartbeat). |
3. Once a job is finished, its shard_id is set to NULL
| 87 | 4. The shard_client will pick up all jobs where shard_id=NULL and will |
| 88 | send them to the master in the request of the next heartbeat. |
| 89 | - The master will persist them as described earlier. |
| 90 | - the shard_id will be set back to the shard's id, so the record won't be |
| 91 | uploaded again. |
Jakob Juelich | 1b52574 | 2014-09-30 13:08:07 -0700 | [diff] [blame] | 92 | The heartbeat request will also contain the ids of incomplete jobs and the |
| 93 | ids of all hosts. This is used to not send objects repeatedly. For more |
| 94 | information on this and alternatives considered |
Allen Li | cdd00f2 | 2017-02-01 18:01:52 -0800 | [diff] [blame] | 95 | see rpc_interface.shard_heartbeat. |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 96 | """ |
| 97 | |
| 98 | |
# Name of the RPC endpoint called on the global AFE for each heartbeat.
HEARTBEAT_AFE_ENDPOINT = 'shard_heartbeat'
# Common prefix for the heartbeat-related monarch metrics below.
_METRICS_PREFIX = 'chromeos/autotest/shard_client/heartbeat/'

# Timeout and retry delay used for the RetryingAFE RPC proxy.
RPC_TIMEOUT_MIN = 5
RPC_DELAY_SEC = 5

# Global ShardClient instance, assigned in main_without_exception_handling
# so that the signal handler can request a graceful shutdown.
_heartbeat_client = None
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 106 | |
| 107 | |
| 108 | class ShardClient(object): |
| 109 | """Performs client side tasks of sharding, i.e. the heartbeat. |
| 110 | |
Jakob Juelich | 8421d59 | 2014-09-17 15:27:06 -0700 | [diff] [blame] | 111 | This class contains the logic to do periodic heartbeats to a global AFE, |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 112 | to retrieve new jobs from it and to report completed jobs back. |
| 113 | """ |
| 114 | |
| 115 | def __init__(self, global_afe_hostname, shard_hostname, tick_pause_sec): |
Jakob Juelich | 8421d59 | 2014-09-17 15:27:06 -0700 | [diff] [blame] | 116 | self.afe = frontend_wrappers.RetryingAFE(server=global_afe_hostname, |
| 117 | timeout_min=RPC_TIMEOUT_MIN, |
| 118 | delay_sec=RPC_DELAY_SEC) |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 119 | self.hostname = shard_hostname |
| 120 | self.tick_pause_sec = tick_pause_sec |
Prathmesh Prabhu | 01ef91b | 2018-01-24 17:47:05 -0800 | [diff] [blame] | 121 | self._shutdown_requested = False |
Jakob Juelich | 8421d59 | 2014-09-17 15:27:06 -0700 | [diff] [blame] | 122 | self._shard = None |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 123 | |
| 124 | |
MK Ryu | e72a90b | 2015-07-21 13:48:00 -0700 | [diff] [blame] | 125 | def _deserialize_many(self, serialized_list, djmodel, message): |
| 126 | """Deserialize data in JSON format to database. |
| 127 | |
| 128 | Deserialize a list of JSON-formatted data to database using Django. |
| 129 | |
| 130 | @param serialized_list: A list of JSON-formatted data. |
| 131 | @param djmodel: Django model type. |
| 132 | @param message: A string to be used in a logging message. |
| 133 | """ |
| 134 | for serialized in serialized_list: |
| 135 | with transaction.commit_on_success(): |
| 136 | try: |
| 137 | djmodel.deserialize(serialized) |
| 138 | except Exception as e: |
| 139 | logging.error('Deserializing a %s fails: %s, Error: %s', |
| 140 | message, serialized, e) |
Aviv Keshet | 75edb9c | 2016-10-06 22:12:36 -0700 | [diff] [blame] | 141 | metrics.Counter( |
| 142 | 'chromeos/autotest/shard_client/deserialization_failed' |
| 143 | ).increment() |
MK Ryu | e72a90b | 2015-07-21 13:48:00 -0700 | [diff] [blame] | 144 | |
| 145 | |
Prathmesh Prabhu | 6441807 | 2016-11-22 18:39:16 -0800 | [diff] [blame] | 146 | @metrics.SecondsTimerDecorator( |
| 147 | 'chromeos/autotest/shard_client/heartbeat_response_duration') |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 148 | def process_heartbeat_response(self, heartbeat_response): |
| 149 | """Save objects returned by a heartbeat to the local database. |
| 150 | |
| 151 | This deseralizes hosts and jobs including their dependencies and saves |
| 152 | them to the local database. |
| 153 | |
| 154 | @param heartbeat_response: A dictionary with keys 'hosts' and 'jobs', |
| 155 | as returned by the `shard_heartbeat` rpc |
| 156 | call. |
| 157 | """ |
| 158 | hosts_serialized = heartbeat_response['hosts'] |
| 159 | jobs_serialized = heartbeat_response['jobs'] |
Fang Deng | f370599 | 2014-12-16 17:32:18 -0800 | [diff] [blame] | 160 | suite_keyvals_serialized = heartbeat_response['suite_keyvals'] |
Aviv Keshet | b9077b9 | 2017-03-23 00:40:32 -0700 | [diff] [blame] | 161 | incorrect_host_ids = heartbeat_response.get('incorrect_host_ids', []) |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 162 | |
Aviv Keshet | 75edb9c | 2016-10-06 22:12:36 -0700 | [diff] [blame] | 163 | metrics.Gauge('chromeos/autotest/shard_client/hosts_received' |
| 164 | ).set(len(hosts_serialized)) |
| 165 | metrics.Gauge('chromeos/autotest/shard_client/jobs_received' |
| 166 | ).set(len(jobs_serialized)) |
| 167 | metrics.Gauge('chromeos/autotest/shard_client/suite_keyvals_received' |
| 168 | ).set(len(suite_keyvals_serialized)) |
| 169 | |
MK Ryu | e72a90b | 2015-07-21 13:48:00 -0700 | [diff] [blame] | 170 | self._deserialize_many(hosts_serialized, models.Host, 'host') |
| 171 | self._deserialize_many(jobs_serialized, models.Job, 'job') |
| 172 | self._deserialize_many(suite_keyvals_serialized, models.JobKeyval, |
| 173 | 'jobkeyval') |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 174 | |
MK Ryu | 5cfd96a | 2015-01-30 15:31:23 -0800 | [diff] [blame] | 175 | host_ids = [h['id'] for h in hosts_serialized] |
| 176 | logging.info('Heartbeat response contains hosts %s', host_ids) |
Prashanth Balasubramanian | 22dd226 | 2014-11-28 18:19:18 -0800 | [diff] [blame] | 177 | job_ids = [j['id'] for j in jobs_serialized] |
| 178 | logging.info('Heartbeat response contains jobs %s', job_ids) |
Fang Deng | f370599 | 2014-12-16 17:32:18 -0800 | [diff] [blame] | 179 | parent_jobs_with_keyval = set([kv['job_id'] |
| 180 | for kv in suite_keyvals_serialized]) |
| 181 | logging.info('Heartbeat response contains suite_keyvals_for jobs %s', |
MK Ryu | e72a90b | 2015-07-21 13:48:00 -0700 | [diff] [blame] | 182 | list(parent_jobs_with_keyval)) |
Aviv Keshet | b9077b9 | 2017-03-23 00:40:32 -0700 | [diff] [blame] | 183 | if incorrect_host_ids: |
| 184 | logging.info('Heartbeat response contains incorrect_host_ids %s ' |
| 185 | 'which will be deleted.', incorrect_host_ids) |
| 186 | self._remove_incorrect_hosts(incorrect_host_ids) |
Prashanth Balasubramanian | 22dd226 | 2014-11-28 18:19:18 -0800 | [diff] [blame] | 187 | |
| 188 | # If the master has just sent any jobs that we think have completed, |
| 189 | # re-sync them with the master. This is especially useful when a |
| 190 | # heartbeat or job is silently dropped, as the next heartbeat will |
| 191 | # have a disagreement. Updating the shard_id to NULL will mark these |
| 192 | # jobs for upload on the next heartbeat. |
MK Ryu | e72a90b | 2015-07-21 13:48:00 -0700 | [diff] [blame] | 193 | job_models = models.Job.objects.filter( |
| 194 | id__in=job_ids, hostqueueentry__complete=True) |
| 195 | if job_models: |
| 196 | job_models.update(shard=None) |
| 197 | job_ids_repr = ', '.join([str(job.id) for job in job_models]) |
| 198 | logging.warn('Following completed jobs are reset shard_id to NULL ' |
| 199 | 'to be uploaded to master again: %s', job_ids_repr) |
Prashanth Balasubramanian | 22dd226 | 2014-11-28 18:19:18 -0800 | [diff] [blame] | 200 | |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 201 | |
Aviv Keshet | b9077b9 | 2017-03-23 00:40:32 -0700 | [diff] [blame] | 202 | def _remove_incorrect_hosts(self, incorrect_host_ids=None): |
| 203 | """Remove from local database any hosts that should not exist. |
| 204 | |
| 205 | Entries of |incorrect_host_ids| that are absent from database will be |
| 206 | silently ignored. |
| 207 | |
| 208 | @param incorrect_host_ids: a list of ids (as integers) to remove. |
| 209 | """ |
| 210 | if not incorrect_host_ids: |
| 211 | return |
| 212 | |
Ningning Xia | cdc2310 | 2018-03-29 14:18:05 -0700 | [diff] [blame] | 213 | try: |
| 214 | models.Host.objects.filter(id__in=incorrect_host_ids).delete() |
| 215 | except MultipleObjectsReturned as e: |
| 216 | logging.exception('Failed to remove incorrect hosts %s', |
| 217 | incorrect_host_ids) |
Aviv Keshet | b9077b9 | 2017-03-23 00:40:32 -0700 | [diff] [blame] | 218 | |
| 219 | |
Jakob Juelich | 8421d59 | 2014-09-17 15:27:06 -0700 | [diff] [blame] | 220 | @property |
| 221 | def shard(self): |
| 222 | """Return this shard's own shard object, fetched from the database. |
| 223 | |
| 224 | A shard's object is fetched from the master with the first jobs. It will |
| 225 | not exist before that time. |
| 226 | |
| 227 | @returns: The shard object if it already exists, otherwise None |
| 228 | """ |
| 229 | if self._shard is None: |
| 230 | try: |
| 231 | self._shard = models.Shard.smart_get(self.hostname) |
| 232 | except models.Shard.DoesNotExist: |
| 233 | # This might happen before any jobs are assigned to this shard. |
| 234 | # This is okay because then there is nothing to offload anyway. |
| 235 | pass |
| 236 | return self._shard |
| 237 | |
| 238 | |
| 239 | def _get_jobs_to_upload(self): |
| 240 | jobs = [] |
| 241 | # The scheduler sets shard to None upon completion of the job. |
| 242 | # For more information on the shard field's semantic see |
Prashanth Balasubramanian | 22dd226 | 2014-11-28 18:19:18 -0800 | [diff] [blame] | 243 | # models.Job.shard. We need to be careful to wait for both the |
| 244 | # shard_id and the complete bit here, or we will end up syncing |
| 245 | # the job without ever setting the complete bit. |
Jakob Juelich | 8421d59 | 2014-09-17 15:27:06 -0700 | [diff] [blame] | 246 | job_ids = list(models.Job.objects.filter( |
Prashanth Balasubramanian | 22dd226 | 2014-11-28 18:19:18 -0800 | [diff] [blame] | 247 | shard=None, |
| 248 | hostqueueentry__complete=True).values_list('pk', flat=True)) |
Jakob Juelich | 8421d59 | 2014-09-17 15:27:06 -0700 | [diff] [blame] | 249 | |
| 250 | for job_to_upload in models.Job.objects.filter(pk__in=job_ids).all(): |
| 251 | jobs.append(job_to_upload) |
| 252 | return jobs |
| 253 | |
| 254 | |
Prashanth Balasubramanian | 22dd226 | 2014-11-28 18:19:18 -0800 | [diff] [blame] | 255 | def _mark_jobs_as_uploaded(self, job_ids): |
Jakob Juelich | 8421d59 | 2014-09-17 15:27:06 -0700 | [diff] [blame] | 256 | # self.shard might be None if no jobs were downloaded yet. |
| 257 | # But then job_ids is empty, so this is harmless. |
| 258 | # Even if there were jobs we'd in the worst case upload them twice. |
| 259 | models.Job.objects.filter(pk__in=job_ids).update(shard=self.shard) |
| 260 | |
| 261 | |
| 262 | def _get_hqes_for_jobs(self, jobs): |
| 263 | hqes = [] |
| 264 | for job in jobs: |
| 265 | hqes.extend(job.hostqueueentry_set.all()) |
| 266 | return hqes |
| 267 | |
| 268 | |
MK Ryu | 07a109f | 2015-07-21 17:44:32 -0700 | [diff] [blame] | 269 | def _get_known_jobs_and_hosts(self): |
| 270 | """Returns lists of host and job info to send in a heartbeat. |
Jakob Juelich | 1b52574 | 2014-09-30 13:08:07 -0700 | [diff] [blame] | 271 | |
| 272 | The host and job ids are ids of objects that are already present on the |
| 273 | shard and therefore don't need to be sent again. |
| 274 | |
MK Ryu | 07a109f | 2015-07-21 17:44:32 -0700 | [diff] [blame] | 275 | For jobs, only incomplete jobs are sent, as the master won't send |
Jakob Juelich | 1b52574 | 2014-09-30 13:08:07 -0700 | [diff] [blame] | 276 | already completed jobs anyway. This helps keeping the list of id's |
| 277 | considerably small. |
| 278 | |
MK Ryu | 07a109f | 2015-07-21 17:44:32 -0700 | [diff] [blame] | 279 | For hosts, host status in addition to host id are sent to master |
| 280 | to sync the host status. |
| 281 | |
| 282 | @returns: Tuple of three lists. The first one contains job ids, the |
| 283 | second one host ids, and the third one host statuses. |
Jakob Juelich | 1b52574 | 2014-09-30 13:08:07 -0700 | [diff] [blame] | 284 | """ |
Paul Hobbs | f536138 | 2018-03-05 11:56:56 -0800 | [diff] [blame] | 285 | jobs = models.Job.objects.filter(hostqueueentry__complete=False) |
| 286 | job_ids = list(jobs.values_list('id', flat=True)) |
| 287 | self._report_job_time_distribution(jobs) |
| 288 | |
MK Ryu | 07a109f | 2015-07-21 17:44:32 -0700 | [diff] [blame] | 289 | host_models = models.Host.objects.filter(invalid=0) |
| 290 | host_ids = [] |
| 291 | host_statuses = [] |
| 292 | for h in host_models: |
| 293 | host_ids.append(h.id) |
| 294 | host_statuses.append(h.status) |
| 295 | return job_ids, host_ids, host_statuses |
Jakob Juelich | 1b52574 | 2014-09-30 13:08:07 -0700 | [diff] [blame] | 296 | |
| 297 | |
Prashanth Balasubramanian | 22dd226 | 2014-11-28 18:19:18 -0800 | [diff] [blame] | 298 | def _heartbeat_packet(self): |
| 299 | """Construct the heartbeat packet. |
| 300 | |
Allen Li | cdd00f2 | 2017-02-01 18:01:52 -0800 | [diff] [blame] | 301 | See rpc_interface for a more detailed description of the heartbeat. |
Prashanth Balasubramanian | 22dd226 | 2014-11-28 18:19:18 -0800 | [diff] [blame] | 302 | |
| 303 | @return: A heartbeat packet. |
| 304 | """ |
MK Ryu | 07a109f | 2015-07-21 17:44:32 -0700 | [diff] [blame] | 305 | known_job_ids, known_host_ids, known_host_statuses = ( |
| 306 | self._get_known_jobs_and_hosts()) |
Prashanth Balasubramanian | 22dd226 | 2014-11-28 18:19:18 -0800 | [diff] [blame] | 307 | logging.info('Known jobs: %s', known_job_ids) |
| 308 | |
| 309 | job_objs = self._get_jobs_to_upload() |
| 310 | hqes = [hqe.serialize(include_dependencies=False) |
| 311 | for hqe in self._get_hqes_for_jobs(job_objs)] |
| 312 | jobs = [job.serialize(include_dependencies=False) for job in job_objs] |
| 313 | logging.info('Uploading jobs %s', [j['id'] for j in jobs]) |
| 314 | |
| 315 | return {'shard_hostname': self.hostname, |
| 316 | 'known_job_ids': known_job_ids, |
| 317 | 'known_host_ids': known_host_ids, |
MK Ryu | 07a109f | 2015-07-21 17:44:32 -0700 | [diff] [blame] | 318 | 'known_host_statuses': known_host_statuses, |
Paul Hobbs | f536138 | 2018-03-05 11:56:56 -0800 | [diff] [blame] | 319 | 'jobs': jobs, |
| 320 | 'hqes': hqes} |
Prashanth Balasubramanian | 22dd226 | 2014-11-28 18:19:18 -0800 | [diff] [blame] | 321 | |
| 322 | |
Paul Hobbs | f536138 | 2018-03-05 11:56:56 -0800 | [diff] [blame] | 323 | def _report_job_time_distribution(self, jobs): |
| 324 | """Report distribution of job durations to monarch.""" |
| 325 | jobs_time_distribution = metrics.Distribution( |
Paul Hobbs | 2a18362 | 2018-03-29 16:50:42 -0700 | [diff] [blame] | 326 | _METRICS_PREFIX + 'known_jobs_durations') |
Paul Hobbs | f536138 | 2018-03-05 11:56:56 -0800 | [diff] [blame] | 327 | now = datetime.datetime.now() |
| 328 | |
| 329 | # The type expected by the .set(...) of a distribution is a |
| 330 | # distribution. |
| 331 | dist = ts_mon.Distribution(ts_mon.GeometricBucketer()) |
| 332 | for job in jobs: |
| 333 | duration = int( |
| 334 | max(0, (now - job.created_on).total_seconds())) |
| 335 | dist.add(duration) |
| 336 | jobs_time_distribution.set(dist) |
| 337 | |
Aviv Keshet | 43dc40b | 2018-02-13 16:13:46 -0800 | [diff] [blame] | 338 | def _report_packet_metrics(self, packet): |
| 339 | """Report stats about outgoing packet to monarch.""" |
| 340 | metrics.Gauge(_METRICS_PREFIX + 'known_job_ids_count').set( |
Paul Hobbs | f536138 | 2018-03-05 11:56:56 -0800 | [diff] [blame] | 341 | len(packet['known_job_ids'])) |
Aviv Keshet | 43dc40b | 2018-02-13 16:13:46 -0800 | [diff] [blame] | 342 | metrics.Gauge(_METRICS_PREFIX + 'jobs_upload_count').set( |
Paul Hobbs | f536138 | 2018-03-05 11:56:56 -0800 | [diff] [blame] | 343 | len(packet['jobs'])) |
Aviv Keshet | 43dc40b | 2018-02-13 16:13:46 -0800 | [diff] [blame] | 344 | metrics.Gauge(_METRICS_PREFIX + 'known_host_ids_count').set( |
Paul Hobbs | f536138 | 2018-03-05 11:56:56 -0800 | [diff] [blame] | 345 | len(packet['known_host_ids'])) |
Aviv Keshet | 43dc40b | 2018-02-13 16:13:46 -0800 | [diff] [blame] | 346 | |
| 347 | |
Aviv Keshet | 8a93beb | 2017-04-21 12:55:28 -0700 | [diff] [blame] | 348 | def _heartbeat_failure(self, log_message, failure_type_str=''): |
MK Ryu | 8a43715 | 2015-07-20 14:25:39 -0700 | [diff] [blame] | 349 | logging.error("Heartbeat failed. %s", log_message) |
Aviv Keshet | 75edb9c | 2016-10-06 22:12:36 -0700 | [diff] [blame] | 350 | metrics.Counter('chromeos/autotest/shard_client/heartbeat_failure' |
Aviv Keshet | 8a93beb | 2017-04-21 12:55:28 -0700 | [diff] [blame] | 351 | ).increment(fields={'failure_type': failure_type_str}) |
MK Ryu | 8a43715 | 2015-07-20 14:25:39 -0700 | [diff] [blame] | 352 | |
| 353 | |
Prathmesh Prabhu | 6441807 | 2016-11-22 18:39:16 -0800 | [diff] [blame] | 354 | @metrics.SecondsTimerDecorator( |
| 355 | 'chromeos/autotest/shard_client/do_heatbeat_duration') |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 356 | def do_heartbeat(self): |
| 357 | """Perform a heartbeat: Retreive new jobs. |
| 358 | |
| 359 | This function executes a `shard_heartbeat` RPC. It retrieves the |
| 360 | response of this call and processes the response by storing the returned |
| 361 | objects in the local database. |
Aviv Keshet | 04d40db | 2017-12-08 18:08:44 -0800 | [diff] [blame] | 362 | |
| 363 | Returns: True if the heartbeat ran successfully, False otherwise. |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 364 | """ |
Prathmesh Prabhu | 6441807 | 2016-11-22 18:39:16 -0800 | [diff] [blame] | 365 | |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 366 | logging.info("Performing heartbeat.") |
Prashanth Balasubramanian | 22dd226 | 2014-11-28 18:19:18 -0800 | [diff] [blame] | 367 | packet = self._heartbeat_packet() |
Aviv Keshet | 43dc40b | 2018-02-13 16:13:46 -0800 | [diff] [blame] | 368 | self._report_packet_metrics(packet) |
| 369 | metrics.Gauge(_METRICS_PREFIX + 'request_size').set( |
Prathmesh Prabhu | 6441807 | 2016-11-22 18:39:16 -0800 | [diff] [blame] | 370 | len(str(packet))) |
MK Ryu | 8a43715 | 2015-07-20 14:25:39 -0700 | [diff] [blame] | 371 | |
| 372 | try: |
| 373 | response = self.afe.run(HEARTBEAT_AFE_ENDPOINT, **packet) |
MK Ryu | 89cca5d | 2015-09-18 13:07:22 -0700 | [diff] [blame] | 374 | except urllib2.HTTPError as e: |
Aviv Keshet | 8a93beb | 2017-04-21 12:55:28 -0700 | [diff] [blame] | 375 | self._heartbeat_failure('HTTPError %d: %s' % (e.code, e.reason), |
| 376 | 'HTTPError') |
Aviv Keshet | 04d40db | 2017-12-08 18:08:44 -0800 | [diff] [blame] | 377 | return False |
MK Ryu | 89cca5d | 2015-09-18 13:07:22 -0700 | [diff] [blame] | 378 | except urllib2.URLError as e: |
Aviv Keshet | 8a93beb | 2017-04-21 12:55:28 -0700 | [diff] [blame] | 379 | self._heartbeat_failure('URLError: %s' % e.reason, |
| 380 | 'URLError') |
Aviv Keshet | 04d40db | 2017-12-08 18:08:44 -0800 | [diff] [blame] | 381 | return False |
MK Ryu | 89cca5d | 2015-09-18 13:07:22 -0700 | [diff] [blame] | 382 | except httplib.HTTPException as e: |
Aviv Keshet | 8a93beb | 2017-04-21 12:55:28 -0700 | [diff] [blame] | 383 | self._heartbeat_failure('HTTPException: %s' % e, |
| 384 | 'HTTPException') |
Aviv Keshet | 04d40db | 2017-12-08 18:08:44 -0800 | [diff] [blame] | 385 | return False |
MK Ryu | 89cca5d | 2015-09-18 13:07:22 -0700 | [diff] [blame] | 386 | except timeout_util.TimeoutError as e: |
Aviv Keshet | 8a93beb | 2017-04-21 12:55:28 -0700 | [diff] [blame] | 387 | self._heartbeat_failure('TimeoutError: %s' % e, |
| 388 | 'TimeoutError') |
Aviv Keshet | 04d40db | 2017-12-08 18:08:44 -0800 | [diff] [blame] | 389 | return False |
Aviv Keshet | 8a93beb | 2017-04-21 12:55:28 -0700 | [diff] [blame] | 390 | except proxy.JSONRPCException as e: |
| 391 | self._heartbeat_failure('JSONRPCException: %s' % e, |
| 392 | 'JSONRPCException') |
Aviv Keshet | 04d40db | 2017-12-08 18:08:44 -0800 | [diff] [blame] | 393 | return False |
MK Ryu | 8a43715 | 2015-07-20 14:25:39 -0700 | [diff] [blame] | 394 | |
Aviv Keshet | 43dc40b | 2018-02-13 16:13:46 -0800 | [diff] [blame] | 395 | metrics.Gauge(_METRICS_PREFIX + 'response_size').set( |
Prathmesh Prabhu | 6441807 | 2016-11-22 18:39:16 -0800 | [diff] [blame] | 396 | len(str(response))) |
Prashanth Balasubramanian | 22dd226 | 2014-11-28 18:19:18 -0800 | [diff] [blame] | 397 | self._mark_jobs_as_uploaded([job['id'] for job in packet['jobs']]) |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 398 | self.process_heartbeat_response(response) |
| 399 | logging.info("Heartbeat completed.") |
Aviv Keshet | 04d40db | 2017-12-08 18:08:44 -0800 | [diff] [blame] | 400 | return True |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 401 | |
| 402 | |
| 403 | def tick(self): |
| 404 | """Performs all tasks the shard clients needs to do periodically.""" |
Aviv Keshet | 04d40db | 2017-12-08 18:08:44 -0800 | [diff] [blame] | 405 | success = self.do_heartbeat() |
| 406 | if success: |
| 407 | metrics.Counter('chromeos/autotest/shard_client/tick').increment() |
| 408 | |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 409 | |
Prathmesh Prabhu | 01ef91b | 2018-01-24 17:47:05 -0800 | [diff] [blame] | 410 | def loop(self, lifetime_hours): |
| 411 | """Calls tick() until shutdown() is called or lifetime expires. |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 412 | |
Prathmesh Prabhu | 01ef91b | 2018-01-24 17:47:05 -0800 | [diff] [blame] | 413 | @param lifetime_hours: (int) hours to loop for. |
| 414 | """ |
| 415 | loop_start_time = time.time() |
| 416 | while self._continue_looping(lifetime_hours, loop_start_time): |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 417 | self.tick() |
Aviv Keshet | dfd1f52 | 2017-03-22 20:14:09 -0700 | [diff] [blame] | 418 | # Sleep with +/- 10% fuzzing to avoid phaselock of shards. |
| 419 | tick_fuzz = self.tick_pause_sec * 0.2 * (random.random() - 0.5) |
| 420 | time.sleep(self.tick_pause_sec + tick_fuzz) |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 421 | |
| 422 | |
| 423 | def shutdown(self): |
| 424 | """Stops the shard client after the current tick.""" |
| 425 | logging.info("Shutdown request received.") |
Prathmesh Prabhu | 01ef91b | 2018-01-24 17:47:05 -0800 | [diff] [blame] | 426 | self._shutdown_requested = True |
| 427 | |
| 428 | |
| 429 | def _continue_looping(self, lifetime_hours, loop_start_time): |
| 430 | """Determines if we should continue with the next mainloop iteration. |
| 431 | |
| 432 | @param lifetime_hours: (float) number of hours to loop for. None |
| 433 | implies no deadline. |
| 434 | @param process_start_time: Time when we started looping. |
| 435 | @returns True if we should continue looping, False otherwise. |
| 436 | """ |
| 437 | if self._shutdown_requested: |
| 438 | return False |
| 439 | |
| 440 | if (lifetime_hours is None |
| 441 | or time.time() - loop_start_time < lifetime_hours * 3600): |
| 442 | return True |
| 443 | logging.info('Process lifetime %0.3f hours exceeded. Shutting down.', |
| 444 | lifetime_hours) |
| 445 | return False |
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 446 | |
| 447 | |
def handle_signal(signum, frame):
    """Sigint handler so we don't crash mid-tick.

    Requests a graceful shutdown of the module-global shard client; the
    loop exits after the current tick finishes.

    @param signum: Signal number (unused; required by the signal API).
    @param frame: Current stack frame (unused; required by the signal API).
    """
    _heartbeat_client.shutdown()
| 451 | |
| 452 | |
def _get_shard_hostname_and_ensure_running_on_shard():
    """Read the hostname of the local shard from the global configuration.

    Raise an exception if run from elsewhere than a shard.

    @returns The configured shard hostname.
    @raises error.HeartbeatOnlyAllowedInShardModeException if run from
            elsewhere than from a shard.
    """
    shard_hostname = global_config.global_config.get_config_value(
            'SHARD', 'shard_hostname', default=None)
    if shard_hostname:
        return shard_hostname
    raise error.HeartbeatOnlyAllowedInShardModeException(
        'To run the shard client, shard_hostname must neither be None nor '
        'empty.')
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 468 | |
| 469 | |
def _get_tick_pause_sec():
    """Return the configured pause (seconds) between two heartbeat ticks."""
    config = global_config.global_config
    return config.get_config_value('SHARD', 'heartbeat_pause_sec', type=float)
| 474 | |
| 475 | |
def get_shard_client():
    """Instantiate a shard client instance.

    Configuration values will be read from the global configuration.

    @returns A shard client instance.
    """
    return ShardClient(
            server_utils.get_global_afe_hostname(),
            _get_shard_hostname_and_ensure_running_on_shard(),
            _get_tick_pause_sec())
| 487 | |
| 488 | |
def main():
    """Entry point: parse command line flags and run the shard client.

    Sets up ts_mon metrics reporting, then delegates to
    main_without_exception_handling; uncaught exceptions are counted,
    emailed and re-raised.
    """
    parser = argparse.ArgumentParser(description='Shard client.')
    parser.add_argument(
        '--lifetime-hours',
        type=float,
        default=None,
        help='If provided, number of hours we should run for. '
             'At the expiry of this time, the process will exit '
             'gracefully.',
    )
    parser.add_argument(
        '--metrics-file',
        help='If provided, drop metrics to this local file instead of '
             'reporting to ts_mon',
        type=str,
        default=None,
    )
    options = parser.parse_args()

    with ts_mon_config.SetupTsMonGlobalState(
            'shard_client',
            indirect=True,
            debug_file=options.metrics_file,
    ):
        try:
            metrics.Counter('chromeos/autotest/shard_client/start').increment()
            main_without_exception_handling(options)
        # The caught exception was bound to an unused name before; drop the
        # binding — logging.exception already records the active exception.
        except Exception:
            metrics.Counter('chromeos/autotest/shard_client/uncaught_exception'
                            ).increment()
            message = 'Uncaught exception. Terminating shard_client.'
            email_manager.manager.log_stacktrace(message)
            logging.exception(message)
            raise
        finally:
            email_manager.manager.send_queued_emails()
| 525 | |
| 526 | |
def main_without_exception_handling(options):
    """Configure logging and signals, then run the shard client loop.

    @param options: Parsed command line arguments (see main()).
    """
    scheduler_lib.setup_logging(
        os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
        None, timestamped_logfile_prefix='shard_client')

    logging.info("Setting signal handler.")
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, handle_signal)

    logging.info("Starting shard client.")
    global _heartbeat_client
    _heartbeat_client = get_shard_client()
    _heartbeat_client.loop(options.lifetime_hours)
Jakob Juelich | 3b27dbc | 2014-09-03 18:05:37 -0700 | [diff] [blame] | 540 | |
| 541 | |
# Allow running the shard client directly as a script.
if __name__ == '__main__':
    main()