blob: fd3b64560fec11c36246fa0f358a86d396074043 [file] [log] [blame]
Aviv Keshet18308922013-02-19 17:49:49 -08001#pylint: disable-msg=C0111
mblighe8819cd2008-02-15 16:48:40 +00002"""\
3Utility functions for rpc_interface.py. We keep them in a separate file so that
4only RPC interface functions go into that file.
5"""
6
7__author__ = 'showard@google.com (Steve Howard)'
8
MK Ryu84573e12015-02-18 15:54:09 -08009import datetime
10import inspect
11import os
12import sys
showard3d6ae112009-05-02 00:45:48 +000013import django.http
Dan Shi07e09af2013-04-12 09:31:29 -070014from autotest_lib.frontend.afe import models, model_logic
Alex Miller4a193692013-08-21 13:59:01 -070015from autotest_lib.client.common_lib import control_data, error
Jiaxi Luo421608e2014-07-07 14:38:00 -070016from autotest_lib.client.common_lib import global_config, priorities
Aviv Keshetc68807e2013-07-31 16:13:01 -070017from autotest_lib.server.cros import provision
Jakob Juelich50e91f72014-10-01 12:43:23 -070018from autotest_lib.server import frontend
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080019from autotest_lib.server import utils as server_utils
mblighe8819cd2008-02-15 16:48:40 +000020
# Sentinel date values (the max representable datetime/date); _prepare_data
# serializes either of them as None when returning data over RPC.
NULL_DATETIME = datetime.datetime.max
NULL_DATE = datetime.date.max
23
def prepare_for_serialization(objects):
    """
    Prepare Python objects to be returned via RPC.

    A list of dicts keyed by 'id' is first de-duplicated (by id) before the
    recursive type conversion.

    @param objects: objects to be prepared.
    """
    looks_like_object_dicts = (isinstance(objects, list) and objects
                               and isinstance(objects[0], dict)
                               and 'id' in objects[0])
    if looks_like_object_dicts:
        objects = gather_unique_dicts(objects)
    return _prepare_data(objects)
showardb8d34242008-04-25 18:11:16 +000033
34
def prepare_rows_as_nested_dicts(query, nested_dict_column_names):
    """
    Prepare a Django query to be returned via RPC as a sequence of nested
    dictionaries.

    @param query - A Django model query object with a select_related() method.
    @param nested_dict_column_names - A list of column/attribute names for the
            rows returned by query to expand into nested dictionaries using
            their get_object_dict() method when not None.

    @returns A list suitable to be returned in an RPC.
    """
    def expand(row):
        # Replace each non-None related object with its dict form.
        row_dict = row.get_object_dict()
        for column in nested_dict_column_names:
            if row_dict[column] is not None:
                row_dict[column] = getattr(row, column).get_object_dict()
        return row_dict

    return prepare_for_serialization(
            [expand(row) for row in query.select_related()])
55
56
showardb8d34242008-04-25 18:11:16 +000057def _prepare_data(data):
jadmanski0afbb632008-06-06 21:10:57 +000058 """
59 Recursively process data structures, performing necessary type
60 conversions to values in data to allow for RPC serialization:
61 -convert datetimes to strings
showard2b9a88b2008-06-13 20:55:03 +000062 -convert tuples and sets to lists
jadmanski0afbb632008-06-06 21:10:57 +000063 """
64 if isinstance(data, dict):
65 new_data = {}
66 for key, value in data.iteritems():
67 new_data[key] = _prepare_data(value)
68 return new_data
showard2b9a88b2008-06-13 20:55:03 +000069 elif (isinstance(data, list) or isinstance(data, tuple) or
70 isinstance(data, set)):
jadmanski0afbb632008-06-06 21:10:57 +000071 return [_prepare_data(item) for item in data]
showard98659972008-07-17 17:00:07 +000072 elif isinstance(data, datetime.date):
showarda62866b2008-07-28 21:27:41 +000073 if data is NULL_DATETIME or data is NULL_DATE:
74 return None
jadmanski0afbb632008-06-06 21:10:57 +000075 return str(data)
76 else:
77 return data
mblighe8819cd2008-02-15 16:48:40 +000078
79
def fetchall_as_list_of_dicts(cursor):
    """
    Converts each row in the cursor to a dictionary so that values can be read
    by using the column name.
    @param cursor: The database cursor to read from.
    @returns: A list of each row in the cursor as a dictionary.
    """
    column_names = [column[0] for column in cursor.description]
    return [dict(zip(column_names, row)) for row in cursor.fetchall()]
90
91
def raw_http_response(response_data, content_type=None):
    """
    Wrap raw data in a django.http.HttpResponse for returning directly from
    an RPC handler.

    @param response_data: string payload for the response body.
    @param content_type: optional MIME type for the response.
    @returns: django.http.HttpResponse with an explicit Content-length header.
    """
    response = django.http.HttpResponse(response_data, mimetype=content_type)
    response['Content-length'] = str(len(response.content))
    return response
96
97
def gather_unique_dicts(dict_iterable):
    """\
    Pick out unique objects (by ID) from an iterable of object dicts.

    The first occurrence of each id wins and the original order is kept.
    """
    seen_ids = set()
    unique = []
    for obj in dict_iterable:
        obj_id = obj['id']
        if obj_id in seen_ids:
            continue
        seen_ids.add(obj_id)
        unique.append(obj)
    return unique
showardb0dfb9f2008-06-06 18:08:02 +0000109
110
def extra_job_status_filters(not_yet_run=False, running=False, finished=False):
    """\
    Generate a SQL WHERE clause for job status filtering, and return it in
    a dict of keyword args to pass to query.extra(). No more than one of
    the parameters should be passed as True.
    * not_yet_run: all HQEs are Queued
    * finished: all HQEs are complete
    * running: everything else
    """
    assert sum(map(bool, (not_yet_run, running, finished))) <= 1, (
            'Cannot specify more than one filter to this function')

    # Subqueries selecting jobs with at least one HQE not in the given state.
    not_queued = ('(SELECT job_id FROM afe_host_queue_entries '
                  'WHERE status != "%s")'
                  % models.HostQueueEntry.Status.QUEUED)
    not_finished = ('(SELECT job_id FROM afe_host_queue_entries '
                    'WHERE not complete)')

    if not_yet_run:
        clause = 'id NOT IN ' + not_queued
    elif running:
        clause = '(id IN %s) AND (id IN %s)' % (not_queued, not_finished)
    elif finished:
        clause = 'id NOT IN ' + not_finished
    else:
        # No filter requested.
        return {}
    return {'where': [clause]}
