blob: c452430ea7de27af87ab1bd9633bf5cfd10f3829 [file] [log] [blame]
Aviv Keshet0b9cfc92013-02-05 11:36:02 -08001# pylint: disable-msg=C0111
2
"""Database model classes for the scheduler.

Contains model classes abstracting the various DB tables used by the scheduler.
These overlap the Django models in basic functionality, but were written before
the Django models existed and have not yet been phased out.  Some of them
(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
which would probably be ill-suited for inclusion in the general Django model
classes.

Globals:
_notify_email_statuses: list of HQE statuses.  Each time a single HQE reaches
        one of these statuses, an email will be sent to the job's email_list.
        Comes from global_config.
_base_url: URL to the local AFE server, used to construct URLs for emails.
_db: DatabaseConnection for this module.
_drone_manager: reference to the global DroneManager instance.
"""
20
21import datetime, itertools, logging, os, re, sys, time, weakref
jamesrenb55378a2010-03-02 22:19:49 +000022from autotest_lib.client.common_lib import global_config, host_protections
Eric Li6f27d4f2010-09-29 10:55:17 -070023from autotest_lib.client.common_lib import global_config, utils
jamesrenb55378a2010-03-02 22:19:49 +000024from autotest_lib.frontend.afe import models, model_attributes
25from autotest_lib.database import database_connection
26from autotest_lib.scheduler import drone_manager, email_manager
27from autotest_lib.scheduler import scheduler_config
Fang Deng1d6c2a02013-04-17 15:25:45 -070028from autotest_lib.site_utils.graphite import stats
Aviv Keshet888a94d2013-05-14 17:52:33 -070029from autotest_lib.client.common_lib import control_data
jamesrenb55378a2010-03-02 22:19:49 +000030
31_notify_email_statuses = []
32_base_url = None
33
34_db = None
35_drone_manager = None
36
def initialize():
    """Connect to the AFE database and populate this module's globals.

    Sets _db, _notify_email_statuses and _base_url from global_config, then
    delegates to initialize_globals() for the drone manager reference.
    Exits the process if the [SERVER] hostname setting is missing and no
    explicit base_url override is configured.
    """
    global _db
    _db = database_connection.DatabaseConnection('AUTOTEST_WEB')
    _db.connect(db_type='django')

    # Statuses that should trigger a notification email, as a separated list
    # in the config file (any of whitespace , ; : as separators).
    global _notify_email_statuses
    status_csv = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    _notify_email_statuses = [
            status for status in re.split(r'[\s,;:]', status_csv.lower())
            if status]

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = global_config.global_config.get_config_value(
                'SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    initialize_globals()
68
69
def initialize_globals():
    """Cache the global DroneManager instance in this module's namespace."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
73
74
class DelayedCallTask(object):
    """
    An Agent-compatible task that waits (without blocking) until a given
    delay has elapsed and then invokes a callback exactly once before
    finishing.  If the callback returns anything, it is assumed to be a new
    Agent instance and will be added to the dispatcher.

    @attribute end_time: The absolute posix time after which this task will
            call its callback when it is polled and be finished.

    Also has all attributes required by the Agent class.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: The delay in seconds from now that this task
                will call the supplied callback and be done.
        @param callback: A callable to be called by this task once after at
                least delay_seconds time has elapsed.  It must return None
                or a new Agent instance.
        @param now_func: A time.time like function.  Default: time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # Attributes the Agent class expects every task to expose.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        """Fire the callback once the deadline has passed (at most once)."""
        if self.is_done() or self._now_func() < self.end_time:
            return
        self._callback()
        self.success = True


    def is_done(self):
        """A task is finished once it has fired or has been aborted."""
        return self.success or self.aborted


    def abort(self):
        """Cancel the pending callback; the task becomes done immediately."""
        self.aborted = True
126
127
class DBError(Exception):
    """Raised by the DBObject constructor when its select fails.

    See DBObject._fetch_row_from_db, which raises this when no row exists
    for the requested id.
    """
130
131
class DBObject(object):
    """A miniature object relational model for the database.

    Instances are cached by (type, id) in a WeakValueDictionary so that
    multiple lookups of the same row (e.g. many HostQueueEntries sharing one
    Job) reuse a single in-memory object.
    """

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id: Primary key of the row to load (queried unless row given).
        @param row: A pre-fetched row tuple matching _fields.
        @param new_record: True if this object does not yet exist in the DB.
        @param always_query: If False, a cached, already-initialized instance
                is returned untouched instead of being refreshed from the DB.
        """
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                # logging.warning is the non-deprecated spelling of warn.
                logging.warning(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """Return the single row with the given id, raising DBError if gone."""
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        """Sanity check that a fetched row matches the declared _fields."""
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing in
        memory fields.  Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S'  # Leave off the microseconds.
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(datetime_cmp_fmt)
                row_value = row_value.strftime(datetime_cmp_fmt)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # id is the primary key; it may never be rewritten via update_field.
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read this object's row from the DB and refresh all fields."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        """Return the number of rows of |table| matching the |where| clause."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Write a single field to the DB, skipping no-op updates."""
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object if it was created with new_record=True."""
        if self.__new_record:
            keys = self._fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    # NOTE(review): naive quoting -- values containing a
                    # double quote would break this statement.  Only used
                    # with scheduler-generated values today.
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete this row from the database and evict it from the cache."""
        # Bug fix: the original code passed the id() builtin here instead of
        # self.id, so deleted instances were never removed from the cache.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Return |string| prefixed with |prefix|, or |string| if empty."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return [cls(id=row[0], row=row) for row in rows]
337
338
class IneligibleHostQueue(DBObject):
    """A job/host pair the scheduler must not pair up again.

    Rows are written by HostQueueEntry.block_host() and removed by
    HostQueueEntry.unblock_host().
    """
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
342
343
class AtomicGroup(DBObject):
    """In-memory copy of one afe_atomic_groups row."""
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
348
349
class Label(DBObject):
    """In-memory copy of one afe_labels row."""
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)
359
360
class Host(DBObject):
    """In-memory representation of one row of the afe_hosts table."""
    _table_name = 'afe_hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
    _timer = stats.Timer("scheduler_models.Host")


    @_timer.decorate
    def set_status(self, status):
        """Persist a new status value for this host."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)
        # Noticed some time jumps after the last log message.
        logging.debug('Host Set Status Complete')


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT afe_labels.name, afe_labels.platform
                FROM afe_labels
                INNER JOIN afe_hosts_labels ON
                        afe_labels.id = afe_hosts_labels.label_id
                WHERE afe_hosts_labels.host_id = %s
                ORDER BY afe_labels.name
                """, (self.id,))
        all_labels = [name for name, _ in rows]
        # The last platform label wins, matching the original loop order.
        platform_names = [name for name, is_platform in rows if is_platform]
        platform = platform_names[-1] if platform_names else None
        return platform, all_labels


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfy most people's hostname sorting needs regardless
        of their exact naming schemes.  Nobody sane should have both a host10
        and host010 (but the algorithm works regardless).
        """
        name_a = a.hostname.lower()
        name_b = b.hostname.lower()
        parsed_a = cls._ALPHANUM_HOST_RE.match(name_a)
        parsed_b = cls._ALPHANUM_HOST_RE.match(name_b)
        if not (parsed_a and parsed_b):
            # At least one hostname doesn't follow the <prefix><digits>
            # pattern; fall back to a plain string comparison.
            return cmp(name_a, name_b)
        prefix_a, digits_a = parsed_a.groups()
        prefix_b, digits_b = parsed_b.groups()
        result = cmp((prefix_a, int(digits_a.lstrip('0'))),
                     (prefix_b, int(digits_b.lstrip('0'))))
        if result == 0 and name_a != name_b:
            # If they compared equal above but the lower case names are
            # indeed different, don't report equality.  abc012 != abc12.
            return cmp(name_a, name_b)
        return result
435
436
class HostQueueEntry(DBObject):
    """One afe_host_queue_entries row plus its related Job, Host and
    AtomicGroup objects, with the scheduler's state-transition logic."""
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')
    _timer = stats.Timer('scheduler_models.HostQueueEntry')


    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        """Return the AFE frontend URL for viewing this entry's job."""
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        """Assign (or, with host=None, release) this entry's host.

        Assigning also blocks the host from being paired with this job
        again; releasing unblocks the previous host.
        """
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.update_field('host_id', host.id)
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def block_host(self, host_id):
        """Record that this entry's job may not use host_id again."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Remove any job/host block created by block_host()."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Set the results subdirectory; defaults to the host's hostname."""
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        """Return the assigned host's name, or 'no host' when unassigned."""
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
                                    self.status, flags_str)


    @_timer.decorate
    def set_status(self, status):
        """Move this entry to a new status, updating the derived active and
        complete flags and sending any configured notification emails."""
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)
        # Noticed some time jumps after last logging message.
        logging.debug('Update Field Complete')

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
        assert not (active and complete)

        self.update_field('active', active)
        self.update_field('complete', complete)

        if complete:
            self._on_complete(status)
            self._email_on_job_complete()

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)
        logging.debug('HQE Set Status Complete')


    def _on_complete(self, status):
        """Post-completion cleanup: maybe stop the job, drop pidfiles."""
        # Bug fix: status values are plain strings, so compare with != rather
        # than "is not" -- identity comparison only worked by the accident of
        # string interning.
        if status != models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()

        if not self.execution_subdir:
            return
        # unregister any possible pidfiles associated with this queue entry
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)


    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body = "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body


    def _email_on_status(self, status):
        """Send a single-HQE status notification to the job's email list."""
        hostname = self._get_hostname()
        subject, body = self._get_status_email_contents(status, None, hostname)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """Send a whole-job summary email once the job is fully finished."""
        if not self.job.is_finished():
            return

        summary = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary.append("Host: %s Status: %s" %
                           (queue_entry._get_hostname(),
                            queue_entry.status))

        summary = "\n".join(summary)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self):
        """Log the scheduling decision and kick off the job's pre-job tasks."""
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        """Return this entry to Queued so the scheduler can try it again."""
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    @property
    def aborted_by(self):
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING.  Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are getting
        # stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        """Abort this entry, cleaning up its host as appropriate for the
        current status."""
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING,
                           Status.WAITING):
            assert not dispatcher.get_agents_for_entry(self)
            self.host.set_status(models.Host.Status.READY)
        elif (self.status == Status.VERIFYING or
              self.status == Status.RESETTING or
              self.status == Status.PROVISIONING):
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)
        self.job.abort_delay_ready_task()


    def get_group_name(self):
        """Return the display name for this entry's atomic group, if any."""
        atomic_group = self.atomic_group
        if not atomic_group:
            return ''

        # Look at any meta_host and dependency labels and pick the first
        # one that also specifies this atomic group.  Use that label name
        # as the group name if possible (it is more specific).
        for label in self.get_labels():
            if label.atomic_group_id:
                assert label.atomic_group_id == atomic_group.id
                return label.name
        return atomic_group.name


    def execution_tag(self):
        """Return '<job tag>/<execution_subdir>' identifying this run's
        results location; repairs and raises on a missing subdir."""
        SQL_SUSPECT_ENTRIES = ('SELECT * FROM afe_host_queue_entries WHERE '
                               'complete!=1 AND execution_subdir="" AND '
                               'status!="Queued";')
        SQL_FIX_SUSPECT_ENTRY = ('UPDATE afe_host_queue_entries SET '
                                 'status="Aborted" WHERE id=%s;')
        try:
            assert self.execution_subdir
        except AssertionError:
            # TODO(scottz): Remove temporary fix/info gathering pathway for
            # crosbug.com/31595 once issue is root caused.
            logging.error('No execution_subdir for host queue id:%s.', self.id)
            logging.error('====DB DEBUG====\n%s', SQL_SUSPECT_ENTRIES)
            for row in _db.execute(SQL_SUSPECT_ENTRIES):
                logging.error(row)
            logging.error('====DB DEBUG====\n')
            fix_query = SQL_FIX_SUSPECT_ENTRY % self.id
            logging.error('EXECUTING: %s', fix_query)
            _db.execute(SQL_FIX_SUSPECT_ENTRY % self.id)
            raise AssertionError(('self.execution_subdir not found. '
                                  'See log for details.'))

        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        return self.execution_tag()


    def set_started_on_now(self):
        self.update_field('started_on', datetime.datetime.now())


    def is_hostless(self):
        """True when this entry runs with no host, meta_host or atomic group."""
        return (self.host_id is None
                and self.meta_host is None
                and self.atomic_group_id is None)
