| #!/usr/bin/python |
| # |
| # Copyright 2010 Google Inc. All Rights Reserved. |
| # |
| |
| import logging |
| import os.path |
| import threading |
| |
| from automation.common import command as cmd |
| from automation.common import job |
| from automation.common import logger |
| from automation.common.command_executer import LoggingCommandExecuter |
| from automation.common.command_executer import CommandTerminator |
| |
| |
| class JobExecuter(threading.Thread): |
| def __init__(self, job_to_execute, machines, listeners): |
| threading.Thread.__init__(self) |
| |
| assert machines |
| |
| self.job = job_to_execute |
| self.listeners = listeners |
| self.machines = machines |
| |
| # Set Thread name. |
| self.name = "%s-%s" % (self.__class__.__name__, self.job.id) |
| |
| self._logger = logging.getLogger(self.__class__.__name__) |
| self._executer = LoggingCommandExecuter(self.job.dry_run) |
| self._terminator = CommandTerminator() |
| |
| def _RunRemotely(self, command, fail_msg, command_timeout=1*60*60): |
| exit_code = self._executer.RunCommand(command, |
| self.job.primary_machine.hostname, |
| self.job.primary_machine.username, |
| command_terminator=self._terminator, |
| command_timeout=command_timeout) |
| if exit_code: |
| raise job.JobFailure(fail_msg, exit_code) |
| |
| def _RunLocally(self, command, fail_msg, command_timeout=1*60*60): |
| exit_code = self._executer.RunCommand(command, |
| command_terminator=self._terminator, |
| command_timeout=command_timeout) |
| if exit_code: |
| raise job.JobFailure(fail_msg, exit_code) |
| |
| def Kill(self): |
| self._terminator.Terminate() |
| |
| def CleanUpWorkDir(self): |
| self._logger.debug('Cleaning up %r work directory.', self.job) |
| self._RunRemotely( |
| cmd.RmTree(self.job.work_dir), "Cleanup workdir failed.") |
| |
| def CleanUpHomeDir(self): |
| self._logger.debug('Cleaning up %r home directory.', self.job) |
| self._RunLocally( |
| cmd.RmTree(self.job.home_dir), "Cleanup homedir failed.") |
| |
| def _PrepareRuntimeEnvironment(self): |
| self._RunRemotely( |
| cmd.MakeDir(self.job.work_dir, self.job.logs_dir, self.job.results_dir), |
| "Creating new job directory failed.") |
| |
| # The log directory is ready, so we can prepare to log command's output. |
| self._executer.OpenLog( |
| os.path.join(self.job.logs_dir, self.job.log_filename_prefix)) |
| |
| def _SatisfyFolderDependencies(self): |
| for dependency in self.job.folder_dependencies: |
| to_folder = os.path.join(self.job.work_dir, dependency.dest) |
| from_folder = os.path.join(dependency.job.work_dir, dependency.src) |
| from_machine = dependency.job.primary_machine |
| |
| if from_machine == self.job.primary_machine and dependency.read_only: |
| # No need to make a copy, just symlink it |
| self._RunRemotely( |
| cmd.MakeSymlink(from_folder, to_folder), |
| "Failed to create symlink to required directory.") |
| else: |
| self._RunRemotely( |
| cmd.RemoteCopyFrom(from_machine.hostname, from_folder, to_folder, |
| username=from_machine.username), |
| "Failed to copy required files.") |
| |
| def _LaunchJobCommand(self): |
| command = self.job.GetCommand() |
| |
| self._RunRemotely("%s; %s" % ("PS1=. TERM=linux source ~/.bashrc", |
| cmd.Wrapper(command, cwd=self.job.work_dir)), |
| "Command failed to execute: '%s'." % command, |
| self.job.timeout) |
| |
| def _CopyJobResults(self): |
| """Copy test results back to directory.""" |
| self._RunLocally( |
| cmd.RemoteCopyFrom(self.job.primary_machine.hostname, |
| self.job.results_dir, |
| self.job.home_dir, |
| username=self.job.primary_machine.username), |
| "Failed to copy results.") |
| |
| def run(self): |
| self.job.status = job.STATUS_SETUP |
| self.job.machines = self.machines |
| self._logger.debug( |
| "Executing %r on %r in directory %s.", |
| self.job, self.job.primary_machine.hostname, self.job.work_dir) |
| |
| try: |
| self.CleanUpWorkDir() |
| |
| self._PrepareRuntimeEnvironment() |
| |
| self.job.status = job.STATUS_COPYING |
| |
| self._SatisfyFolderDependencies() |
| |
| self.job.status = job.STATUS_RUNNING |
| |
| self._LaunchJobCommand() |
| self._CopyJobResults() |
| |
| # If we get here, the job succeeded. |
| self.job.status = job.STATUS_SUCCEEDED |
| except job.JobFailure as ex: |
| self._logger.error( |
| "Job failed. Exit code %s. %s", ex.exit_code, ex) |
| if self._terminator.IsTerminated(): |
| self._logger.info("%r was killed", self.job) |
| |
| self.job.status = job.STATUS_FAILED |
| |
| self._executer.CloseLog() |
| |
| for listener in self.listeners: |
| listener.NotifyJobComplete(self.job) |