blob: d06107eb682b513a0ca7ede18db46625b987a20d [file] [log] [blame]
Aviv Keshet0b9cfc92013-02-05 11:36:02 -08001# pylint: disable-msg=C0111
2
jamesrenb55378a2010-03-02 22:19:49 +00003"""Database model classes for the scheduler.
4
5Contains model classes abstracting the various DB tables used by the scheduler.
6These overlap the Django models in basic functionality, but were written before
7the Django models existed and have not yet been phased out. Some of them
8(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
9which would probably be ill-suited for inclusion in the general Django model
10classes.
11
12Globals:
13_notify_email_statuses: list of HQE statuses. each time a single HQE reaches
14 one of these statuses, an email will be sent to the job's email_list.
15 comes from global_config.
16_base_url: URL to the local AFE server, used to construct URLs for emails.
17_db: DatabaseConnection for this module.
18_drone_manager: reference to global DroneManager instance.
19"""
20
21import datetime, itertools, logging, os, re, sys, time, weakref
jamesrenb55378a2010-03-02 22:19:49 +000022from autotest_lib.client.common_lib import global_config, host_protections
Eric Li6f27d4f2010-09-29 10:55:17 -070023from autotest_lib.client.common_lib import global_config, utils
jamesrenb55378a2010-03-02 22:19:49 +000024from autotest_lib.frontend.afe import models, model_attributes
25from autotest_lib.database import database_connection
26from autotest_lib.scheduler import drone_manager, email_manager
27from autotest_lib.scheduler import scheduler_config
Fang Deng1d6c2a02013-04-17 15:25:45 -070028from autotest_lib.site_utils.graphite import stats
jamesrenb55378a2010-03-02 22:19:49 +000029
_notify_email_statuses = []  # HQE statuses that trigger notification email; set by initialize().
_base_url = None  # URL to the local AFE server, used to build links in emails; set by initialize().

