blob: 25210a400bfc3e6d43000703622ce7b32cb4aa82 [file] [log] [blame]
jamesrenb55378a2010-03-02 22:19:49 +00001"""Database model classes for the scheduler.
2
3Contains model classes abstracting the various DB tables used by the scheduler.
4These overlap the Django models in basic functionality, but were written before
5the Django models existed and have not yet been phased out. Some of them
6(particularly HostQueueEntry and Job) have considerable scheduler-specific logic
7which would probably be ill-suited for inclusion in the general Django model
8classes.
9
10Globals:
11_notify_email_statuses: list of HQE statuses. each time a single HQE reaches
12 one of these statuses, an email will be sent to the job's email_list.
13 comes from global_config.
14_base_url: URL to the local AFE server, used to construct URLs for emails.
15_db: DatabaseConnection for this module.
16_drone_manager: reference to global DroneManager instance.
17"""
18
19import datetime, itertools, logging, os, re, sys, time, weakref
20from django.db import connection
21from autotest_lib.client.common_lib import global_config, host_protections
Eric Li6f27d4f2010-09-29 10:55:17 -070022from autotest_lib.client.common_lib import global_config, utils
jamesrenb55378a2010-03-02 22:19:49 +000023from autotest_lib.frontend.afe import models, model_attributes
24from autotest_lib.database import database_connection
25from autotest_lib.scheduler import drone_manager, email_manager
26from autotest_lib.scheduler import scheduler_config
27
# Module-level state, populated by initialize()/initialize_globals() below.
_notify_email_statuses = []  # HQE statuses that trigger a notification email.
_base_url = None  # URL of the local AFE server, used to build email links.

