Added folder copying logic and job_manager start/stop logic.
PRESUBMIT=passed
R=asharif
DELTA=105 (102 added, 0 deleted, 3 changed)
OCL=45429-p2
RCL=45435-p2
RDATE=2010/11/24 11:17:25
P4 change: 42606547
diff --git a/v14/automation/job_executer.py b/v14/automation/job_executer.py
index 2bbb348..813cb76 100755
--- a/v14/automation/job_executer.py
+++ b/v14/automation/job_executer.py
@@ -3,6 +3,8 @@
import machine_manager
from utils import utils
+JOBDIR_PREFIX = "/usr/local/google/home/automation"
+
class JobExecuter(threading.Thread):
def __init__(self, job, machine, job_manager):
@@ -19,6 +21,10 @@
# Do execute here
print "EXECUTING: " + self.job.GetCommand()
+ # Set job directory
+ job_dir = JOBDIR_PREFIX + str(self.job.GetID())
+ self.job.SetJobDir(job_dir)
+
# Get the machines required
machines = (self.machine_manager.GetMachines
(self.job.GetMachineDescriptions()))
@@ -26,11 +32,34 @@
print "Could not acquire machines for the job"
else:
primary_machine = machines[0]
+ self.job.set_machine(primary_machine)
+
+ utils.RunCommand("mkdir -p " + self.job.GetJobDir())
+ utils.RunCommand("mkdir -p " + self.job.GetWorkDir())
+ print "mkdir -p" + job_dir
+ for required_folder in self.job.GetRequiredFolders():
+ to_folder = self.job.GetWorkDirectory() + "/" + required_folder.folder
+ from_folder = (required_folder.job.GetWorkDirectory() + "/" +
+ required_folder.folder)
+ if required_folder.job.GetMachine().name == primary_machine.name:
+ # Same machine, do cp
+ utils.RunCommand("cp %s %s", from_folder, to_folder)
+ else:
+ # Different machine, do scp
+ from_machine = required_folder.job.GetMachine().name
+ from_user = required_folder.job.GetMachine.username
+ to_machine = self.job.GetMachine().name
+ to_user = self.job.GetMachine().username
+ utils.RunCommand("scp %s@%s:%s %s@%s:%s"
+ % (from_user, from_machine, from_folder, to_user,
+ to_machine, to_folder))
+
result = utils.RunCommand("ssh %s@%s -- %s" %
(primary_machine.username, primary_machine.name,
self.job.GetCommand()), True)
print "OUTPUT: " + str(result)
+
# Mark as complete
self.job.SetStatus(jobs.job.STATUS_COMPLETED)
self.job_manager.NotifyJobComplete(self.job)
diff --git a/v14/automation/job_manager.py b/v14/automation/job_manager.py
index 1911e84..1c51a0f 100755
--- a/v14/automation/job_manager.py
+++ b/v14/automation/job_manager.py
@@ -1,6 +1,9 @@
import threading
import job_executer
+JOB_MANAGER_STARTED = 1
+JOB_MANAGER_STOPPING = 2
+JOB_MANAGER_STOPPED = 3
class JobManager(threading.Thread):
@@ -15,9 +18,29 @@
self.job_lock = threading.Lock()
self.job_ready_event = threading.Event()
+ self.job_counter = 0
+
+ self.status = JOB_MANAGER_STOPPED
+
+ def StartJobManager(self):
+ self.job_lock.acquire()
+ if self.status == JOB_MANAGER_STOPPED:
+ self.start()
+ self.status = JOB_MANAGER_STARTED
+ self.job_lock.release()
+
+ def StopJobManager(self):
+ self.job_lock.acquire()
+ if self.status == JOB_MANAGER_STARTED:
+ self.status = JOB_MANAGER_STOPPING
+ self.job_lock.release()
+
def AddJob(self, current_job):
self.job_lock.acquire()
+ current_job.SetID(self.job_counter)
+ self.job_counter += 1
+
self.all_jobs.append(current_job)
# Only queue a job as ready if it has no dependencies
if current_job.IsReady():
@@ -53,4 +76,12 @@
executer = job_executer.JobExecuter(ready_job, None, self)
executer.start()
+
+ if self.status == JOB_MANAGER_STOPPING:
+ self.status = JOB_MANAGER_STOPPED
+ return
+
+
self.job_lock.release()
+
+
diff --git a/v14/automation/jobs/job.py b/v14/automation/jobs/job.py
index 1375235..fc8b9d8 100644
--- a/v14/automation/jobs/job.py
+++ b/v14/automation/jobs/job.py
@@ -11,6 +11,14 @@
STATUS_EXECUTING = 1
STATUS_COMPLETED = 2
+SUBDIR_WORK = "/work"
+SUBDIR_LOGS = "/logs"
+
+
+class RequiredFolder:
+ def __init__(self, job, folder):
+ self.job = job
+ self.folder = folder
class Job:
"""A class representing a job whose commands will be executed."""
@@ -20,6 +28,22 @@
self.dependencies = []
self.dependents = []
self.machine_descriptions = []
+ self.required_folders = []
+ self.id = 0
+ self.job_dir = ""
+ self.machine = None
+
+ def SetID(self, id):
+ self.id = id
+
+ def GetID(self):
+ return self.id
+
+ def SetMachine(self, machine):
+ self.machine = machine
+
+ def GetMachine(self):
+ return self.machine
def SetStatus(self, status):
self.status = status
@@ -27,9 +51,27 @@
def GetStatus(self):
return self.status
- def AddDependency(self, dep):
- self.dependencies.append(dep)
- dep.dependents.append(self)
+ def AddRequiredFolder(self, job, folder):
+ self.required_folders.append(RequiredFolder(job, folder))
+
+ def GetRequiredFolders(self):
+ return self.required_folders
+
+ def SetJobDir(self, job_dir):
+ self.job_dir = job_dir
+
+ def GetJobDir(self):
+ return self.job_dir
+
+ def GetWorkDir(self):
+ return self.job_dir + SUBDIR_WORK
+
+ def GetLogsDir(self):
+ return self.job_dir + SUBDIR_LOGS
+
+ def AddDependency(self, job):
+ self.dependencies.append(job)
+ job.dependents.append(self)
def GetDependencies(self):
return self.dependencies