blob: 9c1288ae68229f0e95d514900f226cf19baa76c6 [file] [log] [blame]
Aviv Keshet0b9cfc92013-02-05 11:36:02 -08001# pylint: disable-msg=C0111
2
jamesrenb55378a2010-03-02 22:19:49 +00003"""Database model classes for the scheduler.
4
5Contains model classes abstracting the various DB tables used by the scheduler.
6These overlap the Django models in basic functionality, but were written before
7the Django models existed and have not yet been phased out. Some of them
8(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
9which would probably be ill-suited for inclusion in the general Django model
10classes.
11
12Globals:
13_notify_email_statuses: list of HQE statuses. each time a single HQE reaches
14 one of these statuses, an email will be sent to the job's email_list.
15 comes from global_config.
16_base_url: URL to the local AFE server, used to construct URLs for emails.
17_db: DatabaseConnection for this module.
18_drone_manager: reference to global DroneManager instance.
19"""
20
21import datetime, itertools, logging, os, re, sys, time, weakref
jamesrenb55378a2010-03-02 22:19:49 +000022from autotest_lib.client.common_lib import global_config, host_protections
Eric Li6f27d4f2010-09-29 10:55:17 -070023from autotest_lib.client.common_lib import global_config, utils
jamesrenb55378a2010-03-02 22:19:49 +000024from autotest_lib.frontend.afe import models, model_attributes
25from autotest_lib.database import database_connection
26from autotest_lib.scheduler import drone_manager, email_manager
27from autotest_lib.scheduler import scheduler_config
Fang Deng1d6c2a02013-04-17 15:25:45 -070028from autotest_lib.site_utils.graphite import stats
Aviv Keshet888a94d2013-05-14 17:52:33 -070029from autotest_lib.client.common_lib import control_data
jamesrenb55378a2010-03-02 22:19:49 +000030
31_notify_email_statuses = []
32_base_url = None
33
34_db = None
35_drone_manager = None
36
def initialize():
    """
    Prepare this module's global state.

    Opens the AUTOTEST_WEB database connection (_db), parses the list of
    HQE statuses that should trigger notification e-mail
    (_notify_email_statuses), computes the AFE base URL used in those
    e-mails (_base_url), and finally binds the drone manager reference.
    Must be called before any DBObject subclass is used.
    """
    global _db
    _db = database_connection.DatabaseConnection('AUTOTEST_WEB')
    _db.connect(db_type='django')

    raw_statuses = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    global _notify_email_statuses
    _notify_email_statuses = [
            status for status in re.split(r'[\s,;:]', raw_statuses.lower())
            if status]

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if not config_base_url:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = global_config.global_config.get_config_value(
                'SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        config_base_url = 'http://%s/afe/' % server_name
    _base_url = config_base_url

    initialize_globals()
68
69
def initialize_globals():
    """Bind the module-level _drone_manager to the global DroneManager."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
73
74
class DelayedCallTask(object):
    """
    An Agent-compatible task that waits, without blocking, until a given
    delay has elapsed and then invokes a callback exactly once.

    The callback's return value, if any, is expected by callers to be a
    new Agent instance to hand to the dispatcher.

    @attribute end_time: The absolute posix time after which this task will
            call its callback when it is polled and be finished.

    Also exposes all attributes required by the Agent class.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: Seconds from now after which the callback
                fires; must be positive.
        @param callback: A zero-argument callable invoked once after at
                least delay_seconds have elapsed.  It must return None
                or a new Agent instance.
        @param now_func: A time.time like function. Default: time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # Attributes the Agent interface expects to find on every task.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        """Fire the callback once the deadline has passed; idempotent."""
        if self.is_done():
            return
        if self._now_func() < self.end_time:
            return
        self._callback()
        self.success = True


    def is_done(self):
        """True once the callback has fired or the task was aborted."""
        return self.aborted or self.success


    def abort(self):
        """Cancel the task; the callback will never be invoked."""
        self.aborted = True
126
127
class DBError(Exception):
    """Signals that a DBObject could not load its row from the database."""
130
131
class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id. This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id: Database id of the row to load.  Either id or row (or
                both, in which case they must agree) is required.
        @param row: A full row of values for this object, as returned by
                SELECT; used instead of querying the DB when supplied.
        @param new_record: True if this represents a row not yet in the
                database; save() will INSERT it.
        @param always_query: When False and a cached instance is already
                initialized, skip re-querying/refreshing its fields.
        """
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """SELECT this object's row by id; raises DBError when not found."""
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        """Sanity-check that a fetched row matches our _fields tuple."""
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing in
        memory fields.  Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S'  # Leave off the microseconds.
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(datetime_cmp_fmt)
                row_value = row_value.strftime(datetime_cmp_fmt)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # id is an immutable key; it must never go through update_field().
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read this object's row from the DB and refresh its fields."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        """Return COUNT(*) of rows matching the given WHERE clause."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Set one field in memory and in the DB; no-op when unchanged."""
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """
        INSERT this object as a new row when constructed with
        new_record=True; afterwards self.id holds the new row's id.
        """
        if self.__new_record:
            keys = self._fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    # TODO: values are naively quoted rather than passed as
                    # query parameters; only safe for trusted field values.
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """DELETE this object's row and invalidate the in-memory instance."""
        # Evict ourselves from the instance cache so future constructors
        # re-query instead of resurrecting this deleted object.  This must
        # key on self.id -- the previous code passed the builtin id()
        # function here, so the cache entry was never actually removed.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Return prefix + string, or string unchanged when it is empty."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return [cls(id=row[0], row=row) for row in rows]
337
338
class IneligibleHostQueue(DBObject):
    """Row wrapper for the afe_ineligible_host_queues table."""

    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
342
343
class AtomicGroup(DBObject):
    """Row wrapper for the afe_atomic_groups table."""

    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
348
349
class Label(DBObject):
    """Row wrapper for the afe_labels table."""

    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)