_db = None  # DatabaseConnection shared by all DBObject queries.
_drone_manager = None  # Global DroneManager instance (see initialize_globals).
33
def initialize():
    """Connect to the AFE database and populate this module's global state.

    Sets _db, _notify_email_statuses and _base_url, then binds the drone
    manager via initialize_globals().  Exits the process if no server
    hostname is configured and no explicit base_url override is given.
    """
    global _db, _notify_email_statuses, _base_url

    _db = database_connection.DatabaseConnection('AUTOTEST_WEB')
    _db.connect(db_type='django')

    raw_statuses = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, "notify_email_statuses",
            default='')
    _notify_email_statuses = [
            status for status in re.split(r'[\s,;:]', raw_statuses.lower())
            if status]

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    config_base_url = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'base_url', default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # Common case: everything runs on a single server, so derive the URL
        # from the one configured hostname.
        server_name = global_config.global_config.get_config_value(
                'SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    initialize_globals()
65
66
def initialize_globals():
    """Bind the module-level reference to the global DroneManager."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
70
71
class DelayedCallTask(object):
    """
    An Agent-compatible task that simply waits.  Once at least the requested
    delay has elapsed it invokes the supplied callback exactly once and then
    considers itself finished.  If the callback returns anything, it is
    assumed to be a new Agent instance and will be added to the dispatcher.

    @attribute end_time: The absolute posix time after which this task will
            call its callback when it is polled and be finished.

    Also has all attributes required by the Agent class.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: The delay in seconds from now that this task
                will call the supplied callback and be done.
        @param callback: A callable to be called by this task once after at
                least delay_seconds time has elapsed.  It must return None
                or a new Agent instance.
        @param now_func: A time.time like function.  Default: time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # Attributes the Agent class expects every task to expose.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        """Fire the callback once the deadline has passed (at most once)."""
        if self.is_done() or self._now_func() < self.end_time:
            return
        self._callback()
        self.success = True


    def is_done(self):
        """The task is finished once it has fired or been aborted."""
        return self.success or self.aborted


    def abort(self):
        """Cancel the pending callback; the task becomes done immediately."""
        self.aborted = True
123
124
class DBError(Exception):
    """Raised by the DBObject constructor when its select fails (i.e. the
    requested row id does not exist in the table)."""
127
128
class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()  # Column names in table order; _fields[0] must be 'id'.

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id: Primary key of the row to load.  At least one of id/row
                must be given; when both are given they must agree.
        @param row: A full result row (sequence matching _fields) to load
                from, avoiding a database query.
        @param new_record: True if this object does not yet exist in the
                database; save() will INSERT it.
        @param always_query: If False and this (cached) instance was already
                initialized, skip re-querying the database.
        """
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """SELECT this object's row; raises DBError if no row exists."""
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        # A row/field length mismatch means _fields is out of sync with the
        # table schema; fail loudly with full context.
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing in
        memory fields.  Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                the class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S'  # Leave off the microseconds.
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                    and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(datetime_cmp_fmt)
                row_value = row_value.strftime(datetime_cmp_fmt)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # 'id' is never updated through update_field().
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read this object's row and refresh all field attributes."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        """Return the number of rows in `table` matching the WHERE clause."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Set one column both in the database and on this instance."""
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return  # No-op; skip the UPDATE.

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object as a new row (only when new_record was set)."""
        if self.__new_record:
            keys = self._fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    # NOTE(review): naive quoting -- values containing a
                    # double quote will break this statement; a
                    # parameterized INSERT would be safer.
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete this row from the database and drop it from the cache."""
        # Bug fix: this previously popped (type(self), id) using the
        # *builtin* id function as the key, so the cache entry was never
        # actually removed.  Use the instance's own id.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        # Prepend prefix only when string is non-empty (used to build
        # optional SQL clauses such as 'WHERE ...' / 'ORDER BY ...').
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return [cls(id=row[0], row=row) for row in rows]
334
335
class IneligibleHostQueue(DBObject):
    # A (job, host) pair blocked from being scheduled together; rows are
    # created by HostQueueEntry.block_host and removed by unblock_host.
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
339
340
class AtomicGroup(DBObject):
    # Scheduler-side view of afe_atomic_groups; _fields must match the
    # table's column order since rows are mapped to attributes positionally.
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
345
346
class Label(DBObject):
    """Scheduler-side view of a row in the afe_labels table."""
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        # Include the atomic group id since it drives scheduling decisions.
        template = 'Label(name=%r, id=%d, atomic_group_id=%r)'
        return template % (self.name, self.id, self.atomic_group_id)
356
357
class Host(DBObject):
    """Scheduler-side view of a row in the afe_hosts table."""
    _table_name = 'afe_hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def set_status(self, status):
        """Update the host's status column, logging the transition."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT afe_labels.name, afe_labels.platform
                FROM afe_labels
                INNER JOIN afe_hosts_labels ON
                        afe_labels.id = afe_hosts_labels.label_id
                WHERE afe_hosts_labels.host_id = %s
                ORDER BY afe_labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfy most people's hostname sorting needs regardless
        of their exact naming schemes.  Nobody sane should have both a host10
        and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # Bug fix: int() already ignores leading zeros, and the previous
            # lstrip('0') version raised ValueError (int('')) for an
            # all-zero suffix such as "host0" or "host000".
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
428
429
class HostQueueEntry(DBObject):
    """Scheduler-side view of a row in afe_host_queue_entries.

    An entry pairs a Job with a Host (or a meta-host label / atomic group)
    and tracks the scheduling status of that pairing.
    """
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')


    def __init__(self, id=None, row=None, **kwargs):
        """Load this entry plus its related Job/Host/AtomicGroup objects."""
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None

        # Per-entry log file path, relative to the results repository.
        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        # Deep link to this entry's job in the AFE web interface.
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        """Assign a host to this entry, or release it when host is None.

        Assigning also blocks the (job, host) pair via IneligibleHostQueue;
        releasing unblocks the previously assigned host.
        """
        if host:
            logging.info('Assigning host %s to entry %s', host.hostname, self)
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.block_host(host.id)
        else:
            logging.info('Releasing host from %s', self)
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def queue_log_record(self, log_line):
        """Append a timestamped line to this entry's queue log file."""
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        """Create an IneligibleHostQueue row for (this job, host_id)."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Remove any IneligibleHostQueue rows for (this job, host_id)."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Set execution_subdir; defaults to the assigned host's hostname."""
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        # Safe accessor for logging/emails when no host is assigned yet.
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        """Human-readable summary: hostname/job id (entry id) status [flags]."""
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
                                    self.status, flags_str)


    def set_status(self, status):
        """Set this entry's status and keep active/complete flags in sync.

        On completion this also runs _on_complete() and may send
        notification email per _notify_email_statuses.
        """
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)

        active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
        complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
        assert not (active and complete)

        self.update_field('active', active)
        self.update_field('complete', complete)

        if complete:
            self._on_complete(status)
            self._email_on_job_complete()

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)


    def _on_complete(self, status):
        """Completion hook: maybe stop the job; unregister our pidfiles."""
        # NOTE(review): 'is not' compares identity, not equality; this only
        # works if status is always the interned module-level constant --
        # an != comparison would be safer.  Confirm before changing.
        if status is not models.HostQueueEntry.Status.ABORTED:
            self.job.stop_if_necessary()

        if not self.execution_subdir:
            return
        # unregister any possible pidfiles associated with this queue entry
        for pidfile_name in drone_manager.ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)


    def _get_status_email_contents(self, status, summary=None, hostname=None):
        """
        Gather info for the status notification e-mails.

        If needed, we could start using the Django templating engine to create
        the subject and the e-mail body, but that doesn't seem necessary right
        now.

        @param status: Job status text. Mandatory.
        @param summary: Job summary text. Optional.
        @param hostname: A hostname for the job. Optional.

        @return: Tuple (subject, body) for the notification e-mail.
        """
        job_stats = Job(id=self.job.id).get_execution_details()

        subject = ('Autotest | Job ID: %s "%s" | Status: %s ' %
                   (self.job.id, self.job.name, status))

        if hostname is not None:
            subject += '| Hostname: %s ' % hostname

        if status not in ["1 Failed", "Failed"]:
            subject += '| Success Rate: %.2f %%' % job_stats['success_rate']

        body = "Job ID: %s\n" % self.job.id
        body += "Job name: %s\n" % self.job.name
        if hostname is not None:
            body += "Host: %s\n" % hostname
        if summary is not None:
            body += "Summary: %s\n" % summary
        body += "Status: %s\n" % status
        body += "Results interface URL: %s\n" % self._view_job_url()
        body += "Execution time (HH:MM:SS): %s\n" % job_stats['execution_time']
        if int(job_stats['total_executed']) > 0:
            body += "User tests executed: %s\n" % job_stats['total_executed']
            body += "User tests passed: %s\n" % job_stats['total_passed']
            body += "User tests failed: %s\n" % job_stats['total_failed']
            body += ("User tests success rate: %.2f %%\n" %
                     job_stats['success_rate'])

        if job_stats['failed_rows']:
            body += "Failures:\n"
            body += job_stats['failed_rows']

        return subject, body


    def _email_on_status(self, status):
        """Send a single-entry status notification to the job's email list."""
        hostname = self._get_hostname()
        subject, body = self._get_status_email_contents(status, None, hostname)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """Send the whole-job summary email once every entry has finished."""
        if not self.job.is_finished():
            return

        summary = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary.append("Host: %s Status: %s" %
                           (queue_entry._get_hostname(),
                            queue_entry.status))

        summary = "\n".join(summary)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject, body = self._get_status_email_contents(status, summary, None)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self):
        """Log the scheduling decision, then start pre-job processing."""
        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        # Every host goes thru the Verifying stage (which may or may not
        # actually do anything as determined by get_pre_job_tasks).
        self.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        """Return this entry to Queued so it can be scheduled again."""
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    @property
    def aborted_by(self):
        # Login of the user who aborted this entry, or None.
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        # Timestamp of the abort, or None.
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Cached after the first call; the sentinel is the attribute itself.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are getting
        # stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        """Handle an abort request according to this entry's current status."""
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING,
                           Status.WAITING):
            assert not dispatcher.get_agents_for_entry(self)
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            # Verify was interrupted; schedule a cleanup before reuse.
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)
        self.job.abort_delay_ready_task()


    def get_group_name(self):
        """Return the display group name for this entry's atomic group."""
        atomic_group = self.atomic_group
        if not atomic_group:
            return ''

        # Look at any meta_host and dependency labels and pick the first
        # one that also specifies this atomic group.  Use that label name
        # as the group name if possible (it is more specific).
        for label in self.get_labels():
            if label.atomic_group_id:
                assert label.atomic_group_id == atomic_group.id
                return label.name
        return atomic_group.name


    def execution_tag(self):
        # '<job tag>/<execution subdir>'; requires execution_subdir be set.
        assert self.execution_subdir
        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        # Results path equals the execution tag.
        return self.execution_tag()


    def set_started_on_now(self):
        """Record the current time as this entry's start time."""
        self.update_field('started_on', datetime.datetime.now())


    def is_hostless(self):
        # True for entries with no host, meta-host, or atomic group.
        return (self.host_id is None
                and self.meta_host is None
                and self.atomic_group_id is None)
