[autotest] RDB Refactor II + Request/Response API.
Scheduler Refactor:
1. Batched processing of jobs.
2. Rdb hits the database instead of going through host_scheduler.
3. Migration to add a leased column. The scheduler releases hosts
back to the rdb every tick.
4. A client rdb host wrapper that queue_entries use to track a host,
instead of a database model (see the sketch after this list).
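
A minimal sketch of the new client-side host tracking (illustrative only,
not part of the diff below; it assumes queue_entry is an initialized
scheduler_models.HostQueueEntry and that the afe database is reachable):

    from autotest_lib.scheduler import rdb_lib

    # rdb_lib.get_hosts returns RDBClientHostWrapper objects instead of
    # scheduler_models.Host database models.
    host = rdb_lib.get_hosts([queue_entry.host_id])[0]
    host.dbg_str = queue_entry.get_dbg_str()
    # Writes are proxied back to the rdb as update requests; reads come
    # from the attributes cached on the wrapper.
    host.set_status('Ready')
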
Establishes a basic request/response api for the rdb (sketched after this list):
rdb_utils:
1. Requests: Assert the format and fields of some basic request types.
2. Helper client/server modules to communicate with the rdb.
rdb_lib:
1. Request managers for rdb methods:
a. Match request-response
b. Abstract the batching of requests.
2. JobQueryManager: Regulates database access for job information.
rdb:
1. QueryManagers: Regulate database access
2. RequestHandlers: Use query managers to get things done.
3. Dispatchers: Send incoming requests to the appropriate handlers,
ignoring wire formats.
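
A rough sketch of the resulting request/response round trip, mirroring
rdb_lib.acquire_hosts (label_id and acl_id below are placeholder afe ids,
not values from this change):

    from autotest_lib.scheduler import rdb, rdb_lib, rdb_utils

    # Client side: queue one AcquireHostRequest per job. host_id=None asks
    # the rdb to find a host matching the given deps/acls.
    manager = rdb_lib.BaseHostRequestManager(
            rdb_utils.AcquireHostRequest, rdb.rdb_host_request_dispatcher)
    manager.add_request(host_id=None, deps=[label_id], acls=[acl_id])

    # Server side (invoked through the api_call above): the dispatcher
    # batches identical requests, hands them to an
    # AvailableHostRequestHandler that leases matching hosts, and returns
    # a {request: [host info dicts]} map which the manager unpacks in
    # request order.
    for host_info in manager.response():
        host = rdb_lib.RDBClientHostWrapper(**host_info) if host_info else None
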
TEST=unittests, functional verification.
BUG=chromium:314081, chromium:314083, chromium:314084
DEPLOY=scheduler, migrate
Change-Id: Id174c663c6e78295d365142751053eae4023116d
Reviewed-on: https://chromium-review.googlesource.com/183385
Reviewed-by: Prashanth B <beeps@chromium.org>
Commit-Queue: Prashanth B <beeps@chromium.org>
Tested-by: Prashanth B <beeps@chromium.org>
diff --git a/scheduler/agent_task.py b/scheduler/agent_task.py
index 7cd28f6..87ec371 100644
--- a/scheduler/agent_task.py
+++ b/scheduler/agent_task.py
@@ -116,6 +116,7 @@
from autotest_lib.scheduler import drone_manager, pidfile_monitor
from autotest_lib.client.common_lib import utils
from autotest_lib.scheduler import email_manager, host_scheduler
+from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.server import autoserv_utils
@@ -513,11 +514,13 @@
assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
- self.host = scheduler_models.Host(id=task.host.id)
+ self.host = rdb_lib.get_hosts([task.host.id])[0]
+ self.host.dbg_str = 'Task: %s' % str(task)
self.queue_entry = None
if task.queue_entry:
self.queue_entry = scheduler_models.HostQueueEntry(
id=task.queue_entry.id)
+ self.host.dbg_str += self.queue_entry.get_dbg_str()
self.task = task
self._extra_command_args = extra_command_args
diff --git a/scheduler/host_scheduler.py b/scheduler/host_scheduler.py
index 7fe4e4e..6d933a6 100644
--- a/scheduler/host_scheduler.py
+++ b/scheduler/host_scheduler.py
@@ -1,5 +1,4 @@
-"""
-Autotest client module for the rdb.
+"""Autotest host scheduler.
"""
@@ -8,7 +7,7 @@
from autotest_lib.client.common_lib import global_config, utils
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import metahost_scheduler, scheduler_config
-from autotest_lib.scheduler import rdb, scheduler_models
+from autotest_lib.scheduler import scheduler_models
from autotest_lib.site_utils.graphite import stats
from autotest_lib.server.cros import provision
@@ -59,7 +58,12 @@
@_timer.decorate
- def _get_ready_hosts(self):
+ def _release_hosts(self):
+ """Release hosts to the RDB.
+
+ Release all hosts that are ready and are currently not being used by an
+ active hqe, and don't have a new special task scheduled against them.
+ """
# Avoid any host with a currently active queue entry against it.
hqe_join = ('LEFT JOIN afe_host_queue_entries AS active_hqe '
'ON (afe_hosts.id = active_hqe.host_id AND '
@@ -79,10 +83,27 @@
hosts = scheduler_models.Host.fetch(
joins='%s %s' % (hqe_join, special_task_join),
where="active_hqe.host_id IS NULL AND new_tasks.host_id IS NULL "
+ "AND afe_hosts.leased "
"AND NOT afe_hosts.locked "
"AND (afe_hosts.status IS NULL "
"OR afe_hosts.status = 'Ready')")
+ for ready_host in hosts:
+ ready_host.update_field('leased', 0)
+
+
+ @_timer.decorate
+ def _get_ready_hosts(self):
+ # We don't lose anything by re-doing these checks
+ # even though we release hosts on the same conditions.
+ # In the future we might have multiple clients that
+        # release hosts and/or lock them independently of the
+ # scheduler tick.
+ hosts = scheduler_models.Host.fetch(
+ where="NOT afe_hosts.leased "
+ "AND NOT afe_hosts.locked "
+ "AND (afe_hosts.status IS NULL "
+ "OR afe_hosts.status = 'Ready')")
return dict((host.id, host) for host in hosts)
@@ -221,8 +242,7 @@
@_timer.decorate
def tick(self):
- for metahost_scheduler in self._metahost_schedulers:
- metahost_scheduler.tick()
+ self._release_hosts()
def hosts_in_label(self, label_id):
@@ -375,100 +395,6 @@
return True
- def get_job_info(self, queue_entry):
- """
- Extract job information from a queue_entry/host-scheduler.
-
- Unfortunately the information needed to choose hosts for a job
- are split across several tables and not restricted to just the
- hqe. At the very least we require the deps and acls of the job, but
- we also need to know if the job has a host assigned to it. This method
- consolidates this information into a light-weight dictionary that the
- host_scheduler and the rdb can pass back and forth.
-
- @param queue_entry: the queue_entry of the job we would like
- information about.
-
- @return: A dictionary containing 1. A set of deps 2. A set of acls
- 3. The host id of the host assigned to the hqe, or None.
- """
- job_id = queue_entry.job_id
- host_id = queue_entry.host_id
- job_deps = self._job_dependencies.get(job_id, set())
- job_deps = set([dep for dep in job_deps if
- not provision.can_provision(self._labels[dep].name)])
- job_acls = self._job_acls.get(job_id, set())
-
- return {'deps': set(job_deps),
- 'acls': set(job_acls),
- 'host_id': host_id}
-
-
- def schedule_entry(self, queue_entry):
- """
- Schedule a hqe aginst a host.
-
- A hqe can either have a host assigned to it or not. In eithercase
- however, actually scheduling the hqe on the host involves validating
- the assignment by checking acls and labels. If the hqe doesn't have a
- host we need to find a host before we can perform this validation.
-
- If we successfully validate the host->hqe pairing, return the host. The
- scheduler will not begin scheduling special tasks for the hqe until it
- acquires a valid host.
-
- @param queue_entry: The queue_entry that requires a host.
- @return: The host assigned to the hqe, if any.
- """
- host_id = queue_entry.host_id
- job_id = queue_entry.job_id
- job_info = self.get_job_info(queue_entry)
- host = None
-
- if host_id:
- host = self._hosts_available.get(host_id, None)
-
- # TODO(beeps): Remove the need for 2 rdb calls. Ideally we should
- # just do one call to validate the assignment, however, since we're
- # currently still using the host_scheduler, we'd need to pass it
- # as an argument to validate_host_assignment, which is less clean
- # than just splitting this work into 2 calls.
- host_info = rdb.get_host_info(self, host_id)
-
- # If the host is either unavailable or in-eligible for this job,
- # defer scheduling this queue_entry till the next tick.
- if (host is None or not
- rdb.validate_host_assignment(job_info, host_info)):
- return None
- else:
- host = rdb.get_host(self, job_info)
- if host is None:
- return None
- queue_entry.set_host(host)
-
- # TODO(beeps): Make it so we don't need to set the hqe active status
- # to remove a host from the active pool.
- # A host will remain in the available pool for as long as its status
- # is Ready and it is not referenced by an active hqe. The state of
- # the host is not under our control, as it will only change to
- # resetting etc whenever the prejob task starts. However, the hqe
- # is theoretically active from the moment we assign a healthy host
- # to it. Setting the host on an inactive hqe will not remove it
- # from the available pool, leading to unnecessary scheduling
- # overhead.
- # Without this, we will process each hqe twice because it is still
- # judged as 'new', and perform the host<->hqe assignment twice,
- # because the host assigned to the hqe is still 'available', as
- # the first prejob task only runs at the end of the next tick's
- # handle_agents call. Note that the status is still 'Queued', and
- # will remaing 'Queued' till an agent changes it.
- queue_entry.update_field('active', True)
-
- # The available_hosts dictionary determines our scheduling decisions
- # for subsequent jobs processed in this tick.
- self._hosts_available.pop(host.id)
- logging.debug('Scheduling job: %s, Host %s', job_id, host.id)
- return host
def find_eligible_atomic_group(self, queue_entry):
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 3cacbfa..4c320e7 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -22,6 +22,8 @@
from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
from autotest_lib.scheduler import postjob_task, scheduler_logging_config
+from autotest_lib.scheduler import rdb_lib
+from autotest_lib.scheduler import rdb_utils
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.server import autoserv_utils
@@ -817,10 +819,49 @@
def _schedule_hostless_job(self, queue_entry):
+ """Schedule a hostless (suite) job.
+
+ @param queue_entry: The queue_entry representing the hostless job.
+ """
self.add_agent_task(HostlessQueueTask(queue_entry))
queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
+ def _schedule_host_job(self, host, queue_entry):
+ """Schedules a job on the given host.
+
+ 1. Assign the host to the hqe, if it isn't already assigned.
+ 2. Create a SpecialAgentTask for the hqe.
+ 3. Activate the hqe.
+
+ @param queue_entry: The job to schedule.
+ @param host: The host to schedule the job on.
+ """
+ if self.host_has_agent(host):
+ host_agent_task = list(self._host_agents.get(host.id))[0].task
+ subject = 'Host with agents assigned to an HQE'
+ message = ('HQE: %s assigned host %s, but the host has '
+ 'agent: %s for queue_entry %s. The HQE '
+                       'will remain in a queued state till '
+                       'the host is usable.' %
+ (queue_entry, host.hostname, host_agent_task,
+ host_agent_task.queue_entry))
+ email_manager.manager.enqueue_notify_email(subject, message)
+ queue_entry.set_host(None)
+ queue_entry.update_field('active', False)
+ else:
+ if queue_entry.host_id is None:
+ queue_entry.set_host(host)
+ else:
+ if host.id != queue_entry.host_id:
+ raise rdb_utils.RDBException('The rdb returned host: %s '
+ 'but the job:%s was already assigned a host: %s. ' %
+ (host.hostname, queue_entry.job_id,
+ queue_entry.host.hostname))
+ queue_entry.update_field('active', True)
+ self._run_queue_entry(queue_entry)
+
+
def _schedule_new_jobs(self):
"""
Find any new HQEs and call schedule_pre_job_tasks for it.
@@ -833,55 +874,31 @@
"""
queue_entries = self._refresh_pending_queue_entries()
+ key = 'scheduler.jobs_per_tick'
new_hostless_jobs = 0
- new_atomic_groups = 0
new_jobs_with_hosts = 0
new_jobs_need_hosts = 0
-
+ host_jobs = []
logging.debug('Processing %d queue_entries', len(queue_entries))
- for queue_entry in queue_entries:
- self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
- is_unassigned_atomic_group = (
- queue_entry.atomic_group_id is not None
- and queue_entry.host_id is None)
+ for queue_entry in queue_entries:
if queue_entry.is_hostless():
- self._log_extra_msg('Scheduling hostless job.')
self._schedule_hostless_job(queue_entry)
new_hostless_jobs = new_hostless_jobs + 1
- elif is_unassigned_atomic_group:
- self._schedule_atomic_group(queue_entry)
- new_atmoic_groups = new_atomic_groups + 1
else:
+ host_jobs.append(queue_entry)
new_jobs_need_hosts = new_jobs_need_hosts + 1
- assigned_host = self._host_scheduler.schedule_entry(queue_entry)
- if assigned_host:
- # If we ever find ourselves in a position where a ready host
- # has an agent, roll back the host assignment and try again
- # next tick.
- if self.host_has_agent(assigned_host):
- host_agent_task = [host_agent.task for host_agent in
- list(self._host_agents.get(
- assigned_host.id))][0]
- subject = 'Host with agents assigned to an HQE'
- message = ('HQE: %s assigned host %s, but the host has '
- 'agent: %s for queue_entry %s. The HQE '
- 'will remain in a queued state till the '
- 'the host is usable.' %
- (queue_entry, assigned_host.hostname,
- host_agent_task,
- host_agent_task.queue_entry))
- email_manager.manager.enqueue_notify_email(subject, message)
- queue_entry.set_host(None)
- queue_entry.update_field('active', False)
- else:
- assert assigned_host.id == queue_entry.host_id
- self._run_queue_entry(queue_entry)
- new_jobs_with_hosts = new_jobs_with_hosts + 1
- key = 'scheduler.jobs_per_tick'
stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
- stats.Gauge(key).send('new_atomic_groups', new_atomic_groups)
+ if not host_jobs:
+ return
+
+ hosts = rdb_lib.acquire_hosts(self._host_scheduler, host_jobs)
+ for host, queue_entry in zip(hosts, host_jobs):
+ if host:
+ self._schedule_host_job(host, queue_entry)
+ new_jobs_with_hosts = new_jobs_with_hosts + 1
+
stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
stats.Gauge(key).send('new_jobs_without_hosts',
new_jobs_need_hosts - new_jobs_with_hosts)
diff --git a/scheduler/monitor_db_functional_test.py b/scheduler/monitor_db_functional_test.py
index d5ace50..6efb58b 100755
--- a/scheduler/monitor_db_functional_test.py
+++ b/scheduler/monitor_db_functional_test.py
@@ -20,6 +20,8 @@
# it arises in an important query
_re_translator(r'GROUP_CONCAT\((.*?)\)', r'\1'),
_re_translator(r'TRUNCATE TABLE', 'DELETE FROM'),
+ _re_translator(r'ISNULL\(([a-z,_]+)\)',
+ r'ifnull(nullif(\1, NULL), \1) DESC'),
)
HqeStatus = models.HostQueueEntry.Status
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index 42ecf7d..3ca8f50 100755
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -221,6 +221,7 @@
def _run_scheduler(self):
+ self._dispatcher._host_scheduler.tick()
for _ in xrange(2): # metahost scheduling can take two cycles
self._dispatcher._schedule_new_jobs()
diff --git a/scheduler/rdb.py b/scheduler/rdb.py
index 8dc3bcd..a0c5250 100644
--- a/scheduler/rdb.py
+++ b/scheduler/rdb.py
@@ -1,111 +1,328 @@
+"""Rdb server module.
"""
-Rdb server module.
-"""
+
+import collections
import logging
+
+import common
+
+from django.core import exceptions as django_exceptions
+from django.db.models import fields
+from django.db.models import Q
+from autotest_lib.frontend.afe import models
+from autotest_lib.scheduler import rdb_utils
from autotest_lib.site_utils.graphite import stats
_timer = stats.Timer('rdb')
-def _check_acls(job_acls, host_acls):
- if job_acls is None or host_acls is None:
- return False
- return len(host_acls.intersection(job_acls))
-
-def _check_deps(job_deps, host_labels):
- if job_deps is None or host_labels is None:
- return False
- return len(job_deps - host_labels) == 0
-
-
-def validate_host_assignment(job_info, host_info):
- """ Validate this job<->host pairing.
-
- @param job_info: Information about the job as determined by
- the client rdb module.
- @param host_info: Information about the host as determined by
- get_host_info.
-
- @return: True if the job<->host pairing is valid, False otherwise.
- False, if we don't have enough information to make a decision.
- """
- one_time_host = host_info.get('invalid') and job_info.get('host_id')
-
- return (_check_acls(job_info.get('acls'), host_info.get('acls')) and
- _check_deps(job_info.get('deps'), host_info.get('labels')) and
- not host_info.get('invalid', True) or one_time_host and
- not host_info.get('locked', True))
-
-
-def get_host_info(host_scheduler, host_id):
- """
- Utility method to parse information about a host into a dictionary.
-
- Ideally this can just return the Host object, but doing this has the
- following advantages:
- 1. Changes to the schema will only require changes to this method.
- 2. We can reimplement this method to make use of django caching.
- 3. We can lock rows of the host table in a centralized location.
-
- @param host_id: id of the host to get information about.
- @return: A dictionary containing all information needed to make a
- scheduling decision regarding this host.
- """
- acls = host_scheduler._host_acls.get(host_id, set())
- labels = host_scheduler._host_labels.get(host_id, set())
- host_info = {'labels': labels, 'acls': acls}
- host = host_scheduler._hosts_available.get(host_id)
- if host:
- host_info.update({'locked': host.locked, 'invalid': host.invalid})
- return host_info
-
-
-def _order_labels(host_scheduler, labels):
- """Given a list of labels, order them by available host count.
-
- To make a scheduling decision, we need a host that matches all dependencies
- of a job, hence the most restrictive search space we can use is the list
- of ready hosts that have the least frequent label.
-
- @param labels: A list of labels. If no hosts are available in a label,
- it will be the first in this list.
- """
- label_count = [len(host_scheduler._label_hosts.get(label, []))
- for label in labels]
- return [label_tuple[1] for label_tuple in sorted(zip(label_count, labels))]
-
-
-@_timer.decorate
-def get_host(host_scheduler, job_info):
- """
- Get a host matching the job's selection criterion.
-
- - Get all hosts in rarest label.
- - Check which ones are still usable.
- - Return the first of these hosts that passes our validity checks.
-
- @param job_info: A dictionary of job information needed to pick a host.
-
- @return: A host object from the available_hosts map.
+# Query managers: Provide a layer of abstraction over the database by
+# encapsulating common query patterns used by the rdb.
+class BaseHostQueryManager(object):
+ """Base manager for host queries on all hosts.
"""
- # A job must at least have one dependency (eg:'board:') in order for us to
- # find a host for it. To do so we use 2 data structures of host_scheduler:
- # - label to hosts map: to count label frequencies, and get hosts in a label
- # - hosts_available map: to mark a host as used, as it would be difficult
- # to delete this host from all the label keys it has, in the label to
- # hosts map.
- rarest_label = _order_labels(host_scheduler, job_info.get('deps'))[0]
+ host_objects = models.Host.objects
- # TODO(beeps): Once we have implemented locking in afe_hosts make this:
- # afe.models.Host.object.filter(locked).filter(acls).filter(labels)...
- # where labels are chained according to frequency. Currently this will
- # require a join across all hqes which could be expensive, and is
- # unnecessary anyway since we need to move away from this scheduling model.
- hosts_considered = host_scheduler._label_hosts.get(rarest_label, [])
- for host_id in hosts_considered:
- host = host_scheduler._hosts_available.get(host_id)
- host_info = get_host_info(host_scheduler, host_id)
- if host and validate_host_assignment(job_info, host_info):
- return host
+
+ def update_hosts(self, host_ids, **kwargs):
+ """Update fields on a hosts.
+
+ @param host_ids: A list of ids of hosts to update.
+        @param kwargs: A key value dictionary of column, value pairs to
+                update in the hosts table.
+ """
+ self.host_objects.filter(id__in=host_ids).update(**kwargs)
+
+
+ @rdb_utils.return_rdb_host
+ def get_hosts(self, ids):
+ """Get host objects for the given ids.
+
+ @param ids: The ids for which we need host objects.
+
+ @returns: A list of RDBServerHostWrapper objects, ordered by host_id.
+ """
+ return self.host_objects.filter(id__in=ids).order_by('id')
+
+
+ @rdb_utils.return_rdb_host
+ def find_hosts(self, deps, acls):
+ """Finds valid hosts matching deps, acls.
+
+ @param deps: A list of dependencies to match.
+ @param acls: A list of acls, at least one of which must coincide with
+ an acl group the chosen host is in.
+
+ @return: A list of matching hosts available.
+ """
+ hosts_available = self.host_objects.filter(invalid=0)
+ queries = [Q(labels__id=dep) for dep in deps]
+ queries += [Q(aclgroup__id__in=acls)]
+ for query in queries:
+ hosts_available = hosts_available.filter(query)
+ return hosts_available
+
+
+class AvailableHostQueryManager(BaseHostQueryManager):
+ """Query manager for requests on un-leased, un-locked hosts.
+ """
+
+ host_objects = models.Host.leased_objects
+
+
+# Request Handlers: Used in conjunction with requests in rdb_utils, these
+# handlers acquire hosts for a request and record the acquisition in
+# a response_map dictionary keyed on the request itself, with the host/hosts
+# as values.
+class BaseHostRequestHandler(object):
+ """Handler for requests related to hosts, leased or unleased.
+
+ This class is only capable of blindly returning host information.
+ """
+
+ def __init__(self):
+ self.host_query_manager = BaseHostQueryManager()
+ self.response_map = {}
+
+
+ def update_response_map(self, request, response):
+ """Record a response for a request.
+
+ The response_map only contains requests that were either satisfied, or
+ that ran into an exception. Often this translates to reserving hosts
+ against a request. If the rdb hit an exception processing a request, the
+ exception gets recorded in the map for the client to reraise.
+
+        @param request: The request that has reserved these hosts.
+        @param response: A response for the request.
+
+        @raises RDBException: If an empty value is added to the map.
+ """
+ if not response:
+ raise rdb_utils.RDBException('response_map dict can only contain '
+ 'valid responses. Request %s, response %s is invalid.' %
+ (request, response))
+ if self.response_map.get(request):
+ raise rdb_utils.RDBException('Request %s already has response %s '
+ 'the rdb cannot return multiple '
+ 'responses for the same request.' %
+ (request, response))
+ self.response_map[request] = response
+
+
+ def _record_exceptions(self, request, exceptions):
+ """Record a list of exceptions for a request.
+
+ @param request: The request for which the exceptions were hit.
+ @param exceptions: The exceptions hit while processing the request.
+ """
+ rdb_exceptions = [rdb_utils.RDBException(ex) for ex in exceptions]
+ self.update_response_map(request, rdb_exceptions)
+
+
+ def get_response(self):
+ """Convert all RDBServerHostWrapper objects to host info dictionaries.
+
+ @return: A dictionary mapping requests to a list of matching host_infos.
+ """
+ for request, response in self.response_map.iteritems():
+ self.response_map[request] = [reply.wire_format()
+ for reply in response]
+ return self.response_map
+
+
+ def update_hosts(self, update_requests):
+ """Updates host tables with a payload.
+
+ @param update_requests: A list of update requests, as defined in
+ rdb_utils.UpdateHostRequest.
+ """
+        # Payloads for the same host_id merge; the last value wins per field.
+ unique_host_requests = {}
+ for request in update_requests:
+ if unique_host_requests.get(request.host_id):
+ unique_host_requests[request.host_id].update(request.payload)
+ else:
+ unique_host_requests[request.host_id] = request.payload
+
+ # Batch similar payloads so we can do them in one table scan.
+ similar_requests = {}
+ for host_id, payload in unique_host_requests.iteritems():
+ similar_requests.setdefault(payload, []).append(host_id)
+
+ # If fields of the update don't match columns in the database,
+ # record the exception in the response map. This also means later
+ # updates will get applied even if previous updates fail.
+ for payload, hosts in similar_requests.iteritems():
+ try:
+ response = self.host_query_manager.update_hosts(hosts, **payload)
+ except (django_exceptions.FieldError,
+ fields.FieldDoesNotExist) as e:
+ for host in hosts:
+ # Since update requests have a consistent hash this will map
+ # to the same key as the original request.
+ request = rdb_utils.UpdateHostRequest(
+ host_id=host, payload=payload).get_request()
+ self._record_exceptions(request, [e])
+
+
+ def batch_get_hosts(self, host_requests):
+ """Get hosts matching the requests.
+
+        This method does not acquire the hosts, i.e. it reserves hosts against
+        requests while leaving their leased state untouched.
+
+ @param host_requests: A list of requests, as defined in
+ rdb_utils.BaseHostRequest.
+ """
+ host_ids = set([request.host_id for request in host_requests])
+ host_map = {}
+
+        # If executed through an AvailableHostQueryManager this list will
+        # not contain hosts that are already leased or locked.
+ for host in self.host_query_manager.get_hosts(host_ids):
+ host_map[host.id] = host
+ for request in host_requests:
+ if request.host_id in host_map:
+ self.update_response_map(request, [host_map[request.host_id]])
+ else:
+ logging.warning('rdb could not get host for request: %s, it '
+ 'is already leased or locked', request)
+
+
+class AvailableHostRequestHandler(BaseHostRequestHandler):
+ """Handler for requests related to available (unleased and unlocked) hosts.
+
+ This class is capable of acquiring or validating hosts for requests.
+ """
+
+
+ def __init__(self):
+ self.host_query_manager = AvailableHostQueryManager()
+ self.response_map = {}
+
+
+ def lease_hosts(self, hosts):
+ """Leases a list hosts.
+
+ @param hosts: A list of hosts to lease.
+ """
+ requests = [rdb_utils.UpdateHostRequest(host_id=host.id,
+ payload={'leased': 1}).get_request() for host in hosts]
+ super(AvailableHostRequestHandler, self).update_hosts(requests)
+
+
+ @_timer.decorate
+ def batch_acquire_hosts(self, host_requests):
+ """Acquire hosts for a list of requests.
+
+ The act of acquisition involves finding and leasing a set of
+ hosts that match the parameters of a request. Each acquired
+ host is added to the response_map dictionary, as an
+ RDBServerHostWrapper.
+
+ @param host_requests: A list of requests to acquire hosts.
+ """
+ batched_host_request = collections.Counter(host_requests)
+ for request, count in batched_host_request.iteritems():
+ hosts = self.host_query_manager.find_hosts(
+ request.deps, request.acls)
+ num_hosts = min(len(hosts), count)
+ if num_hosts:
+ # TODO(beeps): Only reserve hosts we have successfully leased.
+ self.lease_hosts(hosts[:num_hosts])
+ self.update_response_map(request, hosts[:num_hosts])
+ if num_hosts < count:
+ logging.warning('%s Unsatisfied rdb acquisition request:%s ',
+ count-num_hosts, request)
+
+
+ @_timer.decorate
+ def batch_validate_hosts(self, requests):
+ """Validate requests with hosts.
+
+ Reserve all hosts, check each one for validity and discard invalid
+        request-host pairings. Lease the remaining hosts.
+
+ @param requests: A list of requests to validate.
+ """
+ # Multiple requests can have the same host (but different acls/deps),
+ # and multiple jobs can submit identical requests (same host_id,
+ # acls, deps). In both these cases the first request to check the host
+ # map wins, though in the second case it doesn't matter.
+ self.batch_get_hosts(set(requests))
+ for request in self.response_map.keys():
+ hosts = self.response_map[request]
+ if len(hosts) > 1:
+ raise rdb_utils.RDBException('Got multiple hosts for a single '
+ 'request. Hosts: %s, request %s.' % (hosts, request))
+ host = hosts[0]
+ if not ((request.acls.intersection(host.acls) or host.invalid) and
+ request.deps.intersection(host.labels) == request.deps):
+ if request.host_id != host.id:
+ raise rdb_utils.RDBException('Cannot assign a different '
+                    'host for request: %s, it already has one: %s ' %
+ (request, host.id))
+ del self.response_map[request]
+ logging.warning('Failed rdb validation request:%s ', request)
+
+ # TODO(beeps): Update acquired hosts with failed leases.
+ self.lease_hosts([hosts[0] for hosts in self.response_map.values()])
+
+
+# Request dispatchers: Create the appropriate request handler, send a list
+# of requests to one of its methods. The corresponding request manager in
+# rdb_lib must understand how to match each request with a response from a
+# dispatcher; the easiest way to achieve this is to return the response_map
+# attribute of the request handler after making the appropriate requests.
+def get_hosts(host_requests):
+ """Get host information about the requested hosts.
+
+ @param host_requests: A list of requests as defined in BaseHostRequest.
+ @return: A dictionary mapping each request to a list of hosts.
+ """
+ rdb_handler = BaseHostRequestHandler()
+ rdb_handler.batch_get_hosts(host_requests)
+ return rdb_handler.get_response()
+
+
+def update_hosts(update_requests):
+ """Update hosts.
+
+ @param update_requests: A list of updates to host tables
+ as defined in UpdateHostRequest.
+ """
+ rdb_handler = BaseHostRequestHandler()
+ rdb_handler.update_hosts(update_requests)
+ return rdb_handler.get_response()
+
+
+def rdb_host_request_dispatcher(host_requests):
+ """Dispatcher for all host acquisition queries.
+
+ @param host_requests: A list of requests for acquiring hosts, as defined in
+ AcquireHostRequest.
+ @return: A dictionary mapping each request to a list of hosts, or
+ an empty list if none could satisfy the request. Eg:
+ {AcquireHostRequest.template: [host_info_dictionaries]}
+ """
+ validation_requests = []
+ require_hosts_requests = []
+
+    # Validation requests are made by a job scheduled against a specific
+    # host (eg: through the frontend) and only require the rdb to
+ # match the parameters of the host against the request. Acquisition
+ # requests are made by jobs that need hosts (eg: suites) and the rdb needs
+ # to find hosts matching the parameters of the request.
+ for request in host_requests:
+ if request.host_id:
+ validation_requests.append(request)
+ else:
+ require_hosts_requests.append(request)
+
+ rdb_handler = AvailableHostRequestHandler()
+ rdb_handler.batch_validate_hosts(validation_requests)
+ rdb_handler.batch_acquire_hosts(require_hosts_requests)
+ return rdb_handler.get_response()
diff --git a/scheduler/rdb_lib.py b/scheduler/rdb_lib.py
new file mode 100644
index 0000000..5d3432e
--- /dev/null
+++ b/scheduler/rdb_lib.py
@@ -0,0 +1,230 @@
+"""Performs translation between monitor_db and the rdb.
+"""
+import logging
+
+import common
+from autotest_lib.scheduler import rdb
+from autotest_lib.scheduler import rdb_utils
+from autotest_lib.server.cros import provision
+
+
+# RDB request managers: Call an rdb api_method with a list of RDBRequests, and
+# match the requests to the responses returned.
+class RDBRequestManager(object):
+ """Base request manager for RDB requests.
+
+    Each instance of a request manager is associated with one request type and
+    one api call. All subclasses maintain a queue of unexecuted requests and
+    expose an api to add requests/retrieve the response for these requests.
+ """
+
+
+ def __init__(self, request, api_call):
+ """
+        @param request: A subclass of rdb_utils.RDBRequest. The manager can
+            only manage requests of one type.
+        @param api_call: The rdb api call this manager is expected to make.
+            A manager can only send requests of type request to this api call.
+ """
+ self.request = request
+ self.api_call = api_call
+ self.request_queue = []
+
+
+ def add_request(self, **kwargs):
+ """Add an RDBRequest to the queue."""
+ self.request_queue.append(self.request(**kwargs).get_request())
+
+
+ def response(self):
+ """Execute the api call and return a response for each request.
+
+ The order of responses is the same as the order of requests added
+ to the queue.
+
+ @yield: A response for each request added to the queue after the
+ last invocation of response.
+ """
+ if not self.request_queue:
+            raise rdb_utils.RDBException('No requests. Call add_request '
+                    'with the appropriate kwargs before calling response.')
+
+ result = self.api_call(self.request_queue)
+ requests = self.request_queue
+ self.request_queue = []
+ for request in requests:
+ yield result.get(request) if result else None
+
+
+class BaseHostRequestManager(RDBRequestManager):
+ """Manager for batched get requests on hosts."""
+
+
+ def response(self):
+ """Yields a popped host from the returned host list."""
+
+ # As a side-effect of returning a host, this method also removes it
+ # from the list of hosts matched up against a request. Eg:
+ # hqes: [hqe1, hqe2, hqe3]
+ # client requests: [c_r1, c_r2, c_r3]
+ # generate requests in rdb: [r1 (c_r1 and c_r2), r2]
+ # and response {r1: [h1, h2], r2:[h3]}
+ # c_r1 and c_r2 need to get different hosts though they're the same
+ # request, because they're from different queue_entries.
+ for hosts in super(BaseHostRequestManager, self).response():
+ yield hosts.pop() if hosts else None
+
+
+# Scheduler host proxy: Convert host information returned by the rdb into
+# a client host object capable of proxying updates back to the rdb.
+class RDBClientHostWrapper(object):
+ """A wrapper for host information.
+
+ This wrapper is used whenever the queue entry needs direct access
+ to the host.
+ """
+
+    required_fields = set(['id', 'hostname', 'platform', 'labels',
+ 'acls', 'protection', 'dirty', 'status'])
+
+
+ def _update_attributes(self, new_attributes):
+ """Updates attributes based on an input dictionary.
+
+ Since reads are not proxied to the rdb this method caches updates to
+        the host tables as instance attributes.
+
+ @param new_attributes: A dictionary of attributes to update.
+ """
+ for name, value in new_attributes.iteritems():
+ setattr(self, name, value)
+
+
+ def __init__(self, **kwargs):
+ if self.required_fields - set(kwargs.keys()):
+ raise rdb_utils.RDBException('Creating %s requires %s, got %s '
+ % (self.__class__, self.required_fields, kwargs.keys()))
+ self._update_attributes(kwargs)
+ self.update_request_manager = RDBRequestManager(
+ rdb_utils.UpdateHostRequest, rdb.update_hosts)
+ self.dbg_str = ''
+
+
+ def _update(self, payload):
+ """Send an update to rdb, save the attributes of the payload locally.
+
+        @param payload: A dictionary of column: value pairs for the update.
+
+ @raises RDBException: If the update fails.
+ """
+ logging.info('Host %s in %s updating %s through rdb on behalf of: %s ',
+ self.hostname, self.status, payload, self.dbg_str)
+ self.update_request_manager.add_request(host_id=self.id, payload=payload)
+ for response in self.update_request_manager.response():
+ if response:
+                raise rdb_utils.RDBException('Host %s unable to perform update '
+                        '%s through rdb on behalf of %s: %s' % (self.hostname,
+                        payload, self.dbg_str, response))
+ self._update_attributes(payload)
+
+
+ def set_status(self, status):
+ """Proxy for setting the status of a host via the rdb.
+
+ @param status: The new status.
+ """
+ self._update({'status': status})
+
+
+ def update_field(self, fieldname, value):
+ """Proxy for updating a field on the host.
+
+ @param fieldname: The fieldname as a string.
+ @param value: The value to assign to the field.
+ """
+ self._update({fieldname: value})
+
+
+ def platform_and_labels(self):
+ """Get the platform and labels on this host.
+
+        @return: A tuple of the platform name and a list of label names.
+ """
+ platform = self.platform
+ labels = [label for label in self.labels if label != platform]
+ return platform, labels
+
+
+# Adapters for scheduler specific objects: Convert job information to a
+# format more amenable to the rdb/rdb request managers.
+class JobQueryManager(object):
+ """A caching query manager for all job related information."""
+ def __init__(self, host_scheduler, queue_entries):
+
+ # TODO(beeps): Break the dependency on the host_scheduler,
+ # crbug.com/336934.
+ self.host_scheduler = host_scheduler
+ jobs = [queue_entry.job_id for queue_entry in queue_entries]
+ self._job_acls = self.host_scheduler._get_job_acl_groups(jobs)
+ self._job_deps = self.host_scheduler._get_job_dependencies(jobs)
+ self._labels = self.host_scheduler._get_labels(self._job_deps)
+
+
+ def get_job_info(self, queue_entry):
+ """Extract job information from a queue_entry/host-scheduler.
+
+ @param queue_entry: The queue_entry for which we need job information.
+
+ @return: A dictionary representing job related information.
+ """
+ job_id = queue_entry.job_id
+ job_deps = self._job_deps.get(job_id, [])
+ job_deps = [dep for dep in job_deps
+ if not provision.can_provision(self._labels[dep].name)]
+ job_acls = self._job_acls.get(job_id, [])
+
+ return {'deps': job_deps, 'acls': job_acls,
+ 'host_id': queue_entry.host_id}
+
+
+def acquire_hosts(host_scheduler, queue_entries):
+ """Acquire hosts for the list of queue_entries.
+
+ @param queue_entries: A list of queue_entries that need hosts.
+ @param host_scheduler: The host_scheduler object, needed to get job
+ information.
+
+ @yield: An RDBClientHostWrapper for each host acquired on behalf of a
+ queue_entry, or None if a host wasn't found.
+
+ @raises RDBException: If something goes wrong making the request.
+ """
+ job_query_manager = JobQueryManager(host_scheduler, queue_entries)
+ request_manager = BaseHostRequestManager(
+ rdb_utils.AcquireHostRequest, rdb.rdb_host_request_dispatcher)
+ for entry in queue_entries:
+ request_manager.add_request(**job_query_manager.get_job_info(entry))
+
+ for host in request_manager.response():
+ yield (RDBClientHostWrapper(**host)
+ if host else None)
+
+
+def get_hosts(host_ids):
+ """Get information about the hosts with ids in host_ids.
+
+ @param host_ids: A list of host_ids.
+
+ @return: A list of RDBClientHostWrapper objects.
+
+ @raises RDBException: If something goes wrong in making the request.
+ """
+ request_manager = BaseHostRequestManager(rdb_utils.HostRequest, rdb.get_hosts)
+ for host_id in host_ids:
+ request_manager.add_request(host_id=host_id)
+
+ hosts = []
+ for host in request_manager.response():
+ hosts.append(RDBClientHostWrapper(**host)
+ if host else None)
+ return hosts
diff --git a/scheduler/rdb_utils.py b/scheduler/rdb_utils.py
new file mode 100644
index 0000000..37f8723
--- /dev/null
+++ b/scheduler/rdb_utils.py
@@ -0,0 +1,246 @@
+"""RDB utilities.
+
+Do not import rdb or autotest modules here to avoid cyclic dependencies.
+"""
+import itertools
+import collections
+
+
+class RDBException(Exception):
+ """Generic RDB exception."""
+
+ def wire_format(self):
+ """Convert the exception to a format better suited to an rpc response.
+ """
+ return str(self)
+
+
+class RDBRequestMeta(type):
+ """Metaclass for constructing rdb requests.
+
+ This meta class creates a read-only request template by combining the
+    _request_args of all classes in the inheritance hierarchy into a namedtuple.
+ """
+ def __new__(cls, name, bases, dctn):
+ for base in bases:
+ try:
+ dctn['_request_args'].update(base._request_args)
+ except AttributeError:
+ pass
+ dctn['template'] = collections.namedtuple('template',
+ dctn['_request_args'])
+ return type.__new__(cls, name, bases, dctn)
+
+
+# RDB Request classes: Used in conjunction with the request managers defined in
+# rdb_lib. Each class defines the set of fields the rdb needs to fulfill the
+# request, and a hashable request object the request managers use to identify
+# a response with a request.
+class RDBRequest(object):
+ """Base class for an rdb request.
+
+ All classes inheriting from RDBRequest will need to specify a list of
+ request_args necessary to create the request, and will in turn get a
+ request that the rdb understands.
+ """
+ __metaclass__ = RDBRequestMeta
+ __slots__ = set(['_request_args', '_request'])
+ _request_args = set([])
+
+
+ def __init__(self, **kwargs):
+ for key,value in kwargs.iteritems():
+ try:
+ hash(value)
+ except TypeError as e:
+ raise RDBException('All fields of a %s must be hashable. '
+ '%s: %s, %s failed this test.' %
+ (self.__class__, key, type(value), value))
+ try:
+ self._request = self.template(**kwargs)
+ except TypeError:
+ raise RDBException('Creating %s requires args %s, got %s ' %
+ (self.__class__, self.template._fields, kwargs.keys()))
+
+
+ def get_request(self):
+ """Returns a request that the rdb understands.
+
+ @return: A named tuple with all the fields necessary to make a request.
+ """
+ return self._request
+
+
+class HashableDict(dict):
+ """A hashable dictionary.
+
+ This class assumes all values of the input dict are hashable.
+ """
+
+ def __hash__(self):
+ return hash(tuple(sorted(self.items())))
+
+
+class HostRequest(RDBRequest):
+ """Basic request for information about a single host.
+
+ Eg: HostRequest(host_id=x): Will return all information about host x.
+ """
+ _request_args = set(['host_id'])
+
+
+class UpdateHostRequest(HostRequest):
+ """Defines requests to update hosts.
+
+ Eg:
+ UpdateHostRequest(host_id=x, payload={'afe_hosts_col_name': value}):
+ Will update column afe_hosts_col_name with the given value, for
+ the given host_id.
+
+ @raises RDBException: If the input arguments don't contain the expected
+ fields to make the request, or are of the wrong type.
+ """
+ _request_args = set(['payload'])
+
+
+ def __init__(self, **kwargs):
+ try:
+ kwargs['payload'] = HashableDict(kwargs['payload'])
+ except (KeyError, TypeError) as e:
+ raise RDBException('Creating %s requires args %s, got %s ' %
+ (self.__class__, self.template._fields, kwargs.keys()))
+ super(UpdateHostRequest, self).__init__(**kwargs)
+
+
+class AcquireHostRequest(HostRequest):
+ """Defines requests to acquire hosts.
+
+ Eg:
+ AcquireHostRequest(host_id=None, deps=[d1, d2], acls=[a1, a2]): Will
+ acquire and return a host that matches the specified deps/acls.
+ AcquireHostRequest(host_id=x, deps=[d1, d2], acls=[a1, a2]) : Will
+ acquire and return host x, after checking deps/acls match.
+
+    @raises RDBException: If the input arguments don't contain the expected
+ fields to make a request, or are of the wrong type.
+ """
+ _request_args = set(['deps', 'acls'])
+
+
+ def __init__(self, **kwargs):
+ try:
+ kwargs['deps'] = frozenset(kwargs['deps'])
+ kwargs['acls'] = frozenset(kwargs['acls'])
+ except (KeyError, TypeError) as e:
+ raise RDBException('Creating %s requires args %s, got %s ' %
+ (self.__class__, self.template._fields, kwargs.keys()))
+ super(AcquireHostRequest, self).__init__(**kwargs)
+
+
+# Custom iterators: Used by the rdb to lazily convert the iteration of a
+# queryset to a database query and return an appropriately formatted response.
+class RememberingIterator(object):
+ """An iterator capable of reproducing all values in the input generator.
+ """
+
+ #pylint: disable-msg=C0111
+ def __init__(self, gen):
+ self.current, self.history = itertools.tee(gen)
+ self.items = []
+
+
+ def __iter__(self):
+ return self
+
+
+ def next(self):
+ return self.current.next()
+
+
+ def get_all_items(self):
+ """Get all the items in the generator this object was created with.
+
+ @return: A list of items.
+ """
+ if not self.items:
+ self.items = list(self.history)
+ return self.items
+
+
+class LabelIterator(RememberingIterator):
+ """A RememberingIterator for labels.
+
+ Within the rdb any label/dependency comparisons are performed based on label
+ ids. However, the host object returned needs to contain label names instead.
+ This class returns the label id when iterated over, but a list of all label
+ names when accessed through get_all_items.
+ """
+
+
+ def next(self):
+ return super(LabelIterator, self).next().id
+
+
+ def get_all_items(self):
+ """Get all label names of the labels in the input generator.
+
+ @return: A list of label names.
+ """
+ return [label.name
+ for label in super(LabelIterator, self).get_all_items()]
+
+
+# Rdb host adapters: Help in making a raw database host object more amenable
+# to the classes and functions in the rdb and/or rdb clients.
+class RDBServerHostWrapper(object):
+ """A host wrapper for the raw database object.
+ """
+
+
+ def __init__(self, host):
+ self.id = host.id
+ self.hostname = host.hostname
+ self.status = host.status
+ self.protection = host.protection
+ self.dirty = host.dirty
+ self.invalid = host.invalid
+ self.labels = LabelIterator(
+ (label for label in host.labels.all()))
+ self.acls = RememberingIterator(
+ (acl.id for acl in host.aclgroup_set.all()))
+ platform = host.platform()
+ self.platform = platform.name if platform else None
+
+
+ def wire_format(self):
+ """Returns all information needed to scheduler jobs on the host.
+
+ @return: A dictionary of host information.
+ """
+ host_info = {}
+ for key, value in self.__dict__.iteritems():
+ if isinstance(value, RememberingIterator):
+ host_info[key] = value.get_all_items()
+ else:
+ host_info[key] = value
+ return host_info
+
+
+def return_rdb_host(func):
+ """Decorator for functions that return a list of Host objects.
+
+ @param func: The decorated function.
+    @return: A function capable of converting each host object to an
+        RDBServerHostWrapper.
+ """
+ def get_rdb_host(*args, **kwargs):
+ """Takes a list of hosts and returns a list of host_infos.
+
+ @param hosts: A list of hosts. Each host is assumed to contain
+ all the fields in a host_info defined above.
+ @return: A list of RDBServerHostWrappers, one per host, or an empty
+            list if no hosts were found.
+ """
+ hosts = func(*args, **kwargs)
+ return [RDBServerHostWrapper(host) for host in hosts]
+ return get_rdb_host
diff --git a/scheduler/scheduler_models.py b/scheduler/scheduler_models.py
index 61af052..2211cbd 100644
--- a/scheduler/scheduler_models.py
+++ b/scheduler/scheduler_models.py
@@ -24,6 +24,7 @@
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.database import database_connection
from autotest_lib.scheduler import drone_manager, email_manager
+from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import scheduler_config
from autotest_lib.site_utils.graphite import stats
from autotest_lib.client.common_lib import control_data
@@ -361,7 +362,8 @@
class Host(DBObject):
_table_name = 'afe_hosts'
_fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
- 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
+ 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty',
+ 'leased')
_timer = stats.Timer("scheduler_models.Host")
@@ -448,7 +450,8 @@
self.job = Job(self.job_id)
if self.host_id:
- self.host = Host(self.host_id)
+ self.host = rdb_lib.get_hosts([self.host_id])[0]
+ self.host.dbg_str = self.get_dbg_str()
else:
self.host = None
@@ -537,6 +540,17 @@
return 'no host'
+ def get_dbg_str(self):
+ """Get a debug string to identify this host.
+
+ @return: A string containing the hqe and job id.
+ """
+ try:
+ return 'HQE: %s, for job: %s' % (self.id, self.job_id)
+ except AttributeError as e:
+ return 'HQE has not been initialized yet: %s' % e
+
+
def __str__(self):
flags = []
if self.active:
@@ -550,8 +564,9 @@
flags_str = ','.join(flags)
if flags_str:
flags_str = ' [%s]' % flags_str
- return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
- self.status, flags_str)
+ return ("%s and host: %s has status:%s%s" %
+ (self.get_dbg_str(), self._get_hostname(), self.status,
+ flags_str))
@_timer.decorate