blob: 50bc89c4a6f3b29ec49ab3e9b9a0c91b10975f1a [file] [log] [blame]
beepscc9fc702013-12-02 12:45:38 -08001"""Rdb server module.
beeps7d8273b2013-11-06 09:44:34 -08002"""
beepscc9fc702013-12-02 12:45:38 -08003
4import collections
beeps7d8273b2013-11-06 09:44:34 -08005import logging
beepscc9fc702013-12-02 12:45:38 -08006
7import common
8
9from django.core import exceptions as django_exceptions
10from django.db.models import fields
11from django.db.models import Q
12from autotest_lib.frontend.afe import models
13from autotest_lib.scheduler import rdb_utils
beeps7d8273b2013-11-06 09:44:34 -080014from autotest_lib.site_utils.graphite import stats
15
16
# Module-wide stats timer registered under the name 'rdb'. Its `decorate`
# attribute is applied to batch_acquire_hosts and batch_validate_hosts below.
_timer = stats.Timer('rdb')
18
beeps7d8273b2013-11-06 09:44:34 -080019
# Query managers: Provide a layer of abstraction over the database by
# encapsulating common query patterns used by the rdb.
class BaseHostQueryManager(object):
    """Query layer over the host table, with no restriction on host state.

    Subclasses can narrow the set of visible hosts by overriding the
    `host_objects` class attribute with a different model manager.
    """

    host_objects = models.Host.objects


    def update_hosts(self, host_ids, **kwargs):
        """Write the given column values onto a set of hosts.

        @param host_ids: A list of ids of the hosts to update.
        @param kwargs: Column name to value pairs, applied to every host
            whose id appears in host_ids.
        """
        targeted_hosts = self.host_objects.filter(id__in=host_ids)
        targeted_hosts.update(**kwargs)


    @rdb_utils.return_rdb_host
    def get_hosts(self, ids):
        """Look up the hosts with the given ids.

        @param ids: The ids of the hosts to fetch.

        @returns: A list of RDBServerHostWrapper objects, ordered by host_id.
        """
        selected_hosts = self.host_objects.filter(id__in=ids)
        return selected_hosts.order_by('id')


    @rdb_utils.return_rdb_host
    def find_hosts(self, deps, acls):
        """Find valid hosts that satisfy a set of deps and acls.

        @param deps: A list of dependencies (label ids) to match.
        @param acls: A list of acls, at least one of which must coincide with
            an acl group the chosen host is in.

        @return: A list of matching hosts available.
        """
        candidates = self.host_objects.filter(invalid=0)
        # Every dependency is applied through its own filter() call, followed
        # by a single filter for the acl groups.
        for dep in deps:
            candidates = candidates.filter(Q(labels__id=dep))
        return candidates.filter(Q(aclgroup__id__in=acls))
66
67
class AvailableHostQueryManager(BaseHostQueryManager):
    """Query manager restricted to hosts that are neither leased nor locked.

    Every query inherited from BaseHostQueryManager runs against
    models.Host.leased_objects instead of models.Host.objects.
    """

    host_objects = models.Host.leased_objects
