blob: 4e63fd3722f9268b64881e969765cd57b26e935a [file] [log] [blame]
Aviv Keshet0b9cfc92013-02-05 11:36:02 -08001# pylint: disable-msg=C0111
2
jamesrenb55378a2010-03-02 22:19:49 +00003"""Database model classes for the scheduler.
4
5Contains model classes abstracting the various DB tables used by the scheduler.
6These overlap the Django models in basic functionality, but were written before
7the Django models existed and have not yet been phased out. Some of them
8(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
9which would probably be ill-suited for inclusion in the general Django model
10classes.
11
12Globals:
13_notify_email_statuses: list of HQE statuses. each time a single HQE reaches
14 one of these statuses, an email will be sent to the job's email_list.
15 comes from global_config.
16_base_url: URL to the local AFE server, used to construct URLs for emails.
17_db: DatabaseConnection for this module.
18_drone_manager: reference to global DroneManager instance.
19"""
20
21import datetime, itertools, logging, os, re, sys, time, weakref
jamesrenb55378a2010-03-02 22:19:49 +000022from autotest_lib.client.common_lib import global_config, host_protections
Michael Liangda8c60a2014-06-03 13:24:51 -070023from autotest_lib.client.common_lib import utils
24from autotest_lib.client.common_lib import control_data
Michael Liang500dedc2014-07-15 16:16:44 -070025from autotest_lib.client.common_lib.cros.graphite import es_utils
Michael Liangda8c60a2014-06-03 13:24:51 -070026from autotest_lib.client.common_lib.cros.graphite import stats
jamesrenb55378a2010-03-02 22:19:49 +000027from autotest_lib.frontend.afe import models, model_attributes
jamesrenb55378a2010-03-02 22:19:49 +000028from autotest_lib.scheduler import drone_manager, email_manager
beepscc9fc702013-12-02 12:45:38 -080029from autotest_lib.scheduler import rdb_lib
jamesrenb55378a2010-03-02 22:19:49 +000030from autotest_lib.scheduler import scheduler_config
Prashanth B0e960282014-05-13 19:38:28 -070031from autotest_lib.scheduler import scheduler_lib
Alex Miller627694a2014-05-01 18:04:29 -070032from autotest_lib.server.cros import provision
Michael Liangda8c60a2014-06-03 13:24:51 -070033
jamesrenb55378a2010-03-02 22:19:49 +000034
# Lower-cased HQE statuses that trigger a notification e-mail; filled in
# by initialize() from the scheduler config section.
_notify_email_statuses = []
# URL of the local AFE server used to build links in e-mails; set by
# initialize() from the 'base_url' option or from [SERVER] hostname.
_base_url = None

