blob: 2252e37602eea5c7108db181784854580ca4440f [file] [log] [blame]
Aviv Keshet0b9cfc92013-02-05 11:36:02 -08001# pylint: disable-msg=C0111
2
jamesrenb55378a2010-03-02 22:19:49 +00003"""Database model classes for the scheduler.
4
5Contains model classes abstracting the various DB tables used by the scheduler.
6These overlap the Django models in basic functionality, but were written before
7the Django models existed and have not yet been phased out. Some of them
8(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
9which would probably be ill-suited for inclusion in the general Django model
10classes.
11
12Globals:
13_notify_email_statuses: list of HQE statuses. each time a single HQE reaches
14 one of these statuses, an email will be sent to the job's email_list.
15 comes from global_config.
16_base_url: URL to the local AFE server, used to construct URLs for emails.
17_db: DatabaseConnection for this module.
18_drone_manager: reference to global DroneManager instance.
19"""
20
21import datetime, itertools, logging, os, re, sys, time, weakref
jamesrenb55378a2010-03-02 22:19:49 +000022from autotest_lib.client.common_lib import global_config, host_protections
Michael Liangda8c60a2014-06-03 13:24:51 -070023from autotest_lib.client.common_lib import utils
24from autotest_lib.client.common_lib import control_data
Michael Liang500dedc2014-07-15 16:16:44 -070025from autotest_lib.client.common_lib.cros.graphite import es_utils
Michael Liangda8c60a2014-06-03 13:24:51 -070026from autotest_lib.client.common_lib.cros.graphite import stats
jamesrenb55378a2010-03-02 22:19:49 +000027from autotest_lib.frontend.afe import models, model_attributes
jamesrenb55378a2010-03-02 22:19:49 +000028from autotest_lib.scheduler import drone_manager, email_manager
beepscc9fc702013-12-02 12:45:38 -080029from autotest_lib.scheduler import rdb_lib
jamesrenb55378a2010-03-02 22:19:49 +000030from autotest_lib.scheduler import scheduler_config
Prashanth B0e960282014-05-13 19:38:28 -070031from autotest_lib.scheduler import scheduler_lib
Alex Miller627694a2014-05-01 18:04:29 -070032from autotest_lib.server.cros import provision
Michael Liangda8c60a2014-06-03 13:24:51 -070033
jamesrenb55378a2010-03-02 22:19:49 +000034
# Module-level state; populated by initialize()/initialize_globals().
_notify_email_statuses = []  # HQE statuses that trigger notification email.
_base_url = None  # URL of the local AFE server, used to build email links.

