blob: 303e0e346124326ffd6c845c9d9ee0b1b9f6b961 [file] [log] [blame]
Aviv Keshet0b9cfc92013-02-05 11:36:02 -08001# pylint: disable-msg=C0111
2
jamesrenb55378a2010-03-02 22:19:49 +00003"""Database model classes for the scheduler.
4
5Contains model classes abstracting the various DB tables used by the scheduler.
6These overlap the Django models in basic functionality, but were written before
7the Django models existed and have not yet been phased out. Some of them
8(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
9which would probably be ill-suited for inclusion in the general Django model
10classes.
11
12Globals:
13_notify_email_statuses: list of HQE statuses. each time a single HQE reaches
14 one of these statuses, an email will be sent to the job's email_list.
15 comes from global_config.
16_base_url: URL to the local AFE server, used to construct URLs for emails.
17_db: DatabaseConnection for this module.
18_drone_manager: reference to global DroneManager instance.
19"""
20
21import datetime, itertools, logging, os, re, sys, time, weakref
jamesrenb55378a2010-03-02 22:19:49 +000022from autotest_lib.client.common_lib import global_config, host_protections
Michael Liangda8c60a2014-06-03 13:24:51 -070023from autotest_lib.client.common_lib import utils
24from autotest_lib.client.common_lib import control_data
Michael Liang500dedc2014-07-15 16:16:44 -070025from autotest_lib.client.common_lib.cros.graphite import es_utils
Michael Liangda8c60a2014-06-03 13:24:51 -070026from autotest_lib.client.common_lib.cros.graphite import stats
jamesrenb55378a2010-03-02 22:19:49 +000027from autotest_lib.frontend.afe import models, model_attributes
jamesrenb55378a2010-03-02 22:19:49 +000028from autotest_lib.scheduler import drone_manager, email_manager
beepscc9fc702013-12-02 12:45:38 -080029from autotest_lib.scheduler import rdb_lib
jamesrenb55378a2010-03-02 22:19:49 +000030from autotest_lib.scheduler import scheduler_config
Prashanth B0e960282014-05-13 19:38:28 -070031from autotest_lib.scheduler import scheduler_lib
Alex Miller627694a2014-05-01 18:04:29 -070032from autotest_lib.server.cros import provision
Michael Liangda8c60a2014-06-03 13:24:51 -070033
jamesrenb55378a2010-03-02 22:19:49 +000034
# HQE statuses that trigger a notification email; set by initialize().
_notify_email_statuses = []
# Base AFE URL used to build links in notification emails; set by initialize().
_base_url = None

# DatabaseConnection shared by this module's classes; set by initialize().
_db = None
# Global DroneManager reference; set by initialize_globals().
_drone_manager = None
40
def initialize():
    """Set up this module's global state.

    Creates the shared database connection, reads the notify-email
    statuses and the AFE base URL from the global config, and then
    initializes the remaining module globals.
    """
    global _db, _notify_email_statuses, _base_url

    _db = scheduler_lib.ConnectionManager().get_connection()

    config = global_config.global_config

    raw_statuses = config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    _notify_email_statuses = [s for s in
                              re.split(r'[\s,;:]', raw_statuses.lower())
                              if s]

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    override_url = config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if override_url:
        _base_url = override_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = config.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    initialize_globals()
71
72
def initialize_globals():
    """Initialize module-level references to global singletons.

    Currently just caches the DroneManager instance in _drone_manager.
    """
    global _drone_manager
    _drone_manager = drone_manager.instance()