_db = None  # DatabaseConnection shared by every DBObject subclass; set by initialize().
_drone_manager = None  # Global DroneManager instance; set by initialize_globals().
35
def initialize():
    """
    Set up this module's global state.

    Opens the AUTOTEST_WEB database connection, reads the email notification
    statuses and the AFE base URL out of the global config, and finally calls
    initialize_globals() for the remaining globals.
    """
    global _db
    _db = database_connection.DatabaseConnection('AUTOTEST_WEB')
    _db.connect(db_type='django')

    raw_statuses = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    # Accept whitespace, comma, semicolon or colon as separators; drop the
    # empty tokens re.split produces between adjacent separators.
    tokens = re.split(r'[\s,;:]', raw_statuses.lower())
    global _notify_email_statuses
    _notify_email_statuses = [token for token in tokens if token]

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    base_url = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if not base_url:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = global_config.global_config.get_config_value(
                'SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        base_url = 'http://%s/afe/' % server_name
    global _base_url
    _base_url = base_url

    initialize_globals()
68
def initialize_globals():
    """Initialize globals that depend on other subsystems being up; currently
    just caches the global DroneManager instance in _drone_manager."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
72
73
class DelayedCallTask(object):
    """
    An Agent-compatible task that idles until a deadline has passed, then
    invokes its callback exactly once and marks itself successful.  If the
    callback returns anything, it is assumed to be a new Agent instance and
    will be added to the dispatcher.

    @attribute end_time: The absolute posix time after which this task will
            call its callback when it is polled and be finished.

    Also has all attributes required by the Agent class.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: The delay in seconds from now that this task
                will call the supplied callback and be done.
        @param callback: A callable to be called by this task once after at
                least delay_seconds time has elapsed.  It must return None
                or a new Agent instance.
        @param now_func: A time.time like function.  Default: time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback
        # Deadline, expressed on the same clock as _now_func.
        self.end_time = self._now_func() + delay_seconds

        # These attributes are required by Agent.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        """Fire the callback once the deadline has passed; otherwise no-op."""
        if self.is_done():
            return
        if self._now_func() < self.end_time:
            return
        self._callback()
        self.success = True


    def is_done(self):
        """The task finishes when it has fired or has been aborted."""
        return self.success or self.aborted


    def abort(self):
        """Mark the task aborted; the callback will never be invoked."""
        self.aborted = True
125
126
class DBError(Exception):
    """Raised by the DBObject constructor when its select fails (e.g. the
    requested row id does not exist -- see DBObject._fetch_row_from_db)."""
129
130
class DBObject(object):
    """
    A miniature object relational model for the database.

    Subclasses map rows of a single table onto Python objects.  Instances
    with the same (type, id) are shared through a weak-value cache so that,
    for example, many HostQueueEntry objects reuse one Job instance.
    """

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id: Database id of the row to represent.  May be None when a
                pre-fetched row is supplied.
        @param row: Pre-fetched result row (a sequence parallel to _fields);
                fetched from the database by id when None.
        @param new_record: True when this object represents a row that does
                not exist in the database yet; save() will INSERT it.
        @param always_query: When False, a cached instance that was already
                initialized is returned as-is instead of being refreshed
                from the database.
        """
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """Return the database row for row_id; raises DBError if missing."""
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        """Assert the row has exactly one value per declared field."""
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing in
        memory fields.  Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S'  # Leave off the microseconds.
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(datetime_cmp_fmt)
                row_value = row_value.strftime(datetime_cmp_fmt)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # id is never a writable field (see update_field).
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-fetch this object's row and refresh all field attributes."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        """
        Return the number of rows matching the raw SQL condition `where` in
        `table` (defaults to this object's table).  The condition is
        interpolated directly into the query, so it must come from trusted,
        internally-generated strings only.
        """
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Persist a single field to the database (no-op if unchanged)."""
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """
        INSERT this object as a new row (only when constructed with
        new_record=True) and adopt the database-assigned id.
        """
        if self.__new_record:
            keys = self._fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    # NOTE(review): values are interpolated rather than
                    # parameterized; a value containing '"' would break this
                    # query.  Only safe for trusted, internally-generated
                    # values.
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete this row from the database and evict it from the cache."""
        # Bug fix: the cache key previously used the builtin id() function
        # instead of self.id, so deleted instances were never evicted and a
        # subsequent construction with this id returned the stale object.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Return string with prefix prepended, or unchanged when falsy."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @returns A list containing one class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return [cls(id=row[0], row=row) for row in rows]
336
337
class IneligibleHostQueue(DBObject):
    """Row in afe_ineligible_host_queues marking a host as blocked for a job;
    created/removed by HostQueueEntry.block_host()/unblock_host()."""
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
341
342
class AtomicGroup(DBObject):
    """Models a row in the afe_atomic_groups table."""
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
347
348
class Label(DBObject):
    """Models a row in the afe_labels table."""
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        # Include the ids most useful when debugging scheduling decisions.
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)
358
359
class Host(DBObject):
    """Models a row in the afe_hosts table."""
    _table_name = 'afe_hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
    _timer = stats.Timer("scheduler_models.Host")


    @_timer.decorate
    def set_status(self, status):
        """Persist a new status for this host (and log the transition)."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)
        # Noticed some time jumps after the last log message.
        logging.debug('Host Set Status Complete')


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT afe_labels.name, afe_labels.platform
                FROM afe_labels
                INNER JOIN afe_hosts_labels ON
                        afe_labels.id = afe_hosts_labels.label_id
                WHERE afe_hosts_labels.host_id = %s
                ORDER BY afe_labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfy most people's hostname sorting needs regardless
        of their exact naming schemes.  Nobody sane should have both a host10
        and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # Bug fix: int() already ignores leading zeros; the previous
            # lstrip('0') approach raised ValueError for an all-zero digit
            # suffix such as "host0" or "host000" (int('') is invalid).
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
434
435
class HostQueueEntry(DBObject):
    """Models a row in afe_host_queue_entries: one (job, host) scheduling
    unit, tracking its status, flags and execution directory."""
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')
    _timer = stats.Timer('scheduler_models.HostQueueEntry')


    def __init__(self, id=None, row=None, **kwargs):
        # Resolve our foreign keys into (cached, see DBObject) model objects.
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        """Return the AFE URL for viewing this entry's job (used in emails)."""
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        """
        Assign this entry to host (or release the current host when host is
        None), maintaining the job's ineligible-host block rows.
        """
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.update_field('host_id', host.id)
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def block_host(self, host_id):
        """Insert an IneligibleHostQueue row blocking host_id for this job."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Delete any IneligibleHostQueue rows blocking host_id for this job."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Set execution_subdir; defaults to the assigned host's hostname."""
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        """Return the assigned host's hostname, or 'no host' if unassigned."""
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        # e.g. "myhost/12 (34) Running [active]".
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
                                    self.status, flags_str)


    @_timer.decorate
    def set_status(self, status):
        """
        Set this entry's status, derive and persist the active/complete flags
        from it, run completion hooks and send any configured status email.
        """
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)
        # Noticed some time jumps after last logging message.
        logging.debug('Update Field Complete')

        # active/complete are fully determined by the status value; an entry
        # can never be both at once.
        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
        assert not (active and complete)

        self.update_field('active', active)
        self.update_field('complete', complete)

        if complete:
            self._on_complete(status)
            self._email_on_job_complete()

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)
        logging.debug('HQE Set Status Complete')


    def _on_complete(self, status):
        """Completion hook: notify the job and unregister our pidfiles."""
        # NOTE(review): 'is not' is an identity comparison; this relies on
        # status being the very same string object as Status.ABORTED --
        # confirm, or compare with != instead.
        if status is not models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()

        if not self.execution_subdir:
            return
        # unregister any possible pidfiles associated with this queue entry
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)


    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body = "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body


    def _email_on_status(self, status):
        """Email the job's email_list about this entry reaching status."""
        hostname = self._get_hostname()
        subject, body = self._get_status_email_contents(status, None, hostname)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """Email a per-host summary once every entry of the job is finished."""
        if not self.job.is_finished():
            return

        summary = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary.append("Host: %s Status: %s" %
                           (queue_entry._get_hostname(),
                            queue_entry.status))

        summary = "\n".join(summary)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self):
        """Log the scheduling decision and kick off the pre-job task flow."""
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        # Every host goes thru the Verifying stage (which may or may not
        # actually do anything as determined by get_pre_job_tasks).
        self.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        """Return this entry to Queued, clearing per-run state (started_on,
        execution subdir, and -- for metahost entries -- the host)."""
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    @property
    def aborted_by(self):
        """Login of the user who aborted this entry, or None (lazy-loaded)."""
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        """Timestamp of the abort request, or None (lazy-loaded)."""
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are getting
        # stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        """Abort this entry, releasing or cleaning up its host depending on
        the current status; post-job stages are left to finish on their own."""
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING,
                           Status.WAITING):
            assert not dispatcher.get_agents_for_entry(self)
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            # Schedule a cleanup for the host that was being verified.
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)
        self.job.abort_delay_ready_task()


    def get_group_name(self):
        """Return the display name for this entry's atomic group ('' if none)."""
        atomic_group = self.atomic_group
        if not atomic_group:
            return ''

        # Look at any meta_host and dependency labels and pick the first
        # one that also specifies this atomic group.  Use that label name
        # as the group name if possible (it is more specific).
        for label in self.get_labels():
            if label.atomic_group_id:
                assert label.atomic_group_id == atomic_group.id
                return label.name
        return atomic_group.name


    def execution_tag(self):
        """Return '<job tag>/<execution_subdir>'; asserts the subdir is set
        (with temporary DB debugging/repair for crosbug.com/31595)."""
        SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
                               'complete!=1 AND execution_subdir="" AND '
                               'status!="Queued";')
        SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
                                 'status="Aborted" WHERE id=%s;')
        try:
            assert self.execution_subdir
        except AssertionError:
            # TODO(scottz): Remove temporary fix/info gathering pathway for
            # crosbug.com/31595 once issue is root caused.
            logging.error('No execution_subdir for host queue id:%s.', self.id)
            logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
            for row in _db.execute(SQL_SUSPECT_ENTRIES):
                logging.error(row)
            logging.error('====DB DEBUG====\n')
            fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
            logging.error('EXECUTING: %s', fix_query)
            _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id)
            raise AssertionError(('self.execution_subdir not found. '
                                  'See log for details.'))

        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        """Alias for execution_tag()."""
        return self.execution_tag()


    def set_started_on_now(self):
        """Stamp started_on with the current time."""
        self.update_field('started_on', datetime.datetime.now())


    def is_hostless(self):
        """True when this entry has no host, meta host or atomic group."""
        return (self.host_id is None
                and self.meta_host is None
                and self.atomic_group_id is None)