_db = None  # DatabaseConnection shared by all DBObject subclasses.
_drone_manager = None  # Reference to the global DroneManager instance.
40
def initialize():
    """Initialize this module's globals from the DB and global_config.

    Sets up _db (scheduler DB connection), _notify_email_statuses (HQE
    statuses that trigger a notification email) and _base_url (AFE URL used
    in email bodies), then calls initialize_globals().

    Exits the process (sys.exit(1)) if no base URL can be derived because
    [SERVER] hostname is missing from the config file.
    """
    global _db
    _db = scheduler_lib.ConnectionManager().get_connection()

    notify_statuses_list = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    global _notify_email_statuses
    # Statuses may be separated by whitespace, commas, semicolons or colons;
    # they are normalized to lower case and empty entries are dropped.
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = global_config.global_config.get_config_value(
                'SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    initialize_globals()
72
def initialize_globals():
    """Bind the module-level _drone_manager to the global DroneManager.

    Split out from initialize() so tests can reset just this reference.
    """
    global _drone_manager
    _drone_manager = drone_manager.instance()
77
def get_job_metadata(job):
    """Build the metadata dictionary describing a job.

    The dictionary records the job's id, owner, name and parent job id and
    is destined for the metadata database. A falsy job, or a job object
    missing any of those attributes, yields an empty dictionary (an error
    is logged in either case).

    @param job: A Job object.
    @return: A dictionary containing the job id, owner and name.
    """
    if not job:
        logging.error('Job is None, no metadata returned.')
        return {}
    try:
        metadata = {'job_id': job.id,
                    'owner': job.owner,
                    'job_name': job.name,
                    'parent_job_id': job.parent_job_id}
    except AttributeError as e:
        logging.error('Job has missing attribute: %s', e)
        return {}
    return metadata
100
class DelayedCallTask(object):
    """
    An Agent-compatible task that fires a callback after a fixed delay.

    Once polled at or past its deadline it invokes the supplied callback
    exactly once and marks itself successful. If the callback returns
    anything, it is assumed to be a new Agent instance and will be added
    to the dispatcher.

    @attribute end_time: The absolute posix time after which this task will
            call its callback when it is polled and be finished.

    Also has all attributes required by the Agent class.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: The delay in seconds from now that this task
                will call the supplied callback and be done.
        @param callback: A callable to be called by this task once after at
                least delay_seconds time has elapsed. It must return None
                or a new Agent instance.
        @param now_func: A time.time like function. Default: time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # Attributes the Agent class expects every task to expose.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        """Fire the callback (once) if the deadline has passed."""
        if self.is_done():
            return
        if self._now_func() >= self.end_time:
            self._callback()
            self.success = True


    def is_done(self):
        """True once the callback has fired or the task was aborted."""
        return self.aborted or self.success


    def abort(self):
        """Cancel this task; the callback will never fire."""
        self.aborted = True
153
class DBError(Exception):
    """Signals that the SELECT issued by a DBObject constructor failed."""
157
class DBObject(object):
    """A miniature object relational model for the database.

    Each subclass maps rows of a single table onto Python objects.
    Instances are cached per (class, id) in a WeakValueDictionary so that
    repeated lookups of the same row share one object (e.g. many
    HostQueueEntries sharing one Job).

    Subclasses MUST override _table_name and _fields.
    """

    # Subclasses MUST override these:
    _table_name = ''  # DB table this class maps.
    _fields = ()      # Column names in SELECT * order; 'id' must come first.

    # A mapping from (type, id) to the instance of the object for that
    # particular id. This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id. If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id: Primary key value; may be None when row is supplied.
        @param row: Pre-fetched row (sequence matching _fields); may be None.
        @param new_record: True if this object represents a row that does not
                exist in the DB yet (save() will INSERT it).
        @param always_query: When False and this (cached) instance was
                already initialized, skip the DB requery entirely.
        """
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warning(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """Return the row with the given id from our table.

        @param row_id: Primary key of the row to fetch.
        @raises DBError: If no row with that id exists.
        """
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        # A mismatch here means _fields is out of sync with the DB schema.
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing in
        memory fields. Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S'  # Leave off the microseconds.
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(datetime_cmp_fmt)
                row_value = row_value.strftime(datetime_cmp_fmt)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # id is the immutable primary key; it may not be update_field()ed.
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Requery the DB and refresh all field attributes."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table = None):
        """Return the number of rows matching the given WHERE clause.

        @param where: Body of the SQL WHERE clause. Interpolated directly
                into the query -- callers must supply trusted input only.
        @param table: Table to count in; defaults to this object's table.
        """
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Set one field on this object and write it through to the DB.

        @param field: Column name; must be in _valid_fields (not 'id').
        @param value: New value for the column.
        """
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return  # No change; skip the pointless UPDATE round trip.

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object as a new row (no-op unless new_record=True).

        On success, self.id is updated to the id the database assigned.
        """
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    # NOTE(review): values are string-interpolated rather than
                    # parameterized; acceptable only for internal scheduler
                    # data, unsafe for untrusted input.
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """DELETE this row from the DB and invalidate this instance."""
        # Bug fix: this previously popped (type(self), id) using the id()
        # *builtin*, so the instance-cache entry was never actually removed
        # and a deleted row could be served from cache.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Prepend prefix to string only when string is non-empty."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @param where: SQL WHERE clause body (without the WHERE keyword).
        @param params: Parameters for placeholders inside the where clause.
        @param joins: SQL JOIN clause(s) appended after the table name.
        @param order_by: SQL ORDER BY body (without the ORDER BY keywords).

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return [cls(id=row[0], row=row) for row in rows]
363
364
class IneligibleHostQueue(DBObject):
    """Maps one row of the afe_ineligible_host_queues table."""
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
369
class AtomicGroup(DBObject):
    """Maps one row of the afe_atomic_groups table."""
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
375
class Label(DBObject):
    """Maps one row of the afe_labels table."""
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        """Debug representation showing name, id and atomic group."""
        details = (self.name, self.id, self.atomic_group_id)
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % details
385
386
class Host(DBObject):
    """Maps one row of the afe_hosts table."""
    _table_name = 'afe_hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty',
               'leased', 'shard_id')
    # Times decorated methods under the 'scheduler_models.Host' stats key.
    _timer = stats.Timer("scheduler_models.Host")


    @_timer.decorate
    def set_status(self,status):
        """Set the status field, writing it through to the DB immediately.

        @param status: The new status string for this host.
        """
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status',status)
        # Noticed some time jumps after the last log message.
        logging.debug('Host Set Status Complete')


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        # Labels are joined through afe_hosts_labels; the platform label (if
        # any) is the one with afe_labels.platform set, and it also appears
        # in the returned label list.
        rows = _db.execute("""
                SELECT afe_labels.name, afe_labels.platform
                FROM afe_labels
                INNER JOIN afe_hosts_labels ON
                        afe_labels.id = afe_hosts_labels.label_id
                WHERE afe_hosts_labels.host_id = %s
                ORDER BY afe_labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    # Matches hostnames like 'host10': group 1 is the alphabetic/dash name,
    # group 2 is the trailing digit run.
    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number. If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfy most people's hostname sorting needs regardless
        of their exact naming schemes. Nobody sane should have both a host10
        and host010 (but the algorithm works regardless).

        NOTE(review): relies on the Python 2 builtin cmp(); this method is
        not Python 3 compatible as written.
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            number_a = int(number_a_str.lstrip('0'))
            number_b = int(number_b_str.lstrip('0'))
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
462
463
464class HostQueueEntry(DBObject):
465 _table_name = 'afe_host_queue_entries'
466 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
467 'active', 'complete', 'deleted', 'execution_subdir',
Fang Deng51599032014-06-23 17:24:27 -0700468 'atomic_group_id', 'aborted', 'started_on', 'finished_on')
Fang Deng1d6c2a02013-04-17 15:25:45 -0700469 _timer = stats.Timer('scheduler_models.HostQueueEntry')
jamesrenb55378a2010-03-02 22:19:49 +0000470
471
472 def __init__(self, id=None, row=None, **kwargs):
473 assert id or row
474 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
475 self.job = Job(self.job_id)
476
477 if self.host_id:
beepscc9fc702013-12-02 12:45:38 -0800478 self.host = rdb_lib.get_hosts([self.host_id])[0]
479 self.host.dbg_str = self.get_dbg_str()
Dan Shi7cf3d842014-08-13 11:20:38 -0700480 self.host.metadata = get_job_metadata(self.job)
jamesrenb55378a2010-03-02 22:19:49 +0000481 else:
482 self.host = None
483
484 if self.atomic_group_id:
485 self.atomic_group = AtomicGroup(self.atomic_group_id,
486 always_query=False)
487 else:
488 self.atomic_group = None
489
jamesrenb55378a2010-03-02 22:19:49 +0000490
491 @classmethod
492 def clone(cls, template):
493 """
494 Creates a new row using the values from a template instance.
495
496 The new instance will not exist in the database or have a valid
497 id attribute until its save() method is called.
498 """
499 assert isinstance(template, cls)
500 new_row = [getattr(template, field) for field in cls._fields]
501 clone = cls(row=new_row, new_record=True)
502 clone.id = None
503 return clone
504
505
506 def _view_job_url(self):
507 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
508
509
510 def get_labels(self):
511 """
512 Get all labels associated with this host queue entry (either via the
513 meta_host or as a job dependency label). The labels yielded are not
514 guaranteed to be unique.
515
516 @yields Label instances associated with this host_queue_entry.
517 """
518 if self.meta_host:
519 yield Label(id=self.meta_host, always_query=False)
520 labels = Label.fetch(
521 joins="JOIN afe_jobs_dependency_labels AS deps "
522 "ON (afe_labels.id = deps.label_id)",
523 where="deps.job_id = %d" % self.job.id)
524 for label in labels:
525 yield label
526
527
528 def set_host(self, host):
529 if host:
530 logging.info('Assigning host %s to entry %s', host.hostname, self)
jamesrenb55378a2010-03-02 22:19:49 +0000531 self.update_field('host_id', host.id)
532 self.block_host(host.id)
533 else:
534 logging.info('Releasing host from %s', self)
jamesrenb55378a2010-03-02 22:19:49 +0000535 self.unblock_host(self.host.id)
536 self.update_field('host_id', None)
537
538 self.host = host
539
540
jamesrenb55378a2010-03-02 22:19:49 +0000541 def block_host(self, host_id):
542 logging.info("creating block %s/%s", self.job.id, host_id)
543 row = [0, self.job.id, host_id]
544 block = IneligibleHostQueue(row=row, new_record=True)
545 block.save()
546
547
548 def unblock_host(self, host_id):
549 logging.info("removing block %s/%s", self.job.id, host_id)
550 blocks = IneligibleHostQueue.fetch(
551 'job_id=%d and host_id=%d' % (self.job.id, host_id))
552 for block in blocks:
553 block.delete()
554
555
556 def set_execution_subdir(self, subdir=None):
557 if subdir is None:
558 assert self.host
559 subdir = self.host.hostname
560 self.update_field('execution_subdir', subdir)
561
562
563 def _get_hostname(self):
564 if self.host:
565 return self.host.hostname
566 return 'no host'
567
568
beepscc9fc702013-12-02 12:45:38 -0800569 def get_dbg_str(self):
570 """Get a debug string to identify this host.
571
572 @return: A string containing the hqe and job id.
573 """
574 try:
575 return 'HQE: %s, for job: %s' % (self.id, self.job_id)
576 except AttributeError as e:
577 return 'HQE has not been initialized yet: %s' % e
578
579
jamesrenb55378a2010-03-02 22:19:49 +0000580 def __str__(self):
581 flags = []
582 if self.active:
583 flags.append('active')
584 if self.complete:
585 flags.append('complete')
586 if self.deleted:
587 flags.append('deleted')
588 if self.aborted:
589 flags.append('aborted')
590 flags_str = ','.join(flags)
591 if flags_str:
592 flags_str = ' [%s]' % flags_str
beepscc9fc702013-12-02 12:45:38 -0800593 return ("%s and host: %s has status:%s%s" %
594 (self.get_dbg_str(), self._get_hostname(), self.status,
595 flags_str))
jamesrenb55378a2010-03-02 22:19:49 +0000596
597
Michael Liang0d747462014-07-17 14:19:53 -0700598 def record_state(self, type_str, state, value):
Michael Liang500dedc2014-07-15 16:16:44 -0700599 """Record metadata in elasticsearch.
600
601 If ES configured to use http, then we will time that http request.
602 Otherwise, it uses UDP, so we will not need to time it.
603
Michael Liang0d747462014-07-17 14:19:53 -0700604 @param type_str: sets the _type field in elasticsearch db.
Michael Liang500dedc2014-07-15 16:16:44 -0700605 @param state: string representing what state we are recording,
606 e.g. 'status'
607 @param value: value of the state, e.g. 'verifying'
608 """
609 metadata = {
610 'time_changed': time.time(),
611 state: value,
612 'job_id': self.job_id,
613 }
614 if self.host:
615 metadata['hostname'] = self.host.hostname
Michael Liang0d747462014-07-17 14:19:53 -0700616 es_utils.ESMetadata().post(type_str=type_str, metadata=metadata)
Michael Liang500dedc2014-07-15 16:16:44 -0700617
618
Fang Deng1d6c2a02013-04-17 15:25:45 -0700619 @_timer.decorate
jamesrenb55378a2010-03-02 22:19:49 +0000620 def set_status(self, status):
621 logging.info("%s -> %s", self, status)
622
623 self.update_field('status', status)
Simran Basi3f6717d2012-09-13 15:21:22 -0700624 # Noticed some time jumps after last logging message.
625 logging.debug('Update Field Complete')
jamesrenb55378a2010-03-02 22:19:49 +0000626
627 active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
628 complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
629 assert not (active and complete)
630
631 self.update_field('active', active)
632 self.update_field('complete', complete)
633
634 if complete:
jamesrene7c65cb2010-06-08 20:38:10 +0000635 self._on_complete(status)
Dale Curtis74a314b2011-06-23 14:55:46 -0700636 self._email_on_job_complete()
jamesrenb55378a2010-03-02 22:19:49 +0000637
638 should_email_status = (status.lower() in _notify_email_statuses or
639 'all' in _notify_email_statuses)
640 if should_email_status:
641 self._email_on_status(status)
Simran Basi3f6717d2012-09-13 15:21:22 -0700642 logging.debug('HQE Set Status Complete')
Michael Liang500dedc2014-07-15 16:16:44 -0700643 self.record_state('hqe_status', 'status', status)
644
jamesrenb55378a2010-03-02 22:19:49 +0000645
646
jamesrene7c65cb2010-06-08 20:38:10 +0000647 def _on_complete(self, status):
648 if status is not models.HostQueueEntry.Status.ABORTED:
649 self.job.stop_if_necessary()
650
Fang Dengd44a1232014-08-18 14:40:28 -0700651 if self.started_on:
652 self.set_finished_on_now()
jamesrenb55378a2010-03-02 22:19:49 +0000653 if not self.execution_subdir:
654 return
655 # unregister any possible pidfiles associated with this queue entry
656 for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
657 pidfile_id = _drone_manager.get_pidfile_id_from(
658 self.execution_path(), pidfile_name=pidfile_name)
659 _drone_manager.unregister_pidfile(pidfile_id)
660
661
Eric Li6f27d4f2010-09-29 10:55:17 -0700662 def _get_status_email_contents(self, status, summary=None, hostname=None):
663 """
664 Gather info for the status notification e-mails.
665
666 If needed, we could start using the Django templating engine to create
667 the subject and the e-mail body, but that doesn't seem necessary right
668 now.
669
670 @param status: Job status text. Mandatory.
671 @param summary: Job summary text. Optional.
672 @param hostname: A hostname for the job. Optional.
673
674 @return: Tuple (subject, body) for the notification e-mail.
675 """
676 job_stats = Job(id=self.job.id).get_execution_details()
677
678 subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
679 (self.job.id, self.job.name, status))
680
681 if hostname is not None:
682 subject += '| Hostname: %s ' % hostname
683
684 if status not in ["1 Failed", "Failed"]:
685 subject += '| Success Rate: %.2f %%' % job_stats['success_rate']
686
687 body = "Job ID: %s\n" % self.job.id
688 body += "Job name: %s\n" % self.job.name
689 if hostname is not None:
690 body += "Host: %s\n" % hostname
691 if summary is not None:
692 body += "Summary: %s\n" % summary
693 body += "Status: %s\n" % status
694 body += "Results interface URL: %s\n" % self._view_job_url()
695 body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
696 if int(job_stats['total_executed']) > 0:
697 body += "User tests executed: %s\n" % job_stats['total_executed']
698 body += "User tests passed: %s\n" % job_stats['total_passed']
699 body += "User tests failed: %s\n" % job_stats['total_failed']
700 body += ("User tests success rate: %.2f %%\n" %
701 job_stats['success_rate'])
702
703 if job_stats['failed_rows']:
704 body += "Failures:\n"
705 body += job_stats['failed_rows']
706
707 return subject, body
708
709
jamesrenb55378a2010-03-02 22:19:49 +0000710 def _email_on_status(self, status):
711 hostname = self._get_hostname()
Eric Li6f27d4f2010-09-29 10:55:17 -0700712 subject, body = self._get_status_email_contents(status, None, hostname)
jamesrenb55378a2010-03-02 22:19:49 +0000713 email_manager.manager.send_email(self.job.email_list, subject, body)
714
715
716 def _email_on_job_complete(self):
717 if not self.job.is_finished():
718 return
719
Eric Li6f27d4f2010-09-29 10:55:17 -0700720 summary = []
jamesrenb55378a2010-03-02 22:19:49 +0000721 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
722 for queue_entry in hosts_queue:
Eric Li6f27d4f2010-09-29 10:55:17 -0700723 summary.append("Host: %s Status: %s" %
jamesrenb55378a2010-03-02 22:19:49 +0000724 (queue_entry._get_hostname(),
725 queue_entry.status))
726
Eric Li6f27d4f2010-09-29 10:55:17 -0700727 summary = "\n".join(summary)
jamesrenb55378a2010-03-02 22:19:49 +0000728 status_counts = models.Job.objects.get_status_counts(
729 [self.job.id])[self.job.id]
730 status = ', '.join('%d %s' % (count, status) for status, count
731 in status_counts.iteritems())
732
Eric Li6f27d4f2010-09-29 10:55:17 -0700733 subject, body = self._get_status_email_contents(status, summary, None)
jamesrenb55378a2010-03-02 22:19:49 +0000734 email_manager.manager.send_email(self.job.email_list, subject, body)
735
736
737 def schedule_pre_job_tasks(self):
738 logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
739 self.job.name, self.meta_host, self.atomic_group_id,
740 self.job.id, self.id, self.host.hostname, self.status)
741
742 self._do_schedule_pre_job_tasks()
743
744
745 def _do_schedule_pre_job_tasks(self):
jamesrenb55378a2010-03-02 22:19:49 +0000746 self.job.schedule_pre_job_tasks(queue_entry=self)
747
748
749 def requeue(self):
750 assert self.host
751 self.set_status(models.HostQueueEntry.Status.QUEUED)
752 self.update_field('started_on', None)
Fang Deng51599032014-06-23 17:24:27 -0700753 self.update_field('finished_on', None)
jamesrenb55378a2010-03-02 22:19:49 +0000754 # verify/cleanup failure sets the execution subdir, so reset it here
755 self.set_execution_subdir('')
756 if self.meta_host:
757 self.set_host(None)
758
759
760 @property
761 def aborted_by(self):
762 self._load_abort_info()
763 return self._aborted_by
764
765
766 @property
767 def aborted_on(self):
768 self._load_abort_info()
769 return self._aborted_on
770
771
772 def _load_abort_info(self):
773 """ Fetch info about who aborted the job. """
774 if hasattr(self, "_aborted_by"):
775 return
776 rows = _db.execute("""
777 SELECT afe_users.login,
778 afe_aborted_host_queue_entries.aborted_on
779 FROM afe_aborted_host_queue_entries
780 INNER JOIN afe_users
781 ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
782 WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
783 """, (self.id,))
784 if rows:
785 self._aborted_by, self._aborted_on = rows[0]
786 else:
787 self._aborted_by = self._aborted_on = None
788
789
790 def on_pending(self):
791 """
792 Called when an entry in a synchronous job has passed verify. If the
793 job is ready to run, sets the entries to STARTING. Otherwise, it leaves
794 them in PENDING.
795 """
796 self.set_status(models.HostQueueEntry.Status.PENDING)
797 self.host.set_status(models.Host.Status.PENDING)
798
799 # Some debug code here: sends an email if an asynchronous job does not
800 # immediately enter Starting.
801 # TODO: Remove this once we figure out why asynchronous jobs are getting
802 # stuck in Pending.
803 self.job.run_if_ready(queue_entry=self)
804 if (self.job.synch_count == 1 and
805 self.status == models.HostQueueEntry.Status.PENDING):
806 subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
807 message = 'Asynchronous job stuck in Pending'
808 email_manager.manager.enqueue_notify_email(subject, message)
809
810
811 def abort(self, dispatcher):
812 assert self.aborted and not self.complete
813
814 Status = models.HostQueueEntry.Status
815 if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
816 # do nothing; post-job tasks will finish and then mark this entry
817 # with status "Aborted" and take care of the host
818 return
819
jamesren3bc70a12010-04-12 18:23:38 +0000820 if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING,
821 Status.WAITING):
Dan Shi76af8022013-10-19 01:59:49 -0700822 # If hqe is in any of these status, it should not have any
823 # unfinished agent before it can be aborted.
824 agents = dispatcher.get_agents_for_entry(self)
825 # Agent with finished task can be left behind. This is added to
826 # handle the special case of aborting hostless job in STARTING
827 # status, in which the agent has only a HostlessQueueTask
828 # associated. The finished HostlessQueueTask will be cleaned up in
829 # the next tick, so it's safe to leave the agent there. Without
830 # filtering out finished agent, HQE abort won't be able to proceed.
831 assert all([agent.is_done() for agent in agents])
832 # If hqe is still in STARTING status, it may not have assigned a
833 # host yet.
834 if self.host:
835 self.host.set_status(models.Host.Status.READY)
Dan Shi07e09af2013-04-12 09:31:29 -0700836 elif (self.status == Status.VERIFYING or
beepse50d8752013-11-20 18:23:02 -0800837 self.status == Status.RESETTING):
jamesrenb55378a2010-03-02 22:19:49 +0000838 models.SpecialTask.objects.create(
839 task=models.SpecialTask.Task.CLEANUP,
840 host=models.Host.objects.get(id=self.host.id),
841 requested_by=self.job.owner_model())
beepse50d8752013-11-20 18:23:02 -0800842 elif self.status == Status.PROVISIONING:
843 models.SpecialTask.objects.create(
844 task=models.SpecialTask.Task.REPAIR,
845 host=models.Host.objects.get(id=self.host.id),
846 requested_by=self.job.owner_model())
jamesrenb55378a2010-03-02 22:19:49 +0000847
848 self.set_status(Status.ABORTED)
849 self.job.abort_delay_ready_task()
850
851
852 def get_group_name(self):
853 atomic_group = self.atomic_group
854 if not atomic_group:
855 return ''
856
857 # Look at any meta_host and dependency labels and pick the first
858 # one that also specifies this atomic group. Use that label name
859 # as the group name if possible (it is more specific).
860 for label in self.get_labels():
861 if label.atomic_group_id:
862 assert label.atomic_group_id == atomic_group.id
863 return label.name
864 return atomic_group.name
865
866
867 def execution_tag(self):
Scott Zawalskid712cf32012-07-14 16:24:53 -0400868 SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
869 'complete!=1 AND execution_subdir="" AND '
870 'status!="Queued";')
871 SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
872 'status="Aborted" WHERE id=%s;')
873 try:
874 assert self.execution_subdir
875 except AssertionError:
876 # TODO(scottz): Remove temporary fix/info gathering pathway for
877 # crosbug.com/31595 once issue is root caused.
878 logging.error('No execution_subdir for host queue id:%s.', self.id)
879 logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
880 for row in _db.execute(SQL_SUSPECT_ENTRIES):
Dan Shi76af8022013-10-19 01:59:49 -0700881 logging.error(row)
Scott Zawalskid712cf32012-07-14 16:24:53 -0400882 logging.error('====DB DEBUG====\n')
883 fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
884 logging.error('EXECUTING: %s', fix_query)
885 _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id)
886 raise AssertionError(('self.execution_subdir not found. '
887 'See log for details.'))
888
jamesrenb55378a2010-03-02 22:19:49 +0000889 return "%s/%s" % (self.job.tag(), self.execution_subdir)
890
891
    def execution_path(self):
        """Return the results path for this entry (same as execution_tag())."""
        return self.execution_tag()
894
895
896 def set_started_on_now(self):
897 self.update_field('started_on', datetime.datetime.now())
898
899
Fang Deng51599032014-06-23 17:24:27 -0700900 def set_finished_on_now(self):
901 self.update_field('finished_on', datetime.datetime.now())
902
903
jamesrenb55378a2010-03-02 22:19:49 +0000904 def is_hostless(self):
905 return (self.host_id is None
906 and self.meta_host is None
907 and self.atomic_group_id is None)
908
909
class Job(DBObject):
    """In-memory representation of a row in the afe_jobs table.

    Mirrors (and predates) the Django models.Job; carries the
    scheduler-specific logic for choosing host groups, queueing pre-job
    special tasks and starting/stopping a job's HostQueueEntries.
    """
    # Backing table and the columns loaded into attributes by DBObject.
    _table_name = 'afe_jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
               'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
               'test_retry', 'run_reset', 'timeout_mins', 'shard_id')
    # Timer used to report method latencies to the stats backend.
    _timer = stats.Timer("scheduler_models.Job")

    # This does not need to be a column in the DB. The delays are likely to
    # be configured short. If the scheduler is stopped and restarted in
    # the middle of a job's delay cycle, the delay cycle will either be
    # repeated or skipped depending on the number of Pending machines found
    # when the restarted scheduler recovers to track it. Not a problem.
    #
    # A reference to the DelayedCallTask that will wake up the job should
    # no other HQEs change state in time. Its end_time attribute is used
    # by our run_with_ready_delay() method to determine if the wait is over.
    _delay_ready_task = None

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
    # all status='Pending' atomic group HQEs incase a delay was running when the
    # scheduler was restarted and no more hosts ever successfully exit Verify.
934
    def __init__(self, id=None, row=None, **kwargs):
        """Load a Job either by DB id or from an already-fetched row.

        @param id: Primary key of the afe_jobs row to load (optional).
        @param row: Pre-fetched DB row tuple to populate from (optional).
                    At least one of id/row must be supplied.
        """
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
        self._owner_model = None # caches model instance of owner
        self.update_image_path = None # path of OS image to install
jamesrenb55378a2010-03-02 22:19:49 +0000940
941
    def model(self):
        """Return the Django models.Job instance for this job's id."""
        return models.Job.objects.get(id=self.id)
944
945
946 def owner_model(self):
947 # work around the fact that the Job owner field is a string, not a
948 # foreign key
949 if not self._owner_model:
950 self._owner_model = models.User.objects.get(login=self.owner)
951 return self._owner_model
952
953
    def is_server_job(self):
        """True when this job runs a server-side control file."""
        return self.control_type == control_data.CONTROL_TYPE.SERVER
jamesrenb55378a2010-03-02 22:19:49 +0000956
957
958 def tag(self):
959 return "%s-%s" % (self.id, self.owner)
960
961
962 def get_host_queue_entries(self):
963 rows = _db.execute("""
964 SELECT * FROM afe_host_queue_entries
965 WHERE job_id= %s
966 """, (self.id,))
967 entries = [HostQueueEntry(row=i) for i in rows]
968
969 assert len(entries)>0
970
971 return entries
972
973
Paul Pendlebury5a8c6ad2011-02-01 07:20:17 -0800974 def is_image_update_job(self):
975 """
976 Discover if the current job requires an OS update.
977
978 @return: True/False if OS should be updated before job is run.
979 """
980 # All image update jobs have the parameterized_job_id set.
981 if not self.parameterized_job_id:
982 return False
983
984 # Retrieve the ID of the ParameterizedJob this job is an instance of.
985 rows = _db.execute("""
986 SELECT test_id
987 FROM afe_parameterized_jobs
988 WHERE id = %s
989 """, (self.parameterized_job_id,))
990 if not rows:
991 return False
992 test_id = rows[0][0]
993
994 # Retrieve the ID of the known autoupdate_ParameterizedJob.
995 rows = _db.execute("""
996 SELECT id
997 FROM afe_autotests
998 WHERE name = 'autoupdate_ParameterizedJob'
999 """)
1000 if not rows:
1001 return False
1002 update_id = rows[0][0]
1003
1004 # If the IDs are the same we've found an image update job.
1005 if test_id == update_id:
1006 # Finally, get the path to the OS image to install.
1007 rows = _db.execute("""
1008 SELECT parameter_value
1009 FROM afe_parameterized_job_parameters
1010 WHERE parameterized_job_id = %s
1011 """, (self.parameterized_job_id,))
1012 if rows:
1013 # Save the path in update_image_path to use later as a command
1014 # line parameter to autoserv.
1015 self.update_image_path = rows[0][0]
1016 return True
1017
1018 return False
1019
1020
Eric Li6f27d4f2010-09-29 10:55:17 -07001021 def get_execution_details(self):
1022 """
1023 Get test execution details for this job.
1024
1025 @return: Dictionary with test execution details
1026 """
1027 def _find_test_jobs(rows):
1028 """
1029 Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
1030 Those are autotest 'internal job' tests, so they should not be
1031 counted when evaluating the test stats.
1032
1033 @param rows: List of rows (matrix) with database results.
1034 """
1035 job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')
1036 n_test_jobs = 0
1037 for r in rows:
1038 test_name = r[0]
1039 if job_test_pattern.match(test_name):
1040 n_test_jobs += 1
1041
1042 return n_test_jobs
1043
1044 stats = {}
1045
1046 rows = _db.execute("""
1047 SELECT t.test, s.word, t.reason
1048 FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
1049 WHERE t.job_idx = j.job_idx
1050 AND s.status_idx = t.status
1051 AND j.afe_job_id = %s
Mike Truty6941dea2010-11-09 15:26:32 -08001052 ORDER BY t.reason
Eric Li6f27d4f2010-09-29 10:55:17 -07001053 """ % self.id)
1054
Dale Curtis74a314b2011-06-23 14:55:46 -07001055 failed_rows = [r for r in rows if not r[1] == 'GOOD']
Eric Li6f27d4f2010-09-29 10:55:17 -07001056
1057 n_test_jobs = _find_test_jobs(rows)
1058 n_test_jobs_failed = _find_test_jobs(failed_rows)
1059
1060 total_executed = len(rows) - n_test_jobs
1061 total_failed = len(failed_rows) - n_test_jobs_failed
1062
1063 if total_executed > 0:
1064 success_rate = 100 - ((total_failed / float(total_executed)) * 100)
1065 else:
1066 success_rate = 0
1067
1068 stats['total_executed'] = total_executed
1069 stats['total_failed'] = total_failed
1070 stats['total_passed'] = total_executed - total_failed
1071 stats['success_rate'] = success_rate
1072
1073 status_header = ("Test Name", "Status", "Reason")
1074 if failed_rows:
1075 stats['failed_rows'] = utils.matrix_to_string(failed_rows,
1076 status_header)
1077 else:
1078 stats['failed_rows'] = ''
1079
1080 time_row = _db.execute("""
1081 SELECT started_time, finished_time
1082 FROM tko_jobs
1083 WHERE afe_job_id = %s
1084 """ % self.id)
1085
1086 if time_row:
1087 t_begin, t_end = time_row[0]
Mike Truty6941dea2010-11-09 15:26:32 -08001088 try:
1089 delta = t_end - t_begin
1090 minutes, seconds = divmod(delta.seconds, 60)
1091 hours, minutes = divmod(minutes, 60)
1092 stats['execution_time'] = ("%02d:%02d:%02d" %
1093 (hours, minutes, seconds))
1094 # One of t_end or t_begin are None
1095 except TypeError:
1096 stats['execution_time'] = '(could not determine)'
Eric Li6f27d4f2010-09-29 10:55:17 -07001097 else:
1098 stats['execution_time'] = '(none)'
1099
1100 return stats
1101
1102
Fang Deng1d6c2a02013-04-17 15:25:45 -07001103 @_timer.decorate
jamesrenb55378a2010-03-02 22:19:49 +00001104 def set_status(self, status, update_queues=False):
1105 self.update_field('status',status)
1106
1107 if update_queues:
1108 for queue_entry in self.get_host_queue_entries():
1109 queue_entry.set_status(status)
1110
1111
1112 def keyval_dict(self):
1113 return self.model().keyval_dict()
1114
1115
    def _atomic_and_has_started(self):
        """
        @returns True if any of the HostQueueEntries associated with this job
        have entered the Status.STARTING state or beyond.
        Always False for non-atomic jobs (no atomic-group HQEs).
        """
        atomic_entries = models.HostQueueEntry.objects.filter(
                job=self.id, atomic_group__isnull=False)
        if atomic_entries.count() <= 0:
            return False

        # These states may *only* be reached if Job.run() has been called.
        started_statuses = (models.HostQueueEntry.Status.STARTING,
                            models.HostQueueEntry.Status.RUNNING,
                            models.HostQueueEntry.Status.COMPLETED)

        started_entries = atomic_entries.filter(status__in=started_statuses)
        return started_entries.count() > 0
1133
1134
    def _hosts_assigned_count(self):
        """The number of HostQueueEntries assigned a Host for this job.

        @returns Integer count of HQEs with a non-null host.
        """
        entries = models.HostQueueEntry.objects.filter(job=self.id,
                                                       host__isnull=False)
        return entries.count()
1140
1141
    def _pending_count(self):
        """The number of HostQueueEntries for this job in the Pending state.

        @returns Integer count of Pending HQEs.
        """
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status=models.HostQueueEntry.Status.PENDING)
        return pending_entries.count()
1147
1148
1149 def _max_hosts_needed_to_run(self, atomic_group):
1150 """
1151 @param atomic_group: The AtomicGroup associated with this job that we
1152 are using to set an upper bound on the threshold.
1153 @returns The maximum number of HostQueueEntries assigned a Host before
1154 this job can run.
1155 """
1156 return min(self._hosts_assigned_count(),
1157 atomic_group.max_number_of_machines)
1158
1159
    def _min_hosts_needed_to_run(self):
        """Return the minimum number of hosts needed to run this job."""
        return self.synch_count
1163
1164
1165 def is_ready(self):
1166 # NOTE: Atomic group jobs stop reporting ready after they have been
1167 # started to avoid launching multiple copies of one atomic job.
1168 # Only possible if synch_count is less than than half the number of
1169 # machines in the atomic group.
1170 pending_count = self._pending_count()
1171 atomic_and_has_started = self._atomic_and_has_started()
1172 ready = (pending_count >= self.synch_count
1173 and not atomic_and_has_started)
1174
1175 if not ready:
1176 logging.info(
1177 'Job %s not ready: %s pending, %s required '
1178 '(Atomic and started: %s)',
1179 self, pending_count, self.synch_count,
1180 atomic_and_has_started)
1181
1182 return ready
1183
1184
1185 def num_machines(self, clause = None):
1186 sql = "job_id=%s" % self.id
1187 if clause:
1188 sql += " AND (%s)" % clause
1189 return self.count(sql, table='afe_host_queue_entries')
1190
1191
    def num_queued(self):
        """Number of this job's HQEs that have not completed yet."""
        return self.num_machines('not complete')
1194
1195
    def num_active(self):
        """Number of this job's HQEs that are currently active."""
        return self.num_machines('active')
1198
1199
    def num_complete(self):
        """Number of this job's HQEs that have completed."""
        return self.num_machines('complete')
1202
1203
1204 def is_finished(self):
1205 return self.num_complete() == self.num_machines()
1206
1207
    def _not_yet_run_entries(self, include_verifying=True):
        """Queryset of this job's HQEs that have not started running.

        @param include_verifying: When True, Verifying entries count as
                not-yet-run in addition to Queued and Pending ones.
        @returns Django queryset of matching HostQueueEntry models.
        """
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)
1215
1216
    def _stop_all_entries(self):
        """Move all of this job's unstarted (Queued/Pending) HQEs to Stopped.

        Pending entries release their host back to Ready first.  Verifying
        entries are deliberately left alone (include_verifying=False).
        """
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            # Stopping a complete entry would be a scheduler logic error.
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()
1230
1231
1232 def stop_if_necessary(self):
1233 not_yet_run = self._not_yet_run_entries()
1234 if not_yet_run.count() < self.synch_count:
1235 self._stop_all_entries()
1236
1237
jamesrenb55378a2010-03-02 22:19:49 +00001238 def _next_group_name(self, group_name=''):
1239 """@returns a directory name to use for the next host group results."""
1240 if group_name:
1241 # Sanitize for use as a pathname.
1242 group_name = group_name.replace(os.path.sep, '_')
1243 if group_name.startswith('.'):
1244 group_name = '_' + group_name[1:]
1245 # Add a separator between the group name and 'group%d'.
1246 group_name += '.'
1247 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
1248 query = models.HostQueueEntry.objects.filter(
1249 job=self.id).values('execution_subdir').distinct()
1250 subdirs = (entry['execution_subdir'] for entry in query)
1251 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
1252 ids = [int(match.group(1)) for match in group_matches if match]
1253 if ids:
1254 next_id = max(ids) + 1
1255 else:
1256 next_id = 0
1257 return '%sgroup%d' % (group_name, next_id)
1258
1259
    def get_group_entries(self, queue_entry_from_group):
        """Fetch the HQEs sharing an execution group with the given entry.

        @param queue_entry_from_group: A HostQueueEntry instance to find other
                group entries on this job for.

        @returns A list of HostQueueEntry objects all executing this job as
                part of the same group as the one supplied (having the same
                execution_subdir).
        """
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))
1273
1274
1275 def _should_run_cleanup(self, queue_entry):
1276 if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
1277 return True
1278 elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
1279 return queue_entry.host.dirty
1280 return False
1281
1282
1283 def _should_run_verify(self, queue_entry):
1284 do_not_verify = (queue_entry.host.protection ==
1285 host_protections.Protection.DO_NOT_VERIFY)
1286 if do_not_verify:
1287 return False
Alex Miller6ee996f2013-02-28 13:53:52 -08001288 # If RebootBefore is set to NEVER, then we won't run reset because
1289 # we can't cleanup, so we need to weaken a Reset into a Verify.
1290 weaker_reset = (self.run_reset and
1291 self.reboot_before == model_attributes.RebootBefore.NEVER)
1292 return self.run_verify or weaker_reset
jamesrenb55378a2010-03-02 22:19:49 +00001293
1294
Dan Shi07e09af2013-04-12 09:31:29 -07001295 def _should_run_reset(self, queue_entry):
1296 can_verify = (queue_entry.host.protection !=
1297 host_protections.Protection.DO_NOT_VERIFY)
1298 can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER
1299 return (can_reboot and can_verify and (self.run_reset or
1300 (self._should_run_cleanup(queue_entry) and
1301 self._should_run_verify(queue_entry))))
1302
1303
Alex Millerdfff2fd2013-05-28 13:05:06 -07001304 def _should_run_provision(self, queue_entry):
1305 """
1306 Determine if the queue_entry needs to have a provision task run before
1307 it to provision queue_entry.host.
1308
1309 @param queue_entry: The host queue entry in question.
1310 @returns: True if we should schedule a provision task, False otherwise.
1311
1312 """
1313 # If we get to this point, it means that the scheduler has already
1314 # vetted that all the unprovisionable labels match, so we can just
1315 # find all labels on the job that aren't on the host to get the list
1316 # of what we need to provision. (See the scheduling logic in
1317 # host_scheduler.py:is_host_eligable_for_job() where we discard all
Alex Miller627694a2014-05-01 18:04:29 -07001318 # actionable labels when assigning jobs to hosts.)
Alex Millerdfff2fd2013-05-28 13:05:06 -07001319 job_labels = {x.name for x in queue_entry.get_labels()}
1320 _, host_labels = queue_entry.host.platform_and_labels()
Alex Miller627694a2014-05-01 18:04:29 -07001321 # If there are any labels on the job that are not on the host and they
1322 # are labels that provisioning knows how to change, then that means
1323 # there is provisioning work to do. If there's no provisioning work to
1324 # do, then obviously we have no reason to schedule a provision task!
1325 diff = job_labels - set(host_labels)
1326 if any([provision.Provision.acts_on(x) for x in diff]):
Alex Millerdfff2fd2013-05-28 13:05:06 -07001327 return True
1328 return False
1329
1330
Alex Miller42437f92013-05-28 12:58:54 -07001331 def _queue_special_task(self, queue_entry, task):
jamesrenb55378a2010-03-02 22:19:49 +00001332 """
Alex Miller42437f92013-05-28 12:58:54 -07001333 Create a special task and associate it with a host queue entry.
jamesrenb55378a2010-03-02 22:19:49 +00001334
Alex Miller42437f92013-05-28 12:58:54 -07001335 @param queue_entry: The queue entry this special task should be
1336 associated with.
1337 @param task: One of the members of the enum models.SpecialTask.Task.
1338 @returns: None
1339
jamesrenb55378a2010-03-02 22:19:49 +00001340 """
jamesrenb55378a2010-03-02 22:19:49 +00001341 models.SpecialTask.objects.create(
1342 host=models.Host.objects.get(id=queue_entry.host_id),
1343 queue_entry=queue_entry, task=task)
1344
1345
Alex Miller42437f92013-05-28 12:58:54 -07001346 def schedule_pre_job_tasks(self, queue_entry):
1347 """
1348 Queue all of the special tasks that need to be run before a host
1349 queue entry may run.
1350
1351 If no special taskes need to be scheduled, then |on_pending| will be
1352 called directly.
1353
1354 @returns None
1355
1356 """
1357 task_queued = False
1358 hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id)
1359
Dan Shi07e09af2013-04-12 09:31:29 -07001360 if self._should_run_reset(queue_entry):
1361 self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET)
Alex Miller42437f92013-05-28 12:58:54 -07001362 task_queued = True
Dan Shi07e09af2013-04-12 09:31:29 -07001363 else:
1364 if self._should_run_cleanup(queue_entry):
1365 self._queue_special_task(hqe_model,
1366 models.SpecialTask.Task.CLEANUP)
1367 task_queued = True
1368 if self._should_run_verify(queue_entry):
1369 self._queue_special_task(hqe_model,
1370 models.SpecialTask.Task.VERIFY)
1371 task_queued = True
Alex Miller42437f92013-05-28 12:58:54 -07001372
Alex Millerdfff2fd2013-05-28 13:05:06 -07001373 if self._should_run_provision(queue_entry):
1374 self._queue_special_task(hqe_model,
1375 models.SpecialTask.Task.PROVISION)
1376 task_queued = True
1377
Alex Miller42437f92013-05-28 12:58:54 -07001378 if not task_queued:
1379 queue_entry.on_pending()
1380
1381
jamesrenb55378a2010-03-02 22:19:49 +00001382 def _assign_new_group(self, queue_entries, group_name=''):
1383 if len(queue_entries) == 1:
1384 group_subdir_name = queue_entries[0].host.hostname
1385 else:
1386 group_subdir_name = self._next_group_name(group_name)
1387 logging.info('Running synchronous job %d hosts %s as %s',
1388 self.id, [entry.host.hostname for entry in queue_entries],
1389 group_subdir_name)
1390
1391 for queue_entry in queue_entries:
1392 queue_entry.set_execution_subdir(group_subdir_name)
1393
1394
    def _choose_group_to_run(self, include_queue_entry):
        """Pick and label the set of HQEs that will run together.

        @param include_queue_entry: HostQueueEntry that must be part of the
                group (the entry that triggered the run).
        @returns A list of HostQueueEntry instances to be used to run this
                Job (including the supplied entry), assigned a shared
                execution subdir; or an empty list if too few Pending
                entries could be gathered to satisfy synch_count.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries
1434
1435
1436 def run_if_ready(self, queue_entry):
1437 """
1438 Run this job by kicking its HQEs into status='Starting' if enough
1439 hosts are ready for it to run.
1440
1441 Cleans up by kicking HQEs into status='Stopped' if this Job is not
1442 ready to run.
1443 """
1444 if not self.is_ready():
1445 self.stop_if_necessary()
1446 elif queue_entry.atomic_group:
1447 self.run_with_ready_delay(queue_entry)
1448 else:
1449 self.run(queue_entry)
1450
1451
    def run_with_ready_delay(self, queue_entry):
        """
        Start a delay to wait for more hosts to enter Pending state before
        launching an atomic group job.  Once set, a delay cannot be reset.

        @param queue_entry: The HostQueueEntry object to get atomic group
               info from and pass to run_if_ready when the delay is up.

        @returns None.  Either runs the job immediately (delay disabled,
               enough hosts assigned, or the delay already expired) or
               marks the entry Waiting until the delayed callback fires.
        """
        assert queue_entry.job_id == self.id
        assert queue_entry.atomic_group
        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
        over_max_threshold = (self._pending_count() >=
                self._max_hosts_needed_to_run(queue_entry.atomic_group))
        delay_expired = (self._delay_ready_task and
                         time.time() >= self._delay_ready_task.end_time)

        # Delay is disabled or we already have enough?  Do not wait to run.
        if not delay or over_max_threshold or delay_expired:
            self.run(queue_entry)
        else:
            queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
1476
1477
1478 def request_abort(self):
1479 """Request that this Job be aborted on the next scheduler cycle."""
1480 self.model().abort()
1481
1482
    def schedule_delayed_callback_task(self, queue_entry):
        """Mark the entry Pending and arm the atomic-group wait timer.

        @param queue_entry: HostQueueEntry that just became ready.
        @returns The new DelayedCallTask to be tracked by the caller, or
                None if a delay task is already armed for this job.
        """
        queue_entry.set_status(models.HostQueueEntry.Status.PENDING)

        if self._delay_ready_task:
            return None

        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self)
            # Check to see if the job is still relevant.  It could have aborted
            # while we were waiting or hosts could have disappeared, etc.
            if self._pending_count() < self._min_hosts_needed_to_run():
                logging.info('Job %s had too few Pending hosts after waiting '
                             'for extras. Not running.', self)
                self.request_abort()
                return
            return self.run(queue_entry)

        logging.info('Job %s waiting up to %s seconds for more hosts.',
                     self.id, delay)
        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)
        return self._delay_ready_task
1507
1508
1509 def run(self, queue_entry):
1510 """
1511 @param queue_entry: The HostQueueEntry instance calling this method.
1512 """
1513 if queue_entry.atomic_group and self._atomic_and_has_started():
1514 logging.error('Job.run() called on running atomic Job %d '
1515 'with HQE %s.', self.id, queue_entry)
1516 return
1517 queue_entries = self._choose_group_to_run(queue_entry)
1518 if queue_entries:
1519 self._finish_run(queue_entries)
1520
1521
1522 def _finish_run(self, queue_entries):
1523 for queue_entry in queue_entries:
1524 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
1525 self.abort_delay_ready_task()
1526
1527
1528 def abort_delay_ready_task(self):
1529 """Abort the delayed task associated with this job, if any."""
1530 if self._delay_ready_task:
1531 # Cancel any pending callback that would try to run again
1532 # as we are already running.
1533 self._delay_ready_task.abort()
1534
1535
1536 def __str__(self):
1537 return '%s-%s' % (self.id, self.owner)