801
802
803class Job(DBObject):
804 _table_name = 'afe_jobs'
805 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
806 'control_type', 'created_on', 'synch_count', 'timeout',
807 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
jamesren4a41e012010-07-16 22:33:48 +0000808 'parse_failed_repair', 'max_runtime_hrs', 'drone_set_id',
809 'parameterized_job_id')
jamesrenb55378a2010-03-02 22:19:49 +0000810
811 # This does not need to be a column in the DB. The delays are likely to
812 # be configured short. If the scheduler is stopped and restarted in
813 # the middle of a job's delay cycle, the delay cycle will either be
814 # repeated or skipped depending on the number of Pending machines found
815 # when the restarted scheduler recovers to track it. Not a problem.
816 #
817 # A reference to the DelayedCallTask that will wake up the job should
818 # no other HQEs change state in time. Its end_time attribute is used
819 # by our run_with_ready_delay() method to determine if the wait is over.
820 _delay_ready_task = None
821
822 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
823 # all status='Pending' atomic group HQEs incase a delay was running when the
824 # scheduler was restarted and no more hosts ever successfully exit Verify.
825
    def __init__(self, id=None, row=None, **kwargs):
        """Load a Job row; see DBObject.__init__ for id/row semantics."""
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
        self._owner_model = None # caches model instance of owner
        self.update_image_path = None # path of OS image to install
