| # Copyright 2010 Google Inc. All Rights Reserved. |
| # |
| """A module for a job in the infrastructure.""" |
| |
| __author__ = 'raymes@google.com (Raymes Khoury)' |
| |
| import os.path |
| |
| from automation.common import state_machine |
| |
| STATUS_NOT_EXECUTED = 'NOT_EXECUTED' |
| STATUS_SETUP = 'SETUP' |
| STATUS_COPYING = 'COPYING' |
| STATUS_RUNNING = 'RUNNING' |
| STATUS_SUCCEEDED = 'SUCCEEDED' |
| STATUS_FAILED = 'FAILED' |
| |
| |
| class FolderDependency(object): |
| |
| def __init__(self, job, src, dest=None): |
| if not dest: |
| dest = src |
| |
| # TODO(kbaclawski): rename to producer |
| self.job = job |
| self.src = src |
| self.dest = dest |
| |
| @property |
| def read_only(self): |
| return self.dest == self.src |
| |
| |
| class JobStateMachine(state_machine.BasicStateMachine): |
| state_machine = { |
| STATUS_NOT_EXECUTED: [STATUS_SETUP], |
| STATUS_SETUP: [STATUS_COPYING, STATUS_FAILED], |
| STATUS_COPYING: [STATUS_RUNNING, STATUS_FAILED], |
| STATUS_RUNNING: [STATUS_SUCCEEDED, STATUS_FAILED] |
| } |
| |
| final_states = [STATUS_SUCCEEDED, STATUS_FAILED] |
| |
| |
| class JobFailure(Exception): |
| |
| def __init__(self, message, exit_code): |
| Exception.__init__(self, message) |
| self.exit_code = exit_code |
| |
| |
| class Job(object): |
| """A class representing a job whose commands will be executed.""" |
| |
| WORKDIR_PREFIX = '/usr/local/google/tmp/automation' |
| |
| def __init__(self, label, command, timeout=4 * 60 * 60): |
| self._state = JobStateMachine(STATUS_NOT_EXECUTED) |
| self.predecessors = set() |
| self.successors = set() |
| self.machine_dependencies = [] |
| self.folder_dependencies = [] |
| self.id = 0 |
| self.machines = [] |
| self.command = command |
| self._has_primary_machine_spec = False |
| self.group = None |
| self.dry_run = None |
| self.label = label |
| self.timeout = timeout |
| |
| def _StateGet(self): |
| return self._state |
| |
| def _StateSet(self, new_state): |
| self._state.Change(new_state) |
| |
| status = property(_StateGet, _StateSet) |
| |
| @property |
| def timeline(self): |
| return self._state.timeline |
| |
| def __repr__(self): |
| return '{%s: %s}' % (self.__class__.__name__, self.id) |
| |
| def __str__(self): |
| res = [] |
| res.append('%d' % self.id) |
| res.append('Predecessors:') |
| res.extend(['%d' % pred.id for pred in self.predecessors]) |
| res.append('Successors:') |
| res.extend(['%d' % succ.id for succ in self.successors]) |
| res.append('Machines:') |
| res.extend(['%s' % machine for machine in self.machines]) |
| res.append(self.PrettyFormatCommand()) |
| res.append('%s' % self.status) |
| res.append(self.timeline.GetTransitionEventReport()) |
| return '\n'.join(res) |
| |
| @staticmethod |
| def _FormatCommand(cmd, substitutions): |
| for pattern, replacement in substitutions: |
| cmd = cmd.replace(pattern, replacement) |
| |
| return cmd |
| |
| def GetCommand(self): |
| substitutions = [ |
| ('$JOB_ID', str(self.id)), ('$JOB_TMP', self.work_dir), |
| ('$JOB_HOME', self.home_dir), |
| ('$PRIMARY_MACHINE', self.primary_machine.hostname) |
| ] |
| |
| if len(self.machines) > 1: |
| for num, machine in enumerate(self.machines[1:]): |
| substitutions.append(('$SECONDARY_MACHINES[%d]' % num, machine.hostname |
| )) |
| |
| return self._FormatCommand(str(self.command), substitutions) |
| |
| def PrettyFormatCommand(self): |
| # TODO(kbaclawski): This method doesn't belong here, but rather to |
| # non existing Command class. If one is created then PrettyFormatCommand |
| # shall become its method. |
| return self._FormatCommand(self.GetCommand(), [ |
| ('\{ ', ''), ('; \}', ''), ('\} ', '\n'), ('\s*&&\s*', '\n') |
| ]) |
| |
| def DependsOnFolder(self, dependency): |
| self.folder_dependencies.append(dependency) |
| self.DependsOn(dependency.job) |
| |
| @property |
| def results_dir(self): |
| return os.path.join(self.work_dir, 'results') |
| |
| @property |
| def logs_dir(self): |
| return os.path.join(self.home_dir, 'logs') |
| |
| @property |
| def log_filename_prefix(self): |
| return 'job-%d.log' % self.id |
| |
| @property |
| def work_dir(self): |
| return os.path.join(self.WORKDIR_PREFIX, 'job-%d' % self.id) |
| |
| @property |
| def home_dir(self): |
| return os.path.join(self.group.home_dir, 'job-%d' % self.id) |
| |
| @property |
| def primary_machine(self): |
| return self.machines[0] |
| |
| def DependsOn(self, job): |
| """Specifies Jobs to be finished before this job can be launched.""" |
| self.predecessors.add(job) |
| job.successors.add(self) |
| |
| @property |
| def is_ready(self): |
| """Check that all our dependencies have been executed.""" |
| return all(pred.status == STATUS_SUCCEEDED for pred in self.predecessors) |
| |
| def DependsOnMachine(self, machine_spec, primary=True): |
| # Job will run on arbitrarily chosen machine specified by |
| # MachineSpecification class instances passed to this method. |
| if primary: |
| if self._has_primary_machine_spec: |
| raise RuntimeError('Only one primary machine specification allowed.') |
| self._has_primary_machine_spec = True |
| self.machine_dependencies.insert(0, machine_spec) |
| else: |
| self.machine_dependencies.append(machine_spec) |