Add support for joining specific test attributes to the TKO get_test_view() method and friends (including grouping friends). This required a number of cleanups to the code in general. The two big changes were:
* making application of "presentation" arguments (sorting and paging) optional in query_objects() and separately applicable
* getting rid of the explicit passing of extra_select_fields in grouping code, and using Django query builtin extra selects instead (i.e. query.extra(select=...))
Signed-off-by: Steve Howard <showard@google.com>
git-svn-id: http://test.kernel.org/svn/autotest/trunk@3806 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/new_tko/tko/models.py b/new_tko/tko/models.py
index 2873b31..8d0cfb1 100644
--- a/new_tko/tko/models.py
+++ b/new_tko/tko/models.py
@@ -21,28 +21,11 @@
return field_names
- def _get_group_query_sql(self, query, group_by, extra_select_fields):
- group_fields = self._get_field_names(group_by, extra_select_fields)
-
- # Clone the queryset, so that we don't change the original
- query = query.all()
-
- # In order to use query.extra(), we need to first clear the limits
- # and then add them back in after the extra
- low = query.query.low_mark
- high = query.query.high_mark
- query.query.clear_limits()
-
- select_fields = dict(
- (field_name, self._get_key_unless_is_function(field_sql))
- for field_name, field_sql in extra_select_fields.iteritems())
- query = query.extra(select=select_fields)
-
- query.query.set_limits(low=low, high=high)
-
+ def _get_group_query_sql(self, query, group_by):
sql, params = query.query.as_sql()
# insert GROUP BY clause into query
+ group_fields = self._get_field_names(group_by, query.query.extra_select)
group_by_clause = ' GROUP BY ' + ', '.join(group_fields)
group_by_position = sql.rfind('ORDER BY')
if group_by_position == -1:
@@ -55,24 +38,21 @@
def _get_column_names(self, cursor):
- """\
+ """
Gets the column names from the cursor description. This method exists
- so that it can be mocked in the unit test for sqlite3 compatibility."
+ so that it can be mocked in the unit test for sqlite3 compatibility.
"""
return [column_info[0] for column_info in cursor.description]
- def execute_group_query(self, query, group_by, extra_select_fields=[]):
+ def execute_group_query(self, query, group_by):
"""
Performs the given query grouped by the fields in group_by with the
- given extra select fields added. extra_select_fields should be a dict
- mapping field alias to field SQL. Usually, the extra fields will use
- group aggregation functions. Returns a list of dicts, where each dict
- corresponds to single row and contains a key for each grouped field as
- well as all of the extra select fields.
+ given query's extra select fields added. Returns a list of dicts, where
+ each dict corresponds to single row and contains a key for each grouped
+ field as well as all of the extra select fields.
"""
- sql, params = self._get_group_query_sql(query, group_by,
- extra_select_fields)
+ sql, params = self._get_group_query_sql(query, group_by)
cursor = readonly_connection.connection().cursor()
cursor.execute(sql, params)
field_names = self._get_column_names(cursor)
@@ -395,6 +375,11 @@
exclude=True)
joined = True
+ test_attributes = filter_data.pop('test_attributes', [])
+ for attribute in test_attributes:
+ query_set = self.join_attribute(query_set, attribute)
+ joined = True
+
if not joined:
filter_data['no_distinct'] = True
@@ -408,8 +393,10 @@
return query_set
- def query_test_ids(self, filter_data):
- dicts = self.model.query_objects(filter_data).values('test_idx')
+ def query_test_ids(self, filter_data, apply_presentation=True):
+ query = self.model.query_objects(filter_data,
+ apply_presentation=apply_presentation)
+ dicts = query.values('test_idx')
return [item['test_idx'] for item in dicts]
@@ -477,6 +464,26 @@
return query_set
+ def join_attribute(self, test_view_query_set, attribute):
+ """
+ Join the given TestView QuerySet to TestAttribute. The resulting query
+ has an additional column for the given attribute named
+ "attribute_<attribute name>".
+ """
+ table_name = TestAttribute._meta.db_table
+ suffix = '_' + attribute
+ alias = table_name + suffix
+ condition = "%s.attribute = '%s'" % (alias,
+ self.escape_user_sql(attribute))
+ query_set = self.add_join(test_view_query_set, table_name,
+ join_key='test_idx', join_condition=condition,
+ suffix=suffix, force_left_join=True)
+
+ select_name = 'attribute_' + attribute
+ query_set = query_set.extra(select={select_name: '%s.value' % alias})
+ return query_set
+
+
class TestView(dbmodels.Model, model_logic.ModelExtensions):
extra_fields = {
'DATE(job_queued_time)': 'job queued day',
@@ -536,20 +543,13 @@
@classmethod
- def query_objects(cls, filter_data, initial_query=None):
+ def query_objects(cls, filter_data, initial_query=None,
+ apply_presentation=True):
if initial_query is None:
initial_query = cls.objects.get_query_set_with_joins(filter_data)
- return super(TestView, cls).query_objects(filter_data,
- initial_query=initial_query)
-
-
- @classmethod
- def list_objects(cls, filter_data, initial_query=None, fields=None):
- # include extra fields
- if fields is None:
- fields = cls.get_field_dict().keys() + cls.extra_fields.keys()
- return super(TestView, cls).list_objects(filter_data, initial_query,
- fields)
+ return super(TestView, cls).query_objects(
+ filter_data, initial_query=initial_query,
+ apply_presentation=apply_presentation)
class Meta: