blob: 46109ade8a00239fcd4bf0aaef66a33e5a084be9 [file] [log] [blame]
Aviv Keshet0b9cfc92013-02-05 11:36:02 -08001# pylint: disable-msg=C0111
2
jamesrenb55378a2010-03-02 22:19:49 +00003"""Database model classes for the scheduler.
4
5Contains model classes abstracting the various DB tables used by the scheduler.
6These overlap the Django models in basic functionality, but were written before
7the Django models existed and have not yet been phased out. Some of them
8(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
9which would probably be ill-suited for inclusion in the general Django model
10classes.
11
12Globals:
13_notify_email_statuses: list of HQE statuses. each time a single HQE reaches
14 one of these statuses, an email will be sent to the job's email_list.
15 comes from global_config.
16_base_url: URL to the local AFE server, used to construct URLs for emails.
17_db: DatabaseConnection for this module.
18_drone_manager: reference to global DroneManager instance.
19"""
20
21import datetime, itertools, logging, os, re, sys, time, weakref
jamesrenb55378a2010-03-02 22:19:49 +000022from autotest_lib.client.common_lib import global_config, host_protections
Eric Li6f27d4f2010-09-29 10:55:17 -070023from autotest_lib.client.common_lib import global_config, utils
jamesrenb55378a2010-03-02 22:19:49 +000024from autotest_lib.frontend.afe import models, model_attributes
25from autotest_lib.database import database_connection
26from autotest_lib.scheduler import drone_manager, email_manager
27from autotest_lib.scheduler import scheduler_config
Fang Deng1d6c2a02013-04-17 15:25:45 -070028from autotest_lib.site_utils.graphite import stats
Aviv Keshet888a94d2013-05-14 17:52:33 -070029from autotest_lib.client.common_lib import control_data
jamesrenb55378a2010-03-02 22:19:49 +000030
31_notify_email_statuses = []
32_base_url = None
33
34_db = None
35_drone_manager = None
36
def initialize():
    """
    Set up this module's globals from configuration.

    Creates and connects _db, parses the notify_email_statuses list from
    global_config, determines _base_url (explicit base_url option, or
    derived from [SERVER] hostname), then calls initialize_globals().
    Exits the process if no hostname is configured.
    """
    global _db
    _db = database_connection.DatabaseConnection('AUTOTEST_WEB')
    _db.connect(db_type='django')

    raw_statuses = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    global _notify_email_statuses
    # Statuses may be separated by whitespace, commas, semicolons or colons.
    _notify_email_statuses = [token for token in
                              re.split(r'[\s,;:]', raw_statuses.lower())
                              if token]

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if not config_base_url:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = global_config.global_config.get_config_value(
                'SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        config_base_url = 'http://%s/afe/' % server_name
    _base_url = config_base_url

    initialize_globals()
68
69
def initialize_globals():
    """Capture the global DroneManager instance into _drone_manager."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
73
74
class DelayedCallTask(object):
    """
    A task object like AgentTask for an Agent to run that waits for the
    specified amount of time to have elapsed before calling the supplied
    callback once and finishing.  If the callback returns anything, it is
    assumed to be a new Agent instance and will be added to the dispatcher.

    @attribute end_time: The absolute posix time after which this task will
            call its callback when it is polled and be finished.

    Also has all attributes required by the Agent class.
    """

    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: The delay in seconds from now that this task
                will call the supplied callback and be done.
        @param callback: A callable to be called by this task once after at
                least delay_seconds time has elapsed.  It must return None
                or a new Agent instance.
        @param now_func: A time.time like function.  Default: time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback
        # Absolute deadline; poll() fires the callback at or after this time.
        self.end_time = self._now_func() + delay_seconds

        # These attributes are required by Agent.
        self.success = False
        self.aborted = False
        self.host_ids = ()
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        """Fire the callback (once) if the deadline has been reached."""
        if self.is_done():
            return
        if self._now_func() < self.end_time:
            return
        self._callback()
        self.success = True


    def is_done(self):
        """A task is done once its callback fired or it was aborted."""
        return self.success or self.aborted


    def abort(self):
        """Mark the task aborted; the callback will never fire."""
        self.aborted = True
126
127
class DBError(Exception):
    """Raised by the DBObject constructor when its select fails."""
130
131
class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        # NOTE: forwarding id/kwargs to object.__new__ relies on Python 2
        # leniency (they are ignored with only a deprecation warning).
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id: Database id of the row to load; may be None if row is given.
        @param row: A full result row for this object; may be None to query
                the database by id.
        @param new_record: True if this object does not yet exist in the DB.
        @param always_query: If False and this instance was already
                initialized (via the instance cache), skip re-querying.
        """
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """Fetch and return the single row with the given id.

        @raises DBError if no such row exists.
        """
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing in
        memory fields.  Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S'  # Leave off the microseconds.
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(datetime_cmp_fmt)
                row_value = row_value.strftime(datetime_cmp_fmt)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # id is not a settable field via update_field().
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read this object's fields from its database row."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        """Return the number of rows in table matching the WHERE clause."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Set one field both on this instance and in its database row."""
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return  # No change; skip the UPDATE.

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object as a new row (no-op unless new_record)."""
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    values.append('"%s"' % value)
            # NOTE: values are inlined with naive double-quoting instead of
            # driver-side parameters; a value containing '"' would break the
            # statement.  Left as-is to preserve existing behavior.
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete this object's row and purge it from the instance cache."""
        # BUGFIX: this previously popped (type(self), id) using the *builtin*
        # id() function rather than self.id, so the cached instance was never
        # actually removed from _instances_by_type_and_id.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Return prefix + string if string is non-empty, else string."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return [cls(id=row[0], row=row) for row in rows]
337
338
class IneligibleHostQueue(DBObject):
    """A row blocking a (job, host) pair from being scheduled together."""
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
342
343
class AtomicGroup(DBObject):
    """ORM wrapper for one row of the afe_atomic_groups table."""
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
348
349
class Label(DBObject):
    """ORM wrapper for one row of the afe_labels table."""
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)
359
360
class Host(DBObject):
    """ORM wrapper for one row of the afe_hosts table."""
    _table_name = 'afe_hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
    _timer = stats.Timer("scheduler_models.Host")


    @_timer.decorate
    def set_status(self, status):
        """Update this host's status column, logging the transition."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)
        # Noticed some time jumps after the last log message.
        logging.debug('Host Set Status Complete')


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT afe_labels.name, afe_labels.platform
                FROM afe_labels
                INNER JOIN afe_hosts_labels ON
                        afe_labels.id = afe_hosts_labels.label_id
                WHERE afe_hosts_labels.host_id = %s
                ORDER BY afe_labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfy most people's hostname sorting needs regardless
        of their exact naming schemes.  Nobody sane should have both a host10
        and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # BUGFIX: int() already ignores leading zeros, and the previous
            # int(number_str.lstrip('0')) raised ValueError for an all-zero
            # suffix such as 'host000' (int('') is invalid).
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
435
436
class HostQueueEntry(DBObject):
    """One host's progress through a Job, backed by one HQE table row."""
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')
    _timer = stats.Timer('scheduler_models.HostQueueEntry')


    def __init__(self, id=None, row=None, **kwargs):
        """
        @param id: Database id of the HQE row; may be None if row is given.
        @param row: A full result row for this HQE; may be None if id is given.
        """
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        # URL of the AFE "view job" tab for this entry's job.
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        """Assign this entry to host, or release its host if host is None."""
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.update_field('host_id', host.id)
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def block_host(self, host_id):
        """Prevent host_id from being scheduled again for this entry's job."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Remove any scheduling block of host_id against this entry's job."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Set the results subdirectory; defaults to the host's hostname."""
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
                                    self.status, flags_str)


    @_timer.decorate
    def set_status(self, status):
        """
        Update this entry's status and derived active/complete flags,
        triggering completion handling and notification email as needed.
        """
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)
        # Noticed some time jumps after last logging message.
        logging.debug('Update Field Complete')

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
        assert not (active and complete)

        self.update_field('active', active)
        self.update_field('complete', complete)

        if complete:
            self._on_complete(status)
            self._email_on_job_complete()

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)
        logging.debug('HQE Set Status Complete')


    def _on_complete(self, status):
        # BUGFIX: compare status values with != rather than "is not" --
        # identity comparison on strings only works by interning accident.
        if status != models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()

        if not self.execution_subdir:
            return
        # unregister any possible pidfiles associated with this queue entry
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)


    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body = "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body


    def _email_on_status(self, status):
        """Email the job's email_list about this entry reaching status."""
        hostname = self._get_hostname()
        subject, body = self._get_status_email_contents(status, None, hostname)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """Email a per-host status summary once the whole job is finished."""
        if not self.job.is_finished():
            return

        summary = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary.append("Host: %s Status: %s" %
                           (queue_entry._get_hostname(),
                            queue_entry.status))

        summary = "\n".join(summary)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self):
        """Log this entry's assignment and kick off pre-job tasks."""
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        # Every host goes thru the Verifying stage (which may or may not
        # actually do anything as determined by get_pre_job_tasks).
        self.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        """Reset this entry to Queued so it can be scheduled again."""
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    @property
    def aborted_by(self):
        # Login of the user who aborted this entry, or None.
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        # Timestamp of the abort, or None.
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        if hasattr(self, "_aborted_by"):
            return  # Already loaded; results are cached on the instance.
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are getting
        # stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        """Transition this (already aborted-flagged) entry to Aborted."""
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING,
                           Status.WAITING):
            assert not dispatcher.get_agents_for_entry(self)
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)
        self.job.abort_delay_ready_task()


    def get_group_name(self):
        """Return the display name of this entry's atomic group, if any."""
        atomic_group = self.atomic_group
        if not atomic_group:
            return ''

        # Look at any meta_host and dependency labels and pick the first
        # one that also specifies this atomic group.  Use that label name
        # as the group name if possible (it is more specific).
        for label in self.get_labels():
            if label.atomic_group_id:
                assert label.atomic_group_id == atomic_group.id
                return label.name
        return atomic_group.name


    def execution_tag(self):
        """Return '<job tag>/<execution_subdir>' for this entry's results."""
        SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
                               'complete!=1 AND execution_subdir="" AND '
                               'status!="Queued";')
        SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
                                 'status="Aborted" WHERE id=%s;')
        try:
            assert self.execution_subdir
        except AssertionError:
            # TODO(scottz): Remove temporary fix/info gathering pathway for
            # crosbug.com/31595 once issue is root caused.
            logging.error('No execution_subdir for host queue id:%s.', self.id)
            logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
            for row in _db.execute(SQL_SUSPECT_ENTRIES):
                logging.error(row)
            logging.error('====DB DEBUG====\n')
            fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
            logging.error('EXECUTING: %s', fix_query)
            _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id)
            raise AssertionError(('self.execution_subdir not found. '
                                  'See log for details.'))

        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        """Results path for this entry; identical to its execution tag."""
        return self.execution_tag()


    def set_started_on_now(self):
        """Stamp this entry's started_on column with the current time."""
        self.update_field('started_on', datetime.datetime.now())


    def is_hostless(self):
        """True if this entry is bound to no host, meta-host or atomic group."""
        return (self.host_id is None
                and self.meta_host is None
                and self.atomic_group_id is None)
