blob: 46fd32878ee37c5aae5fa421b95b43f1905a74d1 [file] [log] [blame]
Jakob Juelich3b27dbc2014-09-03 18:05:37 -07001#!/usr/bin/python
2#pylint: disable-msg=C0111
3
4# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7
8import argparse
Paul Hobbsf5361382018-03-05 11:56:56 -08009import datetime
MK Ryu8a437152015-07-20 14:25:39 -070010import httplib
Jakob Juelich3b27dbc2014-09-03 18:05:37 -070011import logging
12import os
Aviv Keshetdfd1f522017-03-22 20:14:09 -070013import random
Jakob Juelich3b27dbc2014-09-03 18:05:37 -070014import signal
15import time
MK Ryu8a437152015-07-20 14:25:39 -070016import urllib2
Jakob Juelich3b27dbc2014-09-03 18:05:37 -070017
18import common
Paul Hobbseedcb8b2016-10-05 16:44:27 -070019
Jakob Juelich3b27dbc2014-09-03 18:05:37 -070020from autotest_lib.frontend import setup_django_environment
Aviv Keshet8a93beb2017-04-21 12:55:28 -070021from autotest_lib.frontend.afe.json_rpc import proxy
Jakob Juelich3b27dbc2014-09-03 18:05:37 -070022from autotest_lib.client.common_lib import error
23from autotest_lib.client.common_lib import global_config
Prashanth Balasubramanian22dd2262014-11-28 18:19:18 -080024from autotest_lib.frontend.afe import models
Jakob Juelich3b27dbc2014-09-03 18:05:37 -070025from autotest_lib.scheduler import email_manager
Prashanth Balasubramanian5949b4a2014-11-23 12:58:30 -080026from autotest_lib.scheduler import scheduler_lib
Jakob Juelich8421d592014-09-17 15:27:06 -070027from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
Fang Deng0cb2a3b2015-12-10 17:59:00 -080028from autotest_lib.server import utils as server_utils
MK Ryu89cca5d2015-09-18 13:07:22 -070029from chromite.lib import timeout_util
Ningning Xiacdc23102018-03-29 14:18:05 -070030from django.core.exceptions import MultipleObjectsReturned
Prashanth Balasubramanian75be1d32014-11-25 18:03:09 -080031from django.db import transaction
Jakob Juelich3b27dbc2014-09-03 18:05:37 -070032
# The monitoring libraries are optional. When any of them is missing, every
# name this module takes from them is bound to the project's metrics mock so
# that the rest of the file can use them unconditionally.
try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
    from infra_libs import ts_mon
except ImportError:
    metrics = server_utils.metrics_mock
    ts_mon_config = server_utils.metrics_mock
    # ts_mon is used by ShardClient._report_job_time_distribution(); without
    # a fallback binding that method raises NameError on every heartbeat
    # whenever the imports above fail.
    # NOTE(review): assumes metrics_mock accepts arbitrary attribute access
    # and calls, as its use for both `metrics` and `ts_mon_config` suggests.
    ts_mon = server_utils.metrics_mock
