blob: 6bb4b88c4a41e533a6875c3a447eeec6d4217aa0 [file] [log] [blame]
Aviv Keshet0b9cfc92013-02-05 11:36:02 -08001# pylint: disable-msg=C0111
2
jamesrenb55378a2010-03-02 22:19:49 +00003"""Database model classes for the scheduler.
4
5Contains model classes abstracting the various DB tables used by the scheduler.
6These overlap the Django models in basic functionality, but were written before
7the Django models existed and have not yet been phased out. Some of them
8(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
9which would probably be ill-suited for inclusion in the general Django model
10classes.
11
12Globals:
13_notify_email_statuses: list of HQE statuses. each time a single HQE reaches
14 one of these statuses, an email will be sent to the job's email_list.
15 comes from global_config.
16_base_url: URL to the local AFE server, used to construct URLs for emails.
17_db: DatabaseConnection for this module.
18_drone_manager: reference to global DroneManager instance.
19"""
20
21import datetime, itertools, logging, os, re, sys, time, weakref
Michael Liangda8c60a2014-06-03 13:24:51 -070022from autotest_lib.client.common_lib import control_data
Dan Shidfea3682014-08-10 23:38:40 -070023from autotest_lib.client.common_lib import global_config, host_protections
24from autotest_lib.client.common_lib import time_utils
25from autotest_lib.client.common_lib import utils
Michael Liang500dedc2014-07-15 16:16:44 -070026from autotest_lib.client.common_lib.cros.graphite import es_utils
Michael Liangda8c60a2014-06-03 13:24:51 -070027from autotest_lib.client.common_lib.cros.graphite import stats
jamesrenb55378a2010-03-02 22:19:49 +000028from autotest_lib.frontend.afe import models, model_attributes
jamesrenb55378a2010-03-02 22:19:49 +000029from autotest_lib.scheduler import drone_manager, email_manager
beepscc9fc702013-12-02 12:45:38 -080030from autotest_lib.scheduler import rdb_lib
jamesrenb55378a2010-03-02 22:19:49 +000031from autotest_lib.scheduler import scheduler_config
Prashanth B0e960282014-05-13 19:38:28 -070032from autotest_lib.scheduler import scheduler_lib
Alex Miller627694a2014-05-01 18:04:29 -070033from autotest_lib.server.cros import provision
Michael Liangda8c60a2014-06-03 13:24:51 -070034
jamesrenb55378a2010-03-02 22:19:49 +000035
# HQE statuses that trigger a notification email to the job's email_list.
# Populated from global_config by initialize().
_notify_email_statuses = []
# URL of the local AFE server, used to build links in notification emails.
# Set by initialize().
_base_url = None