jamesrenb55378a2010-03-02 22:19:49 +0000831
832
    def model(self):
        """Return the Django models.Job instance for this row."""
        return models.Job.objects.get(id=self.id)
835
836
    def owner_model(self):
        """Return (and cache) the Django User whose login matches self.owner."""
        # work around the fact that the Job owner field is a string, not a
        # foreign key
        if not self._owner_model:
            self._owner_model = models.User.objects.get(login=self.owner)
        return self._owner_model
843
844
    def is_server_job(self):
        # NOTE(review): magic number -- 2 presumably corresponds to the
        # client control type in the frontend's control-type enumeration;
        # confirm and replace with the named constant.
        return self.control_type != 2
847
848
    def tag(self):
        # Results-directory tag for this job: '<job id>-<owner>'.
        return "%s-%s" % (self.id, self.owner)
851
852
853 def get_host_queue_entries(self):
854 rows = _db.execute("""
855 SELECT * FROM afe_host_queue_entries
856 WHERE job_id= %s
857 """, (self.id,))
858 entries = [HostQueueEntry(row=i) for i in rows]
859
860 assert len(entries)>0
861
862 return entries
863
864
    def is_image_update_job(self):
        """
        Discover if the current job requires an OS update.

        Side effect: when an image update job is detected, the image path is
        cached in self.update_image_path (later passed to autoserv).

        @return: True/False if OS should be updated before job is run.
        """
        # All image update jobs have the parameterized_job_id set.
        if not self.parameterized_job_id:
            return False

        # Retrieve the ID of the ParameterizedJob this job is an instance of.
        rows = _db.execute("""
                SELECT test_id
                FROM afe_parameterized_jobs
                WHERE id = %s
                """, (self.parameterized_job_id,))
        if not rows:
            return False
        test_id = rows[0][0]

        # Retrieve the ID of the known autoupdate_ParameterizedJob.
        rows = _db.execute("""
                SELECT id
                FROM afe_autotests
                WHERE name = 'autoupdate_ParameterizedJob'
                """)
        if not rows:
            return False
        update_id = rows[0][0]

        # If the IDs are the same we've found an image update job.
        if test_id == update_id:
            # Finally, get the path to the OS image to install.
            rows = _db.execute("""
                    SELECT parameter_value
                    FROM afe_parameterized_job_parameters
                    WHERE parameterized_job_id = %s
                    """, (self.parameterized_job_id,))
            if rows:
                # Save the path in update_image_path to use later as a command
                # line parameter to autoserv.
                self.update_image_path = rows[0][0]
            return True

        return False