73
74
# Request Handlers: Used in conjunction with requests in rdb_utils, these
# handlers acquire hosts for a request and record the acquisition in
# a response_map dictionary keyed on the request itself, with the host/hosts
# as values.
class BaseHostRequestHandler(object):
    """Handler for requests related to hosts, leased or unleased.

    This class is only capable of blindly returning host information.
    Responses are accumulated in `response_map`, keyed on the request.
    """

    def __init__(self):
        self.host_query_manager = BaseHostQueryManager()
        self.response_map = {}


    def update_response_map(self, request, response):
        """Record a response for a request.

        The response_map only contains requests that were either satisfied, or
        that ran into an exception. Often this translates to reserving hosts
        against a request. If the rdb hit an exception processing a request, the
        exception gets recorded in the map for the client to reraise.

        @param request: The request that has reserved these hosts.
        @param response: A response for the request.

        @raises RDBException: If an empty value is added to the map, or if
            the request already has a non-empty response recorded.
        """
        if not response:
            raise rdb_utils.RDBException('response_map dict can only contain '
                    'valid responses. Request %s, response %s is invalid.' %
                    (request, response))
        existing_response = self.response_map.get(request)
        if existing_response:
            # The message describes the response the request *already has*,
            # so report the recorded one rather than the rejected newcomer.
            raise rdb_utils.RDBException('Request %s already has response %s '
                                         'the rdb cannot return multiple '
                                         'responses for the same request.' %
                                         (request, existing_response))
        self.response_map[request] = response


    def _record_exceptions(self, request, exceptions):
        """Record a list of exceptions for a request.

        Each exception is wrapped in an rdb_utils.RDBException before being
        stored as the response for the request.

        @param request: The request for which the exceptions were hit.
        @param exceptions: The exceptions hit while processing the request.
        """
        rdb_exceptions = [rdb_utils.RDBException(ex) for ex in exceptions]
        self.update_response_map(request, rdb_exceptions)


    def get_response(self):
        """Convert every recorded reply to its wire format.

        The response_map is rewritten in place: each list of replies is
        replaced by the list of their wire_format() results.

        @return: A dictionary mapping requests to a list of matching host_infos.
        """
        # Snapshot the items so the rewrite below never races the iteration;
        # items() also keeps this working on both python 2 and python 3.
        for request, response in list(self.response_map.items()):
            self.response_map[request] = [reply.wire_format()
                                          for reply in response]
        return self.response_map


    def update_hosts(self, update_requests):
        """Updates host tables with a payload.

        Field errors raised by an update are recorded in the response_map
        instead of being propagated, so the remaining updates still run.

        @param update_requests: A list of update requests, as defined in
            rdb_utils.UpdateHostRequest.
        """
        # Last payload for a host_id wins in the case of conflicting requests.
        # NOTE(review): update() merges into the payload object taken from the
        # first request seen for a host; if request.payload hands back the
        # request's own object this mutates it in place -- confirm.
        unique_host_requests = {}
        for request in update_requests:
            if unique_host_requests.get(request.host_id):
                unique_host_requests[request.host_id].update(request.payload)
            else:
                unique_host_requests[request.host_id] = request.payload

        # Batch similar payloads so we can do them in one table scan. Payloads
        # are used as dictionary keys here, so they must be hashable.
        similar_requests = {}
        for host_id, payload in unique_host_requests.items():
            similar_requests.setdefault(payload, []).append(host_id)

        # If fields of the update don't match columns in the database,
        # record the exception in the response map. This also means later
        # updates will get applied even if previous updates fail.
        for payload, hosts in similar_requests.items():
            try:
                self.host_query_manager.update_hosts(hosts, **payload)
            except (django_exceptions.FieldError,
                    fields.FieldDoesNotExist) as e:
                for host in hosts:
                    # Since update requests have a consistent hash this will map
                    # to the same key as the original request.
                    request = rdb_utils.UpdateHostRequest(
                            host_id=host, payload=payload).get_request()
                    self._record_exceptions(request, [e])


    def batch_get_hosts(self, host_requests):
        """Get hosts matching the requests.

        This method does not acquire the hosts, i.e it reserves hosts against
        requests leaving their leased state untouched. Requests whose host is
        not returned by the query manager are logged and left out of the
        response_map.

        @param host_requests: A list of requests, as defined in
            rdb_utils.BaseHostRequest.
        """
        host_ids = {request.host_id for request in host_requests}

        # Hosts that the query manager does not return (e.g. when it is an
        # AvailableHostQueryManager) are simply absent from this map.
        host_map = {host.id: host
                    for host in self.host_query_manager.get_hosts(host_ids)}
        for request in host_requests:
            if request.host_id in host_map:
                self.update_response_map(request, [host_map[request.host_id]])
            else:
                logging.warning('rdb could not get host for request: %s, it '
                                'is already leased or locked', request)