76
77
class DelayedCallTask(object):
    """An Agent-compatible task that invokes a callback after a delay.

    Once at least the requested number of seconds has elapsed, the next
    poll() invokes the supplied callback exactly once and marks the task
    finished.  If the callback returns anything, it is assumed to be a
    new Agent instance and will be added to the dispatcher.

    @attribute end_time: Absolute posix time after which a poll() will
            invoke the callback and finish this task.

    Also carries every attribute the Agent class requires of a task.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: Seconds from now after which the callback
                fires and this task is done.
        @param callback: Callable invoked once after at least
                delay_seconds has elapsed.  Must return None or a new
                Agent instance.
        @param now_func: A time.time like function.  Default: time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # Attributes the Agent class expects on every task.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        """Fire the callback once, if the deadline has passed."""
        if self.is_done():
            return
        if self._now_func() >= self.end_time:
            self._callback()
            self.success = True


    def is_done(self):
        """True once the callback fired or the task was aborted."""
        return self.aborted or self.success


    def abort(self):
        """Mark this task aborted; the callback will never fire."""
        self.aborted = True
129
130
class DBError(Exception):
    """Raised by the DBObject constructor when its select fails
    (e.g. no row with the requested id exists in the table)."""
133
134
class DBObject(object):
    """A miniature object relational model for the database.

    Each subclass maps one table: it declares the table name and the
    ordered tuple of column names, and instances expose one attribute
    per column.  Instances with the same (type, id) are shared via a
    weak-value cache so that e.g. many HostQueueEntries referencing the
    same Job all see a single Job object.
    """

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id: Primary key of the row to load; used to query the
                database when row is not supplied.
        @param row: A pre-fetched row (sequence of values matching _fields)
                used to initialize the object without a query.
        @param new_record: True if this object represents a row that does
                not yet exist in the database; see save().
        @param always_query: If False and this (cached) instance has already
                been initialized, skip re-querying the database.
        """
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warning(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """Fetch and return the row with the given id from our table.

        @param row_id: Primary key to look up.
        @raises DBError: If no row with that id exists.
        """
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        """Assert that a fetched row has exactly one value per field."""
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing in
        memory fields.  Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S'  # Leave off the microseconds.
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(datetime_cmp_fmt)
                row_value = row_value.strftime(datetime_cmp_fmt)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # id is never updated through update_field().
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-fetch this row from the database and refresh our attributes."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table = None):
        """Return the number of rows in table matching the WHERE clause.

        @param where: SQL WHERE clause body (without the keyword).
        @param table: Table to count in; defaults to this object's table.
        """
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Set field to value, both in memory and in the database.

        No-op when the in-memory value already equals value.

        @param field: Name of a column listed in _fields (not 'id').
        @param value: New value for the column.
        """
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object as a new row, if it was created new_record.

        On success self.id is updated to the id the database assigned.
        """
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    # NOTE(review): values are string-interpolated rather
                    # than parameterized; this breaks (and is unsafe) for
                    # values containing '"'.  Left as-is to preserve
                    # behavior; consider switching to placeholders.
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """DELETE this row from the database and evict it from the cache."""
        # Bug fix: this previously popped (type(self), id) using the
        # *builtin* id function, so deleted instances were never actually
        # removed from the instance cache.  Use self.id, the key that
        # __init__ inserted.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Return string prefixed with prefix, or string unchanged if empty."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return [cls(id=row[0], row=row) for row in rows]
340
341
class IneligibleHostQueue(DBObject):
    """A row marking a host as blocked from running a particular job."""
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
345
346
class AtomicGroup(DBObject):
    """ORM wrapper for a row of the afe_atomic_groups table."""
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
351
352
class Label(DBObject):
    """ORM wrapper for a row of the afe_labels table."""
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        """Debug representation showing name, id and atomic group."""
        attrs = (self.name, self.id, self.atomic_group_id)
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % attrs
362
363
class Host(DBObject):
    """ORM wrapper for a row of the afe_hosts table."""
    _table_name = 'afe_hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty',
               'leased')
    _timer = stats.Timer("scheduler_models.Host")


    @_timer.decorate
    def set_status(self,status):
        """Update this host's status field, logging the transition.

        @param status: The new status string.
        """
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status',status)
        # Noticed some time jumps after the last log message.
        logging.debug('Host Set Status Complete')


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT afe_labels.name, afe_labels.platform
                FROM afe_labels
                INNER JOIN afe_hosts_labels ON
                        afe_labels.id = afe_hosts_labels.label_id
                WHERE afe_hosts_labels.host_id = %s
                ORDER BY afe_labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    # Matches hostnames of the form <letters-and-dashes><digits>.
    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfies most people's hostname sorting needs
        regardless of their exact naming schemes.  Nobody sane should have
        both a host10 and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # Bug fix: int(x.lstrip('0')) raised ValueError for hostnames
            # whose numeric suffix is all zeros (e.g. "host0" -> int('')).
            # int() already ignores leading zeros, so call it directly.
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
439
440
class HostQueueEntry(DBObject):
    """Represents one host's participation in a job (an HQE row).

    Carries the scheduler-side state machine for a queue entry: status
    transitions, host assignment/blocking, abort handling, notification
    e-mails and results-path computation.
    """
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on', 'finished_on')
    _timer = stats.Timer('scheduler_models.HostQueueEntry')


    def __init__(self, id=None, row=None, **kwargs):
        """Load the row and attach the related Job/Host/AtomicGroup objects.

        @param id: Primary key of the HQE row (if row not supplied).
        @param row: Pre-fetched row matching _fields.
        """
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = rdb_lib.get_hosts([self.host_id])[0]
            self.host.dbg_str = self.get_dbg_str()
        else:
            self.host = None

        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        """Return the AFE URL for viewing this entry's job."""
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        """Assign a host to (or release the host from) this entry.

        Assigning also blocks the host against re-running this job;
        releasing removes that block.

        @param host: A host object to assign, or None to release.
        """
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.update_field('host_id', host.id)
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def block_host(self, host_id):
        """Create an ineligibility block for host_id against this job."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Remove any ineligibility blocks for host_id against this job."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Set the results subdirectory; defaults to the host's hostname."""
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        """Return the assigned host's hostname, or 'no host'."""
        if self.host:
            return self.host.hostname
        return 'no host'


    def get_dbg_str(self):
        """Get a debug string to identify this host.

        @return: A string containing the hqe and job id.
        """
        try:
            return 'HQE: %s, for job: %s' % (self.id, self.job_id)
        except AttributeError as e:
            return 'HQE has not been initialized yet: %s' % e


    def __str__(self):
        """Human-readable summary: debug id, hostname, status and flags."""
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return ("%s and host: %s has status:%s%s" %
                (self.get_dbg_str(), self._get_hostname(), self.status,
                 flags_str))


    def record_state(self, type_str, state, value):
        """Record metadata in elasticsearch.

        If ES configured to use http, then we will time that http request.
        Otherwise, it uses UDP, so we will not need to time it.

        @param type_str: sets the _type field in elasticsearch db.
        @param state: string representing what state we are recording,
                      e.g. 'status'
        @param value: value of the state, e.g. 'verifying'
        """
        metadata = {
            'time_changed': time.time(),
             state: value,
            'job_id': self.job_id,
        }
        if self.host:
            metadata['hostname'] = self.host.hostname
        es_utils.ESMetadata().post(type_str=type_str, metadata=metadata)


    @_timer.decorate
    def set_status(self, status):
        """Set this entry's status and the derived active/complete flags.

        When the new status is a completion status, also runs completion
        handling and may send notification e-mails.  Every transition is
        recorded in elasticsearch.

        @param status: The new status string.
        """
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)
        # Noticed some time jumps after last logging message.
        logging.debug('Update Field Complete')

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
        assert not (active and complete)

        self.update_field('active', active)
        self.update_field('complete', complete)

        if complete:
            self._on_complete(status)
            self._email_on_job_complete()

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)
        logging.debug('HQE Set Status Complete')
        self.record_state('hqe_status', 'status', status)



    def _on_complete(self, status):
        """Handle entry completion: stop the job if necessary, stamp
        finished_on, and unregister this entry's pidfiles."""
        # NOTE(review): 'is not' is an identity comparison against a string
        # constant; presumably it relies on interning -- confirm intended.
        if status is not models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()

        self.set_finished_on_now()
        if not self.execution_subdir:
            return
        # unregister any possible pidfiles associated with this queue entry
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)


    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body = "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body


    def _email_on_status(self, status):
        """E-mail the job's email_list that this entry reached status."""
        hostname = self._get_hostname()
        subject, body = self._get_status_email_contents(status, None, hostname)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """E-mail a per-host summary once the entire job has finished."""
        if not self.job.is_finished():
            return

        summary = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary.append("Host: %s Status: %s" %
                           (queue_entry._get_hostname(),
                            queue_entry.status))

        summary = "\n".join(summary)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self):
        """Log the scheduling decision and kick off the job's pre-job tasks."""
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        """Delegate pre-job task scheduling to the owning job."""
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        """Return this entry to Queued for another scheduling attempt."""
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        self.update_field('finished_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    @property
    def aborted_by(self):
        """Login of the user who aborted this entry, or None."""
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        """Timestamp of when this entry was aborted, or None."""
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Cached: once fetched, the result is kept on the instance.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are getting
        # stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        """Abort this queue entry, scheduling host cleanup/repair as needed.

        @param dispatcher: Dispatcher used to look up agents for this entry.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING,
                           Status.WAITING):
            # If hqe is in any of these status, it should not have any
            # unfinished agent before it can be aborted.
            agents = dispatcher.get_agents_for_entry(self)
            # Agent with finished task can be left behind. This is added to
            # handle the special case of aborting hostless job in STARTING
            # status, in which the agent has only a HostlessQueueTask
            # associated. The finished HostlessQueueTask will be cleaned up in
            # the next tick, so it's safe to leave the agent there. Without
            # filtering out finished agent, HQE abort won't be able to proceed.
            assert all([agent.is_done() for agent in agents])
            # If hqe is still in STARTING status, it may not have assigned a
            # host yet.
            if self.host:
                self.host.set_status(models.Host.Status.READY)
        elif (self.status == Status.VERIFYING or
              self.status == Status.RESETTING):
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())
        elif self.status == Status.PROVISIONING:
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.REPAIR,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)
        self.job.abort_delay_ready_task()


    def get_group_name(self):
        """Return the label/atomic-group name for this entry, or ''."""
        atomic_group = self.atomic_group
        if not atomic_group:
            return ''

        # Look at any meta_host and dependency labels and pick the first
        # one that also specifies this atomic group.  Use that label name
        # as the group name if possible (it is more specific).
        for label in self.get_labels():
            if label.atomic_group_id:
                assert label.atomic_group_id == atomic_group.id
                return label.name
        return atomic_group.name


    def execution_tag(self):
        """Return '<job tag>/<execution_subdir>' identifying results.

        @raises AssertionError: If execution_subdir is unexpectedly empty;
                the row is also marked Aborted as a temporary mitigation.
        """
        SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
                               'complete!=1 AND execution_subdir="" AND '
                               'status!="Queued";')
        SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
                                 'status="Aborted" WHERE id=%s;')
        try:
            assert self.execution_subdir
        except AssertionError:
            # TODO(scottz): Remove temporary fix/info gathering pathway for
            # crosbug.com/31595 once issue is root caused.
            logging.error('No execution_subdir for host queue id:%s.', self.id)
            logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
            for row in _db.execute(SQL_SUSPECT_ENTRIES):
                logging.error(row)
            logging.error('====DB DEBUG====\n')
            fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
            logging.error('EXECUTING: %s', fix_query)
            _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id)
            raise AssertionError(('self.execution_subdir not found. '
                                  'See log for details.'))

        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        """Return the results path; identical to execution_tag()."""
        return self.execution_tag()


    def set_started_on_now(self):
        """Stamp started_on with the current time."""
        self.update_field('started_on', datetime.datetime.now())


    def set_finished_on_now(self):
        """Stamp finished_on with the current time."""
        self.update_field('finished_on', datetime.datetime.now())


    def is_hostless(self):
        """True when this entry has no host, meta_host or atomic group."""
        return (self.host_id is None
                and self.meta_host is None
                and self.atomic_group_id is None)