821
822
823class Job(DBObject):
824 _table_name = 'afe_jobs'
825 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
826 'control_type', 'created_on', 'synch_count', 'timeout',
827 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
Simran Basi9f508a12012-11-09 12:20:44 -0800828 'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
Aviv Keshetcd1ff9b2013-03-01 14:55:19 -0800829 'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
830 'test_retry')
Fang Deng1d6c2a02013-04-17 15:25:45 -0700831 _timer = stats.Timer("scheduler_models.Job")
jamesrenb55378a2010-03-02 22:19:49 +0000832
833 # This does not need to be a column in the DB. The delays are likely to
834 # be configured short. If the scheduler is stopped and restarted in
835 # the middle of a job's delay cycle, the delay cycle will either be
836 # repeated or skipped depending on the number of Pending machines found
837 # when the restarted scheduler recovers to track it. Not a problem.
838 #
839 # A reference to the DelayedCallTask that will wake up the job should
840 # no other HQEs change state in time. Its end_time attribute is used
841 # by our run_with_ready_delay() method to determine if the wait is over.
842 _delay_ready_task = None
843
844 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
845 # all status='Pending' atomic group HQEs incase a delay was running when the
846 # scheduler was restarted and no more hosts ever successfully exit Verify.
847
848 def __init__(self, id=None, row=None, **kwargs):
849 assert id or row
850 super(Job, self).__init__(id=id, row=row, **kwargs)
851 self._owner_model = None # caches model instance of owner
Paul Pendlebury5a8c6ad2011-02-01 07:20:17 -0800852 self.update_image_path = None # path of OS image to install
jamesrenb55378a2010-03-02 22:19:49 +0000853
854
855 def model(self):
856 return models.Job.objects.get(id=self.id)
857
858
859 def owner_model(self):
860 # work around the fact that the Job owner field is a string, not a
861 # foreign key
862 if not self._owner_model:
863 self._owner_model = models.User.objects.get(login=self.owner)
864 return self._owner_model
865
866
867 def is_server_job(self):
868 return self.control_type != 2
869
870
871 def tag(self):
872 return "%s-%s" % (self.id, self.owner)
873
874
875 def get_host_queue_entries(self):
876 rows = _db.execute("""
877 SELECT * FROM afe_host_queue_entries
878 WHERE job_id= %s
879 """, (self.id,))
880 entries = [HostQueueEntry(row=i) for i in rows]
881
882 assert len(entries)>0
883
884 return entries
885
886
Paul Pendlebury5a8c6ad2011-02-01 07:20:17 -0800887 def is_image_update_job(self):
888 """
889 Discover if the current job requires an OS update.
890
891 @return: True/False if OS should be updated before job is run.
892 """
893 # All image update jobs have the parameterized_job_id set.
894 if not self.parameterized_job_id:
895 return False
896
897 # Retrieve the ID of the ParameterizedJob this job is an instance of.
898 rows = _db.execute("""
899 SELECT test_id
900 FROM afe_parameterized_jobs
901 WHERE id = %s
902 """, (self.parameterized_job_id,))
903 if not rows:
904 return False
905 test_id = rows[0][0]
906
907 # Retrieve the ID of the known autoupdate_ParameterizedJob.
908 rows = _db.execute("""
909 SELECT id
910 FROM afe_autotests
911 WHERE name = 'autoupdate_ParameterizedJob'
912 """)
913 if not rows:
914 return False
915 update_id = rows[0][0]
916
917 # If the IDs are the same we've found an image update job.
918 if test_id == update_id:
919 # Finally, get the path to the OS image to install.
920 rows = _db.execute("""
921 SELECT parameter_value
922 FROM afe_parameterized_job_parameters
923 WHERE parameterized_job_id = %s
924 """, (self.parameterized_job_id,))
925 if rows:
926 # Save the path in update_image_path to use later as a command
927 # line parameter to autoserv.
928 self.update_image_path = rows[0][0]
929 return True
930
931 return False
932
933
Eric Li6f27d4f2010-09-29 10:55:17 -0700934 def get_execution_details(self):
935 """
936 Get test execution details for this job.
937
938 @return: Dictionary with test execution details
939 """
940 def _find_test_jobs(rows):
941 """
942 Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
943 Those are autotest 'internal job' tests, so they should not be
944 counted when evaluating the test stats.
945
946 @param rows: List of rows (matrix) with database results.
947 """
948 job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')
949 n_test_jobs = 0
950 for r in rows:
951 test_name = r[0]
952 if job_test_pattern.match(test_name):
953 n_test_jobs += 1
954
955 return n_test_jobs
956
957 stats = {}
958
959 rows = _db.execute("""
960 SELECT t.test, s.word, t.reason
961 FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
962 WHERE t.job_idx = j.job_idx
963 AND s.status_idx = t.status
964 AND j.afe_job_id = %s
Mike Truty6941dea2010-11-09 15:26:32 -0800965 ORDER BY t.reason
Eric Li6f27d4f2010-09-29 10:55:17 -0700966 """ % self.id)
967
Dale Curtis74a314b2011-06-23 14:55:46 -0700968 failed_rows = [r for r in rows if not r[1] == 'GOOD']
Eric Li6f27d4f2010-09-29 10:55:17 -0700969
970 n_test_jobs = _find_test_jobs(rows)
971 n_test_jobs_failed = _find_test_jobs(failed_rows)
972
973 total_executed = len(rows) - n_test_jobs
974 total_failed = len(failed_rows) - n_test_jobs_failed
975
976 if total_executed > 0:
977 success_rate = 100 - ((total_failed / float(total_executed)) * 100)
978 else:
979 success_rate = 0
980
981 stats['total_executed'] = total_executed
982 stats['total_failed'] = total_failed
983 stats['total_passed'] = total_executed - total_failed
984 stats['success_rate'] = success_rate
985
986 status_header = ("Test Name", "Status", "Reason")
987 if failed_rows:
988 stats['failed_rows'] = utils.matrix_to_string(failed_rows,
989 status_header)
990 else:
991 stats['failed_rows'] = ''
992
993 time_row = _db.execute("""
994 SELECT started_time, finished_time
995 FROM tko_jobs
996 WHERE afe_job_id = %s
997 """ % self.id)
998
999 if time_row:
1000 t_begin, t_end = time_row[0]
Mike Truty6941dea2010-11-09 15:26:32 -08001001 try:
1002 delta = t_end - t_begin
1003 minutes, seconds = divmod(delta.seconds, 60)
1004 hours, minutes = divmod(minutes, 60)
1005 stats['execution_time'] = ("%02d:%02d:%02d" %
1006 (hours, minutes, seconds))
1007 # One of t_end or t_begin are None
1008 except TypeError:
1009 stats['execution_time'] = '(could not determine)'
Eric Li6f27d4f2010-09-29 10:55:17 -07001010 else:
1011 stats['execution_time'] = '(none)'
1012
1013 return stats
1014
1015
Fang Deng1d6c2a02013-04-17 15:25:45 -07001016 @_timer.decorate
jamesrenb55378a2010-03-02 22:19:49 +00001017 def set_status(self, status, update_queues=False):
1018 self.update_field('status',status)
1019
1020 if update_queues:
1021 for queue_entry in self.get_host_queue_entries():
1022 queue_entry.set_status(status)
1023
1024
1025 def keyval_dict(self):
1026 return self.model().keyval_dict()
1027
1028
1029 def _atomic_and_has_started(self):
1030 """
1031 @returns True if any of the HostQueueEntries associated with this job
1032 have entered the Status.STARTING state or beyond.
1033 """
1034 atomic_entries = models.HostQueueEntry.objects.filter(
1035 job=self.id, atomic_group__isnull=False)
1036 if atomic_entries.count() <= 0:
1037 return False
1038
1039 # These states may *only* be reached if Job.run() has been called.
1040 started_statuses = (models.HostQueueEntry.Status.STARTING,
1041 models.HostQueueEntry.Status.RUNNING,
1042 models.HostQueueEntry.Status.COMPLETED)
1043
1044 started_entries = atomic_entries.filter(status__in=started_statuses)
1045 return started_entries.count() > 0
1046
1047
1048 def _hosts_assigned_count(self):
1049 """The number of HostQueueEntries assigned a Host for this job."""
1050 entries = models.HostQueueEntry.objects.filter(job=self.id,
1051 host__isnull=False)
1052 return entries.count()
1053
1054
1055 def _pending_count(self):
1056 """The number of HostQueueEntries for this job in the Pending state."""
1057 pending_entries = models.HostQueueEntry.objects.filter(
1058 job=self.id, status=models.HostQueueEntry.Status.PENDING)
1059 return pending_entries.count()
1060
1061
1062 def _max_hosts_needed_to_run(self, atomic_group):
1063 """
1064 @param atomic_group: The AtomicGroup associated with this job that we
1065 are using to set an upper bound on the threshold.
1066 @returns The maximum number of HostQueueEntries assigned a Host before
1067 this job can run.
1068 """
1069 return min(self._hosts_assigned_count(),
1070 atomic_group.max_number_of_machines)
1071
1072
    def _min_hosts_needed_to_run(self):
        """Return the minimum number of hosts needed to run this job.

        For this job that is simply its synch_count.
        """
        return self.synch_count