mblighe8819cd2008-02-15 16:48:40 +0000140
141
def extra_job_type_filters(extra_args, suite=False,
                           sub=False, standalone=False):
    """\
    Generate a SQL WHERE clause for job type filtering and merge it into
    the given extra_args dict (which is also returned).

    param extra_args: a dict of existing extra_args.

    No more than one of the parameters should be passed as True:
    * suite: job which is parent of other jobs
    * sub: job with a parent job
    * standalone: job with no child or parent jobs
    """
    assert sum(map(bool, (suite, sub, standalone))) <= 1, (
            'Cannot specify more than one filter to this function')

    filter_common = ('(SELECT %s FROM afe_jobs '
                     'WHERE parent_job_id IS NOT NULL)')

    if suite:
        clause = 'id IN ' + filter_common % 'DISTINCT parent_job_id'
    elif sub:
        clause = 'id IN ' + filter_common % 'id'
    elif standalone:
        clause = ('NOT EXISTS (SELECT 1 from afe_jobs AS sub_query '
                  'WHERE parent_job_id IS NOT NULL'
                  ' AND (sub_query.parent_job_id=afe_jobs.id'
                  ' OR sub_query.id=afe_jobs.id))')
    else:
        # No type filter requested; hand back the args untouched.
        return extra_args

    where = extra_args.get('where', [])
    where.append(clause)
    extra_args['where'] = where
    return extra_args
180
181
182
def extra_host_filters(multiple_labels=()):
    """\
    Generate SQL WHERE clauses for matching hosts in an intersection of
    labels.
    """
    # One copy of the clause per label; the label ids are bound as params.
    label_clause = ('afe_hosts.id in (select host_id from afe_hosts_labels '
                    'where label_id=%s)')
    return {
        'where': [label_clause] * len(multiple_labels),
        'params': [models.Label.smart_get(label).id
                   for label in multiple_labels],
    }
showard8e3aa5e2008-04-08 19:42:32 +0000195
196
def get_host_query(multiple_labels, exclude_only_if_needed_labels,
                   exclude_atomic_group_hosts, valid_only, filter_data):
    """
    Build a Host queryset subject to several label-based restrictions.

    @param multiple_labels: label names the hosts must all carry.
    @param exclude_only_if_needed_labels: if True, exclude hosts carrying any
            label marked only_if_needed.
    @param exclude_atomic_group_hosts: if True, exclude hosts carrying any
            label tied to an atomic group.
    @param valid_only: if True, start from valid (non-invalid) hosts only.
    @param filter_data: query_objects()-style filter dict; must not already
            contain 'extra_args' (asserted below). Mutated here to add the
            label-intersection clauses.
    @returns: a Host queryset, or an empty queryset if any label named in
            multiple_labels does not exist.
    """
    if valid_only:
        query = models.Host.valid_objects.all()
    else:
        query = models.Host.objects.all()

    if exclude_only_if_needed_labels:
        only_if_needed_labels = models.Label.valid_objects.filter(
                only_if_needed=True)
        if only_if_needed_labels.count() > 0:
            # Anti-join against afe_hosts_labels to drop hosts that have
            # any only_if_needed label.
            only_if_needed_ids = ','.join(
                    str(label['id'])
                    for label in only_if_needed_labels.values('id'))
            query = models.Host.objects.add_join(
                query, 'afe_hosts_labels', join_key='host_id',
                join_condition=('afe_hosts_labels_exclude_OIN.label_id IN (%s)'
                                % only_if_needed_ids),
                suffix='_exclude_OIN', exclude=True)

    if exclude_atomic_group_hosts:
        atomic_group_labels = models.Label.valid_objects.filter(
                atomic_group__isnull=False)
        if atomic_group_labels.count() > 0:
            # Same anti-join technique for atomic-group labels.
            atomic_group_label_ids = ','.join(
                    str(atomic_group['id'])
                    for atomic_group in atomic_group_labels.values('id'))
            query = models.Host.objects.add_join(
                    query, 'afe_hosts_labels', join_key='host_id',
                    join_condition=(
                            'afe_hosts_labels_exclude_AG.label_id IN (%s)'
                            % atomic_group_label_ids),
                    suffix='_exclude_AG', exclude=True)
    try:
        assert 'extra_args' not in filter_data
        filter_data['extra_args'] = extra_host_filters(multiple_labels)
        return models.Host.query_objects(filter_data, initial_query=query)
    except models.Label.DoesNotExist as e:
        # A nonexistent label matches no hosts.
        return models.Host.objects.none()
showard43a3d262008-11-12 18:17:05 +0000236
237
class InconsistencyException(Exception):
    """
    Raised when a list of objects does not have a consistent value.

    args holds the two objects whose values were found to differ
    (see get_consistent_value).
    """
showard8fd58242008-03-10 21:29:07 +0000240
241
def get_consistent_value(objects, field):
    """
    Return the value of |field| shared by every object in |objects|.

    @param objects: sequence of objects to inspect.
    @param field: attribute name to read from each object.
    @returns: the common value, or None for an empty sequence (a list of
            nothing is trivially consistent).
    @raises InconsistencyException: if two objects disagree on the value.
    """
    if not objects:
        return None

    expected = getattr(objects[0], field)
    for candidate in objects:
        if getattr(candidate, field) != expected:
            raise InconsistencyException(objects[0], candidate)
    return expected
showard8fd58242008-03-10 21:29:07 +0000253
254
def prepare_generate_control_file(tests, kernel, label, profilers):
    """
    Resolve generate_control_file() arguments into model objects.

    @param tests: iterable of test ids/names to look up.
    @param kernel: kernel spec; not used in this helper but kept for
            interface compatibility with callers.
    @param label: optional label id/name; resolved to a models.Label.
    @param profilers: iterable of profiler ids/names to look up.
    @returns: (cf_info dict, test objects, profiler objects, label object).
    @raises model_logic.ValidationError: if server- and client-side tests
            are mixed.
    """
    test_objects = [models.Test.smart_get(test) for test in tests]
    profiler_objects = [models.Profiler.smart_get(profiler)
                        for profiler in profilers]
    # ensure tests are all the same type
    try:
        test_type = get_consistent_value(test_objects, 'test_type')
    # 'except X as e' (matching the style used elsewhere in this file)
    # instead of the Python-2-only 'except X, e' form.
    except InconsistencyException as exc:
        test1, test2 = exc.args
        raise model_logic.ValidationError(
            {'tests' : 'You cannot run both server- and client-side '
             'tests together (tests %s and %s differ' % (
            test1.name, test2.name)})

    is_server = (test_type == control_data.CONTROL_TYPE.SERVER)
    # synch_count is the largest sync_count required by any chosen test.
    if test_objects:
        synch_count = max(test.sync_count for test in test_objects)
    else:
        synch_count = 1
    if label:
        label = models.Label.smart_get(label)

    dependencies = set(label.name for label
                       in models.Label.objects.filter(test__in=test_objects))

    cf_info = dict(is_server=is_server, synch_count=synch_count,
                   dependencies=list(dependencies))
    return cf_info, test_objects, profiler_objects, label