821
822
823class Job(DBObject):
824 _table_name = 'afe_jobs'
825 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
826 'control_type', 'created_on', 'synch_count', 'timeout',
827 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
Simran Basi9f508a12012-11-09 12:20:44 -0800828 'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
Aviv Keshetcd1ff9b2013-03-01 14:55:19 -0800829 'parameterized_job_id', 'max_runtime_mins', 'parent_job_id',
Simran Basi94dc0032013-11-12 14:09:46 -0800830 'test_retry', 'run_reset', 'timeout_mins')
Fang Deng1d6c2a02013-04-17 15:25:45 -0700831 _timer = stats.Timer("scheduler_models.Job")
jamesrenb55378a2010-03-02 22:19:49 +0000832
833 # This does not need to be a column in the DB. The delays are likely to
834 # be configured short. If the scheduler is stopped and restarted in
835 # the middle of a job's delay cycle, the delay cycle will either be
836 # repeated or skipped depending on the number of Pending machines found
837 # when the restarted scheduler recovers to track it. Not a problem.
838 #
839 # A reference to the DelayedCallTask that will wake up the job should
840 # no other HQEs change state in time. Its end_time attribute is used
841 # by our run_with_ready_delay() method to determine if the wait is over.
842 _delay_ready_task = None
843
844 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
845 # all status='Pending' atomic group HQEs incase a delay was running when the
846 # scheduler was restarted and no more hosts ever successfully exit Verify.
847
848 def __init__(self, id=None, row=None, **kwargs):
849 assert id or row
850 super(Job, self).__init__(id=id, row=row, **kwargs)
851 self._owner_model = None # caches model instance of owner
Paul Pendlebury5a8c6ad2011-02-01 07:20:17 -0800852 self.update_image_path = None # path of OS image to install
jamesrenb55378a2010-03-02 22:19:49 +0000853
854
855 def model(self):
856 return models.Job.objects.get(id=self.id)
857
858
859 def owner_model(self):
860 # work around the fact that the Job owner field is a string, not a
861 # foreign key
862 if not self._owner_model:
863 self._owner_model = models.User.objects.get(login=self.owner)
864 return self._owner_model
865
866
867 def is_server_job(self):
Aviv Keshet82352b22013-05-14 18:30:56 -0700868 return self.control_type == control_data.CONTROL_TYPE.SERVER
jamesrenb55378a2010-03-02 22:19:49 +0000869
870
871 def tag(self):
872 return "%s-%s" % (self.id, self.owner)
873
874
875 def get_host_queue_entries(self):
876 rows = _db.execute("""
877 SELECT * FROM afe_host_queue_entries
878 WHERE job_id= %s
879 """, (self.id,))
880 entries = [HostQueueEntry(row=i) for i in rows]
881
882 assert len(entries)>0
883
884 return entries
885
886
Paul Pendlebury5a8c6ad2011-02-01 07:20:17 -0800887 def is_image_update_job(self):
888 """
889 Discover if the current job requires an OS update.
890
891 @return: True/False if OS should be updated before job is run.
892 """
893 # All image update jobs have the parameterized_job_id set.
894 if not self.parameterized_job_id:
895 return False
896
897 # Retrieve the ID of the ParameterizedJob this job is an instance of.
898 rows = _db.execute("""
899 SELECT test_id
900 FROM afe_parameterized_jobs
901 WHERE id = %s
902 """, (self.parameterized_job_id,))
903 if not rows:
904 return False
905 test_id = rows[0][0]
906
907 # Retrieve the ID of the known autoupdate_ParameterizedJob.
908 rows = _db.execute("""
909 SELECT id
910 FROM afe_autotests
911 WHERE name = 'autoupdate_ParameterizedJob'
912 """)
913 if not rows:
914 return False
915 update_id = rows[0][0]
916
917 # If the IDs are the same we've found an image update job.
918 if test_id == update_id:
919 # Finally, get the path to the OS image to install.
920 rows = _db.execute("""
921 SELECT parameter_value
922 FROM afe_parameterized_job_parameters
923 WHERE parameterized_job_id = %s
924 """, (self.parameterized_job_id,))
925 if rows:
926 # Save the path in update_image_path to use later as a command
927 # line parameter to autoserv.
928 self.update_image_path = rows[0][0]
929 return True
930
931 return False
932
933
Eric Li6f27d4f2010-09-29 10:55:17 -0700934 def get_execution_details(self):
935 """
936 Get test execution details for this job.
937
938 @return: Dictionary with test execution details
939 """
940 def _find_test_jobs(rows):
941 """
942 Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
943 Those are autotest 'internal job' tests, so they should not be
944 counted when evaluating the test stats.
945
946 @param rows: List of rows (matrix) with database results.
947 """
948 job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')
949 n_test_jobs = 0
950 for r in rows:
951 test_name = r[0]
952 if job_test_pattern.match(test_name):
953 n_test_jobs += 1
954
955 return n_test_jobs
956
957 stats = {}
958
959 rows = _db.execute("""
960 SELECT t.test, s.word, t.reason
961 FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
962 WHERE t.job_idx = j.job_idx
963 AND s.status_idx = t.status
964 AND j.afe_job_id = %s
Mike Truty6941dea2010-11-09 15:26:32 -0800965 ORDER BY t.reason
Eric Li6f27d4f2010-09-29 10:55:17 -0700966 """ % self.id)
967
Dale Curtis74a314b2011-06-23 14:55:46 -0700968 failed_rows = [r for r in rows if not r[1] == 'GOOD']
Eric Li6f27d4f2010-09-29 10:55:17 -0700969
970 n_test_jobs = _find_test_jobs(rows)
971 n_test_jobs_failed = _find_test_jobs(failed_rows)
972
973 total_executed = len(rows) - n_test_jobs
974 total_failed = len(failed_rows) - n_test_jobs_failed
975
976 if total_executed > 0:
977 success_rate = 100 - ((total_failed / float(total_executed)) * 100)
978 else:
979 success_rate = 0
980
981 stats['total_executed'] = total_executed
982 stats['total_failed'] = total_failed
983 stats['total_passed'] = total_executed - total_failed
984 stats['success_rate'] = success_rate
985
986 status_header = ("Test Name", "Status", "Reason")
987 if failed_rows:
988 stats['failed_rows'] = utils.matrix_to_string(failed_rows,
989 status_header)
990 else:
991 stats['failed_rows'] = ''
992
993 time_row = _db.execute("""
994 SELECT started_time, finished_time
995 FROM tko_jobs
996 WHERE afe_job_id = %s
997 """ % self.id)
998
999 if time_row:
1000 t_begin, t_end = time_row[0]
Mike Truty6941dea2010-11-09 15:26:32 -08001001 try:
1002 delta = t_end - t_begin
1003 minutes, seconds = divmod(delta.seconds, 60)
1004 hours, minutes = divmod(minutes, 60)
1005 stats['execution_time'] = ("%02d:%02d:%02d" %
1006 (hours, minutes, seconds))
1007 # One of t_end or t_begin are None
1008 except TypeError:
1009 stats['execution_time'] = '(could not determine)'
Eric Li6f27d4f2010-09-29 10:55:17 -07001010 else:
1011 stats['execution_time'] = '(none)'
1012
1013 return stats
1014
1015
Fang Deng1d6c2a02013-04-17 15:25:45 -07001016 @_timer.decorate
jamesrenb55378a2010-03-02 22:19:49 +00001017 def set_status(self, status, update_queues=False):
1018 self.update_field('status',status)
1019
1020 if update_queues:
1021 for queue_entry in self.get_host_queue_entries():
1022 queue_entry.set_status(status)
1023
1024
1025 def keyval_dict(self):
1026 return self.model().keyval_dict()
1027
1028
1029 def _atomic_and_has_started(self):
1030 """
1031 @returns True if any of the HostQueueEntries associated with this job
1032 have entered the Status.STARTING state or beyond.
1033 """
1034 atomic_entries = models.HostQueueEntry.objects.filter(
1035 job=self.id, atomic_group__isnull=False)
1036 if atomic_entries.count() <= 0:
1037 return False
1038
1039 # These states may *only* be reached if Job.run() has been called.
1040 started_statuses = (models.HostQueueEntry.Status.STARTING,
1041 models.HostQueueEntry.Status.RUNNING,
1042 models.HostQueueEntry.Status.COMPLETED)
1043
1044 started_entries = atomic_entries.filter(status__in=started_statuses)
1045 return started_entries.count() > 0
1046
1047
1048 def _hosts_assigned_count(self):
1049 """The number of HostQueueEntries assigned a Host for this job."""
1050 entries = models.HostQueueEntry.objects.filter(job=self.id,
1051 host__isnull=False)
1052 return entries.count()
1053
1054
1055 def _pending_count(self):
1056 """The number of HostQueueEntries for this job in the Pending state."""
1057 pending_entries = models.HostQueueEntry.objects.filter(
1058 job=self.id, status=models.HostQueueEntry.Status.PENDING)
1059 return pending_entries.count()
1060
1061
1062 def _max_hosts_needed_to_run(self, atomic_group):
1063 """
1064 @param atomic_group: The AtomicGroup associated with this job that we
1065 are using to set an upper bound on the threshold.
1066 @returns The maximum number of HostQueueEntries assigned a Host before
1067 this job can run.
1068 """
1069 return min(self._hosts_assigned_count(),
1070 atomic_group.max_number_of_machines)
1071
1072
1073 def _min_hosts_needed_to_run(self):
1074 """Return the minumum number of hsots needed to run this job."""
1075 return self.synch_count
1076
1077
1078 def is_ready(self):
1079 # NOTE: Atomic group jobs stop reporting ready after they have been
1080 # started to avoid launching multiple copies of one atomic job.
1081 # Only possible if synch_count is less than than half the number of
1082 # machines in the atomic group.
1083 pending_count = self._pending_count()
1084 atomic_and_has_started = self._atomic_and_has_started()
1085 ready = (pending_count >= self.synch_count
1086 and not atomic_and_has_started)
1087
1088 if not ready:
1089 logging.info(
1090 'Job %s not ready: %s pending, %s required '
1091 '(Atomic and started: %s)',
1092 self, pending_count, self.synch_count,
1093 atomic_and_has_started)
1094
1095 return ready
1096
1097
1098 def num_machines(self, clause = None):
1099 sql = "job_id=%s" % self.id
1100 if clause:
1101 sql += " AND (%s)" % clause
1102 return self.count(sql, table='afe_host_queue_entries')
1103
1104
1105 def num_queued(self):
1106 return self.num_machines('not complete')
1107
1108
1109 def num_active(self):
1110 return self.num_machines('active')
1111
1112
1113 def num_complete(self):
1114 return self.num_machines('complete')
1115
1116
1117 def is_finished(self):
1118 return self.num_complete() == self.num_machines()
1119
1120
1121 def _not_yet_run_entries(self, include_verifying=True):
1122 statuses = [models.HostQueueEntry.Status.QUEUED,
1123 models.HostQueueEntry.Status.PENDING]
1124 if include_verifying:
1125 statuses.append(models.HostQueueEntry.Status.VERIFYING)
1126 return models.HostQueueEntry.objects.filter(job=self.id,
1127 status__in=statuses)
1128
1129
1130 def _stop_all_entries(self):
1131 entries_to_stop = self._not_yet_run_entries(
1132 include_verifying=False)
1133 for child_entry in entries_to_stop:
1134 assert not child_entry.complete, (
1135 '%s status=%s, active=%s, complete=%s' %
1136 (child_entry.id, child_entry.status, child_entry.active,
1137 child_entry.complete))
1138 if child_entry.status == models.HostQueueEntry.Status.PENDING:
1139 child_entry.host.status = models.Host.Status.READY
1140 child_entry.host.save()
1141 child_entry.status = models.HostQueueEntry.Status.STOPPED
1142 child_entry.save()
1143
1144
1145 def stop_if_necessary(self):
1146 not_yet_run = self._not_yet_run_entries()
1147 if not_yet_run.count() < self.synch_count:
1148 self._stop_all_entries()
1149
1150
jamesrenb55378a2010-03-02 22:19:49 +00001151 def _next_group_name(self, group_name=''):
1152 """@returns a directory name to use for the next host group results."""
1153 if group_name:
1154 # Sanitize for use as a pathname.
1155 group_name = group_name.replace(os.path.sep, '_')
1156 if group_name.startswith('.'):
1157 group_name = '_' + group_name[1:]
1158 # Add a separator between the group name and 'group%d'.
1159 group_name += '.'
1160 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
1161 query = models.HostQueueEntry.objects.filter(
1162 job=self.id).values('execution_subdir').distinct()
1163 subdirs = (entry['execution_subdir'] for entry in query)
1164 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
1165 ids = [int(match.group(1)) for match in group_matches if match]
1166 if ids:
1167 next_id = max(ids) + 1
1168 else:
1169 next_id = 0
1170 return '%sgroup%d' % (group_name, next_id)
1171
1172
1173 def get_group_entries(self, queue_entry_from_group):
1174 """
1175 @param queue_entry_from_group: A HostQueueEntry instance to find other
1176 group entries on this job for.
1177
1178 @returns A list of HostQueueEntry objects all executing this job as
1179 part of the same group as the one supplied (having the same
1180 execution_subdir).
1181 """
1182 execution_subdir = queue_entry_from_group.execution_subdir
1183 return list(HostQueueEntry.fetch(
1184 where='job_id=%s AND execution_subdir=%s',
1185 params=(self.id, execution_subdir)))
1186
1187
1188 def _should_run_cleanup(self, queue_entry):
1189 if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
1190 return True
1191 elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
1192 return queue_entry.host.dirty
1193 return False
1194
1195
1196 def _should_run_verify(self, queue_entry):
1197 do_not_verify = (queue_entry.host.protection ==
1198 host_protections.Protection.DO_NOT_VERIFY)
1199 if do_not_verify:
1200 return False
Alex Miller6ee996f2013-02-28 13:53:52 -08001201 # If RebootBefore is set to NEVER, then we won't run reset because
1202 # we can't cleanup, so we need to weaken a Reset into a Verify.
1203 weaker_reset = (self.run_reset and
1204 self.reboot_before == model_attributes.RebootBefore.NEVER)
1205 return self.run_verify or weaker_reset
jamesrenb55378a2010-03-02 22:19:49 +00001206
1207
Dan Shi07e09af2013-04-12 09:31:29 -07001208 def _should_run_reset(self, queue_entry):
1209 can_verify = (queue_entry.host.protection !=
1210 host_protections.Protection.DO_NOT_VERIFY)
1211 can_reboot = self.reboot_before != model_attributes.RebootBefore.NEVER
1212 return (can_reboot and can_verify and (self.run_reset or
1213 (self._should_run_cleanup(queue_entry) and
1214 self._should_run_verify(queue_entry))))
1215
1216
Alex Millerdfff2fd2013-05-28 13:05:06 -07001217 def _should_run_provision(self, queue_entry):
1218 """
1219 Determine if the queue_entry needs to have a provision task run before
1220 it to provision queue_entry.host.
1221
1222 @param queue_entry: The host queue entry in question.
1223 @returns: True if we should schedule a provision task, False otherwise.
1224
1225 """
1226 # If we get to this point, it means that the scheduler has already
1227 # vetted that all the unprovisionable labels match, so we can just
1228 # find all labels on the job that aren't on the host to get the list
1229 # of what we need to provision. (See the scheduling logic in
1230 # host_scheduler.py:is_host_eligable_for_job() where we discard all
1231 # provisionable labels when assigning jobs to hosts.)
1232 job_labels = {x.name for x in queue_entry.get_labels()}
1233 _, host_labels = queue_entry.host.platform_and_labels()
1234 # If there are any labels on the job that are not on the host, then
1235 # that means there is provisioning work to do. If there's no
1236 # provisioning work to do, then obviously we have no reason to schedule
1237 # a provision task!
1238 if job_labels - set(host_labels):
1239 return True
1240 return False
1241
1242
Alex Miller42437f92013-05-28 12:58:54 -07001243 def _queue_special_task(self, queue_entry, task):
jamesrenb55378a2010-03-02 22:19:49 +00001244 """
Alex Miller42437f92013-05-28 12:58:54 -07001245 Create a special task and associate it with a host queue entry.
jamesrenb55378a2010-03-02 22:19:49 +00001246
Alex Miller42437f92013-05-28 12:58:54 -07001247 @param queue_entry: The queue entry this special task should be
1248 associated with.
1249 @param task: One of the members of the enum models.SpecialTask.Task.
1250 @returns: None
1251
jamesrenb55378a2010-03-02 22:19:49 +00001252 """
jamesrenb55378a2010-03-02 22:19:49 +00001253 models.SpecialTask.objects.create(
1254 host=models.Host.objects.get(id=queue_entry.host_id),
1255 queue_entry=queue_entry, task=task)
1256
1257
Alex Miller42437f92013-05-28 12:58:54 -07001258 def schedule_pre_job_tasks(self, queue_entry):
1259 """
1260 Queue all of the special tasks that need to be run before a host
1261 queue entry may run.
1262
1263 If no special taskes need to be scheduled, then |on_pending| will be
1264 called directly.
1265
1266 @returns None
1267
1268 """
1269 task_queued = False
1270 hqe_model = models.HostQueueEntry.objects.get(id=queue_entry.id)
1271
Dan Shi07e09af2013-04-12 09:31:29 -07001272 if self._should_run_reset(queue_entry):
1273 self._queue_special_task(hqe_model, models.SpecialTask.Task.RESET)
Alex Miller42437f92013-05-28 12:58:54 -07001274 task_queued = True
Dan Shi07e09af2013-04-12 09:31:29 -07001275 else:
1276 if self._should_run_cleanup(queue_entry):
1277 self._queue_special_task(hqe_model,
1278 models.SpecialTask.Task.CLEANUP)
1279 task_queued = True
1280 if self._should_run_verify(queue_entry):
1281 self._queue_special_task(hqe_model,
1282 models.SpecialTask.Task.VERIFY)
1283 task_queued = True
Alex Miller42437f92013-05-28 12:58:54 -07001284
Alex Millerdfff2fd2013-05-28 13:05:06 -07001285 if self._should_run_provision(queue_entry):
1286 self._queue_special_task(hqe_model,
1287 models.SpecialTask.Task.PROVISION)
1288 task_queued = True
1289
Alex Miller42437f92013-05-28 12:58:54 -07001290 if not task_queued:
1291 queue_entry.on_pending()
1292
1293
jamesrenb55378a2010-03-02 22:19:49 +00001294 def _assign_new_group(self, queue_entries, group_name=''):
1295 if len(queue_entries) == 1:
1296 group_subdir_name = queue_entries[0].host.hostname
1297 else:
1298 group_subdir_name = self._next_group_name(group_name)
1299 logging.info('Running synchronous job %d hosts %s as %s',
1300 self.id, [entry.host.hostname for entry in queue_entries],
1301 group_subdir_name)
1302
1303 for queue_entry in queue_entries:
1304 queue_entry.set_execution_subdir(group_subdir_name)
1305
1306
    def _choose_group_to_run(self, include_queue_entry):
        """
        Choose the set of queue entries to launch together for this job.

        @param include_queue_entry: A HostQueueEntry that must be part of the
                chosen group (the entry that triggered this run attempt).

        @returns A list of HostQueueEntry instances to be used to run this
                Job (always containing include_queue_entry), or [] if fewer
                than synch_count entries could be gathered.  As a side
                effect, the chosen entries are assigned a shared
                execution_subdir via _assign_new_group().
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        # Atomic groups grab as many machines as allowed; normal jobs need
        # exactly synch_count machines.
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check. We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            # Notify the job's email_list so the shortfall is not silent.
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries
1346
1347
1348 def run_if_ready(self, queue_entry):
1349 """
1350 Run this job by kicking its HQEs into status='Starting' if enough
1351 hosts are ready for it to run.
1352
1353 Cleans up by kicking HQEs into status='Stopped' if this Job is not
1354 ready to run.
1355 """
1356 if not self.is_ready():
1357 self.stop_if_necessary()
1358 elif queue_entry.atomic_group:
1359 self.run_with_ready_delay(queue_entry)
1360 else:
1361 self.run(queue_entry)
1362
1363
1364 def run_with_ready_delay(self, queue_entry):
1365 """
1366 Start a delay to wait for more hosts to enter Pending state before
1367 launching an atomic group job. Once set, the a delay cannot be reset.
1368
1369 @param queue_entry: The HostQueueEntry object to get atomic group
1370 info from and pass to run_if_ready when the delay is up.
1371
1372 @returns An Agent to run the job as appropriate or None if a delay
1373 has already been set.
1374 """
1375 assert queue_entry.job_id == self.id
1376 assert queue_entry.atomic_group
1377 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
1378 over_max_threshold = (self._pending_count() >=
1379 self._max_hosts_needed_to_run(queue_entry.atomic_group))
1380 delay_expired = (self._delay_ready_task and
1381 time.time() >= self._delay_ready_task.end_time)
1382
1383 # Delay is disabled or we already have enough? Do not wait to run.
1384 if not delay or over_max_threshold or delay_expired:
1385 self.run(queue_entry)
1386 else:
1387 queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
1388
1389
1390 def request_abort(self):
1391 """Request that this Job be aborted on the next scheduler cycle."""
1392 self.model().abort()
1393
1394
    def schedule_delayed_callback_task(self, queue_entry):
        """Mark queue_entry Pending and arm a delayed 'run the job' callback.

        @param queue_entry: HostQueueEntry to pass to run() once the delay
                expires.
        @returns The newly created DelayedCallTask, or None if a delay task
                is already pending for this job.
        """
        queue_entry.set_status(models.HostQueueEntry.Status.PENDING)

        # Only one delay may ever be armed per job.
        if self._delay_ready_task:
            return None

        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self)
            # Check to see if the job is still relevant. It could have aborted
            # while we were waiting or hosts could have disappeared, etc.
            if self._pending_count() < self._min_hosts_needed_to_run():
                logging.info('Job %s had too few Pending hosts after waiting '
                             'for extras. Not running.', self)
                self.request_abort()
                return
            return self.run(queue_entry)

        logging.info('Job %s waiting up to %s seconds for more hosts.',
                     self.id, delay)
        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)
        return self._delay_ready_task
1419
1420
1421 def run(self, queue_entry):
1422 """
1423 @param queue_entry: The HostQueueEntry instance calling this method.
1424 """
1425 if queue_entry.atomic_group and self._atomic_and_has_started():
1426 logging.error('Job.run() called on running atomic Job %d '
1427 'with HQE %s.', self.id, queue_entry)
1428 return
1429 queue_entries = self._choose_group_to_run(queue_entry)
1430 if queue_entries:
1431 self._finish_run(queue_entries)
1432
1433
1434 def _finish_run(self, queue_entries):
1435 for queue_entry in queue_entries:
1436 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
1437 self.abort_delay_ready_task()
1438
1439
1440 def abort_delay_ready_task(self):
1441 """Abort the delayed task associated with this job, if any."""
1442 if self._delay_ready_task:
1443 # Cancel any pending callback that would try to run again
1444 # as we are already running.
1445 self._delay_ready_task.abort()
1446
1447
1448 def __str__(self):
1449 return '%s-%s' % (self.id, self.owner)