359
360
class Host(DBObject):
    """Row wrapper for the afe_hosts table plus scheduler host helpers."""

    _table_name = 'afe_hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
    _timer = stats.Timer("scheduler_models.Host")


    @_timer.decorate
    def set_status(self,status):
        """Update this host's status field, logging the transition."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status',status)
        # Noticed some time jumps after the last log message.
        logging.debug('Host Set Status Complete')


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT afe_labels.name, afe_labels.platform
                FROM afe_labels
                INNER JOIN afe_hosts_labels ON
                        afe_labels.id = afe_hosts_labels.label_id
                WHERE afe_hosts_labels.host_id = %s
                ORDER BY afe_labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfy most people's hostname sorting needs regardless
        of their exact naming schemes.  Nobody sane should have both a host10
        and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # int() already ignores leading zeros.  The previous
            # int(number_a_str.lstrip('0')) raised ValueError for an
            # all-zero suffix such as 'host0' (int('') is invalid).
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
435
436
class HostQueueEntry(DBObject):
    """
    Row wrapper for afe_host_queue_entries with the scheduler-side state
    machine for a single host's participation in a Job.
    """

    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')
    _timer = stats.Timer('scheduler_models.HostQueueEntry')


    def __init__(self, id=None, row=None, **kwargs):
        """
        Load an HQE by id or row (see DBObject) and materialize its
        associated Job, Host and AtomicGroup objects.
        """
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        """Return the AFE URL for viewing this entry's job."""
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        """
        Assign a host to this entry (also blocking the host against this
        job), or release the current host when host is None.
        """
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.update_field('host_id', host.id)
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def block_host(self, host_id):
        """Make host_id ineligible to be picked again for this job."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Remove any job/host ineligibility blocks for host_id."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Set the results subdirectory; defaults to the host's hostname."""
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        """Return the assigned host's name, or 'no host' when unassigned."""
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
                                    self.status, flags_str)


    @_timer.decorate
    def set_status(self, status):
        """
        Transition this entry to a new status, keeping the derived
        active/complete flags in sync and firing completion/e-mail hooks.
        """
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)
        # Noticed some time jumps after last logging message.
        logging.debug('Update Field Complete')

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
        assert not (active and complete)

        self.update_field('active', active)
        self.update_field('complete', complete)

        if complete:
            self._on_complete(status)
            self._email_on_job_complete()

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)
        logging.debug('HQE Set Status Complete')


    def _on_complete(self, status):
        """Completion hook: stop sibling entries and drop stale pidfiles."""
        # Compare status values for equality, not identity: these are plain
        # strings and string interning is not guaranteed, so 'is not' could
        # wrongly treat an aborted status as non-aborted.
        if status != models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()

        if not self.execution_subdir:
            return
        # unregister any possible pidfiles associated with this queue entry
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)


    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body =  "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body


    def _email_on_status(self, status):
        """Send a per-status notification e-mail to the job's email_list."""
        hostname = self._get_hostname()
        subject, body = self._get_status_email_contents(status, None, hostname)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """When the whole job is finished, e-mail a per-host summary."""
        if not self.job.is_finished():
            return

        summary = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary.append("Host: %s Status: %s" %
                           (queue_entry._get_hostname(),
                            queue_entry.status))

        summary = "\n".join(summary)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self):
        """Log the scheduling decision and kick off the job's pre-job tasks."""
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        """Return this entry to Queued so it can be scheduled again."""
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    @property
    def aborted_by(self):
        """Login of the user who aborted this entry, or None."""
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        """Timestamp of the abort, or None when not aborted."""
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are getting
        # stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        """
        Abort this entry, scheduling host cleanup/repair as appropriate for
        the state it was in, then mark it Aborted.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING,
                           Status.WAITING):
            assert not dispatcher.get_agents_for_entry(self)
            self.host.set_status(models.Host.Status.READY)
        elif (self.status == Status.VERIFYING or
              self.status == Status.RESETTING):
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())
        elif self.status == Status.PROVISIONING:
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.REPAIR,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)
        self.job.abort_delay_ready_task()


    def get_group_name(self):
        """Return the atomic group's label/group name, or '' when none."""
        atomic_group = self.atomic_group
        if not atomic_group:
            return ''

        # Look at any meta_host and dependency labels and pick the first
        # one that also specifies this atomic group.  Use that label name
        # as the group name if possible (it is more specific).
        for label in self.get_labels():
            if label.atomic_group_id:
                assert label.atomic_group_id == atomic_group.id
                return label.name
        return atomic_group.name


    def execution_tag(self):
        """
        Return '<job tag>/<execution_subdir>'; asserts (with DB debug dump
        and a best-effort repair) that execution_subdir is set.
        """
        SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
                               'complete!=1 AND execution_subdir="" AND '
                               'status!="Queued";')
        SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
                                 'status="Aborted" WHERE id=%s;')
        try:
            assert self.execution_subdir
        except AssertionError:
            # TODO(scottz): Remove temporary fix/info gathering pathway for
            # crosbug.com/31595 once issue is root caused.
            logging.error('No execution_subdir for host queue id:%s.', self.id)
            logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
            for row in _db.execute(SQL_SUSPECT_ENTRIES):
                logging.error(row)
            logging.error('====DB DEBUG====\n')
            fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
            logging.error('EXECUTING: %s', fix_query)
            _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id)
            raise AssertionError(('self.execution_subdir not found. '
                                  'See log for details.'))

        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        """Results path for this entry; same as the execution tag."""
        return self.execution_tag()


    def set_started_on_now(self):
        """Stamp started_on with the current wall-clock time."""
        self.update_field('started_on', datetime.datetime.now())


    def is_hostless(self):
        """True when this entry has no host, meta host or atomic group."""
        return (self.host_id is None
                and self.meta_host is None
                and self.atomic_group_id is None)