822
823
824class Job(DBObject):
825 _table_name = 'afe_jobs'
826 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
827 'control_type', 'created_on', 'synch_count', 'timeout',
828 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
Simran Basi9f508a12012-11-09 12:20:44 -0800829 'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
Aviv Keshetcd1ff9b2013-03-01 14:55:19 -0800830 'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
831 'test_retry')
Fang Deng1d6c2a02013-04-17 15:25:45 -0700832 _timer = stats.Timer("scheduler_models.Job")
jamesrenb55378a2010-03-02 22:19:49 +0000833
834 # This does not need to be a column in the DB. The delays are likely to
835 # be configured short. If the scheduler is stopped and restarted in
836 # the middle of a job's delay cycle, the delay cycle will either be
837 # repeated or skipped depending on the number of Pending machines found
838 # when the restarted scheduler recovers to track it. Not a problem.
839 #
840 # A reference to the DelayedCallTask that will wake up the job should
841 # no other HQEs change state in time. Its end_time attribute is used
842 # by our run_with_ready_delay() method to determine if the wait is over.
843 _delay_ready_task = None
844
845 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
846 # all status='Pending' atomic group HQEs incase a delay was running when the
847 # scheduler was restarted and no more hosts ever successfully exit Verify.
848
    def __init__(self, id=None, row=None, **kwargs):
        """
        @param id: Database id of the job row; may be None if row is given.
        @param row: A full result row for this job; may be None if id is given.
        """
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
        self._owner_model = None # caches model instance of owner
        self.update_image_path = None # path of OS image to install