showard989f25d2008-10-01 11:38:11 +0000283
284
def check_job_dependencies(host_objects, job_dependencies):
    """
    Check that a set of machines satisfies a job's dependencies.

    @param host_objects: list of models.Host objects.
    @param job_dependencies: list of names of labels.
    @raises model_logic.ValidationError: naming the hosts that lack one of
            the required labels.
    """
    # check that hosts satisfy dependencies
    host_ids = [host.id for host in host_objects]
    hosts_in_job = models.Host.objects.filter(id__in=host_ids)
    ok_hosts = hosts_in_job
    # (dropped an unused enumerate() index here)
    for dependency in job_dependencies:
        # Labels used only to trigger special actions (e.g. provisioning)
        # do not constrain which hosts are eligible.
        if not provision.is_for_special_action(dependency):
            ok_hosts = ok_hosts.filter(labels__name=dependency)
    failing_hosts = (set(host.hostname for host in host_objects) -
                     set(host.hostname for host in ok_hosts))
    if failing_hosts:
        raise model_logic.ValidationError(
            {'hosts' : 'Host(s) failed to meet job dependencies (' +
                       (', '.join(job_dependencies)) + '): ' +
                       (', '.join(failing_hosts))})
305
showard989f25d2008-10-01 11:38:11 +0000306
def check_job_metahost_dependencies(metahost_objects, job_dependencies):
    """
    Check that at least one machine within the metahost spec satisfies the job's
    dependencies.

    @param metahost_objects A list of label objects representing the metahosts.
    @param job_dependencies A list of strings of the required label names.
    @raises NoEligibleHostException If a metahost cannot run the job.
    """
    for metahost in metahost_objects:
        hosts = models.Host.objects.filter(labels=metahost)
        for label_name in job_dependencies:
            # Special-action labels (e.g. provisioning) don't restrict hosts.
            if not provision.is_for_special_action(label_name):
                hosts = hosts.filter(labels__name=label_name)
        # any() iterates the (lazy) queryset; model instances are truthy, so
        # this is effectively "is the filtered queryset non-empty".
        if not any(hosts):
            raise error.NoEligibleHostException("No hosts within %s satisfy %s."
                    % (metahost.name, ', '.join(job_dependencies)))
324
showard2bab8f42008-11-12 18:15:22 +0000325
326def _execution_key_for(host_queue_entry):
327 return (host_queue_entry.job.id, host_queue_entry.execution_subdir)
328
329
def check_abort_synchronous_jobs(host_queue_entries):
    """
    Ensure the user isn't aborting only part of a synchronous autoserv
    execution.

    @param host_queue_entries: the HQEs requested to be aborted.
    @raises model_logic.ValidationError: if fewer entries of an execution are
            included than its job's synch_count requires.
    """
    # Tally how many of the requested entries belong to each execution.
    count_per_execution = {}
    for queue_entry in host_queue_entries:
        key = _execution_key_for(queue_entry)
        count_per_execution[key] = count_per_execution.get(key, 0) + 1

    for queue_entry in host_queue_entries:
        if not queue_entry.execution_subdir:
            continue
        execution_count = count_per_execution[_execution_key_for(queue_entry)]
        if execution_count < queue_entry.job.synch_count:
            raise model_logic.ValidationError(
                {'' : 'You cannot abort part of a synchronous job execution '
                      '(%d/%s), %d included, %d expected'
                      % (queue_entry.job.id, queue_entry.execution_subdir,
                         execution_count, queue_entry.job.synch_count)})
showard8fbae652009-01-20 23:23:10 +0000348
349
def check_atomic_group_create_job(synch_count, host_objects, metahost_objects,
                                  dependencies, atomic_group):
    """
    Attempt to reject create_job requests with an atomic group that
    will be impossible to schedule. The checks are not perfect but
    should catch the most obvious issues.

    @param synch_count - The job's minimum synch count.
    @param host_objects - A list of models.Host instances.
    @param metahost_objects - A list of models.Label instances.
    @param dependencies - A list of job dependency label names.
    @param atomic_group - The models.AtomicGroup instance the job targets.

    @raises model_logic.ValidationError - When an issue is found.
    """
    # If specific host objects were supplied with an atomic group, verify
    # that there are enough to satisfy the synch_count.
    minimum_required = synch_count or 1
    if (host_objects and not metahost_objects and
        len(host_objects) < minimum_required):
        raise model_logic.ValidationError(
                {'hosts':
                 'only %d hosts provided for job with synch_count = %d' %
                 (len(host_objects), synch_count)})

    # Check that the atomic group has a hope of running this job
    # given any supplied metahosts and dependancies that may limit.

    # Get a set of hostnames in the atomic group.
    possible_hosts = set()
    for label in atomic_group.label_set.all():
        possible_hosts.update(h.hostname for h in label.host_set.all())

    # Filter out hosts that don't match all of the job dependency labels.
    for label in models.Label.objects.filter(name__in=dependencies):
        hosts_in_label = (h.hostname for h in label.host_set.all())
        possible_hosts.intersection_update(hosts_in_label)

    if not host_objects and not metahost_objects:
        # No hosts or metahosts are required to queue an atomic group Job.
        # However, if they are given, we respect them below.
        host_set = possible_hosts
    else:
        host_set = set(host.hostname for host in host_objects)
        unusable_host_set = host_set.difference(possible_hosts)
        if unusable_host_set:
            raise model_logic.ValidationError(
                {'hosts': 'Hosts "%s" are not in Atomic Group "%s"' %
                 (', '.join(sorted(unusable_host_set)), atomic_group.name)})

    # Lookup hosts provided by each meta host and merge them into the
    # host_set for final counting.
    for meta_host in metahost_objects:
        meta_possible = possible_hosts.copy()
        hosts_in_meta_host = (h.hostname for h in meta_host.host_set.all())
        meta_possible.intersection_update(hosts_in_meta_host)

        # Count all hosts that this meta_host will provide.
        host_set.update(meta_possible)

    if len(host_set) < minimum_required:
        raise model_logic.ValidationError(
                {'atomic_group_name':
                 'Insufficient hosts in Atomic Group "%s" with the'
                 ' supplied dependencies and meta_hosts.' %
                 (atomic_group.name,)})