1076
1077
1078 def is_ready(self):
1079 # NOTE: Atomic group jobs stop reporting ready after they have been
1080 # started to avoid launching multiple copies of one atomic job.
1081 # Only possible if synch_count is less than than half the number of
1082 # machines in the atomic group.
1083 pending_count = self._pending_count()
1084 atomic_and_has_started = self._atomic_and_has_started()
1085 ready = (pending_count >= self.synch_count
1086 and not atomic_and_has_started)
1087
1088 if not ready:
1089 logging.info(
1090 'Job %s not ready: %s pending, %s required '
1091 '(Atomic and started: %s)',
1092 self, pending_count, self.synch_count,
1093 atomic_and_has_started)
1094
1095 return ready
1096
1097
1098 def num_machines(self, clause = None):
1099 sql = "job_id=%s" % self.id
1100 if clause:
1101 sql += " AND (%s)" % clause
1102 return self.count(sql, table='afe_host_queue_entries')
1103
1104
1105 def num_queued(self):
1106 return self.num_machines('not complete')
1107
1108
1109 def num_active(self):
1110 return self.num_machines('active')
1111
1112
1113 def num_complete(self):
1114 return self.num_machines('complete')
1115
1116
1117 def is_finished(self):
1118 return self.num_complete() == self.num_machines()
1119
1120
1121 def _not_yet_run_entries(self, include_verifying=True):
1122 statuses = [models.HostQueueEntry.Status.QUEUED,
1123 models.HostQueueEntry.Status.PENDING]
1124 if include_verifying:
1125 statuses.append(models.HostQueueEntry.Status.VERIFYING)
1126 return models.HostQueueEntry.objects.filter(job=self.id,
1127 status__in=statuses)
1128
1129
1130 def _stop_all_entries(self):
1131 entries_to_stop = self._not_yet_run_entries(
1132 include_verifying=False)
1133 for child_entry in entries_to_stop:
1134 assert not child_entry.complete, (
1135 '%s status=%s, active=%s, complete=%s' %
1136 (child_entry.id, child_entry.status, child_entry.active,
1137 child_entry.complete))
1138 if child_entry.status == models.HostQueueEntry.Status.PENDING:
1139 child_entry.host.status = models.Host.Status.READY
1140 child_entry.host.save()
1141 child_entry.status = models.HostQueueEntry.Status.STOPPED
1142 child_entry.save()
1143
1144
1145 def stop_if_necessary(self):
1146 not_yet_run = self._not_yet_run_entries()
1147 if not_yet_run.count() < self.synch_count:
1148 self._stop_all_entries()
1149
1150
jamesrenb55378a2010-03-02 22:19:49 +00001151 def _next_group_name(self, group_name=''):
1152 """@returns a directory name to use for the next host group results."""
1153 if group_name:
1154 # Sanitize for use as a pathname.
1155 group_name = group_name.replace(os.path.sep, '_')
1156 if group_name.startswith('.'):
1157 group_name = '_' + group_name[1:]
1158 # Add a separator between the group name and 'group%d'.
1159 group_name += '.'
1160 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
1161 query = models.HostQueueEntry.objects.filter(
1162 job=self.id).values('execution_subdir').distinct()
1163 subdirs = (entry['execution_subdir'] for entry in query)
1164 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
1165 ids = [int(match.group(1)) for match in group_matches if match]
1166 if ids:
1167 next_id = max(ids) + 1
1168 else:
1169 next_id = 0
1170 return '%sgroup%d' % (group_name, next_id)
1171
1172
1173 def get_group_entries(self, queue_entry_from_group):
1174 """
1175 @param queue_entry_from_group: A HostQueueEntry instance to find other
1176 group entries on this job for.
1177
1178 @returns A list of HostQueueEntry objects all executing this job as
1179 part of the same group as the one supplied (having the same
1180 execution_subdir).
1181 """
1182 execution_subdir = queue_entry_from_group.execution_subdir
1183 return list(HostQueueEntry.fetch(
1184 where='job_id=%s AND execution_subdir=%s',
1185 params=(self.id, execution_subdir)))
1186
1187
1188 def _should_run_cleanup(self, queue_entry):
1189 if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
1190 return True
1191 elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
1192 return queue_entry.host.dirty
1193 return False
1194
1195
1196 def _should_run_verify(self, queue_entry):
1197 do_not_verify = (queue_entry.host.protection ==
1198 host_protections.Protection.DO_NOT_VERIFY)
1199 if do_not_verify:
1200 return False
1201 return self.run_verify
1202
1203
1204 def schedule_pre_job_tasks(self, queue_entry):
1205 """
1206 Get a list of tasks to perform before the host_queue_entry
1207 may be used to run this Job (such as Cleanup & Verify).
1208
1209 @returns A list of tasks to be done to the given queue_entry before
1210 it should be considered be ready to run this job. The last
1211 task in the list calls HostQueueEntry.on_pending(), which
1212 continues the flow of the job.
1213 """
1214 if self._should_run_cleanup(queue_entry):
1215 task = models.SpecialTask.Task.CLEANUP
1216 elif self._should_run_verify(queue_entry):
1217 task = models.SpecialTask.Task.VERIFY
1218 else:
1219 queue_entry.on_pending()
1220 return
1221
1222 queue_entry = models.HostQueueEntry.objects.get(id=queue_entry.id)
1223 models.SpecialTask.objects.create(
1224 host=models.Host.objects.get(id=queue_entry.host_id),
1225 queue_entry=queue_entry, task=task)
1226
1227
1228 def _assign_new_group(self, queue_entries, group_name=''):
1229 if len(queue_entries) == 1:
1230 group_subdir_name = queue_entries[0].host.hostname
1231 else:
1232 group_subdir_name = self._next_group_name(group_name)
1233 logging.info('Running synchronous job %d hosts %s as %s',
1234 self.id, [entry.host.hostname for entry in queue_entries],
1235 group_subdir_name)
1236
1237 for queue_entry in queue_entries:
1238 queue_entry.set_execution_subdir(group_subdir_name)
1239
1240
1241 def _choose_group_to_run(self, include_queue_entry):
1242 """
1243 @returns A tuple containing a list of HostQueueEntry instances to be
1244 used to run this Job, a string group name to suggest giving
1245 to this job in the results database.
1246 """
1247 atomic_group = include_queue_entry.atomic_group
1248 chosen_entries = [include_queue_entry]
1249 if atomic_group:
1250 num_entries_wanted = atomic_group.max_number_of_machines
1251 else:
1252 num_entries_wanted = self.synch_count
1253 num_entries_wanted -= len(chosen_entries)
1254
1255 if num_entries_wanted > 0:
1256 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
1257 pending_entries = list(HostQueueEntry.fetch(
1258 where=where_clause,
1259 params=(self.id, include_queue_entry.id)))
1260
1261 # Sort the chosen hosts by hostname before slicing.
1262 def cmp_queue_entries_by_hostname(entry_a, entry_b):
1263 return Host.cmp_for_sort(entry_a.host, entry_b.host)
1264 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
1265 chosen_entries += pending_entries[:num_entries_wanted]
1266
1267 # Sanity check. We'll only ever be called if this can be met.
1268 if len(chosen_entries) < self.synch_count:
1269 message = ('job %s got less than %s chosen entries: %s' % (
1270 self.id, self.synch_count, chosen_entries))
1271 logging.error(message)
1272 email_manager.manager.enqueue_notify_email(
1273 'Job not started, too few chosen entries', message)
1274 return []
1275
1276 group_name = include_queue_entry.get_group_name()
1277
1278 self._assign_new_group(chosen_entries, group_name=group_name)
1279 return chosen_entries
1280
1281
1282 def run_if_ready(self, queue_entry):
1283 """
1284 Run this job by kicking its HQEs into status='Starting' if enough
1285 hosts are ready for it to run.
1286
1287 Cleans up by kicking HQEs into status='Stopped' if this Job is not
1288 ready to run.
1289 """
1290 if not self.is_ready():
1291 self.stop_if_necessary()
1292 elif queue_entry.atomic_group:
1293 self.run_with_ready_delay(queue_entry)
1294 else:
1295 self.run(queue_entry)
1296
1297
    def run_with_ready_delay(self, queue_entry):
        """
        Start a delay to wait for more hosts to enter Pending state before
        launching an atomic group job.  Once set, a delay cannot be reset.

        If the delay is disabled, already expired, or enough hosts are
        already Pending, the job is run immediately; otherwise the queue
        entry is set to the Waiting state.  (Always returns None; the old
        claim that this returned an Agent was stale.)

        @param queue_entry: The HostQueueEntry object to get atomic group
                info from and pass to run_if_ready when the delay is up.
        """
        assert queue_entry.job_id == self.id
        assert queue_entry.atomic_group
        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
        # Already have as many Pending hosts as the atomic group can use?
        over_max_threshold = (self._pending_count() >=
                self._max_hosts_needed_to_run(queue_entry.atomic_group))
        # A previously-started delay has run out?
        delay_expired = (self._delay_ready_task and
                         time.time() >= self._delay_ready_task.end_time)

        # Delay is disabled or we already have enough? Do not wait to run.
        if not delay or over_max_threshold or delay_expired:
            self.run(queue_entry)
        else:
            queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