# DatabaseConnection shared by this module's DB helpers; created in
# initialize().
_db = None
# Global DroneManager instance; bound in initialize_globals().
_drone_manager = None
40
def initialize():
    """Initialize this module's globals from the scheduler configuration.

    Creates the shared DB connection (_db), parses the set of HQE statuses
    that should trigger notification e-mails (_notify_email_statuses),
    determines the AFE base URL (_base_url), and finally binds the drone
    manager via initialize_globals().  Exits the process when no explicit
    base_url is configured and [SERVER] hostname is missing.
    """
    global _db, _notify_email_statuses, _base_url

    _db = scheduler_lib.ConnectionManager().get_connection()

    raw_statuses = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    # Statuses may be separated by whitespace, commas, semicolons or colons.
    tokens = re.split(r'[\s,;:]', raw_statuses.lower())
    _notify_email_statuses = [token for token in tokens if token]

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    override_url = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if override_url:
        _base_url = override_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = global_config.global_config.get_config_value(
                'SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    initialize_globals()
71
72
def initialize_globals():
    """Bind the module-level _drone_manager to the global DroneManager."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
76
77
class DelayedCallTask(object):
    """
    An Agent-compatible task that waits (without blocking) until a deadline
    has passed, then invokes a callback exactly once and finishes.  If the
    callback returns anything, it is assumed to be a new Agent instance and
    will be added to the dispatcher.

    @attribute end_time: Absolute posix timestamp after which poll() fires
            the callback and the task becomes done.

    Also carries all the bookkeeping attributes the Agent class requires.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: Seconds from now until the callback fires.
        @param callback: Zero-argument callable invoked once after at least
                delay_seconds have elapsed.  Must return None or a new
                Agent instance.
        @param now_func: Optional time.time-like clock, injectable for
                testing.  Defaults to time.time.
        """
        assert delay_seconds > 0
        assert callable(callback)
        # Falling back to time.time when no clock was supplied.
        self._clock = now_func or time.time
        self._deadline_callback = callback

        self.end_time = self._clock() + delay_seconds

        # Attributes the Agent interface expects us to expose.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        """Fire the callback once the deadline has passed; mark success."""
        if self.is_done() or self._clock() < self.end_time:
            return
        self._deadline_callback()
        self.success = True


    def is_done(self):
        """The task is finished once it succeeded or was aborted."""
        return self.success or self.aborted


    def abort(self):
        """Cancel the task; the callback will never be invoked."""
        self.aborted = True
129
130
class DBError(Exception):
    """Signals that a DBObject's constructor SELECT found no matching row."""
133
134
class DBObject(object):
    """A miniature object relational model for the database.

    Each subclass maps rows of one table (named by _table_name, with
    columns listed in _fields) onto Python objects.  Instances are cached
    per (class, id) in a weak-value dictionary so repeated lookups of the
    same row share a single object.
    """

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id. This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id: Primary key of the row to load; at least one of id/row
                must be supplied.
        @param row: Pre-fetched row (sequence matching _fields).
        @param new_record: True when this object does not yet exist in the
                database (save() will INSERT it).
        @param always_query: When False a cached, already-initialized
                instance is returned untouched instead of being re-queried.
        """
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warning(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """Fetch and return the single row with the given primary key.

        @param row_id: Primary key value to look up.
        @raises DBError: If no row with that id exists.
        """
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        """Assert that row has exactly one value per declared field."""
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing in
        memory fields.  Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S'  # Leave off the microseconds.
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(datetime_cmp_fmt)
                row_value = row_value.strftime(datetime_cmp_fmt)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # 'id' is the primary key and must never be overwritten via
        # update_field().
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-fetch our row from the DB and refresh all field attributes."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table = None):
        """Return the number of rows matching the given WHERE clause.

        @param where: SQL condition (without the WHERE keyword).
        @param table: Optional table name; defaults to this object's table.
        """
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Set one field both on this object and in the database.

        No-op if the in-memory value already equals the new value.

        @param field: Field name; must be in _fields (and not 'id').
        @param value: New value to store.
        """
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object into the database if it is a new record.

        After the insert, self.id is updated to the auto-assigned key.
        """
        if self.__new_record:
            keys = self._fields[1:]        # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    # NOTE(review): values are interpolated with naive
                    # quoting rather than DB-API parameters; safe only
                    # because values originate from our own DB rows, not
                    # untrusted input.  Consider parameterizing.
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete this row from the database and evict it from the cache."""
        # Bug fix: the cache key must use self.id.  The previous code passed
        # the id() builtin, which never matched any cache key, so deleted
        # objects could be resurrected from _instances_by_type_and_id.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Return string prefixed with prefix, or unchanged when falsy."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return [cls(id=row[0], row=row) for row in rows]
340
341
class IneligibleHostQueue(DBObject):
    """Maps a row of afe_ineligible_host_queues: a (job, host) block.

    A row here prevents re-assigning the host to the job; rows are created
    and removed by HostQueueEntry.block_host()/unblock_host().
    """
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
345
346
class AtomicGroup(DBObject):
    """Maps a row of the afe_atomic_groups table."""
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
351
352
class Label(DBObject):
    """Maps a row of the afe_labels table."""
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        """Debug representation showing name, id and atomic group."""
        return 'Label(name={0!r}, id={1:d}, atomic_group_id={2!r})'.format(
                self.name, self.id, self.atomic_group_id)
362
363
class Host(DBObject):
    """Maps a row of the afe_hosts table, plus scheduler-side helpers."""
    _table_name = 'afe_hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty',
               'leased')
    # Reports method timings to statsd under this key via @_timer.decorate.
    _timer = stats.Timer("scheduler_models.Host")


    @_timer.decorate
    def set_status(self,status):
        """Write a new value to the status column and log the transition.

        @param status: The new status string to store.
        """
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status',status)
        # Noticed some time jumps after the last log message.
        logging.debug('Host Set Status Complete')


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT afe_labels.name, afe_labels.platform
                FROM afe_labels
                INNER JOIN afe_hosts_labels ON
                        afe_labels.id = afe_hosts_labels.label_id
                WHERE afe_hosts_labels.host_id = %s
                ORDER BY afe_labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    # Matches hostnames of the form <letters-and-dashes><digits>, e.g.
    # "host10"; used to sort numerically by the trailing number.
    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfy most people's hostname sorting needs regardless
        of their exact naming schemes.  Nobody sane should have both a host10
        and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # Bug fix: int() already ignores leading zeros; the previous
            # int(number_str.lstrip('0')) raised ValueError on an all-zero
            # suffix such as "host0" or "host000" (int('')).
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
439
440
class HostQueueEntry(DBObject):
    """Maps a row of afe_host_queue_entries: one host's part in one job.

    Holds considerable scheduler logic for driving an entry through its
    status transitions: host (un)blocking, completion bookkeeping,
    notification e-mails and abort handling.
    """
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on', 'finished_on')
    # Reports method timings to statsd via @_timer.decorate.
    _timer = stats.Timer('scheduler_models.HostQueueEntry')


    def __init__(self, id=None, row=None, **kwargs):
        """Load the entry and its associated Job, Host and AtomicGroup.

        @param id: Primary key of the row to load (or None when row given).
        @param row: Pre-fetched DB row matching _fields.
        """
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            # Hosts are resolved through the rdb, not the plain Host class.
            self.host = rdb_lib.get_hosts([self.host_id])[0]
            self.host.dbg_str = self.get_dbg_str()
        else:
            self.host = None

        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        """Return the AFE URL for viewing this entry's job."""
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        """Assign (or, when host is None, release) a host for this entry.

        Assigning blocks the (job, host) pair against re-use; releasing
        removes the block for the currently assigned host.

        @param host: A host object, or None to release.
        """
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.update_field('host_id', host.id)
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def block_host(self, host_id):
        """Record that host_id may not be given to this entry's job again."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Delete any (job, host_id) blocks created by block_host()."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Set the results subdirectory; defaults to the host's hostname."""
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        """Return the assigned host's hostname, or the string 'no host'."""
        if self.host:
            return self.host.hostname
        return 'no host'


    def get_dbg_str(self):
        """Get a debug string to identify this host.

        @return: A string containing the hqe and job id.
        """
        try:
            return 'HQE: %s, for job: %s' % (self.id, self.job_id)
        except AttributeError as e:
            return 'HQE has not been initialized yet: %s' % e


    def __str__(self):
        """Human-readable summary: ids, host, status and state flags."""
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return ("%s and host: %s has status:%s%s" %
                (self.get_dbg_str(), self._get_hostname(), self.status,
                 flags_str))


    def record_state(self, type_str, state, value):
        """Record metadata in elasticsearch.

        If ES configured to use http, then we will time that http request.
        Otherwise, it uses UDP, so we will not need to time it.

        @param type_str: sets the _type field in elasticsearch db.
        @param state: string representing what state we are recording,
                      e.g. 'status'
        @param value: value of the state, e.g. 'verifying'
        """
        metadata = {
            'time_changed': time.time(),
             state: value,
            'job_id': self.job_id,
        }
        if self.host:
            metadata['hostname'] = self.host.hostname
        es_utils.ESMetadata().post(type_str=type_str, metadata=metadata)


    @_timer.decorate
    def set_status(self, status):
        """Set this entry's status plus the derived active/complete flags.

        On completion, runs completion bookkeeping and may send the
        job-complete e-mail; also sends per-status notification e-mails when
        configured, and records the change in elasticsearch.

        @param status: New status (see models.HostQueueEntry statuses).
        """
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)
        # Noticed some time jumps after last logging message.
        logging.debug('Update Field Complete')

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
        # A status can never be both active and complete.
        assert not (active and complete)

        self.update_field('active', active)
        self.update_field('complete', complete)

        if complete:
            self._on_complete(status)
            self._email_on_job_complete()

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)
        logging.debug('HQE Set Status Complete')
        self.record_state('hqe_status', 'status', status)


    def _on_complete(self, status):
        """Completion bookkeeping: stop the job if needed, stamp
        finished_on, and unregister pidfiles for this execution path.
        """
        # NOTE(review): identity comparison assumes callers pass the
        # Status.ABORTED constant itself rather than an equal string;
        # '!=' would be safer -- confirm.
        if status is not models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()

        if self.started_on:
            self.set_finished_on_now()
        if not self.execution_subdir:
            return
        # unregister any possible pidfiles associated with this queue entry
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)


    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body = "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body


    def _email_on_status(self, status):
        """Send a single-entry status e-mail to the job's email_list."""
        hostname = self._get_hostname()
        subject, body = self._get_status_email_contents(status, None, hostname)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """Send a whole-job summary e-mail once every entry has finished."""
        if not self.job.is_finished():
            return

        summary = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary.append("Host: %s Status: %s" %
                           (queue_entry._get_hostname(),
                            queue_entry.status))

        summary = "\n".join(summary)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self):
        """Log the scheduling decision and start the job's pre-job tasks."""
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        """Delegate pre-job task scheduling to the owning Job."""
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        """Return this entry to Queued, clearing per-run state."""
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        self.update_field('finished_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    @property
    def aborted_by(self):
        """Login of the user who aborted this entry, or None."""
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        """Timestamp of the abort, or None if not aborted."""
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Cached after the first call; hasattr doubles as the "loaded" flag.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING.  Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are getting
        # stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        """Abort this entry, cleaning up its host according to status.

        @param dispatcher: Dispatcher used to look up agents still attached
                to this entry.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING,
                           Status.WAITING):
            # If hqe is in any of these status, it should not have any
            # unfinished agent before it can be aborted.
            agents = dispatcher.get_agents_for_entry(self)
            # Agent with finished task can be left behind. This is added to
            # handle the special case of aborting hostless job in STARTING
            # status, in which the agent has only a HostlessQueueTask
            # associated. The finished HostlessQueueTask will be cleaned up in
            # the next tick, so it's safe to leave the agent there. Without
            # filtering out finished agent, HQE abort won't be able to proceed.
            assert all([agent.is_done() for agent in agents])
            # If hqe is still in STARTING status, it may not have assigned a
            # host yet.
            if self.host:
                self.host.set_status(models.Host.Status.READY)
        elif (self.status == Status.VERIFYING or
              self.status == Status.RESETTING):
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())
        elif self.status == Status.PROVISIONING:
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.REPAIR,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)
        self.job.abort_delay_ready_task()


    def get_group_name(self):
        """Return the display name for this entry's atomic group ('' if none)."""
        atomic_group = self.atomic_group
        if not atomic_group:
            return ''

        # Look at any meta_host and dependency labels and pick the first
        # one that also specifies this atomic group.  Use that label name
        # as the group name if possible (it is more specific).
        for label in self.get_labels():
            if label.atomic_group_id:
                assert label.atomic_group_id == atomic_group.id
                return label.name
        return atomic_group.name


    def execution_tag(self):
        """Return '<job tag>/<execution_subdir>' for this entry.

        @raises AssertionError: If execution_subdir is unset; as a temporary
                workaround the entry is forced to Aborted in the DB first.
        """
        SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
                               'complete!=1 AND execution_subdir="" AND '
                               'status!="Queued";')
        SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
                                 'status="Aborted" WHERE id=%s;')
        try:
            assert self.execution_subdir
        except AssertionError:
            # TODO(scottz): Remove temporary fix/info gathering pathway for
            # crosbug.com/31595 once issue is root caused.
            logging.error('No execution_subdir for host queue id:%s.', self.id)
            logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
            for row in _db.execute(SQL_SUSPECT_ENTRIES):
                logging.error(row)
            logging.error('====DB DEBUG====\n')
            fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
            logging.error('EXECUTING: %s', fix_query)
            _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id)
            raise AssertionError(('self.execution_subdir not found. '
                                  'See log for details.'))

        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        """Results path for this entry; identical to execution_tag()."""
        return self.execution_tag()


    def set_started_on_now(self):
        """Stamp started_on with the current time."""
        self.update_field('started_on', datetime.datetime.now())


    def set_finished_on_now(self):
        """Stamp finished_on with the current time."""
        self.update_field('finished_on', datetime.datetime.now())


    def is_hostless(self):
        """True when this entry runs with no host, meta_host or atomic group."""
        return (self.host_id is None
                and self.meta_host is None
                and self.atomic_group_id is None)
