blob: 90b5a0ba30d908166ab6ae0e5948a04993ed8f73 [file] [log] [blame]
Aviv Keshet0b9cfc92013-02-05 11:36:02 -08001# pylint: disable-msg=C0111
2
jamesrenb55378a2010-03-02 22:19:49 +00003"""Database model classes for the scheduler.
4
5Contains model classes abstracting the various DB tables used by the scheduler.
6These overlap the Django models in basic functionality, but were written before
7the Django models existed and have not yet been phased out. Some of them
8(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
9which would probably be ill-suited for inclusion in the general Django model
10classes.
11
12Globals:
13_notify_email_statuses: list of HQE statuses. each time a single HQE reaches
14 one of these statuses, an email will be sent to the job's email_list.
15 comes from global_config.
16_base_url: URL to the local AFE server, used to construct URLs for emails.
17_db: DatabaseConnection for this module.
18_drone_manager: reference to global DroneManager instance.
19"""
20
21import datetime, itertools, logging, os, re, sys, time, weakref
jamesrenb55378a2010-03-02 22:19:49 +000022from autotest_lib.client.common_lib import global_config, host_protections
Eric Li6f27d4f2010-09-29 10:55:17 -070023from autotest_lib.client.common_lib import global_config, utils
jamesrenb55378a2010-03-02 22:19:49 +000024from autotest_lib.frontend.afe import models, model_attributes
25from autotest_lib.database import database_connection
26from autotest_lib.scheduler import drone_manager, email_manager
27from autotest_lib.scheduler import scheduler_config
28
# Lower-cased HQE statuses that trigger a notification email when reached;
# populated from global_config in initialize().
_notify_email_statuses = []
# URL of the local AFE server, used to build links in notification emails.
_base_url = None

# Shared DatabaseConnection used by every DBObject; set up in initialize().
_db = None
# Global DroneManager instance; set up in initialize_globals().
_drone_manager = None
34
def initialize():
    """
    Set up this module's globals: connect _db, load the notification
    status list and compute _base_url from the global config.
    """
    global _db, _notify_email_statuses, _base_url

    _db = database_connection.DatabaseConnection('AUTOTEST_WEB')
    _db.connect(db_type='django')

    config = global_config.global_config
    raw_statuses = config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    # Accept whitespace, comma, semicolon or colon separated status names.
    _notify_email_statuses = [token for token in
                              re.split(r'[\s,;:]', raw_statuses.lower())
                              if token]

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    override_url = config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if override_url:
        _base_url = override_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = config.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    initialize_globals()
66
67
def initialize_globals():
    """Cache the global DroneManager instance for use by this module."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
71
72
class DelayedCallTask(object):
    """
    An Agent-compatible task that waits (without blocking) until a given
    delay has elapsed, then invokes a callback exactly once and finishes.
    Anything the callback returns is assumed to be a new Agent instance
    and will be added to the dispatcher.

    @attribute end_time: Absolute posix time after which a poll() will
            fire the callback and mark this task done.

    Also carries every attribute required by the Agent class.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: Seconds from now after which the callback
                fires and this task is done.
        @param callback: Callable invoked once when the delay has elapsed.
                It must return None or a new Agent instance.
        @param now_func: A time.time-like function, injectable for testing.
                Defaults to time.time.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # Attributes the Agent class expects every task to expose.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        # Do nothing until the deadline passes; fire the callback only once.
        if self.is_done() or self._now_func() < self.end_time:
            return
        self._callback()
        self.success = True


    def is_done(self):
        return self.aborted or self.success


    def abort(self):
        self.aborted = True
124
125
class DBError(Exception):
    """Raised by the DBObject constructor when its SELECT finds no row."""
