blob: 0757ee37f6d7c31b60f1885123ed3d07749c5c64 [file] [log] [blame]
Aviv Keshet0b9cfc92013-02-05 11:36:02 -08001# pylint: disable-msg=C0111
2
jamesrenb55378a2010-03-02 22:19:49 +00003"""Database model classes for the scheduler.
4
5Contains model classes abstracting the various DB tables used by the scheduler.
6These overlap the Django models in basic functionality, but were written before
7the Django models existed and have not yet been phased out. Some of them
8(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
9which would probably be ill-suited for inclusion in the general Django model
10classes.
11
12Globals:
13_notify_email_statuses: list of HQE statuses. each time a single HQE reaches
14 one of these statuses, an email will be sent to the job's email_list.
15 comes from global_config.
16_base_url: URL to the local AFE server, used to construct URLs for emails.
17_db: DatabaseConnection for this module.
18_drone_manager: reference to global DroneManager instance.
19"""
20
21import datetime, itertools, logging, os, re, sys, time, weakref
Michael Liangda8c60a2014-06-03 13:24:51 -070022from autotest_lib.client.common_lib import control_data
Dan Shidfea3682014-08-10 23:38:40 -070023from autotest_lib.client.common_lib import global_config, host_protections
24from autotest_lib.client.common_lib import time_utils
25from autotest_lib.client.common_lib import utils
Gabe Blackb72f4fb2015-01-20 16:47:13 -080026from autotest_lib.client.common_lib.cros.graphite import autotest_es
Michael Liangda8c60a2014-06-03 13:24:51 -070027from autotest_lib.client.common_lib.cros.graphite import stats
jamesrenb55378a2010-03-02 22:19:49 +000028from autotest_lib.frontend.afe import models, model_attributes
jamesrenb55378a2010-03-02 22:19:49 +000029from autotest_lib.scheduler import drone_manager, email_manager
beepscc9fc702013-12-02 12:45:38 -080030from autotest_lib.scheduler import rdb_lib
jamesrenb55378a2010-03-02 22:19:49 +000031from autotest_lib.scheduler import scheduler_config
Prashanth B0e960282014-05-13 19:38:28 -070032from autotest_lib.scheduler import scheduler_lib
Alex Miller627694a2014-05-01 18:04:29 -070033from autotest_lib.server.cros import provision
Michael Liangda8c60a2014-06-03 13:24:51 -070034
jamesrenb55378a2010-03-02 22:19:49 +000035
# HQE statuses that should trigger a notification e-mail; populated by
# initialize() from the scheduler config's "notify_email_statuses" value.
_notify_email_statuses = []
# URL of the local AFE server, used to build links in e-mails; populated
# by initialize().
_base_url = None