910
911
Eric Li6f27d4f2010-09-29 10:55:17 -0700912 def get_execution_details(self):
913 """
914 Get test execution details for this job.
915
916 @return: Dictionary with test execution details
917 """
918 def _find_test_jobs(rows):
919 """
920 Here we are looking for tests such as SERVER_JOB and CLIENT_JOB.*
921 Those are autotest 'internal job' tests, so they should not be
922 counted when evaluating the test stats.
923
924 @param rows: List of rows (matrix) with database results.
925 """
926 job_test_pattern = re.compile('SERVER|CLIENT\\_JOB\.[\d]')
927 n_test_jobs = 0
928 for r in rows:
929 test_name = r[0]
930 if job_test_pattern.match(test_name):
931 n_test_jobs += 1
932
933 return n_test_jobs
934
935 stats = {}
936
937 rows = _db.execute("""
938 SELECT t.test, s.word, t.reason
939 FROM tko_tests AS t, tko_jobs AS j, tko_status AS s
940 WHERE t.job_idx = j.job_idx
941 AND s.status_idx = t.status
942 AND j.afe_job_id = %s
Mike Truty6941dea2010-11-09 15:26:32 -0800943 ORDER BY t.reason
Eric Li6f27d4f2010-09-29 10:55:17 -0700944 """ % self.id)
945
Dale Curtis74a314b2011-06-23 14:55:46 -0700946 failed_rows = [r for r in rows if not r[1] == 'GOOD']
Eric Li6f27d4f2010-09-29 10:55:17 -0700947
948 n_test_jobs = _find_test_jobs(rows)
949 n_test_jobs_failed = _find_test_jobs(failed_rows)
950
951 total_executed = len(rows) - n_test_jobs
952 total_failed = len(failed_rows) - n_test_jobs_failed
953
954 if total_executed > 0:
955 success_rate = 100 - ((total_failed / float(total_executed)) * 100)
956 else:
957 success_rate = 0
958
959 stats['total_executed'] = total_executed
960 stats['total_failed'] = total_failed
961 stats['total_passed'] = total_executed - total_failed
962 stats['success_rate'] = success_rate
963
964 status_header = ("Test Name", "Status", "Reason")
965 if failed_rows:
966 stats['failed_rows'] = utils.matrix_to_string(failed_rows,
967 status_header)
968 else:
969 stats['failed_rows'] = ''
970
971 time_row = _db.execute("""
972 SELECT started_time, finished_time
973 FROM tko_jobs
974 WHERE afe_job_id = %s
975 """ % self.id)
976
977 if time_row:
978 t_begin, t_end = time_row[0]
Mike Truty6941dea2010-11-09 15:26:32 -0800979 try:
980 delta = t_end - t_begin
981 minutes, seconds = divmod(delta.seconds, 60)
982 hours, minutes = divmod(minutes, 60)
983 stats['execution_time'] = ("%02d:%02d:%02d" %
984 (hours, minutes, seconds))
985 # One of t_end or t_begin are None
986 except TypeError:
987 stats['execution_time'] = '(could not determine)'
Eric Li6f27d4f2010-09-29 10:55:17 -0700988 else:
989 stats['execution_time'] = '(none)'
990
991 return stats
992
993
jamesrenb55378a2010-03-02 22:19:49 +0000994 def set_status(self, status, update_queues=False):
995 self.update_field('status',status)
996
997 if update_queues:
998 for queue_entry in self.get_host_queue_entries():
999 queue_entry.set_status(status)
1000
1001
1002 def keyval_dict(self):
1003 return self.model().keyval_dict()
1004
1005
1006 def _atomic_and_has_started(self):
1007 """
1008 @returns True if any of the HostQueueEntries associated with this job
1009 have entered the Status.STARTING state or beyond.
1010 """
1011 atomic_entries = models.HostQueueEntry.objects.filter(
1012 job=self.id, atomic_group__isnull=False)
1013 if atomic_entries.count() <= 0:
1014 return False
1015
1016 # These states may *only* be reached if Job.run() has been called.
1017 started_statuses = (models.HostQueueEntry.Status.STARTING,
1018 models.HostQueueEntry.Status.RUNNING,
1019 models.HostQueueEntry.Status.COMPLETED)
1020
1021 started_entries = atomic_entries.filter(status__in=started_statuses)
1022 return started_entries.count() > 0
1023
1024
1025 def _hosts_assigned_count(self):
1026 """The number of HostQueueEntries assigned a Host for this job."""
1027 entries = models.HostQueueEntry.objects.filter(job=self.id,
1028 host__isnull=False)
1029 return entries.count()
1030
1031
1032 def _pending_count(self):
1033 """The number of HostQueueEntries for this job in the Pending state."""
1034 pending_entries = models.HostQueueEntry.objects.filter(
1035 job=self.id, status=models.HostQueueEntry.Status.PENDING)
1036 return pending_entries.count()
1037
1038
1039 def _max_hosts_needed_to_run(self, atomic_group):
1040 """
1041 @param atomic_group: The AtomicGroup associated with this job that we
1042 are using to set an upper bound on the threshold.
1043 @returns The maximum number of HostQueueEntries assigned a Host before
1044 this job can run.
1045 """
1046 return min(self._hosts_assigned_count(),
1047 atomic_group.max_number_of_machines)
1048
1049
1050 def _min_hosts_needed_to_run(self):
1051 """Return the minumum number of hsots needed to run this job."""
1052 return self.synch_count
1053
1054
1055 def is_ready(self):
1056 # NOTE: Atomic group jobs stop reporting ready after they have been
1057 # started to avoid launching multiple copies of one atomic job.
1058 # Only possible if synch_count is less than than half the number of
1059 # machines in the atomic group.
1060 pending_count = self._pending_count()
1061 atomic_and_has_started = self._atomic_and_has_started()
1062 ready = (pending_count >= self.synch_count
1063 and not atomic_and_has_started)
1064
1065 if not ready:
1066 logging.info(
1067 'Job %s not ready: %s pending, %s required '
1068 '(Atomic and started: %s)',
1069 self, pending_count, self.synch_count,
1070 atomic_and_has_started)
1071
1072 return ready
1073
1074
1075 def num_machines(self, clause = None):
1076 sql = "job_id=%s" % self.id
1077 if clause:
1078 sql += " AND (%s)" % clause
1079 return self.count(sql, table='afe_host_queue_entries')
1080
1081
1082 def num_queued(self):
1083 return self.num_machines('not complete')
1084
1085
1086 def num_active(self):
1087 return self.num_machines('active')
1088
1089
1090 def num_complete(self):
1091 return self.num_machines('complete')
1092
1093
1094 def is_finished(self):
1095 return self.num_complete() == self.num_machines()
1096
1097
1098 def _not_yet_run_entries(self, include_verifying=True):
1099 statuses = [models.HostQueueEntry.Status.QUEUED,
1100 models.HostQueueEntry.Status.PENDING]
1101 if include_verifying:
1102 statuses.append(models.HostQueueEntry.Status.VERIFYING)
1103 return models.HostQueueEntry.objects.filter(job=self.id,
1104 status__in=statuses)
1105
1106
1107 def _stop_all_entries(self):
1108 entries_to_stop = self._not_yet_run_entries(
1109 include_verifying=False)
1110 for child_entry in entries_to_stop:
1111 assert not child_entry.complete, (
1112 '%s status=%s, active=%s, complete=%s' %
1113 (child_entry.id, child_entry.status, child_entry.active,
1114 child_entry.complete))
1115 if child_entry.status == models.HostQueueEntry.Status.PENDING:
1116 child_entry.host.status = models.Host.Status.READY
1117 child_entry.host.save()
1118 child_entry.status = models.HostQueueEntry.Status.STOPPED
1119 child_entry.save()
1120
1121
1122 def stop_if_necessary(self):
1123 not_yet_run = self._not_yet_run_entries()
1124 if not_yet_run.count() < self.synch_count:
1125 self._stop_all_entries()
1126
1127
1128 def write_to_machines_file(self, queue_entry):
1129 hostname = queue_entry.host.hostname
1130 file_path = os.path.join(self.tag(), '.machines')
1131 _drone_manager.write_lines_to_file(file_path, [hostname])
1132
1133
1134 def _next_group_name(self, group_name=''):
1135 """@returns a directory name to use for the next host group results."""
1136 if group_name:
1137 # Sanitize for use as a pathname.
1138 group_name = group_name.replace(os.path.sep, '_')
1139 if group_name.startswith('.'):
1140 group_name = '_' + group_name[1:]
1141 # Add a separator between the group name and 'group%d'.
1142 group_name += '.'
1143 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
1144 query = models.HostQueueEntry.objects.filter(
1145 job=self.id).values('execution_subdir').distinct()
1146 subdirs = (entry['execution_subdir'] for entry in query)
1147 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
1148 ids = [int(match.group(1)) for match in group_matches if match]
1149 if ids:
1150 next_id = max(ids) + 1
1151 else:
1152 next_id = 0
1153 return '%sgroup%d' % (group_name, next_id)
1154
1155
1156 def get_group_entries(self, queue_entry_from_group):
1157 """
1158 @param queue_entry_from_group: A HostQueueEntry instance to find other
1159 group entries on this job for.
1160
1161 @returns A list of HostQueueEntry objects all executing this job as
1162 part of the same group as the one supplied (having the same
1163 execution_subdir).
1164 """
1165 execution_subdir = queue_entry_from_group.execution_subdir
1166 return list(HostQueueEntry.fetch(
1167 where='job_id=%s AND execution_subdir=%s',
1168 params=(self.id, execution_subdir)))
1169
1170
1171 def _should_run_cleanup(self, queue_entry):
1172 if self.reboot_before == model_attributes.RebootBefore.ALWAYS:
1173 return True
1174 elif self.reboot_before == model_attributes.RebootBefore.IF_DIRTY:
1175 return queue_entry.host.dirty
1176 return False
1177
1178
1179 def _should_run_verify(self, queue_entry):
1180 do_not_verify = (queue_entry.host.protection ==
1181 host_protections.Protection.DO_NOT_VERIFY)
1182 if do_not_verify:
1183 return False
1184 return self.run_verify
1185
1186
1187 def schedule_pre_job_tasks(self, queue_entry):
1188 """
1189 Get a list of tasks to perform before the host_queue_entry
1190 may be used to run this Job (such as Cleanup & Verify).
1191
1192 @returns A list of tasks to be done to the given queue_entry before
1193 it should be considered be ready to run this job. The last
1194 task in the list calls HostQueueEntry.on_pending(), which
1195 continues the flow of the job.
1196 """
1197 if self._should_run_cleanup(queue_entry):
1198 task = models.SpecialTask.Task.CLEANUP
1199 elif self._should_run_verify(queue_entry):
1200 task = models.SpecialTask.Task.VERIFY
1201 else:
1202 queue_entry.on_pending()
1203 return
1204
1205 queue_entry = models.HostQueueEntry.objects.get(id=queue_entry.id)
1206 models.SpecialTask.objects.create(
1207 host=models.Host.objects.get(id=queue_entry.host_id),
1208 queue_entry=queue_entry, task=task)
1209
1210
1211 def _assign_new_group(self, queue_entries, group_name=''):
1212 if len(queue_entries) == 1:
1213 group_subdir_name = queue_entries[0].host.hostname
1214 else:
1215 group_subdir_name = self._next_group_name(group_name)
1216 logging.info('Running synchronous job %d hosts %s as %s',
1217 self.id, [entry.host.hostname for entry in queue_entries],
1218 group_subdir_name)
1219
1220 for queue_entry in queue_entries:
1221 queue_entry.set_execution_subdir(group_subdir_name)
1222
1223
1224 def _choose_group_to_run(self, include_queue_entry):
1225 """
1226 @returns A tuple containing a list of HostQueueEntry instances to be
1227 used to run this Job, a string group name to suggest giving
1228 to this job in the results database.
1229 """
1230 atomic_group = include_queue_entry.atomic_group
1231 chosen_entries = [include_queue_entry]
1232 if atomic_group:
1233 num_entries_wanted = atomic_group.max_number_of_machines
1234 else:
1235 num_entries_wanted = self.synch_count
1236 num_entries_wanted -= len(chosen_entries)
1237
1238 if num_entries_wanted > 0:
1239 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
1240 pending_entries = list(HostQueueEntry.fetch(
1241 where=where_clause,
1242 params=(self.id, include_queue_entry.id)))
1243
1244 # Sort the chosen hosts by hostname before slicing.
1245 def cmp_queue_entries_by_hostname(entry_a, entry_b):
1246 return Host.cmp_for_sort(entry_a.host, entry_b.host)
1247 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
1248 chosen_entries += pending_entries[:num_entries_wanted]
1249
1250 # Sanity check. We'll only ever be called if this can be met.
1251 if len(chosen_entries) < self.synch_count:
1252 message = ('job %s got less than %s chosen entries: %s' % (
1253 self.id, self.synch_count, chosen_entries))
1254 logging.error(message)
1255 email_manager.manager.enqueue_notify_email(
1256 'Job not started, too few chosen entries', message)
1257 return []
1258
1259 group_name = include_queue_entry.get_group_name()
1260
1261 self._assign_new_group(chosen_entries, group_name=group_name)
1262 return chosen_entries
1263
1264
1265 def run_if_ready(self, queue_entry):
1266 """
1267 Run this job by kicking its HQEs into status='Starting' if enough
1268 hosts are ready for it to run.
1269
1270 Cleans up by kicking HQEs into status='Stopped' if this Job is not
1271 ready to run.
1272 """
1273 if not self.is_ready():
1274 self.stop_if_necessary()
1275 elif queue_entry.atomic_group:
1276 self.run_with_ready_delay(queue_entry)
1277 else:
1278 self.run(queue_entry)
1279
1280
1281 def run_with_ready_delay(self, queue_entry):
1282 """
1283 Start a delay to wait for more hosts to enter Pending state before
1284 launching an atomic group job. Once set, the a delay cannot be reset.
1285
1286 @param queue_entry: The HostQueueEntry object to get atomic group
1287 info from and pass to run_if_ready when the delay is up.
1288
1289 @returns An Agent to run the job as appropriate or None if a delay
1290 has already been set.
1291 """
1292 assert queue_entry.job_id == self.id
1293 assert queue_entry.atomic_group
1294 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
1295 over_max_threshold = (self._pending_count() >=
1296 self._max_hosts_needed_to_run(queue_entry.atomic_group))
1297 delay_expired = (self._delay_ready_task and
1298 time.time() >= self._delay_ready_task.end_time)
1299
1300 # Delay is disabled or we already have enough? Do not wait to run.
1301 if not delay or over_max_threshold or delay_expired:
1302 self.run(queue_entry)
1303 else:
1304 queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
1305
1306
1307 def request_abort(self):
1308 """Request that this Job be aborted on the next scheduler cycle."""
1309 self.model().abort()
1310
1311
1312 def schedule_delayed_callback_task(self, queue_entry):
1313 queue_entry.set_status(models.HostQueueEntry.Status.PENDING)
1314
1315 if self._delay_ready_task:
1316 return None
1317
1318 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
1319
1320 def run_job_after_delay():
1321 logging.info('Job %s done waiting for extra hosts.', self)
1322 # Check to see if the job is still relevant. It could have aborted
1323 # while we were waiting or hosts could have disappearred, etc.
1324 if self._pending_count() < self._min_hosts_needed_to_run():
1325 logging.info('Job %s had too few Pending hosts after waiting '
1326 'for extras. Not running.', self)
1327 self.request_abort()
1328 return
1329 return self.run(queue_entry)
1330
1331 logging.info('Job %s waiting up to %s seconds for more hosts.',
1332 self.id, delay)
1333 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
1334 callback=run_job_after_delay)
1335 return self._delay_ready_task
1336
1337
1338 def run(self, queue_entry):
1339 """
1340 @param queue_entry: The HostQueueEntry instance calling this method.
1341 """
1342 if queue_entry.atomic_group and self._atomic_and_has_started():
1343 logging.error('Job.run() called on running atomic Job %d '
1344 'with HQE %s.', self.id, queue_entry)
1345 return
1346 queue_entries = self._choose_group_to_run(queue_entry)
1347 if queue_entries:
1348 self._finish_run(queue_entries)
1349
1350
1351 def _finish_run(self, queue_entries):
1352 for queue_entry in queue_entries:
1353 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
1354 self.abort_delay_ready_task()
1355
1356
1357 def abort_delay_ready_task(self):
1358 """Abort the delayed task associated with this job, if any."""
1359 if self._delay_ready_task:
1360 # Cancel any pending callback that would try to run again
1361 # as we are already running.
1362 self._delay_ready_task.abort()
1363
1364
1365 def __str__(self):
1366 return '%s-%s' % (self.id, self.owner)