40
41
Jakob Juelich3b27dbc2014-09-03 18:05:37 -070042"""
43Autotest shard client
44
45The shard client can be run as standalone service. It periodically polls the
46master in a heartbeat, retrieves new jobs and hosts and inserts them into the
47local database.
48
49A shard is set up (by a human) and pointed to the global AFE (cautotest).
On the shard, this script periodically makes so-called heartbeat requests to the
51global AFE, which will then complete the following actions:
52
531. Find the previously created (with atest) record for the shard. Shards are
54 identified by their hostnames, specified in the shadow_config.
552. Take the records that were sent in the heartbeat and insert them into the
56 global database.
57 - This is to set the status of jobs to completed in the master database after
58 they were run by a slave. This is necessary so one can just look at the
59 master's afe to see the statuses of all jobs. Otherwise one would have to
60 check the tko tables or the individual slave AFEs.
613. Find labels that have been assigned to this shard.
Jakob Juelich1b525742014-09-30 13:08:07 -0700624. Assign hosts that:
63 - have the specified label
64 - aren't leased
65 - have an id which is not in the known_host_ids which were sent in the
66 heartbeat request.
675. Assign jobs that:
Jakob Juelich3b27dbc2014-09-03 18:05:37 -070068 - depend on the specified label
69 - haven't been assigned before
70 - aren't started yet
71 - aren't completed yet
Jakob Juelich1b525742014-09-30 13:08:07 -070072 - have an id which is not in the jobs_known_ids which were sent in the
73 heartbeat request.
746. Serialize the chosen jobs and hosts.
Jakob Juelich3b27dbc2014-09-03 18:05:37 -070075 - Find objects that the Host/Job objects depend on: Labels, AclGroups, Users,
76 and many more. Details about this can be found around
77 model_logic.serialize()
787. Send these objects to the slave.
79
80
81On the client side, this will happen:
821. Deserialize the objects sent from the master and persist them to the local
83 database.
842. monitor_db on the shard will pick up these jobs and schedule them on the
85 available hosts (which were retrieved from a heartbeat).
3. Once a job is finished, its shard_id is set to NULL
874. The shard_client will pick up all jobs where shard_id=NULL and will
88 send them to the master in the request of the next heartbeat.
89 - The master will persist them as described earlier.
90 - the shard_id will be set back to the shard's id, so the record won't be
91 uploaded again.
Jakob Juelich1b525742014-09-30 13:08:07 -070092 The heartbeat request will also contain the ids of incomplete jobs and the
93 ids of all hosts. This is used to not send objects repeatedly. For more
94 information on this and alternatives considered
Allen Licdd00f22017-02-01 18:01:52 -080095 see rpc_interface.shard_heartbeat.
Jakob Juelich3b27dbc2014-09-03 18:05:37 -070096"""
97
98
# Name of the AFE RPC that is invoked for every heartbeat.
HEARTBEAT_AFE_ENDPOINT = 'shard_heartbeat'
# Prefix shared by the heartbeat metrics that are built with it below.
_METRICS_PREFIX = 'chromeos/autotest/shard_client/heartbeat/'

# Passed as timeout_min / delay_sec to the retrying AFE RPC client.
RPC_TIMEOUT_MIN = 5
RPC_DELAY_SEC = 5

