blob: 60436b870948dcbd78f6d263b35dd463440fc416 [file] [log] [blame]
Richard Barnettea31ab772016-07-13 17:50:17 -07001# pylint: disable=missing-docstring
Aviv Keshet0b9cfc92013-02-05 11:36:02 -08002
jamesrenb55378a2010-03-02 22:19:49 +00003"""Database model classes for the scheduler.
4
5Contains model classes abstracting the various DB tables used by the scheduler.
6These overlap the Django models in basic functionality, but were written before
7the Django models existed and have not yet been phased out. Some of them
8(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
9which would probably be ill-suited for inclusion in the general Django model
10classes.
11
12Globals:
13_notify_email_statuses: list of HQE statuses. each time a single HQE reaches
14 one of these statuses, an email will be sent to the job's email_list.
15 comes from global_config.
16_base_url: URL to the local AFE server, used to construct URLs for emails.
17_db: DatabaseConnection for this module.
18_drone_manager: reference to global DroneManager instance.
19"""
20
Richard Barnettea31ab772016-07-13 17:50:17 -070021import datetime
22import itertools
23import logging
24import os
25import re
Richard Barnettea31ab772016-07-13 17:50:17 -070026import time
27import weakref
28
Dan Shidfea3682014-08-10 23:38:40 -070029from autotest_lib.client.common_lib import global_config, host_protections
30from autotest_lib.client.common_lib import time_utils
31from autotest_lib.client.common_lib import utils
Gabe Blackb72f4fb2015-01-20 16:47:13 -080032from autotest_lib.client.common_lib.cros.graphite import autotest_es
jamesrenb55378a2010-03-02 22:19:49 +000033from autotest_lib.frontend.afe import models, model_attributes
jamesrenb55378a2010-03-02 22:19:49 +000034from autotest_lib.scheduler import drone_manager, email_manager
beepscc9fc702013-12-02 12:45:38 -080035from autotest_lib.scheduler import rdb_lib
jamesrenb55378a2010-03-02 22:19:49 +000036from autotest_lib.scheduler import scheduler_config
Prashanth B0e960282014-05-13 19:38:28 -070037from autotest_lib.scheduler import scheduler_lib
Allen Li6a612392016-08-18 12:09:32 -070038from autotest_lib.server import afe_urls
Alex Miller627694a2014-05-01 18:04:29 -070039from autotest_lib.server.cros import provision
Michael Liangda8c60a2014-06-03 13:24:51 -070040
Dan Shi5e2efb72017-02-07 11:40:23 -080041try:
42 from chromite.lib import metrics
43except ImportError:
44 metrics = utils.metrics_mock
Prathmesh Prabhuc7e97602016-11-22 18:36:59 -080045
jamesrenb55378a2010-03-02 22:19:49 +000046
# HQE statuses that trigger a notification e-mail; filled in by initialize()
# from the scheduler's notify_email_statuses config value.
_notify_email_statuses = []
# URL of the local AFE server, used when constructing links for e-mails;
# filled in by initialize().
_base_url = None

# DatabaseConnection and DroneManager singletons, set up by initialize()
# and initialize_globals() respectively.
_db = None
_drone_manager = None
52
def initialize():
    """Initialize this module's globals from config and the database.

    Sets up _db, _notify_email_statuses and _base_url, then finishes by
    calling initialize_globals().
    """
    global _db
    _db = scheduler_lib.ConnectionManager().get_connection()

    raw_statuses = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    global _notify_email_statuses
    # Statuses may be separated by whitespace, commas, semicolons or colons.
    _notify_email_statuses = [
            status for status in re.split(r'[\s,;:]', raw_statuses.lower())
            if status]

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    _base_url = config_base_url or afe_urls.ROOT_URL

    initialize_globals()
76
77
def initialize_globals():
    """Bind the module-level reference to the global DroneManager."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
81
82
def get_job_metadata(job):
    """Get a dictionary of the job information.

    The return value is a dictionary that includes job information like id,
    name and parent job information. The value will be stored in metadata
    database.

    @param job: A Job object.
    @return: A dictionary containing the job id, owner and name; an empty
            dictionary if job is None or lacks any of those attributes.
    """
    if not job:
        logging.error('Job is None, no metadata returned.')
        return {}
    try:
        metadata = {
                'job_id': job.id,
                'owner': job.owner,
                'job_name': job.name,
                'parent_job_id': job.parent_job_id,
        }
    except AttributeError as e:
        logging.error('Job has missing attribute: %s', e)
        return {}
    return metadata
104
105
class DelayedCallTask(object):
    """
    An Agent-compatible task that waits until a deadline has passed, then
    invokes the supplied callback exactly once and finishes. If the callback
    returns anything, it is assumed to be a new Agent instance and will be
    added to the dispatcher.

    @attribute end_time: The absolute posix time after which this task will
            call its callback when it is polled and be finished.

    Also has all attributes required by the Agent class.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: The delay in seconds from now that this task
                will call the supplied callback and be done.
        @param callback: A callable to be called by this task once after at
                least delay_seconds time has elapsed. It must return None
                or a new Agent instance.
        @param now_func: A time.time like function. Default: time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback
        # Absolute deadline after which the callback fires.
        self.end_time = self._now_func() + delay_seconds

        # These attributes are required by Agent.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        """Fire the callback once the deadline has passed (at most once)."""
        if self.is_done() or self._now_func() < self.end_time:
            return
        self._callback()
        self.success = True


    def is_done(self):
        """Return True once the callback has fired or the task was aborted."""
        return self.aborted or self.success


    def abort(self):
        """Mark this task as aborted; the callback will never fire."""
        self.aborted = True
157
158
class DBError(Exception):
    """Raised when a DBObject's initializing SELECT finds no matching row."""
161
162
163class DBObject(object):
164 """A miniature object relational model for the database."""
165
166 # Subclasses MUST override these:
167 _table_name = ''
168 _fields = ()
169
170 # A mapping from (type, id) to the instance of the object for that
171 # particular id. This prevents us from creating new Job() and Host()
172 # instances for every HostQueueEntry object that we instantiate as
173 # multiple HQEs often share the same Job.
174 _instances_by_type_and_id = weakref.WeakValueDictionary()
175 _initialized = False
176
177
178 def __new__(cls, id=None, **kwargs):
179 """
180 Look to see if we already have an instance for this particular type
181 and id. If so, use it instead of creating a duplicate instance.
182 """
183 if id is not None:
184 instance = cls._instances_by_type_and_id.get((cls, id))
185 if instance:
186 return instance
187 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
188
189
190 def __init__(self, id=None, row=None, new_record=False, always_query=True):
191 assert bool(id) or bool(row)
192 if id is not None and row is not None:
193 assert id == row[0]
194 assert self._table_name, '_table_name must be defined in your class'
195 assert self._fields, '_fields must be defined in your class'
196 if not new_record:
197 if self._initialized and not always_query:
198 return # We've already been initialized.
199 if id is None:
200 id = row[0]
201 # Tell future constructors to use us instead of re-querying while
202 # this instance is still around.
203 self._instances_by_type_and_id[(type(self), id)] = self
204
205 self.__table = self._table_name
206
207 self.__new_record = new_record
208
209 if row is None:
210 row = self._fetch_row_from_db(id)
211
212 if self._initialized:
213 differences = self._compare_fields_in_row(row)
214 if differences:
Ilja H. Friedel04be2bd2014-05-07 21:29:59 -0700215 logging.warning(
jamesrenb55378a2010-03-02 22:19:49 +0000216 'initialized %s %s instance requery is updating: %s',
217 type(self), self.id, differences)
218 self._update_fields_from_row(row)
219 self._initialized = True
220
221
222 @classmethod
223 def _clear_instance_cache(cls):
224 """Used for testing, clear the internal instance cache."""
225 cls._instances_by_type_and_id.clear()
226
227
228 def _fetch_row_from_db(self, row_id):
229 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
230 rows = _db.execute(sql, (row_id,))
231 if not rows:
232 raise DBError("row not found (table=%s, row id=%s)"
233 % (self.__table, row_id))
234 return rows[0]
235
236
237 def _assert_row_length(self, row):
238 assert len(row) == len(self._fields), (
239 "table = %s, row = %s/%d, fields = %s/%d" % (
240 self.__table, row, len(row), self._fields, len(self._fields)))
241
242
243 def _compare_fields_in_row(self, row):
244 """
245 Given a row as returned by a SELECT query, compare it to our existing in
246 memory fields. Fractional seconds are stripped from datetime values
247 before comparison.
248
249 @param row - A sequence of values corresponding to fields named in
250 The class attribute _fields.
251
252 @returns A dictionary listing the differences keyed by field name
253 containing tuples of (current_value, row_value).
254 """
255 self._assert_row_length(row)
256 differences = {}
jamesrenb55378a2010-03-02 22:19:49 +0000257 for field, row_value in itertools.izip(self._fields, row):
258 current_value = getattr(self, field)
259 if (isinstance(current_value, datetime.datetime)
260 and isinstance(row_value, datetime.datetime)):
Dan Shidfea3682014-08-10 23:38:40 -0700261 current_value = current_value.strftime(time_utils.TIME_FMT)
262 row_value = row_value.strftime(time_utils.TIME_FMT)
jamesrenb55378a2010-03-02 22:19:49 +0000263 if current_value != row_value:
264 differences[field] = (current_value, row_value)
265 return differences
266
267
268 def _update_fields_from_row(self, row):
269 """
270 Update our field attributes using a single row returned by SELECT.
271
272 @param row - A sequence of values corresponding to fields named in
273 the class fields list.
274 """
275 self._assert_row_length(row)
276
277 self._valid_fields = set()
278 for field, value in itertools.izip(self._fields, row):
279 setattr(self, field, value)
280 self._valid_fields.add(field)
281
282 self._valid_fields.remove('id')
283
284
285 def update_from_database(self):
286 assert self.id is not None
287 row = self._fetch_row_from_db(self.id)
288 self._update_fields_from_row(row)
289
290
291 def count(self, where, table = None):
292 if not table:
293 table = self.__table
294
295 rows = _db.execute("""
296 SELECT count(*) FROM %s
297 WHERE %s
298 """ % (table, where))
299
300 assert len(rows) == 1
301
302 return int(rows[0][0])
303
304
305 def update_field(self, field, value):
306 assert field in self._valid_fields
307
308 if getattr(self, field) == value:
309 return
310
311 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
312 _db.execute(query, (value, self.id))
313
314 setattr(self, field, value)
315
316
317 def save(self):
318 if self.__new_record:
319 keys = self._fields[1:] # avoid id
320 columns = ','.join([str(key) for key in keys])
321 values = []
322 for key in keys:
323 value = getattr(self, key)
324 if value is None:
325 values.append('NULL')
326 else:
327 values.append('"%s"' % value)
328 values_str = ','.join(values)
329 query = ('INSERT INTO %s (%s) VALUES (%s)' %
330 (self.__table, columns, values_str))
331 _db.execute(query)
332 # Update our id to the one the database just assigned to us.
333 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
334
335
336 def delete(self):
337 self._instances_by_type_and_id.pop((type(self), id), None)
338 self._initialized = False
339 self._valid_fields.clear()
340 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
341 _db.execute(query, (self.id,))
342
343
344 @staticmethod
345 def _prefix_with(string, prefix):
346 if string:
347 string = prefix + string
348 return string
349
350
351 @classmethod
352 def fetch(cls, where='', params=(), joins='', order_by=''):
353 """
354 Construct instances of our class based on the given database query.
355
356 @yields One class instance for each row fetched.
357 """
358 order_by = cls._prefix_with(order_by, 'ORDER BY ')
359 where = cls._prefix_with(where, 'WHERE ')
360 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
361 '%(where)s %(order_by)s' % {'table' : cls._table_name,
362 'joins' : joins,
363 'where' : where,
364 'order_by' : order_by})
365 rows = _db.execute(query, params)
366 return [cls(id=row[0], row=row) for row in rows]
367
368
class IneligibleHostQueue(DBObject):
    """Models a row of afe_ineligible_host_queues.

    Each row records a (job, host) pairing that should not be scheduled;
    rows are created/removed by HostQueueEntry.block_host/unblock_host.
    """
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
372
373
class AtomicGroup(DBObject):
    """Models a row of the afe_atomic_groups table."""
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
378
379
class Label(DBObject):
    """Models a row of the afe_labels table."""
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)
389
390
class Host(DBObject):
    """Models a row of the afe_hosts table."""
    _table_name = 'afe_hosts'
    # TODO(ayatane): synch_id is not used, remove after fixing DB.
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty',
               'leased', 'shard_id', 'lock_reason')


    def set_status(self,status):
        """Set this host's status field, logging the transition.

        @param status: New status string for the host.
        """
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status',status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT afe_labels.name, afe_labels.platform
                FROM afe_labels
                INNER JOIN afe_hosts_labels ON
                        afe_labels.id = afe_hosts_labels.label_id
                WHERE afe_hosts_labels.host_id = %s
                ORDER BY afe_labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                # A label flagged as a platform supplies the platform name
                # (the last one wins if several are flagged).
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    # Matches hostnames like "host007": a letter/dash prefix then digits.
    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfy most people's hostname sorting needs regardless
        of their exact naming schemes.  Nobody sane should have both a host10
        and host010 (but the algorithm works regardless).

        NOTE: relies on the Python 2 builtin cmp().
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # Compare (name, number) pairs so host2 sorts before host10.
            number_a = int(number_a_str.lstrip('0'))
            number_b = int(number_b_str.lstrip('0'))
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
463
464
465class HostQueueEntry(DBObject):
466 _table_name = 'afe_host_queue_entries'
467 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
468 'active', 'complete', 'deleted', 'execution_subdir',
Fang Deng51599032014-06-23 17:24:27 -0700469 'atomic_group_id', 'aborted', 'started_on', 'finished_on')
jamesrenb55378a2010-03-02 22:19:49 +0000470
Richard Barnettea31ab772016-07-13 17:50:17 -0700471 _COMPLETION_COUNT_METRIC = metrics.Counter(
472 'chromeos/autotest/scheduler/hqe_completion_count')
jamesrenb55378a2010-03-02 22:19:49 +0000473
    def __init__(self, id=None, row=None, **kwargs):
        """Initialize from an id or a raw database row.

        Also eagerly loads the associated Job, Host (fetched through the
        rdb rather than the plain Host model) and AtomicGroup objects.

        @param id: Primary key of the afe_host_queue_entries row.
        @param row: Raw field values as returned by SELECT.
        """
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            # Hosts are fetched via the rdb; attach debug/metadata info
            # used when the host is processed elsewhere.
            self.host = rdb_lib.get_hosts([self.host_id])[0]
            self.host.dbg_str = self.get_dbg_str()
            self.host.metadata = get_job_metadata(self.job)
        else:
            self.host = None

        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None
491
jamesrenb55378a2010-03-02 22:19:49 +0000492
493 @classmethod
494 def clone(cls, template):
495 """
496 Creates a new row using the values from a template instance.
497
498 The new instance will not exist in the database or have a valid
499 id attribute until its save() method is called.
500 """
501 assert isinstance(template, cls)
502 new_row = [getattr(template, field) for field in cls._fields]
503 clone = cls(row=new_row, new_record=True)
504 clone.id = None
505 return clone
506
507
508 def _view_job_url(self):
509 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
510
511
512 def get_labels(self):
513 """
514 Get all labels associated with this host queue entry (either via the
515 meta_host or as a job dependency label). The labels yielded are not
516 guaranteed to be unique.
517
518 @yields Label instances associated with this host_queue_entry.
519 """
520 if self.meta_host:
521 yield Label(id=self.meta_host, always_query=False)
522 labels = Label.fetch(
523 joins="JOIN afe_jobs_dependency_labels AS deps "
524 "ON (afe_labels.id = deps.label_id)",
525 where="deps.job_id = %d" % self.job.id)
526 for label in labels:
527 yield label
528
529
    def set_host(self, host):
        """Assign or release this entry's host.

        Assigning records an IneligibleHostQueue block for the (job, host)
        pair; releasing removes the block before clearing host_id.

        @param host: A Host object, or None to release the current host.
        """
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.update_field('host_id', host.id)
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host
541
542
jamesrenb55378a2010-03-02 22:19:49 +0000543 def block_host(self, host_id):
544 logging.info("creating block %s/%s", self.job.id, host_id)
545 row = [0, self.job.id, host_id]
546 block = IneligibleHostQueue(row=row, new_record=True)
547 block.save()
548
549
550 def unblock_host(self, host_id):
551 logging.info("removing block %s/%s", self.job.id, host_id)
552 blocks = IneligibleHostQueue.fetch(
553 'job_id=%d and host_id=%d' % (self.job.id, host_id))
554 for block in blocks:
555 block.delete()
556
557
558 def set_execution_subdir(self, subdir=None):
559 if subdir is None:
560 assert self.host
561 subdir = self.host.hostname
562 self.update_field('execution_subdir', subdir)
563
564
565 def _get_hostname(self):
566 if self.host:
567 return self.host.hostname
568 return 'no host'
569
570
    def get_dbg_str(self):
        """Get a debug string to identify this host.

        @return: A string containing the hqe and job id.
        """
        try:
            return 'HQE: %s, for job: %s' % (self.id, self.job_id)
        except AttributeError as e:
            # id/job_id may not be set yet if this is called before the
            # field attributes have been populated from the row.
            return 'HQE has not been initialized yet: %s' % e
580
581
jamesrenb55378a2010-03-02 22:19:49 +0000582 def __str__(self):
583 flags = []
584 if self.active:
585 flags.append('active')
586 if self.complete:
587 flags.append('complete')
588 if self.deleted:
589 flags.append('deleted')
590 if self.aborted:
591 flags.append('aborted')
592 flags_str = ','.join(flags)
593 if flags_str:
594 flags_str = ' [%s]' % flags_str
beepscc9fc702013-12-02 12:45:38 -0800595 return ("%s and host: %s has status:%s%s" %
596 (self.get_dbg_str(), self._get_hostname(), self.status,
597 flags_str))
jamesrenb55378a2010-03-02 22:19:49 +0000598
599
Michael Liang0d747462014-07-17 14:19:53 -0700600 def record_state(self, type_str, state, value):
Michael Liang500dedc2014-07-15 16:16:44 -0700601 """Record metadata in elasticsearch.
602
603 If ES configured to use http, then we will time that http request.
604 Otherwise, it uses UDP, so we will not need to time it.
605
Michael Liang0d747462014-07-17 14:19:53 -0700606 @param type_str: sets the _type field in elasticsearch db.
Michael Liang500dedc2014-07-15 16:16:44 -0700607 @param state: string representing what state we are recording,
608 e.g. 'status'
609 @param value: value of the state, e.g. 'verifying'
610 """
611 metadata = {
612 'time_changed': time.time(),
613 state: value,
614 'job_id': self.job_id,
615 }
616 if self.host:
617 metadata['hostname'] = self.host.hostname
Gabe Blackb72f4fb2015-01-20 16:47:13 -0800618 autotest_es.post(type_str=type_str, metadata=metadata)
Michael Liang500dedc2014-07-15 16:16:44 -0700619
620
    def set_status(self, status):
        """Set this entry's status, maintaining all derived state.

        Updates the active/complete flags, runs completion handling and
        e-mail notification when appropriate, and records the change in
        elasticsearch.

        @param status: New status string (a models.HostQueueEntry status).
        """
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)

        # A status is either active, complete, or neither -- never both.
        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
        assert not (active and complete)

        self.update_field('active', active)

        # The ordering of these operations is important. Once we set the
        # complete bit this job will become indistinguishable from all
        # the other complete jobs, unless we first set shard_id to NULL
        # to signal to the shard_client that we need to upload it. However,
        # we can only set both these after we've updated finished_on etc
        # within _on_complete or the job will get synced in an intermediate
        # state. This means that if someone sigkills the scheduler between
        # setting finished_on and complete, we will have inconsistent jobs.
        # This should be fine, because nothing critical checks finished_on,
        # and the scheduler should never be killed mid-tick.
        if complete:
            self._on_complete(status)
            self._email_on_job_complete()

        self.update_field('complete', complete)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)
        logging.debug('HQE Set Status Complete')
        self.record_state('hqe_status', 'status', status)
