Add job maximum runtime, a new per-job timeout that counts time since the job actually started.
* added started_on field to host_queue_entries, so that we could actually compute this timeout
* added max_runtime_hrs to jobs, with default in global config, and added option to create_job() RPC
* added the usual controls to AFE and the CLI for the new job option
* added new max runtime timeout check to the scheduler's periodic cleanup (monitor_db_cleanup), aborting queue entries whose started_on time is more than max_runtime_hrs in the past
* added migration to add new fields and set a safe default max runtime for existing jobs
Signed-off-by: Steve Howard <showard@google.com>
git-svn-id: http://test.kernel.org/svn/autotest/trunk@3132 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/cli/job.py b/cli/job.py
index d0ef9b7..1dfc3e5 100755
--- a/cli/job.py
+++ b/cli/job.py
@@ -210,8 +210,9 @@
[--mlist </path/to/machinelist>] [--machine <host1 host2 host3>]
[--labels <list of labels of machines to run on>]
[--reboot_before <option>] [--reboot_after <option>]
- [--noverify] [--timeout <timeout>] [--one-time-hosts <hosts>]
- [--email <email>] [--dependencies <labels this job is dependent on>]
+ [--noverify] [--timeout <timeout>] [--max_runtime <max runtime>]
+ [--one-time-hosts <hosts>] [--email <email>]
+ [--dependencies <labels this job is dependent on>]
[--atomic_group <atomic group name>] [--parse-failed-repair <option>]
job_name
@@ -289,6 +290,8 @@
default=False, action='store_true')
self.parser.add_option('-o', '--timeout', help='Job timeout in hours.',
metavar='TIMEOUT')
+ self.parser.add_option('--max_runtime',
+ help='Job maximum runtime in hours')
def parse(self):
@@ -375,6 +378,8 @@
self.data['run_verify'] = False
if options.timeout:
self.data['timeout'] = options.timeout
+ if options.max_runtime:
+ self.data['max_runtime_hrs'] = options.max_runtime
if self.one_time_hosts:
self.data['one_time_hosts'] = self.one_time_hosts
diff --git a/cli/job_unittest.py b/cli/job_unittest.py
index ffbfae3..31444c5 100755
--- a/cli/job_unittest.py
+++ b/cli/job_unittest.py
@@ -804,6 +804,18 @@
out_words_ok=['test_job0', 'Created'],)
+ def test_execute_create_job_with_max_runtime(self):
+ data = self.data.copy()
+ data['max_runtime_hrs'] = '222'
+ filename = cli_mock.create_file(self.ctrl_file)
+ self.run_cmd(argv=['atest', 'job', 'create', '-f', filename,
+ 'test_job0', '-m', 'host0', '--max_runtime', '222',
+ '--ignore_site_file'],
+ rpcs=[('create_job', data, True, 42)],
+ out_words_ok=['test_job0', 'Created'],)
+
+
+
def test_execute_create_job_with_noverify(self):
data = self.data.copy()
data['run_verify'] = False
diff --git a/frontend/afe/doctests/001_rpc_test.txt b/frontend/afe/doctests/001_rpc_test.txt
index 92959a8..553127d 100644
--- a/frontend/afe/doctests/001_rpc_test.txt
+++ b/frontend/afe/doctests/001_rpc_test.txt
@@ -470,6 +470,7 @@
... 'priority': 'Low',
... 'synch_count': 1,
... 'timeout': 72,
+... 'max_runtime_hrs': 72,
... 'run_verify': 1,
... 'email_list': '',
... 'reboot_before': 'If dirty',
@@ -499,6 +500,7 @@
... 'execution_subdir': '',
... 'atomic_group': None,
... 'aborted': False,
+... 'started_on': None,
... 'full_status': 'Queued'})
True
>>> data[2] == (
@@ -513,6 +515,7 @@
... 'execution_subdir': '',
... 'atomic_group': None,
... 'aborted': False,
+... 'started_on': None,
... 'full_status': 'Queued'})
True
>>> rpc_interface.get_num_host_queue_entries(job=1)
diff --git a/frontend/afe/models.py b/frontend/afe/models.py
index e832c33..464d9b2 100644
--- a/frontend/afe/models.py
+++ b/frontend/afe/models.py
@@ -626,7 +626,8 @@
submitted_on: date of job submission
synch_count: how many hosts should be used per autoserv execution
run_verify: Whether or not to run the verify phase
- timeout: hours until job times out
+ timeout: hours from queuing time until job times out
+ max_runtime_hrs: hours from job starting time until job times out
email_list: list of people to email on completion delimited by any of:
white space, ',', ':', ';'
dependency_labels: many-to-many relationship with labels corresponding to
@@ -638,6 +639,8 @@
"""
DEFAULT_TIMEOUT = global_config.global_config.get_config_value(
'AUTOTEST_WEB', 'job_timeout_default', default=240)
+ DEFAULT_MAX_RUNTIME_HRS = global_config.global_config.get_config_value(
+ 'AUTOTEST_WEB', 'job_max_runtime_hrs_default', default=72)
DEFAULT_PARSE_FAILED_REPAIR = global_config.global_config.get_config_value(
'AUTOTEST_WEB', 'parse_failed_repair_default', type=bool,
default=False)
@@ -669,6 +672,7 @@
default=DEFAULT_REBOOT_AFTER)
parse_failed_repair = dbmodels.BooleanField(
default=DEFAULT_PARSE_FAILED_REPAIR)
+ max_runtime_hrs = dbmodels.IntegerField(default=DEFAULT_MAX_RUNTIME_HRS)
# custom manager
@@ -694,6 +698,7 @@
control_type=options['control_type'],
synch_count=options.get('synch_count'),
timeout=options.get('timeout'),
+ max_runtime_hrs=options.get('max_runtime_hrs'),
run_verify=options.get('run_verify'),
email_list=options.get('email_list'),
reboot_before=options.get('reboot_before'),
@@ -784,6 +789,7 @@
# be expanded into many actual hosts within the group at schedule time.
atomic_group = dbmodels.ForeignKey(AtomicGroup, blank=True, null=True)
aborted = dbmodels.BooleanField(default=False)
+ started_on = dbmodels.DateTimeField(null=True)
objects = model_logic.ExtendedManager()
@@ -875,12 +881,20 @@
class Meta:
db_table = 'host_queue_entries'
+
if settings.FULL_ADMIN:
class Admin:
list_display = ('id', 'job', 'host', 'status',
'meta_host')
+ def __str__(self):
+ hostname = None
+ if self.host:
+ hostname = self.host.hostname
+ return "%s/%d (%d)" % (hostname, self.job.id, self.id)
+
+
class AbortedHostQueueEntry(dbmodels.Model, model_logic.ModelExtensions):
queue_entry = dbmodels.OneToOneField(HostQueueEntry, primary_key=True)
aborted_by = dbmodels.ForeignKey(User)
diff --git a/frontend/afe/rpc_interface.py b/frontend/afe/rpc_interface.py
index 09f88ed..a0a5634 100644
--- a/frontend/afe/rpc_interface.py
+++ b/frontend/afe/rpc_interface.py
@@ -349,11 +349,12 @@
return cf_info
-def create_job(name, priority, control_file, control_type, is_template=False,
- timeout=None, synch_count=None, hosts=(), meta_hosts=(),
- run_verify=True, one_time_hosts=(), email_list='',
- dependencies=(), reboot_before=None, reboot_after=None,
- parse_failed_repair=None, atomic_group_name=None):
+def create_job(name, priority, control_file, control_type,
+ hosts=(), meta_hosts=(), one_time_hosts=(),
+ atomic_group_name=None, synch_count=None, is_template=False,
+ timeout=None, max_runtime_hrs=None, run_verify=True,
+ email_list='', dependencies=(), reboot_before=None,
+ reboot_after=None, parse_failed_repair=None):
"""\
Create and enqueue a job.
@@ -366,6 +367,7 @@
given this value is treated as a minimum.
@param is_template If true then create a template job.
@param timeout Hours after this call returns until the job times out.
+ @param max_runtime_hrs Hours from job starting time until job times out
@param run_verify Should the host be verified before running the test?
@param email_list String containing emails to mail when the job is done
@param dependencies List of label names on which this job depends
@@ -430,6 +432,7 @@
control_type=control_type,
is_template=is_template,
timeout=timeout,
+ max_runtime_hrs=max_runtime_hrs,
synch_count=synch_count,
run_verify=run_verify,
email_list=email_list,
@@ -666,6 +669,7 @@
result['host_statuses'] = sorted(models.Host.Status.names)
result['job_statuses'] = sorted(models.HostQueueEntry.Status.names)
result['job_timeout_default'] = models.Job.DEFAULT_TIMEOUT
+ result['job_max_runtime_hrs_default'] = models.Job.DEFAULT_MAX_RUNTIME_HRS
result['parse_failed_repair_default'] = bool(
models.Job.DEFAULT_PARSE_FAILED_REPAIR)
result['reboot_before_options'] = models.RebootBefore.names
diff --git a/frontend/client/src/autotest/afe/CreateJobView.java b/frontend/client/src/autotest/afe/CreateJobView.java
index 521a920..b94d733 100644
--- a/frontend/client/src/autotest/afe/CreateJobView.java
+++ b/frontend/client/src/autotest/afe/CreateJobView.java
@@ -158,6 +158,7 @@
protected ListBox priorityList = new ListBox();
protected TextBox kernel = new TextBox();
protected TextBox timeout = new TextBox();
+ private TextBox maxRuntime = new TextBox();
protected TextBox emailList = new TextBox();
protected CheckBox skipVerify = new CheckBox();
private RadioChooser rebootBefore = new RadioChooser();
@@ -211,8 +212,8 @@
}
}
- timeout.setText(Integer.toString(
- (int) jobObject.get("timeout").isNumber().doubleValue()));
+ timeout.setText(Utils.jsonToString(jobObject.get("timeout")));
+ maxRuntime.setText(Utils.jsonToString(jobObject.get("max_runtime_hrs")));
emailList.setText(
jobObject.get("email_list").isString().stringValue());
@@ -526,6 +527,7 @@
RootPanel.get("create_job_name").add(jobName);
RootPanel.get("create_kernel").add(kernel);
RootPanel.get("create_timeout").add(timeout);
+ RootPanel.get("create_max_runtime").add(maxRuntime);
RootPanel.get("create_email_list").add(emailList);
RootPanel.get("create_priority").add(priorityList);
RootPanel.get("create_skip_verify").add(skipVerify);
@@ -552,7 +554,8 @@
parseFailedRepair.setChecked(
repository.getData("parse_failed_repair_default").isBoolean().booleanValue());
kernel.setText("");
- timeout.setText(repository.getData("job_timeout_default").isString().stringValue());
+ timeout.setText(Utils.jsonToString(repository.getData("job_timeout_default")));
+ maxRuntime.setText(Utils.jsonToString(repository.getData("job_max_runtime_hrs_default")));
emailList.setText("");
testSelector.reset();
skipVerify.setChecked(false);
@@ -572,9 +575,10 @@
}
protected void submitJob(final boolean isTemplate) {
- final int timeoutValue, synchCount;
+ final int timeoutValue, maxRuntimeValue, synchCount;
try {
timeoutValue = parsePositiveIntegerInput(timeout.getText(), "timeout");
+ maxRuntimeValue = parsePositiveIntegerInput(maxRuntime.getText(), "max runtime");
synchCount = parsePositiveIntegerInput(synchCountInput.getText(),
"number of machines used per execution");
} catch (IllegalArgumentException exc) {
@@ -595,6 +599,7 @@
new JSONString(controlTypeSelect.getControlType()));
args.put("synch_count", new JSONNumber(synchCount));
args.put("timeout", new JSONNumber(timeoutValue));
+ args.put("max_runtime_hrs", new JSONNumber(maxRuntimeValue));
args.put("email_list", new JSONString(emailList.getText()));
args.put("run_verify", JSONBoolean.getInstance(!skipVerify.isChecked()));
args.put("is_template", JSONBoolean.getInstance(isTemplate));
diff --git a/frontend/client/src/autotest/afe/JobDetailView.java b/frontend/client/src/autotest/afe/JobDetailView.java
index d001d47..8e6232e 100644
--- a/frontend/client/src/autotest/afe/JobDetailView.java
+++ b/frontend/client/src/autotest/afe/JobDetailView.java
@@ -101,6 +101,7 @@
showField(jobObject, "priority", "view_priority");
showField(jobObject, "created_on", "view_created");
showField(jobObject, "timeout", "view_timeout");
+ showField(jobObject, "max_runtime_hrs", "view_max_runtime");
showField(jobObject, "email_list", "view_email_list");
showText(runVerify, "view_run_verify");
showField(jobObject, "reboot_before", "view_reboot_before");
diff --git a/frontend/client/src/autotest/public/AfeClient.html b/frontend/client/src/autotest/public/AfeClient.html
index 4d2756c..5068df9 100644
--- a/frontend/client/src/autotest/public/AfeClient.html
+++ b/frontend/client/src/autotest/public/AfeClient.html
@@ -62,6 +62,8 @@
<span id="view_created"></span><br>
<span class="field-name">Timeout:</span>
<span id="view_timeout"></span> hours<br>
+ <span class="field-name">Max runtime:</span>
+ <span id="view_max_runtime"></span> hours<br>
<span class="field-name">Email List:</span>
<span id="view_email_list"></span><br>
<span class="field-name">Run verify:</span>
@@ -111,6 +113,8 @@
<td id="create_kernel"></td><td></td></tr>
<tr><td class="field-name">Timeout (hours):</td>
<td id="create_timeout"></td></td></td></tr>
+ <tr><td class="field-name">Max runtime (hours):</td>
+ <td id="create_max_runtime"></td></td></td></tr>
<tr><td class="field-name">Email List:</td>
<td id="create_email_list"></td></td></td></tr>
<tr><td class="field-name">Skip verify:</td>
diff --git a/frontend/migrations/035_job_max_runtime.py b/frontend/migrations/035_job_max_runtime.py
new file mode 100644
index 0000000..907ec62
--- /dev/null
+++ b/frontend/migrations/035_job_max_runtime.py
@@ -0,0 +1,12 @@
+UP_SQL = """
+ALTER TABLE host_queue_entries ADD COLUMN started_on datetime NULL;
+ALTER TABLE jobs ADD COLUMN max_runtime_hrs integer NOT NULL;
+-- conservative value for existing jobs, to make sure they don't get
+-- unexpectedly timed out.
+UPDATE jobs SET max_runtime_hrs = timeout;
+"""
+
+DOWN_SQL = """
+ALTER TABLE jobs DROP COLUMN max_runtime_hrs;
+ALTER TABLE host_queue_entries DROP COLUMN started_on;
+"""
diff --git a/global_config.ini b/global_config.ini
index 75235a3..4777527 100644
--- a/global_config.ini
+++ b/global_config.ini
@@ -19,6 +19,7 @@
user: autotest
password: please_set_this_password
job_timeout_default: 72
+job_max_runtime_hrs_default: 72
parse_failed_repair_default: 0
# Only set this if your server is not 'http://[SERVER] hostname/afe/'
#base_url: http://your_autotest_server/afe/
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 5d468d6..7a23ace 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -1589,6 +1589,7 @@
for queue_entry in self.queue_entries:
self._write_host_keyvals(queue_entry.host)
queue_entry.set_status('Running')
+ queue_entry.update_field('started_on', datetime.datetime.now())
queue_entry.host.set_status('Running')
queue_entry.host.update_field('dirty', 1)
if self.job.synch_count == 1:
@@ -2245,7 +2246,7 @@
_table_name = 'host_queue_entries'
_fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
'active', 'complete', 'deleted', 'execution_subdir',
- 'atomic_group_id', 'aborted')
+ 'atomic_group_id', 'aborted', 'started_on')
def __init__(self, id=None, row=None, **kwargs):
@@ -2432,6 +2433,7 @@
def requeue(self):
self.set_status('Queued')
+ self.update_field('started_on', None)
# verify/cleanup failure sets the execution subdir, so reset it here
self.set_execution_subdir('')
if self.meta_host:
@@ -2520,7 +2522,7 @@
_fields = ('id', 'owner', 'name', 'priority', 'control_file',
'control_type', 'created_on', 'synch_count', 'timeout',
'run_verify', 'email_list', 'reboot_before', 'reboot_after',
- 'parse_failed_repair')
+ 'parse_failed_repair', 'max_runtime_hrs')
def __init__(self, id=None, row=None, **kwargs):
diff --git a/scheduler/monitor_db_cleanup.py b/scheduler/monitor_db_cleanup.py
index 9a55da6..59313c0 100644
--- a/scheduler/monitor_db_cleanup.py
+++ b/scheduler/monitor_db_cleanup.py
@@ -52,6 +52,7 @@
logging.info('Running periodic cleanup')
self._abort_timed_out_jobs()
self._abort_jobs_past_synch_start_timeout()
+ self._abort_jobs_past_max_runtime()
self._clear_inactive_blocks()
self._check_for_db_inconsistencies()
@@ -88,6 +89,24 @@
queue_entry.abort(None)
+ def _abort_jobs_past_max_runtime(self):
+ """
+ Abort executions that have started and are past the job's max runtime.
+ """
+ logging.info('Aborting all jobs that have passed maximum runtime')
+ rows = self._db.execute("""
+ SELECT hqe.id
+ FROM host_queue_entries AS hqe
+ INNER JOIN jobs ON (hqe.job_id = jobs.id)
+ WHERE NOT hqe.complete AND NOT hqe.aborted AND
+ hqe.started_on + INTERVAL jobs.max_runtime_hrs HOUR < NOW()""")
+ query = models.HostQueueEntry.objects.filter(
+ id__in=[row[0] for row in rows])
+ for queue_entry in query.distinct():
+ logging.warning('Aborting entry %s due to max runtime', queue_entry)
+ queue_entry.abort(None)
+
+
def _check_for_db_inconsistencies(self):
logging.info('Checking for db inconsistencies')
query = models.HostQueueEntry.objects.filter(active=True, complete=True)
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index 4cc3068..f85f5d9 100644
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -272,7 +272,7 @@
# If either of these are called, a query was made when it shouldn't be.
host_a._compare_fields_in_row = lambda _: self.fail('eek! a query!')
- host_a._update_fields_from_row = host_a._compare_fields_in_row
+ host_a._update_fields_from_row = host_a._compare_fields_in_row
host_c = monitor_db.Host(id=2, always_query=False)
self.assert_(host_a is host_c, 'Cached instance not returned')
@@ -295,7 +295,7 @@
self.god.stub_with(monitor_db, 'Job', MockJob)
hqe = monitor_db.HostQueueEntry(
new_record=True,
- row=[0, 1, 2, 'Queued', None, 0, 0, 0, '.', None, False])
+ row=[0, 1, 2, 'Queued', None, 0, 0, 0, '.', None, False, None])
hqe.save()
new_id = hqe.id
# Force a re-query and verify that the correct data was stored.
@@ -311,6 +311,7 @@
self.assertEqual(hqe.deleted, False)
self.assertEqual(hqe.execution_subdir, '.')
self.assertEqual(hqe.atomic_group_id, None)
+ self.assertEqual(hqe.started_on, None)
class DispatcherSchedulingTest(BaseSchedulerTest):