[autotest] Make global scheduler not execute hqes of shards
The scheduler today simply executes all HQEs (host queue entries)
that are in the database. With shards, this behaviour is not wanted:
the global scheduler should only execute the HQEs that are not meant
to be executed by a shard.

This change adds a condition so the scheduler won't pick up HQEs
whose meta_host has an entry in afe_shards_labels.
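
Roughly, the resulting query looks like the sketch below (a sketch
only; the real SQL is assembled by HostQueueEntry.fetch, and the sort
order is abbreviated here). The LEFT JOIN / IS NULL pair acts as an
anti-join, dropping HQEs whose meta_host is assigned to a shard:

  SELECT afe_host_queue_entries.*
  FROM afe_host_queue_entries
    INNER JOIN afe_jobs ON (job_id = afe_jobs.id)
    LEFT JOIN afe_shards_labels
      ON (meta_host = afe_shards_labels.label_id)
  WHERE NOT complete AND NOT active AND status = "Queued"
    AND NOT aborted AND afe_shards_labels.id IS NULL
  ORDER BY ...;

As a side change, the shared stats timer is split into _job_timer
(scheduler.job_query_manager) and _host_timer
(scheduler.host_query_manager), and the job query methods are now
timed as well.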
TEST=Ran suites + Ran dummy, ensured only hostless job is executed.
DEPLOY=scheduler,host_scheduler
CQ-DEPEND=CL:211214
Change-Id: I85b14acb24bb24e9a14d5db2d42c6611fd15fc02
Reviewed-on: https://chromium-review.googlesource.com/214656
Reviewed-by: Prashanth B <beeps@chromium.org>
Commit-Queue: Jakob Jülich <jakobjuelich@chromium.org>
Tested-by: Jakob Jülich <jakobjuelich@chromium.org>
diff --git a/scheduler/query_managers.py b/scheduler/query_managers.py
index e8f9f98..26ccfce 100644
--- a/scheduler/query_managers.py
+++ b/scheduler/query_managers.py
@@ -19,6 +19,7 @@
from autotest_lib.scheduler import scheduler_lib
+_job_timer = stats.Timer('scheduler.job_query_manager')
class AFEJobQueryManager(object):
"""Query manager for AFE Jobs."""
@@ -26,6 +27,7 @@
hostless_query = 'host_id IS NULL AND meta_host IS NULL'
+ @_job_timer.decorate
def get_pending_queue_entries(self, only_hostless=False):
"""
Fetch a list of new host queue entries.
@@ -72,15 +74,40 @@
'ISNULL(meta_host), '
'parent_job_id, '
'job_id')
+ # In the global scheduler, don't execute jobs that should be
+ # executed by a shard.
+ # This won't prevent the shard scheduler from running them, as the
+ # shard db doesn't have an entry in afe_shards_labels.
query=('NOT complete AND NOT active AND status="Queued"'
- 'AND NOT aborted')
+ 'AND NOT aborted AND afe_shards_labels.id IS NULL')
+
+ # TODO(jakobjuelich, beeps): Optimize this query. Details:
+ # Compressed output of EXPLAIN <query>:
+ # +------------------------+--------+-------------------------+-------+
+ # | table | type | key | rows |
+ # +------------------------+--------+-------------------------+-------+
+ # | afe_host_queue_entries | ref | host_queue_entry_status | 30536 |
+ # | afe_shards_labels | ref | shard_label_id_fk | 1 |
+ # | afe_jobs | eq_ref | PRIMARY | 1 |
+ # +------------------------+--------+-------------------------+-------+
+ # This shows that the first part of the query fetches a lot of
+ # objects that are then filtered. The joins are comparably fast:
+ # there is usually just one shard mapping or none, and it can be
+ # answered fully using an index (shard_label_id_fk); a similar
+ # thing applies to the job.
+ #
+ # This works for now, but once O(#Jobs in shard) << O(#Jobs in
+ # Queued), it might be more efficient to filter on the meta_host
+ # first instead of the status.
if only_hostless:
query = '%s AND (%s)' % (query, self.hostless_query)
return list(scheduler_models.HostQueueEntry.fetch(
- joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
+ joins=('INNER JOIN afe_jobs ON (job_id=afe_jobs.id) '
+ 'LEFT JOIN afe_shards_labels ON ('
+ 'meta_host=afe_shards_labels.label_id)'),
where=query, order_by=sort_order))
+ @_job_timer.decorate
def get_prioritized_special_tasks(self, only_tasks_with_leased_hosts=False):
"""
Returns all queued SpecialTasks prioritized for repair first, then
@@ -143,7 +170,7 @@
'id', 'job_id', 'host_id'))
-_timer = stats.Timer('scheduler.host_query_manager')
+_host_timer = stats.Timer('scheduler.host_query_manager')
class AFEHostQueryManager(object):
"""Query manager for AFE Hosts."""
@@ -177,7 +204,7 @@
return self._process_many2many_dict(rows, flip)
- @_timer.decorate
+ @_host_timer.decorate
def _get_ready_hosts(self):
# We don't lose anything by re-doing these checks
# even though we release hosts on the same conditions.
@@ -192,7 +219,7 @@
return dict((host.id, host) for host in hosts)
- @_timer.decorate
+ @_host_timer.decorate
def _get_job_acl_groups(self, job_ids):
query = """
SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
@@ -205,7 +232,7 @@
return self._get_many2many_dict(query, job_ids)
- @_timer.decorate
+ @_host_timer.decorate
def _get_job_ineligible_hosts(self, job_ids):
query = """
SELECT job_id, host_id
@@ -215,7 +242,7 @@
return self._get_many2many_dict(query, job_ids)
- @_timer.decorate
+ @_host_timer.decorate
def _get_job_dependencies(self, job_ids):
query = """
SELECT job_id, label_id
@@ -224,7 +251,7 @@
"""
return self._get_many2many_dict(query, job_ids)
- @_timer.decorate
+ @_host_timer.decorate
def _get_host_acls(self, host_ids):
query = """
SELECT host_id, aclgroup_id
@@ -234,7 +261,7 @@
return self._get_many2many_dict(query, host_ids)
- @_timer.decorate
+ @_host_timer.decorate
def _get_label_hosts(self, host_ids):
if not host_ids:
return {}, {}
@@ -280,7 +307,7 @@
"OR afe_hosts.status = 'Ready')")
- @_timer.decorate
+ @_host_timer.decorate
def set_leased(self, leased_value, **kwargs):
"""Modify the leased bit on the hosts with ids in host_ids.
@@ -293,7 +320,7 @@
models.Host.objects.filter(**kwargs).update(leased=leased_value)
- @_timer.decorate
+ @_host_timer.decorate
def _get_labels(self, job_dependencies):
"""
Calculate a dict mapping label id to label object so that we don't
@@ -324,7 +351,7 @@
return id_to_label
- @_timer.decorate
+ @_host_timer.decorate
def refresh(self, pending_queue_entries):
"""Update the query manager.