654
jamesrenb55378a2010-03-02 22:19:49 +0000655
    def _on_complete(self, status):
        """Perform completion bookkeeping for this entry.

        Increments the completion metric, stops the job if necessary,
        stamps finished_on, clears the job's shard_id so the shard client
        re-uploads it, and unregisters any pidfiles for this entry.

        @param status: The terminal status string being applied.
        """
        metric_fields = {'status': status.lower()}
        if self.host:
            metric_fields['board'] = self.host.board or ''
            if len(self.host.pools) == 1:
                metric_fields['pool'] = self.host.pools[0]
            else:
                metric_fields['pool'] = 'MULTIPLE'
        else:
            metric_fields['board'] = 'NO_HOST'
            metric_fields['pool'] = 'NO_HOST'
        self._COMPLETION_COUNT_METRIC.increment(fields=metric_fields)
        # NOTE(review): identity ("is not") comparison relies on the status
        # constant being the exact same string object -- confirm intentional.
        if status is not models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()
        if self.started_on:
            self.set_finished_on_now()
        if self.job.shard_id is not None:
            # If shard_id is None, the job will be synced back to the master
            self.job.update_field('shard_id', None)
        if not self.execution_subdir:
            return
        # unregister any possible pidfiles associated with this queue entry
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)
682
683
    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        # Execution details (runtime, pass/fail counts, failed rows) come
        # from the Job model.
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        # Skip the success rate for failed jobs (presumably not meaningful
        # there).
        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body =  "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body
730
731
jamesrenb55378a2010-03-02 22:19:49 +0000732 def _email_on_status(self, status):
733 hostname = self._get_hostname()
Eric Li6f27d4f2010-09-29 10:55:17 -0700734 subject, body = self._get_status_email_contents(status, None, hostname)
jamesrenb55378a2010-03-02 22:19:49 +0000735 email_manager.manager.send_email(self.job.email_list, subject, body)
736
737
738 def _email_on_job_complete(self):
739 if not self.job.is_finished():
740 return
741
Eric Li6f27d4f2010-09-29 10:55:17 -0700742 summary = []
jamesrenb55378a2010-03-02 22:19:49 +0000743 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
744 for queue_entry in hosts_queue:
Eric Li6f27d4f2010-09-29 10:55:17 -0700745 summary.append("Host: %s Status: %s" %
jamesrenb55378a2010-03-02 22:19:49 +0000746 (queue_entry._get_hostname(),
747 queue_entry.status))
748
Eric Li6f27d4f2010-09-29 10:55:17 -0700749 summary = "\n".join(summary)
jamesrenb55378a2010-03-02 22:19:49 +0000750 status_counts = models.Job.objects.get_status_counts(
751 [self.job.id])[self.job.id]
752 status = ', '.join('%d %s' % (count, status) for status, count
753 in status_counts.iteritems())
754
Eric Li6f27d4f2010-09-29 10:55:17 -0700755 subject, body = self._get_status_email_contents(status, summary, None)
jamesrenb55378a2010-03-02 22:19:49 +0000756 email_manager.manager.send_email(self.job.email_list, subject, body)
757
758
    def schedule_pre_job_tasks(self):
        """Log that this entry was scheduled and start its pre-job tasks."""
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()
765
766
    def _do_schedule_pre_job_tasks(self):
        # Delegate to the job, which owns the pre-job task logic.
        self.job.schedule_pre_job_tasks(queue_entry=self)
769
770
771 def requeue(self):
772 assert self.host
773 self.set_status(models.HostQueueEntry.Status.QUEUED)
774 self.update_field('started_on', None)
Fang Deng51599032014-06-23 17:24:27 -0700775 self.update_field('finished_on', None)
jamesrenb55378a2010-03-02 22:19:49 +0000776 # verify/cleanup failure sets the execution subdir, so reset it here
777 self.set_execution_subdir('')
778 if self.meta_host:
779 self.set_host(None)
780
781
    @property
    def aborted_by(self):
        """Login of the user who aborted this entry, or None (lazy-loaded)."""
        self._load_abort_info()
        return self._aborted_by
786
787
    @property
    def aborted_on(self):
        """Timestamp of when this entry was aborted, or None (lazy-loaded)."""
        self._load_abort_info()
        return self._aborted_on
792
793
    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # The presence of _aborted_by doubles as the "already loaded" flag,
        # so the query runs at most once per instance.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None
810
811
    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.

        Both this entry's status and its host's status move to PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are getting
        # stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)