# The process-wide shard client. It stays None until
# main_without_exception_handling() creates it; handle_signal() reads it.
_heartbeat_client = None
Jakob Juelich3b27dbc2014-09-03 18:05:37 -0700106
107
class ShardClient(object):
    """Performs client side tasks of sharding, i.e. the heartbeat.

    This class contains the logic to do periodic heartbeats to a global AFE,
    to retrieve new jobs from it and to report completed jobs back.
    """

    def __init__(self, global_afe_hostname, shard_hostname, tick_pause_sec):
        """Create the RPC client and initialize the loop state.

        @param global_afe_hostname: Hostname of the global AFE that the
                                    heartbeats are sent to.
        @param shard_hostname: Hostname of this shard. It is sent in every
                               heartbeat and used to look up the local
                               Shard record.
        @param tick_pause_sec: Nominal pause in seconds between two ticks.
        """
        self.afe = frontend_wrappers.RetryingAFE(server=global_afe_hostname,
                                                 timeout_min=RPC_TIMEOUT_MIN,
                                                 delay_sec=RPC_DELAY_SEC)
        self.hostname = shard_hostname
        self.tick_pause_sec = tick_pause_sec
        self._shutdown_requested = False
        # Lazily filled cache for the `shard` property.
        self._shard = None


    def _deserialize_many(self, serialized_list, djmodel, message):
        """Deserialize data in JSON format to database.

        Deserialize a list of JSON-formatted data to database using Django.
        Each record is handled in its own transaction block. A record that
        fails to deserialize is logged and counted, and processing continues
        with the next record.

        @param serialized_list: A list of JSON-formatted data.
        @param djmodel: Django model type.
        @param message: A string to be used in a logging message.
        """
        for serialized in serialized_list:
            with transaction.commit_on_success():
                # NOTE(review): the exception is handled inside the `with`
                # block, so the transaction manager never sees it. Confirm
                # that whatever a failed deserialize() already wrote may be
                # kept.
                try:
                    djmodel.deserialize(serialized)
                except Exception as e:
                    logging.error('Deserializing a %s fails: %s, Error: %s',
                                  message, serialized, e)
                    metrics.Counter(
                        'chromeos/autotest/shard_client/deserialization_failed'
                    ).increment()


    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/shard_client/heartbeat_response_duration')
    def process_heartbeat_response(self, heartbeat_response):
        """Save objects returned by a heartbeat to the local database.

        This deserializes hosts and jobs including their dependencies and
        saves them to the local database.

        @param heartbeat_response: A dictionary as returned by the
                                   `shard_heartbeat` rpc call. The keys
                                   'hosts', 'jobs' and 'suite_keyvals' are
                                   required; 'incorrect_host_ids' is
                                   optional.
        """
        hosts_serialized = heartbeat_response['hosts']
        jobs_serialized = heartbeat_response['jobs']
        suite_keyvals_serialized = heartbeat_response['suite_keyvals']
        incorrect_host_ids = heartbeat_response.get('incorrect_host_ids', [])

        metrics.Gauge('chromeos/autotest/shard_client/hosts_received'
                      ).set(len(hosts_serialized))
        metrics.Gauge('chromeos/autotest/shard_client/jobs_received'
                      ).set(len(jobs_serialized))
        metrics.Gauge('chromeos/autotest/shard_client/suite_keyvals_received'
                      ).set(len(suite_keyvals_serialized))

        self._deserialize_many(hosts_serialized, models.Host, 'host')
        self._deserialize_many(jobs_serialized, models.Job, 'job')
        self._deserialize_many(suite_keyvals_serialized, models.JobKeyval,
                               'jobkeyval')

        host_ids = [h['id'] for h in hosts_serialized]
        logging.info('Heartbeat response contains hosts %s', host_ids)
        job_ids = [j['id'] for j in jobs_serialized]
        logging.info('Heartbeat response contains jobs %s', job_ids)
        parent_jobs_with_keyval = set(kv['job_id']
                                      for kv in suite_keyvals_serialized)
        logging.info('Heartbeat response contains suite_keyvals_for jobs %s',
                     list(parent_jobs_with_keyval))
        if incorrect_host_ids:
            logging.info('Heartbeat response contains incorrect_host_ids %s '
                         'which will be deleted.', incorrect_host_ids)
            self._remove_incorrect_hosts(incorrect_host_ids)

        # If the master has just sent any jobs that we think have completed,
        # re-sync them with the master. This is especially useful when a
        # heartbeat or job is silently dropped, as the next heartbeat will
        # have a disagreement. Updating the shard_id to NULL will mark these
        # jobs for upload on the next heartbeat.
        job_models = models.Job.objects.filter(
                id__in=job_ids, hostqueueentry__complete=True)
        if job_models:
            job_models.update(shard=None)
            job_ids_repr = ', '.join([str(job.id) for job in job_models])
            logging.warning('Following completed jobs are reset shard_id to '
                            'NULL to be uploaded to master again: %s',
                            job_ids_repr)


    def _remove_incorrect_hosts(self, incorrect_host_ids=None):
        """Remove from local database any hosts that should not exist.

        Entries of |incorrect_host_ids| that are absent from database will be
        silently ignored.

        @param incorrect_host_ids: a list of ids (as integers) to remove.
        """
        if not incorrect_host_ids:
            return

        try:
            models.Host.objects.filter(id__in=incorrect_host_ids).delete()
        except MultipleObjectsReturned:
            # Best effort: log with the traceback and carry on.
            logging.exception('Failed to remove incorrect hosts %s',
                              incorrect_host_ids)


    @property
    def shard(self):
        """Return this shard's own shard object, fetched from the database.

        A shard's object is fetched from the master with the first jobs. It
        will not exist before that time. Once found it is cached on the
        instance.

        @returns: The shard object if it already exists, otherwise None
        """
        if self._shard is None:
            try:
                self._shard = models.Shard.smart_get(self.hostname)
            except models.Shard.DoesNotExist:
                # This might happen before any jobs are assigned to this
                # shard. This is okay because then there is nothing to
                # offload anyway.
                pass
        return self._shard


    def _get_jobs_to_upload(self):
        """Return the finished jobs that still have to be sent to the master.

        @returns: A list of Job objects whose shard is None and that have a
                  completed host queue entry.
        """
        # The scheduler sets shard to None upon completion of the job.
        # For more information on the shard field's semantic see
        # models.Job.shard. We need to be careful to wait for both the
        # shard_id and the complete bit here, or we will end up syncing
        # the job without ever setting the complete bit.
        job_ids = list(models.Job.objects.filter(
                shard=None,
                hostqueueentry__complete=True).values_list('pk', flat=True))

        # Load the jobs by primary key in a second query.
        # NOTE(review): presumably this avoids duplicate rows produced by
        # the hostqueueentry join above -- confirm.
        return list(models.Job.objects.filter(pk__in=job_ids))


    def _mark_jobs_as_uploaded(self, job_ids):
        """Assign the given jobs back to this shard after a successful upload.

        @param job_ids: Ids of the jobs that were sent in the heartbeat.
        """
        # self.shard might be None if no jobs were downloaded yet.
        # But then job_ids is empty, so this is harmless.
        # Even if there were jobs we'd in the worst case upload them twice.
        models.Job.objects.filter(pk__in=job_ids).update(shard=self.shard)


    def _get_hqes_for_jobs(self, jobs):
        """Collect the host queue entries of the given jobs.

        @param jobs: An iterable of Job objects.

        @returns: A flat list with the host queue entries of all jobs, in
                  job order.
        """
        hqes = []
        for job in jobs:
            hqes.extend(job.hostqueueentry_set.all())
        return hqes


    def _get_known_jobs_and_hosts(self):
        """Returns lists of host and job info to send in a heartbeat.

        The host and job ids are ids of objects that are already present on
        the shard and therefore don't need to be sent again.

        For jobs, only incomplete jobs are sent, as the master won't send
        already completed jobs anyway. This helps keeping the list of id's
        considerably small.

        For hosts, host status in addition to host id are sent to master
        to sync the host status.

        As a side effect the age distribution of the incomplete jobs is
        reported as a metric.

        @returns: Tuple of three lists. The first one contains job ids, the
                  second one host ids, and the third one host statuses.
        """
        jobs = models.Job.objects.filter(hostqueueentry__complete=False)
        job_ids = list(jobs.values_list('id', flat=True))
        self._report_job_time_distribution(jobs)

        host_models = models.Host.objects.filter(invalid=0)
        host_ids = []
        host_statuses = []
        # A single pass keeps ids and statuses aligned by index.
        for h in host_models:
            host_ids.append(h.id)
            host_statuses.append(h.status)
        return job_ids, host_ids, host_statuses


    def _heartbeat_packet(self):
        """Construct the heartbeat packet.

        See rpc_interface for a more detailed description of the heartbeat.

        @return: A heartbeat packet: a dictionary with the keys
                 'shard_hostname', 'known_job_ids', 'known_host_ids',
                 'known_host_statuses', 'jobs' and 'hqes'.
        """
        known_job_ids, known_host_ids, known_host_statuses = (
                self._get_known_jobs_and_hosts())
        logging.info('Known jobs: %s', known_job_ids)

        job_objs = self._get_jobs_to_upload()
        hqes = [hqe.serialize(include_dependencies=False)
                for hqe in self._get_hqes_for_jobs(job_objs)]
        jobs = [job.serialize(include_dependencies=False) for job in job_objs]
        logging.info('Uploading jobs %s', [j['id'] for j in jobs])

        return {'shard_hostname': self.hostname,
                'known_job_ids': known_job_ids,
                'known_host_ids': known_host_ids,
                'known_host_statuses': known_host_statuses,
                'jobs': jobs,
                'hqes': hqes}


    def _report_job_time_distribution(self, jobs):
        """Report distribution of job durations to monarch.

        The duration of a job is the time in whole seconds since its
        `created_on` timestamp, clamped at zero.

        @param jobs: An iterable of Job objects.
        """
        jobs_time_distribution = metrics.Distribution(
                _METRICS_PREFIX + 'known_jobs_durations')
        now = datetime.datetime.now()

        # The type expected by the .set(...) of a distribution is a
        # distribution.
        dist = ts_mon.Distribution(ts_mon.GeometricBucketer())
        for job in jobs:
            duration = int(
                    max(0, (now - job.created_on).total_seconds()))
            dist.add(duration)
        jobs_time_distribution.set(dist)


    def _report_packet_metrics(self, packet):
        """Report stats about outgoing packet to monarch.

        @param packet: A heartbeat packet as built by _heartbeat_packet().
        """
        metrics.Gauge(_METRICS_PREFIX + 'known_job_ids_count').set(
                len(packet['known_job_ids']))
        metrics.Gauge(_METRICS_PREFIX + 'jobs_upload_count').set(
                len(packet['jobs']))
        metrics.Gauge(_METRICS_PREFIX + 'known_host_ids_count').set(
                len(packet['known_host_ids']))


    def _heartbeat_failure(self, log_message, failure_type_str=''):
        """Log a failed heartbeat and count it in the failure metric.

        @param log_message: Description of the failure for the log.
        @param failure_type_str: Value of the 'failure_type' metric field.
        """
        logging.error("Heartbeat failed. %s", log_message)
        metrics.Counter('chromeos/autotest/shard_client/heartbeat_failure'
                        ).increment(fields={'failure_type': failure_type_str})


    # The metric name below is misspelled ("heatbeat"). It is kept as is
    # because correcting it would change the name the metric is reported
    # under.
    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/shard_client/do_heatbeat_duration')
    def do_heartbeat(self):
        """Perform a heartbeat: Retrieve new jobs.

        This function executes a `shard_heartbeat` RPC. It retrieves the
        response of this call and processes the response by storing the
        returned objects in the local database.

        The RPC errors handled below are logged and counted instead of being
        raised; any other exception propagates to the caller.

        @returns: True if the heartbeat ran successfully, False otherwise.
        """

        logging.info("Performing heartbeat.")
        packet = self._heartbeat_packet()
        self._report_packet_metrics(packet)
        metrics.Gauge(_METRICS_PREFIX + 'request_size').set(
                len(str(packet)))

        try:
            response = self.afe.run(HEARTBEAT_AFE_ENDPOINT, **packet)
        except urllib2.HTTPError as e:
            self._heartbeat_failure('HTTPError %d: %s' % (e.code, e.reason),
                                    'HTTPError')
            return False
        except urllib2.URLError as e:
            self._heartbeat_failure('URLError: %s' % e.reason,
                                    'URLError')
            return False
        except httplib.HTTPException as e:
            self._heartbeat_failure('HTTPException: %s' % e,
                                    'HTTPException')
            return False
        except timeout_util.TimeoutError as e:
            self._heartbeat_failure('TimeoutError: %s' % e,
                                    'TimeoutError')
            return False
        except proxy.JSONRPCException as e:
            self._heartbeat_failure('JSONRPCException: %s' % e,
                                    'JSONRPCException')
            return False

        metrics.Gauge(_METRICS_PREFIX + 'response_size').set(
                len(str(response)))
        # Only jobs that were part of a successful request are marked.
        self._mark_jobs_as_uploaded([job['id'] for job in packet['jobs']])
        self.process_heartbeat_response(response)
        logging.info("Heartbeat completed.")
        return True


    def tick(self):
        """Performs all tasks the shard clients needs to do periodically."""
        success = self.do_heartbeat()
        if success:
            metrics.Counter('chromeos/autotest/shard_client/tick').increment()


    def loop(self, lifetime_hours):
        """Calls tick() until shutdown() is called or lifetime expires.

        @param lifetime_hours: (float) hours to loop for. None implies no
                               deadline.
        """
        loop_start_time = time.time()
        while self._continue_looping(lifetime_hours, loop_start_time):
            self.tick()
            # Sleep with +/- 10% fuzzing to avoid phaselock of shards.
            tick_fuzz = self.tick_pause_sec * 0.2 * (random.random() - 0.5)
            time.sleep(self.tick_pause_sec + tick_fuzz)


    def shutdown(self):
        """Stops the shard client after the current tick."""
        logging.info("Shutdown request received.")
        self._shutdown_requested = True


    def _continue_looping(self, lifetime_hours, loop_start_time):
        """Determines if we should continue with the next mainloop iteration.

        @param lifetime_hours: (float) number of hours to loop for. None
                               implies no deadline.
        @param loop_start_time: Time when we started looping, as returned by
                                time.time().
        @returns True if we should continue looping, False otherwise.
        """
        if self._shutdown_requested:
            return False

        if (lifetime_hours is None
            or time.time() - loop_start_time < lifetime_hours * 3600):
            return True
        logging.info('Process lifetime %0.3f hours exceeded. Shutting down.',
                     lifetime_hours)
        return False
