# pylint: disable=missing-docstring

"""Database model classes for the scheduler.

Contains model classes abstracting the various DB tables used by the scheduler.
These overlap the Django models in basic functionality, but were written before
the Django models existed and have not yet been phased out. Some of them
(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
which would probably be ill-suited for inclusion in the general Django model
classes.

Globals:
_notify_email_statuses: list of HQE statuses. Each time a single HQE reaches
        one of these statuses, an email will be sent to the job's email_list.
        Comes from global_config.
_base_url: URL to the local AFE server, used to construct URLs for emails.
_db: DatabaseConnection for this module.
_drone_manager: reference to global DroneManager instance.
"""

import base64
import datetime
import errno
import itertools
import logging
import re
import weakref

import google.protobuf.internal.well_known_types as types

from autotest_lib.client.common_lib import global_config, host_protections
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import drone_manager, email_manager
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import scheduler_config
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.server import afe_urls
from autotest_lib.server.cros import provision

try:
    from chromite.lib import metrics
    from chromite.lib import cloud_trace
except ImportError:
    metrics = utils.metrics_mock
    import mock
    cloud_trace = mock.Mock()


_notify_email_statuses = []
_base_url = None

_db = None
_drone_manager = None

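# When enabled, replaced (non-static) labels are resolved against the static
# label tables when computing a host's labels; see
# Host._get_labels_with_platform() below.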
RESPECT_STATIC_LABELS = global_config.global_config.get_config_value(
        'SKYLAB', 'respect_static_labels', type=bool, default=False)


def initialize():
    global _db
    _db = scheduler_lib.ConnectionManager().get_connection()

    notify_statuses_list = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        _base_url = afe_urls.ROOT_URL

    initialize_globals()


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def get_job_metadata(job):
    """Get a dictionary of the job information.

    The return value is a dictionary that includes job information like id,
    name and parent job information. The value will be stored in the metadata
    database.

    @param job: A Job object.
    @return: A dictionary containing the job id, owner and name.
    """
    if not job:
        logging.error('Job is None, no metadata returned.')
        return {}
    try:
        return {'job_id': job.id,
                'owner': job.owner,
                'job_name': job.name,
                'parent_job_id': job.parent_job_id}
    except AttributeError as e:
        logging.error('Job has missing attribute: %s', e)
        return {}


class DBError(Exception):
    """Raised by the DBObject constructor when its select fails."""


class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id. This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id. If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warning(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        fields = ', '.join(self._fields)
        sql = 'SELECT %s FROM %s WHERE ID=%%s' % (fields, self.__table)
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in-memory fields. Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                the class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(time_utils.TIME_FMT)
                row_value = row_value.strftime(time_utils.TIME_FMT)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

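        # 'id' is intentionally excluded so that update_field() can never be
        # used to rewrite the primary key.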
        self._valid_fields.remove('id')


    def update_from_database(self):
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table = None):
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
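            # Render None as SQL NULL; every other value is quoted into the
            # INSERT statement below.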
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch_rows(cls, where='', params=(), joins='', order_by=''):
        """
        Fetch the rows based on the given database query.

        @returns The rows fetched by the given query.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        fields = []
        for field in cls._fields:
            fields.append('%s.%s' % (cls._table_name, field))

        query = ('SELECT %(fields)s FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'fields' : ', '.join(fields),
                                             'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return rows

    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @returns A list of class instances, one for each row fetched.
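
        Example (illustrative id):
            hqes = HostQueueEntry.fetch(where='job_id = %s', params=(42,))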
        """
        rows = cls.fetch_rows(where=where, params=params, joins=joins,
                              order_by=order_by)
        return [cls(id=row[0], row=row) for row in rows]


class IneligibleHostQueue(DBObject):
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')


class AtomicGroup(DBObject):
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')


class Label(DBObject):
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)


class Host(DBObject):
    _table_name = 'afe_hosts'
    # TODO(ayatane): synch_id is not used, remove after fixing DB.
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty',
               'leased', 'shard_id', 'lock_reason')


    def set_status(self, status):
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def _get_labels_with_platform(self, non_static_rows, static_rows):
        """Helper function to fetch labels & platform for a host."""
        if not RESPECT_STATIC_LABELS:
            return non_static_rows

        combined_rows = []
        replaced_labels = _db.execute(
                'SELECT label_id FROM afe_replaced_labels')
        replaced_label_ids = {l[0] for l in replaced_labels}

        # We respect afe_labels more, which means:
        #   * if a non-static label has been replaced, we find its replacement
        #     static label in afe_static_labels by label name.
        #   * if a non-static label has not been replaced, we keep it.
        #   * static labels without a corresponding non-static label are
        #     dropped.
        static_label_names = []
        for label_id, label_name, is_platform in non_static_rows:
            if label_id not in replaced_label_ids:
                combined_rows.append((label_id, label_name, is_platform))
            else:
                static_label_names.append(label_name)

        # Only keep static labels that replace a non-static label.
        for label_id, label_name, is_platform in static_rows:
            if label_name in static_label_names:
                combined_rows.append((label_id, label_name, is_platform))

        return combined_rows


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
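        # The same query template is rendered twice, once against the static
        # label tables and once against the regular ones; the two result sets
        # are then merged by _get_labels_with_platform().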
        template = ('SELECT %(label_table)s.id, %(label_table)s.name, '
                    '%(label_table)s.platform FROM %(label_table)s INNER '
                    'JOIN %(host_label_table)s '
                    'ON %(label_table)s.id = %(host_label_table)s.%(column)s '
                    'WHERE %(host_label_table)s.host_id = %(host_id)s '
                    'ORDER BY %(label_table)s.name')
        static_query = template % {
                'host_label_table': 'afe_static_hosts_labels',
                'label_table': 'afe_static_labels',
                'column': 'staticlabel_id',
                'host_id': self.id
        }
        non_static_query = template % {
                'host_label_table': 'afe_hosts_labels',
                'label_table': 'afe_labels',
                'column': 'label_id',
                'host_id': self.id
        }
        non_static_rows = _db.execute(non_static_query)
        static_rows = _db.execute(static_query)

        rows = self._get_labels_with_platform(non_static_rows, static_rows)
        platform = None
        all_labels = []
        for _, label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number. If either hostname does not match this pattern, they are
        simply compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfies most people's hostname sorting needs
        regardless of their exact naming schemes. Nobody sane should have both
        a host10 and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            number_a = int(number_a_str.lstrip('0'))
            number_b = int(number_b_str.lstrip('0'))
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)


class HostQueueEntry(DBObject):
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on', 'finished_on')

    _COMPLETION_COUNT_METRIC = metrics.Counter(
        'chromeos/autotest/scheduler/hqe_completion_count')

    def __init__(self, id=None, row=None, job_row=None, **kwargs):
        """
        @param id: ID field from afe_host_queue_entries table.
                Either id or row should be specified for initialization.
        @param row: The DB row for a particular HostQueueEntry.
                Either id or row should be specified for initialization.
        @param job_row: The DB row for the job of this HostQueueEntry.
        """
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id, row=job_row)

        if self.host_id:
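            # The host is acquired through the rdb rather than constructed as
            # a plain Host DBObject.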
            self.host = rdb_lib.get_hosts([self.host_id])[0]
            self.host.dbg_str = self.get_dbg_str()
            self.host.metadata = get_job_metadata(self.job)
        else:
            self.host = None


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @returns A list of class instances, one for each row fetched.
        """
        # Override the original fetch method to pre-fetch the jobs from the DB
        # in order to prevent each HQE from making a separate DB query.
        rows = cls.fetch_rows(where=where, params=params, joins=joins,
                              order_by=order_by)
        if len(rows) <= 1:
            return [cls(id=row[0], row=row) for row in rows]

        job_params = ', '.join([str(row[1]) for row in rows])
        job_rows = Job.fetch_rows(where='id IN (%s)' % (job_params))
        # Create a job_id to job_row dictionary so each HQE can be matched
        # to its corresponding job.
        job_dict = {job_row[0]: job_row for job_row in job_rows}
        return [cls(id=row[0], row=row, job_row=job_dict.get(row[1]))
                for row in rows]


    def _view_job_url(self):
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label). The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.update_field('host_id', host.id)
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def block_host(self, host_id):
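        # An IneligibleHostQueue row records that this host has been used for
        # this job, so the scheduler will not assign it to another entry of
        # the same job.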
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        if self.host:
            return self.host.hostname
        return 'no host'


    def get_dbg_str(self):
        """Get a debug string to identify this HQE.

        @return: A string containing the hqe and job id.
        """
        try:
            return 'HQE: %s, for job: %s' % (self.id, self.job_id)
        except AttributeError as e:
            return 'HQE has not been initialized yet: %s' % e


    def __str__(self):
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return ("%s and host: %s has status:%s%s" %
                (self.get_dbg_str(), self._get_hostname(), self.status,
                 flags_str))


    def set_status(self, status):
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)

        self.update_field('active', active)

        # The ordering of these operations is important. Once we set the
        # complete bit this job will become indistinguishable from all
        # the other complete jobs, unless we first set shard_id to NULL
        # to signal to the shard_client that we need to upload it. However,
        # we can only set both these after we've updated finished_on etc
        # within _on_complete or the job will get synced in an intermediate
        # state. This means that if someone sigkills the scheduler between
        # setting finished_on and complete, we will have inconsistent jobs.
        # This should be fine, because nothing critical checks finished_on,
        # and the scheduler should never be killed mid-tick.
        if complete:
            self._on_complete(status)
            self._email_on_job_complete()

        self.update_field('complete', complete)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)
        logging.debug('HQE Set Status Complete')


    def _on_complete(self, status):
        metric_fields = {'status': status.lower()}
        if self.host:
            metric_fields['board'] = self.host.board or ''
            if len(self.host.pools) == 1:
                metric_fields['pool'] = self.host.pools[0]
            else:
                metric_fields['pool'] = 'MULTIPLE'
        else:
            metric_fields['board'] = 'NO_HOST'
            metric_fields['pool'] = 'NO_HOST'
        self._COMPLETION_COUNT_METRIC.increment(fields=metric_fields)
        if status is not models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()
        if self.started_on:
            self.set_finished_on_now()
            self._log_trace()
        if self.job.shard_id is not None:
            # If shard_id is None, the job will be synced back to the master.
            self.job.update_field('shard_id', None)
        if not self.execution_subdir:
            return
        # Unregister any possible pidfiles associated with this queue entry.
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)

    def _log_trace(self):
        """Emits a Cloud Trace span for the HQE's duration."""
        if self.started_on and self.finished_on:
            span = cloud_trace.Span('HQE', spanId='0',
                                    traceId=hqe_trace_id(self.id))
            # TODO(phobbs) make a .SetStart() and .SetEnd() helper method
            span.startTime = types.Timestamp()
            span.startTime.FromDatetime(self.started_on)
            span.endTime = types.Timestamp()
            span.endTime.FromDatetime(self.finished_on)
            # TODO(phobbs) any LogSpan calls need to be wrapped in this for
            # safety during tests, so this should be caught within LogSpan.
            try:
                cloud_trace.LogSpan(span)
            except IOError as e:
                if e.errno == errno.ENOENT:
                    logging.warning('Error writing to cloud trace results '
                                    'directory: %s', e)


    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body = "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body


    def _email_on_status(self, status):
        hostname = self._get_hostname()
        subject, body = self._get_status_email_contents(status, None, hostname)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        if not self.job.is_finished():
            return

        summary = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary.append("Host: %s Status: %s" %
                           (queue_entry._get_hostname(),
                            queue_entry.status))

        summary = "\n".join(summary)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self):
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        self.update_field('finished_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    @property
    def aborted_by(self):
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                       afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify. If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        if not self.host:
            raise scheduler_lib.NoHostIdError(
                    'Failed to recover a job whose host_queue_entry_id=%r due'
                    ' to no host_id.'
                    % self.id)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are getting
        # stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in {Status.GATHERING, Status.PARSING}:
            # Do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host.
            return

        if self.status in {Status.STARTING, Status.PENDING, Status.RUNNING}:
            # If the HQE is in any of these statuses, it should not have any
            # unfinished agents before it can be aborted.
            agents = dispatcher.get_agents_for_entry(self)
            # Agents with finished tasks can be left behind. This is added to
            # handle the special case of aborting a hostless job in STARTING
            # status, in which the agent has only a HostlessQueueTask
            # associated. The finished HostlessQueueTask will be cleaned up in
            # the next tick, so it's safe to leave the agent there. Without
            # filtering out finished agents, HQE abort won't be able to
            # proceed.
            assert all([agent.is_done() for agent in agents])

        if self.host:
            if self.status in {Status.STARTING, Status.PENDING, Status.RUNNING}:
                self.host.set_status(models.Host.Status.READY)
            elif self.status in {Status.VERIFYING, Status.RESETTING}:
                models.SpecialTask.objects.create(
                        task=models.SpecialTask.Task.CLEANUP,
                        host=models.Host.objects.get(id=self.host.id),
                        requested_by=self.job.owner_model())
            elif self.status == Status.PROVISIONING:
                models.SpecialTask.objects.create(
                        task=models.SpecialTask.Task.REPAIR,
                        host=models.Host.objects.get(id=self.host.id),
                        requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)


    def execution_tag(self):
        SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
                               'complete!=1 AND execution_subdir="" AND '
                               'status!="Queued";')
        SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
                                 'status="Aborted" WHERE id=%s;')
        try:
            assert self.execution_subdir
        except AssertionError:
            # TODO(scottz): Remove temporary fix/info gathering pathway for
            # crosbug.com/31595 once issue is root caused.
            logging.error('No execution_subdir for host queue id:%s.', self.id)
            logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
            for row in _db.execute(SQL_SUSPECT_ENTRIES):
                logging.error(row)
            logging.error('====DB DEBUG====\n')
            fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
            logging.error('EXECUTING: %s', fix_query)
            _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id)
            raise AssertionError(('self.execution_subdir not found. '
                                  'See log for details.'))

        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        return self.execution_tag()


    def set_started_on_now(self):
        self.update_field('started_on', datetime.datetime.now())


    def set_finished_on_now(self):
        self.update_field('finished_on', datetime.datetime.now())


    def is_hostless(self):
        return (self.host_id is None
                and self.meta_host is None)


def hqe_trace_id(hqe_id):
    """Constructs the canonical trace id based on the HQE's id.

    Encodes 'HQE' in base16 and concatenates with the hex representation
    of the HQE's id.

    @param hqe_id: The HostQueueEntry's id.

    Returns:
        A trace id (in hex format)
    """
    return base64.b16encode('HQE') + hex(hqe_id)[2:]


class Job(DBObject):
    _table_name = 'afe_jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
               'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
               'test_retry', 'run_reset', 'timeout_mins', 'shard_id',
               'require_ssp')

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
    # all status='Pending' atomic group HQEs in case a delay was running when the
    # scheduler was restarted and no more hosts ever successfully exit Verify.

    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
        self._owner_model = None # caches model instance of owner
        self.update_image_path = None # path of OS image to install


    def model(self):
        return models.Job.objects.get(id=self.id)


    def owner_model(self):
        # work around the fact that the Job owner field is a string, not a
        # foreign key
        if not self._owner_model:
            self._owner_model = models.User.objects.get(login=self.owner)
        return self._owner_model


    def tag(self):
        return "%s-%s" % (self.id, self.owner)


    def get_execution_details(self):
        """
        Get test execution details for this job.

        @return: Dictionary with test execution details.
        """
        def _find_test_jobs(rows):
            """
            Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
            Those are autotest 'internal job' tests, so they should not be
            counted when evaluating the test stats.

            @param rows: List of rows (matrix) with database results.
            """
            job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')
            n_test_jobs = 0
            for r in rows:
                test_name = r[0]
                if job_test_pattern.match(test_name):
                    n_test_jobs += 1

            return n_test_jobs

        stats = {}

        rows = _db.execute("""
                SELECT t.test, s.word, t.reason
                FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
                WHERE t.job_idx = j.job_idx
                AND s.status_idx = t.status
                AND j.afe_job_id = %s
                ORDER BY t.reason
                """ % self.id)

        failed_rows = [r for r in rows if not r[1] == 'GOOD']

        n_test_jobs = _find_test_jobs(rows)
        n_test_jobs_failed = _find_test_jobs(failed_rows)

        total_executed = len(rows) - n_test_jobs
        total_failed = len(failed_rows) - n_test_jobs_failed

        if total_executed > 0:
            success_rate = 100 - ((total_failed / float(total_executed)) * 100)
        else:
            success_rate = 0

        stats['total_executed'] = total_executed
        stats['total_failed'] = total_failed
        stats['total_passed'] = total_executed - total_failed
        stats['success_rate'] = success_rate

        status_header = ("Test Name", "Status", "Reason")
        if failed_rows:
            stats['failed_rows'] = utils.matrix_to_string(failed_rows,
                                                          status_header)
        else:
            stats['failed_rows'] = ''

        time_row = _db.execute("""
                SELECT started_time, finished_time
                FROM tko_jobs
                WHERE afe_job_id = %s
                """ % self.id)

        if time_row:
            t_begin, t_end = time_row[0]
            try:
                delta = t_end - t_begin
                minutes, seconds = divmod(delta.seconds, 60)
                hours, minutes = divmod(minutes, 60)
                stats['execution_time'] = ("%02d:%02d:%02d" %
                                           (hours, minutes, seconds))
            # One of t_end or t_begin is None
            except TypeError:
                stats['execution_time'] = '(could not determine)'
        else:
            stats['execution_time'] = '(none)'

        return stats


    def keyval_dict(self):
        return self.model().keyval_dict()


    def _pending_count(self):
        """The number of HostQueueEntries for this job in the Pending state."""
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status=models.HostQueueEntry.Status.PENDING)
        return pending_entries.count()


    def is_ready(self):
        pending_count = self._pending_count()
        ready = (pending_count >= self.synch_count)

        if not ready:
            logging.info(
                    'Job %s not ready: %s pending, %s required ',
                    self, pending_count, self.synch_count)

        return ready


    def num_machines(self, clause = None):
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='afe_host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_active=True):
        if include_active:
            statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES)
        else:
            statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Stops the job's inactive pre-job HQEs."""
        entries_to_stop = self._not_yet_run_entries(
                include_active=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()


    def stop_if_necessary(self):
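        # If fewer pre-job entries remain than synch_count requires, the job
        # can never become ready, so stop its idle pre-job entries.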
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def _next_group_name(self):
        """@returns a directory name to use for the next host group results."""
        group_count_re = re.compile(r'group(\d+)')
        query = models.HostQueueEntry.objects.filter(
                job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return 'group%d' % (next_id,)


    def get_group_entries(self, queue_entry_from_group):
        """
        @param queue_entry_from_group: A HostQueueEntry instance to find other
                group entries on this job for.

        @returns A list of HostQueueEntry objects all executing this job as
                part of the same group as the one supplied (having the same
                execution_subdir).
        """
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def _should_run_cleanup(self, queue_entry):
        if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
            return queue_entry.host.dirty
        return False


    def _should_run_verify(self, queue_entry):
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        # If RebootBefore is set to NEVER, then we won't run reset because
        # we can't clean up, so we need to weaken a Reset into a Verify.
        weaker_reset = (self.run_reset and
                self.reboot_before == model_attributes.RebootBefore.NEVER)
        return self.run_verify or weaker_reset


    def _should_run_reset(self, queue_entry):
        can_verify = (queue_entry.host.protection !=
                      host_protections.Protection.DO_NOT_VERIFY)
        can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER
        return (can_reboot and can_verify
                and (self.run_reset
                     or (self._should_run_cleanup(queue_entry)
                         and self._should_run_verify(queue_entry))))


    def _should_run_provision(self, queue_entry):
        """
        Determine if a provision task needs to run before this queue_entry in
        order to provision queue_entry.host.

        @param queue_entry: The host queue entry in question.
        @returns: True if we should schedule a provision task, False otherwise.

        """
        # If we get to this point, it means that the scheduler has already
        # vetted that all the unprovisionable labels match, so we can just
        # find all labels on the job that aren't on the host to get the list
        # of what we need to provision. (See the scheduling logic in
        # host_scheduler.py:is_host_eligable_for_job() where we discard all
        # actionable labels when assigning jobs to hosts.)
        job_labels = {x.name for x in queue_entry.get_labels()}
        # Skip provision if `skip_provision` is listed in the job labels.
        if provision.SKIP_PROVISION in job_labels:
            return False
        _, host_labels = queue_entry.host.platform_and_labels()
        # If there are any labels on the job that are not on the host and they
        # are labels that provisioning knows how to change, then that means
        # there is provisioning work to do. If there's no provisioning work to
        # do, then obviously we have no reason to schedule a provision task!
        diff = job_labels - set(host_labels)
        if any([provision.Provision.acts_on(x) for x in diff]):
            return True
        return False


    def _queue_special_task(self, queue_entry, task):
        """
        Create a special task and associate it with a host queue entry.

        @param queue_entry: The queue entry this special task should be
                associated with.
        @param task: One of the members of the enum models.SpecialTask.Task.
        @returns: None

        """
        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=queue_entry.host_id),
                queue_entry=queue_entry, task=task)


    def schedule_pre_job_tasks(self, queue_entry):
        """
        Queue all of the special tasks that need to be run before a host
        queue entry may run.

        If no special tasks need to be scheduled, then |on_pending| will be
        called directly.

        @returns None

        """
        task_queued = False
        hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id)

        if self._should_run_provision(queue_entry):
            self._queue_special_task(hqe_model,
                                     models.SpecialTask.Task.PROVISION)
            task_queued = True
        elif self._should_run_reset(queue_entry):
            self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET)
            task_queued = True
        else:
            if self._should_run_cleanup(queue_entry):
                self._queue_special_task(hqe_model,
                                         models.SpecialTask.Task.CLEANUP)
                task_queued = True
            if self._should_run_verify(queue_entry):
                self._queue_special_task(hqe_model,
                                         models.SpecialTask.Task.VERIFY)
                task_queued = True

        if not task_queued:
            queue_entry.on_pending()


    def _assign_new_group(self, queue_entries):
        if len(queue_entries) == 1:
            group_subdir_name = queue_entries[0].host.hostname
        else:
            group_subdir_name = self._next_group_name()
            logging.info('Running synchronous job %d hosts %s as %s',
                         self.id,
                         [entry.host.hostname for entry in queue_entries],
                         group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        @returns A list of HostQueueEntry instances (including
                include_queue_entry) to be used to run this Job, or an empty
                list if not enough pending entries could be chosen.
        """
        chosen_entries = [include_queue_entry]
        num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        self._assign_new_group(chosen_entries)
        return chosen_entries


    def run_if_ready(self, queue_entry):
        """
        Run this job by kicking its HQEs into status='Starting' if enough
        hosts are ready for it to run.

        Cleans up by kicking HQEs into status='Stopped' if this Job is not
        ready to run.
        """
        if not self.is_ready():
            self.stop_if_necessary()
        else:
            self.run(queue_entry)


    def request_abort(self):
        """Request that this Job be aborted on the next scheduler cycle."""
        self.model().abort()


    def run(self, queue_entry):
        """
        @param queue_entry: The HostQueueEntry instance calling this method.
        """
        queue_entries = self._choose_group_to_run(queue_entry)
        if queue_entries:
            self._finish_run(queue_entries)


    def _finish_run(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def __str__(self):
        return '%s-%s' % (self.id, self.owner)