# DatabaseConnection shared by every model class in this module.
_db = None
# Global DroneManager instance; set by initialize_globals().
_drone_manager = None
41
def initialize():
    """Set up this module's globals from the scheduler configuration.

    Opens the database connection, parses the notify_email_statuses list
    and computes the AFE base URL, then initializes the remaining globals.
    Exits the process if no server hostname can be determined.
    """
    global _db
    _db = scheduler_lib.ConnectionManager().get_connection()

    raw_statuses = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    global _notify_email_statuses
    # Accept whitespace, comma, semicolon or colon as separators and drop
    # the empty strings produced by consecutive separators.
    _notify_email_statuses = [s for s in
                              re.split(r'[\s,;:]', raw_statuses.lower())
                              if s]

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if not config_base_url:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = global_config.global_config.get_config_value(
                'SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        config_base_url = 'http://%s/afe/' % server_name
    _base_url = config_base_url

    initialize_globals()
72
73
def initialize_globals():
    """Bind the module-level _drone_manager to the global DroneManager."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
77
78
def get_job_metadata(job):
    """Get a dictionary of the job information.

    The return value is a dictionary that includes job information like id,
    name and parent job information. The value will be stored in metadata
    database.

    @param job: A Job object.
    @return: A dictionary containing the job id, owner and name.
    """
    if not job:
        logging.error('Job is None, no metadata returned.')
        return {}
    try:
        metadata = {
                'job_id': job.id,
                'owner': job.owner,
                'job_name': job.name,
                'parent_job_id': job.parent_job_id,
        }
    except AttributeError as e:
        # A partially constructed Job may lack one of the fields above;
        # treat it the same as a missing job.
        logging.error('Job has missing attribute: %s', e)
        return {}
    return metadata
100
101
class DelayedCallTask(object):
    """
    An Agent-compatible task that waits (by being polled) until a deadline
    has passed and then invokes a callback exactly once before finishing.
    If the callback returns anything, that value is assumed to be a new
    Agent instance and will be added to the dispatcher.

    @attribute end_time: Absolute posix time after which the next poll()
            triggers the callback and marks this task finished.

    Also carries every attribute the Agent class requires.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: Seconds from now until the callback fires.
        @param callback: Callable invoked once after at least delay_seconds
                have elapsed.  It must return None or a new Agent instance.
        @param now_func: A time.time-like clock.  Default: time.time.
                Injectable for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # Attributes the Agent class requires.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        """Fire the callback once, as soon as the deadline has passed."""
        if self.is_done() or self._now_func() < self.end_time:
            return
        self._callback()
        self.success = True


    def is_done(self):
        """The task is done once it has either fired or been aborted."""
        return self.success or self.aborted


    def abort(self):
        """Cancel the task; the callback will never fire."""
        self.aborted = True
153
154
class DBError(Exception):
    """Raised by the DBObject constructor when its select fails.

    In particular, _fetch_row_from_db raises this when no row exists for
    the requested id.
    """
157
158
class DBObject(object):
    """A miniature object relational model for the database.

    Instances of a subclass mirror a single row of that subclass's table.
    Instances are cached per (type, id) so that e.g. many HostQueueEntry
    objects can share one Job object.
    """

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        # Do not forward the constructor arguments to object.__new__():
        # they are consumed by __init__, and passing them here is
        # deprecated in Python 2 and a TypeError in Python 3.
        return super(DBObject, cls).__new__(cls)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id: Database id of the row this object represents.  May be
                None when a full row is supplied.
        @param row: A complete row of values for this object, in _fields
                order.  Fetched from the database when None.
        @param new_record: True if this object represents a row that does
                not exist in the database yet; save() will INSERT it.
        @param always_query: When False, a cached, already-initialized
                instance is returned without re-querying the database.
        """
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warning(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """Fetch and return the row with the given id from our table.

        @raises DBError: If no such row exists.
        """
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        """Assert that |row| has exactly one value per declared field."""
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing in
        memory fields.  Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                # Format both sides to drop sub-second precision, which the
                # database does not round-trip reliably.
                current_value = current_value.strftime(time_utils.TIME_FMT)
                row_value = row_value.strftime(time_utils.TIME_FMT)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # 'id' is never updated through update_field().
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read this object's row from the database and refresh fields."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        """Return the number of rows in |table| matching the WHERE clause.

        @param where: SQL WHERE-clause body (without the WHERE keyword).
        @param table: Table to count rows in; defaults to this object's.
        """
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Set |field| to |value| both in memory and in the database.

        No-op when the value is already current.
        """
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object's fields as a new row (new records only).

        After the INSERT, self.id is updated to the database-assigned id.
        """
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    # NOTE(review): values are interpolated directly into the
                    # SQL string rather than bound as parameters; safe only
                    # because all values originate from the scheduler itself.
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete this object's row from the database and drop it from the
        instance cache."""
        # Bug fix: the cache key must use self.id; the previous code passed
        # the builtin id() function, so the cache entry was never removed.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Return |string| prefixed with |prefix|, or |string| unchanged if
        it is empty/falsy."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return [cls(id=row[0], row=row) for row in rows]
363
364
class IneligibleHostQueue(DBObject):
    """A row in afe_ineligible_host_queues: blocks a (job, host) pairing."""
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
368
369
class AtomicGroup(DBObject):
    """A row in afe_atomic_groups: a named group of machines scheduled
    together."""
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
374
375
class Label(DBObject):
    """A row in the afe_labels table."""

    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        """Concise description of this label for scheduler debug output."""
        details = (self.name, self.id, self.atomic_group_id)
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % details
385
386
class Host(DBObject):
    """A row in the afe_hosts table, plus host-specific scheduler helpers."""

    _table_name = 'afe_hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty',
               'leased', 'shard_id')
    _timer = stats.Timer("scheduler_models.Host")


    @_timer.decorate
    def set_status(self, status):
        """Update this host's status, in memory and in the database.

        @param status: The new status string.
        """
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)
        # Noticed some time jumps after the last log message.
        logging.debug('Host Set Status Complete')


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT afe_labels.name, afe_labels.platform
                FROM afe_labels
                INNER JOIN afe_hosts_labels ON
                        afe_labels.id = afe_hosts_labels.label_id
                WHERE afe_hosts_labels.host_id = %s
                ORDER BY afe_labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfies most people's hostname sorting needs
        regardless of their exact naming schemes.  Nobody sane should have
        both a host10 and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # Bug fix: int() already ignores leading zeros, and the previous
            # lstrip('0') turned an all-zero suffix (e.g. 'host0', 'host00')
            # into '', making int('') raise ValueError.
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
462
463
class HostQueueEntry(DBObject):
    """A row in afe_host_queue_entries: one (job, host) execution slot.

    Carries the scheduler-side state machine for a single queue entry:
    status transitions, host blocking, abort handling and notification
    emails.
    """
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on', 'finished_on')
    _timer = stats.Timer('scheduler_models.HostQueueEntry')


    def __init__(self, id=None, row=None, **kwargs):
        """Load the entry plus its associated Job, Host and AtomicGroup.

        @param id: Database id of the queue entry; may be None if row given.
        @param row: Full row of values for this entry, in _fields order.
        """
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            # The host comes from the rdb, decorated with debug/metadata
            # info used elsewhere in the scheduler.
            self.host = rdb_lib.get_hosts([self.host_id])[0]
            self.host.dbg_str = self.get_dbg_str()
            self.host.metadata = get_job_metadata(self.job)
        else:
            self.host = None

        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        """Return the AFE URL for viewing this entry's job."""
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label). The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        """Assign (or release, when host is None) a host to this entry.

        Assigning also blocks the host against this job so it cannot be
        picked again; releasing unblocks it.
        """
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.update_field('host_id', host.id)
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def block_host(self, host_id):
        """Record an ineligibility row preventing this job reusing host_id."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Remove any ineligibility rows for (this job, host_id)."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Set the results subdirectory; defaults to the host's hostname."""
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        """Return the assigned host's name, or 'no host' if unassigned."""
        if self.host:
            return self.host.hostname
        return 'no host'


    def get_dbg_str(self):
        """Get a debug string to identify this host.

        @return: A string containing the hqe and job id.
        """
        try:
            return 'HQE: %s, for job: %s' % (self.id, self.job_id)
        except AttributeError as e:
            return 'HQE has not been initialized yet: %s' % e


    def __str__(self):
        """Human-readable description including status and state flags."""
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return ("%s and host: %s has status:%s%s" %
                (self.get_dbg_str(), self._get_hostname(), self.status,
                 flags_str))


    def record_state(self, type_str, state, value):
        """Record metadata in elasticsearch.

        If ES configured to use http, then we will time that http request.
        Otherwise, it uses UDP, so we will not need to time it.

        @param type_str: sets the _type field in elasticsearch db.
        @param state: string representing what state we are recording,
                      e.g. 'status'
        @param value: value of the state, e.g. 'verifying'
        """
        metadata = {
            'time_changed': time.time(),
            state: value,
            'job_id': self.job_id,
        }
        if self.host:
            metadata['hostname'] = self.host.hostname
        es_utils.ESMetadata().post(type_str=type_str, metadata=metadata)


    @_timer.decorate
    def set_status(self, status):
        """Transition this entry to |status|, updating derived state.

        Also maintains the active/complete flags, runs completion handling,
        sends any configured notification emails and records the change in
        elasticsearch.

        @param status: One of models.HostQueueEntry.Status values.
        """
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)
        # Noticed some time jumps after last logging message.
        logging.debug('Update Field Complete')

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
        assert not (active and complete)

        self.update_field('active', active)
        self.update_field('complete', complete)

        if complete:
            self._on_complete(status)
            if self.job.shard_id is not None:
                # If shard_id is None, the job will be synced back to the master
                self.job.update_field('shard_id', None)
            self._email_on_job_complete()

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)
        logging.debug('HQE Set Status Complete')
        self.record_state('hqe_status', 'status', status)


    def _on_complete(self, status):
        """Completion bookkeeping: stop the job if needed, stamp finished_on
        and unregister any pidfiles for this entry's execution path."""
        if status is not models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()

        if self.started_on:
            self.set_finished_on_now()
        if not self.execution_subdir:
            return
        # unregister any possible pidfiles associated with this queue entry
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)


    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body = "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body


    def _email_on_status(self, status):
        """Email the job's email_list about this entry reaching |status|."""
        hostname = self._get_hostname()
        subject, body = self._get_status_email_contents(status, None, hostname)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """Email a per-host summary once the whole job has finished."""
        if not self.job.is_finished():
            return

        summary = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary.append("Host: %s Status: %s" %
                           (queue_entry._get_hostname(),
                            queue_entry.status))

        summary = "\n".join(summary)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self):
        """Log the scheduling decision and kick off pre-job tasks."""
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        # Delegates to the job, which knows which pre-job tasks apply.
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        """Reset this entry to Queued so it can be scheduled again."""
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        self.update_field('finished_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    @property
    def aborted_by(self):
        # Login of the user who aborted this entry, or None.
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        # Timestamp of the abort, or None if never aborted.
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        if hasattr(self, "_aborted_by"):
            # Already loaded; abort info is immutable once recorded.
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are
        # getting stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        """Abort this entry, cleaning up its host according to current status.

        @param dispatcher: The scheduler dispatcher, used to check agents.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING,
                           Status.WAITING):
            # If hqe is in any of these status, it should not have any
            # unfinished agent before it can be aborted.
            agents = dispatcher.get_agents_for_entry(self)
            # Agent with finished task can be left behind. This is added to
            # handle the special case of aborting hostless job in STARTING
            # status, in which the agent has only a HostlessQueueTask
            # associated. The finished HostlessQueueTask will be cleaned up in
            # the next tick, so it's safe to leave the agent there. Without
            # filtering out finished agent, HQE abort won't be able to proceed.
            assert all([agent.is_done() for agent in agents])
            # If hqe is still in STARTING status, it may not have assigned a
            # host yet.
            if self.host:
                self.host.set_status(models.Host.Status.READY)
        elif (self.status == Status.VERIFYING or
              self.status == Status.RESETTING):
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())
        elif self.status == Status.PROVISIONING:
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.REPAIR,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)
        self.job.abort_delay_ready_task()


    def get_group_name(self):
        """Return the group name for this entry's atomic group, or ''."""
        atomic_group = self.atomic_group
        if not atomic_group:
            return ''

        # Look at any meta_host and dependency labels and pick the first
        # one that also specifies this atomic group.  Use that label name
        # as the group name if possible (it is more specific).
        for label in self.get_labels():
            if label.atomic_group_id:
                assert label.atomic_group_id == atomic_group.id
                return label.name
        return atomic_group.name


    def execution_tag(self):
        """Return '<job tag>/<execution_subdir>' identifying this execution.

        @raises AssertionError: If execution_subdir is unset (after logging
                and force-aborting suspect rows as a temporary workaround).
        """
        SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
                               'complete!=1 AND execution_subdir="" AND '
                               'status!="Queued";')
        SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
                                 'status="Aborted" WHERE id=%s;')
        try:
            assert self.execution_subdir
        except AssertionError:
            # TODO(scottz): Remove temporary fix/info gathering pathway for
            # crosbug.com/31595 once issue is root caused.
            logging.error('No execution_subdir for host queue id:%s.', self.id)
            logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
            for row in _db.execute(SQL_SUSPECT_ENTRIES):
                logging.error(row)
            logging.error('====DB DEBUG====\n')
            fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
            logging.error('EXECUTING: %s', fix_query)
            _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id)
            raise AssertionError(('self.execution_subdir not found. '
                                  'See log for details.'))

        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        """Results path for this execution; same as the execution tag."""
        return self.execution_tag()


    def set_started_on_now(self):
        """Stamp started_on with the current time."""
        self.update_field('started_on', datetime.datetime.now())


    def set_finished_on_now(self):
        """Stamp finished_on with the current time."""
        self.update_field('finished_on', datetime.datetime.now())


    def is_hostless(self):
        """True when the entry has no host, meta_host or atomic group."""
        return (self.host_id is None
                and self.meta_host is None
                and self.atomic_group_id is None)