825
826
827class Job(DBObject):
828 _table_name = 'afe_jobs'
829 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
830 'control_type', 'created_on', 'synch_count', 'timeout',
831 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
Simran Basi9f508a12012-11-09 12:20:44 -0800832 'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
Aviv Keshetcd1ff9b2013-03-01 14:55:19 -0800833 'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
Simran Basi94dc0032013-11-12 14:09:46 -0800834 'test_retry', 'run_reset', 'timeout_mins')
Fang Deng1d6c2a02013-04-17 15:25:45 -0700835 _timer = stats.Timer("scheduler_models.Job")
jamesrenb55378a2010-03-02 22:19:49 +0000836
837 # This does not need to be a column in the DB. The delays are likely to
838 # be configured short. If the scheduler is stopped and restarted in
839 # the middle of a job's delay cycle, the delay cycle will either be
840 # repeated or skipped depending on the number of Pending machines found
841 # when the restarted scheduler recovers to track it. Not a problem.
842 #
843 # A reference to the DelayedCallTask that will wake up the job should
844 # no other HQEs change state in time. Its end_time attribute is used
845 # by our run_with_ready_delay() method to determine if the wait is over.
846 _delay_ready_task = None
847
848 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
849 # all status='Pending' atomic group HQEs incase a delay was running when the
850 # scheduler was restarted and no more hosts ever successfully exit Verify.
851
852 def __init__(self, id=None, row=None, **kwargs):
853 assert id or row
854 super(Job, self).__init__(id=id, row=row, **kwargs)
855 self._owner_model = None # caches model instance of owner
Paul Pendlebury5a8c6ad2011-02-01 07:20:17 -0800856 self.update_image_path = None # path of OS image to install
jamesrenb55378a2010-03-02 22:19:49 +0000857
858
859 def model(self):
860 return models.Job.objects.get(id=self.id)
861
862
863 def owner_model(self):
864 # work around the fact that the Job owner field is a string, not a
865 # foreign key
866 if not self._owner_model:
867 self._owner_model = models.User.objects.get(login=self.owner)
868 return self._owner_model
869
870
871 def is_server_job(self):
Aviv Keshet82352b22013-05-14 18:30:56 -0700872 return self.control_type == control_data.CONTROL_TYPE.SERVER
jamesrenb55378a2010-03-02 22:19:49 +0000873
874
875 def tag(self):
876 return "%s-%s" % (self.id, self.owner)
877
878
879 def get_host_queue_entries(self):
880 rows = _db.execute("""
881 SELECT * FROM afe_host_queue_entries
882 WHERE job_id= %s
883 """, (self.id,))
884 entries = [HostQueueEntry(row=i) for i in rows]
885
886 assert len(entries)>0
887
888 return entries
889
890
    def is_image_update_job(self):
        """
        Discover if the current job requires an OS update.

        Checks whether this job is an instance of the special
        'autoupdate_ParameterizedJob' parameterized test; when it is, the
        image path parameter is cached on self.update_image_path as a side
        effect for later use as an autoserv command line parameter.

        @return: True/False if OS should be updated before job is run.
        """
        # All image update jobs have the parameterized_job_id set.
        if not self.parameterized_job_id:
            return False

        # Retrieve the ID of the ParameterizedJob this job is an instance of.
        rows = _db.execute("""
                SELECT test_id
                FROM afe_parameterized_jobs
                WHERE id = %s
                """, (self.parameterized_job_id,))
        if not rows:
            return False
        test_id = rows[0][0]

        # Retrieve the ID of the known autoupdate_ParameterizedJob.
        rows = _db.execute("""
                SELECT id
                FROM afe_autotests
                WHERE name = 'autoupdate_ParameterizedJob'
                """)
        if not rows:
            return False
        update_id = rows[0][0]

        # If the IDs are the same we've found an image update job.
        if test_id == update_id:
            # Finally, get the path to the OS image to install.
            rows = _db.execute("""
                    SELECT parameter_value
                    FROM afe_parameterized_job_parameters
                    WHERE parameterized_job_id = %s
                    """, (self.parameterized_job_id,))
            if rows:
                # Save the path in update_image_path to use later as a command
                # line parameter to autoserv.
                self.update_image_path = rows[0][0]
                return True

        return False