# DatabaseConnection for this module; populated by initialize().
_db = None
# Global DroneManager instance; populated by initialize_globals().
_drone_manager = None
41
def initialize():
    """Set up module globals: DB connection, notify statuses and base URL.

    Reads "notify_email_statuses" and "base_url" from the scheduler config
    section, falling back to http://<SERVER.hostname>/afe/ when no base_url
    is configured.  Exits the process when neither source provides a URL.
    Finishes by calling initialize_globals().
    """
    global _db
    _db = scheduler_lib.ConnectionManager().get_connection()

    notify_statuses_list = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    global _notify_email_statuses
    # Statuses may be separated by whitespace, commas, semicolons or colons.
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = global_config.global_config.get_config_value(
                'SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    initialize_globals()
72
73
def initialize_globals():
    """Grab the global DroneManager instance; called after initialize()."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
77
78
def get_job_metadata(job):
    """Build the metadata dictionary describing a job.

    Collects the identifying fields of a job (id, owner, name and parent
    job id) into a plain dict suitable for storage in the metadata
    database.  An empty dict is returned (and an error logged) when the
    job is missing or lacks one of the expected attributes.

    @param job: A Job object.
    @return: A dictionary containing the job id, owner and name.
    """
    if not job:
        logging.error('Job is None, no metadata returned.')
        return {}
    try:
        metadata = dict(job_id=job.id,
                        owner=job.owner,
                        job_name=job.name,
                        parent_job_id=job.parent_job_id)
    except AttributeError as e:
        logging.error('Job has missing attribute: %s', e)
        return {}
    return metadata
100
101
class DelayedCallTask(object):
    """
    An Agent-compatible task that sits idle until a deadline has passed,
    then invokes a callback exactly once and finishes. If the callback
    returns anything, it is assumed to be a new Agent instance and will be
    added to the dispatcher.

    @attribute end_time: The absolute posix time after which this task will
            call its callback when it is polled and be finished.

    Also has all attributes required by the Agent class.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: The delay in seconds from now that this task
                will call the supplied callback and be done.
        @param callback: A callable to be called by this task once after at
                least delay_seconds time has elapsed. It must return None
                or a new Agent instance.
        @param now_func: A time.time like function. Default: time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        if not now_func:
            now_func = time.time
        self._now_func = now_func
        self._callback = callback

        # Deadline is computed once, at construction time.
        self.end_time = self._now_func() + delay_seconds

        # These attributes are required by Agent.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        """Fire the callback once, as soon as the deadline has passed."""
        if self.is_done():
            return
        if self._now_func() < self.end_time:
            return
        self._callback()
        self.success = True


    def is_done(self):
        """A task is finished once it has succeeded or been aborted."""
        return self.success or self.aborted


    def abort(self):
        """Cancel the task; the callback will never be invoked."""
        self.aborted = True
153
154
class DBError(Exception):
    """Raised by the DBObject constructor when its select fails.

    In practice this means the requested row does not exist in the table
    backing the DBObject subclass (see DBObject._fetch_row_from_db).
    """
157
158
class DBObject(object):
    """A miniature object relational model for the database.

    Subclasses map rows of a single table to Python objects.  Instances are
    cached per (class, id) so repeated construction for the same row returns
    the same object while it is still referenced elsewhere.
    """

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id. This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id. If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        # Do not forward the constructor arguments: object.__new__() ignores
        # them at best and warns/raises about them on newer Pythons.
        return super(DBObject, cls).__new__(cls)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id: Primary key of the row to load (may be None if row given).
        @param row: Pre-fetched row tuple matching _fields (may be None).
        @param new_record: True if this object does not yet exist in the DB.
        @param always_query: If False and this (cached) instance is already
                initialized, skip re-reading the row from the database.
        """
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warning(
                        'initialized %s %s instance requery is updating: %s',
                        type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """Return the raw row tuple for row_id.

        @raises DBError: If no row with that id exists in the table.
        """
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        # Guards against drift between the table schema and _fields.
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing in
        memory fields. Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                # Compare at whole-second granularity only.
                current_value = current_value.strftime(time_utils.TIME_FMT)
                row_value = row_value.strftime(time_utils.TIME_FMT)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # 'id' is never updatable through update_field().
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read all field values for this object from its DB row."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table = None):
        """Return the number of rows in table matching the WHERE clause.

        @param where: Body of the SQL WHERE clause (interpolated verbatim).
        @param table: Table to count in; defaults to this object's table.
        """
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Set field to value on this object and in the database.

        No-op when the in-memory value is already equal to the new value.
        """
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """Insert this object as a new row and record its assigned id.

        Only acts when the object was created with new_record=True.
        """
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    # NOTE(review): values are embedded by string formatting,
                    # not parameterized -- unsafe if a value may contain '"'.
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete this row from the database and drop it from the cache."""
        # Purge ourselves from the instance cache using the real row id.
        # (This previously passed the builtin id() function instead of
        # self.id, so a stale instance could still be served from the cache
        # after its row had been deleted.)
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        # Prepend prefix only when string is non-empty.
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return [cls(id=row[0], row=row) for row in rows]
363
364
class IneligibleHostQueue(DBObject):
    """Models a row of afe_ineligible_host_queues.

    Each row records a (job, host) pairing that must not be scheduled
    again; see HostQueueEntry.block_host()/unblock_host().
    """
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
368
369
class AtomicGroup(DBObject):
    """Models a row of the afe_atomic_groups table."""
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
374
375
class Label(DBObject):
    """Models a row of the afe_labels table."""
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)
385
386
class Host(DBObject):
    """Models a row of the afe_hosts table."""
    _table_name = 'afe_hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty',
               'leased', 'shard_id')
    _timer = stats.Timer("scheduler_models.Host")


    @_timer.decorate
    def set_status(self,status):
        """Update this host's status column, logging the transition."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status',status)
        # Noticed some time jumps after the last log message.
        logging.debug('Host Set Status Complete')


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT afe_labels.name, afe_labels.platform
                FROM afe_labels
                INNER JOIN afe_hosts_labels ON
                        afe_labels.id = afe_hosts_labels.label_id
                WHERE afe_hosts_labels.host_id = %s
                ORDER BY afe_labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number. If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfy most people's hostname sorting needs regardless
        of their exact naming schemes. Nobody sane should have both a host10
        and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # int() already ignores leading zeros.  The previous
            # int(number_a_str.lstrip('0')) raised ValueError (int('')) for
            # an all-zero numeric suffix such as "host0" or "host000".
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
462
463
class HostQueueEntry(DBObject):
    """Models a row of the afe_host_queue_entries table."""
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on', 'finished_on')
    # Timer used to report timing stats (see the decorated set_status).
    _timer = stats.Timer('scheduler_models.HostQueueEntry')
jamesrenb55378a2010-03-02 22:19:49 +0000470
471
    def __init__(self, id=None, row=None, **kwargs):
        """
        @param id: Primary key of an existing row (optional if row given).
        @param row: Full row tuple as fetched from the DB (optional).
        """
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            # Hosts come through the rdb; attach the debug string and job
            # metadata that the rdb layer records alongside the host.
            self.host = rdb_lib.get_hosts([self.host_id])[0]
            self.host.dbg_str = self.get_dbg_str()
            self.host.metadata = get_job_metadata(self.job)
        else:
            self.host = None

        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None