Jakob Juelich3b27dbc2014-09-03 18:05:37 -0700446
447
def handle_signal(signum, frame):
    """Signal handler that stops the shard client without crashing mid-tick.

    When the shard client exists it is asked to shut down, which lets the
    current tick finish. The handler can also fire before the client has been
    created, because the handlers are installed first. In that case there is
    no tick to protect, so the process exits right away instead of failing
    with an AttributeError on None.

    @param signum: Number of the received signal.
    @param frame: Current stack frame; unused, but part of the signal handler
                  signature.

    @raises SystemExit: if no shard client has been created yet. The exit
                        status is 128 + signum.
    """
    if _heartbeat_client is None:
        logging.info('Received signal %s before the shard client was '
                     'created. Exiting.', signum)
        raise SystemExit(128 + signum)
    _heartbeat_client.shutdown()
451
452
def _get_shard_hostname_and_ensure_running_on_shard():
    """Read the hostname of the local shard from the global configuration.

    The value is taken from the `shard_hostname` option of the `SHARD`
    section. A missing or empty value means that this is not a shard.

    @returns The configured shard hostname.

    @raises error.HeartbeatOnlyAllowedInShardModeException if run from
            elsewhere than from a shard.
    """
    shard_hostname = global_config.global_config.get_config_value(
            'SHARD', 'shard_hostname', default=None)
    if shard_hostname:
        return shard_hostname
    raise error.HeartbeatOnlyAllowedInShardModeException(
            'To run the shard client, shard_hostname must neither be None nor '
            'empty.')