1322
1323
1324 def request_abort(self):
1325 """Request that this Job be aborted on the next scheduler cycle."""
1326 self.model().abort()
1327
1328
1329 def schedule_delayed_callback_task(self, queue_entry):
1330 queue_entry.set_status(models.HostQueueEntry.Status.PENDING)
1331
1332 if self._delay_ready_task:
1333 return None
1334
1335 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
1336
1337 def run_job_after_delay():
1338 logging.info('Job %s done waiting for extra hosts.', self)
1339 # Check to see if the job is still relevant. It could have aborted
1340 # while we were waiting or hosts could have disappearred, etc.
1341 if self._pending_count() < self._min_hosts_needed_to_run():
1342 logging.info('Job %s had too few Pending hosts after waiting '
1343 'for extras. Not running.', self)
1344 self.request_abort()
1345 return
1346 return self.run(queue_entry)
1347
1348 logging.info('Job %s waiting up to %s seconds for more hosts.',
1349 self.id, delay)
1350 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
1351 callback=run_job_after_delay)
1352 return self._delay_ready_task
1353
1354
1355 def run(self, queue_entry):
1356 """
1357 @param queue_entry: The HostQueueEntry instance calling this method.
1358 """
1359 if queue_entry.atomic_group and self._atomic_and_has_started():
1360 logging.error('Job.run() called on running atomic Job %d '
1361 'with HQE %s.', self.id, queue_entry)
1362 return
1363 queue_entries = self._choose_group_to_run(queue_entry)
1364 if queue_entries:
1365 self._finish_run(queue_entries)
1366
1367
1368 def _finish_run(self, queue_entries):
1369 for queue_entry in queue_entries:
1370 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
1371 self.abort_delay_ready_task()
1372
1373
1374 def abort_delay_ready_task(self):
1375 """Abort the delayed task associated with this job, if any."""
1376 if self._delay_ready_task:
1377 # Cancel any pending callback that would try to run again
1378 # as we are already running.
1379 self._delay_ready_task.abort()
1380
1381
1382 def __str__(self):
1383 return '%s-%s' % (self.id, self.owner)