#pylint: disable-msg=C0111

3""" This is the module for everything related to the AgentTask.
4
5The BaseAgentTask imposes an interface through which the scheduler can monitor
6a processes; Examples of such processes include Verify, Cleanup and the Queue
7Tasks that run the tests. The scheduler itself only understands Agents.
8Agents:
9 The Agent is the bridge between the scheduler and the AgentTask. The
10 schedulers tick has a method called handle_agents, which calls the
11 tick of each agent in the Dispatchers queue. This leads to the Agent
12 polling its AgentTask. The scheduler will keep polling a task through
13 the associated Agent till the Agent is removed from the dispatcher.
14
15 At a high level:
16 agents finished = tasks done
17 agent polls till finished
18 task polls till done
19 task sets done
20 agent is removed from dispatcher
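
    An illustrative sketch of that relationship (names are simplified; the
    real loop lives in the Dispatcher's handle_agents in monitor_db.py):

        for agent in list(dispatcher_agents):    # once per scheduler tick
            agent.tick()                         # ends up calling task.poll()
            if agent.is_done():
                dispatcher_agents.remove(agent)
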
AgentTasks:
    Basic AgentTasks are created when an HQE changes state. Examples of these
    are the QueueTask, which is created when an HQE goes into the Starting
    state, and the FinalReparseTask, which is created when the HQE goes into
    Parsing.
SpecialAgentTasks:
    Unlike AgentTasks, SpecialAgentTasks are only created when a row is
    inserted in the afe_special_tasks table. All PrejobTasks are
    SpecialAgentTasks.

monitor_db.get_agent_task_for_special_task/get_agent_task_for_queue_entry maps
an AgentTask to an Agent, which the scheduler understands. From this point
onward, the scheduler manages the task through the Agent's interface, as
follows (a condensed sketch follows this outline):
At a high level:
    task poll
        start
            prolog
        tick until we get an exit code
        finished(exit==0)
            done=True
            epilog
                cleanup
                    set is_active, is_complete, success (checked in scheduler)
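
A condensed, illustrative restatement of what BaseAgentTask.poll/tick below
actually implement (not separate code):

    def poll(self):
        if not self.started:
            self.start()        # prolog() + run()
        if not self.done:
            self.tick()         # check the autoserv exit code; on exit this
                                # calls finished(exit_code == 0), which in
                                # turn runs epilog() and cleanup()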

The first special task for an HQE is usually Reset.
- poll: The first poll will start the task, polls thereafter will call the
        task's tick method. A started task will have the started bit set.
- start: Call prolog, run the process and set the started bit.
    - prolog: Usually where one puts any model state changes that happen
              before the actual task. Different per task. Examples of things
              that might happen in a prolog:
                  - set the state of the Host/HQE (to something like Resetting)
                  - delete any unwanted queued special tasks
                  - register a pidfile
                  - set the is_active bit on the special task
    - run:
        - create a PidfileRunMonitor
        - pass the autoserv command, working directory etc. to the drone
          manager. This will start the actual autoserv process.
        - set the started bit, so subsequent polls do not 'start' again

- tick: For as long as a started task's done bit is not set, a poll will lead
        to a tick. The tick monitors the pid file of the autoserv process
        running on the drone through the PidfileRunMonitor created in run.
        If the autoserv process has finished we call finished with True/False
        depending on the autoserv exit code.

    - finished: Sets the done and success values, then calls epilog. The
                done bit is important because the Agent polls this bit to
                measure the success or failure of its task.

        - epilog: Is generally where we set the status of the Host/HQE again,
                  requeue any other task that needs to run after this one and
                  perform cleanup. Just like the prolog, this step is
                  different per task.

            - cleanup: Sets the is_active, is_complete and success states on
                       the task's model. Also uses the drone_manager to:
                           unregister the pidfile
                           copy results of the task
                       (Note this is not to be confused with the special task
                       called Cleanup.)

                       The actions we take in the epilog are based on the
                       success/failure of the autoserv process, as recorded by
                       finished; e.g. if Reset failed we will enqueue a Repair,
                       but if all is well the epilog will just return. Prejob
                       task epilogs also have an on_pending method that changes
                       the status of the HQE to pending/starting, which gets
                       picked up in the scheduler.
By this point the done flag is set (exposed through is_done()), which results
in the Agent noticing that the task has finished and unregistering it from the
dispatcher.
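
As a greatly simplified illustration of the hooks a concrete task overrides
(FakeResetTask and its method bodies are invented here purely for
illustration; the real prejob tasks live in prejob_task.py):

    class FakeResetTask(SpecialAgentTask):
        TASK_TYPE = models.SpecialTask.Task.RESET

        def prolog(self):
            super(FakeResetTask, self).prolog()
            self.host.set_status(models.Host.Status.RESETTING)

        def epilog(self):
            super(FakeResetTask, self).epilog()
            if not self.success:
                self._fail_queue_entry()
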
Class hierarchy:
BaseAgentTask
 |--->SpecialAgentTask (prejob_task.py)
      |--->RepairTask
      |--->PreJobTask
           |--->Verify, Cleanup, Reset, Provision

 |--->AbstractQueueTask (monitor_db.py)
      |--->QueueTask
      |--->HostlessQueueTask

 |--->PostJobTask (postjob_task.py)
      |--->GatherLogsTask
      |--->SelfThrottledPostJobTask
           |--->FinalReparseTask
           |--->ArchiveResultsTask

"""

import logging
import os
import urllib
import time

from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import drone_manager, pidfile_monitor
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.server import autoserv_utils


AUTOSERV_NICE_LEVEL = 10

class BaseAgentTask(object):
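    # _NullMonitor stands in for a real PidfileRunMonitor when a task has no
    # paired process to wait on (see _paired_with_monitor below); its
    # has_process() always returns True, so such tasks are never blocked.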
    class _NullMonitor(object):
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        self._drone_manager = drone_manager.instance()
        self.done = False
        self.started = False
        self.success = None
        self.aborted = False
        self.monitor = None
        self.queue_entry_ids = []
        self.host_ids = []
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        if self.done:
            assert self.started
            return
        self.started = True
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """
        To be overridden.
        """
        assert not self.monitor
        self.register_necessary_pidfiles()


    def _log_file(self):
        if not self._log_file_name:
            return None
        return os.path.join(self._working_directory(), self._log_file_name)


    def cleanup(self):
        log_file = self._log_file()
        if self.monitor and log_file:
            self.monitor.try_copy_to_results_repository(log_file)


    def epilog(self):
        """
        To be overridden.
        """
        self.cleanup()
        logging.info("%s finished with success=%s", type(self).__name__,
                     self.success)


    def start(self):
        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                    '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                            execution_entry,
                                            first_execution_path,
                                            execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        @param execution_entries: list of objects with execution_path() method
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _archive_results(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)


    def _command_line(self):
        """
        Return the command line to run. Must be overridden.
        """
        raise NotImplementedError


    @property
    def num_processes(self):
        """
        Return the number of processes forked by this BaseAgentTask's process.
        It may only be approximate. To be overridden if necessary.
        """
        return 1


    def _paired_with_monitor(self):
        """
        If this BaseAgentTask's process must run on the same machine as some
        previous process, this method should be overridden to return a
        PidfileRunMonitor for that process.
        """
        return self._NullMonitor()


    @property
    def owner_username(self):
        """
        Return login of user responsible for this task. May be None. Must be
        overridden.
        """
        raise NotImplementedError


    def _working_directory(self):
        """
        Return the directory where this BaseAgentTask's process executes.
        Must be overridden.
        """
        raise NotImplementedError


    def _pidfile_name(self):
        """
        Return the name of the pidfile this BaseAgentTask's process uses. To be
        overridden if necessary.
        """
        return drone_manager.AUTOSERV_PID_FILE


    def _check_paired_results_exist(self):
        if not self._paired_with_monitor().has_process():
            metadata = {
                    '_type': 'scheduler_error',
                    'error': 'No paired results in task',
                    'task': str(self),
                    'pidfile_id': str(self._paired_with_monitor().pidfile_id)}
            autotest_stats.Counter('no_paired_results_in_task',
                                   metadata=metadata).increment()
            self.finished(False)
            return False
        return True


    def _create_monitor(self):
        assert not self.monitor
        self.monitor = pidfile_monitor.PidfileRunMonitor()


    def run(self):
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.run(
                self._command_line(), self._working_directory(),
                num_processes=self.num_processes,
                nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
                pidfile_name=self._pidfile_name(),
                paired_with_pidfile=self._paired_with_monitor().pidfile_id,
                username=self.owner_username,
                drone_hostnames_allowed=self.get_drone_hostnames_allowed())


    def get_drone_hostnames_allowed(self):
        if not models.DroneSet.drone_sets_enabled():
            return None

        hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
        if not hqes:
            # Only special tasks could be missing host queue entries
            assert isinstance(self, SpecialAgentTask)
            return self._user_or_global_default_drone_set(
                    self.task, self.task.requested_by)

        job_ids = hqes.values_list('job', flat=True).distinct()
        assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
                                      "span multiple jobs")

        job = models.Job.objects.get(id=job_ids[0])
        drone_set = job.drone_set
        if not drone_set:
            return self._user_or_global_default_drone_set(job, job.user())

        return drone_set.get_drone_hostnames()


    def _user_or_global_default_drone_set(self, obj_with_owner, user):
        """
        Returns the user's default drone set, if present.

        Otherwise, returns the global default drone set.
        """
        default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
        if not user:
            logging.warning('%s had no owner; using default drone set',
                            obj_with_owner)
            return default_hostnames
        if not user.drone_set:
            logging.warning('User %s has no default drone set, using global '
                            'default', user.login)
            return default_hostnames
        return user.drone_set.get_drone_hostnames()


    def register_necessary_pidfiles(self):
        pidfile_id = self._drone_manager.get_pidfile_id_from(
                self._working_directory(), self._pidfile_name())
        self._drone_manager.register_pidfile(pidfile_id)

        paired_pidfile_id = self._paired_with_monitor().pidfile_id
        if paired_pidfile_id:
            self._drone_manager.register_pidfile(paired_pidfile_id)


    def recover(self):
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.attach_to_existing_process(
                self._working_directory(), pidfile_name=self._pidfile_name(),
                num_processes=self.num_processes)
        if not self.monitor.has_process():
            # no process to recover; wait to be started normally
            self.monitor = None
            return

        self.started = True
        logging.info('Recovering process %s for %s at %s',
                     self.monitor.get_process(), type(self).__name__,
                     self._working_directory())


    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
                                    allowed_host_statuses=None):
        class_name = self.__class__.__name__
        for entry in queue_entries:
            if entry.status not in allowed_hqe_statuses:
                raise scheduler_lib.SchedulerError(
                        '%s attempting to start entry with invalid status %s: '
                        '%s' % (class_name, entry.status, entry))
            invalid_host_status = (
                    allowed_host_statuses is not None
                    and entry.host.status not in allowed_host_statuses)
            if invalid_host_status:
                raise scheduler_lib.SchedulerError(
                        '%s attempting to start on queue entry with invalid '
                        'host status %s: %s'
                        % (class_name, entry.host.status, entry))


SiteAgentTask = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'SiteAgentTask', BaseAgentTask)

class AgentTask(SiteAgentTask):
    pass


class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'
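    # Keyvals are written one 'key=value' pair per line (see _format_keyval),
    # e.g. 'job_queued=1382992005'; the timestamp value here is made up for
    # illustration.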
    def _format_keyval(self, key, value):
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Subclasses must override this"""
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        assert self.monitor
        if not self.monitor.has_process():
            return
        self._drone_manager.write_lines_to_file(
                self._keyval_path(), [self._format_keyval(field, value)],
                paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        self._drone_manager.attach_file_to_execution(self._working_directory(),
                                                     keyval_contents,
                                                     file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        all_labels = [urllib.quote(label) for label in all_labels]
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

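    # Subclasses are expected to override TASK_TYPE with the matching
    # models.SpecialTask.Task value (e.g. Task.VERIFY); __init__ below asserts
    # that this has been done.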
    TASK_TYPE = None
    host = None
    queue_entry = None

    def __init__(self, task, extra_command_args):
        super(SpecialAgentTask, self).__init__()

        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = rdb_lib.get_hosts([task.host.id])[0]
        self.host.dbg_str = 'Task: %s' % str(task)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = scheduler_models.HostQueueEntry(
                    id=task.queue_entry.id)
            self.host.dbg_str += self.queue_entry.get_dbg_str()

        self.task = task
        self._extra_command_args = extra_command_args
        self.host.metadata = self.get_metadata()


    def get_metadata(self):
        """Get a dictionary that contains task information.

        The return value is a dictionary that includes task information like
        the id, name and related job information. The value will be stored in
        the metadata database.

        @return: A dictionary containing the task id, name and related job id.
                 If some attributes cannot be accessed, an empty dictionary is
                 returned and an error is logged.
        """
        try:
            metadata = {'task_id': self.task.id, 'task_name': self.task.task,
                        'hostname': self.task.host.hostname}
            if self.task.queue_entry:
                job = self.task.queue_entry.job
                metadata.update(
                        scheduler_models.get_job_metadata(job))
            return metadata
        except AttributeError as e:
            logging.error('Task has missing attribute: %s', e)
            return {}


    def _keyval_path(self):
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _command_line(self):
        return autoserv_utils._autoserv_command_line(self.host.hostname,
                                                     self._extra_command_args,
                                                     queue_entry=self.queue_entry,
                                                     in_lab=True)


    def _working_directory(self):
        return self.task.execution_path()


    @property
    def owner_username(self):
        if self.task.requested_by:
            return self.task.requested_by.login
        return None


    def prolog(self):
        super(SpecialAgentTask, self).prolog()
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return  # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return  # entry has been aborted

        self._actually_fail_queue_entry()


    # TODO(milleral): http://crbug.com/268607
    # All this used to be a part of _fail_queue_entry. The
    # exact semantics of when one should and should not be failing a queue
    # entry need to be worked out, because provisioning has placed us in a
    # case where we want to fail a queue entry that could be requeued,
    # which makes us hit the two early returns above, so
    # _fail_queue_entry() would exit early and have no effect.
    # What's left here with _actually_fail_queue_entry is a hack to be able to
    # bypass the checks and unconditionally execute the code.
    def _actually_fail_queue_entry(self):
        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
                self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
                source_path=self._working_directory() + '/',
                destination_path=self.queue_entry.execution_path() + '/')

        pidfile_id = self._drone_manager.get_pidfile_id_from(
                self.queue_entry.execution_path(),
                pidfile_name=drone_manager.AUTOSERV_PID_FILE)
        self._drone_manager.register_pidfile(pidfile_id)

        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])
        else:
            self._archive_results([self.queue_entry])

        # Also fail all other special tasks that have not yet run for this HQE
        pending_tasks = models.SpecialTask.objects.filter(
                queue_entry__id=self.queue_entry.id,
                is_complete=0)
        for task in pending_tasks:
            task.finish(False)


    def cleanup(self):
        super(SpecialAgentTask, self).cleanup()

        # We will consider an aborted task to be "Failed"
        self.task.finish(bool(self.success))

        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                self._drone_manager.unregister_pidfile(self.monitor.pidfile_id)


    def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
        """Remove all queued special tasks of a given type for this host,
        optionally keeping the last one.

        @param special_task_to_remove: type of special task to be removed, e.g.,
                models.SpecialTask.Task.VERIFY.
        @param keep_last_one: True to keep the last special task if its type is
                the same as special_task_to_remove.

        """
        queued_special_tasks = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=special_task_to_remove,
                is_active=False, is_complete=False, queue_entry=None)
        if keep_last_one:
            queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
        queued_special_tasks.delete()


    def _generate_autoserv_label_args(self, task):
        """
        @param task: An instance of afe model's SpecialTask.
        @returns: The list of arguments to pass to autoserv to tell it what the
                  labels of a job are.

        """
        labels = {x.name for x in task.queue_entry.job.labels}
        return ['--job-labels', ','.join(labels)]