blob: c8d38e101e9d7a71bccb3a8af752f26b0ec20e57 [file] [log] [blame]
"""Database model classes for the scheduler.

Contains model classes abstracting the various DB tables used by the scheduler.
These overlap the Django models in basic functionality, but were written before
the Django models existed and have not yet been phased out. Some of them
(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
which would probably be ill-suited for inclusion in the general Django model
classes.

Globals:
_notify_email_statuses: list of HQE statuses. each time a single HQE reaches
        one of these statuses, an email will be sent to the job's email_list.
        comes from global_config.
_base_url: URL to the local AFE server, used to construct URLs for emails.
_db: DatabaseConnection for this module.
_drone_manager: reference to global DroneManager instance.
"""
18
19import datetime, itertools, logging, os, re, sys, time, weakref
20from django.db import connection
21from autotest_lib.client.common_lib import global_config, host_protections
Eric Li6f27d4f2010-09-29 10:55:17 -070022from autotest_lib.client.common_lib import global_config, utils
jamesrenb55378a2010-03-02 22:19:49 +000023from autotest_lib.frontend.afe import models, model_attributes
24from autotest_lib.database import database_connection
25from autotest_lib.scheduler import drone_manager, email_manager
26from autotest_lib.scheduler import scheduler_config
27
28_notify_email_statuses = []
29_base_url = None
30
31_db = None
32_drone_manager = None
33
def initialize():
    """Connect to the AUTOTEST_WEB database and load module globals.

    Populates _db, _notify_email_statuses and _base_url from the global
    configuration, then binds the drone manager reference.  Exits the
    process when no AFE server hostname can be determined.
    """
    global _db, _notify_email_statuses, _base_url

    _db = database_connection.DatabaseConnection('AUTOTEST_WEB')
    _db.connect(db_type='django')

    raw_statuses = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    # Accept whitespace, comma, semicolon or colon as separators; drop the
    # empty strings re.split() yields for consecutive separators.
    _notify_email_statuses = [
            status for status in re.split(r'[\s,;:]', raw_statuses.lower())
            if status]

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    config_base_url = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = global_config.global_config.get_config_value(
                'SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    initialize_globals()
65
66
def initialize_globals():
    """Bind the module-level _drone_manager to the global DroneManager."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
70
71
class DelayedCallTask(object):
    """
    An Agent-runnable task that waits until a deadline, then invokes a
    callback exactly once and finishes.  If the callback returns anything,
    it is assumed to be a new Agent instance and will be added to the
    dispatcher.

    @attribute end_time: The absolute posix time after which this task will
            call its callback when it is polled and be finished.

    Also has all attributes required by the Agent class.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: The delay in seconds from now that this task
                will call the supplied callback and be done.
        @param callback: A callable to be called by this task once after at
                least delay_seconds time has elapsed.  It must return None
                or a new Agent instance.
        @param now_func: A time.time like function.  Default: time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback

        # Absolute deadline after which poll() fires the callback.
        self.end_time = self._now_func() + delay_seconds

        # These attributes are required by Agent.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        """Fire the callback (at most once) if the deadline has passed."""
        if self.is_done() or self._now_func() < self.end_time:
            return
        self._callback()
        self.success = True


    def is_done(self):
        """The task is finished once it has fired or been aborted."""
        return self.success or self.aborted


    def abort(self):
        """Cancel the task; the callback will never be invoked."""
        self.aborted = True
123
124
class DBError(Exception):
    """Raised by the DBObject constructor when its select fails.

    Indicates that no row with the requested id exists in the table.
    """
127
128
class DBObject(object):
    """A miniature object relational model for the database.

    Each subclass maps one table.  Live instances are shared per
    (type, id) through a weak-value cache so that multiple objects
    referring to the same row reuse one instance.
    """

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id: Primary key of the row to load; optional when row given.
        @param row: A full result row for this object, in _fields order.
        @param new_record: True when the row does not exist in the DB yet.
        @param always_query: When False, an already-initialized cached
                instance is returned without re-querying the database.
        """
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                # logging.warning is the non-deprecated spelling of warn.
                logging.warning(
                        'initialized %s %s instance requery is updating: %s',
                        type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """Return the full DB row for row_id.

        @raises DBError when no row with that id exists.
        """
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        """Sanity check that a result row matches the declared _fields."""
        assert len(row) == len(self._fields), (
                "table = %s, row = %s/%d, fields = %s/%d" % (
                self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing in
        memory fields.  Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                the class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S'  # Leave off the microseconds.
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(datetime_cmp_fmt)
                row_value = row_value.strftime(datetime_cmp_fmt)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # id is not mutable through update_field().
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read this object's row from the DB and refresh attributes."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        """Return the number of rows in table (default: own table)
        matching the given WHERE clause text."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Set field to value both on this object and in the database.

        No-op when the value is unchanged.  Only fields listed in _fields
        (except id) may be updated.
        """
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object as a new row and adopt the assigned id."""
        if self.__new_record:
            keys = self._fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    values.append('"%s"' % value)
            # NOTE(review): values are interpolated straight into the SQL
            # instead of being passed as query parameters -- unsafe if any
            # field can contain untrusted text or double quotes.
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete this row from the DB and evict it from the instance cache."""
        # Bug fix: the cache key must be built from self.id; the builtin
        # id() function was used here before, so deleted objects were never
        # actually removed from _instances_by_type_and_id.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Return string with prefix prepended, or string unchanged if falsy."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @returns A list with one class instance for each row fetched.
                 (The docstring previously claimed this was a generator.)
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return [cls(id=row[0], row=row) for row in rows]
334
335
class IneligibleHostQueue(DBObject):
    """Row of afe_ineligible_host_queues: a blocked (job, host) pairing.

    Rows are created by HostQueueEntry.block_host when a host is assigned
    to a job, and removed by HostQueueEntry.unblock_host.
    """
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
339
340
class AtomicGroup(DBObject):
    """Row of afe_atomic_groups: a named group scheduled as one unit."""
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
345
346
class Label(DBObject):
    """Row of afe_labels; may reference an atomic group."""
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)
356
357
class Host(DBObject):
    """Row of afe_hosts plus host-specific scheduler helpers."""
    _table_name = 'afe_hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def set_status(self, status):
        """Update this host's status column and log the transition."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)
        # Noticed some time jumps after the last log message.
        logging.debug('Host Set Status Complete')


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT afe_labels.name, afe_labels.platform
                FROM afe_labels
                INNER JOIN afe_hosts_labels ON
                        afe_labels.id = afe_hosts_labels.label_id
                WHERE afe_hosts_labels.host_id = %s
                ORDER BY afe_labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                # Last platform label wins if a host somehow has several.
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfy most people's hostname sorting needs regardless
        of their exact naming schemes.  Nobody sane should have both a host10
        and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # Bug fix: int() already ignores leading zeros, and the old
            # lstrip('0') made int('') raise ValueError for hosts whose
            # numeric suffix is all zeros (e.g. "host0" or "host000").
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
430
431
class HostQueueEntry(DBObject):
    """Row of afe_host_queue_entries: one job/host pairing and its state.

    Carries the scheduler-side lifecycle logic for an entry: host
    assignment and blocking, status transitions with their side effects,
    per-entry queue logging, and the notification e-mails sent as
    statuses are reached.
    """
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')


    def __init__(self, id=None, row=None, **kwargs):
        """Load the entry plus its related Job, Host and AtomicGroup."""
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None

        # Per-entry log file, written through the drone manager.
        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        """Return the AFE URL for viewing this entry's job."""
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        """Assign (or, when host is None, release) this entry's host.

        Assigning also blocks the (job, host) pair so the same host is not
        given to this job again; releasing removes the block.
        """
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def queue_log_record(self, log_line):
        """Append a timestamped line to this entry's queue log file."""
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        """Insert an IneligibleHostQueue row for (this job, host_id)."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Delete any IneligibleHostQueue rows for (this job, host_id)."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Set execution_subdir; defaults to the assigned host's hostname."""
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        """Return the assigned hostname, or 'no host' when unassigned."""
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        """Human-readable summary: host/job (entry id) status [flags]."""
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
                                    self.status, flags_str)


    def set_status(self, status):
        """Transition this entry to status.

        Updates the active/complete flags to match, runs completion side
        effects (_on_complete, job-complete e-mail) and sends a per-status
        notification e-mail when configured via _notify_email_statuses.
        """
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)
        # Noticed some time jumps after last logging message.
        logging.debug('Update Field Complete')

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
        # A status may be active or complete, never both.
        assert not (active and complete)

        self.update_field('active', active)
        self.update_field('complete', complete)

        if complete:
            self._on_complete(status)
            self._email_on_job_complete()

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)
        logging.debug('HQE Set Status Complete')


    def _on_complete(self, status):
        """Completion hook: maybe stop the job, and drop stale pidfiles."""
        # NOTE(review): 'is not' relies on the status constant being the
        # identical string object; '!=' would be the safer comparison.
        # TODO confirm models.HostQueueEntry.Status.ABORTED identity holds.
        if status is not models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()

        if not self.execution_subdir:
            return
        # unregister any possible pidfiles associated with this queue entry
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)


    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        # Success rate is meaningless for an entirely-failed job.
        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body = "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body


    def _email_on_status(self, status):
        """E-mail the job's email_list that this entry reached status."""
        hostname = self._get_hostname()
        subject, body = self._get_status_email_contents(status, None, hostname)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """Once the whole job has finished, e-mail a per-host summary."""
        if not self.job.is_finished():
            return

        summary = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary.append("Host: %s Status: %s" %
                           (queue_entry._get_hostname(),
                            queue_entry.status))

        summary = "\n".join(summary)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self):
        """Log the scheduling decision, then kick off pre-job tasks."""
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        """Move the entry to Verifying and let the job schedule its tasks."""
        # Every host goes thru the Verifying stage (which may or may not
        # actually do anything as determined by get_pre_job_tasks).
        self.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        """Return this entry to Queued so it can be scheduled again."""
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            # Meta-host entries get a fresh host on the next scheduling pass.
            self.set_host(None)


    @property
    def aborted_by(self):
        """Login of the user who aborted this entry, or None."""
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        """Timestamp of the abort, or None when never aborted."""
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Cached after the first lookup via the _aborted_by attribute.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING.  Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are getting
        # stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        """Abort this entry, restoring or cleaning its host as appropriate.

        @param dispatcher: used to assert no agents still reference this
                entry before its host is returned to Ready.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING,
                           Status.WAITING):
            assert not dispatcher.get_agents_for_entry(self)
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            # A verify may have left the host dirty; schedule a cleanup.
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)
        self.job.abort_delay_ready_task()


    def get_group_name(self):
        """Return the atomic-group name for this entry, or '' when none.

        Prefers the name of a label that references the atomic group, as
        that is more specific than the group's own name.
        """
        atomic_group = self.atomic_group
        if not atomic_group:
            return ''

        # Look at any meta_host and dependency labels and pick the first
        # one that also specifies this atomic group.  Use that label name
        # as the group name if possible (it is more specific).
        for label in self.get_labels():
            if label.atomic_group_id:
                assert label.atomic_group_id == atomic_group.id
                return label.name
        return atomic_group.name


    def execution_tag(self):
        """Return '<job tag>/<execution_subdir>' for this entry.

        Contains a temporary debugging/repair path for entries missing
        their execution_subdir (see crosbug.com/31595): the offending
        entry is force-set to Aborted in the DB and an AssertionError is
        raised.
        """
        SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
                               'complete!=1 AND execution_subdir="" AND '
                               'status!="Queued";')
        SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
                                 'status="Aborted" WHERE id=%s;')
        try:
            assert self.execution_subdir
        except AssertionError:
            # TODO(scottz): Remove temporary fix/info gathering pathway for
            # crosbug.com/31595 once issue is root caused.
            logging.error('No execution_subdir for host queue id:%s.', self.id)
            logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
            for row in _db.execute(SQL_SUSPECT_ENTRIES):
                logging.error(row)
            logging.error('====DB DEBUG====\n')
            fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
            logging.error('EXECUTING: %s', fix_query)
            _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id)
            raise AssertionError(('self.execution_subdir not found. '
                                  'See log for details.'))

        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        """Alias for execution_tag(); used as a results-relative path."""
        return self.execution_tag()


    def set_started_on_now(self):
        """Stamp started_on with the current wall-clock time."""
        self.update_field('started_on', datetime.datetime.now())


    def is_hostless(self):
        """True when the entry has no host, meta_host or atomic group."""
        return (self.host_id is None
                and self.meta_host is None
                and self.atomic_group_id is None)