128
129
class DBObject(object):
    """
    A miniature object relational model for the database.

    Subclasses map one row of a table to attribute values; construction
    either queries the row by id or consumes a pre-fetched row tuple.
    """

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id. This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id. If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        # object.__new__ accepts no extra arguments; id/kwargs are consumed
        # by __init__.  Forwarding them (as this used to do) triggers a
        # DeprecationWarning in Python 2.6+ and a TypeError in Python 3.
        return super(DBObject, cls).__new__(cls)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id: Primary key of the row to load; optional if @row is given.
        @param row: Pre-fetched row (a sequence matching _fields); optional
                if @id is given.
        @param new_record: True if this instance represents a row that does
                not exist in the database yet (see save()).
        @param always_query: If False and a cached instance was already
                initialized, skip re-querying the database.
        """
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """Return the row with id @row_id; raise DBError if it is missing."""
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        """Assert that @row carries exactly one value per name in _fields."""
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing in
        memory fields. Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S'  # Leave off the microseconds.
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(datetime_cmp_fmt)
                row_value = row_value.strftime(datetime_cmp_fmt)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # 'id' is the primary key; it must never be rewritten by
        # update_field().
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-fetch this row from the database and refresh all attributes."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table = None):
        """
        Return the number of rows matching the raw SQL @where clause.

        @param where: SQL condition text (interpolated directly -- callers
                must not pass untrusted input).
        @param table: Table to count in; defaults to this object's table.
        """
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Persist @field = @value for this row and update the attribute."""
        assert field in self._valid_fields

        # Skip the UPDATE when nothing would change.
        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """
        INSERT this instance as a new row (only meaningful when constructed
        with new_record=True).  Sets self.id to the database-assigned id.
        """
        if self.__new_record:
            keys = self._fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    # NOTE: naive quoting -- values containing '"' would break
                    # this statement; only trusted internal values are saved.
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete this row from the database and evict the cached instance."""
        # Use self.id -- previously this passed the id() builtin, so the
        # cached instance for this row was never actually removed.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Return @string with @prefix prepended, or @string if it is empty."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return [cls(id=row[0], row=row) for row in rows]
335
336
class IneligibleHostQueue(DBObject):
    # Rows pairing a job with a host it may not be scheduled on; created by
    # HostQueueEntry.block_host and removed by HostQueueEntry.unblock_host.
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
340
341
class AtomicGroup(DBObject):
    """In-memory representation of a row of the afe_atomic_groups table."""
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
346
347
class Label(DBObject):
    """In-memory representation of a row of the afe_labels table."""
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        """Debug representation showing name, id and atomic group id."""
        template = 'Label(name=%r, id=%d, atomic_group_id=%r)'
        return template % (self.name, self.id, self.atomic_group_id)
357
358
class Host(DBObject):
    """In-memory row of the afe_hosts table, plus scheduler helpers."""
    _table_name = 'afe_hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def set_status(self, status):
        """Persist @status to this host's status column and attribute."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)
        # Noticed some time jumps after the last log message.
        logging.debug('Host Set Status Complete')


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT afe_labels.name, afe_labels.platform
                FROM afe_labels
                INNER JOIN afe_hosts_labels ON
                        afe_labels.id = afe_hosts_labels.label_id
                WHERE afe_hosts_labels.host_id = %s
                ORDER BY afe_labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    # Matches e.g. 'host123': group 1 is the alphabetic prefix, group 2 the
    # trailing digits.
    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number. If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfies most people's hostname sorting needs
        regardless of their exact naming schemes. Nobody sane should have
        both a host10 and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # int() already ignores leading zeros.  The previous
            # int(number_str.lstrip('0')) raised ValueError for an all-zero
            # suffix such as 'host0', since int('') is invalid.
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
431
432
class HostQueueEntry(DBObject):
    """In-memory row of afe_host_queue_entries plus scheduler-side logic."""
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')


    def __init__(self, id=None, row=None, **kwargs):
        """Load the entry and its associated Job, Host and AtomicGroup."""
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        """Return an AFE URL that shows this entry's job."""
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label). The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        """Assign @host to this entry, or release the host when None."""
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.update_field('host_id', host.id)
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            # NOTE(review): assumes self.host is set when releasing -- a
            # meta_host entry that never had a host would fail here; confirm
            # callers only release after an assignment.
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def block_host(self, host_id):
        """Record that @host_id may not run this entry's job again."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Remove any ineligibility blocks for @host_id on this job."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Set the results subdirectory; defaults to the host's hostname."""
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        """Return the assigned host's hostname, or 'no host'."""
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
                                    self.status, flags_str)


    def set_status(self, status):
        """
        Set this entry's status, derive the active/complete flags, run
        completion handling and send any configured notification emails.
        """
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)
        # Noticed some time jumps after last logging message.
        logging.debug('Update Field Complete')

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
        assert not (active and complete)

        self.update_field('active', active)
        self.update_field('complete', complete)

        if complete:
            self._on_complete(status)
            self._email_on_job_complete()

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)
        logging.debug('HQE Set Status Complete')


    def _on_complete(self, status):
        """Completion bookkeeping: stop the job if needed, drop pidfiles."""
        # Compare by equality, not identity: 'is not' only behaved correctly
        # when the exact same status string object was passed in, so an
        # equal-but-distinct 'Aborted' string would wrongly run
        # stop_if_necessary().
        if status != models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()

        if not self.execution_subdir:
            return
        # unregister any possible pidfiles associated with this queue entry
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)


    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body = "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body


    def _email_on_status(self, status):
        """Email a single-HQE status notification to the job's email list."""
        hostname = self._get_hostname()
        subject, body = self._get_status_email_contents(status, None, hostname)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """Email a whole-job summary once every HQE of the job is finished."""
        if not self.job.is_finished():
            return

        summary = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary.append("Host: %s Status: %s" %
                           (queue_entry._get_hostname(),
                            queue_entry.status))

        summary = "\n".join(summary)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self):
        """Log the scheduling event and kick off pre-job tasks."""
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        # Every host goes thru the Verifying stage (which may or may not
        # actually do anything as determined by get_pre_job_tasks).
        self.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        """Return this entry to Queued so it can be scheduled again."""
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    @property
    def aborted_by(self):
        """Login of the user who aborted this entry, or None."""
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        """Timestamp at which this entry was aborted, or None."""
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Cached after the first call via the _aborted_by attribute.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify. If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are getting
        # stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        """Abort this entry, resetting or cleaning up its host as needed."""
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING,
                           Status.WAITING):
            assert not dispatcher.get_agents_for_entry(self)
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)
        self.job.abort_delay_ready_task()


    def get_group_name(self):
        """Return the most specific label name for this entry's atomic group,
        or '' when the entry has no atomic group."""
        atomic_group = self.atomic_group
        if not atomic_group:
            return ''

        # Look at any meta_host and dependency labels and pick the first
        # one that also specifies this atomic group. Use that label name
        # as the group name if possible (it is more specific).
        for label in self.get_labels():
            if label.atomic_group_id:
                assert label.atomic_group_id == atomic_group.id
                return label.name
        return atomic_group.name


    def execution_tag(self):
        """Return '<job tag>/<execution_subdir>' identifying this execution."""
        SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
                               'complete!=1 AND execution_subdir="" AND '
                               'status!="Queued";')
        SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
                                 'status="Aborted" WHERE id=%s;')
        try:
            assert self.execution_subdir
        except AssertionError:
            # TODO(scottz): Remove temporary fix/info gathering pathway for
            # crosbug.com/31595 once issue is root caused.
            logging.error('No execution_subdir for host queue id:%s.', self.id)
            logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
            for row in _db.execute(SQL_SUSPECT_ENTRIES):
                logging.error(row)
            logging.error('====DB DEBUG====\n')
            fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
            logging.error('EXECUTING: %s', fix_query)
            _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id)
            raise AssertionError(('self.execution_subdir not found. '
                                  'See log for details.'))

        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        """Alias for execution_tag(); used as the results directory path."""
        return self.execution_tag()


    def set_started_on_now(self):
        """Stamp started_on with the current time."""
        self.update_field('started_on', datetime.datetime.now())


    def is_hostless(self):
        """True when this entry has no host, meta_host or atomic group."""
        return (self.host_id is None
                and self.meta_host is None
                and self.atomic_group_id is None)