883
884
class Job(DBObject):
    """Scheduler-side wrapper for a row of the afe_jobs table.

    Overlaps the Django models.Job in basic functionality but carries the
    scheduler-specific run/group/delay logic (see module docstring).
    """
    _table_name = 'afe_jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
               'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
               'test_retry', 'run_reset', 'timeout_mins')
    # Timer from the graphite stats module; used by @_timer.decorate on
    # set_status() to report call durations.
    _timer = stats.Timer("scheduler_models.Job")

    # This does not need to be a column in the DB.  The delays are likely to
    # be configured short.  If the scheduler is stopped and restarted in
    # the middle of a job's delay cycle, the delay cycle will either be
    # repeated or skipped depending on the number of Pending machines found
    # when the restarted scheduler recovers to track it.  Not a problem.
    #
    # A reference to the DelayedCallTask that will wake up the job should
    # no other HQEs change state in time.  Its end_time attribute is used
    # by our run_with_ready_delay() method to determine if the wait is over.
    _delay_ready_task = None

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
    # all status='Pending' atomic group HQEs incase a delay was running when the
    # scheduler was restarted and no more hosts ever successfully exit Verify.
909
    def __init__(self, id=None, row=None, **kwargs):
        """Construct from a database id or an already-fetched row.

        @param id: Primary key of the afe_jobs row (optional if row given).
        @param row: Raw DB row to populate fields from (optional if id given).
        """
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
        self._owner_model = None # caches model instance of owner
        self.update_image_path = None # path of OS image to install