831
832
    def abort(self, dispatcher):
        """Abort this entry, scheduling host cleanup/repair as needed.

        Behavior depends on the current status: post-job states are left
        for their tasks to finish; active states free the host; verify,
        reset and provision states schedule a cleanup or repair task.

        @param dispatcher: Dispatcher used to look up agents attached to
                this entry.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING,
                           Status.WAITING):
            # If hqe is in any of these status, it should not have any
            # unfinished agent before it can be aborted.
            agents = dispatcher.get_agents_for_entry(self)
            # Agent with finished task can be left behind. This is added to
            # handle the special case of aborting hostless job in STARTING
            # status, in which the agent has only a HostlessQueueTask
            # associated. The finished HostlessQueueTask will be cleaned up in
            # the next tick, so it's safe to leave the agent there. Without
            # filtering out finished agent, HQE abort won't be able to proceed.
            assert all([agent.is_done() for agent in agents])
            # If hqe is still in STARTING status, it may not have assigned a
            # host yet.
            if self.host:
                self.host.set_status(models.Host.Status.READY)
        elif (self.status == Status.VERIFYING or
              self.status == Status.RESETTING):
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())
        elif self.status == Status.PROVISIONING:
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.REPAIR,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)
        self.job.abort_delay_ready_task()
872
873
874 def get_group_name(self):
875 atomic_group = self.atomic_group
876 if not atomic_group:
877 return ''
878
879 # Look at any meta_host and dependency labels and pick the first
880 # one that also specifies this atomic group. Use that label name
881 # as the group name if possible (it is more specific).
882 for label in self.get_labels():
883 if label.atomic_group_id:
884 assert label.atomic_group_id == atomic_group.id
885 return label.name
886 return atomic_group.name
887
888
889 def execution_tag(self):
Scott Zawalskid712cf32012-07-14 16:24:53 -0400890 SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
891 'complete!=1 AND execution_subdir="" AND '
892 'status!="Queued";')
893 SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
894 'status="Aborted" WHERE id=%s;')
895 try:
896 assert self.execution_subdir
897 except AssertionError:
898 # TODO(scottz): Remove temporary fix/info gathering pathway for
899 # crosbug.com/31595 once issue is root caused.
900 logging.error('No execution_subdir for host queue id:%s.', self.id)
901 logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
902 for row in _db.execute(SQL_SUSPECT_ENTRIES):
Dan Shi76af8022013-10-19 01:59:49 -0700903 logging.error(row)
Scott Zawalskid712cf32012-07-14 16:24:53 -0400904 logging.error('====DB DEBUG====\n')
905 fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
906 logging.error('EXECUTING: %s', fix_query)
907 _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id)
908 raise AssertionError(('self.execution_subdir not found. '
909 'See log for details.'))
910
jamesrenb55378a2010-03-02 22:19:49 +0000911 return "%s/%s" % (self.job.tag(), self.execution_subdir)
912
913
    def execution_path(self):
        """Return the results path for this entry (alias of execution_tag)."""
        return self.execution_tag()
916
917
918 def set_started_on_now(self):
919 self.update_field('started_on', datetime.datetime.now())
920
921
Fang Deng51599032014-06-23 17:24:27 -0700922 def set_finished_on_now(self):
923 self.update_field('finished_on', datetime.datetime.now())
924
925
jamesrenb55378a2010-03-02 22:19:49 +0000926 def is_hostless(self):
927 return (self.host_id is None
928 and self.meta_host is None
929 and self.atomic_group_id is None)
930
931
class Job(DBObject):
    """In-memory representation of a row in the afe_jobs table.

    Mirrors the Django Job model with scheduler-specific logic for choosing
    host groups, queueing pre-job special tasks, and kicking HQEs through
    their status transitions.
    """
    # Table and column list used by the DBObject machinery for (re)loading.
    _table_name = 'afe_jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
               'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
               'test_retry', 'run_reset', 'timeout_mins', 'shard_id',
               'require_ssp')

    # This does not need to be a column in the DB.  The delays are likely to
    # be configured short.  If the scheduler is stopped and restarted in
    # the middle of a job's delay cycle, the delay cycle will either be
    # repeated or skipped depending on the number of Pending machines found
    # when the restarted scheduler recovers to track it.  Not a problem.
    #
    # A reference to the DelayedCallTask that will wake up the job should
    # no other HQEs change state in time.  Its end_time attribute is used
    # by our run_with_ready_delay() method to determine if the wait is over.
    _delay_ready_task = None

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
    # all status='Pending' atomic group HQEs incase a delay was running when the
    # scheduler was restarted and no more hosts ever successfully exit Verify.
956
    def __init__(self, id=None, row=None, **kwargs):
        """Load a Job either by primary key or from a prefetched DB row.

        @param id: Primary key of the afe_jobs row to load, or None.
        @param row: Prefetched row data to populate from, or None.
        """
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
        self._owner_model = None # caches model instance of owner
        self.update_image_path = None # path of OS image to install
jamesrenb55378a2010-03-02 22:19:49 +0000962
963
    def model(self):
        """Return the Django Job model instance for this job's id."""
        return models.Job.objects.get(id=self.id)