417
418
def check_modify_host(update_data):
    """
    Sanity check modify_host* requests.

    @param update_data: A dictionary with the changes to make to a host
            or hosts.
    @raises model_logic.ValidationError: if the request tries to modify the
            host's status.
    """
    # Only the scheduler (monitor_db) is allowed to modify Host status.
    # Otherwise race conditions happen as a hosts state is changed out from
    # beneath tasks being run on a host.
    if 'status' not in update_data:
        return
    raise model_logic.ValidationError({
            'status': 'Host status can not be modified by the frontend.'})
432
433
def check_modify_host_locking(host, update_data):
    """
    Checks when locking/unlocking has been requested if the host is already
    locked/unlocked.

    @param host: models.Host object to be modified
    @param update_data: A dictionary with the changes to make to the host.
    @raises model_logic.ValidationError: on a redundant lock/unlock request,
            or a lock request without a reason.
    """
    locked = update_data.get('locked', None)
    lock_reason = update_data.get('lock_reason', None)
    if locked is None:
        # No change to the lock state was requested.
        return
    if locked:
        if host.locked:
            raise model_logic.ValidationError({
                    'locked': 'Host already locked by %s on %s.' %
                              (host.locked_by, host.lock_time)})
        if not lock_reason:
            raise model_logic.ValidationError({
                    'locked': 'Please provide a reason for locking'})
    elif not host.locked:
        raise model_logic.ValidationError({
                'locked': 'Host already unlocked.'})
showardce7c0922009-09-11 18:39:24 +0000455
456
def get_motd():
    """
    Return the message-of-the-day text, if any.

    @returns: the contents of motd.txt two directories above this module,
            or '' when the file is missing/unreadable (best-effort).
    """
    dirname = os.path.dirname(__file__)
    filename = os.path.join(dirname, "..", "..", "motd.txt")
    try:
        # Context manager replaces manual open/close; catching only I/O
        # errors (a missing motd is expected) instead of a bare except
        # that would also hide programming errors.
        with open(filename, "r") as fp:
            return fp.read()
    except IOError:
        return ''
showard29f7cd22009-04-29 21:16:24 +0000471
472
473def _get_metahost_counts(metahost_objects):
474 metahost_counts = {}
475 for metahost in metahost_objects:
476 metahost_counts.setdefault(metahost, 0)
477 metahost_counts[metahost] += 1
478 return metahost_counts
479
480
def get_job_info(job, preserve_metahosts=False, queue_entry_filter_data=None):
    """
    Decompose a job's queue entries into the host/metahost structure used
    when creating jobs.

    @param job: a models.Job instance.
    @param preserve_metahosts: if True, entries created from a metahost keep
            their assigned host; otherwise they are reported as metahosts.
    @param queue_entry_filter_data: optional query_objects() filter dict
            applied to the job's queue entries first.
    @returns: dict with keys dependencies, hosts, meta_hosts,
            meta_host_counts, one_time_hosts, atomic_group and hostless.
    """
    hosts = []
    one_time_hosts = []
    meta_hosts = []
    atomic_group = None
    hostless = False

    queue_entries = job.hostqueueentry_set.all()
    if queue_entry_filter_data:
        queue_entries = models.HostQueueEntry.query_objects(
            queue_entry_filter_data, initial_query=queue_entries)

    for queue_entry in queue_entries:
        if (queue_entry.host and (preserve_metahosts or
                                  not queue_entry.meta_host)):
            if queue_entry.deleted:
                continue
            # Invalid hosts were one-time hosts.
            if queue_entry.host.invalid:
                one_time_hosts.append(queue_entry.host)
            else:
                hosts.append(queue_entry.host)
        elif queue_entry.meta_host:
            meta_hosts.append(queue_entry.meta_host)
        else:
            hostless = True

        if atomic_group is None:
            if queue_entry.atomic_group is not None:
                atomic_group = queue_entry.atomic_group
        else:
            # Bug fix: the message previously formatted the builtin id()
            # function instead of the job's id.
            assert atomic_group.name == queue_entry.atomic_group.name, (
                'DB inconsistency. HostQueueEntries with multiple atomic'
                ' groups on job %s: %s != %s' % (
                    job.id, atomic_group.name, queue_entry.atomic_group.name))

    meta_host_counts = _get_metahost_counts(meta_hosts)

    info = dict(dependencies=[label.name for label
                              in job.dependency_labels.all()],
                hosts=hosts,
                meta_hosts=meta_hosts,
                meta_host_counts=meta_host_counts,
                one_time_hosts=one_time_hosts,
                atomic_group=atomic_group,
                hostless=hostless)
    return info
527
528
def check_for_duplicate_hosts(host_objects):
    """
    Reject host lists containing the same host (by id) more than once.

    @param host_objects: sequence of models.Host objects.
    @raises model_logic.ValidationError: listing the duplicated hostnames.
    """
    seen_ids = set()
    duplicate_hostnames = set()
    for host in host_objects:
        if host.id in seen_ids:
            duplicate_hostnames.add(host.hostname)
        else:
            seen_ids.add(host.id)

    if duplicate_hostnames:
        raise model_logic.ValidationError(
                {'hosts' : 'Duplicate hosts: %s'
                 % ', '.join(duplicate_hostnames)})
541
542
def create_new_job(owner, options, host_objects, metahost_objects,
                   atomic_group=None):
    """
    Validate and create a new job, then queue it on all of its hosts.

    @param owner: login of the job owner.
    @param options: dict of job options; 'dependencies' is rewritten in
            place from label names to models.Label objects.
    @param host_objects: list of models.Host the job targets directly.
    @param metahost_objects: list of models.Label metahosts.
    @param atomic_group: optional models.AtomicGroup for the job.
    @returns: the id of the created job.
    @raises model_logic.ValidationError: on any validation failure.
    """
    all_host_objects = host_objects + metahost_objects
    # (removed an unused _get_metahost_counts() result that was computed
    # here but never read)
    dependencies = options.get('dependencies', [])
    synch_count = options.get('synch_count')

    if atomic_group:
        check_atomic_group_create_job(
                synch_count, host_objects, metahost_objects,
                dependencies, atomic_group)
    else:
        if synch_count is not None and synch_count > len(all_host_objects):
            raise model_logic.ValidationError(
                    {'hosts':
                     'only %d hosts provided for job with synch_count = %d' %
                     (len(all_host_objects), synch_count)})
        # Hosts that belong to an atomic group may only be used by jobs
        # that name that atomic group.
        atomic_hosts = models.Host.objects.filter(
                id__in=[host.id for host in host_objects],
                labels__atomic_group=True)
        unusable_host_names = [host.hostname for host in atomic_hosts]
        if unusable_host_names:
            raise model_logic.ValidationError(
                    {'hosts':
                     'Host(s) "%s" are atomic group hosts but no '
                     'atomic group was specified for this job.' %
                     (', '.join(unusable_host_names),)})

    check_for_duplicate_hosts(host_objects)

    for label_name in dependencies:
        if provision.is_for_special_action(label_name):
            # TODO: We could save a few queries
            # if we had a bulk ensure-label-exists function, which used
            # a bulk .get() call. The win is probably very small.
            _ensure_label_exists(label_name)

    # This only checks targeted hosts, not hosts eligible due to the metahost
    check_job_dependencies(host_objects, dependencies)
    check_job_metahost_dependencies(metahost_objects, dependencies)

    options['dependencies'] = list(
            models.Label.objects.filter(name__in=dependencies))

    for label in metahost_objects + options['dependencies']:
        if label.atomic_group and not atomic_group:
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'Dependency %r requires an atomic group but no '
                     'atomic_group_name or meta_host in an atomic group was '
                     'specified for this job.' % label.name})
        elif (label.atomic_group and
              label.atomic_group.name != atomic_group.name):
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'meta_hosts or dependency %r requires atomic group '
                     '%r instead of the supplied atomic_group_name=%r.' %
                     (label.name, label.atomic_group.name, atomic_group.name)})

    job = models.Job.create(owner=owner, options=options,
                            hosts=all_host_objects)
    job.queue(all_host_objects, atomic_group=atomic_group,
              is_template=options.get('is_template', False))
    return job.id
showard0957a842009-05-11 19:25:08 +0000607
608
def _ensure_label_exists(name):
    """
    Ensure that a label called |name| exists in the Django models.

    This function is to be called from within afe rpcs only, as an
    alternative to server.cros.provision.ensure_label_exists(...). It works
    by Django model manipulation, rather than by making another create_label
    rpc call.

    @param name: the label to check for/create.
    @raises ValidationError: There was an error in the response that was
                             not because the label already existed.
    @returns True if a label was created, False otherwise.
    """
    try:
        models.Label.objects.get(name=name)
    except models.Label.DoesNotExist:
        # objects.create() already persists the new row; the explicit
        # save() call that used to follow was a redundant no-op UPDATE.
        # NOTE(review): a concurrent creator can still race us here and
        # make create() fail — TODO confirm whether that needs handling.
        models.Label.objects.create(name=name)
        return True
    return False
630
631
def find_platform_and_atomic_group(host):
    """
    Figure out the platform name and atomic group name for the given host
    object. If none, the return value for either will be None.

    @returns (platform name, atomic group name) for the given host.
    """
    platforms = [label.name for label in host.label_list if label.platform]
    if len(platforms) > 1:
        raise ValueError('Host %s has more than one platform: %s' %
                         (host.hostname, ', '.join(platforms)))
    platform = platforms[0] if platforms else None

    # Don't check for multiple atomic groups on a host here. That is an
    # error but should not trip up the RPC interface. monitor_db_cleanup
    # deals with it. This just returns the first one found.
    atomic_group_name = None
    for label in host.label_list:
        if label.atomic_group:
            atomic_group_name = label.atomic_group.name
            break
    return platform, atomic_group_name
showardc0ac3a72009-07-08 21:14:45 +0000657
658
659# support for get_host_queue_entries_and_special_tasks()
660
661def _common_entry_to_dict(entry, type, job_dict):
662 return dict(type=type,
663 host=entry.host.get_object_dict(),
664 job=job_dict,
665 execution_path=entry.execution_path(),
666 status=entry.status,
667 started_on=entry.started_on,
Jiaxi Luocb91d2e2014-06-30 10:37:22 -0700668 id=str(entry.id) + type,
669 oid=entry.id)
showardc0ac3a72009-07-08 21:14:45 +0000670
671
def _special_task_to_dict(special_task):
    """Serialize a special task, attaching its queue entry's job dict if any."""
    queue_entry = special_task.queue_entry
    job_dict = queue_entry.job.get_object_dict() if queue_entry else None
    return _common_entry_to_dict(special_task, special_task.task, job_dict)
677
678
def _queue_entry_to_dict(queue_entry):
    """Serialize a host queue entry using its own job's dict and type 'Job'."""
    job_dict = queue_entry.job.get_object_dict()
    return _common_entry_to_dict(queue_entry, 'Job', job_dict)
682
683
def _compute_next_job_for_tasks(queue_entries, special_tasks):
    """
    For each task, try to figure out the next job that ran after that task.
    This is done using two pieces of information:
    * if the task has a queue entry, we can use that entry's job ID.
    * if the task has a time_started, we can try to compare that against the
    started_on field of queue_entries. this isn't guaranteed to work perfectly
    since queue_entries may also have null started_on values.
    * if the task has neither, or if use of time_started fails, just use the
    last computed job ID.

    @param queue_entries: queue entries ordered by descending ID (see
            interleave_entries).
    @param special_tasks: special tasks ordered by descending ID; each task
            gets a next_job_id attribute set on it as a side effect.
    """
    next_job_id = None # most recently computed next job
    hqe_index = 0 # index for scanning by started_on times
    for task in special_tasks:
        if task.queue_entry:
            next_job_id = task.queue_entry.job.id
        elif task.time_started is not None:
            # Scan forward through the (descending) entries; the last one
            # started at or after this task supplies the next job id.
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry.started_on is None:
                    continue
                if queue_entry.started_on < task.time_started:
                    break
                next_job_id = queue_entry.job.id

        task.next_job_id = next_job_id

        # advance hqe_index to just after next_job_id
        if next_job_id is not None:
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry.job.id < next_job_id:
                    break
                hqe_index += 1
716
717
def interleave_entries(queue_entries, special_tasks):
    """
    Merge queue entries and special tasks into one chronologically
    interleaved list of serialized dicts, most recent first.

    Both lists should be ordered by descending ID.

    @param queue_entries: Host queue entries to interleave.
    @param special_tasks: Special tasks to interleave. Their next_job_id
        attributes are (re)computed here as a side effect.

    @returns A list of dicts as produced by _queue_entry_to_dict() and
        _special_task_to_dict().
    """
    _compute_next_job_for_tasks(queue_entries, special_tasks)

    # start with all special tasks that've run since the last job
    # (those are exactly the tasks with no computed next_job_id)
    interleaved_entries = []
    for task in special_tasks:
        if task.next_job_id is not None:
            break
        interleaved_entries.append(_special_task_to_dict(task))

    # now interleave queue entries with the remaining special tasks
    special_task_index = len(interleaved_entries)
    for queue_entry in queue_entries:
        interleaved_entries.append(_queue_entry_to_dict(queue_entry))
        # add all tasks that ran between this job and the previous one
        for task in special_tasks[special_task_index:]:
            if task.next_job_id < queue_entry.job.id:
                break
            interleaved_entries.append(_special_task_to_dict(task))
            special_task_index += 1

    return interleaved_entries
jamesren4a41e012010-07-16 22:33:48 +0000743
744
def bucket_hosts_by_shard(host_objs, rpc_hostnames=False):
    """Figure out which hosts are on which shards.

    @param host_objs: A list of host objects.
    @param rpc_hostnames: If True, the rpc_hostnames of a shard are returned
        instead of the 'real' shard hostnames. This only matters for testing
        environments.

    @return: A map of shard hostname: list of hosts on the shard.
    """
    shard_host_map = {}
    for host in host_objs:
        shard = host.shard
        if not shard:
            # Hosts without a shard are managed directly by the master.
            continue
        if rpc_hostnames:
            key = shard.rpc_hostname()
        else:
            key = shard.hostname
        shard_host_map.setdefault(key, []).append(host.hostname)
    return shard_host_map
762
763
def get_create_job_common_args(local_args):
    """
    Returns a dict containing only the args that apply for create_job_common

    Returns a subset of local_args, which contains only the arguments that can
    be passed in to create_job_common().
    """
    # Migration hack: suites may still hand us a string priority; coerce it
    # to the default integer priority so scheduling tests keep working.
    if isinstance(local_args['priority'], str):
        local_args['priority'] = priorities.Priority.DEFAULT
    # </migration hack>
    accepted_args, _, _, _ = inspect.getargspec(create_job_common)
    return dict((arg_name, value)
                for arg_name, value in local_args.iteritems()
                if arg_name in accepted_args)
778
779
def create_job_common(name, priority, control_type, control_file=None,
                      hosts=(), meta_hosts=(), one_time_hosts=(),
                      atomic_group_name=None, synch_count=None,
                      is_template=False, timeout=None, timeout_mins=None,
                      max_runtime_mins=None, run_verify=True, email_list='',
                      dependencies=(), reboot_before=None, reboot_after=None,
                      parse_failed_repair=None, hostless=False, keyvals=None,
                      drone_set=None, parameterized_job=None,
                      parent_job_id=None, test_retry=0, run_reset=True,
                      require_ssp=None):
    #pylint: disable-msg=C0111
    """
    Common code between creating "standard" jobs and creating parameterized
    jobs.

    Validates the host/meta-host/atomic-group arguments, resolves them to
    model objects, then assembles an options dict and delegates to
    create_new_job().

    @param name: Name of the job.
    @param priority: Priority of the job; stored in the job options.
    @param control_type: Control file type; hostless jobs must use
        control_data.CONTROL_TYPE_NAMES.SERVER.
    @param control_file: Control file contents; stored in the job options.
    @param hosts: Hostnames/ids resolved via models.Host.smart_get_bulk().
    @param meta_hosts: Label names resolved against models.Label; a name
        may alternatively match an atomic group.
    @param one_time_hosts: Hostnames turned into one-time Host objects;
        incompatible with atomic groups.
    @param atomic_group_name: Atomic group to schedule on; may also be
        inferred from a meta_hosts label that has an atomic group.
    @param synch_count: Synchronous host count; validated against the
        atomic group's max_number_of_machines when one is used.
    @param hostless: True for a job with no hosts at all; incompatible with
        every other host-selection argument.
    @param is_template, timeout, timeout_mins, max_runtime_mins, run_verify,
        email_list, dependencies, reboot_before, reboot_after,
        parse_failed_repair, keyvals, drone_set, parameterized_job,
        parent_job_id, test_retry, run_reset, require_ssp:
        Passed through unmodified in the options dict to create_new_job().

    @returns The result of create_new_job() for the assembled options.

    @raises model_logic.ValidationError: On invalid argument combinations.
    @raises ValueError: When called on the master and any requested host
        lives on a shard.
    """
    user = models.User.current_user()
    owner = user.login

    # input validation
    if not (hosts or meta_hosts or one_time_hosts or atomic_group_name
            or hostless):
        raise model_logic.ValidationError({
            'arguments' : "You must pass at least one of 'hosts', "
                          "'meta_hosts', 'one_time_hosts', "
                          "'atomic_group_name', or 'hostless'"
            })

    if hostless:
        if hosts or meta_hosts or one_time_hosts or atomic_group_name:
            raise model_logic.ValidationError({
                    'hostless': 'Hostless jobs cannot include any hosts!'})
        server_type = control_data.CONTROL_TYPE_NAMES.SERVER
        if control_type != server_type:
            raise model_logic.ValidationError({
                    'control_type': 'Hostless jobs cannot use client-side '
                                    'control files'})

    atomic_groups_by_name = dict((ag.name, ag)
                                 for ag in models.AtomicGroup.objects.all())
    label_objects = list(models.Label.objects.filter(name__in=meta_hosts))

    # Schedule on an atomic group automagically if one of the labels given
    # is an atomic group label and no explicit atomic_group_name was supplied.
    if not atomic_group_name:
        for label in label_objects:
            if label and label.atomic_group:
                atomic_group_name = label.atomic_group.name
                break
    # convert hostnames & meta hosts to host/label objects
    host_objects = models.Host.smart_get_bulk(hosts)
    if not server_utils.is_shard():
        shard_host_map = bucket_hosts_by_shard(host_objects)
        num_shards = len(shard_host_map)
        if (num_shards > 1 or (num_shards == 1 and
                len(shard_host_map.values()[0]) != len(host_objects))):
            # We disallow the following jobs on master:
            #   num_shards > 1: this is a job spanning across multiple shards.
            #   num_shards == 1 but number of hosts on shard is less
            #   than total number of hosts: this is a job that spans across
            #   one shard and the master.
            raise ValueError(
                    'The following hosts are on shard(s), please create '
                    'seperate jobs for hosts on each shard: %s ' %
                    shard_host_map)
    metahost_objects = []
    meta_host_labels_by_name = {label.name: label for label in label_objects}
    for label_name in meta_hosts or []:
        if label_name in meta_host_labels_by_name:
            metahost_objects.append(meta_host_labels_by_name[label_name])
        elif label_name in atomic_groups_by_name:
            # If given a metahost name that isn't a Label, check to
            # see if the user was specifying an Atomic Group instead.
            atomic_group = atomic_groups_by_name[label_name]
            if atomic_group_name and atomic_group_name != atomic_group.name:
                raise model_logic.ValidationError({
                        'meta_hosts': (
                                'Label "%s" not found. If assumed to be an '
                                'atomic group it would conflict with the '
                                'supplied atomic group "%s".' % (
                                        label_name, atomic_group_name))})
            atomic_group_name = atomic_group.name
        else:
            raise model_logic.ValidationError(
                {'meta_hosts' : 'Label "%s" not found' % label_name})

    # Create and sanity check an AtomicGroup object if requested.
    if atomic_group_name:
        if one_time_hosts:
            raise model_logic.ValidationError(
                    {'one_time_hosts':
                     'One time hosts cannot be used with an Atomic Group.'})
        atomic_group = models.AtomicGroup.smart_get(atomic_group_name)
        if synch_count and synch_count > atomic_group.max_number_of_machines:
            raise model_logic.ValidationError(
                    {'atomic_group_name' :
                     'You have requested a synch_count (%d) greater than the '
                     'maximum machines in the requested Atomic Group (%d).' %
                     (synch_count, atomic_group.max_number_of_machines)})
    else:
        atomic_group = None

    # One-time hosts are created on the fly and appended to the host list.
    for host in one_time_hosts or []:
        this_host = models.Host.create_one_time_host(host)
        host_objects.append(this_host)

    # Assemble all the job parameters into the options dict consumed by
    # create_new_job().
    options = dict(name=name,
                   priority=priority,
                   control_file=control_file,
                   control_type=control_type,
                   is_template=is_template,
                   timeout=timeout,
                   timeout_mins=timeout_mins,
                   max_runtime_mins=max_runtime_mins,
                   synch_count=synch_count,
                   run_verify=run_verify,
                   email_list=email_list,
                   dependencies=dependencies,
                   reboot_before=reboot_before,
                   reboot_after=reboot_after,
                   parse_failed_repair=parse_failed_repair,
                   keyvals=keyvals,
                   drone_set=drone_set,
                   parameterized_job=parameterized_job,
                   parent_job_id=parent_job_id,
                   test_retry=test_retry,
                   run_reset=run_reset,
                   require_ssp=require_ssp)
    return create_new_job(owner=owner,
                          options=options,
                          host_objects=host_objects,
                          metahost_objects=metahost_objects,
                          atomic_group=atomic_group)
Simran Basib6ec8ae2014-04-23 12:05:08 -0700911
912
def encode_ascii(control_file):
    """Force a control file to only contain ascii characters.

    @param control_file: Control file to encode.

    @returns the control file in an ascii encoding.

    @raises error.ControlFileMalformed: if encoding fails.
    """
    try:
        return control_file.encode('ascii')
    except UnicodeError as e:
        # encode('ascii') can fail with UnicodeDecodeError (a byte string
        # with non-ascii bytes is implicitly decoded first) or with
        # UnicodeEncodeError (a unicode string holds non-ascii characters).
        # Catch their common base class so neither escapes unwrapped.
        raise error.ControlFileMalformed(str(e))
926
927
def get_wmatrix_url():
    """Get wmatrix url from config file.

    @returns the wmatrix url or an empty string.
    """
    return global_config.global_config.get_config_value(
            'AUTOTEST_WEB', 'wmatrix_url', default='')
Jiaxi Luo57bc1952014-07-22 15:27:30 -0700936
937
def inject_times_to_filter(start_time_key=None, end_time_key=None,
                           start_time_value=None, end_time_value=None,
                           **filter_data):
    """Inject the key value pairs of start and end time if provided.

    @param start_time_key: A string represents the filter key of start_time.
    @param end_time_key: A string represents the filter key of end_time.
    @param start_time_value: Start_time value.
    @param end_time_value: End_time value.

    @returns the injected filter_data.
    """
    # Only truthy time values are injected; a missing bound simply leaves
    # the filter unconstrained on that side.
    bounds = ((start_time_key, start_time_value),
              (end_time_key, end_time_value))
    for key, value in bounds:
        if value:
            filter_data[key] = value
    return filter_data
955
956
def inject_times_to_hqe_special_tasks_filters(filter_data_common,
                                              start_time, end_time):
    """Inject start and end time to hqe and special tasks filters.

    @param filter_data_common: Common filter for hqe and special tasks.
    @param start_time: Start time value to filter on.
    @param end_time: End time value to filter on.

    @returns a pair of hqe and special tasks filters.
    """
    # HQEs and special tasks store their timestamps under different field
    # names (started_on vs time_started), so inject into two copies.
    filter_data_special_tasks = filter_data_common.copy()
    return (inject_times_to_filter('started_on__gte', 'started_on__lte',
                                   start_time, end_time, **filter_data_common),
            inject_times_to_filter('time_started__gte', 'time_started__lte',
                                   start_time, end_time,
                                   **filter_data_special_tasks))
973
974
def retrieve_shard(shard_hostname):
    """
    Retrieves the shard with the given hostname from the database.

    @param shard_hostname: Hostname of the shard to retrieve

    @raises models.Shard.DoesNotExist, if no shard with this hostname was
        found.

    @returns: Shard object
    """
    shard = models.Shard.smart_get(shard_hostname)
    return shard
Jakob Juelich59cfe542014-09-02 16:37:46 -0700986
987
def find_records_for_shard(shard, known_job_ids, known_host_ids):
    """Find records that should be sent to a shard.

    @param shard: Shard to find records for.
    @param known_job_ids: List of ids of jobs the shard already has.
    @param known_host_ids: List of ids of hosts the shard already has.

    @returns: Tuple of three lists for hosts, jobs, and suite job keyvals:
              (hosts, jobs, suite_job_keyvals).
    """
    hosts = models.Host.assign_to_shard(shard, known_host_ids)
    jobs = models.Job.assign_to_shard(shard, known_job_ids)
    # Also ship keyvals belonging to the jobs' parent (suite) jobs.
    parent_ids = [job.parent_job_id for job in jobs]
    suite_job_keyvals = models.JobKeyval.objects.filter(job_id__in=parent_ids)
    return hosts, jobs, suite_job_keyvals