193
194
class AvailableHostRequestHandler(BaseHostRequestHandler):
    """Handler for requests against available (unleased, unlocked) hosts.

    On top of the lookups offered by the base handler, this handler can
    acquire hosts for a request or validate a request against a given host,
    leasing the hosts it hands out.
    """


    def __init__(self):
        self.response_map = {}
        self.host_query_manager = AvailableHostQueryManager()


    def lease_hosts(self, hosts):
        """Mark each of the given hosts as leased.

        @param hosts: A list of hosts to lease.
        """
        lease_requests = []
        for host in hosts:
            lease = rdb_utils.UpdateHostRequest(host_id=host.id,
                                                payload={'leased': 1})
            lease_requests.append(lease.get_request())
        super(AvailableHostRequestHandler, self).update_hosts(lease_requests)


    @_timer.decorate
    def batch_acquire_hosts(self, host_requests):
        """Find and lease hosts for a list of requests.

        Identical requests are counted together, and the groups are served
        in descending order of priority. For each group up to `count`
        matching hosts are leased and recorded in the response_map; any
        shortfall is logged.

        @param host_requests: A list of requests to acquire hosts.
        """
        # Serve groups by priority rather than by raw demand, so that a
        # large batch of low priority requests cannot starve a higher one.
        request_counts = collections.Counter(host_requests)
        prioritized = sorted(request_counts.items(),
                             key=lambda pair: pair[0].priority,
                             reverse=True)

        for request, count in prioritized:
            hosts = self.host_query_manager.find_hosts(
                    request.deps, request.acls)
            num_hosts = min(len(hosts), count)
            if num_hosts:
                acquired = hosts[:num_hosts]
                # TODO(beeps): Only reserve hosts we have successfully leased.
                self.lease_hosts(acquired)
                self.update_response_map(request, acquired)
            if num_hosts < count:
                logging.warning('%s Unsatisfied rdb acquisition request:%s ',
                                count-num_hosts, request)


    @_timer.decorate
    def batch_validate_hosts(self, requests):
        """Check requests that name a host against that host, then lease it.

        All requested hosts are first reserved in the response_map. Pairings
        whose acls or deps do not match are dropped from the map, and the
        hosts of the remaining pairings are leased.

        @param requests: A list of requests to validate.

        @raises RDBException: If a request ends up with more than one host,
            or if a failed pairing holds a host other than the requested one.
        """
        # Several requests may name the same host with different acls/deps,
        # and several jobs may submit identical requests. The requests are
        # deduplicated through a set before the hosts are looked up.
        self.batch_get_hosts(set(requests))

        # Iterate over a copy of the keys, entries are deleted on the way.
        for request in list(self.response_map):
            hosts = self.response_map[request]
            if len(hosts) > 1:
                raise rdb_utils.RDBException('Got multiple hosts for a single '
                        'request. Hosts: %s, request %s.' % (hosts, request))
            host = hosts[0]

            # The acl check passes on a shared acl or on an invalid host;
            # the deps check requires every dep to be among the host labels.
            acl_match = request.acls.intersection(host.acls) or host.invalid
            if (acl_match and
                request.deps.intersection(host.labels) == request.deps):
                continue

            if request.host_id != host.id:
                raise rdb_utils.RDBException('Cannot assign a different '
                        'host for requset: %s, it already has one: %s ' %
                        (request, host.id))
            del self.response_map[request]
            logging.warning('Failed rdb validation request:%s ', request)

        # TODO(beeps): Update acquired hosts with failed leases.
        validated_hosts = [hosts[0] for hosts in self.response_map.values()]
        self.lease_hosts(validated_hosts)
278
279
# Request dispatchers: Create the appropriate request handler, send a list
# of requests to one of its methods. The corresponding request handler in
# rdb_lib must understand how to match each request with a response from a
# dispatcher, the easiest way to achieve this is to return the response_map
# attribute of the request handler, after making the appropriate requests.
def get_hosts(host_requests):
    """Look up the hosts named by a list of requests.

    A fresh BaseHostRequestHandler serves the requests, so no host is
    filtered out because of its leased or locked state.

    @param host_requests: A list of requests as defined in BaseHostRequest.
    @return: A dictionary mapping each request to a list of hosts.
    """
    handler = BaseHostRequestHandler()
    handler.batch_get_hosts(host_requests)
    response = handler.get_response()
    return response
294
295
def update_hosts(update_requests):
    """Apply a list of updates to the host tables.

    @param update_requests: A list of updates to host tables
                            as defined in UpdateHostRequest.
    @return: The response map of the handler that applied the updates. The
             handler only records an entry for an update that raised a
             field error.
    """
    handler = BaseHostRequestHandler()
    handler.update_hosts(update_requests)
    response = handler.get_response()
    return response
305
306
def rdb_host_request_dispatcher(host_requests):
    """Dispatch host acquisition and validation requests.

    Requests carrying a host_id are validated against that host, all others
    have hosts found for them. Both kinds are served by the same
    AvailableHostRequestHandler, validations first.

    @param host_requests: A list of requests for acquiring hosts, as defined in
                          AcquireHostRequest.
    @return: The response map of the handler, mapping requests to lists of
             hosts. Eg:
             {AcquireHostRequest.template: [host_info_dictionaries]}
    """
    validation_requests = []
    require_hosts_requests = []

    # A request that names a host (eg: a job scheduled against a specific
    # host through the frontend) only needs the rdb to match the host's
    # parameters against the request. A request without a host (eg: from a
    # suite) needs the rdb to find hosts matching its parameters.
    for request in host_requests:
        bucket = (validation_requests if request.host_id
                  else require_hosts_requests)
        bucket.append(request)

    handler = AvailableHostRequestHandler()
    handler.batch_validate_hosts(validation_requests)
    handler.batch_acquire_hosts(require_hosts_requests)
    return handler.get_response()