966
967
968 def owner_model(self):
969 # work around the fact that the Job owner field is a string, not a
970 # foreign key
971 if not self._owner_model:
972 self._owner_model = models.User.objects.get(login=self.owner)
973 return self._owner_model
974
975
jamesrenb55378a2010-03-02 22:19:49 +0000976 def tag(self):
977 return "%s-%s" % (self.id, self.owner)
978
979
Paul Pendlebury5a8c6ad2011-02-01 07:20:17 -0800980 def is_image_update_job(self):
981 """
982 Discover if the current job requires an OS update.
983
984 @return: True/False if OS should be updated before job is run.
985 """
986 # All image update jobs have the parameterized_job_id set.
987 if not self.parameterized_job_id:
988 return False
989
990 # Retrieve the ID of the ParameterizedJob this job is an instance of.
991 rows = _db.execute("""
992 SELECT test_id
993 FROM afe_parameterized_jobs
994 WHERE id = %s
995 """, (self.parameterized_job_id,))
996 if not rows:
997 return False
998 test_id = rows[0][0]
999
1000 # Retrieve the ID of the known autoupdate_ParameterizedJob.
1001 rows = _db.execute("""
1002 SELECT id
1003 FROM afe_autotests
1004 WHERE name = 'autoupdate_ParameterizedJob'
1005 """)
1006 if not rows:
1007 return False
1008 update_id = rows[0][0]
1009
1010 # If the IDs are the same we've found an image update job.
1011 if test_id == update_id:
1012 # Finally, get the path to the OS image to install.
1013 rows = _db.execute("""
1014 SELECT parameter_value
1015 FROM afe_parameterized_job_parameters
1016 WHERE parameterized_job_id = %s
1017 """, (self.parameterized_job_id,))
1018 if rows:
1019 # Save the path in update_image_path to use later as a command
1020 # line parameter to autoserv.
1021 self.update_image_path = rows[0][0]
1022 return True
1023
1024 return False
1025
1026
Eric Li6f27d4f2010-09-29 10:55:17 -07001027 def get_execution_details(self):
1028 """
1029 Get test execution details for this job.
1030
1031 @return: Dictionary with test execution details
1032 """
1033 def _find_test_jobs(rows):
1034 """
1035 Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
1036 Those are autotest 'internal job' tests, so they should not be
1037 counted when evaluating the test stats.
1038
1039 @param rows: List of rows (matrix) with database results.
1040 """
1041 job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')
1042 n_test_jobs = 0
1043 for r in rows:
1044 test_name = r[0]
1045 if job_test_pattern.match(test_name):
1046 n_test_jobs += 1
1047
1048 return n_test_jobs
1049
1050 stats = {}
1051
1052 rows = _db.execute("""
1053 SELECT t.test, s.word, t.reason
1054 FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
1055 WHERE t.job_idx = j.job_idx
1056 AND s.status_idx = t.status
1057 AND j.afe_job_id = %s
Mike Truty6941dea2010-11-09 15:26:32 -08001058 ORDER BY t.reason
Eric Li6f27d4f2010-09-29 10:55:17 -07001059 """ % self.id)
1060
Dale Curtis74a314b2011-06-23 14:55:46 -07001061 failed_rows = [r for r in rows if not r[1] == 'GOOD']
Eric Li6f27d4f2010-09-29 10:55:17 -07001062
1063 n_test_jobs = _find_test_jobs(rows)
1064 n_test_jobs_failed = _find_test_jobs(failed_rows)
1065
1066 total_executed = len(rows) - n_test_jobs
1067 total_failed = len(failed_rows) - n_test_jobs_failed
1068
1069 if total_executed > 0:
1070 success_rate = 100 - ((total_failed / float(total_executed)) * 100)
1071 else:
1072 success_rate = 0
1073
1074 stats['total_executed'] = total_executed
1075 stats['total_failed'] = total_failed
1076 stats['total_passed'] = total_executed - total_failed
1077 stats['success_rate'] = success_rate
1078
1079 status_header = ("Test Name", "Status", "Reason")
1080 if failed_rows:
1081 stats['failed_rows'] = utils.matrix_to_string(failed_rows,
1082 status_header)
1083 else:
1084 stats['failed_rows'] = ''
1085
1086 time_row = _db.execute("""
1087 SELECT started_time, finished_time
1088 FROM tko_jobs
1089 WHERE afe_job_id = %s
1090 """ % self.id)
1091
1092 if time_row:
1093 t_begin, t_end = time_row[0]
Mike Truty6941dea2010-11-09 15:26:32 -08001094 try:
1095 delta = t_end - t_begin
1096 minutes, seconds = divmod(delta.seconds, 60)
1097 hours, minutes = divmod(minutes, 60)
1098 stats['execution_time'] = ("%02d:%02d:%02d" %
1099 (hours, minutes, seconds))
1100 # One of t_end or t_begin are None
1101 except TypeError:
1102 stats['execution_time'] = '(could not determine)'
Eric Li6f27d4f2010-09-29 10:55:17 -07001103 else:
1104 stats['execution_time'] = '(none)'
1105
1106 return stats
1107
1108
jamesrenb55378a2010-03-02 22:19:49 +00001109 def keyval_dict(self):
1110 return self.model().keyval_dict()
1111
1112
1113 def _atomic_and_has_started(self):
1114 """
1115 @returns True if any of the HostQueueEntries associated with this job
1116 have entered the Status.STARTING state or beyond.
1117 """
1118 atomic_entries = models.HostQueueEntry.objects.filter(
1119 job=self.id, atomic_group__isnull=False)
1120 if atomic_entries.count() <= 0:
1121 return False
1122
1123 # These states may *only* be reached if Job.run() has been called.
1124 started_statuses = (models.HostQueueEntry.Status.STARTING,
1125 models.HostQueueEntry.Status.RUNNING,
1126 models.HostQueueEntry.Status.COMPLETED)
1127
1128 started_entries = atomic_entries.filter(status__in=started_statuses)
1129 return started_entries.count() > 0
1130
1131
1132 def _hosts_assigned_count(self):
1133 """The number of HostQueueEntries assigned a Host for this job."""
1134 entries = models.HostQueueEntry.objects.filter(job=self.id,
1135 host__isnull=False)
1136 return entries.count()
1137
1138
1139 def _pending_count(self):
1140 """The number of HostQueueEntries for this job in the Pending state."""
1141 pending_entries = models.HostQueueEntry.objects.filter(
1142 job=self.id, status=models.HostQueueEntry.Status.PENDING)
1143 return pending_entries.count()
1144
1145
1146 def _max_hosts_needed_to_run(self, atomic_group):
1147 """
1148 @param atomic_group: The AtomicGroup associated with this job that we
1149 are using to set an upper bound on the threshold.
1150 @returns The maximum number of HostQueueEntries assigned a Host before
1151 this job can run.
1152 """
1153 return min(self._hosts_assigned_count(),
1154 atomic_group.max_number_of_machines)
1155
1156
    def _min_hosts_needed_to_run(self):
        """Return the minimum number of hosts needed to run this job."""
        return self.synch_count