826
827
828class Job(DBObject):
829 _table_name = 'afe_jobs'
830 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
831 'control_type', 'created_on', 'synch_count', 'timeout',
832 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
jamesren4a41e012010-07-16 22:33:48 +0000833 'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
834 'parameterized_job_id')
jamesrenb55378a2010-03-02 22:19:49 +0000835
836 # This does not need to be a column in the DB. The delays are likely to
837 # be configured short. If the scheduler is stopped and restarted in
838 # the middle of a job's delay cycle, the delay cycle will either be
839 # repeated or skipped depending on the number of Pending machines found
840 # when the restarted scheduler recovers to track it. Not a problem.
841 #
842 # A reference to the DelayedCallTask that will wake up the job should
843 # no other HQEs change state in time. Its end_time attribute is used
844 # by our run_with_ready_delay() method to determine if the wait is over.
845 _delay_ready_task = None
846
847 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
848 # all status='Pending' atomic group HQEs incase a delay was running when the
849 # scheduler was restarted and no more hosts ever successfully exit Verify.
850
    def __init__(self, id=None, row=None, **kwargs):
        """Load the job row; see DBObject.__init__ for id/row semantics."""
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
        self._owner_model = None # caches model instance of owner
        self.update_image_path = None # path of OS image to install
jamesrenb55378a2010-03-02 22:19:49 +0000856
857
    def model(self):
        """Return the Django Job model instance for this row."""
        return models.Job.objects.get(id=self.id)