Jakob Juelich3b27dbc2014-09-03 18:05:37 -0700468
469
def _get_tick_pause_sec():
    """Read pause to make between two ticks from the global configuration.

    @returns The `heartbeat_pause_sec` option of the `SHARD` section, read
             with type=float.
    """
    config = global_config.global_config
    pause_sec = config.get_config_value(
            'SHARD', 'heartbeat_pause_sec', type=float)
    return pause_sec
474
475
def get_shard_client():
    """Instantiate a shard client instance.

    The global AFE hostname, the shard hostname and the tick pause are read,
    in this order, from the global configuration.

    @returns A shard client instance.
    """
    afe_hostname = server_utils.get_global_afe_hostname()
    own_hostname = _get_shard_hostname_and_ensure_running_on_shard()
    pause_sec = _get_tick_pause_sec()
    client = ShardClient(afe_hostname, own_hostname, pause_sec)
    return client
487
488
def main():
    """Parse the command line and run the shard client with monitoring.

    Any uncaught exception is counted, logged and handed to the email
    manager before it is re-raised. Queued emails are sent on every exit
    path.
    """
    arg_parser = argparse.ArgumentParser(description='Shard client.')
    arg_parser.add_argument(
            '--lifetime-hours',
            default=None,
            type=float,
            help='If provided, number of hours we should run for. '
                 'At the expiry of this time, the process will exit '
                 'gracefully.',
    )
    arg_parser.add_argument(
            '--metrics-file',
            default=None,
            type=str,
            help='If provided, drop metrics to this local file instead of '
                 'reporting to ts_mon',
    )
    opts = arg_parser.parse_args()

    ts_mon_state = ts_mon_config.SetupTsMonGlobalState(
            'shard_client',
            indirect=True,
            debug_file=opts.metrics_file,
    )
    with ts_mon_state:
        try:
            metrics.Counter('chromeos/autotest/shard_client/start').increment()
            main_without_exception_handling(opts)
        except Exception:
            metrics.Counter('chromeos/autotest/shard_client/uncaught_exception'
                            ).increment()
            failure_msg = 'Uncaught exception. Terminating shard_client.'
            email_manager.manager.log_stacktrace(failure_msg)
            logging.exception(failure_msg)
            raise
        finally:
            email_manager.manager.send_queued_emails()
525
526
def main_without_exception_handling(options):
    """Set up logging and signal handling, then run the heartbeat loop.

    The created client is stored in the module-level `_heartbeat_client`,
    which is what handle_signal() acts on.

    @param options: Parsed command line options; `lifetime_hours` is passed
                    on to the loop.
    """
    global _heartbeat_client

    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    scheduler_lib.setup_logging(
            log_dir, None, timestamped_logfile_prefix='shard_client')

    logging.info("Setting signal handler.")
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, handle_signal)

    logging.info("Starting shard client.")
    _heartbeat_client = get_shard_client()
    _heartbeat_client.loop(options.lifetime_hours)
Jakob Juelich3b27dbc2014-09-03 18:05:37 -0700540
541
# Run the shard client only when this file is executed as a script.
if __name__ == '__main__':
    main()