jamesrenb55378a2010-03-02 22:19:49 +0000915
916
    def model(self):
        """@returns The Django models.Job instance for this job's id."""
        return models.Job.objects.get(id=self.id)
919
920
921 def owner_model(self):
922 # work around the fact that the Job owner field is a string, not a
923 # foreign key
924 if not self._owner_model:
925 self._owner_model = models.User.objects.get(login=self.owner)
926 return self._owner_model
927
928
    def is_server_job(self):
        """@returns True when this job's control file is server-side."""
        return self.control_type == control_data.CONTROL_TYPE.SERVER
jamesrenb55378a2010-03-02 22:19:49 +0000931
932
    def tag(self):
        """@returns The job's results-directory tag, '<id>-<owner>'."""
        return "%s-%s" % (self.id, self.owner)
935
936
937 def get_host_queue_entries(self):
938 rows = _db.execute("""
939 SELECT * FROM afe_host_queue_entries
940 WHERE job_id= %s
941 """, (self.id,))
942 entries = [HostQueueEntry(row=i) for i in rows]
943
944 assert len(entries)>0
945
946 return entries
947
948
Paul Pendlebury5a8c6ad2011-02-01 07:20:17 -0800949 def is_image_update_job(self):
950 """
951 Discover if the current job requires an OS update.
952
953 @return: True/False if OS should be updated before job is run.
954 """
955 # All image update jobs have the parameterized_job_id set.
956 if not self.parameterized_job_id:
957 return False
958
959 # Retrieve the ID of the ParameterizedJob this job is an instance of.
960 rows = _db.execute("""
961 SELECT test_id
962 FROM afe_parameterized_jobs
963 WHERE id = %s
964 """, (self.parameterized_job_id,))
965 if not rows:
966 return False
967 test_id = rows[0][0]
968
969 # Retrieve the ID of the known autoupdate_ParameterizedJob.
970 rows = _db.execute("""
971 SELECT id
972 FROM afe_autotests
973 WHERE name = 'autoupdate_ParameterizedJob'
974 """)
975 if not rows:
976 return False
977 update_id = rows[0][0]
978
979 # If the IDs are the same we've found an image update job.
980 if test_id == update_id:
981 # Finally, get the path to the OS image to install.
982 rows = _db.execute("""
983 SELECT parameter_value
984 FROM afe_parameterized_job_parameters
985 WHERE parameterized_job_id = %s
986 """, (self.parameterized_job_id,))
987 if rows:
988 # Save the path in update_image_path to use later as a command
989 # line parameter to autoserv.
990 self.update_image_path = rows[0][0]
991 return True
992
993 return False
994
995
Eric Li6f27d4f2010-09-29 10:55:17 -0700996 def get_execution_details(self):
997 """
998 Get test execution details for this job.
999
1000 @return: Dictionary with test execution details
1001 """
1002 def _find_test_jobs(rows):
1003 """
1004 Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
1005 Those are autotest 'internal job' tests, so they should not be
1006 counted when evaluating the test stats.
1007
1008 @param rows: List of rows (matrix) with database results.
1009 """
1010 job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')
1011 n_test_jobs = 0
1012 for r in rows:
1013 test_name = r[0]
1014 if job_test_pattern.match(test_name):
1015 n_test_jobs += 1
1016
1017 return n_test_jobs
1018
1019 stats = {}
1020
1021 rows = _db.execute("""
1022 SELECT t.test, s.word, t.reason
1023 FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
1024 WHERE t.job_idx = j.job_idx
1025 AND s.status_idx = t.status
1026 AND j.afe_job_id = %s
Mike Truty6941dea2010-11-09 15:26:32 -08001027 ORDER BY t.reason
Eric Li6f27d4f2010-09-29 10:55:17 -07001028 """ % self.id)
1029
Dale Curtis74a314b2011-06-23 14:55:46 -07001030 failed_rows = [r for r in rows if not r[1] == 'GOOD']
Eric Li6f27d4f2010-09-29 10:55:17 -07001031
1032 n_test_jobs = _find_test_jobs(rows)
1033 n_test_jobs_failed = _find_test_jobs(failed_rows)
1034
1035 total_executed = len(rows) - n_test_jobs
1036 total_failed = len(failed_rows) - n_test_jobs_failed
1037
1038 if total_executed > 0:
1039 success_rate = 100 - ((total_failed / float(total_executed)) * 100)
1040 else:
1041 success_rate = 0
1042
1043 stats['total_executed'] = total_executed
1044 stats['total_failed'] = total_failed
1045 stats['total_passed'] = total_executed - total_failed
1046 stats['success_rate'] = success_rate
1047
1048 status_header = ("Test Name", "Status", "Reason")
1049 if failed_rows:
1050 stats['failed_rows'] = utils.matrix_to_string(failed_rows,
1051 status_header)
1052 else:
1053 stats['failed_rows'] = ''
1054
1055 time_row = _db.execute("""
1056 SELECT started_time, finished_time
1057 FROM tko_jobs
1058 WHERE afe_job_id = %s
1059 """ % self.id)
1060
1061 if time_row:
1062 t_begin, t_end = time_row[0]
Mike Truty6941dea2010-11-09 15:26:32 -08001063 try:
1064 delta = t_end - t_begin
1065 minutes, seconds = divmod(delta.seconds, 60)
1066 hours, minutes = divmod(minutes, 60)
1067 stats['execution_time'] = ("%02d:%02d:%02d" %
1068 (hours, minutes, seconds))
1069 # One of t_end or t_begin are None
1070 except TypeError:
1071 stats['execution_time'] = '(could not determine)'
Eric Li6f27d4f2010-09-29 10:55:17 -07001072 else:
1073 stats['execution_time'] = '(none)'
1074
1075 return stats
1076
1077
Fang Deng1d6c2a02013-04-17 15:25:45 -07001078 @_timer.decorate
jamesrenb55378a2010-03-02 22:19:49 +00001079 def set_status(self, status, update_queues=False):
1080 self.update_field('status',status)
1081
1082 if update_queues:
1083 for queue_entry in self.get_host_queue_entries():
1084 queue_entry.set_status(status)
1085
1086
    def keyval_dict(self):
        """@returns The keyval dict, delegated to the Django Job model."""
        return self.model().keyval_dict()