860
861
    def owner_model(self):
        """Return the Django User matching this job's owner login (cached)."""
        # work around the fact that the Job owner field is a string, not a
        # foreign key
        if not self._owner_model:
            self._owner_model = models.User.objects.get(login=self.owner)
        return self._owner_model
868
869
    def is_server_job(self):
        """True unless this is a client-side job.

        NOTE(review): the magic value 2 appears to be the client control
        type -- confirm against the control type constants in
        model_attributes.
        """
        return self.control_type != 2
872
873
    def tag(self):
        """Return the job's results-directory tag: '<id>-<owner>'."""
        return "%s-%s" % (self.id, self.owner)
876
877
    def get_host_queue_entries(self):
        """Return all HostQueueEntry objects belonging to this job.

        Asserts that the job has at least one entry.
        """
        rows = _db.execute("""
                SELECT * FROM afe_host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries)>0

        return entries
888
889
    def is_image_update_job(self):
        """
        Discover if the current job requires an OS update.

        Side effect: when this is an image update job, saves the OS image
        path into self.update_image_path for later use as an autoserv
        command line parameter.

        @return: True/False if OS should be updated before job is run.
        """
        # All image update jobs have the parameterized_job_id set.
        if not self.parameterized_job_id:
            return False

        # Retrieve the ID of the ParameterizedJob this job is an instance of.
        rows = _db.execute("""
                SELECT test_id
                FROM afe_parameterized_jobs
                WHERE id = %s
                """, (self.parameterized_job_id,))
        if not rows:
            return False
        test_id = rows[0][0]

        # Retrieve the ID of the known autoupdate_ParameterizedJob.
        rows = _db.execute("""
                SELECT id
                FROM afe_autotests
                WHERE name = 'autoupdate_ParameterizedJob'
                """)
        if not rows:
            return False
        update_id = rows[0][0]

        # If the IDs are the same we've found an image update job.
        if test_id == update_id:
            # Finally, get the path to the OS image to install.
            rows = _db.execute("""
                    SELECT parameter_value
                    FROM afe_parameterized_job_parameters
                    WHERE parameterized_job_id = %s
                    """, (self.parameterized_job_id,))
            if rows:
                # Save the path in update_image_path to use later as a command
                # line parameter to autoserv.
                self.update_image_path = rows[0][0]
            return True

        return False
935
936
Eric Li6f27d4f2010-09-29 10:55:17 -0700937 def get_execution_details(self):
938 """
939 Get test execution details for this job.
940
941 @return: Dictionary with test execution details
942 """
943 def _find_test_jobs(rows):
944 """
945 Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
946 Those are autotest 'internal job' tests, so they should not be
947 counted when evaluating the test stats.
948
949 @param rows: List of rows (matrix) with database results.
950 """
951 job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')
952 n_test_jobs = 0
953 for r in rows:
954 test_name = r[0]
955 if job_test_pattern.match(test_name):
956 n_test_jobs += 1
957
958 return n_test_jobs
959
960 stats = {}
961
962 rows = _db.execute("""
963 SELECT t.test, s.word, t.reason
964 FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
965 WHERE t.job_idx = j.job_idx
966 AND s.status_idx = t.status
967 AND j.afe_job_id = %s
Mike Truty6941dea2010-11-09 15:26:32 -0800968 ORDER BY t.reason
Eric Li6f27d4f2010-09-29 10:55:17 -0700969 """ % self.id)
970
Dale Curtis74a314b2011-06-23 14:55:46 -0700971 failed_rows = [r for r in rows if not r[1] == 'GOOD']
Eric Li6f27d4f2010-09-29 10:55:17 -0700972
973 n_test_jobs = _find_test_jobs(rows)
974 n_test_jobs_failed = _find_test_jobs(failed_rows)
975
976 total_executed = len(rows) - n_test_jobs
977 total_failed = len(failed_rows) - n_test_jobs_failed
978
979 if total_executed > 0:
980 success_rate = 100 - ((total_failed / float(total_executed)) * 100)
981 else:
982 success_rate = 0
983
984 stats['total_executed'] = total_executed
985 stats['total_failed'] = total_failed
986 stats['total_passed'] = total_executed - total_failed
987 stats['success_rate'] = success_rate
988
989 status_header = ("Test Name", "Status", "Reason")
990 if failed_rows:
991 stats['failed_rows'] = utils.matrix_to_string(failed_rows,
992 status_header)
993 else:
994 stats['failed_rows'] = ''
995
996 time_row = _db.execute("""
997 SELECT started_time, finished_time
998 FROM tko_jobs
999 WHERE afe_job_id = %s
1000 """ % self.id)
1001
1002 if time_row:
1003 t_begin, t_end = time_row[0]
Mike Truty6941dea2010-11-09 15:26:32 -08001004 try:
1005 delta = t_end - t_begin
1006 minutes, seconds = divmod(delta.seconds, 60)
1007 hours, minutes = divmod(minutes, 60)
1008 stats['execution_time'] = ("%02d:%02d:%02d" %
1009 (hours, minutes, seconds))
1010 # One of t_end or t_begin are None
1011 except TypeError:
1012 stats['execution_time'] = '(could not determine)'
Eric Li6f27d4f2010-09-29 10:55:17 -07001013 else:
1014 stats['execution_time'] = '(none)'
1015
1016 return stats
1017
1018
jamesrenb55378a2010-03-02 22:19:49 +00001019 def set_status(self, status, update_queues=False):
1020 self.update_field('status',status)
1021
1022 if update_queues:
1023 for queue_entry in self.get_host_queue_entries():
1024 queue_entry.set_status(status)
1025
1026
1027 def keyval_dict(self):
1028 return self.model().keyval_dict()
1029
1030
1031 def _atomic_and_has_started(self):
1032 """
1033 @returns True if any of the HostQueueEntries associated with this job
1034 have entered the Status.STARTING state or beyond.
1035 """
1036 atomic_entries = models.HostQueueEntry.objects.filter(
1037 job=self.id, atomic_group__isnull=False)
1038 if atomic_entries.count() <= 0:
1039 return False
1040
1041 # These states may *only* be reached if Job.run() has been called.
1042 started_statuses = (models.HostQueueEntry.Status.STARTING,
1043 models.HostQueueEntry.Status.RUNNING,
1044 models.HostQueueEntry.Status.COMPLETED)
1045
1046 started_entries = atomic_entries.filter(status__in=started_statuses)
1047 return started_entries.count() > 0
1048
1049
1050 def _hosts_assigned_count(self):
1051 """The number of HostQueueEntries assigned a Host for this job."""
1052 entries = models.HostQueueEntry.objects.filter(job=self.id,
1053 host__isnull=False)
1054 return entries.count()
1055
1056
1057 def _pending_count(self):
1058 """The number of HostQueueEntries for this job in the Pending state."""
1059 pending_entries = models.HostQueueEntry.objects.filter(
1060 job=self.id, status=models.HostQueueEntry.Status.PENDING)
1061 return pending_entries.count()
1062
1063
1064 def _max_hosts_needed_to_run(self, atomic_group):
1065 """
1066 @param atomic_group: The AtomicGroup associated with this job that we
1067 are using to set an upper bound on the threshold.
1068 @returns The maximum number of HostQueueEntries assigned a Host before
1069 this job can run.
1070 """
1071 return min(self._hosts_assigned_count(),
1072 atomic_group.max_number_of_machines)
1073
1074
    def _min_hosts_needed_to_run(self):
        """Return the minimum number of hosts needed to run this job."""
        return self.synch_count
1078
1079
1080 def is_ready(self):
1081 # NOTE: Atomic group jobs stop reporting ready after they have been
1082 # started to avoid launching multiple copies of one atomic job.
1083 # Only possible if synch_count is less than than half the number of
1084 # machines in the atomic group.
1085 pending_count = self._pending_count()
1086 atomic_and_has_started = self._atomic_and_has_started()
1087 ready = (pending_count >= self.synch_count
1088 and not atomic_and_has_started)
1089
1090 if not ready:
1091 logging.info(
1092 'Job %s not ready: %s pending, %s required '
1093 '(Atomic and started: %s)',
1094 self, pending_count, self.synch_count,
1095 atomic_and_has_started)
1096
1097 return ready
1098
1099
1100 def num_machines(self, clause = None):
1101 sql = "job_id=%s" % self.id
1102 if clause:
1103 sql += " AND (%s)" % clause
1104 return self.count(sql, table='afe_host_queue_entries')
1105
1106
1107 def num_queued(self):
1108 return self.num_machines('not complete')
1109
1110
1111 def num_active(self):
1112 return self.num_machines('active')
1113
1114
1115 def num_complete(self):
1116 return self.num_machines('complete')
1117
1118
1119 def is_finished(self):
1120 return self.num_complete() == self.num_machines()
1121
1122
1123 def _not_yet_run_entries(self, include_verifying=True):
1124 statuses = [models.HostQueueEntry.Status.QUEUED,
1125 models.HostQueueEntry.Status.PENDING]
1126 if include_verifying:
1127 statuses.append(models.HostQueueEntry.Status.VERIFYING)
1128 return models.HostQueueEntry.objects.filter(job=self.id,
1129 status__in=statuses)
1130
1131
1132 def _stop_all_entries(self):
1133 entries_to_stop = self._not_yet_run_entries(
1134 include_verifying=False)
1135 for child_entry in entries_to_stop:
1136 assert not child_entry.complete, (
1137 '%s status=%s, active=%s, complete=%s' %
1138 (child_entry.id, child_entry.status, child_entry.active,
1139 child_entry.complete))
1140 if child_entry.status == models.HostQueueEntry.Status.PENDING:
1141 child_entry.host.status = models.Host.Status.READY
1142 child_entry.host.save()
1143 child_entry.status = models.HostQueueEntry.Status.STOPPED
1144 child_entry.save()
1145
1146
1147 def stop_if_necessary(self):
1148 not_yet_run = self._not_yet_run_entries()
1149 if not_yet_run.count() < self.synch_count:
1150 self._stop_all_entries()
1151
1152
1153 def write_to_machines_file(self, queue_entry):
1154 hostname = queue_entry.host.hostname
1155 file_path = os.path.join(self.tag(), '.machines')
1156 _drone_manager.write_lines_to_file(file_path, [hostname])
1157
1158
1159 def _next_group_name(self, group_name=''):
1160 """@returns a directory name to use for the next host group results."""
1161 if group_name:
1162 # Sanitize for use as a pathname.
1163 group_name = group_name.replace(os.path.sep, '_')
1164 if group_name.startswith('.'):
1165 group_name = '_' + group_name[1:]
1166 # Add a separator between the group name and 'group%d'.
1167 group_name += '.'
1168 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
1169 query = models.HostQueueEntry.objects.filter(
1170 job=self.id).values('execution_subdir').distinct()
1171 subdirs = (entry['execution_subdir'] for entry in query)
1172 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
1173 ids = [int(match.group(1)) for match in group_matches if match]
1174 if ids:
1175 next_id = max(ids) + 1
1176 else:
1177 next_id = 0
1178 return '%sgroup%d' % (group_name, next_id)
1179
1180
1181 def get_group_entries(self, queue_entry_from_group):
1182 """
1183 @param queue_entry_from_group: A HostQueueEntry instance to find other
1184 group entries on this job for.
1185
1186 @returns A list of HostQueueEntry objects all executing this job as
1187 part of the same group as the one supplied (having the same
1188 execution_subdir).
1189 """
1190 execution_subdir = queue_entry_from_group.execution_subdir
1191 return list(HostQueueEntry.fetch(
1192 where='job_id=%s AND execution_subdir=%s',
1193 params=(self.id, execution_subdir)))
1194
1195
1196 def _should_run_cleanup(self, queue_entry):
1197 if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
1198 return True
1199 elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
1200 return queue_entry.host.dirty
1201 return False
1202
1203
1204 def _should_run_verify(self, queue_entry):
1205 do_not_verify = (queue_entry.host.protection ==
1206 host_protections.Protection.DO_NOT_VERIFY)
1207 if do_not_verify:
1208 return False
1209 return self.run_verify
1210
1211
1212 def schedule_pre_job_tasks(self, queue_entry):
1213 """
1214 Get a list of tasks to perform before the host_queue_entry
1215 may be used to run this Job (such as Cleanup & Verify).
1216
1217 @returns A list of tasks to be done to the given queue_entry before
1218 it should be considered be ready to run this job. The last
1219 task in the list calls HostQueueEntry.on_pending(), which
1220 continues the flow of the job.
1221 """
1222 if self._should_run_cleanup(queue_entry):
1223 task = models.SpecialTask.Task.CLEANUP
1224 elif self._should_run_verify(queue_entry):
1225 task = models.SpecialTask.Task.VERIFY
1226 else:
1227 queue_entry.on_pending()
1228 return
1229
1230 queue_entry = models.HostQueueEntry.objects.get(id=queue_entry.id)
1231 models.SpecialTask.objects.create(
1232 host=models.Host.objects.get(id=queue_entry.host_id),
1233 queue_entry=queue_entry, task=task)
1234
1235
1236 def _assign_new_group(self, queue_entries, group_name=''):
1237 if len(queue_entries) == 1:
1238 group_subdir_name = queue_entries[0].host.hostname
1239 else:
1240 group_subdir_name = self._next_group_name(group_name)
1241 logging.info('Running synchronous job %d hosts %s as %s',
1242 self.id, [entry.host.hostname for entry in queue_entries],
1243 group_subdir_name)
1244
1245 for queue_entry in queue_entries:
1246 queue_entry.set_execution_subdir(group_subdir_name)
1247
1248
    def _choose_group_to_run(self, include_queue_entry):
        """
        Choose the set of queue entries to run this job with.

        @param include_queue_entry: HostQueueEntry that must be part of the
                chosen group; its atomic_group (if any) sets the group size.

        @returns A list of HostQueueEntry instances to be used to run this
                Job, or [] when too few Pending entries could be gathered.
                Note: despite an earlier version of this docstring, no group
                name string is returned; the group name is applied to the
                entries via _assign_new_group() before returning.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        if atomic_group:
            # Atomic group jobs take as many machines as the group allows.
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        # The supplied entry already counts toward the total wanted.
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check. We'll only ever be called if this can be met.
        # Notify and bail out with [] if it somehow is not.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries
1288
1289
1290 def run_if_ready(self, queue_entry):
1291 """
1292 Run this job by kicking its HQEs into status='Starting' if enough
1293 hosts are ready for it to run.
1294
1295 Cleans up by kicking HQEs into status='Stopped' if this Job is not
1296 ready to run.
1297 """
1298 if not self.is_ready():
1299 self.stop_if_necessary()
1300 elif queue_entry.atomic_group:
1301 self.run_with_ready_delay(queue_entry)
1302 else:
1303 self.run(queue_entry)
1304
1305
1306 def run_with_ready_delay(self, queue_entry):
1307 """
1308 Start a delay to wait for more hosts to enter Pending state before
1309 launching an atomic group job. Once set, the a delay cannot be reset.
1310
1311 @param queue_entry: The HostQueueEntry object to get atomic group
1312 info from and pass to run_if_ready when the delay is up.
1313
1314 @returns An Agent to run the job as appropriate or None if a delay
1315 has already been set.
1316 """
1317 assert queue_entry.job_id == self.id
1318 assert queue_entry.atomic_group
1319 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
1320 over_max_threshold = (self._pending_count() >=
1321 self._max_hosts_needed_to_run(queue_entry.atomic_group))
1322 delay_expired = (self._delay_ready_task and
1323 time.time() >= self._delay_ready_task.end_time)
1324
1325 # Delay is disabled or we already have enough? Do not wait to run.
1326 if not delay or over_max_threshold or delay_expired:
1327 self.run(queue_entry)
1328 else:
1329 queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
1330
1331
1332 def request_abort(self):
1333 """Request that this Job be aborted on the next scheduler cycle."""
1334 self.model().abort()
1335
1336
1337 def schedule_delayed_callback_task(self, queue_entry):
1338 queue_entry.set_status(models.HostQueueEntry.Status.PENDING)
1339
1340 if self._delay_ready_task:
1341 return None
1342
1343 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
1344
1345 def run_job_after_delay():
1346 logging.info('Job %s done waiting for extra hosts.', self)
1347 # Check to see if the job is still relevant. It could have aborted
1348 # while we were waiting or hosts could have disappearred, etc.
1349 if self._pending_count() < self._min_hosts_needed_to_run():
1350 logging.info('Job %s had too few Pending hosts after waiting '
1351 'for extras. Not running.', self)
1352 self.request_abort()
1353 return
1354 return self.run(queue_entry)
1355
1356 logging.info('Job %s waiting up to %s seconds for more hosts.',
1357 self.id, delay)
1358 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
1359 callback=run_job_after_delay)
1360 return self._delay_ready_task
1361
1362
1363 def run(self, queue_entry):
1364 """
1365 @param queue_entry: The HostQueueEntry instance calling this method.
1366 """
1367 if queue_entry.atomic_group and self._atomic_and_has_started():
1368 logging.error('Job.run() called on running atomic Job %d '
1369 'with HQE %s.', self.id, queue_entry)
1370 return
1371 queue_entries = self._choose_group_to_run(queue_entry)
1372 if queue_entries:
1373 self._finish_run(queue_entries)
1374
1375
1376 def _finish_run(self, queue_entries):
1377 for queue_entry in queue_entries:
1378 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
1379 self.abort_delay_ready_task()
1380
1381
1382 def abort_delay_ready_task(self):
1383 """Abort the delayed task associated with this job, if any."""
1384 if self._delay_ready_task:
1385 # Cancel any pending callback that would try to run again
1386 # as we are already running.
1387 self._delay_ready_task.abort()
1388
1389
1390 def __str__(self):
1391 return '%s-%s' % (self.id, self.owner)