| #!/usr/bin/python |
| |
| # Copyright 2011-2015 Google Inc. All Rights Reserved. |
| |
| """The experiment runner module.""" |
| import getpass |
| import os |
| import random |
| import shutil |
| import sys |
| import time |
| |
| import afe_lock_machine |
| |
| from utils import command_executer |
| from utils import logger |
| from utils.email_sender import EmailSender |
| from utils.file_utils import FileUtils |
| |
| import config |
| from experiment_status import ExperimentStatus |
| from results_cache import CacheConditions |
| from results_cache import ResultsCache |
| from results_report import HTMLResultsReport |
| from results_report import TextResultsReport |
| from results_report import JSONResultsReport |
| from schedv2 import Schedv2 |
| |
| |
| class ExperimentRunner(object): |
| """ExperimentRunner Class.""" |
| |
| STATUS_TIME_DELAY = 30 |
| THREAD_MONITOR_DELAY = 2 |
| |
| def __init__(self, experiment, json_report, using_schedv2=False, log=None, |
| cmd_exec=None): |
| self._experiment = experiment |
| self.l = log or logger.GetLogger(experiment.log_dir) |
| self._ce = cmd_exec or command_executer.GetCommandExecuter(self.l) |
| self._terminated = False |
| self.json_report = json_report |
| self.locked_machines = [] |
| if experiment.log_level != "verbose": |
| self.STATUS_TIME_DELAY = 10 |
| |
| # Setting this to True will use crosperf sched v2 (feature in progress). |
| self._using_schedv2 = using_schedv2 |
| |
| def _GetMachineList(self): |
| """Return a list of all requested machines. |
| |
| Create a list of all the requested machines, both global requests and |
| label-specific requests, and return the list. |
| """ |
| machines = self._experiment.remote |
| # All Label.remote is a sublist of experiment.remote. |
| for l in self._experiment.labels: |
| for r in l.remote: |
| assert r in machines |
| return machines |
| |
| def _UpdateMachineList(self, locked_machines): |
| """Update machines lists to contain only locked machines. |
| |
| Go through all the lists of requested machines, both global and |
| label-specific requests, and remove any machine that we were not |
| able to lock. |
| |
| Args: |
| locked_machines: A list of the machines we successfully locked. |
| """ |
| for m in self._experiment.remote: |
| if m not in locked_machines: |
| self._experiment.remote.remove(m) |
| |
| for l in self._experiment.labels: |
| for m in l.remote: |
| if m not in locked_machines: |
| l.remote.remove(m) |
| |
| def _LockAllMachines(self, experiment): |
| """Attempt to globally lock all of the machines requested for run. |
| |
| This method will use the AFE server to globally lock all of the machines |
| requested for this crosperf run, to prevent any other crosperf runs from |
| being able to update/use the machines while this experiment is running. |
| """ |
| lock_mgr = afe_lock_machine.AFELockManager( |
| self._GetMachineList(), |
| "", |
| experiment.labels[0].chromeos_root, |
| None, |
| log=self.l, |
| ) |
| for m in lock_mgr.machines: |
| if not lock_mgr.MachineIsKnown(m): |
| lock_mgr.AddLocalMachine(m) |
| machine_states = lock_mgr.GetMachineStates("lock") |
| lock_mgr.CheckMachineLocks(machine_states, "lock") |
| self.locked_machines = lock_mgr.UpdateMachines(True) |
| self._experiment.locked_machines = self.locked_machines |
| self._UpdateMachineList(self.locked_machines) |
| self._experiment.machine_manager.RemoveNonLockedMachines( |
| self.locked_machines) |
| if len(self.locked_machines) == 0: |
| raise RuntimeError("Unable to lock any machines.") |
| |
| def _UnlockAllMachines(self, experiment): |
| """Attempt to globally unlock all of the machines requested for run. |
| |
| The method will use the AFE server to globally unlock all of the machines |
| requested for this crosperf run. |
| """ |
| if not self.locked_machines: |
| return |
| |
| lock_mgr = afe_lock_machine.AFELockManager( |
| self.locked_machines, |
| "", |
| experiment.labels[0].chromeos_root, |
| None, |
| log=self.l, |
| ) |
| machine_states = lock_mgr.GetMachineStates("unlock") |
| lock_mgr.CheckMachineLocks(machine_states, "unlock") |
| lock_mgr.UpdateMachines(False) |
| |
| def _ClearCacheEntries(self, experiment): |
| for br in experiment.benchmark_runs: |
| cache = ResultsCache() |
| cache.Init (br.label.chromeos_image, br.label.chromeos_root, |
| br.benchmark.test_name, br.iteration, br.test_args, |
| br.profiler_args, br.machine_manager, br.machine, |
| br.label.board, br.cache_conditions, br._logger, br.log_level, |
| br.label, br.share_cache, br.benchmark.suite, |
| br.benchmark.show_all_results, br.benchmark.run_local) |
| cache_dir = cache._GetCacheDirForWrite() |
| if os.path.exists(cache_dir): |
| self.l.LogOutput("Removing cache dir: %s" % cache_dir) |
| shutil.rmtree(cache_dir) |
| |
| def _Run(self, experiment): |
| try: |
| if not experiment.locks_dir: |
| self._LockAllMachines(experiment) |
| if self._using_schedv2: |
| schedv2 = Schedv2(experiment) |
| experiment.set_schedv2(schedv2) |
| if CacheConditions.FALSE in experiment.cache_conditions: |
| self._ClearCacheEntries(experiment) |
| status = ExperimentStatus(experiment) |
| experiment.Run() |
| last_status_time = 0 |
| last_status_string = "" |
| try: |
| if experiment.log_level != "verbose": |
| self.l.LogStartDots() |
| while not experiment.IsComplete(): |
| if last_status_time + self.STATUS_TIME_DELAY < time.time(): |
| last_status_time = time.time() |
| border = "==============================" |
| if experiment.log_level == "verbose": |
| self.l.LogOutput(border) |
| self.l.LogOutput(status.GetProgressString()) |
| self.l.LogOutput(status.GetStatusString()) |
| self.l.LogOutput(border) |
| else: |
| current_status_string = status.GetStatusString() |
| if (current_status_string != last_status_string): |
| self.l.LogEndDots() |
| self.l.LogOutput(border) |
| self.l.LogOutput(current_status_string) |
| self.l.LogOutput(border) |
| last_status_string = current_status_string |
| else: |
| self.l.LogAppendDot() |
| time.sleep(self.THREAD_MONITOR_DELAY) |
| except KeyboardInterrupt: |
| self._terminated = True |
| self.l.LogError("Ctrl-c pressed. Cleaning up...") |
| experiment.Terminate() |
| raise |
| except SystemExit: |
| self._terminate = True |
| self.l.LogError("Unexpected exit. Cleaning up...") |
| experiment.Terminate() |
| raise |
| finally: |
| if not experiment.locks_dir: |
| self._UnlockAllMachines(experiment) |
| |
| def _PrintTable(self, experiment): |
| self.l.LogOutput(TextResultsReport(experiment).GetReport()) |
| |
| def _Email(self, experiment): |
| # Only email by default if a new run was completed. |
| send_mail = False |
| for benchmark_run in experiment.benchmark_runs: |
| if not benchmark_run.cache_hit: |
| send_mail = True |
| break |
| if (not send_mail and not experiment.email_to |
| or config.GetConfig("no_email")): |
| return |
| |
| label_names = [] |
| for label in experiment.labels: |
| label_names.append(label.name) |
| subject = "%s: %s" % (experiment.name, " vs. ".join(label_names)) |
| |
| text_report = TextResultsReport(experiment, True).GetReport() |
| text_report += ("\nResults are stored in %s.\n" % |
| experiment.results_directory) |
| text_report = "<pre style='font-size: 13px'>%s</pre>" % text_report |
| html_report = HTMLResultsReport(experiment).GetReport() |
| attachment = EmailSender.Attachment("report.html", html_report) |
| email_to = [getpass.getuser()] + experiment.email_to |
| EmailSender().SendEmail(email_to, |
| subject, |
| text_report, |
| attachments=[attachment], |
| msg_type="html") |
| |
| def _StoreResults (self, experiment): |
| if self._terminated: |
| return |
| results_directory = experiment.results_directory |
| FileUtils().RmDir(results_directory) |
| FileUtils().MkDirP(results_directory) |
| self.l.LogOutput("Storing experiment file in %s." % results_directory) |
| experiment_file_path = os.path.join(results_directory, |
| "experiment.exp") |
| FileUtils().WriteFile(experiment_file_path, experiment.experiment_file) |
| |
| self.l.LogOutput("Storing results report in %s." % results_directory) |
| results_table_path = os.path.join(results_directory, "results.html") |
| report = HTMLResultsReport(experiment).GetReport() |
| if self.json_report: |
| JSONResultsReport(experiment).GetReport(results_directory) |
| FileUtils().WriteFile(results_table_path, report) |
| |
| self.l.LogOutput("Storing email message body in %s." % results_directory) |
| msg_file_path = os.path.join(results_directory, "msg_body.html") |
| text_report = TextResultsReport(experiment, True).GetReport() |
| text_report += ("\nResults are stored in %s.\n" % |
| experiment.results_directory) |
| msg_body = "<pre style='font-size: 13px'>%s</pre>" % text_report |
| FileUtils().WriteFile(msg_file_path, msg_body) |
| |
| self.l.LogOutput("Storing results of each benchmark run.") |
| for benchmark_run in experiment.benchmark_runs: |
| if benchmark_run.result: |
| benchmark_run_name = filter(str.isalnum, benchmark_run.name) |
| benchmark_run_path = os.path.join(results_directory, |
| benchmark_run_name) |
| benchmark_run.result.CopyResultsTo(benchmark_run_path) |
| benchmark_run.result.CleanUp(benchmark_run.benchmark.rm_chroot_tmp) |
| |
| def Run(self): |
| try: |
| self._Run(self._experiment) |
| finally: |
| # Always print the report at the end of the run. |
| self._PrintTable(self._experiment) |
| if not self._terminated: |
| self._StoreResults(self._experiment) |
| self._Email(self._experiment) |
| |
| |
| class MockExperimentRunner(ExperimentRunner): |
| """Mocked ExperimentRunner for testing.""" |
| |
| def __init__(self, experiment): |
| super(MockExperimentRunner, self).__init__(experiment) |
| |
| def _Run(self, experiment): |
| self.l.LogOutput("Would run the following experiment: '%s'." % |
| experiment.name) |
| |
| def _PrintTable(self, experiment): |
| self.l.LogOutput("Would print the experiment table.") |
| |
| def _Email(self, experiment): |
| self.l.LogOutput("Would send result email.") |
| |
| def _StoreResults(self, experiment): |
| self.l.LogOutput("Would store the results.") |