1089
1090
1091 def _atomic_and_has_started(self):
1092 """
1093 @returns True if any of the HostQueueEntries associated with this job
1094 have entered the Status.STARTING state or beyond.
1095 """
1096 atomic_entries = models.HostQueueEntry.objects.filter(
1097 job=self.id, atomic_group__isnull=False)
1098 if atomic_entries.count() <= 0:
1099 return False
1100
1101 # These states may *only* be reached if Job.run() has been called.
1102 started_statuses = (models.HostQueueEntry.Status.STARTING,
1103 models.HostQueueEntry.Status.RUNNING,
1104 models.HostQueueEntry.Status.COMPLETED)
1105
1106 started_entries = atomic_entries.filter(status__in=started_statuses)
1107 return started_entries.count() > 0
1108
1109
1110 def _hosts_assigned_count(self):
1111 """The number of HostQueueEntries assigned a Host for this job."""
1112 entries = models.HostQueueEntry.objects.filter(job=self.id,
1113 host__isnull=False)
1114 return entries.count()
1115
1116
1117 def _pending_count(self):
1118 """The number of HostQueueEntries for this job in the Pending state."""
1119 pending_entries = models.HostQueueEntry.objects.filter(
1120 job=self.id, status=models.HostQueueEntry.Status.PENDING)
1121 return pending_entries.count()
1122
1123
1124 def _max_hosts_needed_to_run(self, atomic_group):
1125 """
1126 @param atomic_group: The AtomicGroup associated with this job that we
1127 are using to set an upper bound on the threshold.
1128 @returns The maximum number of HostQueueEntries assigned a Host before
1129 this job can run.
1130 """
1131 return min(self._hosts_assigned_count(),
1132 atomic_group.max_number_of_machines)
1133
1134
    def _min_hosts_needed_to_run(self):
        """Return the minimum number of hosts needed to run this job."""
        return self.synch_count