884
885
class Job(DBObject):
    """Scheduler-side representation of a row in the afe_jobs table.

    Holds job-level scheduling logic: deciding which special tasks must
    run before a HostQueueEntry starts, grouping hosts for synchronous
    jobs, and launching or stopping the job's HostQueueEntries.
    """
    _table_name = 'afe_jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
               'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
               'test_retry', 'run_reset', 'timeout_mins', 'shard_id')
    _timer = stats.Timer("scheduler_models.Job")

    # This does not need to be a column in the DB. The delays are likely to
    # be configured short. If the scheduler is stopped and restarted in
    # the middle of a job's delay cycle, the delay cycle will either be
    # repeated or skipped depending on the number of Pending machines found
    # when the restarted scheduler recovers to track it. Not a problem.
    #
    # A reference to the DelayedCallTask that will wake up the job should
    # no other HQEs change state in time. Its end_time attribute is used
    # by our run_with_ready_delay() method to determine if the wait is over.
    _delay_ready_task = None

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
    # all status='Pending' atomic group HQEs incase a delay was running when the
    # scheduler was restarted and no more hosts ever successfully exit Verify.
910
911 def __init__(self, id=None, row=None, **kwargs):
912 assert id or row
913 super(Job, self).__init__(id=id, row=row, **kwargs)
914 self._owner_model = None # caches model instance of owner
Paul Pendlebury5a8c6ad2011-02-01 07:20:17 -0800915 self.update_image_path = None # path of OS image to install
jamesrenb55378a2010-03-02 22:19:49 +0000916
917
918 def model(self):
919 return models.Job.objects.get(id=self.id)
920
921
922 def owner_model(self):
923 # work around the fact that the Job owner field is a string, not a
924 # foreign key
925 if not self._owner_model:
926 self._owner_model = models.User.objects.get(login=self.owner)
927 return self._owner_model
928
929
930 def is_server_job(self):
Aviv Keshet82352b22013-05-14 18:30:56 -0700931 return self.control_type == control_data.CONTROL_TYPE.SERVER
jamesrenb55378a2010-03-02 22:19:49 +0000932
933
934 def tag(self):
935 return "%s-%s" % (self.id, self.owner)
936
937
938 def get_host_queue_entries(self):
939 rows = _db.execute("""
940 SELECT * FROM afe_host_queue_entries
941 WHERE job_id= %s
942 """, (self.id,))
943 entries = [HostQueueEntry(row=i) for i in rows]
944
945 assert len(entries)>0
946
947 return entries
948
949
    def is_image_update_job(self):
        """
        Discover if the current job requires an OS update.

        Side effect: on success, stores the image path to install in
        self.update_image_path for later use as an autoserv argument.

        @return: True/False if OS should be updated before job is run.
        """
        # All image update jobs have the parameterized_job_id set.
        if not self.parameterized_job_id:
            return False

        # Retrieve the ID of the ParameterizedJob this job is an instance of.
        rows = _db.execute("""
                SELECT test_id
                FROM afe_parameterized_jobs
                WHERE id = %s
                """, (self.parameterized_job_id,))
        if not rows:
            return False
        test_id = rows[0][0]

        # Retrieve the ID of the known autoupdate_ParameterizedJob.
        rows = _db.execute("""
                SELECT id
                FROM afe_autotests
                WHERE name = 'autoupdate_ParameterizedJob'
                """)
        if not rows:
            return False
        update_id = rows[0][0]

        # If the IDs are the same we've found an image update job.
        if test_id == update_id:
            # Finally, get the path to the OS image to install.
            rows = _db.execute("""
                    SELECT parameter_value
                    FROM afe_parameterized_job_parameters
                    WHERE parameterized_job_id = %s
                    """, (self.parameterized_job_id,))
            if rows:
                # Save the path in update_image_path to use later as a command
                # line parameter to autoserv.
                self.update_image_path = rows[0][0]
            return True

        return False
