Support for job keyvals
* keyvals can be passed as an argument to create_job and are stored in the AFE DB
* the scheduler reads them from the AFE DB and writes them to the job-level keyval file before the job starts
* the parser reads them from the keyval file and writes them to the TKO DB in a new table (see the sketch below)
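
For example, a rough end-to-end sketch (hypothetical names and values; the
create_job signature and the key=value file format follow the changes below):

    # enqueue a job with a keyval attached
    job_id = rpc_interface.create_job(name='mytest', priority='Medium',
                                      control_file='# control file contents',
                                      control_type='Client',
                                      hosts=['host1'],
                                      keyvals={'build': '1234'})

    # before the job starts, the scheduler merges these into the job-level
    # keyval file, one key=value pair per line, e.g.:
    #
    #   job_queued=1271234567
    #   build=1234
    #
    # after the run, the parser reads the file back and writes each pair
    # into the new tko_job_keyvals table.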

Since the field name "key" happens to be a MySQL reserved word, I went ahead and made db.py support proper quoting of field names. Eventually it'd be really nice to deprecate db.py and use Django models exclusively, but that is a far-off dream.
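
Concretely, the quoting just wraps identifiers in backticks, so the keyval
insert comes out as something like (illustrative statement only):

    insert into tko_job_keyvals (`job_id`,`key`,`value`) values (%s,%s,%s)

rather than leaving the reserved word "key" unquoted, which MySQL would
reject.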

Support is still lacking in the AFE and TKO web clients and CLIs; at least the TKO part will be coming soon.

Signed-off-by: Steve Howard <showard@google.com>


git-svn-id: http://test.kernel.org/svn/autotest/trunk@4123 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/frontend/afe/models.py b/frontend/afe/models.py
index 07f1b40..41c39c0 100644
--- a/frontend/afe/models.py
+++ b/frontend/afe/models.py
@@ -709,6 +709,11 @@
             created_on=datetime.now())
 
         job.dependency_labels = options['dependencies']
+
+        if options['keyvals'] is not None:
+            for key, value in options['keyvals'].iteritems():
+                JobKeyval.objects.create(job=job, key=key, value=value)
+
         return job
 
 
@@ -755,6 +760,11 @@
         return '%s-%s' % (self.id, self.owner)
 
 
+    def keyval_dict(self):
+        return dict((keyval.key, keyval.value)
+                    for keyval in self.jobkeyval_set.all())
+
+
     class Meta:
         db_table = 'afe_jobs'
 
@@ -762,6 +772,18 @@
         return u'%s (%s-%s)' % (self.name, self.id, self.owner)
 
 
+class JobKeyval(dbmodels.Model, model_logic.ModelExtensions):
+    """Keyvals associated with jobs"""
+    job = dbmodels.ForeignKey(Job)
+    key = dbmodels.CharField(max_length=90)
+    value = dbmodels.CharField(max_length=300)
+
+    objects = model_logic.ExtendedManager()
+
+    class Meta:
+        db_table = 'afe_job_keyvals'
+
+
 class IneligibleHostQueue(dbmodels.Model, model_logic.ModelExtensions):
     job = dbmodels.ForeignKey(Job)
     host = dbmodels.ForeignKey(Host)
diff --git a/frontend/afe/rpc_interface.py b/frontend/afe/rpc_interface.py
index dd36602..fe56cab 100644
--- a/frontend/afe/rpc_interface.py
+++ b/frontend/afe/rpc_interface.py
@@ -402,7 +402,8 @@
                atomic_group_name=None, synch_count=None, is_template=False,
                timeout=None, max_runtime_hrs=None, run_verify=True,
                email_list='', dependencies=(), reboot_before=None,
-               reboot_after=None, parse_failed_repair=None, hostless=False):
+               reboot_after=None, parse_failed_repair=None, hostless=False,
+               keyvals=None):
     """\
     Create and enqueue a job.
 
@@ -424,6 +425,7 @@
     @param parse_failed_repair if true, results of failed repairs launched by
     this job will be parsed as part of the job.
     @param hostless if true, create a hostless job
+    @param keyvals dict of keyvals to associate with the job
 
     @param hosts List of hosts to run job on.
     @param meta_hosts List where each entry is a label name, and for each entry
@@ -531,7 +533,8 @@
                    dependencies=dependencies,
                    reboot_before=reboot_before,
                    reboot_after=reboot_after,
-                   parse_failed_repair=parse_failed_repair)
+                   parse_failed_repair=parse_failed_repair,
+                   keyvals=keyvals)
     return rpc_utils.create_new_job(owner=owner,
                                     options=options,
                                     host_objects=host_objects,
@@ -584,10 +587,13 @@
     jobs = list(models.Job.query_objects(filter_data))
     models.Job.objects.populate_relationships(jobs, models.Label,
                                               'dependencies')
+    models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals')
     for job in jobs:
         job_dict = job.get_object_dict()
         job_dict['dependencies'] = ','.join(label.name
                                             for label in job.dependencies)
+        job_dict['keyvals'] = dict((keyval.key, keyval.value)
+                                   for keyval in job.keyvals)
         job_dicts.append(job_dict)
     return rpc_utils.prepare_for_serialization(job_dicts)
 
diff --git a/frontend/afe/rpc_interface_unittest.py b/frontend/afe/rpc_interface_unittest.py
index 822ec1f..c84dacf 100755
--- a/frontend/afe/rpc_interface_unittest.py
+++ b/frontend/afe/rpc_interface_unittest.py
@@ -93,6 +93,18 @@
         self._check_hostnames(hosts, ['host2'])
 
 
+    def test_job_keyvals(self):
+        keyval_dict = {'mykey': 'myvalue'}
+        job_id = rpc_interface.create_job(name='test', priority='Medium',
+                                          control_file='foo',
+                                          control_type='Client',
+                                          hosts=['host1'],
+                                          keyvals=keyval_dict)
+        jobs = rpc_interface.get_jobs(id=job_id)
+        self.assertEquals(len(jobs), 1)
+        self.assertEquals(jobs[0]['keyvals'], keyval_dict)
+
+
     def test_get_jobs_summary(self):
         job = self._create_job(hosts=xrange(1, 4))
         entries = list(job.hostqueueentry_set.all())
diff --git a/frontend/migrations/047_job_keyvals.py b/frontend/migrations/047_job_keyvals.py
new file mode 100644
index 0000000..d587fd8
--- /dev/null
+++ b/frontend/migrations/047_job_keyvals.py
@@ -0,0 +1,28 @@
+UP_SQL = """
+CREATE TABLE `afe_job_keyvals` (
+    `id` integer AUTO_INCREMENT NOT NULL PRIMARY KEY,
+    `job_id` integer NOT NULL,
+    INDEX `afe_job_keyvals_job_id` (`job_id`),
+    FOREIGN KEY (`job_id`) REFERENCES `afe_jobs` (`id`) ON DELETE NO ACTION,
+    `key` varchar(90) NOT NULL,
+    INDEX `afe_job_keyvals_key` (`key`),
+    `value` varchar(300) NOT NULL
+) ENGINE=InnoDB;
+
+CREATE TABLE `tko_job_keyvals` (
+    `id` integer AUTO_INCREMENT NOT NULL PRIMARY KEY,
+    `job_id` int(10) unsigned NOT NULL,
+    INDEX `tko_job_keyvals_job_id` (`job_id`),
+    FOREIGN KEY (`job_id`) REFERENCES `tko_jobs` (`job_idx`)
+        ON DELETE NO ACTION,
+    `key` varchar(90) NOT NULL,
+    INDEX `tko_job_keyvals_key` (`key`),
+    `value` varchar(300) NOT NULL
+) ENGINE=InnoDB;
+"""
+
+
+DOWN_SQL = """
+DROP TABLE afe_job_keyvals;
+DROP TABLE tko_job_keyvals;
+"""
diff --git a/frontend/tko/models.py b/frontend/tko/models.py
index 61b24af..429473d 100644
--- a/frontend/tko/models.py
+++ b/frontend/tko/models.py
@@ -155,6 +155,16 @@
         db_table = 'tko_jobs'
 
 
+class JobKeyval(dbmodels.Model):
+    job = dbmodels.ForeignKey(Job)
+    key = dbmodels.CharField(max_length=90)
+    value = dbmodels.CharField(blank=True, max_length=300)
+
+
+    class Meta:
+        db_table = 'tko_job_keyvals'
+
+
 class Test(dbmodels.Model, model_logic.ModelExtensions,
            model_logic.ModelWithAttributes):
     test_idx = dbmodels.AutoField(primary_key=True)
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index f4cb804..4164732 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -2097,7 +2097,8 @@
 
     def prolog(self):
         queued_key, queued_time = self._job_queued_keyval(self.job)
-        keyval_dict = {queued_key: queued_time}
+        keyval_dict = self.job.keyval_dict()
+        keyval_dict[queued_key] = queued_time
         group_name = self.queue_entries[0].get_group_name()
         if group_name:
             keyval_dict['host_group_name'] = group_name
@@ -3214,6 +3215,10 @@
         self._owner_model = None # caches model instance of owner
 
 
+    def model(self):
+        return models.Job.objects.get(id=self.id)
+
+
     def owner_model(self):
         # work around the fact that the Job owner field is a string, not a
         # foreign key
@@ -3250,6 +3255,10 @@
                 queue_entry.set_status(status)
 
 
+    def keyval_dict(self):
+        return self.model().keyval_dict()
+
+
     def _atomic_and_has_started(self):
         """
         @returns True if any of the HostQueueEntries associated with this job
@@ -3579,8 +3588,7 @@
 
     def request_abort(self):
         """Request that this Job be aborted on the next scheduler cycle."""
-        self_model = models.Job.objects.get(id=self.id)
-        self_model.abort()
+        self.model().abort()
 
 
     def schedule_delayed_callback_task(self, queue_entry):
diff --git a/scheduler/monitor_db_functional_test.py b/scheduler/monitor_db_functional_test.py
index 5ee0fee..f0e1e10 100755
--- a/scheduler/monitor_db_functional_test.py
+++ b/scheduler/monitor_db_functional_test.py
@@ -1019,7 +1019,15 @@
 
 
     def test_pre_job_keyvals(self):
-        self.test_simple_job()
+        job = self._create_job(hosts=[1])
+        job.run_verify = False
+        job.reboot_before = models.RebootBefore.NEVER
+        job.save()
+        models.JobKeyval.objects.create(job=job, key='mykey', value='myvalue')
+
+        self._run_dispatcher()
+        self._finish_job(job.hostqueueentry_set.all()[0])
+
         attached_files = self.mock_drone_manager.attached_files(
                 '1-my_user/host1')
         job_keyval_path = '1-my_user/host1/keyval'
@@ -1028,6 +1036,7 @@
         keyval_dict = dict(line.strip().split('=', 1)
                            for line in keyval_contents.splitlines())
         self.assert_('job_queued' in keyval_dict, keyval_dict)
+        self.assertEquals(keyval_dict['mykey'], 'myvalue')
 
 
 if __name__ == '__main__':
diff --git a/tko/db.py b/tko/db.py
index c170fe9..9b9f456 100644
--- a/tko/db.py
+++ b/tko/db.py
@@ -139,8 +139,42 @@
         return self.cur.fetchall()[0][0]
 
 
-    def select(self, fields, table, where, wherein={},
-               distinct = False, group_by = None, max_rows = None):
+    def _quote(self, field):
+        return '`%s`' % field
+
+
+    def _where_clause(self, where):
+        if not where:
+            return '', []
+
+        if isinstance(where, dict):
+            # key/value pairs (which should be equal, or None for null)
+            keys, values = [], []
+            for field, value in where.iteritems():
+                quoted_field = self._quote(field)
+                if value is None:
+                    keys.append(quoted_field + ' is null')
+                else:
+                    keys.append(quoted_field + '=%s')
+                    values.append(value)
+            where_clause = ' and '.join(keys)
+        elif isinstance(where, basestring):
+            # the exact string
+            where_clause = where
+            values = []
+        elif isinstance(where, tuple):
+            # preformatted where clause + values
+            where_clause, values = where
+            assert where_clause
+        else:
+            raise ValueError('Invalid "where" value: %r' % where)
+
+        return ' WHERE ' + where_clause, values
+
+
+
+    def select(self, fields, table, where, distinct=False, group_by=None,
+               max_rows=None):
         """\
                 This selects all the fields requested from a
                 specific table with a particular where clause.
@@ -162,31 +196,8 @@
             cmd.append('distinct')
         cmd += [fields, 'from', table]
 
-        values = []
-        if where and isinstance(where, types.DictionaryType):
-            # key/value pairs (which should be equal, or None for null)
-            keys, values = [], []
-            for field, value in where.iteritems():
-                if value is None:
-                    keys.append(field + ' is null')
-                else:
-                    keys.append(field + '=%s')
-                    values.append(value)
-            cmd.append(' where ' + ' and '.join(keys))
-        elif where and isinstance(where, types.StringTypes):
-            # the exact string
-            cmd.append(' where ' + where)
-        elif where and isinstance(where, types.TupleType):
-            # preformatted where clause + values
-            (sql, vals) = where
-            values = vals
-            cmd.append(' where (%s) ' % sql)
-
-        # TODO: this assumes there's a where clause...bad
-        if wherein and isinstance(wherein, types.DictionaryType):
-            keys_in = ["%s in (%s) " % (field, ','.join(where))
-                       for field, where in wherein.iteritems()]
-            cmd.append(' and '+' and '.join(keys_in))
+        where_clause, values = self._where_clause(where)
+        cmd.append(where_clause)
 
         if group_by:
             cmd.append(' GROUP BY ' + group_by)
@@ -252,8 +263,9 @@
         fields = data.keys()
         refs = ['%s' for field in fields]
         values = [data[field] for field in fields]
-        cmd = 'insert into %s (%s) values (%s)' % \
-                        (table, ','.join(fields), ','.join(refs))
+        cmd = ('insert into %s (%s) values (%s)' %
+               (table, ','.join(self._quote(field) for field in fields),
+                ','.join(refs)))
         self.dprint('%s %s' % (cmd, values))
 
         self._exec_sql_with_commit(cmd, values, commit)
@@ -263,10 +275,8 @@
         cmd = ['delete from', table]
         if commit is None:
             commit = self.autocommit
-        if where and isinstance(where, types.DictionaryType):
-            keys = [field + '=%s' for field in where.keys()]
-            values = [where[field] for field in where.keys()]
-            cmd += ['where', ' and '.join(keys)]
+        where_clause, values = self._where_clause(where)
+        cmd.append(where_clause)
         sql = ' '.join(cmd)
         self.dprint('%s %s' % (sql, values))
 
@@ -284,13 +294,12 @@
             commit = self.autocommit
         cmd = 'update %s ' % table
         fields = data.keys()
-        data_refs = [field + '=%s' for field in fields]
+        data_refs = [self._quote(field) + '=%s' for field in fields]
         data_values = [data[field] for field in fields]
         cmd += ' set ' + ', '.join(data_refs)
 
-        where_keys = [field + '=%s' for field in where.keys()]
-        where_values = [where[field] for field in where.keys()]
-        cmd += ' where ' + ' and '.join(where_keys)
+        where_clause, where_values = self._where_clause(where)
+        cmd += where_clause
 
         values = data_values + where_values
         self.dprint('%s %s' % (cmd, values))
@@ -338,10 +347,23 @@
         else:
             self.insert('tko_jobs', data, commit=commit)
             job.index = self.get_last_autonumber_value()
+        self.update_job_keyvals(job, commit=commit)
         for test in job.tests:
             self.insert_test(job, test, commit=commit)
 
 
+    def update_job_keyvals(self, job, commit=None):
+        for key, value in job.keyval_dict.iteritems():
+            where = {'job_id': job.index, 'key': key}
+            data = dict(where, value=value)
+            exists = self.select('id', 'tko_job_keyvals', where=where)
+
+            if exists:
+                self.update('tko_job_keyvals', data, where=where, commit=commit)
+            else:
+                self.insert('tko_job_keyvals', data, commit=commit)
+
+
     def insert_test(self, job, test, commit = None):
         kver = self.insert_kernel(test.kernel, commit=commit)
         data = {'job_idx':job.index, 'test':test.testname,
diff --git a/tko/frontend.py b/tko/frontend.py
index cbc328d..9033c20 100644
--- a/tko/frontend.py
+++ b/tko/frontend.py
@@ -200,12 +200,12 @@
 
 class test:
     @classmethod
-    def select(klass, db, where = {}, wherein = {}, distinct = False):
+    def select(klass, db, where={}, distinct=False):
         fields = ['test_idx', 'job_idx', 'test', 'subdir',
                   'kernel_idx', 'status', 'reason', 'machine_idx']
         tests = []
         for row in db.select(','.join(fields), 'tko_tests', where,
-                             wherein,distinct):
+                             distinct):
             tests.append(klass(db, *row))
         return tests
 
diff --git a/tko/models.py b/tko/models.py
index 0842e18..bc70074 100644
--- a/tko/models.py
+++ b/tko/models.py
@@ -7,7 +7,7 @@
 class job(object):
     def __init__(self, dir, user, label, machine, queued_time, started_time,
                  finished_time, machine_owner, machine_group, aborted_by,
-                 aborted_on):
+                 aborted_on, keyval_dict):
         self.dir = dir
         self.tests = []
         self.user = user
@@ -20,6 +20,7 @@
         self.machine_group = machine_group
         self.aborted_by = aborted_by
         self.aborted_on = aborted_on
+        self.keyval_dict = keyval_dict
 
 
     @staticmethod
diff --git a/tko/parsers/version_0.py b/tko/parsers/version_0.py
index f1be7b4..10daf19 100644
--- a/tko/parsers/version_0.py
+++ b/tko/parsers/version_0.py
@@ -36,7 +36,7 @@
                 "queued_time": queued_time, "started_time": started_time,
                 "finished_time": finished_time, "machine_owner": machine_owner,
                 "machine_group": machine_group, "aborted_by": aborted_by,
-                "aborted_on": aborted_at}
+                "aborted_on": aborted_at, "keyval_dict": keyval}
 
 
     @classmethod