1160
1161
1162 def is_ready(self):
1163 # NOTE: Atomic group jobs stop reporting ready after they have been
1164 # started to avoid launching multiple copies of one atomic job.
1165 # Only possible if synch_count is less than than half the number of
1166 # machines in the atomic group.
1167 pending_count = self._pending_count()
1168 atomic_and_has_started = self._atomic_and_has_started()
1169 ready = (pending_count >= self.synch_count
1170 and not atomic_and_has_started)
1171
1172 if not ready:
1173 logging.info(
1174 'Job %s not ready: %s pending, %s required '
1175 '(Atomic and started: %s)',
1176 self, pending_count, self.synch_count,
1177 atomic_and_has_started)
1178
1179 return ready
1180
1181
1182 def num_machines(self, clause = None):
1183 sql = "job_id=%s" % self.id
1184 if clause:
1185 sql += " AND (%s)" % clause
1186 return self.count(sql, table='afe_host_queue_entries')
1187
1188
1189 def num_queued(self):
1190 return self.num_machines('not complete')
1191
1192
1193 def num_active(self):
1194 return self.num_machines('active')
1195
1196
1197 def num_complete(self):
1198 return self.num_machines('complete')
1199
1200
1201 def is_finished(self):
1202 return self.num_complete() == self.num_machines()
1203
1204
Laurence Goodby303d2662016-07-01 15:57:04 -07001205 def _not_yet_run_entries(self, include_active=True):
1206 if include_active:
1207 statuses = list(models.HostQueueEntry.PRE_JOB_STATUSES)
1208 else:
1209 statuses = list(models.HostQueueEntry.IDLE_PRE_JOB_STATUSES)
jamesrenb55378a2010-03-02 22:19:49 +00001210 return models.HostQueueEntry.objects.filter(job=self.id,
1211 status__in=statuses)
1212
1213
1214 def _stop_all_entries(self):
Laurence Goodby303d2662016-07-01 15:57:04 -07001215 """Stops the job's inactive pre-job HQEs."""
jamesrenb55378a2010-03-02 22:19:49 +00001216 entries_to_stop = self._not_yet_run_entries(
Laurence Goodby303d2662016-07-01 15:57:04 -07001217 include_active=False)
jamesrenb55378a2010-03-02 22:19:49 +00001218 for child_entry in entries_to_stop:
1219 assert not child_entry.complete, (
1220 '%s status=%s, active=%s, complete=%s' %
1221 (child_entry.id, child_entry.status, child_entry.active,
1222 child_entry.complete))
1223 if child_entry.status == models.HostQueueEntry.Status.PENDING:
1224 child_entry.host.status = models.Host.Status.READY
1225 child_entry.host.save()
1226 child_entry.status = models.HostQueueEntry.Status.STOPPED
1227 child_entry.save()
1228
1229
1230 def stop_if_necessary(self):
1231 not_yet_run = self._not_yet_run_entries()
1232 if not_yet_run.count() < self.synch_count:
1233 self._stop_all_entries()
1234
1235
jamesrenb55378a2010-03-02 22:19:49 +00001236 def _next_group_name(self, group_name=''):
1237 """@returns a directory name to use for the next host group results."""
1238 if group_name:
1239 # Sanitize for use as a pathname.
1240 group_name = group_name.replace(os.path.sep, '_')
1241 if group_name.startswith('.'):
1242 group_name = '_' + group_name[1:]
1243 # Add a separator between the group name and 'group%d'.
1244 group_name += '.'
1245 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
1246 query = models.HostQueueEntry.objects.filter(
1247 job=self.id).values('execution_subdir').distinct()
1248 subdirs = (entry['execution_subdir'] for entry in query)
1249 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
1250 ids = [int(match.group(1)) for match in group_matches if match]
1251 if ids:
1252 next_id = max(ids) + 1
1253 else:
1254 next_id = 0
1255 return '%sgroup%d' % (group_name, next_id)
1256
1257
1258 def get_group_entries(self, queue_entry_from_group):
1259 """
1260 @param queue_entry_from_group: A HostQueueEntry instance to find other
1261 group entries on this job for.
1262
1263 @returns A list of HostQueueEntry objects all executing this job as
1264 part of the same group as the one supplied (having the same
1265 execution_subdir).
1266 """
1267 execution_subdir = queue_entry_from_group.execution_subdir
1268 return list(HostQueueEntry.fetch(
1269 where='job_id=%s AND execution_subdir=%s',
1270 params=(self.id, execution_subdir)))
1271
1272
1273 def _should_run_cleanup(self, queue_entry):
1274 if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
1275 return True
1276 elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
1277 return queue_entry.host.dirty
1278 return False
1279
1280
1281 def _should_run_verify(self, queue_entry):
1282 do_not_verify = (queue_entry.host.protection ==
1283 host_protections.Protection.DO_NOT_VERIFY)
1284 if do_not_verify:
1285 return False
Alex Miller6ee996f2013-02-28 13:53:52 -08001286 # If RebootBefore is set to NEVER, then we won't run reset because
1287 # we can't cleanup, so we need to weaken a Reset into a Verify.
1288 weaker_reset = (self.run_reset and
1289 self.reboot_before == model_attributes.RebootBefore.NEVER)
1290 return self.run_verify or weaker_reset
jamesrenb55378a2010-03-02 22:19:49 +00001291
1292
Dan Shi07e09af2013-04-12 09:31:29 -07001293 def _should_run_reset(self, queue_entry):
1294 can_verify = (queue_entry.host.protection !=
1295 host_protections.Protection.DO_NOT_VERIFY)
1296 can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER
1297 return (can_reboot and can_verify and (self.run_reset or
1298 (self._should_run_cleanup(queue_entry) and
1299 self._should_run_verify(queue_entry))))
1300
1301
Alex Millerdfff2fd2013-05-28 13:05:06 -07001302 def _should_run_provision(self, queue_entry):
1303 """
1304 Determine if the queue_entry needs to have a provision task run before
1305 it to provision queue_entry.host.
1306
1307 @param queue_entry: The host queue entry in question.
1308 @returns: True if we should schedule a provision task, False otherwise.
1309
1310 """
1311 # If we get to this point, it means that the scheduler has already
1312 # vetted that all the unprovisionable labels match, so we can just
1313 # find all labels on the job that aren't on the host to get the list
1314 # of what we need to provision. (See the scheduling logic in
1315 # host_scheduler.py:is_host_eligable_for_job() where we discard all
Alex Miller627694a2014-05-01 18:04:29 -07001316 # actionable labels when assigning jobs to hosts.)
Alex Millerdfff2fd2013-05-28 13:05:06 -07001317 job_labels = {x.name for x in queue_entry.get_labels()}
Dan Shie44f9c02016-02-18 13:25:05 -08001318 # Skip provision if `skip_provision` is listed in the job labels.
1319 if provision.SKIP_PROVISION in job_labels:
1320 return False
Alex Millerdfff2fd2013-05-28 13:05:06 -07001321 _, host_labels = queue_entry.host.platform_and_labels()
Alex Miller627694a2014-05-01 18:04:29 -07001322 # If there are any labels on the job that are not on the host and they
1323 # are labels that provisioning knows how to change, then that means
1324 # there is provisioning work to do. If there's no provisioning work to
1325 # do, then obviously we have no reason to schedule a provision task!
1326 diff = job_labels - set(host_labels)
1327 if any([provision.Provision.acts_on(x) for x in diff]):
Alex Millerdfff2fd2013-05-28 13:05:06 -07001328 return True
1329 return False
1330
1331
Alex Miller42437f92013-05-28 12:58:54 -07001332 def _queue_special_task(self, queue_entry, task):
jamesrenb55378a2010-03-02 22:19:49 +00001333 """
Alex Miller42437f92013-05-28 12:58:54 -07001334 Create a special task and associate it with a host queue entry.
jamesrenb55378a2010-03-02 22:19:49 +00001335
Alex Miller42437f92013-05-28 12:58:54 -07001336 @param queue_entry: The queue entry this special task should be
1337 associated with.
1338 @param task: One of the members of the enum models.SpecialTask.Task.
1339 @returns: None
1340
jamesrenb55378a2010-03-02 22:19:49 +00001341 """
jamesrenb55378a2010-03-02 22:19:49 +00001342 models.SpecialTask.objects.create(
1343 host=models.Host.objects.get(id=queue_entry.host_id),
1344 queue_entry=queue_entry, task=task)
1345
1346
Alex Miller42437f92013-05-28 12:58:54 -07001347 def schedule_pre_job_tasks(self, queue_entry):
1348 """
1349 Queue all of the special tasks that need to be run before a host
1350 queue entry may run.
1351
1352 If no special taskes need to be scheduled, then |on_pending| will be
1353 called directly.
1354
1355 @returns None
1356
1357 """
1358 task_queued = False
1359 hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id)
1360
Dan Shi12ab5272014-12-16 15:42:07 -08001361 if self._should_run_provision(queue_entry):
1362 self._queue_special_task(hqe_model,
1363 models.SpecialTask.Task.PROVISION)
1364 task_queued = True
1365 elif self._should_run_reset(queue_entry):
Dan Shi07e09af2013-04-12 09:31:29 -07001366 self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET)
Alex Miller42437f92013-05-28 12:58:54 -07001367 task_queued = True
Dan Shi07e09af2013-04-12 09:31:29 -07001368 else:
1369 if self._should_run_cleanup(queue_entry):
1370 self._queue_special_task(hqe_model,
1371 models.SpecialTask.Task.CLEANUP)
1372 task_queued = True
1373 if self._should_run_verify(queue_entry):
1374 self._queue_special_task(hqe_model,
1375 models.SpecialTask.Task.VERIFY)
1376 task_queued = True
Alex Miller42437f92013-05-28 12:58:54 -07001377
1378 if not task_queued:
1379 queue_entry.on_pending()
1380
1381
jamesrenb55378a2010-03-02 22:19:49 +00001382 def _assign_new_group(self, queue_entries, group_name=''):
1383 if len(queue_entries) == 1:
1384 group_subdir_name = queue_entries[0].host.hostname
1385 else:
1386 group_subdir_name = self._next_group_name(group_name)
1387 logging.info('Running synchronous job %d hosts %s as %s',
1388 self.id, [entry.host.hostname for entry in queue_entries],
1389 group_subdir_name)
1390
1391 for queue_entry in queue_entries:
1392 queue_entry.set_execution_subdir(group_subdir_name)
1393
1394
    def _choose_group_to_run(self, include_queue_entry):
        """Pick and group the HQEs that will run this job together.

        Starts from include_queue_entry and tops up with this job's Pending
        entries (sorted by hostname) until synch_count (or the atomic
        group's machine limit) is reached, then assigns them all a shared
        execution_subdir.

        @param include_queue_entry: HostQueueEntry that must be in the group.
        @returns A list of HostQueueEntry instances to be used to run this
                Job, or [] (after emailing an error) if fewer than
                synch_count entries could be gathered.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries
1434
1435
1436 def run_if_ready(self, queue_entry):
1437 """
1438 Run this job by kicking its HQEs into status='Starting' if enough
1439 hosts are ready for it to run.
1440
1441 Cleans up by kicking HQEs into status='Stopped' if this Job is not
1442 ready to run.
1443 """
1444 if not self.is_ready():
1445 self.stop_if_necessary()
1446 elif queue_entry.atomic_group:
1447 self.run_with_ready_delay(queue_entry)
1448 else:
1449 self.run(queue_entry)
1450
1451
1452 def run_with_ready_delay(self, queue_entry):
1453 """
1454 Start a delay to wait for more hosts to enter Pending state before
1455 launching an atomic group job. Once set, the a delay cannot be reset.
1456
1457 @param queue_entry: The HostQueueEntry object to get atomic group
1458 info from and pass to run_if_ready when the delay is up.
1459
1460 @returns An Agent to run the job as appropriate or None if a delay
1461 has already been set.
1462 """
1463 assert queue_entry.job_id == self.id
1464 assert queue_entry.atomic_group
1465 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
1466 over_max_threshold = (self._pending_count() >=
1467 self._max_hosts_needed_to_run(queue_entry.atomic_group))
1468 delay_expired = (self._delay_ready_task and
1469 time.time() >= self._delay_ready_task.end_time)
1470
1471 # Delay is disabled or we already have enough? Do not wait to run.
1472 if not delay or over_max_threshold or delay_expired:
1473 self.run(queue_entry)
1474 else:
1475 queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
1476
1477
1478 def request_abort(self):
1479 """Request that this Job be aborted on the next scheduler cycle."""
1480 self.model().abort()
1481
1482
    def schedule_delayed_callback_task(self, queue_entry):
        """Mark queue_entry Pending and arm the extra-host wait timer.

        Creates (at most once) a DelayedCallTask that, after the configured
        delay, either runs the job via queue_entry or requests an abort if
        too few hosts are still Pending.

        @param queue_entry: HQE to set Pending and to run when the delay
                expires.
        @returns The new DelayedCallTask, or None if one is already set.
        """
        queue_entry.set_status(models.HostQueueEntry.Status.PENDING)

        # Only one delay may ever be armed for a job.
        if self._delay_ready_task:
            return None

        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self)
            # Check to see if the job is still relevant. It could have aborted
            # while we were waiting or hosts could have disappearred, etc.
            if self._pending_count() < self._min_hosts_needed_to_run():
                logging.info('Job %s had too few Pending hosts after waiting '
                             'for extras. Not running.', self)
                self.request_abort()
                return
            return self.run(queue_entry)

        logging.info('Job %s waiting up to %s seconds for more hosts.',
                     self.id, delay)
        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)
        return self._delay_ready_task
1507
1508
1509 def run(self, queue_entry):
1510 """
1511 @param queue_entry: The HostQueueEntry instance calling this method.
1512 """
1513 if queue_entry.atomic_group and self._atomic_and_has_started():
1514 logging.error('Job.run() called on running atomic Job %d '
1515 'with HQE %s.', self.id, queue_entry)
1516 return
1517 queue_entries = self._choose_group_to_run(queue_entry)
1518 if queue_entries:
1519 self._finish_run(queue_entries)
1520
1521
1522 def _finish_run(self, queue_entries):
1523 for queue_entry in queue_entries:
1524 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
1525 self.abort_delay_ready_task()
1526
1527
1528 def abort_delay_ready_task(self):
1529 """Abort the delayed task associated with this job, if any."""
1530 if self._delay_ready_task:
1531 # Cancel any pending callback that would try to run again
1532 # as we are already running.
1533 self._delay_ready_task.abort()
1534
1535
    def __str__(self):
        """Display form of the job: '<id>-<owner>'."""
        return '%s-%s' % (self.id, self.owner)