# pylint: disable=missing-docstring

""" This is the module for everything related to the AgentTask.

The AgentTask imposes an interface through which the scheduler can monitor
a process; examples of such processes include Verify, Cleanup and the Queue
Tasks that run the tests. The scheduler itself only understands Agents.
Agents:
    The Agent is the bridge between the scheduler and the AgentTask. The
    scheduler's tick has a method called handle_agents, which calls the
    tick of each agent in the Dispatcher's queue. This leads to the Agent
    polling its AgentTask. The scheduler will keep polling a task through
    the associated Agent until the Agent is removed from the dispatcher.

    At a high level:
        agents finished = tasks done
            agent polls till finished
                task polls till done
                    task sets done
            agent is removed from dispatcher
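
    A minimal sketch of that polling relationship, for illustration only
    (the real Agent and Dispatcher live in the scheduler, not this module;
    the names below are simplified stand-ins, not the actual implementation):

        class Agent(object):
            def __init__(self, agent_task):
                self.task = agent_task

            def tick(self):
                # Called once per scheduler tick via handle_agents().
                self.task.poll()

            def is_done(self):
                return self.task.is_done()

        def handle_agents(agents):
            # Poll every agent; drop agents whose task has finished.
            for agent in list(agents):
                agent.tick()
                if agent.is_done():
                    agents.remove(agent)
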
AgentTasks:
    Basic AgentTasks are created when an HQE (host queue entry) changes state.
    Examples of these are the QueueTask, which is created when an HQE goes into
    the Starting state, and the FinalReparseTask, which is created when the HQE
    goes into Parsing.
SpecialAgentTasks:
    Unlike AgentTasks, SpecialAgentTasks are only created when a row is inserted
    in the afe_special_tasks table. All PrejobTasks are SpecialAgentTasks.

monitor_db.get_agent_task_for_special_task/get_agent_task_for_queue_entry map
an AgentTask to an Agent, which the scheduler understands. From this point
onward, the scheduler manages the task through the Agent's interface, as
follows.
At a high level (a condensed sketch of these steps follows the outline):
    task poll
        start
            prolog
        tick till we get an exit code
            finished(exit==0)
                done=True
                epilog
                    cleanup
                        set is_active, is_complete, success (checked in scheduler)

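The same flow, written as a condensed sketch of the methods AgentTask actually
defines below (simplified; monitor setup and error handling omitted):

    def poll(self):
        if not self.started:
            self.start()              # prolog() + run(); sets self.started
        if not self.done:
            self.tick()               # check the autoserv exit code

    def tick(self):
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            return                    # still running; check again next tick
        self.finished(exit_code == 0)

    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()                 # epilog() ends by calling cleanup()
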
The first special task for an HQE is usually Reset.
- poll: The first poll will start the task; polls thereafter will call the
        task's tick method. A started task will have the started bit set.
- start: Call prolog, run the process and set the start bit.
    - prolog: Usually where one puts any model state changes that happen before
              the actual task. Different per task. Examples of things that might
              happen in a prolog:
                  - set the state of the Host/HQE (to something like Resetting)
                  - delete any unwanted queued special tasks
                  - register a pidfile
                  - set the is_active bit on the special task
    - run:
        - create a PidfileRunMonitor
        - pass the autoserv command, working directory etc. to the drone
          manager. This will start the actual autoserv process.
    - set the start bit: so subsequent polls do not 'start' again

- tick: For as long as a started task's done bit is not set, a poll will lead
        to a tick. The tick monitors the pid file of the autoserv process
        running on the drone through the PidfileRunMonitor created in prolog.
        If the autoserv process has finished we call finished with True/False
        depending on the autoserv exit code.

        - finished: sets the done and success values, then calls epilog. The
                    done bit is important because the Agent polls this bit to
                    measure the success or failure of its task.

          - epilog: Is generally where we set the status of the Host/HQE again,
                    requeue any other task that needs to run after this one
                    and perform cleanup. Just like the prolog, this step is
                    different per task.

              - cleanup: Sets the is_active, is_complete and success
                         states on the task's model. Also uses the
                         drone_manager to:
                             unregister the pidfile
                             copy results of the task
                         (Note this is not to be confused with the
                          special task called Cleanup.)

                        The actions we take in the epilog are based on the
                        success/failure of the autoserv process set in cleanup,
                        e.g. if Reset failed we will enqueue a Repair, but if
                        all is well the epilog will just return. Prejob task
                        epilogs also have an on_pending method that changes
                        the status of the HQE to pending/starting, which gets
                        picked up in the scheduler.

By this point the is_done flag is set, which results in the Agent noticing that
the task has finished and unregistering it from the dispatcher.

Class hierarchy:
AgentTask
 |--->SpecialAgentTask (prejob_task.py)
      |--->RepairTask
      |--->PreJobTask
           |--->Verify, Cleanup, Reset, Provision

 |--->AbstractQueueTask (monitor_db.py)
      |--->QueueTask
      |--->HostlessQueueTask

 |--->PostJobTask (postjob_task.py)
      |--->GatherLogsTask
      |--->SelfThrottledPostJobTask
           |--->FinalReparseTask

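For illustration only, a minimal concrete AgentTask must supply the members
this module declares as "must be overridden" (_command_line, owner_username
and _working_directory). The class and values below are a hypothetical sketch,
not a real task in this code base:

    class ExampleTask(AgentTask):
        def _command_line(self):
            # Command for the drone manager to run (real tasks build an
            # autoserv invocation, e.g. via autoserv_utils).
            return ['/bin/true']

        @property
        def owner_username(self):
            return 'example-user'     # hypothetical owner

        def _working_directory(self):
            return '/tmp/example_task'

prolog() and epilog() may additionally be overridden to update Host/HQE state
before and after the process runs, as described above.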
107"""
108
109import logging
110import os
beeps5e2bb4a2013-10-28 11:26:45 -0700111import time
Allen Lie8adf592017-02-06 17:28:28 -0800112import urllib
113
114import common
beeps5e2bb4a2013-10-28 11:26:45 -0700115
Dan Shie6803232016-01-19 17:30:20 -0800116from autotest_lib.client.common_lib import global_config
Dan Shi80f7c532015-08-25 10:23:14 -0700117from autotest_lib.client.common_lib import utils
beeps5e2bb4a2013-10-28 11:26:45 -0700118from autotest_lib.frontend.afe import models
Allen Lie8adf592017-02-06 17:28:28 -0800119from autotest_lib.scheduler import drone_manager
120from autotest_lib.scheduler import email_manager
121from autotest_lib.scheduler import pidfile_monitor
beepscc9fc702013-12-02 12:45:38 -0800122from autotest_lib.scheduler import rdb_lib
Allen Lie8adf592017-02-06 17:28:28 -0800123from autotest_lib.scheduler import scheduler_lib
beeps5e2bb4a2013-10-28 11:26:45 -0700124from autotest_lib.scheduler import scheduler_models
125from autotest_lib.server import autoserv_utils
Dan Shi114e1722016-01-10 18:12:53 -0800126from autotest_lib.server import system_utils
beeps5e2bb4a2013-10-28 11:26:45 -0700127
Dan Shi5e2efb72017-02-07 11:40:23 -0800128try:
129 from chromite.lib import metrics
130except ImportError:
131 metrics = utils.metrics_mock
132
133
Dan Shie6803232016-01-19 17:30:20 -0800134CONFIG = global_config.global_config
beeps5e2bb4a2013-10-28 11:26:45 -0700135AUTOSERV_NICE_LEVEL = 10
136
Dan Shie6803232016-01-19 17:30:20 -0800137ENABLE_DRONE_IN_RESTRICTED_SUBNET = CONFIG.get_config_value(
138 'CROS', 'enable_drone_in_restricted_subnet', type=bool,
139 default=False)
140
beeps5e2bb4a2013-10-28 11:26:45 -0700141
class AgentTask(object):
    class _NullMonitor(object):
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        self._drone_manager = drone_manager.instance()
        self.done = False
        self.started = False
        self.success = None
        self.aborted = False
        self.monitor = None
        self.queue_entry_ids = []
        self.host_ids = []
        # A map between host id and hostname.
        self.hostnames = {}
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        if queue_entries and queue_entries != [None]:
            self.host_ids = []
            self.queue_entry_ids = []
            self.hostnames = {}
            for entry in queue_entries:
                if entry.host is not None:
                    self.host_ids.append(entry.host.id)
                    self.queue_entry_ids.append(entry.id)
                    self.hostnames[entry.host.id] = entry.host.hostname
                else:
                    logging.debug(
                            'No host is found for host_queue_entry_id: %r',
                            entry.id)
                    raise scheduler_lib.NoHostIdError(
                            'Failed to schedule a job whose '
                            'host_queue_entry_id=%r due to no host_id.'
                            % entry.id)
        else:
            assert host
            self.host_ids = [host.id]
            self.hostnames = {host.id: host.hostname}


    def poll(self):
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        if self.done:
            assert self.started
            return
        self.started = True
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """
        To be overridden.
        """
        assert not self.monitor
        self.register_necessary_pidfiles()


    def _log_file(self):
        if not self._log_file_name:
            return None
        return os.path.join(self._working_directory(), self._log_file_name)


    def cleanup(self):
        log_file = self._log_file()
        if self.monitor and log_file:
            self.monitor.try_copy_to_results_repository(log_file)


    def epilog(self):
        """
        To be overridden.
        """
        self.cleanup()
        logging.info("%s finished with success=%s", type(self).__name__,
                     self.success)


    def start(self):
        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                    '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                            execution_entry,
                                            first_execution_path,
                                            execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        @param execution_entries: list of objects with execution_path() method
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _command_line(self):
        """
        Return the command line to run. Must be overridden.
        """
        raise NotImplementedError


    @property
    def num_processes(self):
        """
        Return the number of processes forked by this AgentTask's process.
        It may only be approximate. To be overridden if necessary.
        """
        return 1


    def _paired_with_monitor(self):
        """
        If this AgentTask's process must run on the same machine as some
        previous process, this method should be overridden to return a
        PidfileRunMonitor for that process.
        """
        return self._NullMonitor()


    @property
    def owner_username(self):
        """
        Return login of user responsible for this task. May be None. Must be
        overridden.
        """
        raise NotImplementedError


    def _working_directory(self):
        """
        Return the directory where this AgentTask's process executes.
        Must be overridden.
        """
        raise NotImplementedError


    def _pidfile_name(self):
        """
        Return the name of the pidfile this AgentTask's process uses. To be
        overridden if necessary.
        """
        return drone_manager.AUTOSERV_PID_FILE


    def _check_paired_results_exist(self):
        if not self._paired_with_monitor().has_process():
            metrics.Counter(
                'chromeos/autotest/errors/scheduler/no_paired_results'
            ).increment()
            self.finished(False)
            return False
        return True


    def _create_monitor(self):
        assert not self.monitor
        self.monitor = pidfile_monitor.PidfileRunMonitor()


    def run(self):
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.run(
                self._command_line(), self._working_directory(),
                num_processes=self.num_processes,
                nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
                pidfile_name=self._pidfile_name(),
                paired_with_pidfile=self._paired_with_monitor().pidfile_id,
                username=self.owner_username,
                drone_hostnames_allowed=self.get_drone_hostnames_allowed())


    def get_drone_hostnames_allowed(
            self, restricted_subnets=utils.RESTRICTED_SUBNETS,
            enable_drone_in_subnet=ENABLE_DRONE_IN_RESTRICTED_SUBNET):
        """Return the set of drone hostnames allowed to run this task.

        Returns None when any drone may run the task; otherwise returns a set
        of drone hostnames filtered by restricted-subnet membership and, when
        drone sets are enabled, by the job's or user's drone set. An empty set
        means no drone can run the task.
        """
        filtered_drones = None
        has_unrestricted_host = False
        if (self.hostnames and restricted_subnets and enable_drone_in_subnet):
            for hostname in self.hostnames.values():
                subnet = utils.get_restricted_subnet(hostname,
                                                     restricted_subnets)

                # Return an empty set if the list of hosts spans both
                # restricted and unrestricted subnets. No drone can work in
                # such a case.
                if ((not subnet and filtered_drones is not None) or
                    (subnet and has_unrestricted_host)):
                    logging.error('The test has some DUT in restricted subnet, '
                                  'but some in unrestricted subnet. Therefore, '
                                  'no drone is available to run the test.')
                    return set()

                if not subnet:
                    has_unrestricted_host = True
                    continue

                server_ip_map = system_utils.DroneCache.get_drone_ip_map()
                filtered_drones_for_host = set(
                        utils.get_servers_in_same_subnet(
                                subnet[0], subnet[1],
                                server_ip_map=server_ip_map))
                logging.info('DUT %s is in restricted subnet, drone can only '
                             'be chosen from %s', hostname,
                             filtered_drones_for_host)
                if filtered_drones is None:
                    filtered_drones = filtered_drones_for_host
                else:
                    filtered_drones = set.intersection(
                            filtered_drones, filtered_drones_for_host)

                # If filtered_drones is an empty set, that means no drone is
                # allowed to run the task. This is different from None, which
                # means all drones are allowed.
                if filtered_drones == set():
                    logging.error('DUT(s) is in restricted subnet, but no '
                                  'drone is available to run the test.')
                    return filtered_drones

        # If no host is in a restricted subnet, use the unrestricted drones
        # only.
        if (filtered_drones is None and restricted_subnets and
            enable_drone_in_subnet):
            filtered_drones = set(
                    system_utils.DroneCache.get_unrestricted_drones(
                            restricted_subnets=restricted_subnets))

        if not models.DroneSet.drone_sets_enabled():
            return filtered_drones

        hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
        if not hqes:
            # Only special tasks could be missing host queue entries.
            assert isinstance(self, SpecialAgentTask)
            return self._user_or_global_default_drone_set(
                    self.task, self.task.requested_by)

        job_ids = hqes.values_list('job', flat=True).distinct()
        assert job_ids.count() == 1, ("AgentTask's queue entries "
                                      "span multiple jobs")

        job = models.Job.objects.get(id=job_ids[0])
        drone_set = job.drone_set
        if not drone_set:
            return self._user_or_global_default_drone_set(job, job.user())

        if filtered_drones:
            return set.intersection(filtered_drones,
                                    drone_set.get_drone_hostnames())
        else:
            return drone_set.get_drone_hostnames()


    def _user_or_global_default_drone_set(self, obj_with_owner, user):
        """
        Returns the user's default drone set, if present.

        Otherwise, returns the global default drone set.
        """
        default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
        if not user:
            logging.warning('%s had no owner; using default drone set',
                            obj_with_owner)
            return default_hostnames
        if not user.drone_set:
            logging.warning('User %s has no default drone set, using global '
                            'default', user.login)
            return default_hostnames
        return user.drone_set.get_drone_hostnames()


    def register_necessary_pidfiles(self):
        pidfile_id = self._drone_manager.get_pidfile_id_from(
                self._working_directory(), self._pidfile_name())
        self._drone_manager.register_pidfile(pidfile_id)

        paired_pidfile_id = self._paired_with_monitor().pidfile_id
        if paired_pidfile_id:
            self._drone_manager.register_pidfile(paired_pidfile_id)


    def recover(self):
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.attach_to_existing_process(
                self._working_directory(), pidfile_name=self._pidfile_name(),
                num_processes=self.num_processes)
        if not self.monitor.has_process():
            # No process to recover; wait to be started normally.
            self.monitor = None
            return

        self.started = True
        logging.info('Recovering process %s for %s at %s',
                     self.monitor.get_process(), type(self).__name__,
                     self._working_directory())


    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
                                    allowed_host_statuses=None):
        class_name = self.__class__.__name__
        for entry in queue_entries:
            if entry.status not in allowed_hqe_statuses:
                # In the original code, we raise an exception here. In an
                # effort to prevent downtime we will instead abort the job and
                # send out an email notifying us this has occurred.
                error_message = ('%s attempting to start entry with invalid '
                                 'status %s: %s. Aborting Job: %s.'
                                 % (class_name, entry.status, entry,
                                    entry.job))
                logging.error(error_message)
                email_manager.manager.enqueue_notify_email(
                        'Job Aborted - Invalid Host Queue Entry Status',
                        error_message)
                entry.job.request_abort()
            invalid_host_status = (
                    allowed_host_statuses is not None
                    and entry.host.status not in allowed_host_statuses)
            if invalid_host_status:
                # In the original code, we raise an exception here. In an
                # effort to prevent downtime we will instead abort the job and
                # send out an email notifying us this has occurred.
                error_message = ('%s attempting to start on queue entry with '
                                 'invalid host status %s: %s. Aborting Job: %s'
                                 % (class_name, entry.host.status, entry,
                                    entry.job))
                logging.error(error_message)
                email_manager.manager.enqueue_notify_email(
                        'Job Aborted - Invalid Host Status', error_message)
                entry.job.request_abort()


class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Subclasses must override this."""
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        assert self.monitor
        if not self.monitor.has_process():
            return
        self._drone_manager.write_lines_to_file(
                self._keyval_path(), [self._format_keyval(field, value)],
                paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        self._drone_manager.attach_file_to_execution(self._working_directory(),
                                                     keyval_contents,
                                                     file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        all_labels = [urllib.quote(label) for label in all_labels]
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    TASK_TYPE = None
    host = None
    queue_entry = None
    _COUNT_METRIC = 'chromeos/autotest/scheduler/special_task_count'
    _DUT_METRIC = 'chromeos/autotest/scheduler/special_task_by_dut'
    _DURATION_METRIC = 'chromeos/autotest/scheduler/special_task_durations'


    def __init__(self, task, extra_command_args):
        super(SpecialAgentTask, self).__init__()

        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = rdb_lib.get_hosts([task.host.id])[0]
        self.host.dbg_str = 'Task: %s' % str(task)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = scheduler_models.HostQueueEntry(
                    id=task.queue_entry.id)
            self.host.dbg_str += self.queue_entry.get_dbg_str()

        # This is of type SpecialTask (as defined in frontend/afe/models.py).
        self.task = task
        self._extra_command_args = extra_command_args
        self.host.metadata = self.get_metadata()
        self._milestone = ''


    def get_metadata(self):
        """Get a dictionary that contains task information.

        The return value is a dictionary that includes task information like
        id, name and related job information. The value will be stored in the
        metadata database.

        @return: A dictionary containing the task id, name and related job id.
                 If some attributes cannot be accessed, an empty dictionary
                 will be returned and an error will be logged.
        """
        try:
            metadata = {'task_id': self.task.id, 'task_name': self.task.task,
                        'hostname': self.task.host.hostname}
            if self.task.queue_entry:
                job = self.task.queue_entry.job
                metadata.update(
                        scheduler_models.get_job_metadata(job))
            return metadata
        except AttributeError as e:
            logging.error('Task has missing attribute: %s', e)
            return {}


    def _keyval_path(self):
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _command_line(self):
        return autoserv_utils._autoserv_command_line(
                self.host.hostname,
                self._extra_command_args,
                queue_entry=self.queue_entry,
                in_lab=True)


    def _working_directory(self):
        return self.task.execution_path()


    @property
    def owner_username(self):
        if self.task.requested_by:
            return self.task.requested_by.login
        return None


    def prolog(self):
        super(SpecialAgentTask, self).prolog()
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return  # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return  # entry has been aborted

        self._actually_fail_queue_entry()


    def epilog(self):
        super(SpecialAgentTask, self).epilog()
        self._emit_special_task_status_metric()


    def _emit_special_task_status_metric(self):
        """Increments an accumulator associated with this special task."""
        fields = {'type': self.TASK_TYPE,
                  'success': bool(self.success),
                  'board': str(self.host.board),
                  'milestone': self._milestone}
        metrics.Counter(self._COUNT_METRIC).increment(fields=fields)

        if self.task.time_finished and self.task.time_started:
            duration = (self.task.time_finished -
                        self.task.time_started).total_seconds()
            metrics.SecondsDistribution(self._DURATION_METRIC).add(
                    duration, fields=fields)

        dut_fields = {
            'type': self.TASK_TYPE,
            'success': bool(self.success),
            'board': str(self.host.board),
            'dut_host_name': self.host.hostname
        }
        metrics.Counter(self._DUT_METRIC).increment(fields=dut_fields)

    # TODO(milleral): http://crbug.com/268607
    # All this used to be a part of _fail_queue_entry. The exact semantics of
    # when one should and should not be failing a queue entry need to be
    # worked out, because provisioning has placed us in a case where we want
    # to fail a queue entry that could be requeued, which makes us fail the
    # two above if statements, and thus _fail_queue_entry() would exit early
    # and have no effect. What's left here with _actually_fail_queue_entry is
    # a hack to be able to bypass the checks and unconditionally execute the
    # code.
    def _actually_fail_queue_entry(self):
        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
                self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # Copy results logs into the normal place for job results.
        self.monitor.try_copy_results_on_drone(
                source_path=self._working_directory() + '/',
                destination_path=self.queue_entry.execution_path() + '/')

        pidfile_id = self._drone_manager.get_pidfile_id_from(
                self.queue_entry.execution_path(),
                pidfile_name=drone_manager.AUTOSERV_PID_FILE)
        self._drone_manager.register_pidfile(pidfile_id)

        # TODO(ayatane): This should obey
        # self.queue_entry.job.parse_failed_repair, but nothing seems to set
        # self.queue_entry.job.parse_failed_repair?  Check git blame.
        self._parse_results([self.queue_entry])

        # Also fail all other special tasks that have not yet run for this HQE.
        pending_tasks = models.SpecialTask.objects.filter(
                queue_entry__id=self.queue_entry.id,
                is_complete=0)
        for task in pending_tasks:
            task.finish(False)


    def cleanup(self):
        super(SpecialAgentTask, self).cleanup()

        # We will consider an aborted task to be "Failed".
        self.task.finish(bool(self.success))

        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                self._drone_manager.unregister_pidfile(self.monitor.pidfile_id)


    def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
        """Remove queued special tasks of a given type, optionally keeping the
        last one.

        @param special_task_to_remove: type of special task to be removed,
                e.g., models.SpecialTask.Task.VERIFY.
        @param keep_last_one: True to keep the last special task if its type
                is the same as special_task_to_remove.
        """
        queued_special_tasks = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=special_task_to_remove,
                is_active=False, is_complete=False, queue_entry=None)
        if keep_last_one:
            queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
        queued_special_tasks.delete()


    def _generate_autoserv_label_args(self, task):
        """
        @param task: An instance of afe model's SpecialTask.
        @returns: The list of arguments to pass to autoserv to tell it what
                  the labels of a job are.
        """
        labels = {x.name for x in task.queue_entry.job.labels}
        return ['--job-labels', ','.join(labels)]