936
937
Eric Li6f27d4f2010-09-29 10:55:17 -0700938 def get_execution_details(self):
939 """
940 Get test execution details for this job.
941
942 @return: Dictionary with test execution details
943 """
944 def _find_test_jobs(rows):
945 """
946 Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
947 Those are autotest 'internal job' tests, so they should not be
948 counted when evaluating the test stats.
949
950 @param rows: List of rows (matrix) with database results.
951 """
952 job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')
953 n_test_jobs = 0
954 for r in rows:
955 test_name = r[0]
956 if job_test_pattern.match(test_name):
957 n_test_jobs += 1
958
959 return n_test_jobs
960
961 stats = {}
962
963 rows = _db.execute("""
964 SELECT t.test, s.word, t.reason
965 FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
966 WHERE t.job_idx = j.job_idx
967 AND s.status_idx = t.status
968 AND j.afe_job_id = %s
Mike Truty6941dea2010-11-09 15:26:32 -0800969 ORDER BY t.reason
Eric Li6f27d4f2010-09-29 10:55:17 -0700970 """ % self.id)
971
Dale Curtis74a314b2011-06-23 14:55:46 -0700972 failed_rows = [r for r in rows if not r[1] == 'GOOD']
Eric Li6f27d4f2010-09-29 10:55:17 -0700973
974 n_test_jobs = _find_test_jobs(rows)
975 n_test_jobs_failed = _find_test_jobs(failed_rows)
976
977 total_executed = len(rows) - n_test_jobs
978 total_failed = len(failed_rows) - n_test_jobs_failed
979
980 if total_executed > 0:
981 success_rate = 100 - ((total_failed / float(total_executed)) * 100)
982 else:
983 success_rate = 0
984
985 stats['total_executed'] = total_executed
986 stats['total_failed'] = total_failed
987 stats['total_passed'] = total_executed - total_failed
988 stats['success_rate'] = success_rate
989
990 status_header = ("Test Name", "Status", "Reason")
991 if failed_rows:
992 stats['failed_rows'] = utils.matrix_to_string(failed_rows,
993 status_header)
994 else:
995 stats['failed_rows'] = ''
996
997 time_row = _db.execute("""
998 SELECT started_time, finished_time
999 FROM tko_jobs
1000 WHERE afe_job_id = %s
1001 """ % self.id)
1002
1003 if time_row:
1004 t_begin, t_end = time_row[0]
Mike Truty6941dea2010-11-09 15:26:32 -08001005 try:
1006 delta = t_end - t_begin
1007 minutes, seconds = divmod(delta.seconds, 60)
1008 hours, minutes = divmod(minutes, 60)
1009 stats['execution_time'] = ("%02d:%02d:%02d" %
1010 (hours, minutes, seconds))
1011 # One of t_end or t_begin are None
1012 except TypeError:
1013 stats['execution_time'] = '(could not determine)'
Eric Li6f27d4f2010-09-29 10:55:17 -07001014 else:
1015 stats['execution_time'] = '(none)'
1016
1017 return stats
1018
1019
Fang Deng1d6c2a02013-04-17 15:25:45 -07001020 @_timer.decorate
jamesrenb55378a2010-03-02 22:19:49 +00001021 def set_status(self, status, update_queues=False):
1022 self.update_field('status',status)
1023
1024 if update_queues:
1025 for queue_entry in self.get_host_queue_entries():
1026 queue_entry.set_status(status)
1027
1028
1029 def keyval_dict(self):
1030 return self.model().keyval_dict()
1031
1032
1033 def _atomic_and_has_started(self):
1034 """
1035 @returns True if any of the HostQueueEntries associated with this job
1036 have entered the Status.STARTING state or beyond.
1037 """
1038 atomic_entries = models.HostQueueEntry.objects.filter(
1039 job=self.id, atomic_group__isnull=False)
1040 if atomic_entries.count() <= 0:
1041 return False
1042
1043 # These states may *only* be reached if Job.run() has been called.
1044 started_statuses = (models.HostQueueEntry.Status.STARTING,
1045 models.HostQueueEntry.Status.RUNNING,
1046 models.HostQueueEntry.Status.COMPLETED)
1047
1048 started_entries = atomic_entries.filter(status__in=started_statuses)
1049 return started_entries.count() > 0
1050
1051
1052 def _hosts_assigned_count(self):
1053 """The number of HostQueueEntries assigned a Host for this job."""
1054 entries = models.HostQueueEntry.objects.filter(job=self.id,
1055 host__isnull=False)
1056 return entries.count()
1057
1058
1059 def _pending_count(self):
1060 """The number of HostQueueEntries for this job in the Pending state."""
1061 pending_entries = models.HostQueueEntry.objects.filter(
1062 job=self.id, status=models.HostQueueEntry.Status.PENDING)
1063 return pending_entries.count()
1064
1065
1066 def _max_hosts_needed_to_run(self, atomic_group):
1067 """
1068 @param atomic_group: The AtomicGroup associated with this job that we
1069 are using to set an upper bound on the threshold.
1070 @returns The maximum number of HostQueueEntries assigned a Host before
1071 this job can run.
1072 """
1073 return min(self._hosts_assigned_count(),
1074 atomic_group.max_number_of_machines)
1075
1076
1077 def _min_hosts_needed_to_run(self):
1078 """Return the minumum number of hsots needed to run this job."""
1079 return self.synch_count
1080
1081
1082 def is_ready(self):
1083 # NOTE: Atomic group jobs stop reporting ready after they have been
1084 # started to avoid launching multiple copies of one atomic job.
1085 # Only possible if synch_count is less than than half the number of
1086 # machines in the atomic group.
1087 pending_count = self._pending_count()
1088 atomic_and_has_started = self._atomic_and_has_started()
1089 ready = (pending_count >= self.synch_count
1090 and not atomic_and_has_started)
1091
1092 if not ready:
1093 logging.info(
1094 'Job %s not ready: %s pending, %s required '
1095 '(Atomic and started: %s)',
1096 self, pending_count, self.synch_count,
1097 atomic_and_has_started)
1098
1099 return ready
1100
1101
1102 def num_machines(self, clause = None):
1103 sql = "job_id=%s" % self.id
1104 if clause:
1105 sql += " AND (%s)" % clause
1106 return self.count(sql, table='afe_host_queue_entries')
1107
1108
1109 def num_queued(self):
1110 return self.num_machines('not complete')
1111
1112
1113 def num_active(self):
1114 return self.num_machines('active')
1115
1116
1117 def num_complete(self):
1118 return self.num_machines('complete')
1119
1120
1121 def is_finished(self):
1122 return self.num_complete() == self.num_machines()
1123
1124
1125 def _not_yet_run_entries(self, include_verifying=True):
1126 statuses = [models.HostQueueEntry.Status.QUEUED,
1127 models.HostQueueEntry.Status.PENDING]
1128 if include_verifying:
1129 statuses.append(models.HostQueueEntry.Status.VERIFYING)
1130 return models.HostQueueEntry.objects.filter(job=self.id,
1131 status__in=statuses)
1132
1133
1134 def _stop_all_entries(self):
1135 entries_to_stop = self._not_yet_run_entries(
1136 include_verifying=False)
1137 for child_entry in entries_to_stop:
1138 assert not child_entry.complete, (
1139 '%s status=%s, active=%s, complete=%s' %
1140 (child_entry.id, child_entry.status, child_entry.active,
1141 child_entry.complete))
1142 if child_entry.status == models.HostQueueEntry.Status.PENDING:
1143 child_entry.host.status = models.Host.Status.READY
1144 child_entry.host.save()
1145 child_entry.status = models.HostQueueEntry.Status.STOPPED
1146 child_entry.save()
1147
1148
1149 def stop_if_necessary(self):
1150 not_yet_run = self._not_yet_run_entries()
1151 if not_yet_run.count() < self.synch_count:
1152 self._stop_all_entries()
1153
1154
jamesrenb55378a2010-03-02 22:19:49 +00001155 def _next_group_name(self, group_name=''):
1156 """@returns a directory name to use for the next host group results."""
1157 if group_name:
1158 # Sanitize for use as a pathname.
1159 group_name = group_name.replace(os.path.sep, '_')
1160 if group_name.startswith('.'):
1161 group_name = '_' + group_name[1:]
1162 # Add a separator between the group name and 'group%d'.
1163 group_name += '.'
1164 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
1165 query = models.HostQueueEntry.objects.filter(
1166 job=self.id).values('execution_subdir').distinct()
1167 subdirs = (entry['execution_subdir'] for entry in query)
1168 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
1169 ids = [int(match.group(1)) for match in group_matches if match]
1170 if ids:
1171 next_id = max(ids) + 1
1172 else:
1173 next_id = 0
1174 return '%sgroup%d' % (group_name, next_id)
1175
1176
1177 def get_group_entries(self, queue_entry_from_group):
1178 """
1179 @param queue_entry_from_group: A HostQueueEntry instance to find other
1180 group entries on this job for.
1181
1182 @returns A list of HostQueueEntry objects all executing this job as
1183 part of the same group as the one supplied (having the same
1184 execution_subdir).
1185 """
1186 execution_subdir = queue_entry_from_group.execution_subdir
1187 return list(HostQueueEntry.fetch(
1188 where='job_id=%s AND execution_subdir=%s',
1189 params=(self.id, execution_subdir)))
1190
1191
1192 def _should_run_cleanup(self, queue_entry):
1193 if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
1194 return True
1195 elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
1196 return queue_entry.host.dirty
1197 return False
1198
1199
1200 def _should_run_verify(self, queue_entry):
1201 do_not_verify = (queue_entry.host.protection ==
1202 host_protections.Protection.DO_NOT_VERIFY)
1203 if do_not_verify:
1204 return False
Alex Miller6ee996f2013-02-28 13:53:52 -08001205 # If RebootBefore is set to NEVER, then we won't run reset because
1206 # we can't cleanup, so we need to weaken a Reset into a Verify.
1207 weaker_reset = (self.run_reset and
1208 self.reboot_before == model_attributes.RebootBefore.NEVER)
1209 return self.run_verify or weaker_reset
jamesrenb55378a2010-03-02 22:19:49 +00001210
1211
Dan Shi07e09af2013-04-12 09:31:29 -07001212 def _should_run_reset(self, queue_entry):
1213 can_verify = (queue_entry.host.protection !=
1214 host_protections.Protection.DO_NOT_VERIFY)
1215 can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER
1216 return (can_reboot and can_verify and (self.run_reset or
1217 (self._should_run_cleanup(queue_entry) and
1218 self._should_run_verify(queue_entry))))
1219
1220
Alex Millerdfff2fd2013-05-28 13:05:06 -07001221 def _should_run_provision(self, queue_entry):
1222 """
1223 Determine if the queue_entry needs to have a provision task run before
1224 it to provision queue_entry.host.
1225
1226 @param queue_entry: The host queue entry in question.
1227 @returns: True if we should schedule a provision task, False otherwise.
1228
1229 """
1230 # If we get to this point, it means that the scheduler has already
1231 # vetted that all the unprovisionable labels match, so we can just
1232 # find all labels on the job that aren't on the host to get the list
1233 # of what we need to provision. (See the scheduling logic in
1234 # host_scheduler.py:is_host_eligable_for_job() where we discard all
1235 # provisionable labels when assigning jobs to hosts.)
1236 job_labels = {x.name for x in queue_entry.get_labels()}
1237 _, host_labels = queue_entry.host.platform_and_labels()
1238 # If there are any labels on the job that are not on the host, then
1239 # that means there is provisioning work to do. If there's no
1240 # provisioning work to do, then obviously we have no reason to schedule
1241 # a provision task!
1242 if job_labels - set(host_labels):
1243 return True
1244 return False
1245
1246
Alex Miller42437f92013-05-28 12:58:54 -07001247 def _queue_special_task(self, queue_entry, task):
jamesrenb55378a2010-03-02 22:19:49 +00001248 """
Alex Miller42437f92013-05-28 12:58:54 -07001249 Create a special task and associate it with a host queue entry.
jamesrenb55378a2010-03-02 22:19:49 +00001250
Alex Miller42437f92013-05-28 12:58:54 -07001251 @param queue_entry: The queue entry this special task should be
1252 associated with.
1253 @param task: One of the members of the enum models.SpecialTask.Task.
1254 @returns: None
1255
jamesrenb55378a2010-03-02 22:19:49 +00001256 """
jamesrenb55378a2010-03-02 22:19:49 +00001257 models.SpecialTask.objects.create(
1258 host=models.Host.objects.get(id=queue_entry.host_id),
1259 queue_entry=queue_entry, task=task)
1260
1261
Alex Miller42437f92013-05-28 12:58:54 -07001262 def schedule_pre_job_tasks(self, queue_entry):
1263 """
1264 Queue all of the special tasks that need to be run before a host
1265 queue entry may run.
1266
1267 If no special taskes need to be scheduled, then |on_pending| will be
1268 called directly.
1269
1270 @returns None
1271
1272 """
1273 task_queued = False
1274 hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id)
1275
Dan Shi07e09af2013-04-12 09:31:29 -07001276 if self._should_run_reset(queue_entry):
1277 self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET)
Alex Miller42437f92013-05-28 12:58:54 -07001278 task_queued = True
Dan Shi07e09af2013-04-12 09:31:29 -07001279 else:
1280 if self._should_run_cleanup(queue_entry):
1281 self._queue_special_task(hqe_model,
1282 models.SpecialTask.Task.CLEANUP)
1283 task_queued = True
1284 if self._should_run_verify(queue_entry):
1285 self._queue_special_task(hqe_model,
1286 models.SpecialTask.Task.VERIFY)
1287 task_queued = True
Alex Miller42437f92013-05-28 12:58:54 -07001288
Alex Millerdfff2fd2013-05-28 13:05:06 -07001289 if self._should_run_provision(queue_entry):
1290 self._queue_special_task(hqe_model,
1291 models.SpecialTask.Task.PROVISION)
1292 task_queued = True
1293
Alex Miller42437f92013-05-28 12:58:54 -07001294 if not task_queued:
1295 queue_entry.on_pending()
1296
1297
jamesrenb55378a2010-03-02 22:19:49 +00001298 def _assign_new_group(self, queue_entries, group_name=''):
1299 if len(queue_entries) == 1:
1300 group_subdir_name = queue_entries[0].host.hostname
1301 else:
1302 group_subdir_name = self._next_group_name(group_name)
1303 logging.info('Running synchronous job %d hosts %s as %s',
1304 self.id, [entry.host.hostname for entry in queue_entries],
1305 group_subdir_name)
1306
1307 for queue_entry in queue_entries:
1308 queue_entry.set_execution_subdir(group_subdir_name)
1309
1310
    def _choose_group_to_run(self, include_queue_entry):
        """
        Choose the set of Pending queue entries that will run this job.

        @param include_queue_entry: A HostQueueEntry that must be part of the
                chosen group.
        @returns A list of HostQueueEntry instances to be used to run this
                Job.  Returns an empty list (and emails the admins) if fewer
                than synch_count entries are available.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        if atomic_group:
            # Atomic group jobs grab as many hosts as the group allows.
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries
1350
1351
1352 def run_if_ready(self, queue_entry):
1353 """
1354 Run this job by kicking its HQEs into status='Starting' if enough
1355 hosts are ready for it to run.
1356
1357 Cleans up by kicking HQEs into status='Stopped' if this Job is not
1358 ready to run.
1359 """
1360 if not self.is_ready():
1361 self.stop_if_necessary()
1362 elif queue_entry.atomic_group:
1363 self.run_with_ready_delay(queue_entry)
1364 else:
1365 self.run(queue_entry)
1366
1367
1368 def run_with_ready_delay(self, queue_entry):
1369 """
1370 Start a delay to wait for more hosts to enter Pending state before
1371 launching an atomic group job. Once set, the a delay cannot be reset.
1372
1373 @param queue_entry: The HostQueueEntry object to get atomic group
1374 info from and pass to run_if_ready when the delay is up.
1375
1376 @returns An Agent to run the job as appropriate or None if a delay
1377 has already been set.
1378 """
1379 assert queue_entry.job_id == self.id
1380 assert queue_entry.atomic_group
1381 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
1382 over_max_threshold = (self._pending_count() >=
1383 self._max_hosts_needed_to_run(queue_entry.atomic_group))
1384 delay_expired = (self._delay_ready_task and
1385 time.time() >= self._delay_ready_task.end_time)
1386
1387 # Delay is disabled or we already have enough? Do not wait to run.
1388 if not delay or over_max_threshold or delay_expired:
1389 self.run(queue_entry)
1390 else:
1391 queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
1392
1393
1394 def request_abort(self):
1395 """Request that this Job be aborted on the next scheduler cycle."""
1396 self.model().abort()
1397
1398
    def schedule_delayed_callback_task(self, queue_entry):
        """
        Mark |queue_entry| Pending and start the atomic-group ready delay.

        @param queue_entry: HostQueueEntry to set Pending and to hand to
                run() once the delay fires.
        @returns A new DelayedCallTask to be run by the dispatcher, or None
                if a delay is already in flight for this job.
        """
        queue_entry.set_status(models.HostQueueEntry.Status.PENDING)

        # Only one ready-delay may exist per job at a time.
        if self._delay_ready_task:
            return None

        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self)
            # Check to see if the job is still relevant. It could have aborted
            # while we were waiting or hosts could have disappeared, etc.
            if self._pending_count() < self._min_hosts_needed_to_run():
                logging.info('Job %s had too few Pending hosts after waiting '
                             'for extras. Not running.', self)
                self.request_abort()
                return
            return self.run(queue_entry)

        logging.info('Job %s waiting up to %s seconds for more hosts.',
                     self.id, delay)
        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)
        return self._delay_ready_task
1423
1424
1425 def run(self, queue_entry):
1426 """
1427 @param queue_entry: The HostQueueEntry instance calling this method.
1428 """
1429 if queue_entry.atomic_group and self._atomic_and_has_started():
1430 logging.error('Job.run() called on running atomic Job %d '
1431 'with HQE %s.', self.id, queue_entry)
1432 return
1433 queue_entries = self._choose_group_to_run(queue_entry)
1434 if queue_entries:
1435 self._finish_run(queue_entries)
1436
1437
1438 def _finish_run(self, queue_entries):
1439 for queue_entry in queue_entries:
1440 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
1441 self.abort_delay_ready_task()
1442
1443
1444 def abort_delay_ready_task(self):
1445 """Abort the delayed task associated with this job, if any."""
1446 if self._delay_ready_task:
1447 # Cancel any pending callback that would try to run again
1448 # as we are already running.
1449 self._delay_ready_task.abort()
1450
1451
1452 def __str__(self):
1453 return '%s-%s' % (self.id, self.owner)