#pylint: disable-msg=C0111

3""" This is the module for everything related to the AgentTask.
4
5The BaseAgentTask imposes an interface through which the scheduler can monitor
6a processes; Examples of such processes include Verify, Cleanup and the Queue
7Tasks that run the tests. The scheduler itself only understands Agents.
8Agents:
9 The Agent is the bridge between the scheduler and the AgentTask. The
10 schedulers tick has a method called handle_agents, which calls the
11 tick of each agent in the Dispatchers queue. This leads to the Agent
12 polling its AgentTask. The scheduler will keep polling a task through
13 the associated Agent till the Agent is removed from the dispatcher.
14
15 At a high level:
16 agents finished = tasks done
17 agent polls till finished
18 task polls till done
19 task sets done
20 agent is removed from dispatcher
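
    A rough sketch of that loop in code (illustrative only; the dispatcher
    attribute and helper names below are simplified stand-ins, not the real
    monitor_db implementation):

        # inside the scheduler's tick, conceptually:
        for agent in dispatcher_agents:          # hypothetical container name
            agent.tick()                         # polls the agent's AgentTask
            if agent.is_done():                  # the task has set its done bit
                remove_agent_from_dispatcher(agent)
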
AgentTasks:
    Basic AgentTasks are created when an HQE changes state. Examples of these
    are the QueueTask, which is created when an HQE goes into the Starting
    state, and the FinalReparseTask, which is created when the HQE goes into
    Parsing.
SpecialAgentTasks:
    Unlike AgentTasks, SpecialAgentTasks are only created when a row is
    inserted in the afe_special_tasks table. All PrejobTasks are
    SpecialAgentTasks.

monitor_db.get_agent_task_for_special_task/get_agent_task_for_queue_entry maps
an AgentTask to an Agent, which the scheduler understands. From this point
onward, the scheduler manages the task through the Agent's interface, as
follows:
At a high level:
    task poll
        start
            prolog
            run
        tick till we get an exit code
    finished(exit==0)
        done=True
        epilog
            cleanup
                set is_active, is_complete, success (checked in scheduler)

The first special task for an HQE is usually Reset.
- poll: The first poll will start the task; polls thereafter will call the
        task's tick method. A started task will have the started bit set.
- start: Call prolog, run the process and set the start bit.
    - prolog: Usually where one puts any model state changes that happen
              before the actual task. Different per task. Examples of things
              that might happen in a prolog:
                  - set the state of the Host/HQE (to something like Resetting)
                  - delete any unwanted queued special tasks
                  - register a pidfile
                  - set the is_active bit on the special task
    - run:
        - create a PidfileRunMonitor
        - pass the autoserv command, working directory etc. to the drone
          manager. This will start the actual autoserv process.
        - set the start bit, so subsequent polls do not 'start' again

- tick: For as long as a started task's done bit is not set, a poll will lead
        to a tick. The tick monitors the pid file of the autoserv process
        running on the drone through the PidfileRunMonitor created in prolog.
        If the autoserv process has finished, we call finished with True/False
        depending on the autoserv exit code.

    - finished: Sets the done and success values, then calls epilog. The
                done bit is important because the Agent polls this bit to
                measure the success or failure of its task.

        - epilog: Is generally where we set the status of the Host/HQE again,
                  requeue any other task that needs to run after this one
                  and perform cleanup. Just like the prolog, this step is
                  different per task.

            - cleanup: Sets the is_active, is_complete and success
                       states on the task's model. Also uses the
                       drone_manager to:
                           unregister the pidfile
                           copy results of the task
                       (Note this is not to be confused with the
                       special task called Cleanup.)

              The actions we take in the epilog are based on the
              success/failure of the autoserv process set in cleanup,
              e.g. if Reset failed we will enqueue a Repair, but if all
              is well the epilog will just return. Prejob task epilogs
              also have an on_pending method that changes the status of
              the HQE to pending/starting, which gets picked up in the
              scheduler.
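
    Putting the above together, the life of a single task looks roughly like
    this (a simplified sketch of the calls in this module; aborts and error
    handling are omitted):

        task.poll()                    # first poll -> start()
            task.prolog()              #   register pidfiles, tweak model state
            task.run()                 #   PidfileRunMonitor starts autoserv
        task.poll()                    # later polls -> tick()
            task.tick()                #   read autoserv exit code via monitor
        # once an exit code is available:
        task.finished(exit_code == 0)
            task.epilog()              #   per-task model updates
                task.cleanup()         #   copy results, unregister pidfile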

By this point the is_done flag is set, which results in the Agent noticing
that the task has finished and unregistering it from the dispatcher.

Class hierarchy:
BaseAgentTask
  |--->SpecialAgentTask (prejob_task.py)
        |--->RepairTask
        |--->PreJobTask
              |--->Verify, Cleanup, Reset, Provision

  |--->AbstractQueueTask (monitor_db.py)
        |--->QueueTask
        |--->HostlessQueueTask

  |--->PostJobTask (postjob_task.py)
        |--->GatherLogsTask
        |--->SelfThrottledPostJobTask
              |--->FinalReparseTask
              |--->ArchiveResultsTask

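A concrete AgentTask must override _command_line(), owner_username and
_working_directory(); they raise NotImplementedError in BaseAgentTask, while
num_processes, _pidfile_name() and _paired_with_monitor() have usable
defaults. A minimal, purely illustrative sketch (EchoTask is hypothetical and
not part of this module):

    class EchoTask(AgentTask):
        @property
        def owner_username(self):
            return 'debug'

        def _working_directory(self):
            return '/tmp/echo_task'

        def _command_line(self):
            return ['echo', 'hello from an AgentTask']
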
108"""

import logging
import os
import urllib
import time

from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import drone_manager, pidfile_monitor
from autotest_lib.client.common_lib import utils
from autotest_lib.scheduler import email_manager, host_scheduler
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.server import autoserv_utils


AUTOSERV_NICE_LEVEL = 10


class BaseAgentTask(object):
    class _NullMonitor(object):
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        self._drone_manager = drone_manager.instance()
        self.done = False
        self.started = False
        self.success = None
        self.aborted = False
        self.monitor = None
        self.queue_entry_ids = []
        self.host_ids = []
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        if self.done:
            assert self.started
            return
        self.started = True
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """
        To be overridden.
        """
        assert not self.monitor
        self.register_necessary_pidfiles()


    def _log_file(self):
        if not self._log_file_name:
            return None
        return os.path.join(self._working_directory(), self._log_file_name)


    def cleanup(self):
        log_file = self._log_file()
        if self.monitor and log_file:
            self.monitor.try_copy_to_results_repository(log_file)


    def epilog(self):
        """
        To be overridden.
        """
        self.cleanup()
        logging.info("%s finished with success=%s", type(self).__name__,
                     self.success)


    def start(self):
        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                    '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                            execution_entry,
                                            first_execution_path,
                                            execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        @param execution_entries: list of objects with execution_path() method
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _archive_results(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)


    def _command_line(self):
        """
        Return the command line to run. Must be overridden.
        """
        raise NotImplementedError


    @property
    def num_processes(self):
        """
        Return the number of processes forked by this BaseAgentTask's process.
        It may only be approximate. To be overridden if necessary.
        """
        return 1


    def _paired_with_monitor(self):
        """
        If this BaseAgentTask's process must run on the same machine as some
        previous process, this method should be overridden to return a
        PidfileRunMonitor for that process.
        """
        return self._NullMonitor()


    @property
    def owner_username(self):
        """
        Return login of user responsible for this task. May be None. Must be
        overridden.
        """
        raise NotImplementedError


    def _working_directory(self):
        """
        Return the directory where this BaseAgentTask's process executes.
        Must be overridden.
        """
        raise NotImplementedError


    def _pidfile_name(self):
        """
        Return the name of the pidfile this BaseAgentTask's process uses. To be
        overridden if necessary.
        """
        return drone_manager.AUTOSERV_PID_FILE


    def _check_paired_results_exist(self):
        if not self._paired_with_monitor().has_process():
            email_manager.manager.enqueue_notify_email(
                    'No paired results in task',
                    'No paired results in task %s at %s'
                    % (self, self._paired_with_monitor().pidfile_id))
            self.finished(False)
            return False
        return True


    def _create_monitor(self):
        assert not self.monitor
        self.monitor = pidfile_monitor.PidfileRunMonitor()


    def run(self):
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.run(
                self._command_line(), self._working_directory(),
                num_processes=self.num_processes,
                nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
                pidfile_name=self._pidfile_name(),
                paired_with_pidfile=self._paired_with_monitor().pidfile_id,
                username=self.owner_username,
                drone_hostnames_allowed=self.get_drone_hostnames_allowed())


    def get_drone_hostnames_allowed(self):
        if not models.DroneSet.drone_sets_enabled():
            return None

        hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
        if not hqes:
            # Only special tasks could be missing host queue entries
            assert isinstance(self, SpecialAgentTask)
            return self._user_or_global_default_drone_set(
                    self.task, self.task.requested_by)

        job_ids = hqes.values_list('job', flat=True).distinct()
        assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
                                      "span multiple jobs")

        job = models.Job.objects.get(id=job_ids[0])
        drone_set = job.drone_set
        if not drone_set:
            return self._user_or_global_default_drone_set(job, job.user())

        return drone_set.get_drone_hostnames()


    def _user_or_global_default_drone_set(self, obj_with_owner, user):
        """
        Returns the user's default drone set, if present.

        Otherwise, returns the global default drone set.
        """
        default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
        if not user:
            logging.warning('%s had no owner; using default drone set',
                            obj_with_owner)
            return default_hostnames
        if not user.drone_set:
            logging.warning('User %s has no default drone set, using global '
                            'default', user.login)
            return default_hostnames
        return user.drone_set.get_drone_hostnames()


    def register_necessary_pidfiles(self):
        pidfile_id = self._drone_manager.get_pidfile_id_from(
                self._working_directory(), self._pidfile_name())
        self._drone_manager.register_pidfile(pidfile_id)

        paired_pidfile_id = self._paired_with_monitor().pidfile_id
        if paired_pidfile_id:
            self._drone_manager.register_pidfile(paired_pidfile_id)


    def recover(self):
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.attach_to_existing_process(
                self._working_directory(), pidfile_name=self._pidfile_name(),
                num_processes=self.num_processes)
        if not self.monitor.has_process():
            # no process to recover; wait to be started normally
            self.monitor = None
            return

        self.started = True
        logging.info('Recovering process %s for %s at %s',
                     self.monitor.get_process(), type(self).__name__,
                     self._working_directory())


    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
                                    allowed_host_statuses=None):
        class_name = self.__class__.__name__
        for entry in queue_entries:
            if entry.status not in allowed_hqe_statuses:
                raise host_scheduler.SchedulerError(
                        '%s attempting to start entry with invalid status %s: '
                        '%s' % (class_name, entry.status, entry))
            invalid_host_status = (
                    allowed_host_statuses is not None
                    and entry.host.status not in allowed_host_statuses)
            if invalid_host_status:
                raise host_scheduler.SchedulerError(
                        '%s attempting to start on queue entry with invalid '
                        'host status %s: %s'
                        % (class_name, entry.host.status, entry))


SiteAgentTask = utils.import_site_class(
    __file__, 'autotest_lib.scheduler.site_monitor_db',
    'SiteAgentTask', BaseAgentTask)

class AgentTask(SiteAgentTask):
    pass


class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'
    def _format_keyval(self, key, value):
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Subclasses must override this"""
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        assert self.monitor
        if not self.monitor.has_process():
            return
        self._drone_manager.write_lines_to_file(
                self._keyval_path(), [self._format_keyval(field, value)],
                paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        self._drone_manager.attach_file_to_execution(self._working_directory(),
                                                     keyval_contents,
                                                     file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        all_labels = [urllib.quote(label) for label in all_labels]
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    TASK_TYPE = None
    host = None
    queue_entry = None

    def __init__(self, task, extra_command_args):
        super(SpecialAgentTask, self).__init__()

        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = rdb_lib.get_hosts([task.host.id])[0]
        self.host.dbg_str = 'Task: %s' % str(task)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = scheduler_models.HostQueueEntry(
                    id=task.queue_entry.id)
            self.host.dbg_str += self.queue_entry.get_dbg_str()

        self.task = task
        self._extra_command_args = extra_command_args
        self.host.metadata = self.get_metadata()


    def get_metadata(self):
        """Get a dictionary that contains task information.

        The return value is a dictionary that includes task information like
        id, name and related job information. The value will be stored in the
        metadata database.
        @return: A dictionary containing the task id, name and related job id.
                 If some attributes cannot be accessed, an empty dictionary
                 will be returned and an error will be logged.
        """
        try:
            metadata = {'task_id':self.task.id, 'task_name':self.task.task,
                        'hostname':self.task.host.hostname}
            if self.task.queue_entry:
                job = self.task.queue_entry.job
                metadata.update(
                        scheduler_models.get_job_metadata(job))
            return metadata
        except AttributeError as e:
            logging.error('Task has missing attribute: %s', e)
            return {}


    def _keyval_path(self):
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _command_line(self):
        return autoserv_utils._autoserv_command_line(self.host.hostname,
                                                     self._extra_command_args,
                                                     queue_entry=self.queue_entry)


    def _working_directory(self):
        return self.task.execution_path()


    @property
    def owner_username(self):
        if self.task.requested_by:
            return self.task.requested_by.login
        return None


    def prolog(self):
        super(SpecialAgentTask, self).prolog()
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return  # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return  # entry has been aborted

        self._actually_fail_queue_entry()


    # TODO(milleral): http://crbug.com/268607
    # All this used to be a part of _fail_queue_entry. The
    # exact semantics of when one should and should not be failing a queue
    # entry need to be worked out, because provisioning has placed us in a
    # case where we want to fail a queue entry that could be requeued,
    # which makes us fail the two above if statements, and thus
    # _fail_queue_entry() would exit early and have no effect.
    # What's left here with _actually_fail_queue_entry is a hack to be able to
    # bypass the checks and unconditionally execute the code.
    def _actually_fail_queue_entry(self):
        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
                self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
                source_path=self._working_directory() + '/',
                destination_path=self.queue_entry.execution_path() + '/')

        pidfile_id = self._drone_manager.get_pidfile_id_from(
                self.queue_entry.execution_path(),
                pidfile_name=drone_manager.AUTOSERV_PID_FILE)
        self._drone_manager.register_pidfile(pidfile_id)

        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])
        else:
            self._archive_results([self.queue_entry])

        # Also fail all other special tasks that have not yet run for this HQE
        pending_tasks = models.SpecialTask.objects.filter(
                queue_entry__id=self.queue_entry.id,
                is_complete=0)
        for task in pending_tasks:
            task.finish(False)


    def cleanup(self):
        super(SpecialAgentTask, self).cleanup()

        # We will consider an aborted task to be "Failed"
        self.task.finish(bool(self.success))

        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                self._drone_manager.unregister_pidfile(self.monitor.pidfile_id)


    def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
        """Remove a type of special task in all tasks, keep last one if needed.

        @param special_task_to_remove: type of special task to be removed, e.g.,
                models.SpecialTask.Task.VERIFY.
        @param keep_last_one: True to keep the last special task if its type is
                the same as special_task_to_remove.

        """
        queued_special_tasks = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=special_task_to_remove,
                is_active=False, is_complete=False, queue_entry=None)
        if keep_last_one:
            queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
        queued_special_tasks.delete()


    def _generate_autoserv_label_args(self, task):
        """
        @param task: An instance of afe model's SpecialTask.
        @returns: The list of arguments to pass to autoserv to tell it what the
                  labels of a job are.

        """
        labels = {x.name for x in task.queue_entry.job.labels}
        return ['--job-labels', ','.join(labels)]