489
jamesrenb55378a2010-03-02 22:19:49 +0000490
491 @classmethod
492 def clone(cls, template):
493 """
494 Creates a new row using the values from a template instance.
495
496 The new instance will not exist in the database or have a valid
497 id attribute until its save() method is called.
498 """
499 assert isinstance(template, cls)
500 new_row = [getattr(template, field) for field in cls._fields]
501 clone = cls(row=new_row, new_record=True)
502 clone.id = None
503 return clone
504
505
506 def _view_job_url(self):
507 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
508
509
    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label). The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            # The metahost label itself counts as an associated label.
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label
526
527
    def set_host(self, host):
        """Assign or release this entry's host.

        @param host: A Host instance to assign, or None to release the
                currently assigned host.
        """
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.update_field('host_id', host.id)
            # Block the pair so the same host is not picked again for this job.
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            # Unblock before clearing host_id, while self.host is still set.
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host
539
540
jamesrenb55378a2010-03-02 22:19:49 +0000541 def block_host(self, host_id):
542 logging.info("creating block %s/%s", self.job.id, host_id)
543 row = [0, self.job.id, host_id]
544 block = IneligibleHostQueue(row=row, new_record=True)
545 block.save()
546
547
548 def unblock_host(self, host_id):
549 logging.info("removing block %s/%s", self.job.id, host_id)
550 blocks = IneligibleHostQueue.fetch(
551 'job_id=%d and host_id=%d' % (self.job.id, host_id))
552 for block in blocks:
553 block.delete()
554
555
556 def set_execution_subdir(self, subdir=None):
557 if subdir is None:
558 assert self.host
559 subdir = self.host.hostname
560 self.update_field('execution_subdir', subdir)
561
562
563 def _get_hostname(self):
564 if self.host:
565 return self.host.hostname
566 return 'no host'
567
568
beepscc9fc702013-12-02 12:45:38 -0800569 def get_dbg_str(self):
570 """Get a debug string to identify this host.
571
572 @return: A string containing the hqe and job id.
573 """
574 try:
575 return 'HQE: %s, for job: %s' % (self.id, self.job_id)
576 except AttributeError as e:
577 return 'HQE has not been initialized yet: %s' % e
578
579
jamesrenb55378a2010-03-02 22:19:49 +0000580 def __str__(self):
581 flags = []
582 if self.active:
583 flags.append('active')
584 if self.complete:
585 flags.append('complete')
586 if self.deleted:
587 flags.append('deleted')
588 if self.aborted:
589 flags.append('aborted')
590 flags_str = ','.join(flags)
591 if flags_str:
592 flags_str = ' [%s]' % flags_str
beepscc9fc702013-12-02 12:45:38 -0800593 return ("%s and host: %s has status:%s%s" %
594 (self.get_dbg_str(), self._get_hostname(), self.status,
595 flags_str))
jamesrenb55378a2010-03-02 22:19:49 +0000596
597
Michael Liang0d747462014-07-17 14:19:53 -0700598 def record_state(self, type_str, state, value):
Michael Liang500dedc2014-07-15 16:16:44 -0700599 """Record metadata in elasticsearch.
600
601 If ES configured to use http, then we will time that http request.
602 Otherwise, it uses UDP, so we will not need to time it.
603
Michael Liang0d747462014-07-17 14:19:53 -0700604 @param type_str: sets the _type field in elasticsearch db.
Michael Liang500dedc2014-07-15 16:16:44 -0700605 @param state: string representing what state we are recording,
606 e.g. 'status'
607 @param value: value of the state, e.g. 'verifying'
608 """
609 metadata = {
610 'time_changed': time.time(),
611 state: value,
612 'job_id': self.job_id,
613 }
614 if self.host:
615 metadata['hostname'] = self.host.hostname
Gabe Blackb72f4fb2015-01-20 16:47:13 -0800616 autotest_es.post(type_str=type_str, metadata=metadata)
Michael Liang500dedc2014-07-15 16:16:44 -0700617
618
    @_timer.decorate
    def set_status(self, status):
        """Set this entry's status and perform the resulting bookkeeping.

        Updates the active/complete flags to match the new status, runs
        completion handling (shard sync, job-complete e-mail) for terminal
        statuses, sends per-status notification e-mail when configured, and
        records the transition in elasticsearch.

        @param status: The new status string.
        """
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)
        # Noticed some time jumps after last logging message.
        logging.debug('Update Field Complete')

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
        assert not (active and complete)

        self.update_field('active', active)

        # The ordering of these operations is important. Once we set the
        # complete bit this job will become indistinguishable from all
        # the other complete jobs, unless we first set shard_id to NULL
        # to signal to the shard_client that we need to upload it. However,
        # we can only set both these after we've updated finished_on etc
        # within _on_complete or the job will get synced in an intermediate
        # state. This means that if someone sigkills the scheduler between
        # setting finished_on and complete, we will have inconsistent jobs.
        # This should be fine, because nothing critical checks finished_on,
        # and the scheduler should never be killed mid-tick.
        if complete:
            self._on_complete(status)
            if self.job.shard_id is not None:
                # If shard_id is None, the job will be synced back to the master
                self.job.update_field('shard_id', None)
            self._email_on_job_complete()

        self.update_field('complete', complete)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)
        logging.debug('HQE Set Status Complete')
        self.record_state('hqe_status', 'status', status)
658
jamesrenb55378a2010-03-02 22:19:49 +0000659
660
    def _on_complete(self, status):
        """Bookkeeping performed when this entry reaches a complete status.

        @param status: The terminal status being applied.
        """
        # NOTE(review): identity comparison relies on callers passing the
        # shared Status.ABORTED constant rather than an equal string.
        if status is not models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()

        if self.started_on:
            self.set_finished_on_now()
        if not self.execution_subdir:
            return
        # unregister any possible pidfiles associated with this queue entry
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)
674
675
    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        # Failed statuses get no success-rate figure in the subject line.
        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body = "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        # Only include per-test numbers when at least one test actually ran.
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body
722
723
jamesrenb55378a2010-03-02 22:19:49 +0000724 def _email_on_status(self, status):
725 hostname = self._get_hostname()
Eric Li6f27d4f2010-09-29 10:55:17 -0700726 subject, body = self._get_status_email_contents(status, None, hostname)
jamesrenb55378a2010-03-02 22:19:49 +0000727 email_manager.manager.send_email(self.job.email_list, subject, body)
728
729
    def _email_on_job_complete(self):
        """E-mail a whole-job summary once every entry of the job finished."""
        if not self.job.is_finished():
            return

        # One summary line per queue entry of this job.
        summary = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary.append("Host: %s Status: %s" %
                           (queue_entry._get_hostname(),
                            queue_entry.status))

        summary = "\n".join(summary)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)