911
912
class Job(DBObject):
    """In-memory representation of a row in the afe_jobs table.

    Carries the scheduler-side logic for a job: deciding which pre-job
    special tasks (reset/cleanup/verify/provision) to queue, grouping
    host queue entries into execution groups, and kicking the job into
    the Starting state once enough hosts are Pending.
    """
    _table_name = 'afe_jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
               'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
               'test_retry', 'run_reset', 'timeout_mins', 'shard_id')
    # Timer used to report set_status() latency to statsd/graphite.
    _timer = stats.Timer("scheduler_models.Job")

    # This does not need to be a column in the DB. The delays are likely to
    # be configured short. If the scheduler is stopped and restarted in
    # the middle of a job's delay cycle, the delay cycle will either be
    # repeated or skipped depending on the number of Pending machines found
    # when the restarted scheduler recovers to track it. Not a problem.
    #
    # A reference to the DelayedCallTask that will wake up the job should
    # no other HQEs change state in time. Its end_time attribute is used
    # by our run_with_ready_delay() method to determine if the wait is over.
    _delay_ready_task = None

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
    # all status='Pending' atomic group HQEs incase a delay was running when the
    # scheduler was restarted and no more hosts ever successfully exit Verify.
937
938 def __init__(self, id=None, row=None, **kwargs):
939 assert id or row
940 super(Job, self).__init__(id=id, row=row, **kwargs)
941 self._owner_model = None # caches model instance of owner
Paul Pendlebury5a8c6ad2011-02-01 07:20:17 -0800942 self.update_image_path = None # path of OS image to install
jamesrenb55378a2010-03-02 22:19:49 +0000943
944
945 def model(self):
946 return models.Job.objects.get(id=self.id)
947
948
949 def owner_model(self):
950 # work around the fact that the Job owner field is a string, not a
951 # foreign key
952 if not self._owner_model:
953 self._owner_model = models.User.objects.get(login=self.owner)
954 return self._owner_model
955
956
957 def is_server_job(self):
Aviv Keshet82352b22013-05-14 18:30:56 -0700958 return self.control_type == control_data.CONTROL_TYPE.SERVER
jamesrenb55378a2010-03-02 22:19:49 +0000959
960
961 def tag(self):
962 return "%s-%s" % (self.id, self.owner)
963
964
    def get_host_queue_entries(self):
        """Fetch all HostQueueEntries belonging to this job.

        @returns A non-empty list of HostQueueEntry instances; every job
                is expected to have at least one entry.
        """
        rows = _db.execute("""
                SELECT * FROM afe_host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries)>0

        return entries
975
976
Paul Pendlebury5a8c6ad2011-02-01 07:20:17 -0800977 def is_image_update_job(self):
978 """
979 Discover if the current job requires an OS update.
980
981 @return: True/False if OS should be updated before job is run.
982 """
983 # All image update jobs have the parameterized_job_id set.
984 if not self.parameterized_job_id:
985 return False
986
987 # Retrieve the ID of the ParameterizedJob this job is an instance of.
988 rows = _db.execute("""
989 SELECT test_id
990 FROM afe_parameterized_jobs
991 WHERE id = %s
992 """, (self.parameterized_job_id,))
993 if not rows:
994 return False
995 test_id = rows[0][0]
996
997 # Retrieve the ID of the known autoupdate_ParameterizedJob.
998 rows = _db.execute("""
999 SELECT id
1000 FROM afe_autotests
1001 WHERE name = 'autoupdate_ParameterizedJob'
1002 """)
1003 if not rows:
1004 return False
1005 update_id = rows[0][0]
1006
1007 # If the IDs are the same we've found an image update job.
1008 if test_id == update_id:
1009 # Finally, get the path to the OS image to install.
1010 rows = _db.execute("""
1011 SELECT parameter_value
1012 FROM afe_parameterized_job_parameters
1013 WHERE parameterized_job_id = %s
1014 """, (self.parameterized_job_id,))
1015 if rows:
1016 # Save the path in update_image_path to use later as a command
1017 # line parameter to autoserv.
1018 self.update_image_path = rows[0][0]
1019 return True
1020
1021 return False
1022
1023
Eric Li6f27d4f2010-09-29 10:55:17 -07001024 def get_execution_details(self):
1025 """
1026 Get test execution details for this job.
1027
1028 @return: Dictionary with test execution details
1029 """
1030 def _find_test_jobs(rows):
1031 """
1032 Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
1033 Those are autotest 'internal job' tests, so they should not be
1034 counted when evaluating the test stats.
1035
1036 @param rows: List of rows (matrix) with database results.
1037 """
1038 job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')
1039 n_test_jobs = 0
1040 for r in rows:
1041 test_name = r[0]
1042 if job_test_pattern.match(test_name):
1043 n_test_jobs += 1
1044
1045 return n_test_jobs
1046
1047 stats = {}
1048
1049 rows = _db.execute("""
1050 SELECT t.test, s.word, t.reason
1051 FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
1052 WHERE t.job_idx = j.job_idx
1053 AND s.status_idx = t.status
1054 AND j.afe_job_id = %s
Mike Truty6941dea2010-11-09 15:26:32 -08001055 ORDER BY t.reason
Eric Li6f27d4f2010-09-29 10:55:17 -07001056 """ % self.id)
1057
Dale Curtis74a314b2011-06-23 14:55:46 -07001058 failed_rows = [r for r in rows if not r[1] == 'GOOD']
Eric Li6f27d4f2010-09-29 10:55:17 -07001059
1060 n_test_jobs = _find_test_jobs(rows)
1061 n_test_jobs_failed = _find_test_jobs(failed_rows)
1062
1063 total_executed = len(rows) - n_test_jobs
1064 total_failed = len(failed_rows) - n_test_jobs_failed
1065
1066 if total_executed > 0:
1067 success_rate = 100 - ((total_failed / float(total_executed)) * 100)
1068 else:
1069 success_rate = 0
1070
1071 stats['total_executed'] = total_executed
1072 stats['total_failed'] = total_failed
1073 stats['total_passed'] = total_executed - total_failed
1074 stats['success_rate'] = success_rate
1075
1076 status_header = ("Test Name", "Status", "Reason")
1077 if failed_rows:
1078 stats['failed_rows'] = utils.matrix_to_string(failed_rows,
1079 status_header)
1080 else:
1081 stats['failed_rows'] = ''
1082
1083 time_row = _db.execute("""
1084 SELECT started_time, finished_time
1085 FROM tko_jobs
1086 WHERE afe_job_id = %s
1087 """ % self.id)
1088
1089 if time_row:
1090 t_begin, t_end = time_row[0]
Mike Truty6941dea2010-11-09 15:26:32 -08001091 try:
1092 delta = t_end - t_begin
1093 minutes, seconds = divmod(delta.seconds, 60)
1094 hours, minutes = divmod(minutes, 60)
1095 stats['execution_time'] = ("%02d:%02d:%02d" %
1096 (hours, minutes, seconds))
1097 # One of t_end or t_begin are None
1098 except TypeError:
1099 stats['execution_time'] = '(could not determine)'
Eric Li6f27d4f2010-09-29 10:55:17 -07001100 else:
1101 stats['execution_time'] = '(none)'
1102
1103 return stats
1104
1105
Fang Deng1d6c2a02013-04-17 15:25:45 -07001106 @_timer.decorate
jamesrenb55378a2010-03-02 22:19:49 +00001107 def set_status(self, status, update_queues=False):
1108 self.update_field('status',status)
1109
1110 if update_queues:
1111 for queue_entry in self.get_host_queue_entries():
1112 queue_entry.set_status(status)
1113
1114
1115 def keyval_dict(self):
1116 return self.model().keyval_dict()
1117
1118
1119 def _atomic_and_has_started(self):
1120 """
1121 @returns True if any of the HostQueueEntries associated with this job
1122 have entered the Status.STARTING state or beyond.
1123 """
1124 atomic_entries = models.HostQueueEntry.objects.filter(
1125 job=self.id, atomic_group__isnull=False)
1126 if atomic_entries.count() <= 0:
1127 return False
1128
1129 # These states may *only* be reached if Job.run() has been called.
1130 started_statuses = (models.HostQueueEntry.Status.STARTING,
1131 models.HostQueueEntry.Status.RUNNING,
1132 models.HostQueueEntry.Status.COMPLETED)
1133
1134 started_entries = atomic_entries.filter(status__in=started_statuses)
1135 return started_entries.count() > 0
1136
1137
1138 def _hosts_assigned_count(self):
1139 """The number of HostQueueEntries assigned a Host for this job."""
1140 entries = models.HostQueueEntry.objects.filter(job=self.id,
1141 host__isnull=False)
1142 return entries.count()
1143
1144
1145 def _pending_count(self):
1146 """The number of HostQueueEntries for this job in the Pending state."""
1147 pending_entries = models.HostQueueEntry.objects.filter(
1148 job=self.id, status=models.HostQueueEntry.Status.PENDING)
1149 return pending_entries.count()
1150
1151
1152 def _max_hosts_needed_to_run(self, atomic_group):
1153 """
1154 @param atomic_group: The AtomicGroup associated with this job that we
1155 are using to set an upper bound on the threshold.
1156 @returns The maximum number of HostQueueEntries assigned a Host before
1157 this job can run.
1158 """
1159 return min(self._hosts_assigned_count(),
1160 atomic_group.max_number_of_machines)
1161
1162
1163 def _min_hosts_needed_to_run(self):
1164 """Return the minumum number of hsots needed to run this job."""
1165 return self.synch_count
1166
1167
1168 def is_ready(self):
1169 # NOTE: Atomic group jobs stop reporting ready after they have been
1170 # started to avoid launching multiple copies of one atomic job.
1171 # Only possible if synch_count is less than than half the number of
1172 # machines in the atomic group.
1173 pending_count = self._pending_count()
1174 atomic_and_has_started = self._atomic_and_has_started()
1175 ready = (pending_count >= self.synch_count
1176 and not atomic_and_has_started)
1177
1178 if not ready:
1179 logging.info(
1180 'Job %s not ready: %s pending, %s required '
1181 '(Atomic and started: %s)',
1182 self, pending_count, self.synch_count,
1183 atomic_and_has_started)
1184
1185 return ready
1186
1187
1188 def num_machines(self, clause = None):
1189 sql = "job_id=%s" % self.id
1190 if clause:
1191 sql += " AND (%s)" % clause
1192 return self.count(sql, table='afe_host_queue_entries')
1193
1194
1195 def num_queued(self):
1196 return self.num_machines('not complete')
1197
1198
1199 def num_active(self):
1200 return self.num_machines('active')
1201
1202
1203 def num_complete(self):
1204 return self.num_machines('complete')
1205
1206
1207 def is_finished(self):
1208 return self.num_complete() == self.num_machines()
1209
1210
1211 def _not_yet_run_entries(self, include_verifying=True):
1212 statuses = [models.HostQueueEntry.Status.QUEUED,
1213 models.HostQueueEntry.Status.PENDING]
1214 if include_verifying:
1215 statuses.append(models.HostQueueEntry.Status.VERIFYING)
1216 return models.HostQueueEntry.objects.filter(job=self.id,
1217 status__in=statuses)
1218
1219
1220 def _stop_all_entries(self):
1221 entries_to_stop = self._not_yet_run_entries(
1222 include_verifying=False)
1223 for child_entry in entries_to_stop:
1224 assert not child_entry.complete, (
1225 '%s status=%s, active=%s, complete=%s' %
1226 (child_entry.id, child_entry.status, child_entry.active,
1227 child_entry.complete))
1228 if child_entry.status == models.HostQueueEntry.Status.PENDING:
1229 child_entry.host.status = models.Host.Status.READY
1230 child_entry.host.save()
1231 child_entry.status = models.HostQueueEntry.Status.STOPPED
1232 child_entry.save()
1233
1234
1235 def stop_if_necessary(self):
1236 not_yet_run = self._not_yet_run_entries()
1237 if not_yet_run.count() < self.synch_count:
1238 self._stop_all_entries()
1239
1240
jamesrenb55378a2010-03-02 22:19:49 +00001241 def _next_group_name(self, group_name=''):
1242 """@returns a directory name to use for the next host group results."""
1243 if group_name:
1244 # Sanitize for use as a pathname.
1245 group_name = group_name.replace(os.path.sep, '_')
1246 if group_name.startswith('.'):
1247 group_name = '_' + group_name[1:]
1248 # Add a separator between the group name and 'group%d'.
1249 group_name += '.'
1250 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
1251 query = models.HostQueueEntry.objects.filter(
1252 job=self.id).values('execution_subdir').distinct()
1253 subdirs = (entry['execution_subdir'] for entry in query)
1254 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
1255 ids = [int(match.group(1)) for match in group_matches if match]
1256 if ids:
1257 next_id = max(ids) + 1
1258 else:
1259 next_id = 0
1260 return '%sgroup%d' % (group_name, next_id)
1261
1262
1263 def get_group_entries(self, queue_entry_from_group):
1264 """
1265 @param queue_entry_from_group: A HostQueueEntry instance to find other
1266 group entries on this job for.
1267
1268 @returns A list of HostQueueEntry objects all executing this job as
1269 part of the same group as the one supplied (having the same
1270 execution_subdir).
1271 """
1272 execution_subdir = queue_entry_from_group.execution_subdir
1273 return list(HostQueueEntry.fetch(
1274 where='job_id=%s AND execution_subdir=%s',
1275 params=(self.id, execution_subdir)))
1276
1277
1278 def _should_run_cleanup(self, queue_entry):
1279 if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
1280 return True
1281 elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
1282 return queue_entry.host.dirty
1283 return False
1284
1285
1286 def _should_run_verify(self, queue_entry):
1287 do_not_verify = (queue_entry.host.protection ==
1288 host_protections.Protection.DO_NOT_VERIFY)
1289 if do_not_verify:
1290 return False
Alex Miller6ee996f2013-02-28 13:53:52 -08001291 # If RebootBefore is set to NEVER, then we won't run reset because
1292 # we can't cleanup, so we need to weaken a Reset into a Verify.
1293 weaker_reset = (self.run_reset and
1294 self.reboot_before == model_attributes.RebootBefore.NEVER)
1295 return self.run_verify or weaker_reset
jamesrenb55378a2010-03-02 22:19:49 +00001296
1297
Dan Shi07e09af2013-04-12 09:31:29 -07001298 def _should_run_reset(self, queue_entry):
1299 can_verify = (queue_entry.host.protection !=
1300 host_protections.Protection.DO_NOT_VERIFY)
1301 can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER
1302 return (can_reboot and can_verify and (self.run_reset or
1303 (self._should_run_cleanup(queue_entry) and
1304 self._should_run_verify(queue_entry))))
1305
1306
Alex Millerdfff2fd2013-05-28 13:05:06 -07001307 def _should_run_provision(self, queue_entry):
1308 """
1309 Determine if the queue_entry needs to have a provision task run before
1310 it to provision queue_entry.host.
1311
1312 @param queue_entry: The host queue entry in question.
1313 @returns: True if we should schedule a provision task, False otherwise.
1314
1315 """
1316 # If we get to this point, it means that the scheduler has already
1317 # vetted that all the unprovisionable labels match, so we can just
1318 # find all labels on the job that aren't on the host to get the list
1319 # of what we need to provision. (See the scheduling logic in
1320 # host_scheduler.py:is_host_eligable_for_job() where we discard all
Alex Miller627694a2014-05-01 18:04:29 -07001321 # actionable labels when assigning jobs to hosts.)
Alex Millerdfff2fd2013-05-28 13:05:06 -07001322 job_labels = {x.name for x in queue_entry.get_labels()}
1323 _, host_labels = queue_entry.host.platform_and_labels()
Alex Miller627694a2014-05-01 18:04:29 -07001324 # If there are any labels on the job that are not on the host and they
1325 # are labels that provisioning knows how to change, then that means
1326 # there is provisioning work to do. If there's no provisioning work to
1327 # do, then obviously we have no reason to schedule a provision task!
1328 diff = job_labels - set(host_labels)
1329 if any([provision.Provision.acts_on(x) for x in diff]):
Alex Millerdfff2fd2013-05-28 13:05:06 -07001330 return True
1331 return False
1332
1333
Alex Miller42437f92013-05-28 12:58:54 -07001334 def _queue_special_task(self, queue_entry, task):
jamesrenb55378a2010-03-02 22:19:49 +00001335 """
Alex Miller42437f92013-05-28 12:58:54 -07001336 Create a special task and associate it with a host queue entry.
jamesrenb55378a2010-03-02 22:19:49 +00001337
Alex Miller42437f92013-05-28 12:58:54 -07001338 @param queue_entry: The queue entry this special task should be
1339 associated with.
1340 @param task: One of the members of the enum models.SpecialTask.Task.
1341 @returns: None
1342
jamesrenb55378a2010-03-02 22:19:49 +00001343 """
jamesrenb55378a2010-03-02 22:19:49 +00001344 models.SpecialTask.objects.create(
1345 host=models.Host.objects.get(id=queue_entry.host_id),
1346 queue_entry=queue_entry, task=task)
1347
1348
Alex Miller42437f92013-05-28 12:58:54 -07001349 def schedule_pre_job_tasks(self, queue_entry):
1350 """
1351 Queue all of the special tasks that need to be run before a host
1352 queue entry may run.
1353
1354 If no special taskes need to be scheduled, then |on_pending| will be
1355 called directly.
1356
1357 @returns None
1358
1359 """
1360 task_queued = False
1361 hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id)
1362
Dan Shi07e09af2013-04-12 09:31:29 -07001363 if self._should_run_reset(queue_entry):
1364 self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET)
Alex Miller42437f92013-05-28 12:58:54 -07001365 task_queued = True
Dan Shi07e09af2013-04-12 09:31:29 -07001366 else:
1367 if self._should_run_cleanup(queue_entry):
1368 self._queue_special_task(hqe_model,
1369 models.SpecialTask.Task.CLEANUP)
1370 task_queued = True
1371 if self._should_run_verify(queue_entry):
1372 self._queue_special_task(hqe_model,
1373 models.SpecialTask.Task.VERIFY)
1374 task_queued = True
Alex Miller42437f92013-05-28 12:58:54 -07001375
Alex Millerdfff2fd2013-05-28 13:05:06 -07001376 if self._should_run_provision(queue_entry):
1377 self._queue_special_task(hqe_model,
1378 models.SpecialTask.Task.PROVISION)
1379 task_queued = True
1380
Alex Miller42437f92013-05-28 12:58:54 -07001381 if not task_queued:
1382 queue_entry.on_pending()
1383
1384
jamesrenb55378a2010-03-02 22:19:49 +00001385 def _assign_new_group(self, queue_entries, group_name=''):
1386 if len(queue_entries) == 1:
1387 group_subdir_name = queue_entries[0].host.hostname
1388 else:
1389 group_subdir_name = self._next_group_name(group_name)
1390 logging.info('Running synchronous job %d hosts %s as %s',
1391 self.id, [entry.host.hostname for entry in queue_entries],
1392 group_subdir_name)
1393
1394 for queue_entry in queue_entries:
1395 queue_entry.set_execution_subdir(group_subdir_name)
1396
1397
    def _choose_group_to_run(self, include_queue_entry):
        """
        Pick the set of Pending entries that will run this job together.

        @param include_queue_entry: HostQueueEntry that must be part of the
                chosen group.
        @returns A list of HostQueueEntry instances to be used to run this
                Job (with a shared execution_subdir already assigned), or an
                empty list if fewer than synch_count entries could be found.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        # Atomic groups try to grab as many machines as allowed; plain
        # synchronous jobs only need synch_count.
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check. We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries
1437
1438
1439 def run_if_ready(self, queue_entry):
1440 """
1441 Run this job by kicking its HQEs into status='Starting' if enough
1442 hosts are ready for it to run.
1443
1444 Cleans up by kicking HQEs into status='Stopped' if this Job is not
1445 ready to run.
1446 """
1447 if not self.is_ready():
1448 self.stop_if_necessary()
1449 elif queue_entry.atomic_group:
1450 self.run_with_ready_delay(queue_entry)
1451 else:
1452 self.run(queue_entry)
1453
1454
1455 def run_with_ready_delay(self, queue_entry):
1456 """
1457 Start a delay to wait for more hosts to enter Pending state before
1458 launching an atomic group job. Once set, the a delay cannot be reset.
1459
1460 @param queue_entry: The HostQueueEntry object to get atomic group
1461 info from and pass to run_if_ready when the delay is up.
1462
1463 @returns An Agent to run the job as appropriate or None if a delay
1464 has already been set.
1465 """
1466 assert queue_entry.job_id == self.id
1467 assert queue_entry.atomic_group
1468 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
1469 over_max_threshold = (self._pending_count() >=
1470 self._max_hosts_needed_to_run(queue_entry.atomic_group))
1471 delay_expired = (self._delay_ready_task and
1472 time.time() >= self._delay_ready_task.end_time)
1473
1474 # Delay is disabled or we already have enough? Do not wait to run.
1475 if not delay or over_max_threshold or delay_expired:
1476 self.run(queue_entry)
1477 else:
1478 queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
1479
1480
1481 def request_abort(self):
1482 """Request that this Job be aborted on the next scheduler cycle."""
1483 self.model().abort()
1484
1485
    def schedule_delayed_callback_task(self, queue_entry):
        """Park queue_entry in Pending and arm the atomic-group delay timer.

        @param queue_entry: The HQE that just became Pending.
        @returns The new DelayedCallTask for the dispatcher to track, or
                None if a delay task is already armed for this job.
        """
        queue_entry.set_status(models.HostQueueEntry.Status.PENDING)

        # Only one delay timer per job; once set it cannot be reset.
        if self._delay_ready_task:
            return None

        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts

        def run_job_after_delay():
            # Runs when the delay expires (closure over queue_entry/self).
            logging.info('Job %s done waiting for extra hosts.', self)
            # Check to see if the job is still relevant. It could have aborted
            # while we were waiting or hosts could have disappeared, etc.
            if self._pending_count() < self._min_hosts_needed_to_run():
                logging.info('Job %s had too few Pending hosts after waiting '
                             'for extras. Not running.', self)
                self.request_abort()
                return
            return self.run(queue_entry)

        logging.info('Job %s waiting up to %s seconds for more hosts.',
                     self.id, delay)
        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)
        return self._delay_ready_task
1510
1511
1512 def run(self, queue_entry):
1513 """
1514 @param queue_entry: The HostQueueEntry instance calling this method.
1515 """
1516 if queue_entry.atomic_group and self._atomic_and_has_started():
1517 logging.error('Job.run() called on running atomic Job %d '
1518 'with HQE %s.', self.id, queue_entry)
1519 return
1520 queue_entries = self._choose_group_to_run(queue_entry)
1521 if queue_entries:
1522 self._finish_run(queue_entries)
1523
1524
1525 def _finish_run(self, queue_entries):
1526 for queue_entry in queue_entries:
1527 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
1528 self.abort_delay_ready_task()
1529
1530
1531 def abort_delay_ready_task(self):
1532 """Abort the delayed task associated with this job, if any."""
1533 if self._delay_ready_task:
1534 # Cancel any pending callback that would try to run again
1535 # as we are already running.
1536 self._delay_ready_task.abort()
1537
1538
1539 def __str__(self):
1540 return '%s-%s' % (self.id, self.owner)