jamesrenb55378a2010-03-02 22:19:49 +0000854
855
    def model(self):
        """Return the Django models.Job instance backing this row."""
        return models.Job.objects.get(id=self.id)
858
859
860 def owner_model(self):
861 # work around the fact that the Job owner field is a string, not a
862 # foreign key
863 if not self._owner_model:
864 self._owner_model = models.User.objects.get(login=self.owner)
865 return self._owner_model
866
867
    def is_server_job(self):
        # NOTE(review): despite the name, this returns True when control_type
        # is anything *other than* CONTROL_TYPE.SERVER.  The enum values in
        # control_data are not visible here -- verify this is not inverted.
        return self.control_type != control_data.CONTROL_TYPE.SERVER
jamesrenb55378a2010-03-02 22:19:49 +0000870
871
872 def tag(self):
873 return "%s-%s" % (self.id, self.owner)
874
875
876 def get_host_queue_entries(self):
877 rows = _db.execute("""
878 SELECT * FROM afe_host_queue_entries
879 WHERE job_id= %s
880 """, (self.id,))
881 entries = [HostQueueEntry(row=i) for i in rows]
882
883 assert len(entries)>0
884
885 return entries
886
887
    def is_image_update_job(self):
        """
        Discover if the current job requires an OS update.

        Side effect: on success, stores the image path in
        self.update_image_path for later use as an autoserv argument.

        @return: True/False if OS should be updated before job is run.
        """
        # All image update jobs have the parameterized_job_id set.
        if not self.parameterized_job_id:
            return False

        # Retrieve the ID of the ParameterizedJob this job is an instance of.
        rows = _db.execute("""
                SELECT test_id
                FROM afe_parameterized_jobs
                WHERE id = %s
                """, (self.parameterized_job_id,))
        if not rows:
            return False
        test_id = rows[0][0]

        # Retrieve the ID of the known autoupdate_ParameterizedJob.
        rows = _db.execute("""
                SELECT id
                FROM afe_autotests
                WHERE name = 'autoupdate_ParameterizedJob'
                """)
        if not rows:
            return False
        update_id = rows[0][0]

        # If the IDs are the same we've found an image update job.
        if test_id == update_id:
            # Finally, get the path to the OS image to install.
            rows = _db.execute("""
                    SELECT parameter_value
                    FROM afe_parameterized_job_parameters
                    WHERE parameterized_job_id = %s
                    """, (self.parameterized_job_id,))
            if rows:
                # Save the path in update_image_path to use later as a command
                # line parameter to autoserv.
                self.update_image_path = rows[0][0]
            return True

        return False