816
817
818class Job(DBObject):
819 _table_name = 'afe_jobs'
820 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
821 'control_type', 'created_on', 'synch_count', 'timeout',
822 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
Simran Basi9f508a12012-11-09 12:20:44 -0800823 'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
Aviv Keshetcd1ff9b2013-03-01 14:55:19 -0800824 'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
825 'test_retry')
jamesrenb55378a2010-03-02 22:19:49 +0000826
827 # This does not need to be a column in the DB. The delays are likely to
828 # be configured short. If the scheduler is stopped and restarted in
829 # the middle of a job's delay cycle, the delay cycle will either be
830 # repeated or skipped depending on the number of Pending machines found
831 # when the restarted scheduler recovers to track it. Not a problem.
832 #
833 # A reference to the DelayedCallTask that will wake up the job should
834 # no other HQEs change state in time. Its end_time attribute is used
835 # by our run_with_ready_delay() method to determine if the wait is over.
836 _delay_ready_task = None
837
838 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
839 # all status='Pending' atomic group HQEs incase a delay was running when the
840 # scheduler was restarted and no more hosts ever successfully exit Verify.
841
842 def __init__(self, id=None, row=None, **kwargs):
843 assert id or row
844 super(Job, self).__init__(id=id, row=row, **kwargs)
845 self._owner_model = None # caches model instance of owner
Paul Pendlebury5a8c6ad2011-02-01 07:20:17 -0800846 self.update_image_path = None # path of OS image to install
jamesrenb55378a2010-03-02 22:19:49 +0000847
848
849 def model(self):
850 return models.Job.objects.get(id=self.id)
851
852
853 def owner_model(self):
854 # work around the fact that the Job owner field is a string, not a
855 # foreign key
856 if not self._owner_model:
857 self._owner_model = models.User.objects.get(login=self.owner)
858 return self._owner_model
859
860
861 def is_server_job(self):
862 return self.control_type != 2
863
864
865 def tag(self):
866 return "%s-%s" % (self.id, self.owner)
867
868
869 def get_host_queue_entries(self):
870 rows = _db.execute("""
871 SELECT * FROM afe_host_queue_entries
872 WHERE job_id= %s
873 """, (self.id,))
874 entries = [HostQueueEntry(row=i) for i in rows]
875
876 assert len(entries)>0
877
878 return entries
879
880
Paul Pendlebury5a8c6ad2011-02-01 07:20:17 -0800881 def is_image_update_job(self):
882 """
883 Discover if the current job requires an OS update.
884
885 @return: True/False if OS should be updated before job is run.
886 """
887 # All image update jobs have the parameterized_job_id set.
888 if not self.parameterized_job_id:
889 return False
890
891 # Retrieve the ID of the ParameterizedJob this job is an instance of.
892 rows = _db.execute("""
893 SELECT test_id
894 FROM afe_parameterized_jobs
895 WHERE id = %s
896 """, (self.parameterized_job_id,))
897 if not rows:
898 return False
899 test_id = rows[0][0]
900
901 # Retrieve the ID of the known autoupdate_ParameterizedJob.
902 rows = _db.execute("""
903 SELECT id
904 FROM afe_autotests
905 WHERE name = 'autoupdate_ParameterizedJob'
906 """)
907 if not rows:
908 return False
909 update_id = rows[0][0]
910
911 # If the IDs are the same we've found an image update job.
912 if test_id == update_id:
913 # Finally, get the path to the OS image to install.
914 rows = _db.execute("""
915 SELECT parameter_value
916 FROM afe_parameterized_job_parameters
917 WHERE parameterized_job_id = %s
918 """, (self.parameterized_job_id,))
919 if rows:
920 # Save the path in update_image_path to use later as a command
921 # line parameter to autoserv.
922 self.update_image_path = rows[0][0]
923 return True
924
925 return False
926
927
Eric Li6f27d4f2010-09-29 10:55:17 -0700928 def get_execution_details(self):
929 """
930 Get test execution details for this job.
931
932 @return: Dictionary with test execution details
933 """
934 def _find_test_jobs(rows):
935 """
936 Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
937 Those are autotest 'internal job' tests, so they should not be
938 counted when evaluating the test stats.
939
940 @param rows: List of rows (matrix) with database results.
941 """
942 job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')
943 n_test_jobs = 0
944 for r in rows:
945 test_name = r[0]
946 if job_test_pattern.match(test_name):
947 n_test_jobs += 1
948
949 return n_test_jobs
950
951 stats = {}
952
953 rows = _db.execute("""
954 SELECT t.test, s.word, t.reason
955 FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
956 WHERE t.job_idx = j.job_idx
957 AND s.status_idx = t.status
958 AND j.afe_job_id = %s
Mike Truty6941dea2010-11-09 15:26:32 -0800959 ORDER BY t.reason
Eric Li6f27d4f2010-09-29 10:55:17 -0700960 """ % self.id)
961
Dale Curtis74a314b2011-06-23 14:55:46 -0700962 failed_rows = [r for r in rows if not r[1] == 'GOOD']
Eric Li6f27d4f2010-09-29 10:55:17 -0700963
964 n_test_jobs = _find_test_jobs(rows)
965 n_test_jobs_failed = _find_test_jobs(failed_rows)
966
967 total_executed = len(rows) - n_test_jobs
968 total_failed = len(failed_rows) - n_test_jobs_failed
969
970 if total_executed > 0:
971 success_rate = 100 - ((total_failed / float(total_executed)) * 100)
972 else:
973 success_rate = 0
974
975 stats['total_executed'] = total_executed
976 stats['total_failed'] = total_failed
977 stats['total_passed'] = total_executed - total_failed
978 stats['success_rate'] = success_rate
979
980 status_header = ("Test Name", "Status", "Reason")
981 if failed_rows:
982 stats['failed_rows'] = utils.matrix_to_string(failed_rows,
983 status_header)
984 else:
985 stats['failed_rows'] = ''
986
987 time_row = _db.execute("""
988 SELECT started_time, finished_time
989 FROM tko_jobs
990 WHERE afe_job_id = %s
991 """ % self.id)
992
993 if time_row:
994 t_begin, t_end = time_row[0]
Mike Truty6941dea2010-11-09 15:26:32 -0800995 try:
996 delta = t_end - t_begin
997 minutes, seconds = divmod(delta.seconds, 60)
998 hours, minutes = divmod(minutes, 60)
999 stats['execution_time'] = ("%02d:%02d:%02d" %
1000 (hours, minutes, seconds))
1001 # One of t_end or t_begin are None
1002 except TypeError:
1003 stats['execution_time'] = '(could not determine)'
Eric Li6f27d4f2010-09-29 10:55:17 -07001004 else:
1005 stats['execution_time'] = '(none)'
1006
1007 return stats
1008
1009
jamesrenb55378a2010-03-02 22:19:49 +00001010 def set_status(self, status, update_queues=False):
1011 self.update_field('status',status)
1012
1013 if update_queues:
1014 for queue_entry in self.get_host_queue_entries():
1015 queue_entry.set_status(status)
1016
1017
1018 def keyval_dict(self):
1019 return self.model().keyval_dict()
1020
1021
1022 def _atomic_and_has_started(self):
1023 """
1024 @returns True if any of the HostQueueEntries associated with this job
1025 have entered the Status.STARTING state or beyond.
1026 """
1027 atomic_entries = models.HostQueueEntry.objects.filter(
1028 job=self.id, atomic_group__isnull=False)
1029 if atomic_entries.count() <= 0:
1030 return False
1031
1032 # These states may *only* be reached if Job.run() has been called.
1033 started_statuses = (models.HostQueueEntry.Status.STARTING,
1034 models.HostQueueEntry.Status.RUNNING,
1035 models.HostQueueEntry.Status.COMPLETED)
1036
1037 started_entries = atomic_entries.filter(status__in=started_statuses)
1038 return started_entries.count() > 0
1039
1040
1041 def _hosts_assigned_count(self):
1042 """The number of HostQueueEntries assigned a Host for this job."""
1043 entries = models.HostQueueEntry.objects.filter(job=self.id,
1044 host__isnull=False)
1045 return entries.count()
1046
1047
1048 def _pending_count(self):
1049 """The number of HostQueueEntries for this job in the Pending state."""
1050 pending_entries = models.HostQueueEntry.objects.filter(
1051 job=self.id, status=models.HostQueueEntry.Status.PENDING)
1052 return pending_entries.count()
1053
1054
1055 def _max_hosts_needed_to_run(self, atomic_group):
1056 """
1057 @param atomic_group: The AtomicGroup associated with this job that we
1058 are using to set an upper bound on the threshold.
1059 @returns The maximum number of HostQueueEntries assigned a Host before
1060 this job can run.
1061 """
1062 return min(self._hosts_assigned_count(),
1063 atomic_group.max_number_of_machines)
1064
1065
    def _min_hosts_needed_to_run(self):
        """Return the minimum number of hosts needed to run this job."""
        return self.synch_count