Jakob Juelicha94efe62014-09-18 16:02:49 -07001004
1005
1006def _persist_records_with_type_sent_from_shard(
1007 shard, records, record_type, *args, **kwargs):
1008 """
1009 Handle records of a specified type that were sent to the shard master.
1010
1011 @param shard: The shard the records were sent from.
1012 @param records: The records sent in their serialized format.
1013 @param record_type: Type of the objects represented by records.
1014 @param args: Additional arguments that will be passed on to the sanity
1015 checks.
1016 @param kwargs: Additional arguments that will be passed on to the sanity
1017 checks.
1018
1019 @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.
1020
1021 @returns: List of primary keys of the processed records.
1022 """
1023 pks = []
1024 for serialized_record in records:
1025 pk = serialized_record['id']
1026 try:
1027 current_record = record_type.objects.get(pk=pk)
1028 except record_type.DoesNotExist:
1029 raise error.UnallowedRecordsSentToMaster(
1030 'Object with pk %s of type %s does not exist on master.' % (
1031 pk, record_type))
1032
1033 current_record.sanity_check_update_from_shard(
1034 shard, serialized_record, *args, **kwargs)
1035
1036 current_record.update_from_serialized(serialized_record)
1037 pks.append(pk)
1038 return pks
1039
1040
def persist_records_sent_from_shard(shard, jobs, hqes):
    """
    Sanity check and persist serialized records sent to master from a shard.

    During heartbeats shards upload jobs and hostqueueentries. This performs
    some sanity checks on these and then updates the existing records for
    those entries with the updated ones from the heartbeat.

    The sanity checks include:
    - Checking if the objects sent already exist on the master.
    - Checking if the objects sent were assigned to this shard.
    - hostqueueentries must be sent together with their jobs.

    @param shard: The shard the records were sent from.
    @param jobs: The jobs the shard sent.
    @param hqes: The hostqueueentries the shard sent.

    @raises error.UnallowedRecordsSentToMaster if any of the sanity checks
        fail.
    """
    # Persist jobs first; the resulting id list lets the hqe pass verify
    # that every hqe arrived together with its job.
    persisted_job_ids = _persist_records_with_type_sent_from_shard(
            shard, jobs, models.Job)
    _persist_records_with_type_sent_from_shard(
            shard, hqes, models.HostQueueEntry,
            job_ids_sent=persisted_job_ids)
Jakob Juelich50e91f72014-10-01 12:43:23 -07001065
1066
def forward_single_host_rpc_to_shard(func):
    """This decorator forwards rpc calls that modify a host to a shard.

    If a host is assigned to a shard, rpcs that change its attributes should
    be forwarded to the shard.

    This assumes the first argument of the function represents a host id.

    @param func: The function to decorate

    @returns: The function to replace func with.
    """
    def replacement(**kwargs):
        # Only keyword arguments can be accepted here, as we need the argument
        # names to send the rpc. serviceHandler always provides arguments with
        # their keywords, so this is not a problem.
        host = models.Host.smart_get(kwargs['id'])
        needs_forwarding = host.shard and not server_utils.is_shard()
        if needs_forwarding:
            run_rpc_on_multiple_hostnames(
                    func.func_name, [host.shard.hostname], **kwargs)
        # Always run the local version as well.
        return func(**kwargs)

    return replacement
1090
1091
def fanout_rpc(host_objs, rpc_name, include_hostnames=True, **kwargs):
    """Fanout the given rpc to all shards.

    @param rpc_name: The name of the rpc.
    @param host_objs: Host objects for the rpc.
    @param include_hostnames: If True, include the hostnames in the kwargs.
        Hostnames are not always necessary, this function is designed to
        send rpcs to the shard a host is on, the rpcs themselves could be
        related to labels, acls etc.
    @param kwargs: The kwargs for the rpc.

    @raises error.RPCException: If the rpc fails on any shard.
    """
    # Fanout should only happen from the master to the shards.
    if server_utils.is_shard():
        return

    # Figure out which hosts are on which shards.
    shard_host_map = bucket_hosts_by_shard(
            host_objs, rpc_hostnames=True)

    # Execute the rpc against the appropriate shards.
    for shard, hostnames in shard_host_map.iteritems():
        if include_hostnames:
            kwargs['hosts'] = hostnames
        try:
            run_rpc_on_multiple_hostnames(rpc_name, [shard], **kwargs)
        except:
            # Re-raise as an RPCException naming the shard and the original
            # error, while preserving the original traceback via the
            # Python 2 three-argument raise form.
            ei = sys.exc_info()
            new_exc = error.RPCException('RPC %s failed on shard %s due to '
                    '%s: %s' % (rpc_name, shard, ei[0].__name__, ei[1]))
            raise new_exc.__class__, new_exc, ei[2]
MK Ryu9c5fbbe2015-02-11 15:46:22 -08001122
1123
def forward_multi_host_rpc_to_shards(func):
    """This decorator forwards rpc calls that modify multiple hosts.

    If a host is assigned to a shard, rpcs that change its attributes should
    be forwarded to the shard. Some calls however, take a list of hosts and a
    single id to modify, eg: label_add_hosts. This wrapper will sift through
    the list of hosts, find each of their shards, and forward the rpc for
    those hosts to that shard before calling the local version of the given
    rpc.

    This assumes:
        1. The rpc call uses `smart_get` to retrieve host objects, not the
           stock django `get` call. This is true for most, if not all rpcs in
           the rpc_interface.
        2. The kwargs to the function contain either a list of host ids or
           hostnames, keyed under 'hosts'. This is true for all the rpc
           functions that use 'smart_get'.

    @param func: The function to decorate

    @returns: The function to replace func with.
    """
    def replacement(**kwargs):
        # Forward to each involved shard first, then run the local version.
        host_objs = models.Host.smart_get_bulk(kwargs['hosts'])
        fanout_rpc(host_objs, func.func_name, **kwargs)
        return func(**kwargs)

    return replacement
1152
1153
def run_rpc_on_multiple_hostnames(rpc_call, shard_hostnames, **kwargs):
    """Runs an rpc to multiple AFEs

    This is i.e. used to propagate changes made to hosts after they are
    assigned to a shard.

    @param rpc_call: Name of the rpc endpoint to call.
    @param shard_hostnames: List of hostnames to run the rpcs on.
    @param **kwargs: Keyword arguments to pass in the rpcs.
    """
    for hostname in shard_hostnames:
        shard_afe = frontend.AFE(server=hostname)
        shard_afe.run(rpc_call, **kwargs)
MK Ryu9c5fbbe2015-02-11 15:46:22 -08001167
1168
def get_label(name):
    """Gets a label object using a given name.

    @param name: Label name.
    @raises model.Label.DoesNotExist: when there is no label matching
        the given name.
    @return: a label object matching the given name, or None when no such
        label exists.
    """
    try:
        return models.Label.smart_get(name)
    except models.Label.DoesNotExist:
        # A missing label is reported as None rather than an exception.
        return None
1182
1183
def get_global_afe_hostname():
    """Read the hostname of the global AFE from the global configuration."""
    config = global_config.global_config
    return config.get_config_value('SHARD', 'global_afe_hostname')
1188
1189
def route_rpc_to_master(rpc_name, **kwargs):
    """Route RPC to master AFE.

    @param rpc_name: The name of the rpc.
    @param **kwargs: The kwargs for the rpc.

    @returns: Whatever the rpc returns on the master.
    """
    afe = frontend.AFE(server=get_global_afe_hostname())
    return afe.run(rpc_name, **kwargs)