1138
1139
1140 def is_ready(self):
1141 # NOTE: Atomic group jobs stop reporting ready after they have been
1142 # started to avoid launching multiple copies of one atomic job.
1143 # Only possible if synch_count is less than than half the number of
1144 # machines in the atomic group.
1145 pending_count = self._pending_count()
1146 atomic_and_has_started = self._atomic_and_has_started()
1147 ready = (pending_count >= self.synch_count
1148 and not atomic_and_has_started)
1149
1150 if not ready:
1151 logging.info(
1152 'Job %s not ready: %s pending, %s required '
1153 '(Atomic and started: %s)',
1154 self, pending_count, self.synch_count,
1155 atomic_and_has_started)
1156
1157 return ready
1158
1159
1160 def num_machines(self, clause = None):
1161 sql = "job_id=%s" % self.id
1162 if clause:
1163 sql += " AND (%s)" % clause
1164 return self.count(sql, table='afe_host_queue_entries')
1165
1166
    def num_queued(self):
        """@returns Count of this job's queue entries not yet complete."""
        return self.num_machines('not complete')
1169
1170
    def num_active(self):
        """@returns Count of this job's active queue entries."""
        return self.num_machines('active')
1173
1174
    def num_complete(self):
        """@returns Count of this job's completed queue entries."""
        return self.num_machines('complete')
1177
1178
    def is_finished(self):
        """@returns True when every queue entry of this job is complete."""
        return self.num_complete() == self.num_machines()