749
750
751 def schedule_pre_job_tasks(self):
752 logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
753 self.job.name, self.meta_host, self.atomic_group_id,
754 self.job.id, self.id, self.host.hostname, self.status)
755
756 self._do_schedule_pre_job_tasks()
757
758
    def _do_schedule_pre_job_tasks(self):
        # Delegate to the job, which knows which pre-job tasks are required.
        self.job.schedule_pre_job_tasks(queue_entry=self)
761
762
    def requeue(self):
        """Return this entry to the Queued state for another attempt."""
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        self.update_field('finished_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            # Metahost entries get a fresh host on the next scheduling pass.
            self.set_host(None)
772
773
    @property
    def aborted_by(self):
        """Login of the user who aborted this entry (None if not aborted)."""
        self._load_abort_info()
        return self._aborted_by
778
779
    @property
    def aborted_on(self):
        """Timestamp of this entry's abort (None if not aborted)."""
        self._load_abort_info()
        return self._aborted_on
784
785
    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Only query once; results are memoized on the instance.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None
802
803
    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify. If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are getting
        # stuck in Pending.
        # run_if_ready may move this entry out of Pending; the check below
        # detects an asynchronous job that is stuck there.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)
823
824
    def abort(self, dispatcher):
        """Abort this host queue entry, based on its current status.

        @param dispatcher: Dispatcher used to look up this entry's agents.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING,
                           Status.WAITING):
            # If hqe is in any of these status, it should not have any
            # unfinished agent before it can be aborted.
            agents = dispatcher.get_agents_for_entry(self)
            # Agent with finished task can be left behind. This is added to
            # handle the special case of aborting hostless job in STARTING
            # status, in which the agent has only a HostlessQueueTask
            # associated. The finished HostlessQueueTask will be cleaned up in
            # the next tick, so it's safe to leave the agent there. Without
            # filtering out finished agent, HQE abort won't be able to proceed.
            assert all([agent.is_done() for agent in agents])
            # If hqe is still in STARTING status, it may not have assigned a
            # host yet.
            if self.host:
                self.host.set_status(models.Host.Status.READY)
        elif (self.status == Status.VERIFYING or
              self.status == Status.RESETTING):
            # Abort mid-verify/reset: queue a CLEANUP special task for the host.
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())
        elif self.status == Status.PROVISIONING:
            # Abort mid-provision: queue a REPAIR special task for the host.
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.REPAIR,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)
        self.job.abort_delay_ready_task()
864
865
866 def get_group_name(self):
867 atomic_group = self.atomic_group
868 if not atomic_group:
869 return ''
870
871 # Look at any meta_host and dependency labels and pick the first
872 # one that also specifies this atomic group. Use that label name
873 # as the group name if possible (it is more specific).
874 for label in self.get_labels():
875 if label.atomic_group_id:
876 assert label.atomic_group_id == atomic_group.id
877 return label.name
878 return atomic_group.name
879
880
881 def execution_tag(self):
Scott Zawalskid712cf32012-07-14 16:24:53 -0400882 SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
883 'complete!=1 AND execution_subdir="" AND '
884 'status!="Queued";')
885 SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
886 'status="Aborted" WHERE id=%s;')
887 try:
888 assert self.execution_subdir
889 except AssertionError:
890 # TODO(scottz): Remove temporary fix/info gathering pathway for
891 # crosbug.com/31595 once issue is root caused.
892 logging.error('No execution_subdir for host queue id:%s.', self.id)
893 logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
894 for row in _db.execute(SQL_SUSPECT_ENTRIES):
Dan Shi76af8022013-10-19 01:59:49 -0700895 logging.error(row)
Scott Zawalskid712cf32012-07-14 16:24:53 -0400896 logging.error('====DB DEBUG====\n')
897 fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
898 logging.error('EXECUTING: %s', fix_query)
899 _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id)
900 raise AssertionError(('self.execution_subdir not found. '
901 'See log for details.'))
902
jamesrenb55378a2010-03-02 22:19:49 +0000903 return "%s/%s" % (self.job.tag(), self.execution_subdir)
904
905
906 def execution_path(self):
907 return self.execution_tag()
908
909
910 def set_started_on_now(self):
911 self.update_field('started_on', datetime.datetime.now())
912
913
Fang Deng51599032014-06-23 17:24:27 -0700914 def set_finished_on_now(self):
915 self.update_field('finished_on', datetime.datetime.now())
916
917
jamesrenb55378a2010-03-02 22:19:49 +0000918 def is_hostless(self):
919 return (self.host_id is None
920 and self.meta_host is None
921 and self.atomic_group_id is None)
922
923
class Job(DBObject):
    """Scheduler-side representation of a row in the afe_jobs table.

    Overlaps the Django Job model in basic functionality but carries the
    scheduler-specific run/abort logic (see the module docstring).
    """
    _table_name = 'afe_jobs'
    # Column tuple; order must match the afe_jobs table schema.
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
               'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
               'test_retry', 'run_reset', 'timeout_mins', 'shard_id')
    # Timer used by @_timer.decorate to report method latencies (set_status).
    _timer = stats.Timer("scheduler_models.Job")

    # This does not need to be a column in the DB. The delays are likely to
    # be configured short. If the scheduler is stopped and restarted in
    # the middle of a job's delay cycle, the delay cycle will either be
    # repeated or skipped depending on the number of Pending machines found
    # when the restarted scheduler recovers to track it. Not a problem.
    #
    # A reference to the DelayedCallTask that will wake up the job should
    # no other HQEs change state in time. Its end_time attribute is used
    # by our run_with_ready_delay() method to determine if the wait is over.
    _delay_ready_task = None

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
    # all status='Pending' atomic group HQEs incase a delay was running when the
    # scheduler was restarted and no more hosts ever successfully exit Verify.
948
    def __init__(self, id=None, row=None, **kwargs):
        """Construct from a DB id or a pre-fetched row; one is required.

        @param id: Primary key of the afe_jobs row to load, or None.
        @param row: Pre-fetched DB row tuple, or None.
        """
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
        self._owner_model = None # caches model instance of owner
        self.update_image_path = None # path of OS image to install
jamesrenb55378a2010-03-02 22:19:49 +0000954
955
956 def model(self):
957 return models.Job.objects.get(id=self.id)
958
959
960 def owner_model(self):
961 # work around the fact that the Job owner field is a string, not a
962 # foreign key
963 if not self._owner_model:
964 self._owner_model = models.User.objects.get(login=self.owner)
965 return self._owner_model
966
967
968 def is_server_job(self):
Aviv Keshet82352b22013-05-14 18:30:56 -0700969 return self.control_type == control_data.CONTROL_TYPE.SERVER
jamesrenb55378a2010-03-02 22:19:49 +0000970
971
972 def tag(self):
973 return "%s-%s" % (self.id, self.owner)
974
975
976 def get_host_queue_entries(self):
977 rows = _db.execute("""
978 SELECT * FROM afe_host_queue_entries
979 WHERE job_id= %s
980 """, (self.id,))
981 entries = [HostQueueEntry(row=i) for i in rows]
982
983 assert len(entries)>0
984
985 return entries
986
987
Paul Pendlebury5a8c6ad2011-02-01 07:20:17 -0800988 def is_image_update_job(self):
989 """
990 Discover if the current job requires an OS update.
991
992 @return: True/False if OS should be updated before job is run.
993 """
994 # All image update jobs have the parameterized_job_id set.
995 if not self.parameterized_job_id:
996 return False
997
998 # Retrieve the ID of the ParameterizedJob this job is an instance of.
999 rows = _db.execute("""
1000 SELECT test_id
1001 FROM afe_parameterized_jobs
1002 WHERE id = %s
1003 """, (self.parameterized_job_id,))
1004 if not rows:
1005 return False
1006 test_id = rows[0][0]
1007
1008 # Retrieve the ID of the known autoupdate_ParameterizedJob.
1009 rows = _db.execute("""
1010 SELECT id
1011 FROM afe_autotests
1012 WHERE name = 'autoupdate_ParameterizedJob'
1013 """)
1014 if not rows:
1015 return False
1016 update_id = rows[0][0]
1017
1018 # If the IDs are the same we've found an image update job.
1019 if test_id == update_id:
1020 # Finally, get the path to the OS image to install.
1021 rows = _db.execute("""
1022 SELECT parameter_value
1023 FROM afe_parameterized_job_parameters
1024 WHERE parameterized_job_id = %s
1025 """, (self.parameterized_job_id,))
1026 if rows:
1027 # Save the path in update_image_path to use later as a command
1028 # line parameter to autoserv.
1029 self.update_image_path = rows[0][0]
1030 return True
1031
1032 return False
1033
1034
Eric Li6f27d4f2010-09-29 10:55:17 -07001035 def get_execution_details(self):
1036 """
1037 Get test execution details for this job.
1038
1039 @return: Dictionary with test execution details
1040 """
1041 def _find_test_jobs(rows):
1042 """
1043 Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
1044 Those are autotest 'internal job' tests, so they should not be
1045 counted when evaluating the test stats.
1046
1047 @param rows: List of rows (matrix) with database results.
1048 """
1049 job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')
1050 n_test_jobs = 0
1051 for r in rows:
1052 test_name = r[0]
1053 if job_test_pattern.match(test_name):
1054 n_test_jobs += 1
1055
1056 return n_test_jobs
1057
1058 stats = {}
1059
1060 rows = _db.execute("""
1061 SELECT t.test, s.word, t.reason
1062 FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
1063 WHERE t.job_idx = j.job_idx
1064 AND s.status_idx = t.status
1065 AND j.afe_job_id = %s
Mike Truty6941dea2010-11-09 15:26:32 -08001066 ORDER BY t.reason
Eric Li6f27d4f2010-09-29 10:55:17 -07001067 """ % self.id)
1068
Dale Curtis74a314b2011-06-23 14:55:46 -07001069 failed_rows = [r for r in rows if not r[1] == 'GOOD']
Eric Li6f27d4f2010-09-29 10:55:17 -07001070
1071 n_test_jobs = _find_test_jobs(rows)
1072 n_test_jobs_failed = _find_test_jobs(failed_rows)
1073
1074 total_executed = len(rows) - n_test_jobs
1075 total_failed = len(failed_rows) - n_test_jobs_failed
1076
1077 if total_executed > 0:
1078 success_rate = 100 - ((total_failed / float(total_executed)) * 100)
1079 else:
1080 success_rate = 0
1081
1082 stats['total_executed'] = total_executed
1083 stats['total_failed'] = total_failed
1084 stats['total_passed'] = total_executed - total_failed
1085 stats['success_rate'] = success_rate
1086
1087 status_header = ("Test Name", "Status", "Reason")
1088 if failed_rows:
1089 stats['failed_rows'] = utils.matrix_to_string(failed_rows,
1090 status_header)
1091 else:
1092 stats['failed_rows'] = ''
1093
1094 time_row = _db.execute("""
1095 SELECT started_time, finished_time
1096 FROM tko_jobs
1097 WHERE afe_job_id = %s
1098 """ % self.id)
1099
1100 if time_row:
1101 t_begin, t_end = time_row[0]
Mike Truty6941dea2010-11-09 15:26:32 -08001102 try:
1103 delta = t_end - t_begin
1104 minutes, seconds = divmod(delta.seconds, 60)
1105 hours, minutes = divmod(minutes, 60)
1106 stats['execution_time'] = ("%02d:%02d:%02d" %
1107 (hours, minutes, seconds))
1108 # One of t_end or t_begin are None
1109 except TypeError:
1110 stats['execution_time'] = '(could not determine)'
Eric Li6f27d4f2010-09-29 10:55:17 -07001111 else:
1112 stats['execution_time'] = '(none)'
1113
1114 return stats
1115
1116
Fang Deng1d6c2a02013-04-17 15:25:45 -07001117 @_timer.decorate
jamesrenb55378a2010-03-02 22:19:49 +00001118 def set_status(self, status, update_queues=False):
1119 self.update_field('status',status)
1120
1121 if update_queues:
1122 for queue_entry in self.get_host_queue_entries():
1123 queue_entry.set_status(status)
1124
1125
1126 def keyval_dict(self):
1127 return self.model().keyval_dict()
1128
1129
1130 def _atomic_and_has_started(self):
1131 """
1132 @returns True if any of the HostQueueEntries associated with this job
1133 have entered the Status.STARTING state or beyond.
1134 """
1135 atomic_entries = models.HostQueueEntry.objects.filter(
1136 job=self.id, atomic_group__isnull=False)
1137 if atomic_entries.count() <= 0:
1138 return False
1139
1140 # These states may *only* be reached if Job.run() has been called.
1141 started_statuses = (models.HostQueueEntry.Status.STARTING,
1142 models.HostQueueEntry.Status.RUNNING,
1143 models.HostQueueEntry.Status.COMPLETED)
1144
1145 started_entries = atomic_entries.filter(status__in=started_statuses)
1146 return started_entries.count() > 0
1147
1148
1149 def _hosts_assigned_count(self):
1150 """The number of HostQueueEntries assigned a Host for this job."""
1151 entries = models.HostQueueEntry.objects.filter(job=self.id,
1152 host__isnull=False)
1153 return entries.count()
1154
1155
1156 def _pending_count(self):
1157 """The number of HostQueueEntries for this job in the Pending state."""
1158 pending_entries = models.HostQueueEntry.objects.filter(
1159 job=self.id, status=models.HostQueueEntry.Status.PENDING)
1160 return pending_entries.count()
1161
1162
1163 def _max_hosts_needed_to_run(self, atomic_group):
1164 """
1165 @param atomic_group: The AtomicGroup associated with this job that we
1166 are using to set an upper bound on the threshold.
1167 @returns The maximum number of HostQueueEntries assigned a Host before
1168 this job can run.
1169 """
1170 return min(self._hosts_assigned_count(),
1171 atomic_group.max_number_of_machines)
1172
1173
    def _min_hosts_needed_to_run(self):
        """Return the minimum number of hosts needed to run this job."""
        return self.synch_count
1177
1178
1179 def is_ready(self):
1180 # NOTE: Atomic group jobs stop reporting ready after they have been
1181 # started to avoid launching multiple copies of one atomic job.
1182 # Only possible if synch_count is less than than half the number of
1183 # machines in the atomic group.
1184 pending_count = self._pending_count()
1185 atomic_and_has_started = self._atomic_and_has_started()
1186 ready = (pending_count >= self.synch_count
1187 and not atomic_and_has_started)
1188
1189 if not ready:
1190 logging.info(
1191 'Job %s not ready: %s pending, %s required '
1192 '(Atomic and started: %s)',
1193 self, pending_count, self.synch_count,
1194 atomic_and_has_started)
1195
1196 return ready
1197
1198
1199 def num_machines(self, clause = None):
1200 sql = "job_id=%s" % self.id
1201 if clause:
1202 sql += " AND (%s)" % clause
1203 return self.count(sql, table='afe_host_queue_entries')
1204
1205
1206 def num_queued(self):
1207 return self.num_machines('not complete')
1208
1209
1210 def num_active(self):
1211 return self.num_machines('active')
1212
1213
1214 def num_complete(self):
1215 return self.num_machines('complete')
1216
1217
1218 def is_finished(self):
1219 return self.num_complete() == self.num_machines()
1220
1221
1222 def _not_yet_run_entries(self, include_verifying=True):
1223 statuses = [models.HostQueueEntry.Status.QUEUED,
1224 models.HostQueueEntry.Status.PENDING]
1225 if include_verifying:
1226 statuses.append(models.HostQueueEntry.Status.VERIFYING)
1227 return models.HostQueueEntry.objects.filter(job=self.id,
1228 status__in=statuses)
1229
1230
    def _stop_all_entries(self):
        """Move this job's Queued/Pending entries to Stopped.

        Pending entries already hold a host; that host is released back to
        Ready before the entry is stopped. Verifying entries are left alone
        (include_verifying=False).
        """
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            # A complete entry here would mean our bookkeeping is broken.
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()
1244
1245
1246 def stop_if_necessary(self):
1247 not_yet_run = self._not_yet_run_entries()
1248 if not_yet_run.count() < self.synch_count:
1249 self._stop_all_entries()
1250
1251
jamesrenb55378a2010-03-02 22:19:49 +00001252 def _next_group_name(self, group_name=''):
1253 """@returns a directory name to use for the next host group results."""
1254 if group_name:
1255 # Sanitize for use as a pathname.
1256 group_name = group_name.replace(os.path.sep, '_')
1257 if group_name.startswith('.'):
1258 group_name = '_' + group_name[1:]
1259 # Add a separator between the group name and 'group%d'.
1260 group_name += '.'
1261 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
1262 query = models.HostQueueEntry.objects.filter(
1263 job=self.id).values('execution_subdir').distinct()
1264 subdirs = (entry['execution_subdir'] for entry in query)
1265 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
1266 ids = [int(match.group(1)) for match in group_matches if match]
1267 if ids:
1268 next_id = max(ids) + 1
1269 else:
1270 next_id = 0
1271 return '%sgroup%d' % (group_name, next_id)
1272
1273
1274 def get_group_entries(self, queue_entry_from_group):
1275 """
1276 @param queue_entry_from_group: A HostQueueEntry instance to find other
1277 group entries on this job for.
1278
1279 @returns A list of HostQueueEntry objects all executing this job as
1280 part of the same group as the one supplied (having the same
1281 execution_subdir).
1282 """
1283 execution_subdir = queue_entry_from_group.execution_subdir
1284 return list(HostQueueEntry.fetch(
1285 where='job_id=%s AND execution_subdir=%s',
1286 params=(self.id, execution_subdir)))
1287
1288
1289 def _should_run_cleanup(self, queue_entry):
1290 if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
1291 return True
1292 elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
1293 return queue_entry.host.dirty
1294 return False
1295
1296
1297 def _should_run_verify(self, queue_entry):
1298 do_not_verify = (queue_entry.host.protection ==
1299 host_protections.Protection.DO_NOT_VERIFY)
1300 if do_not_verify:
1301 return False
Alex Miller6ee996f2013-02-28 13:53:52 -08001302 # If RebootBefore is set to NEVER, then we won't run reset because
1303 # we can't cleanup, so we need to weaken a Reset into a Verify.
1304 weaker_reset = (self.run_reset and
1305 self.reboot_before == model_attributes.RebootBefore.NEVER)
1306 return self.run_verify or weaker_reset
jamesrenb55378a2010-03-02 22:19:49 +00001307
1308
Dan Shi07e09af2013-04-12 09:31:29 -07001309 def _should_run_reset(self, queue_entry):
1310 can_verify = (queue_entry.host.protection !=
1311 host_protections.Protection.DO_NOT_VERIFY)
1312 can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER
1313 return (can_reboot and can_verify and (self.run_reset or
1314 (self._should_run_cleanup(queue_entry) and
1315 self._should_run_verify(queue_entry))))
1316
1317
Alex Millerdfff2fd2013-05-28 13:05:06 -07001318 def _should_run_provision(self, queue_entry):
1319 """
1320 Determine if the queue_entry needs to have a provision task run before
1321 it to provision queue_entry.host.
1322
1323 @param queue_entry: The host queue entry in question.
1324 @returns: True if we should schedule a provision task, False otherwise.
1325
1326 """
1327 # If we get to this point, it means that the scheduler has already
1328 # vetted that all the unprovisionable labels match, so we can just
1329 # find all labels on the job that aren't on the host to get the list
1330 # of what we need to provision. (See the scheduling logic in
1331 # host_scheduler.py:is_host_eligable_for_job() where we discard all
Alex Miller627694a2014-05-01 18:04:29 -07001332 # actionable labels when assigning jobs to hosts.)
Alex Millerdfff2fd2013-05-28 13:05:06 -07001333 job_labels = {x.name for x in queue_entry.get_labels()}
1334 _, host_labels = queue_entry.host.platform_and_labels()
Alex Miller627694a2014-05-01 18:04:29 -07001335 # If there are any labels on the job that are not on the host and they
1336 # are labels that provisioning knows how to change, then that means
1337 # there is provisioning work to do. If there's no provisioning work to
1338 # do, then obviously we have no reason to schedule a provision task!
1339 diff = job_labels - set(host_labels)
1340 if any([provision.Provision.acts_on(x) for x in diff]):
Alex Millerdfff2fd2013-05-28 13:05:06 -07001341 return True
1342 return False
1343
1344
Alex Miller42437f92013-05-28 12:58:54 -07001345 def _queue_special_task(self, queue_entry, task):
jamesrenb55378a2010-03-02 22:19:49 +00001346 """
Alex Miller42437f92013-05-28 12:58:54 -07001347 Create a special task and associate it with a host queue entry.
jamesrenb55378a2010-03-02 22:19:49 +00001348
Alex Miller42437f92013-05-28 12:58:54 -07001349 @param queue_entry: The queue entry this special task should be
1350 associated with.
1351 @param task: One of the members of the enum models.SpecialTask.Task.
1352 @returns: None
1353
jamesrenb55378a2010-03-02 22:19:49 +00001354 """
jamesrenb55378a2010-03-02 22:19:49 +00001355 models.SpecialTask.objects.create(
1356 host=models.Host.objects.get(id=queue_entry.host_id),
1357 queue_entry=queue_entry, task=task)
1358
1359
Alex Miller42437f92013-05-28 12:58:54 -07001360 def schedule_pre_job_tasks(self, queue_entry):
1361 """
1362 Queue all of the special tasks that need to be run before a host
1363 queue entry may run.
1364
1365 If no special taskes need to be scheduled, then |on_pending| will be
1366 called directly.
1367
1368 @returns None
1369
1370 """
1371 task_queued = False
1372 hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id)
1373
Dan Shi12ab5272014-12-16 15:42:07 -08001374 if self._should_run_provision(queue_entry):
1375 self._queue_special_task(hqe_model,
1376 models.SpecialTask.Task.PROVISION)
1377 task_queued = True
1378 elif self._should_run_reset(queue_entry):
Dan Shi07e09af2013-04-12 09:31:29 -07001379 self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET)
Alex Miller42437f92013-05-28 12:58:54 -07001380 task_queued = True
Dan Shi07e09af2013-04-12 09:31:29 -07001381 else:
1382 if self._should_run_cleanup(queue_entry):
1383 self._queue_special_task(hqe_model,
1384 models.SpecialTask.Task.CLEANUP)
1385 task_queued = True
1386 if self._should_run_verify(queue_entry):
1387 self._queue_special_task(hqe_model,
1388 models.SpecialTask.Task.VERIFY)
1389 task_queued = True
Alex Miller42437f92013-05-28 12:58:54 -07001390
1391 if not task_queued:
1392 queue_entry.on_pending()
1393
1394
jamesrenb55378a2010-03-02 22:19:49 +00001395 def _assign_new_group(self, queue_entries, group_name=''):
1396 if len(queue_entries) == 1:
1397 group_subdir_name = queue_entries[0].host.hostname
1398 else:
1399 group_subdir_name = self._next_group_name(group_name)
1400 logging.info('Running synchronous job %d hosts %s as %s',
1401 self.id, [entry.host.hostname for entry in queue_entries],
1402 group_subdir_name)
1403
1404 for queue_entry in queue_entries:
1405 queue_entry.set_execution_subdir(group_subdir_name)
1406
1407
    def _choose_group_to_run(self, include_queue_entry):
        """Pick the HostQueueEntries that will run this job together.

        Starts with |include_queue_entry| and tops the group up with other
        Pending entries of this job (sorted by hostname) until synch_count
        (or the atomic group's machine cap) is met, then assigns them all a
        shared execution_subdir.

        @param include_queue_entry: HostQueueEntry that must be in the group.
        @returns A list of HostQueueEntry instances to run this Job with, or
                [] (after emailing an alert) if fewer than synch_count
                entries could be gathered.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check. We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries
1447
1448
1449 def run_if_ready(self, queue_entry):
1450 """
1451 Run this job by kicking its HQEs into status='Starting' if enough
1452 hosts are ready for it to run.
1453
1454 Cleans up by kicking HQEs into status='Stopped' if this Job is not
1455 ready to run.
1456 """
1457 if not self.is_ready():
1458 self.stop_if_necessary()
1459 elif queue_entry.atomic_group:
1460 self.run_with_ready_delay(queue_entry)
1461 else:
1462 self.run(queue_entry)
1463
1464
1465 def run_with_ready_delay(self, queue_entry):
1466 """
1467 Start a delay to wait for more hosts to enter Pending state before
1468 launching an atomic group job. Once set, the a delay cannot be reset.
1469
1470 @param queue_entry: The HostQueueEntry object to get atomic group
1471 info from and pass to run_if_ready when the delay is up.
1472
1473 @returns An Agent to run the job as appropriate or None if a delay
1474 has already been set.
1475 """
1476 assert queue_entry.job_id == self.id
1477 assert queue_entry.atomic_group
1478 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
1479 over_max_threshold = (self._pending_count() >=
1480 self._max_hosts_needed_to_run(queue_entry.atomic_group))
1481 delay_expired = (self._delay_ready_task and
1482 time.time() >= self._delay_ready_task.end_time)
1483
1484 # Delay is disabled or we already have enough? Do not wait to run.
1485 if not delay or over_max_threshold or delay_expired:
1486 self.run(queue_entry)
1487 else:
1488 queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
1489
1490
1491 def request_abort(self):
1492 """Request that this Job be aborted on the next scheduler cycle."""
1493 self.model().abort()
1494
1495
    def schedule_delayed_callback_task(self, queue_entry):
        """Park |queue_entry| in Pending and build the delayed wakeup task.

        @param queue_entry: HostQueueEntry to mark Pending while waiting for
                additional atomic-group hosts.
        @returns A DelayedCallTask that re-checks this job once the configured
                delay expires, or None if a delay task is already in flight.
        """
        queue_entry.set_status(models.HostQueueEntry.Status.PENDING)

        # Only one delay may be pending per job; see _delay_ready_task docs.
        if self._delay_ready_task:
            return None

        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self)
            # Check to see if the job is still relevant. It could have aborted
            # while we were waiting or hosts could have disappeared, etc.
            if self._pending_count() < self._min_hosts_needed_to_run():
                logging.info('Job %s had too few Pending hosts after waiting '
                             'for extras. Not running.', self)
                self.request_abort()
                return
            return self.run(queue_entry)

        logging.info('Job %s waiting up to %s seconds for more hosts.',
                     self.id, delay)
        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)
        return self._delay_ready_task
1520
1521
1522 def run(self, queue_entry):
1523 """
1524 @param queue_entry: The HostQueueEntry instance calling this method.
1525 """
1526 if queue_entry.atomic_group and self._atomic_and_has_started():
1527 logging.error('Job.run() called on running atomic Job %d '
1528 'with HQE %s.', self.id, queue_entry)
1529 return
1530 queue_entries = self._choose_group_to_run(queue_entry)
1531 if queue_entries:
1532 self._finish_run(queue_entries)
1533
1534
1535 def _finish_run(self, queue_entries):
1536 for queue_entry in queue_entries:
1537 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
1538 self.abort_delay_ready_task()
1539
1540
1541 def abort_delay_ready_task(self):
1542 """Abort the delayed task associated with this job, if any."""
1543 if self._delay_ready_task:
1544 # Cancel any pending callback that would try to run again
1545 # as we are already running.
1546 self._delay_ready_task.abort()
1547
1548
1549 def __str__(self):
1550 return '%s-%s' % (self.id, self.owner)