995
996
Eric Li6f27d4f2010-09-29 10:55:17 -0700997 def get_execution_details(self):
998 """
999 Get test execution details for this job.
1000
1001 @return: Dictionary with test execution details
1002 """
1003 def _find_test_jobs(rows):
1004 """
1005 Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
1006 Those are autotest 'internal job' tests, so they should not be
1007 counted when evaluating the test stats.
1008
1009 @param rows: List of rows (matrix) with database results.
1010 """
1011 job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')
1012 n_test_jobs = 0
1013 for r in rows:
1014 test_name = r[0]
1015 if job_test_pattern.match(test_name):
1016 n_test_jobs += 1
1017
1018 return n_test_jobs
1019
1020 stats = {}
1021
1022 rows = _db.execute("""
1023 SELECT t.test, s.word, t.reason
1024 FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
1025 WHERE t.job_idx = j.job_idx
1026 AND s.status_idx = t.status
1027 AND j.afe_job_id = %s
Mike Truty6941dea2010-11-09 15:26:32 -08001028 ORDER BY t.reason
Eric Li6f27d4f2010-09-29 10:55:17 -07001029 """ % self.id)
1030
Dale Curtis74a314b2011-06-23 14:55:46 -07001031 failed_rows = [r for r in rows if not r[1] == 'GOOD']
Eric Li6f27d4f2010-09-29 10:55:17 -07001032
1033 n_test_jobs = _find_test_jobs(rows)
1034 n_test_jobs_failed = _find_test_jobs(failed_rows)
1035
1036 total_executed = len(rows) - n_test_jobs
1037 total_failed = len(failed_rows) - n_test_jobs_failed
1038
1039 if total_executed > 0:
1040 success_rate = 100 - ((total_failed / float(total_executed)) * 100)
1041 else:
1042 success_rate = 0
1043
1044 stats['total_executed'] = total_executed
1045 stats['total_failed'] = total_failed
1046 stats['total_passed'] = total_executed - total_failed
1047 stats['success_rate'] = success_rate
1048
1049 status_header = ("Test Name", "Status", "Reason")
1050 if failed_rows:
1051 stats['failed_rows'] = utils.matrix_to_string(failed_rows,
1052 status_header)
1053 else:
1054 stats['failed_rows'] = ''
1055
1056 time_row = _db.execute("""
1057 SELECT started_time, finished_time
1058 FROM tko_jobs
1059 WHERE afe_job_id = %s
1060 """ % self.id)
1061
1062 if time_row:
1063 t_begin, t_end = time_row[0]
Mike Truty6941dea2010-11-09 15:26:32 -08001064 try:
1065 delta = t_end - t_begin
1066 minutes, seconds = divmod(delta.seconds, 60)
1067 hours, minutes = divmod(minutes, 60)
1068 stats['execution_time'] = ("%02d:%02d:%02d" %
1069 (hours, minutes, seconds))
1070 # One of t_end or t_begin are None
1071 except TypeError:
1072 stats['execution_time'] = '(could not determine)'
Eric Li6f27d4f2010-09-29 10:55:17 -07001073 else:
1074 stats['execution_time'] = '(none)'
1075
1076 return stats
1077
1078
Fang Deng1d6c2a02013-04-17 15:25:45 -07001079 @_timer.decorate
jamesrenb55378a2010-03-02 22:19:49 +00001080 def set_status(self, status, update_queues=False):
1081 self.update_field('status',status)
1082
1083 if update_queues:
1084 for queue_entry in self.get_host_queue_entries():
1085 queue_entry.set_status(status)
1086
1087
1088 def keyval_dict(self):
1089 return self.model().keyval_dict()
1090
1091
1092 def _atomic_and_has_started(self):
1093 """
1094 @returns True if any of the HostQueueEntries associated with this job
1095 have entered the Status.STARTING state or beyond.
1096 """
1097 atomic_entries = models.HostQueueEntry.objects.filter(
1098 job=self.id, atomic_group__isnull=False)
1099 if atomic_entries.count() <= 0:
1100 return False
1101
1102 # These states may *only* be reached if Job.run() has been called.
1103 started_statuses = (models.HostQueueEntry.Status.STARTING,
1104 models.HostQueueEntry.Status.RUNNING,
1105 models.HostQueueEntry.Status.COMPLETED)
1106
1107 started_entries = atomic_entries.filter(status__in=started_statuses)
1108 return started_entries.count() > 0
1109
1110
1111 def _hosts_assigned_count(self):
1112 """The number of HostQueueEntries assigned a Host for this job."""
1113 entries = models.HostQueueEntry.objects.filter(job=self.id,
1114 host__isnull=False)
1115 return entries.count()
1116
1117
1118 def _pending_count(self):
1119 """The number of HostQueueEntries for this job in the Pending state."""
1120 pending_entries = models.HostQueueEntry.objects.filter(
1121 job=self.id, status=models.HostQueueEntry.Status.PENDING)
1122 return pending_entries.count()
1123
1124
1125 def _max_hosts_needed_to_run(self, atomic_group):
1126 """
1127 @param atomic_group: The AtomicGroup associated with this job that we
1128 are using to set an upper bound on the threshold.
1129 @returns The maximum number of HostQueueEntries assigned a Host before
1130 this job can run.
1131 """
1132 return min(self._hosts_assigned_count(),
1133 atomic_group.max_number_of_machines)
1134
1135
    def _min_hosts_needed_to_run(self):
        """Return the minimum number of hosts needed to run this job."""
        return self.synch_count