1069
1070
1071 def is_ready(self):
1072 # NOTE: Atomic group jobs stop reporting ready after they have been
1073 # started to avoid launching multiple copies of one atomic job.
1074 # Only possible if synch_count is less than than half the number of
1075 # machines in the atomic group.
1076 pending_count = self._pending_count()
1077 atomic_and_has_started = self._atomic_and_has_started()
1078 ready = (pending_count >= self.synch_count
1079 and not atomic_and_has_started)
1080
1081 if not ready:
1082 logging.info(
1083 'Job %s not ready: %s pending, %s required '
1084 '(Atomic and started: %s)',
1085 self, pending_count, self.synch_count,
1086 atomic_and_has_started)
1087
1088 return ready
1089
1090
1091 def num_machines(self, clause = None):
1092 sql = "job_id=%s" % self.id
1093 if clause:
1094 sql += " AND (%s)" % clause
1095 return self.count(sql, table='afe_host_queue_entries')
1096
1097
1098 def num_queued(self):
1099 return self.num_machines('not complete')
1100
1101
1102 def num_active(self):
1103 return self.num_machines('active')
1104
1105
1106 def num_complete(self):
1107 return self.num_machines('complete')
1108
1109
1110 def is_finished(self):
1111 return self.num_complete() == self.num_machines()
1112
1113
1114 def _not_yet_run_entries(self, include_verifying=True):
1115 statuses = [models.HostQueueEntry.Status.QUEUED,
1116 models.HostQueueEntry.Status.PENDING]
1117 if include_verifying:
1118 statuses.append(models.HostQueueEntry.Status.VERIFYING)
1119 return models.HostQueueEntry.objects.filter(job=self.id,
1120 status__in=statuses)
1121
1122
1123 def _stop_all_entries(self):
1124 entries_to_stop = self._not_yet_run_entries(
1125 include_verifying=False)
1126 for child_entry in entries_to_stop:
1127 assert not child_entry.complete, (
1128 '%s status=%s, active=%s, complete=%s' %
1129 (child_entry.id, child_entry.status, child_entry.active,
1130 child_entry.complete))
1131 if child_entry.status == models.HostQueueEntry.Status.PENDING:
1132 child_entry.host.status = models.Host.Status.READY
1133 child_entry.host.save()
1134 child_entry.status = models.HostQueueEntry.Status.STOPPED
1135 child_entry.save()
1136
1137
1138 def stop_if_necessary(self):
1139 not_yet_run = self._not_yet_run_entries()
1140 if not_yet_run.count() < self.synch_count:
1141 self._stop_all_entries()
1142
1143
jamesrenb55378a2010-03-02 22:19:49 +00001144 def _next_group_name(self, group_name=''):
1145 """@returns a directory name to use for the next host group results."""
1146 if group_name:
1147 # Sanitize for use as a pathname.
1148 group_name = group_name.replace(os.path.sep, '_')
1149 if group_name.startswith('.'):
1150 group_name = '_' + group_name[1:]
1151 # Add a separator between the group name and 'group%d'.
1152 group_name += '.'
1153 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
1154 query = models.HostQueueEntry.objects.filter(
1155 job=self.id).values('execution_subdir').distinct()
1156 subdirs = (entry['execution_subdir'] for entry in query)
1157 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
1158 ids = [int(match.group(1)) for match in group_matches if match]
1159 if ids:
1160 next_id = max(ids) + 1
1161 else:
1162 next_id = 0
1163 return '%sgroup%d' % (group_name, next_id)
1164
1165
1166 def get_group_entries(self, queue_entry_from_group):
1167 """
1168 @param queue_entry_from_group: A HostQueueEntry instance to find other
1169 group entries on this job for.
1170
1171 @returns A list of HostQueueEntry objects all executing this job as
1172 part of the same group as the one supplied (having the same
1173 execution_subdir).
1174 """
1175 execution_subdir = queue_entry_from_group.execution_subdir
1176 return list(HostQueueEntry.fetch(
1177 where='job_id=%s AND execution_subdir=%s',
1178 params=(self.id, execution_subdir)))
1179
1180
1181 def _should_run_cleanup(self, queue_entry):
1182 if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
1183 return True
1184 elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
1185 return queue_entry.host.dirty
1186 return False
1187
1188
1189 def _should_run_verify(self, queue_entry):
1190 do_not_verify = (queue_entry.host.protection ==
1191 host_protections.Protection.DO_NOT_VERIFY)
1192 if do_not_verify:
1193 return False
1194 return self.run_verify
1195
1196
1197 def schedule_pre_job_tasks(self, queue_entry):
1198 """
1199 Get a list of tasks to perform before the host_queue_entry
1200 may be used to run this Job (such as Cleanup & Verify).
1201
1202 @returns A list of tasks to be done to the given queue_entry before
1203 it should be considered be ready to run this job. The last
1204 task in the list calls HostQueueEntry.on_pending(), which
1205 continues the flow of the job.
1206 """
1207 if self._should_run_cleanup(queue_entry):
1208 task = models.SpecialTask.Task.CLEANUP
1209 elif self._should_run_verify(queue_entry):
1210 task = models.SpecialTask.Task.VERIFY
1211 else:
1212 queue_entry.on_pending()
1213 return
1214
1215 queue_entry = models.HostQueueEntry.objects.get(id=queue_entry.id)
1216 models.SpecialTask.objects.create(
1217 host=models.Host.objects.get(id=queue_entry.host_id),
1218 queue_entry=queue_entry, task=task)
1219
1220
1221 def _assign_new_group(self, queue_entries, group_name=''):
1222 if len(queue_entries) == 1:
1223 group_subdir_name = queue_entries[0].host.hostname
1224 else:
1225 group_subdir_name = self._next_group_name(group_name)
1226 logging.info('Running synchronous job %d hosts %s as %s',
1227 self.id, [entry.host.hostname for entry in queue_entries],
1228 group_subdir_name)
1229
1230 for queue_entry in queue_entries:
1231 queue_entry.set_execution_subdir(group_subdir_name)
1232
1233
    def _choose_group_to_run(self, include_queue_entry):
        """
        Choose the queue entries that will run this job together.

        @param include_queue_entry: HostQueueEntry that must be part of the
                chosen group.
        @returns A list of HostQueueEntry instances to be used to run this
                Job (including include_queue_entry), already assigned to a
                results group, or an empty list if too few entries could be
                chosen.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        # Atomic group jobs take as many machines as the group allows;
        # ordinary jobs need exactly synch_count entries.
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            # NOTE(review): list.sort(cmp=...) is Python 2 only.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check. We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries
1273
1274
1275 def run_if_ready(self, queue_entry):
1276 """
1277 Run this job by kicking its HQEs into status='Starting' if enough
1278 hosts are ready for it to run.
1279
1280 Cleans up by kicking HQEs into status='Stopped' if this Job is not
1281 ready to run.
1282 """
1283 if not self.is_ready():
1284 self.stop_if_necessary()
1285 elif queue_entry.atomic_group:
1286 self.run_with_ready_delay(queue_entry)
1287 else:
1288 self.run(queue_entry)
1289
1290
    def run_with_ready_delay(self, queue_entry):
        """
        Start a delay to wait for more hosts to enter Pending state before
        launching an atomic group job.  Once set, a delay cannot be reset.

        Runs the job immediately when the delay is disabled, has expired, or
        enough hosts are already Pending; otherwise sets the queue entry to
        Waiting.

        @param queue_entry: The HostQueueEntry object to get atomic group
                info from and pass to run_if_ready when the delay is up.
        """
        assert queue_entry.job_id == self.id
        assert queue_entry.atomic_group
        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
        over_max_threshold = (self._pending_count() >=
                self._max_hosts_needed_to_run(queue_entry.atomic_group))
        delay_expired = (self._delay_ready_task and
                         time.time() >= self._delay_ready_task.end_time)

        # Delay is disabled or we already have enough?  Do not wait to run.
        if not delay or over_max_threshold or delay_expired:
            self.run(queue_entry)
        else:
            queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
1315
1316
1317 def request_abort(self):
1318 """Request that this Job be aborted on the next scheduler cycle."""
1319 self.model().abort()
1320
1321
    def schedule_delayed_callback_task(self, queue_entry):
        """Set the entry Pending and create this job's atomic-group delay task.

        @param queue_entry: HostQueueEntry to mark Pending while we wait for
                more hosts.
        @returns A new DelayedCallTask that re-checks and runs (or aborts)
                the job once the configured wait elapses, or None if a delay
                task has already been created for this job.
        """
        queue_entry.set_status(models.HostQueueEntry.Status.PENDING)

        # Only one delay task per job; a delay cannot be reset once created.
        if self._delay_ready_task:
            return None

        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self)
            # Check to see if the job is still relevant.  It could have
            # aborted while we were waiting or hosts could have disappeared,
            # etc.
            if self._pending_count() < self._min_hosts_needed_to_run():
                logging.info('Job %s had too few Pending hosts after waiting '
                             'for extras. Not running.', self)
                self.request_abort()
                return
            return self.run(queue_entry)

        logging.info('Job %s waiting up to %s seconds for more hosts.',
                     self.id, delay)
        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)
        return self._delay_ready_task
1346
1347
1348 def run(self, queue_entry):
1349 """
1350 @param queue_entry: The HostQueueEntry instance calling this method.
1351 """
1352 if queue_entry.atomic_group and self._atomic_and_has_started():
1353 logging.error('Job.run() called on running atomic Job %d '
1354 'with HQE %s.', self.id, queue_entry)
1355 return
1356 queue_entries = self._choose_group_to_run(queue_entry)
1357 if queue_entries:
1358 self._finish_run(queue_entries)
1359
1360
1361 def _finish_run(self, queue_entries):
1362 for queue_entry in queue_entries:
1363 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
1364 self.abort_delay_ready_task()
1365
1366
1367 def abort_delay_ready_task(self):
1368 """Abort the delayed task associated with this job, if any."""
1369 if self._delay_ready_task:
1370 # Cancel any pending callback that would try to run again
1371 # as we are already running.
1372 self._delay_ready_task.abort()
1373
1374
1375 def __str__(self):
1376 return '%s-%s' % (self.id, self.owner)