1181
1182
1183 def _not_yet_run_entries(self, include_verifying=True):
1184 statuses = [models.HostQueueEntry.Status.QUEUED,
1185 models.HostQueueEntry.Status.PENDING]
1186 if include_verifying:
1187 statuses.append(models.HostQueueEntry.Status.VERIFYING)
1188 return models.HostQueueEntry.objects.filter(job=self.id,
1189 status__in=statuses)
1190
1191
    def _stop_all_entries(self):
        """Mark this job's unstarted (Queued/Pending) entries Stopped.

        Pending entries already hold a host; that host is released back to
        Ready before the entry itself is stopped.
        """
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            # A complete entry here would mean the queryset is wrong.
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()
1205
1206
1207 def stop_if_necessary(self):
1208 not_yet_run = self._not_yet_run_entries()
1209 if not_yet_run.count() < self.synch_count:
1210 self._stop_all_entries()
1211
1212
jamesrenb55378a2010-03-02 22:19:49 +00001213 def _next_group_name(self, group_name=''):
1214 """@returns a directory name to use for the next host group results."""
1215 if group_name:
1216 # Sanitize for use as a pathname.
1217 group_name = group_name.replace(os.path.sep, '_')
1218 if group_name.startswith('.'):
1219 group_name = '_' + group_name[1:]
1220 # Add a separator between the group name and 'group%d'.
1221 group_name += '.'
1222 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
1223 query = models.HostQueueEntry.objects.filter(
1224 job=self.id).values('execution_subdir').distinct()
1225 subdirs = (entry['execution_subdir'] for entry in query)
1226 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
1227 ids = [int(match.group(1)) for match in group_matches if match]
1228 if ids:
1229 next_id = max(ids) + 1
1230 else:
1231 next_id = 0
1232 return '%sgroup%d' % (group_name, next_id)
1233
1234
1235 def get_group_entries(self, queue_entry_from_group):
1236 """
1237 @param queue_entry_from_group: A HostQueueEntry instance to find other
1238 group entries on this job for.
1239
1240 @returns A list of HostQueueEntry objects all executing this job as
1241 part of the same group as the one supplied (having the same
1242 execution_subdir).
1243 """
1244 execution_subdir = queue_entry_from_group.execution_subdir
1245 return list(HostQueueEntry.fetch(
1246 where='job_id=%s AND execution_subdir=%s',
1247 params=(self.id, execution_subdir)))
1248
1249
1250 def _should_run_cleanup(self, queue_entry):
1251 if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
1252 return True
1253 elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
1254 return queue_entry.host.dirty
1255 return False
1256
1257
1258 def _should_run_verify(self, queue_entry):
1259 do_not_verify = (queue_entry.host.protection ==
1260 host_protections.Protection.DO_NOT_VERIFY)
1261 if do_not_verify:
1262 return False
Alex Miller6ee996f2013-02-28 13:53:52 -08001263 # If RebootBefore is set to NEVER, then we won't run reset because
1264 # we can't cleanup, so we need to weaken a Reset into a Verify.
1265 weaker_reset = (self.run_reset and
1266 self.reboot_before == model_attributes.RebootBefore.NEVER)
1267 return self.run_verify or weaker_reset
jamesrenb55378a2010-03-02 22:19:49 +00001268
1269
Dan Shi07e09af2013-04-12 09:31:29 -07001270 def _should_run_reset(self, queue_entry):
1271 can_verify = (queue_entry.host.protection !=
1272 host_protections.Protection.DO_NOT_VERIFY)
1273 can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER
1274 return (can_reboot and can_verify and (self.run_reset or
1275 (self._should_run_cleanup(queue_entry) and
1276 self._should_run_verify(queue_entry))))
1277
1278
Alex Millerdfff2fd2013-05-28 13:05:06 -07001279 def _should_run_provision(self, queue_entry):
1280 """
1281 Determine if the queue_entry needs to have a provision task run before
1282 it to provision queue_entry.host.
1283
1284 @param queue_entry: The host queue entry in question.
1285 @returns: True if we should schedule a provision task, False otherwise.
1286
1287 """
1288 # If we get to this point, it means that the scheduler has already
1289 # vetted that all the unprovisionable labels match, so we can just
1290 # find all labels on the job that aren't on the host to get the list
1291 # of what we need to provision. (See the scheduling logic in
1292 # host_scheduler.py:is_host_eligable_for_job() where we discard all
Alex Miller627694a2014-05-01 18:04:29 -07001293 # actionable labels when assigning jobs to hosts.)
Alex Millerdfff2fd2013-05-28 13:05:06 -07001294 job_labels = {x.name for x in queue_entry.get_labels()}
1295 _, host_labels = queue_entry.host.platform_and_labels()
Alex Miller627694a2014-05-01 18:04:29 -07001296 # If there are any labels on the job that are not on the host and they
1297 # are labels that provisioning knows how to change, then that means
1298 # there is provisioning work to do. If there's no provisioning work to
1299 # do, then obviously we have no reason to schedule a provision task!
1300 diff = job_labels - set(host_labels)
1301 if any([provision.Provision.acts_on(x) for x in diff]):
Alex Millerdfff2fd2013-05-28 13:05:06 -07001302 return True
1303 return False
1304
1305
Alex Miller42437f92013-05-28 12:58:54 -07001306 def _queue_special_task(self, queue_entry, task):
jamesrenb55378a2010-03-02 22:19:49 +00001307 """
Alex Miller42437f92013-05-28 12:58:54 -07001308 Create a special task and associate it with a host queue entry.
jamesrenb55378a2010-03-02 22:19:49 +00001309
Alex Miller42437f92013-05-28 12:58:54 -07001310 @param queue_entry: The queue entry this special task should be
1311 associated with.
1312 @param task: One of the members of the enum models.SpecialTask.Task.
1313 @returns: None
1314
jamesrenb55378a2010-03-02 22:19:49 +00001315 """
jamesrenb55378a2010-03-02 22:19:49 +00001316 models.SpecialTask.objects.create(
1317 host=models.Host.objects.get(id=queue_entry.host_id),
1318 queue_entry=queue_entry, task=task)
1319
1320
Alex Miller42437f92013-05-28 12:58:54 -07001321 def schedule_pre_job_tasks(self, queue_entry):
1322 """
1323 Queue all of the special tasks that need to be run before a host
1324 queue entry may run.
1325
1326 If no special taskes need to be scheduled, then |on_pending| will be
1327 called directly.
1328
1329 @returns None
1330
1331 """
1332 task_queued = False
1333 hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id)
1334
Dan Shi07e09af2013-04-12 09:31:29 -07001335 if self._should_run_reset(queue_entry):
1336 self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET)
Alex Miller42437f92013-05-28 12:58:54 -07001337 task_queued = True
Dan Shi07e09af2013-04-12 09:31:29 -07001338 else:
1339 if self._should_run_cleanup(queue_entry):
1340 self._queue_special_task(hqe_model,
1341 models.SpecialTask.Task.CLEANUP)
1342 task_queued = True
1343 if self._should_run_verify(queue_entry):
1344 self._queue_special_task(hqe_model,
1345 models.SpecialTask.Task.VERIFY)
1346 task_queued = True
Alex Miller42437f92013-05-28 12:58:54 -07001347
Alex Millerdfff2fd2013-05-28 13:05:06 -07001348 if self._should_run_provision(queue_entry):
1349 self._queue_special_task(hqe_model,
1350 models.SpecialTask.Task.PROVISION)
1351 task_queued = True
1352
Alex Miller42437f92013-05-28 12:58:54 -07001353 if not task_queued:
1354 queue_entry.on_pending()
1355
1356
jamesrenb55378a2010-03-02 22:19:49 +00001357 def _assign_new_group(self, queue_entries, group_name=''):
1358 if len(queue_entries) == 1:
1359 group_subdir_name = queue_entries[0].host.hostname
1360 else:
1361 group_subdir_name = self._next_group_name(group_name)
1362 logging.info('Running synchronous job %d hosts %s as %s',
1363 self.id, [entry.host.hostname for entry in queue_entries],
1364 group_subdir_name)
1365
1366 for queue_entry in queue_entries:
1367 queue_entry.set_execution_subdir(group_subdir_name)
1368
1369
    def _choose_group_to_run(self, include_queue_entry):
        """
        Select and prepare the queue entries that will run this job.

        @param include_queue_entry: HostQueueEntry that must be part of the
                chosen group.
        @returns A list of HostQueueEntry instances to be used to run this
                Job (each assigned a shared execution_subdir), or [] if too
                few Pending entries were found.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries
1409
1410
1411 def run_if_ready(self, queue_entry):
1412 """
1413 Run this job by kicking its HQEs into status='Starting' if enough
1414 hosts are ready for it to run.
1415
1416 Cleans up by kicking HQEs into status='Stopped' if this Job is not
1417 ready to run.
1418 """
1419 if not self.is_ready():
1420 self.stop_if_necessary()
1421 elif queue_entry.atomic_group:
1422 self.run_with_ready_delay(queue_entry)
1423 else:
1424 self.run(queue_entry)
1425
1426
1427 def run_with_ready_delay(self, queue_entry):
1428 """
1429 Start a delay to wait for more hosts to enter Pending state before
1430 launching an atomic group job. Once set, the a delay cannot be reset.
1431
1432 @param queue_entry: The HostQueueEntry object to get atomic group
1433 info from and pass to run_if_ready when the delay is up.
1434
1435 @returns An Agent to run the job as appropriate or None if a delay
1436 has already been set.
1437 """
1438 assert queue_entry.job_id == self.id
1439 assert queue_entry.atomic_group
1440 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
1441 over_max_threshold = (self._pending_count() >=
1442 self._max_hosts_needed_to_run(queue_entry.atomic_group))
1443 delay_expired = (self._delay_ready_task and
1444 time.time() >= self._delay_ready_task.end_time)
1445
1446 # Delay is disabled or we already have enough? Do not wait to run.
1447 if not delay or over_max_threshold or delay_expired:
1448 self.run(queue_entry)
1449 else:
1450 queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
1451
1452
    def request_abort(self):
        """Request that this Job be aborted on the next scheduler cycle."""
        # Delegate to the Django model's abort().
        self.model().abort()
1456
1457
    def schedule_delayed_callback_task(self, queue_entry):
        """Mark queue_entry Pending and build the wait-for-hosts task.

        @param queue_entry: HostQueueEntry to run once the delay ends.
        @returns A new DelayedCallTask, or None when one is already set
                (the delay cannot be restarted).
        """
        queue_entry.set_status(models.HostQueueEntry.Status.PENDING)

        if self._delay_ready_task:
            return None

        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self)
            # Check to see if the job is still relevant.  It could have aborted
            # while we were waiting or hosts could have disappeared, etc.
            if self._pending_count() < self._min_hosts_needed_to_run():
                logging.info('Job %s had too few Pending hosts after waiting '
                             'for extras.  Not running.', self)
                self.request_abort()
                return
            return self.run(queue_entry)

        logging.info('Job %s waiting up to %s seconds for more hosts.',
                     self.id, delay)
        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)
        return self._delay_ready_task
1482
1483
1484 def run(self, queue_entry):
1485 """
1486 @param queue_entry: The HostQueueEntry instance calling this method.
1487 """
1488 if queue_entry.atomic_group and self._atomic_and_has_started():
1489 logging.error('Job.run() called on running atomic Job %d '
1490 'with HQE %s.', self.id, queue_entry)
1491 return
1492 queue_entries = self._choose_group_to_run(queue_entry)
1493 if queue_entries:
1494 self._finish_run(queue_entries)
1495
1496
1497 def _finish_run(self, queue_entries):
1498 for queue_entry in queue_entries:
1499 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
1500 self.abort_delay_ready_task()
1501
1502
1503 def abort_delay_ready_task(self):
1504 """Abort the delayed task associated with this job, if any."""
1505 if self._delay_ready_task:
1506 # Cancel any pending callback that would try to run again
1507 # as we are already running.
1508 self._delay_ready_task.abort()
1509
1510
    def __str__(self):
        """Human-readable '<id>-<owner>' form, same format as tag()."""
        return '%s-%s' % (self.id, self.owner)