933
934
Eric Li6f27d4f2010-09-29 10:55:17 -0700935 def get_execution_details(self):
936 """
937 Get test execution details for this job.
938
939 @return: Dictionary with test execution details
940 """
941 def _find_test_jobs(rows):
942 """
943 Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
944 Those are autotest 'internal job' tests, so they should not be
945 counted when evaluating the test stats.
946
947 @param rows: List of rows (matrix) with database results.
948 """
949 job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')
950 n_test_jobs = 0
951 for r in rows:
952 test_name = r[0]
953 if job_test_pattern.match(test_name):
954 n_test_jobs += 1
955
956 return n_test_jobs
957
958 stats = {}
959
960 rows = _db.execute("""
961 SELECT t.test, s.word, t.reason
962 FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
963 WHERE t.job_idx = j.job_idx
964 AND s.status_idx = t.status
965 AND j.afe_job_id = %s
Mike Truty6941dea2010-11-09 15:26:32 -0800966 ORDER BY t.reason
Eric Li6f27d4f2010-09-29 10:55:17 -0700967 """ % self.id)
968
Dale Curtis74a314b2011-06-23 14:55:46 -0700969 failed_rows = [r for r in rows if not r[1] == 'GOOD']
Eric Li6f27d4f2010-09-29 10:55:17 -0700970
971 n_test_jobs = _find_test_jobs(rows)
972 n_test_jobs_failed = _find_test_jobs(failed_rows)
973
974 total_executed = len(rows) - n_test_jobs
975 total_failed = len(failed_rows) - n_test_jobs_failed
976
977 if total_executed > 0:
978 success_rate = 100 - ((total_failed / float(total_executed)) * 100)
979 else:
980 success_rate = 0
981
982 stats['total_executed'] = total_executed
983 stats['total_failed'] = total_failed
984 stats['total_passed'] = total_executed - total_failed
985 stats['success_rate'] = success_rate
986
987 status_header = ("Test Name", "Status", "Reason")
988 if failed_rows:
989 stats['failed_rows'] = utils.matrix_to_string(failed_rows,
990 status_header)
991 else:
992 stats['failed_rows'] = ''
993
994 time_row = _db.execute("""
995 SELECT started_time, finished_time
996 FROM tko_jobs
997 WHERE afe_job_id = %s
998 """ % self.id)
999
1000 if time_row:
1001 t_begin, t_end = time_row[0]
Mike Truty6941dea2010-11-09 15:26:32 -08001002 try:
1003 delta = t_end - t_begin
1004 minutes, seconds = divmod(delta.seconds, 60)
1005 hours, minutes = divmod(minutes, 60)
1006 stats['execution_time'] = ("%02d:%02d:%02d" %
1007 (hours, minutes, seconds))
1008 # One of t_end or t_begin are None
1009 except TypeError:
1010 stats['execution_time'] = '(could not determine)'
Eric Li6f27d4f2010-09-29 10:55:17 -07001011 else:
1012 stats['execution_time'] = '(none)'
1013
1014 return stats
1015
1016
Fang Deng1d6c2a02013-04-17 15:25:45 -07001017 @_timer.decorate
jamesrenb55378a2010-03-02 22:19:49 +00001018 def set_status(self, status, update_queues=False):
1019 self.update_field('status',status)
1020
1021 if update_queues:
1022 for queue_entry in self.get_host_queue_entries():
1023 queue_entry.set_status(status)
1024
1025
1026 def keyval_dict(self):
1027 return self.model().keyval_dict()
1028
1029
1030 def _atomic_and_has_started(self):
1031 """
1032 @returns True if any of the HostQueueEntries associated with this job
1033 have entered the Status.STARTING state or beyond.
1034 """
1035 atomic_entries = models.HostQueueEntry.objects.filter(
1036 job=self.id, atomic_group__isnull=False)
1037 if atomic_entries.count() <= 0:
1038 return False
1039
1040 # These states may *only* be reached if Job.run() has been called.
1041 started_statuses = (models.HostQueueEntry.Status.STARTING,
1042 models.HostQueueEntry.Status.RUNNING,
1043 models.HostQueueEntry.Status.COMPLETED)
1044
1045 started_entries = atomic_entries.filter(status__in=started_statuses)
1046 return started_entries.count() > 0
1047
1048
1049 def _hosts_assigned_count(self):
1050 """The number of HostQueueEntries assigned a Host for this job."""
1051 entries = models.HostQueueEntry.objects.filter(job=self.id,
1052 host__isnull=False)
1053 return entries.count()
1054
1055
1056 def _pending_count(self):
1057 """The number of HostQueueEntries for this job in the Pending state."""
1058 pending_entries = models.HostQueueEntry.objects.filter(
1059 job=self.id, status=models.HostQueueEntry.Status.PENDING)
1060 return pending_entries.count()
1061
1062
1063 def _max_hosts_needed_to_run(self, atomic_group):
1064 """
1065 @param atomic_group: The AtomicGroup associated with this job that we
1066 are using to set an upper bound on the threshold.
1067 @returns The maximum number of HostQueueEntries assigned a Host before
1068 this job can run.
1069 """
1070 return min(self._hosts_assigned_count(),
1071 atomic_group.max_number_of_machines)
1072
1073
1074 def _min_hosts_needed_to_run(self):
1075 """Return the minumum number of hsots needed to run this job."""
1076 return self.synch_count
1077
1078
1079 def is_ready(self):
1080 # NOTE: Atomic group jobs stop reporting ready after they have been
1081 # started to avoid launching multiple copies of one atomic job.
1082 # Only possible if synch_count is less than than half the number of
1083 # machines in the atomic group.
1084 pending_count = self._pending_count()
1085 atomic_and_has_started = self._atomic_and_has_started()
1086 ready = (pending_count >= self.synch_count
1087 and not atomic_and_has_started)
1088
1089 if not ready:
1090 logging.info(
1091 'Job %s not ready: %s pending, %s required '
1092 '(Atomic and started: %s)',
1093 self, pending_count, self.synch_count,
1094 atomic_and_has_started)
1095
1096 return ready
1097
1098
1099 def num_machines(self, clause = None):
1100 sql = "job_id=%s" % self.id
1101 if clause:
1102 sql += " AND (%s)" % clause
1103 return self.count(sql, table='afe_host_queue_entries')
1104
1105
1106 def num_queued(self):
1107 return self.num_machines('not complete')
1108
1109
1110 def num_active(self):
1111 return self.num_machines('active')
1112
1113
1114 def num_complete(self):
1115 return self.num_machines('complete')
1116
1117
1118 def is_finished(self):
1119 return self.num_complete() == self.num_machines()
1120
1121
1122 def _not_yet_run_entries(self, include_verifying=True):
1123 statuses = [models.HostQueueEntry.Status.QUEUED,
1124 models.HostQueueEntry.Status.PENDING]
1125 if include_verifying:
1126 statuses.append(models.HostQueueEntry.Status.VERIFYING)
1127 return models.HostQueueEntry.objects.filter(job=self.id,
1128 status__in=statuses)
1129
1130
1131 def _stop_all_entries(self):
1132 entries_to_stop = self._not_yet_run_entries(
1133 include_verifying=False)
1134 for child_entry in entries_to_stop:
1135 assert not child_entry.complete, (
1136 '%s status=%s, active=%s, complete=%s' %
1137 (child_entry.id, child_entry.status, child_entry.active,
1138 child_entry.complete))
1139 if child_entry.status == models.HostQueueEntry.Status.PENDING:
1140 child_entry.host.status = models.Host.Status.READY
1141 child_entry.host.save()
1142 child_entry.status = models.HostQueueEntry.Status.STOPPED
1143 child_entry.save()
1144
1145
1146 def stop_if_necessary(self):
1147 not_yet_run = self._not_yet_run_entries()
1148 if not_yet_run.count() < self.synch_count:
1149 self._stop_all_entries()
1150
1151
jamesrenb55378a2010-03-02 22:19:49 +00001152 def _next_group_name(self, group_name=''):
1153 """@returns a directory name to use for the next host group results."""
1154 if group_name:
1155 # Sanitize for use as a pathname.
1156 group_name = group_name.replace(os.path.sep, '_')
1157 if group_name.startswith('.'):
1158 group_name = '_' + group_name[1:]
1159 # Add a separator between the group name and 'group%d'.
1160 group_name += '.'
1161 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
1162 query = models.HostQueueEntry.objects.filter(
1163 job=self.id).values('execution_subdir').distinct()
1164 subdirs = (entry['execution_subdir'] for entry in query)
1165 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
1166 ids = [int(match.group(1)) for match in group_matches if match]
1167 if ids:
1168 next_id = max(ids) + 1
1169 else:
1170 next_id = 0
1171 return '%sgroup%d' % (group_name, next_id)
1172
1173
1174 def get_group_entries(self, queue_entry_from_group):
1175 """
1176 @param queue_entry_from_group: A HostQueueEntry instance to find other
1177 group entries on this job for.
1178
1179 @returns A list of HostQueueEntry objects all executing this job as
1180 part of the same group as the one supplied (having the same
1181 execution_subdir).
1182 """
1183 execution_subdir = queue_entry_from_group.execution_subdir
1184 return list(HostQueueEntry.fetch(
1185 where='job_id=%s AND execution_subdir=%s',
1186 params=(self.id, execution_subdir)))
1187
1188
1189 def _should_run_cleanup(self, queue_entry):
1190 if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
1191 return True
1192 elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
1193 return queue_entry.host.dirty
1194 return False
1195
1196
1197 def _should_run_verify(self, queue_entry):
1198 do_not_verify = (queue_entry.host.protection ==
1199 host_protections.Protection.DO_NOT_VERIFY)
1200 if do_not_verify:
1201 return False
1202 return self.run_verify
1203
1204
1205 def schedule_pre_job_tasks(self, queue_entry):
1206 """
1207 Get a list of tasks to perform before the host_queue_entry
1208 may be used to run this Job (such as Cleanup & Verify).
1209
1210 @returns A list of tasks to be done to the given queue_entry before
1211 it should be considered be ready to run this job. The last
1212 task in the list calls HostQueueEntry.on_pending(), which
1213 continues the flow of the job.
1214 """
1215 if self._should_run_cleanup(queue_entry):
1216 task = models.SpecialTask.Task.CLEANUP
1217 elif self._should_run_verify(queue_entry):
1218 task = models.SpecialTask.Task.VERIFY
1219 else:
1220 queue_entry.on_pending()
1221 return
1222
1223 queue_entry = models.HostQueueEntry.objects.get(id=queue_entry.id)
1224 models.SpecialTask.objects.create(
1225 host=models.Host.objects.get(id=queue_entry.host_id),
1226 queue_entry=queue_entry, task=task)
1227
1228
1229 def _assign_new_group(self, queue_entries, group_name=''):
1230 if len(queue_entries) == 1:
1231 group_subdir_name = queue_entries[0].host.hostname
1232 else:
1233 group_subdir_name = self._next_group_name(group_name)
1234 logging.info('Running synchronous job %d hosts %s as %s',
1235 self.id, [entry.host.hostname for entry in queue_entries],
1236 group_subdir_name)
1237
1238 for queue_entry in queue_entries:
1239 queue_entry.set_execution_subdir(group_subdir_name)
1240
1241
    def _choose_group_to_run(self, include_queue_entry):
        """
        Choose the set of queue entries that will run this job together.

        @param include_queue_entry: A HostQueueEntry that must be part of
                the group (the entry that triggered the run).

        @returns A list of HostQueueEntry instances to be used to run this
                Job (include_queue_entry plus enough Pending entries to
                satisfy the target group size), or an empty list if fewer
                than synch_count entries could be gathered.  All returned
                entries have been assigned a shared execution_subdir via
                _assign_new_group().
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        # Atomic groups try to fill up to the group's machine cap; plain
        # synchronous jobs only need synch_count entries in total.
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing so the
            # selection is deterministic.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check. We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries
1281
1282
1283 def run_if_ready(self, queue_entry):
1284 """
1285 Run this job by kicking its HQEs into status='Starting' if enough
1286 hosts are ready for it to run.
1287
1288 Cleans up by kicking HQEs into status='Stopped' if this Job is not
1289 ready to run.
1290 """
1291 if not self.is_ready():
1292 self.stop_if_necessary()
1293 elif queue_entry.atomic_group:
1294 self.run_with_ready_delay(queue_entry)
1295 else:
1296 self.run(queue_entry)
1297
1298
    def run_with_ready_delay(self, queue_entry):
        """
        Start a delay to wait for more hosts to enter Pending state before
        launching an atomic group job.  Once set, the delay cannot be reset.

        @param queue_entry: The HostQueueEntry object to get atomic group
                info from and pass to run_if_ready when the delay is up.

        NOTE(review): earlier docs claimed this returns an Agent; it
        actually returns None — it either runs the job immediately or
        parks the entry in Waiting until the delay-based callback fires.
        """
        assert queue_entry.job_id == self.id
        assert queue_entry.atomic_group
        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
        over_max_threshold = (self._pending_count() >=
                self._max_hosts_needed_to_run(queue_entry.atomic_group))
        delay_expired = (self._delay_ready_task and
                         time.time() >= self._delay_ready_task.end_time)

        # Delay is disabled or we already have enough? Do not wait to run.
        if not delay or over_max_threshold or delay_expired:
            self.run(queue_entry)
        else:
            queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
1323
1324
1325 def request_abort(self):
1326 """Request that this Job be aborted on the next scheduler cycle."""
1327 self.model().abort()
1328
1329
1330 def schedule_delayed_callback_task(self, queue_entry):
1331 queue_entry.set_status(models.HostQueueEntry.Status.PENDING)
1332
1333 if self._delay_ready_task:
1334 return None
1335
1336 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
1337
1338 def run_job_after_delay():
1339 logging.info('Job %s done waiting for extra hosts.', self)
1340 # Check to see if the job is still relevant. It could have aborted
1341 # while we were waiting or hosts could have disappearred, etc.
1342 if self._pending_count() < self._min_hosts_needed_to_run():
1343 logging.info('Job %s had too few Pending hosts after waiting '
1344 'for extras. Not running.', self)
1345 self.request_abort()
1346 return
1347 return self.run(queue_entry)
1348
1349 logging.info('Job %s waiting up to %s seconds for more hosts.',
1350 self.id, delay)
1351 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
1352 callback=run_job_after_delay)
1353 return self._delay_ready_task
1354
1355
1356 def run(self, queue_entry):
1357 """
1358 @param queue_entry: The HostQueueEntry instance calling this method.
1359 """
1360 if queue_entry.atomic_group and self._atomic_and_has_started():
1361 logging.error('Job.run() called on running atomic Job %d '
1362 'with HQE %s.', self.id, queue_entry)
1363 return
1364 queue_entries = self._choose_group_to_run(queue_entry)
1365 if queue_entries:
1366 self._finish_run(queue_entries)
1367
1368
1369 def _finish_run(self, queue_entries):
1370 for queue_entry in queue_entries:
1371 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
1372 self.abort_delay_ready_task()
1373
1374
1375 def abort_delay_ready_task(self):
1376 """Abort the delayed task associated with this job, if any."""
1377 if self._delay_ready_task:
1378 # Cancel any pending callback that would try to run again
1379 # as we are already running.
1380 self._delay_ready_task.abort()
1381
1382
1383 def __str__(self):
1384 return '%s-%s' % (self.id, self.owner)