1139
1140
1141 def is_ready(self):
1142 # NOTE: Atomic group jobs stop reporting ready after they have been
1143 # started to avoid launching multiple copies of one atomic job.
1144 # Only possible if synch_count is less than than half the number of
1145 # machines in the atomic group.
1146 pending_count = self._pending_count()
1147 atomic_and_has_started = self._atomic_and_has_started()
1148 ready = (pending_count >= self.synch_count
1149 and not atomic_and_has_started)
1150
1151 if not ready:
1152 logging.info(
1153 'Job %s not ready: %s pending, %s required '
1154 '(Atomic and started: %s)',
1155 self, pending_count, self.synch_count,
1156 atomic_and_has_started)
1157
1158 return ready
1159
1160
1161 def num_machines(self, clause = None):
1162 sql = "job_id=%s" % self.id
1163 if clause:
1164 sql += " AND (%s)" % clause
1165 return self.count(sql, table='afe_host_queue_entries')
1166
1167
1168 def num_queued(self):
1169 return self.num_machines('not complete')
1170
1171
1172 def num_active(self):
1173 return self.num_machines('active')
1174
1175
1176 def num_complete(self):
1177 return self.num_machines('complete')
1178
1179
1180 def is_finished(self):
1181 return self.num_complete() == self.num_machines()
1182
1183
1184 def _not_yet_run_entries(self, include_verifying=True):
1185 statuses = [models.HostQueueEntry.Status.QUEUED,
1186 models.HostQueueEntry.Status.PENDING]
1187 if include_verifying:
1188 statuses.append(models.HostQueueEntry.Status.VERIFYING)
1189 return models.HostQueueEntry.objects.filter(job=self.id,
1190 status__in=statuses)
1191
1192
    def _stop_all_entries(self):
        """Mark this job's not-yet-run entries Stopped.

        Verifying entries are excluded; Pending entries give their host
        back by setting it Ready before being stopped.
        """
        entries_to_stop = self._not_yet_run_entries(
                include_verifying=False)
        for child_entry in entries_to_stop:
            # A complete entry here indicates scheduler state corruption.
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                # The host was reserved but never started work; release it.
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()
1206
1207
1208 def stop_if_necessary(self):
1209 not_yet_run = self._not_yet_run_entries()
1210 if not_yet_run.count() < self.synch_count:
1211 self._stop_all_entries()
1212
1213
jamesrenb55378a2010-03-02 22:19:49 +00001214 def _next_group_name(self, group_name=''):
1215 """@returns a directory name to use for the next host group results."""
1216 if group_name:
1217 # Sanitize for use as a pathname.
1218 group_name = group_name.replace(os.path.sep, '_')
1219 if group_name.startswith('.'):
1220 group_name = '_' + group_name[1:]
1221 # Add a separator between the group name and 'group%d'.
1222 group_name += '.'
1223 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
1224 query = models.HostQueueEntry.objects.filter(
1225 job=self.id).values('execution_subdir').distinct()
1226 subdirs = (entry['execution_subdir'] for entry in query)
1227 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
1228 ids = [int(match.group(1)) for match in group_matches if match]
1229 if ids:
1230 next_id = max(ids) + 1
1231 else:
1232 next_id = 0
1233 return '%sgroup%d' % (group_name, next_id)
1234
1235
1236 def get_group_entries(self, queue_entry_from_group):
1237 """
1238 @param queue_entry_from_group: A HostQueueEntry instance to find other
1239 group entries on this job for.
1240
1241 @returns A list of HostQueueEntry objects all executing this job as
1242 part of the same group as the one supplied (having the same
1243 execution_subdir).
1244 """
1245 execution_subdir = queue_entry_from_group.execution_subdir
1246 return list(HostQueueEntry.fetch(
1247 where='job_id=%s AND execution_subdir=%s',
1248 params=(self.id, execution_subdir)))
1249
1250
1251 def _should_run_cleanup(self, queue_entry):
1252 if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
1253 return True
1254 elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
1255 return queue_entry.host.dirty
1256 return False
1257
1258
1259 def _should_run_verify(self, queue_entry):
1260 do_not_verify = (queue_entry.host.protection ==
1261 host_protections.Protection.DO_NOT_VERIFY)
1262 if do_not_verify:
1263 return False
Alex Miller6ee996f2013-02-28 13:53:52 -08001264 # If RebootBefore is set to NEVER, then we won't run reset because
1265 # we can't cleanup, so we need to weaken a Reset into a Verify.
1266 weaker_reset = (self.run_reset and
1267 self.reboot_before == model_attributes.RebootBefore.NEVER)
1268 return self.run_verify or weaker_reset
jamesrenb55378a2010-03-02 22:19:49 +00001269
1270
Dan Shi07e09af2013-04-12 09:31:29 -07001271 def _should_run_reset(self, queue_entry):
1272 can_verify = (queue_entry.host.protection !=
1273 host_protections.Protection.DO_NOT_VERIFY)
1274 can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER
1275 return (can_reboot and can_verify and (self.run_reset or
1276 (self._should_run_cleanup(queue_entry) and
1277 self._should_run_verify(queue_entry))))
1278
1279
Alex Millerdfff2fd2013-05-28 13:05:06 -07001280 def _should_run_provision(self, queue_entry):
1281 """
1282 Determine if the queue_entry needs to have a provision task run before
1283 it to provision queue_entry.host.
1284
1285 @param queue_entry: The host queue entry in question.
1286 @returns: True if we should schedule a provision task, False otherwise.
1287
1288 """
1289 # If we get to this point, it means that the scheduler has already
1290 # vetted that all the unprovisionable labels match, so we can just
1291 # find all labels on the job that aren't on the host to get the list
1292 # of what we need to provision. (See the scheduling logic in
1293 # host_scheduler.py:is_host_eligable_for_job() where we discard all
Alex Miller627694a2014-05-01 18:04:29 -07001294 # actionable labels when assigning jobs to hosts.)
Alex Millerdfff2fd2013-05-28 13:05:06 -07001295 job_labels = {x.name for x in queue_entry.get_labels()}
1296 _, host_labels = queue_entry.host.platform_and_labels()
Alex Miller627694a2014-05-01 18:04:29 -07001297 # If there are any labels on the job that are not on the host and they
1298 # are labels that provisioning knows how to change, then that means
1299 # there is provisioning work to do. If there's no provisioning work to
1300 # do, then obviously we have no reason to schedule a provision task!
1301 diff = job_labels - set(host_labels)
1302 if any([provision.Provision.acts_on(x) for x in diff]):
Alex Millerdfff2fd2013-05-28 13:05:06 -07001303 return True
1304 return False
1305
1306
Alex Miller42437f92013-05-28 12:58:54 -07001307 def _queue_special_task(self, queue_entry, task):
jamesrenb55378a2010-03-02 22:19:49 +00001308 """
Alex Miller42437f92013-05-28 12:58:54 -07001309 Create a special task and associate it with a host queue entry.
jamesrenb55378a2010-03-02 22:19:49 +00001310
Alex Miller42437f92013-05-28 12:58:54 -07001311 @param queue_entry: The queue entry this special task should be
1312 associated with.
1313 @param task: One of the members of the enum models.SpecialTask.Task.
1314 @returns: None
1315
jamesrenb55378a2010-03-02 22:19:49 +00001316 """
jamesrenb55378a2010-03-02 22:19:49 +00001317 models.SpecialTask.objects.create(
1318 host=models.Host.objects.get(id=queue_entry.host_id),
1319 queue_entry=queue_entry, task=task)
1320
1321
    def schedule_pre_job_tasks(self, queue_entry):
        """
        Queue all of the special tasks that need to be run before a host
        queue entry may run.

        If no special tasks need to be scheduled, then |on_pending| will be
        called directly.

        @param queue_entry: The HostQueueEntry to schedule tasks for.
        @returns None

        """
        task_queued = False
        hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id)

        # Reset stands in for the cleanup + verify pair, so it is mutually
        # exclusive with them (see _should_run_reset).
        if self._should_run_reset(queue_entry):
            self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET)
            task_queued = True
        else:
            if self._should_run_cleanup(queue_entry):
                self._queue_special_task(hqe_model,
                                         models.SpecialTask.Task.CLEANUP)
                task_queued = True
            if self._should_run_verify(queue_entry):
                self._queue_special_task(hqe_model,
                                         models.SpecialTask.Task.VERIFY)
                task_queued = True

        # Provisioning is independent of the reset/cleanup/verify choice.
        if self._should_run_provision(queue_entry):
            self._queue_special_task(hqe_model,
                                     models.SpecialTask.Task.PROVISION)
            task_queued = True

        if not task_queued:
            queue_entry.on_pending()
