Added folder copying logic and job_manager start/stop logic.

PRESUBMIT=passed
R=asharif
DELTA=105  (102 added, 0 deleted, 3 changed)
OCL=45429-p2
RCL=45435-p2
RDATE=2010/11/24 11:17:25


P4 change: 42606547
diff --git a/v14/automation/job_executer.py b/v14/automation/job_executer.py
index 2bbb348..813cb76 100755
--- a/v14/automation/job_executer.py
+++ b/v14/automation/job_executer.py
@@ -3,6 +3,8 @@
 import machine_manager
 from utils import utils
 
+JOBDIR_PREFIX = "/usr/local/google/home/automation"
+
 class JobExecuter(threading.Thread):
 
   def __init__(self, job, machine, job_manager):
@@ -19,6 +21,10 @@
     # Do execute here
     print "EXECUTING: " + self.job.GetCommand()
 
+    # Set job directory
+    job_dir = JOBDIR_PREFIX + str(self.job.GetID())
+    self.job.SetJobDir(job_dir)
+
     # Get the machines required
     machines = (self.machine_manager.GetMachines
                 (self.job.GetMachineDescriptions()))
@@ -26,11 +32,34 @@
       print "Could not acquire machines for the job"
     else:
       primary_machine = machines[0]
+      self.job.set_machine(primary_machine)
+
+      utils.RunCommand("mkdir -p " + self.job.GetJobDir())
+      utils.RunCommand("mkdir -p " + self.job.GetWorkDir())
+      print "mkdir -p" + job_dir
+      for required_folder in self.job.GetRequiredFolders():
+        to_folder = self.job.GetWorkDirectory() + "/" + required_folder.folder
+        from_folder = (required_folder.job.GetWorkDirectory() + "/" +
+                         required_folder.folder)
+        if required_folder.job.GetMachine().name == primary_machine.name:
+          # Same machine, do cp
+          utils.RunCommand("cp %s %s", from_folder, to_folder)
+        else:
+          # Different machine, do scp
+          from_machine = required_folder.job.GetMachine().name
+          from_user = required_folder.job.GetMachine.username
+          to_machine = self.job.GetMachine().name
+          to_user = self.job.GetMachine().username
+          utils.RunCommand("scp %s@%s:%s %s@%s:%s"
+                           % (from_user, from_machine, from_folder, to_user,
+                              to_machine, to_folder))
+
       result = utils.RunCommand("ssh %s@%s -- %s" %
                                 (primary_machine.username, primary_machine.name,
                                  self.job.GetCommand()), True)
       print "OUTPUT: " + str(result)
 
+
     # Mark as complete
     self.job.SetStatus(jobs.job.STATUS_COMPLETED)
     self.job_manager.NotifyJobComplete(self.job)
diff --git a/v14/automation/job_manager.py b/v14/automation/job_manager.py
index 1911e84..1c51a0f 100755
--- a/v14/automation/job_manager.py
+++ b/v14/automation/job_manager.py
@@ -1,6 +1,9 @@
 import threading
 import job_executer
 
+JOB_MANAGER_STARTED = 1
+JOB_MANAGER_STOPPING = 2
+JOB_MANAGER_STOPPED = 3
 
 class JobManager(threading.Thread):
 
@@ -15,9 +18,29 @@
     self.job_lock = threading.Lock()
     self.job_ready_event = threading.Event()
 
+    self.job_counter = 0
+
+    self.status = JOB_MANAGER_STOPPED
+
+  def StartJobManager(self):
+    self.job_lock.acquire()
+    if self.status == JOB_MANAGER_STOPPED:
+      self.start()
+      self.status = JOB_MANAGER_STARTED
+    self.job_lock.release()
+
+  def StopJobManager(self):
+    self.job_lock.acquire()
+    if self.status == JOB_MANAGER_STARTED:
+      self.status = JOB_MANAGER_STOPPING
+    self.job_lock.release()
+
   def AddJob(self, current_job):
     self.job_lock.acquire()
 
+    current_job.SetID(self.job_counter)
+    self.job_counter += 1
+
     self.all_jobs.append(current_job)
     # Only queue a job as ready if it has no dependencies
     if current_job.IsReady():
@@ -53,4 +76,12 @@
         executer = job_executer.JobExecuter(ready_job, None, self)
         executer.start()
 
+
+      if self.status == JOB_MANAGER_STOPPING:
+        self.status = JOB_MANAGER_STOPPED
+      return
+
+
       self.job_lock.release()
+
+
diff --git a/v14/automation/jobs/job.py b/v14/automation/jobs/job.py
index 1375235..fc8b9d8 100644
--- a/v14/automation/jobs/job.py
+++ b/v14/automation/jobs/job.py
@@ -11,6 +11,14 @@
 STATUS_EXECUTING = 1
 STATUS_COMPLETED = 2
 
+SUBDIR_WORK = "/work"
+SUBDIR_LOGS = "/logs"
+
+
+class RequiredFolder:
+  def __init__(self, job, folder):
+    self.job = job
+    self.folder = folder
 
 class Job:
   """A class representing a job whose commands will be executed."""
@@ -20,6 +28,22 @@
     self.dependencies = []
     self.dependents = []
     self.machine_descriptions = []
+    self.required_folders = []
+    self.id = 0
+    self.job_dir = ""
+    self.machine = None
+
+  def SetID(self, id):
+    self.id = id
+
+  def GetID(self):
+    return self.id
+
+  def SetMachine(self, machine):
+    self.machine = machine
+
+  def GetMachine(self):
+    return self.machine
 
   def SetStatus(self, status):
     self.status = status
@@ -27,9 +51,27 @@
   def GetStatus(self):
     return self.status
 
-  def AddDependency(self, dep):
-    self.dependencies.append(dep)
-    dep.dependents.append(self)
+  def AddRequiredFolder(self, job, folder):
+    self.required_folders.append(RequiredFolder(job, folder))
+
+  def GetRequiredFolders(self):
+    return self.required_folders
+
+  def SetJobDir(self, job_dir):
+    self.job_dir = job_dir
+
+  def GetJobDir(self):
+    return self.job_dir
+
+  def GetWorkDir(self):
+    return self.job_dir + SUBDIR_WORK
+
+  def GetLogsDir(self):
+    return self.job_dir + SUBDIR_LOGS
+
+  def AddDependency(self, job):
+    self.dependencies.append(job)
+    job.dependents.append(self)
 
   def GetDependencies(self):
     return self.dependencies