1356
1357
jamesrenb55378a2010-03-02 22:19:49 +00001358 def _assign_new_group(self, queue_entries, group_name=''):
1359 if len(queue_entries) == 1:
1360 group_subdir_name = queue_entries[0].host.hostname
1361 else:
1362 group_subdir_name = self._next_group_name(group_name)
1363 logging.info('Running synchronous job %d hosts %s as %s',
1364 self.id, [entry.host.hostname for entry in queue_entries],
1365 group_subdir_name)
1366
1367 for queue_entry in queue_entries:
1368 queue_entry.set_execution_subdir(group_subdir_name)
1369
1370
    def _choose_group_to_run(self, include_queue_entry):
        """
        @param include_queue_entry: The HostQueueEntry that must be part of
                the chosen group.
        @returns A list of HostQueueEntry instances to be used to run this
                Job (with a shared group execution_subdir assigned), or an
                empty list if too few Pending entries were found.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            # NOTE: cmp= keyword is Python 2 only.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries
1410
1411
1412 def run_if_ready(self, queue_entry):
1413 """
1414 Run this job by kicking its HQEs into status='Starting' if enough
1415 hosts are ready for it to run.
1416
1417 Cleans up by kicking HQEs into status='Stopped' if this Job is not
1418 ready to run.
1419 """
1420 if not self.is_ready():
1421 self.stop_if_necessary()
1422 elif queue_entry.atomic_group:
1423 self.run_with_ready_delay(queue_entry)
1424 else:
1425 self.run(queue_entry)
1426
1427
1428 def run_with_ready_delay(self, queue_entry):
1429 """
1430 Start a delay to wait for more hosts to enter Pending state before
1431 launching an atomic group job. Once set, the a delay cannot be reset.
1432
1433 @param queue_entry: The HostQueueEntry object to get atomic group
1434 info from and pass to run_if_ready when the delay is up.
1435
1436 @returns An Agent to run the job as appropriate or None if a delay
1437 has already been set.
1438 """
1439 assert queue_entry.job_id == self.id
1440 assert queue_entry.atomic_group
1441 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
1442 over_max_threshold = (self._pending_count() >=
1443 self._max_hosts_needed_to_run(queue_entry.atomic_group))
1444 delay_expired = (self._delay_ready_task and
1445 time.time() >= self._delay_ready_task.end_time)
1446
1447 # Delay is disabled or we already have enough? Do not wait to run.
1448 if not delay or over_max_threshold or delay_expired:
1449 self.run(queue_entry)
1450 else:
1451 queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
1452
1453
1454 def request_abort(self):
1455 """Request that this Job be aborted on the next scheduler cycle."""
1456 self.model().abort()
1457
1458
    def schedule_delayed_callback_task(self, queue_entry):
        """Mark queue_entry Pending and arm the extra-hosts wait timer.

        @param queue_entry: HostQueueEntry to run once the delay is up.
        @returns The new DelayedCallTask, or None if one is already armed.
        """
        queue_entry.set_status(models.HostQueueEntry.Status.PENDING)

        # Only one delay may ever be armed for a job.
        if self._delay_ready_task:
            return None

        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self)
            # Check to see if the job is still relevant. It could have aborted
            # while we were waiting or hosts could have disappearred, etc.
            if self._pending_count() < self._min_hosts_needed_to_run():
                logging.info('Job %s had too few Pending hosts after waiting '
                             'for extras. Not running.', self)
                self.request_abort()
                return
            return self.run(queue_entry)

        logging.info('Job %s waiting up to %s seconds for more hosts.',
                     self.id, delay)
        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)
        return self._delay_ready_task
1483
1484
1485 def run(self, queue_entry):
1486 """
1487 @param queue_entry: The HostQueueEntry instance calling this method.
1488 """
1489 if queue_entry.atomic_group and self._atomic_and_has_started():
1490 logging.error('Job.run() called on running atomic Job %d '
1491 'with HQE %s.', self.id, queue_entry)
1492 return
1493 queue_entries = self._choose_group_to_run(queue_entry)
1494 if queue_entries:
1495 self._finish_run(queue_entries)
1496
1497
1498 def _finish_run(self, queue_entries):
1499 for queue_entry in queue_entries:
1500 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
1501 self.abort_delay_ready_task()
1502
1503
1504 def abort_delay_ready_task(self):
1505 """Abort the delayed task associated with this job, if any."""
1506 if self._delay_ready_task:
1507 # Cancel any pending callback that would try to run again
1508 # as we are already running.
1509 self._delay_ready_task.abort()
1510
1511
1512 def __str__(self):
1513 return '%s-%s' % (self.id, self.owner)