blob: 3a41ca6efcac6ffdd13a00418d61cf4bf069cc00 [file] [log] [blame]
Aviv Keshet18308922013-02-19 17:49:49 -08001#pylint: disable-msg=C0111
mblighe8819cd2008-02-15 16:48:40 +00002"""\
3Utility functions for rpc_interface.py. We keep them in a separate file so that
4only RPC interface functions go into that file.
5"""
6
7__author__ = 'showard@google.com (Steve Howard)'
8
Aviv Keshet18308922013-02-19 17:49:49 -08009import datetime, os, inspect
showard3d6ae112009-05-02 00:45:48 +000010import django.http
Dan Shi07e09af2013-04-12 09:31:29 -070011from autotest_lib.frontend.afe import models, model_logic
Alex Miller4a193692013-08-21 13:59:01 -070012from autotest_lib.client.common_lib import control_data, error
Jiaxi Luo421608e2014-07-07 14:38:00 -070013from autotest_lib.client.common_lib import global_config, priorities
Aviv Keshetc68807e2013-07-31 16:13:01 -070014from autotest_lib.server.cros import provision
Jakob Juelich50e91f72014-10-01 12:43:23 -070015from autotest_lib.server import frontend
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080016from autotest_lib.server import utils as server_utils
mblighe8819cd2008-02-15 16:48:40 +000017
# Sentinel objects representing "null" datetimes/dates in RPC traffic.
# _prepare_data() maps these exact objects (identity check) to None when
# serializing values for an RPC response.
NULL_DATETIME = datetime.datetime.max
NULL_DATE = datetime.date.max
20
def prepare_for_serialization(objects):
    """
    Prepare Python objects to be returned via RPC.
    @param objects: objects to be prepared.
    """
    # A non-empty list of id-keyed dicts is de-duplicated by id first.
    looks_like_object_dicts = (
            isinstance(objects, list) and len(objects)
            and isinstance(objects[0], dict) and 'id' in objects[0])
    if looks_like_object_dicts:
        objects = gather_unique_dicts(objects)
    return _prepare_data(objects)
showardb8d34242008-04-25 18:11:16 +000030
31
def prepare_rows_as_nested_dicts(query, nested_dict_column_names):
    """
    Prepare a Django query to be returned via RPC as a sequence of nested
    dictionaries.

    @param query - A Django model query object with a select_related() method.
    @param nested_dict_column_names - A list of column/attribute names for the
            rows returned by query to expand into nested dictionaries using
            their get_object_dict() method when not None.

    @returns An list suitable to returned in an RPC.
    """
    rows = []
    for model_row in query.select_related():
        as_dict = model_row.get_object_dict()
        # Expand each requested non-null column into its own object dict.
        for name in nested_dict_column_names:
            if as_dict[name] is not None:
                as_dict[name] = getattr(model_row, name).get_object_dict()
        rows.append(as_dict)
    return prepare_for_serialization(rows)
52
53
showardb8d34242008-04-25 18:11:16 +000054def _prepare_data(data):
jadmanski0afbb632008-06-06 21:10:57 +000055 """
56 Recursively process data structures, performing necessary type
57 conversions to values in data to allow for RPC serialization:
58 -convert datetimes to strings
showard2b9a88b2008-06-13 20:55:03 +000059 -convert tuples and sets to lists
jadmanski0afbb632008-06-06 21:10:57 +000060 """
61 if isinstance(data, dict):
62 new_data = {}
63 for key, value in data.iteritems():
64 new_data[key] = _prepare_data(value)
65 return new_data
showard2b9a88b2008-06-13 20:55:03 +000066 elif (isinstance(data, list) or isinstance(data, tuple) or
67 isinstance(data, set)):
jadmanski0afbb632008-06-06 21:10:57 +000068 return [_prepare_data(item) for item in data]
showard98659972008-07-17 17:00:07 +000069 elif isinstance(data, datetime.date):
showarda62866b2008-07-28 21:27:41 +000070 if data is NULL_DATETIME or data is NULL_DATE:
71 return None
jadmanski0afbb632008-06-06 21:10:57 +000072 return str(data)
73 else:
74 return data
mblighe8819cd2008-02-15 16:48:40 +000075
76
def fetchall_as_list_of_dicts(cursor):
    """
    Converts each row in the cursor to a dictionary so that values can be read
    by using the column name.
    @param cursor: The database cursor to read from.
    @returns: A list of each row in the cursor as a dictionary.
    """
    # Column names come from position 0 of each cursor.description entry.
    column_names = [column[0] for column in cursor.description]
    return [dict(zip(column_names, row)) for row in cursor.fetchall()]
87
88
def raw_http_response(response_data, content_type=None):
    """
    Wrap raw response data in a django.http.HttpResponse.

    @param response_data: raw body content for the response.
    @param content_type: optional MIME type for the response.
    @returns a django.http.HttpResponse with an explicit Content-length
            header computed from the rendered body.
    """
    # NOTE(review): the 'mimetype' kwarg was removed in Django 1.7 in favor
    # of 'content_type' -- confirm against the Django version in use.
    response = django.http.HttpResponse(response_data, mimetype=content_type)
    response['Content-length'] = str(len(response.content))
    return response
93
94
def gather_unique_dicts(dict_iterable):
    """\
    Pick out unique objects (by ID) from an iterable of object dicts.
    The first occurrence of each id wins; relative order is preserved.
    """
    seen_ids = set()
    unique = []
    for candidate in dict_iterable:
        obj_id = candidate['id']
        if obj_id in seen_ids:
            continue
        seen_ids.add(obj_id)
        unique.append(candidate)
    return unique
showardb0dfb9f2008-06-06 18:08:02 +0000106
107
def extra_job_status_filters(not_yet_run=False, running=False, finished=False):
    """\
    Generate a SQL WHERE clause for job status filtering, and return it in
    a dict of keyword args to pass to query.extra(). No more than one of
    the parameters should be passed as True.
    * not_yet_run: all HQEs are Queued
    * finished: all HQEs are complete
    * running: everything else
    """
    assert not ((not_yet_run and running) or
                (not_yet_run and finished) or
                (running and finished)), ('Cannot specify more than one '
                                          'filter to this function')

    # Subquery: ids of jobs with at least one HQE out of the Queued state.
    not_queued = ('(SELECT job_id FROM afe_host_queue_entries '
                  'WHERE status != "%s")'
                  % models.HostQueueEntry.Status.QUEUED)
    # Subquery: ids of jobs with at least one incomplete HQE.
    not_finished = ('(SELECT job_id FROM afe_host_queue_entries '
                    'WHERE not complete)')

    if not_yet_run:
        # No HQE has left the Queued state yet.
        where = ['id NOT IN ' + not_queued]
    elif running:
        # At least one HQE has started and at least one is incomplete.
        where = ['(id IN %s) AND (id IN %s)' % (not_queued, not_finished)]
    elif finished:
        # Every HQE is complete.
        where = ['id NOT IN ' + not_finished]
    else:
        # No filter requested; leave the query unmodified.
        return {}
    return {'where': where}
mblighe8819cd2008-02-15 16:48:40 +0000137
138
def extra_job_type_filters(extra_args, suite=False,
                           sub=False, standalone=False):
    """\
    Generate a SQL WHERE clause for job status filtering, and return it in
    a dict of keyword args to pass to query.extra().

    param extra_args: a dict of existing extra_args; its 'where' list (if
    any) is extended in place.

    No more than one of the parameters should be passed as True:
    * suite: job which is parent of other jobs
    * sub: job with a parent job
    * standalone: job with no child or parent jobs
    """
    assert sum([bool(suite), bool(sub), bool(standalone)]) <= 1, (
            'Cannot specify more than one filter to this function')

    where_clauses = extra_args.get('where', [])
    subquery_template = ('(SELECT %s FROM afe_jobs '
                         'WHERE parent_job_id IS NOT NULL)')

    if suite:
        # Suites are the distinct parents of other jobs.
        where_clauses.append(
                'id IN ' + subquery_template % 'DISTINCT parent_job_id')
    elif sub:
        # Sub jobs are jobs that themselves have a parent.
        where_clauses.append('id IN ' + subquery_template % 'id')
    elif standalone:
        # Standalone jobs are neither a parent nor a child of anything.
        where_clauses.append(
                'NOT EXISTS (SELECT 1 from afe_jobs AS sub_query '
                'WHERE parent_job_id IS NOT NULL'
                ' AND (sub_query.parent_job_id=afe_jobs.id'
                ' OR sub_query.id=afe_jobs.id))')
    else:
        return extra_args

    extra_args['where'] = where_clauses
    return extra_args
177
178
179
def extra_host_filters(multiple_labels=()):
    """\
    Generate SQL WHERE clauses for matching hosts in an intersection of
    labels. One membership clause (and matching label-id param) is emitted
    per requested label.
    """
    membership_clause = ('afe_hosts.id in (select host_id from '
                         'afe_hosts_labels where label_id=%s)')
    label_ids = [models.Label.smart_get(label).id
                 for label in multiple_labels]
    return {
        'where': [membership_clause] * len(multiple_labels),
        'params': label_ids,
    }
showard8e3aa5e2008-04-08 19:42:32 +0000192
193
def get_host_query(multiple_labels, exclude_only_if_needed_labels,
                   exclude_atomic_group_hosts, valid_only, filter_data):
    """
    Build a Django queryset over Hosts for the host-listing RPCs.

    @param multiple_labels: label names the hosts must all carry.
    @param exclude_only_if_needed_labels: if True, exclude hosts that carry
            any label flagged only_if_needed.
    @param exclude_atomic_group_hosts: if True, exclude hosts that carry any
            label tied to an atomic group.
    @param valid_only: if True, start from valid_objects instead of all hosts.
    @param filter_data: extra filter dict for Host.query_objects; must not
            already contain 'extra_args' (it is filled in here).
    @returns a Host queryset, or an empty queryset if one of the requested
            label names does not exist.
    """
    if valid_only:
        query = models.Host.valid_objects.all()
    else:
        query = models.Host.objects.all()

    if exclude_only_if_needed_labels:
        only_if_needed_labels = models.Label.valid_objects.filter(
            only_if_needed=True)
        if only_if_needed_labels.count() > 0:
            only_if_needed_ids = ','.join(
                    str(label['id'])
                    for label in only_if_needed_labels.values('id'))
            # Anti-join (exclude=True): drop hosts related to any
            # only_if_needed label.
            query = models.Host.objects.add_join(
                query, 'afe_hosts_labels', join_key='host_id',
                join_condition=('afe_hosts_labels_exclude_OIN.label_id IN (%s)'
                                % only_if_needed_ids),
                suffix='_exclude_OIN', exclude=True)

    if exclude_atomic_group_hosts:
        atomic_group_labels = models.Label.valid_objects.filter(
                atomic_group__isnull=False)
        if atomic_group_labels.count() > 0:
            atomic_group_label_ids = ','.join(
                    str(atomic_group['id'])
                    for atomic_group in atomic_group_labels.values('id'))
            # Anti-join (exclude=True): drop hosts related to any
            # atomic-group label.
            query = models.Host.objects.add_join(
                    query, 'afe_hosts_labels', join_key='host_id',
                    join_condition=(
                            'afe_hosts_labels_exclude_AG.label_id IN (%s)'
                            % atomic_group_label_ids),
                    suffix='_exclude_AG', exclude=True)
    try:
        assert 'extra_args' not in filter_data
        filter_data['extra_args'] = extra_host_filters(multiple_labels)
        return models.Host.query_objects(filter_data, initial_query=query)
    except models.Label.DoesNotExist as e:
        # A label name in multiple_labels didn't resolve: no host matches.
        return models.Host.objects.none()
showard43a3d262008-11-12 18:17:05 +0000233
234
class InconsistencyException(Exception):
    """Raised when a list of objects does not have a consistent value.

    args holds the first object of the list and the first object whose
    value disagreed with it (see get_consistent_value).
    """
showard8fd58242008-03-10 21:29:07 +0000237
238
def get_consistent_value(objects, field):
    """
    Return the value of |field| shared by every object in |objects|.

    @param objects: sequence of objects to inspect.
    @param field: attribute name to read from each object.
    @returns the common attribute value, or None for an empty sequence.
    @raises InconsistencyException: if any object disagrees with the first;
            its args are (first object, first disagreeing object).
    """
    if not objects:
        # An empty collection is trivially consistent.
        return None

    reference = getattr(objects[0], field)
    for candidate in objects:
        if getattr(candidate, field) != reference:
            raise InconsistencyException(objects[0], candidate)
    return reference
showard8fd58242008-03-10 21:29:07 +0000250
251
def prepare_generate_control_file(tests, kernel, label, profilers):
    """
    Resolve raw RPC arguments for control file generation.

    @param tests: test names/ids accepted by models.Test.smart_get.
    @param kernel: not referenced in this function; presumably kept for the
            RPC signature -- TODO confirm against callers.
    @param label: optional label name/id; resolved to a models.Label.
    @param profilers: profiler names/ids accepted by smart_get.
    @returns (cf_info dict, test objects, profiler objects, label object).
    @raises model_logic.ValidationError: if server- and client-side tests
            are mixed.
    """
    test_objects = [models.Test.smart_get(test) for test in tests]
    profiler_objects = [models.Profiler.smart_get(profiler)
                        for profiler in profilers]
    # ensure tests are all the same type
    try:
        test_type = get_consistent_value(test_objects, 'test_type')
    except InconsistencyException, exc:
        test1, test2 = exc.args
        raise model_logic.ValidationError(
            {'tests' : 'You cannot run both server- and client-side '
             'tests together (tests %s and %s differ' % (
            test1.name, test2.name)})

    is_server = (test_type == control_data.CONTROL_TYPE.SERVER)
    # The job's synch_count is the largest sync_count any chosen test needs.
    if test_objects:
        synch_count = max(test.sync_count for test in test_objects)
    else:
        synch_count = 1
    if label:
        label = models.Label.smart_get(label)

    # Union of the label dependencies declared by the selected tests.
    dependencies = set(label.name for label
                       in models.Label.objects.filter(test__in=test_objects))

    cf_info = dict(is_server=is_server, synch_count=synch_count,
                   dependencies=list(dependencies))
    return cf_info, test_objects, profiler_objects, label
showard989f25d2008-10-01 11:38:11 +0000280
281
def check_job_dependencies(host_objects, job_dependencies):
    """
    Check that a set of machines satisfies a job's dependencies.
    host_objects: list of models.Host objects
    job_dependencies: list of names of labels

    @raises model_logic.ValidationError: naming the hosts that do not carry
            every (non-special-action) dependency label.
    """
    # check that hosts satisfy dependencies
    host_ids = [host.id for host in host_objects]
    hosts_in_job = models.Host.objects.filter(id__in=host_ids)
    ok_hosts = hosts_in_job
    # NOTE(review): 'index' is unused; enumerate appears vestigial.
    for index, dependency in enumerate(job_dependencies):
        # Special-action labels (see provision.is_for_special_action) are
        # presumably applied later and need not be present up front.
        if not provision.is_for_special_action(dependency):
            ok_hosts = ok_hosts.filter(labels__name=dependency)
    failing_hosts = (set(host.hostname for host in host_objects) -
                     set(host.hostname for host in ok_hosts))
    if failing_hosts:
        raise model_logic.ValidationError(
                {'hosts' : 'Host(s) failed to meet job dependencies (' +
                           (', '.join(job_dependencies)) + '): ' +
                           (', '.join(failing_hosts))})
302
showard989f25d2008-10-01 11:38:11 +0000303
def check_job_metahost_dependencies(metahost_objects, job_dependencies):
    """
    Check that at least one machine within the metahost spec satisfies the job's
    dependencies.

    @param metahost_objects A list of label objects representing the metahosts.
    @param job_dependencies A list of strings of the required label names.
    @raises NoEligibleHostException If a metahost cannot run the job.
    """
    for metahost in metahost_objects:
        hosts = models.Host.objects.filter(labels=metahost)
        for label_name in job_dependencies:
            # Special-action labels are presumably satisfied later (e.g. by
            # provisioning) and are not required on hosts up front.
            if not provision.is_for_special_action(label_name):
                hosts = hosts.filter(labels__name=label_name)
        # any() merely checks that at least one host survived the filters.
        if not any(hosts):
            raise error.NoEligibleHostException("No hosts within %s satisfy %s."
                    % (metahost.name, ', '.join(job_dependencies)))
321
showard2bab8f42008-11-12 18:15:22 +0000322
323def _execution_key_for(host_queue_entry):
324 return (host_queue_entry.job.id, host_queue_entry.execution_subdir)
325
326
def check_abort_synchronous_jobs(host_queue_entries):
    """
    Reject abort requests that cover only part of a synchronous execution.

    @param host_queue_entries: the HQEs the user asked to abort.
    @raises model_logic.ValidationError: if fewer entries of some execution
            are included than its job's synch_count requires.
    """
    # Tally how many of the requested entries belong to each execution.
    included_per_execution = {}
    for entry in host_queue_entries:
        key = _execution_key_for(entry)
        included_per_execution[key] = included_per_execution.get(key, 0) + 1

    for entry in host_queue_entries:
        if not entry.execution_subdir:
            continue
        included = included_per_execution[_execution_key_for(entry)]
        if included < entry.job.synch_count:
            raise model_logic.ValidationError(
                {'' : 'You cannot abort part of a synchronous job execution '
                  '(%d/%s), %d included, %d expected'
                  % (entry.job.id, entry.execution_subdir,
                     included, entry.job.synch_count)})
showard8fbae652009-01-20 23:23:10 +0000345
346
def check_atomic_group_create_job(synch_count, host_objects, metahost_objects,
                                  dependencies, atomic_group):
    """
    Attempt to reject create_job requests with an atomic group that
    will be impossible to schedule. The checks are not perfect but
    should catch the most obvious issues.

    @param synch_count - The job's minimum synch count.
    @param host_objects - A list of models.Host instances.
    @param metahost_objects - A list of models.Label instances.
    @param dependencies - A list of job dependency label names.
    @param atomic_group - The atomic group model object the job targets
            (provides label_set and name).

    @raises model_logic.ValidationError - When an issue is found.
    """
    # If specific host objects were supplied with an atomic group, verify
    # that there are enough to satisfy the synch_count.
    minimum_required = synch_count or 1
    if (host_objects and not metahost_objects and
        len(host_objects) < minimum_required):
        raise model_logic.ValidationError(
                {'hosts':
                 'only %d hosts provided for job with synch_count = %d' %
                 (len(host_objects), synch_count)})

    # Check that the atomic group has a hope of running this job
    # given any supplied metahosts and dependancies that may limit.

    # Get a set of hostnames in the atomic group.
    possible_hosts = set()
    for label in atomic_group.label_set.all():
        possible_hosts.update(h.hostname for h in label.host_set.all())

    # Filter out hosts that don't match all of the job dependency labels.
    for label in models.Label.objects.filter(name__in=dependencies):
        hosts_in_label = (h.hostname for h in label.host_set.all())
        possible_hosts.intersection_update(hosts_in_label)

    if not host_objects and not metahost_objects:
        # No hosts or metahosts are required to queue an atomic group Job.
        # However, if they are given, we respect them below.
        host_set = possible_hosts
    else:
        # Every explicitly named host must be inside the atomic group.
        host_set = set(host.hostname for host in host_objects)
        unusable_host_set = host_set.difference(possible_hosts)
        if unusable_host_set:
            raise model_logic.ValidationError(
                {'hosts': 'Hosts "%s" are not in Atomic Group "%s"' %
                 (', '.join(sorted(unusable_host_set)), atomic_group.name)})

    # Lookup hosts provided by each meta host and merge them into the
    # host_set for final counting.
    for meta_host in metahost_objects:
        meta_possible = possible_hosts.copy()
        hosts_in_meta_host = (h.hostname for h in meta_host.host_set.all())
        meta_possible.intersection_update(hosts_in_meta_host)

        # Count all hosts that this meta_host will provide.
        host_set.update(meta_possible)

    if len(host_set) < minimum_required:
        raise model_logic.ValidationError(
                {'atomic_group_name':
                 'Insufficient hosts in Atomic Group "%s" with the'
                 ' supplied dependencies and meta_hosts.' %
                 (atomic_group.name,)})
414
415
def check_modify_host(update_data):
    """
    Sanity check modify_host* requests.

    @param update_data: A dictionary with the changes to make to a host
            or hosts.
    @raises model_logic.ValidationError: if the request tries to set status.
    """
    if 'status' not in update_data:
        return
    # Only the scheduler (monitor_db) is allowed to modify Host status;
    # frontend writes would race with state changes made underneath tasks
    # running on the host.
    raise model_logic.ValidationError({
            'status': 'Host status can not be modified by the frontend.'})
429
430
def check_modify_host_locking(host, update_data):
    """
    Checks when locking/unlocking has been requested if the host is already
    locked/unlocked.

    @param host: models.Host object to be modified
    @param update_data: A dictionary with the changes to make to the host.
    @raises model_logic.ValidationError: if the host is already in the
            requested lock state.
    """
    requested_lock_state = update_data.get('locked', None)
    if requested_lock_state is None:
        # No lock change requested; nothing to validate.
        return
    if requested_lock_state and host.locked:
        raise model_logic.ValidationError({
                'locked': 'Host already locked by %s on %s.' %
                (host.locked_by, host.lock_time)})
    if not requested_lock_state and not host.locked:
        raise model_logic.ValidationError({
                'locked': 'Host already unlocked.'})
448
449
def get_motd():
    """
    Read the message-of-the-day text.

    @returns the contents of motd.txt located two directories above this
            module, or '' if the file cannot be read.
    """
    dirname = os.path.dirname(__file__)
    filename = os.path.join(dirname, "..", "..", "motd.txt")
    try:
        # 'with' guarantees the file is closed. Catching only IOError/OSError
        # (instead of the old bare 'except:') keeps a missing/unreadable motd
        # best-effort without masking unrelated programming errors.
        with open(filename, "r") as motd_file:
            return motd_file.read()
    except (IOError, OSError):
        return ''
showard29f7cd22009-04-29 21:16:24 +0000464
465
466def _get_metahost_counts(metahost_objects):
467 metahost_counts = {}
468 for metahost in metahost_objects:
469 metahost_counts.setdefault(metahost, 0)
470 metahost_counts[metahost] += 1
471 return metahost_counts
472
473
def get_job_info(job, preserve_metahosts=False, queue_entry_filter_data=None):
    """
    Gather the hosts, metahosts and atomic group referenced by a job's
    queue entries.

    @param job: models.Job instance to inspect.
    @param queue_entry_filter_data: optional filter dict applied to the
            job's HostQueueEntries before processing.
    @param preserve_metahosts: if True, an entry that was assigned a host
            via a metahost is still reported as that host.
    @returns dict with keys: dependencies, hosts, meta_hosts,
            meta_host_counts, one_time_hosts, atomic_group, hostless.
    """
    hosts = []
    one_time_hosts = []
    meta_hosts = []
    atomic_group = None
    hostless = False

    queue_entries = job.hostqueueentry_set.all()
    if queue_entry_filter_data:
        queue_entries = models.HostQueueEntry.query_objects(
                queue_entry_filter_data, initial_query=queue_entries)

    for queue_entry in queue_entries:
        if (queue_entry.host and (preserve_metahosts or
                                  not queue_entry.meta_host)):
            if queue_entry.deleted:
                continue
            # Invalid hosts were added as one-time hosts.
            if queue_entry.host.invalid:
                one_time_hosts.append(queue_entry.host)
            else:
                hosts.append(queue_entry.host)
        elif queue_entry.meta_host:
            meta_hosts.append(queue_entry.meta_host)
        else:
            hostless = True

        if atomic_group is None:
            if queue_entry.atomic_group is not None:
                atomic_group = queue_entry.atomic_group
        else:
            # Bug fix: the message previously interpolated the builtin id()
            # function object instead of the job's id.
            assert atomic_group.name == queue_entry.atomic_group.name, (
                    'DB inconsistency. HostQueueEntries with multiple atomic'
                    ' groups on job %s: %s != %s' % (
                            job.id, atomic_group.name,
                            queue_entry.atomic_group.name))

    meta_host_counts = _get_metahost_counts(meta_hosts)

    info = dict(dependencies=[label.name for label
                              in job.dependency_labels.all()],
                hosts=hosts,
                meta_hosts=meta_hosts,
                meta_host_counts=meta_host_counts,
                one_time_hosts=one_time_hosts,
                atomic_group=atomic_group,
                hostless=hostless)
    return info
520
521
def check_for_duplicate_hosts(host_objects):
    """
    Ensure no host appears more than once (by id) in |host_objects|.

    @param host_objects: iterable of host model objects.
    @raises model_logic.ValidationError: listing the repeated hostnames.
    """
    seen_ids = set()
    duplicate_hostnames = set()
    for host in host_objects:
        if host.id in seen_ids:
            duplicate_hostnames.add(host.hostname)
        else:
            seen_ids.add(host.id)

    if duplicate_hostnames:
        raise model_logic.ValidationError(
                {'hosts' : 'Duplicate hosts: %s'
                 % ', '.join(duplicate_hostnames)})
534
535
def create_new_job(owner, options, host_objects, metahost_objects,
                   atomic_group=None):
    """
    Create and enqueue a models.Job from resolved host and label objects.

    @param owner: login name of the job owner.
    @param options: dict of job options. 'dependencies' (label names) and
            'synch_count' are read here, and options['dependencies'] is
            replaced with models.Label objects before Job.create().
    @param host_objects: list of models.Host objects to run on.
    @param metahost_objects: list of models.Label metahost objects.
    @param atomic_group: optional atomic group model object for the job.
    @returns the id of the newly created job.
    @raises model_logic.ValidationError: on inconsistent host, dependency
            or atomic-group combinations.
    """
    all_host_objects = host_objects + metahost_objects
    # NOTE(review): metahost_counts is computed but not used below.
    metahost_counts = _get_metahost_counts(metahost_objects)
    dependencies = options.get('dependencies', [])
    synch_count = options.get('synch_count')

    if atomic_group:
        check_atomic_group_create_job(
                synch_count, host_objects, metahost_objects,
                dependencies, atomic_group)
    else:
        if synch_count is not None and synch_count > len(all_host_objects):
            raise model_logic.ValidationError(
                    {'hosts':
                     'only %d hosts provided for job with synch_count = %d' %
                     (len(all_host_objects), synch_count)})
        # Hosts that belong to an atomic group may only be named explicitly
        # when the job itself specifies that atomic group.
        atomic_hosts = models.Host.objects.filter(
                id__in=[host.id for host in host_objects],
                labels__atomic_group=True)
        unusable_host_names = [host.hostname for host in atomic_hosts]
        if unusable_host_names:
            raise model_logic.ValidationError(
                    {'hosts':
                     'Host(s) "%s" are atomic group hosts but no '
                     'atomic group was specified for this job.' %
                     (', '.join(unusable_host_names),)})

    check_for_duplicate_hosts(host_objects)

    # Special-action dependency labels are created on demand.
    for label_name in dependencies:
        if provision.is_for_special_action(label_name):
            # TODO: We could save a few queries
            # if we had a bulk ensure-label-exists function, which used
            # a bulk .get() call. The win is probably very small.
            _ensure_label_exists(label_name)

    # This only checks targeted hosts, not hosts eligible due to the metahost
    check_job_dependencies(host_objects, dependencies)
    check_job_metahost_dependencies(metahost_objects, dependencies)

    options['dependencies'] = list(
            models.Label.objects.filter(name__in=dependencies))

    # Any atomic-group-bound label (metahost or dependency) must agree with
    # the job's atomic group.
    for label in metahost_objects + options['dependencies']:
        if label.atomic_group and not atomic_group:
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'Dependency %r requires an atomic group but no '
                     'atomic_group_name or meta_host in an atomic group was '
                     'specified for this job.' % label.name})
        elif (label.atomic_group and
              label.atomic_group.name != atomic_group.name):
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'meta_hosts or dependency %r requires atomic group '
                     '%r instead of the supplied atomic_group_name=%r.' %
                     (label.name, label.atomic_group.name, atomic_group.name)})

    job = models.Job.create(owner=owner, options=options,
                            hosts=all_host_objects)
    job.queue(all_host_objects, atomic_group=atomic_group,
              is_template=options.get('is_template', False))
    return job.id
showard0957a842009-05-11 19:25:08 +0000600
601
def _ensure_label_exists(name):
    """
    Ensure that a label called |name| exists in the Django models.

    This function is to be called from within afe rpcs only, as an
    alternative to server.cros.provision.ensure_label_exists(...). It works
    by Django model manipulation, rather than by making another create_label
    rpc call.

    @param name: the label to check for/create.
    @raises ValidationError: There was an error in the response that was
            not because the label already existed.
    @returns True is a label was created, False otherwise.
    """
    try:
        models.Label.objects.get(name=name)
    except models.Label.DoesNotExist:
        # Manager.create() saves the new row itself; the explicit save()
        # the old code issued afterwards was redundant.
        # NOTE(review): a concurrent creator could still race us between
        # the get() and create() -- confirm whether that matters here.
        models.Label.objects.create(name=name)
        return True
    return False
623
624
def find_platform_and_atomic_group(host):
    """
    Figure out the platform name and atomic group name for the given host
    object. If none, the return value for either will be None.

    @returns (platform name, atomic group name) for the given host.
    @raises ValueError: if the host carries more than one platform label.
    """
    platform_names = [label.name for label in host.label_list
                      if label.platform]
    platform = platform_names[0] if platform_names else None
    if len(platform_names) > 1:
        raise ValueError('Host %s has more than one platform: %s' %
                         (host.hostname, ', '.join(platform_names)))
    # Use the first atomic group found. Multiple atomic groups on a host
    # is an error, but should not trip up the RPC interface;
    # monitor_db_cleanup deals with it.
    atomic_group_name = None
    for label in host.label_list:
        if label.atomic_group:
            atomic_group_name = label.atomic_group.name
            break
    return platform, atomic_group_name
showardc0ac3a72009-07-08 21:14:45 +0000650
651
652# support for get_host_queue_entries_and_special_tasks()
653
654def _common_entry_to_dict(entry, type, job_dict):
655 return dict(type=type,
656 host=entry.host.get_object_dict(),
657 job=job_dict,
658 execution_path=entry.execution_path(),
659 status=entry.status,
660 started_on=entry.started_on,
Jiaxi Luocb91d2e2014-06-30 10:37:22 -0700661 id=str(entry.id) + type,
662 oid=entry.id)
showardc0ac3a72009-07-08 21:14:45 +0000663
664
def _special_task_to_dict(special_task):
    """Convert a special task row to the common entry dict form."""
    if special_task.queue_entry:
        job_dict = special_task.queue_entry.job.get_object_dict()
    else:
        job_dict = None
    return _common_entry_to_dict(special_task, special_task.task, job_dict)
670
671
def _queue_entry_to_dict(queue_entry):
    """Convert a HostQueueEntry row to the common entry dict form."""
    job_dict = queue_entry.job.get_object_dict()
    return _common_entry_to_dict(queue_entry, 'Job', job_dict)
675
676
def _compute_next_job_for_tasks(queue_entries, special_tasks):
    """
    For each task, try to figure out the next job that ran after that task.
    This is done using two pieces of information:
    * if the task has a queue entry, we can use that entry's job ID.
    * if the task has a time_started, we can try to compare that against the
    started_on field of queue_entries. this isn't guaranteed to work perfectly
    since queue_entries may also have null started_on values.
    * if the task has neither, or if use of time_started fails, just use the
    last computed job ID.

    Sets task.next_job_id on every task. Both lists are expected to be
    ordered by descending ID (see interleave_entries).
    """
    next_job_id = None # most recently computed next job
    hqe_index = 0 # index for scanning by started_on times
    for task in special_tasks:
        if task.queue_entry:
            # Direct link: the task belongs to this queue entry's job.
            next_job_id = task.queue_entry.job.id
        elif task.time_started is not None:
            # Scan forward for the last entry that started at or after this
            # task; entries with no started_on are skipped.
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry.started_on is None:
                    continue
                if queue_entry.started_on < task.time_started:
                    break
                next_job_id = queue_entry.job.id

        task.next_job_id = next_job_id

        # advance hqe_index to just after next_job_id
        if next_job_id is not None:
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry.job.id < next_job_id:
                    break
                hqe_index += 1
709
710
def interleave_entries(queue_entries, special_tasks):
    """
    Both lists should be ordered by descending ID.

    Produces one flat list of entry dicts: the special tasks that ran after
    the newest job come first, then each queue entry followed by the tasks
    associated with it (via next_job_id from _compute_next_job_for_tasks).
    """
    _compute_next_job_for_tasks(queue_entries, special_tasks)

    # start with all special tasks that've run since the last job
    interleaved_entries = []
    for task in special_tasks:
        if task.next_job_id is not None:
            break
        interleaved_entries.append(_special_task_to_dict(task))

    # now interleave queue entries with the remaining special tasks
    special_task_index = len(interleaved_entries)
    for queue_entry in queue_entries:
        interleaved_entries.append(_queue_entry_to_dict(queue_entry))
        # add all tasks that ran between this job and the previous one
        for task in special_tasks[special_task_index:]:
            if task.next_job_id < queue_entry.job.id:
                break
            interleaved_entries.append(_special_task_to_dict(task))
            special_task_index += 1

    return interleaved_entries
jamesren4a41e012010-07-16 22:33:48 +0000736
737
Prashanth Balasubramanian6edaaf92014-11-24 16:36:25 -0800738def bucket_hosts_by_shard(host_objs, rpc_hostnames=False):
739 """Figure out which hosts are on which shards.
740
741 @param host_objs: A list of host objects.
742 @param rpc_hostnames: If True, the rpc_hostnames of a shard are returned
743 instead of the 'real' shard hostnames. This only matters for testing
744 environments.
745
746 @return: A map of shard hostname: list of hosts on the shard.
747 """
748 shard_host_map = {}
749 for host in host_objs:
750 if host.shard:
751 shard_name = (host.shard.rpc_hostname() if rpc_hostnames
752 else host.shard.hostname)
753 shard_host_map.setdefault(shard_name, []).append(host.hostname)
754 return shard_host_map
755
756
jamesren4a41e012010-07-16 22:33:48 +0000757def get_create_job_common_args(local_args):
758 """
759 Returns a dict containing only the args that apply for create_job_common
760
761 Returns a subset of local_args, which contains only the arguments that can
762 be passed in to create_job_common().
763 """
Alex Miller7d658cf2013-09-04 16:00:35 -0700764 # This code is only here to not kill suites scheduling tests when priority
765 # becomes an int instead of a string.
766 if isinstance(local_args['priority'], str):
767 local_args['priority'] = priorities.Priority.DEFAULT
768 # </migration hack>
jamesren4a41e012010-07-16 22:33:48 +0000769 arg_names, _, _, _ = inspect.getargspec(create_job_common)
770 return dict(item for item in local_args.iteritems() if item[0] in arg_names)
771
772
def create_job_common(name, priority, control_type, control_file=None,
                      hosts=(), meta_hosts=(), one_time_hosts=(),
                      atomic_group_name=None, synch_count=None,
                      is_template=False, timeout=None, timeout_mins=None,
                      max_runtime_mins=None, run_verify=True, email_list='',
                      dependencies=(), reboot_before=None, reboot_after=None,
                      parse_failed_repair=None, hostless=False, keyvals=None,
                      drone_set=None, parameterized_job=None,
                      parent_job_id=None, test_retry=0, run_reset=True):
    #pylint: disable-msg=C0111
    """
    Common code between creating "standard" jobs and creating parameterized
    jobs.

    Validates the host/metahost/atomic-group arguments, resolves them to
    model objects, and hands off to create_new_job().

    @param name: Name of the job.
    @param priority: Job priority. May arrive as a string during the
        priority migration; see get_create_job_common_args().
    @param control_type: Control file type; hostless jobs must use
        control_data.CONTROL_TYPE_NAMES.SERVER.
    @param control_file: Contents of the control file.
    @param hosts: Hostnames/ids to run on (resolved via smart_get_bulk).
    @param meta_hosts: Label names to schedule metahosts against. A name
        may also refer to an atomic group (see below).
    @param one_time_hosts: Hostnames to create as one-time hosts; not
        allowed together with an atomic group.
    @param atomic_group_name: Explicit atomic group to schedule on. If
        unset, it is inferred from any meta_hosts label that carries an
        atomic group.
    @param synch_count: Hosts to run in sync; checked against the atomic
        group's max_number_of_machines when one is used.
    @param is_template: Whether the job is a template.
    @param timeout: Job timeout (hours -- TODO confirm; passed through).
    @param timeout_mins: Job timeout in minutes.
    @param max_runtime_mins: Max runtime in minutes.
    @param run_verify: Whether to run verify before the job.
    @param email_list: Comma-separated notification addresses.
    @param dependencies: Label dependency names.
    @param reboot_before: Reboot-before option (passed through).
    @param reboot_after: Reboot-after option (passed through).
    @param parse_failed_repair: Whether to parse failed repair results.
    @param hostless: If True, no hosts of any kind may be supplied and the
        job must be server-side.
    @param keyvals: Job keyvals dict.
    @param drone_set: Drone set name.
    @param parameterized_job: Parameterized job id (passed through).
    @param parent_job_id: Id of the parent job, if any.
    @param test_retry: Number of test retries.
    @param run_reset: Whether to run reset before the job.

    @raises model_logic.ValidationError: on any input validation failure.
    @raises ValueError: when, on the master, the requested hosts span a
        shard boundary.

    @returns whatever create_new_job() returns (the created job --
        presumably; create_new_job is defined elsewhere in this module).
    """
    user = models.User.current_user()
    owner = user.login

    # input validation
    if not (hosts or meta_hosts or one_time_hosts or atomic_group_name
            or hostless):
        raise model_logic.ValidationError({
            'arguments' : "You must pass at least one of 'hosts', "
                          "'meta_hosts', 'one_time_hosts', "
                          "'atomic_group_name', or 'hostless'"
            })

    if hostless:
        # Hostless jobs are mutually exclusive with every host-selection
        # mechanism and must be server-side.
        if hosts or meta_hosts or one_time_hosts or atomic_group_name:
            raise model_logic.ValidationError({
                    'hostless': 'Hostless jobs cannot include any hosts!'})
        server_type = control_data.CONTROL_TYPE_NAMES.SERVER
        if control_type != server_type:
            raise model_logic.ValidationError({
                    'control_type': 'Hostless jobs cannot use client-side '
                                    'control files'})

    atomic_groups_by_name = dict((ag.name, ag)
                                 for ag in models.AtomicGroup.objects.all())
    label_objects = list(models.Label.objects.filter(name__in=meta_hosts))

    # Schedule on an atomic group automagically if one of the labels given
    # is an atomic group label and no explicit atomic_group_name was supplied.
    if not atomic_group_name:
        for label in label_objects:
            if label and label.atomic_group:
                atomic_group_name = label.atomic_group.name
                break
    # convert hostnames & meta hosts to host/label objects
    host_objects = models.Host.smart_get_bulk(hosts)
    if not server_utils.is_shard():
        # Only the master enforces the shard-boundary restriction below;
        # a shard only ever sees its own hosts.
        shard_host_map = bucket_hosts_by_shard(host_objects)
        num_shards = len(shard_host_map)
        if (num_shards > 1 or (num_shards == 1 and
                len(shard_host_map.values()[0]) != len(host_objects))):
            # We disallow the following jobs on master:
            #   num_shards > 1: this is a job spanning across multiple shards.
            #   num_shards == 1 but number of hosts on shard is less
            #   than total number of hosts: this is a job that spans across
            #   one shard and the master.
            raise ValueError(
                    'The following hosts are on shard(s), please create '
                    'seperate jobs for hosts on each shard: %s ' %
                    shard_host_map)
    metahost_objects = []
    meta_host_labels_by_name = {label.name: label for label in label_objects}
    for label_name in meta_hosts or []:
        if label_name in meta_host_labels_by_name:
            metahost_objects.append(meta_host_labels_by_name[label_name])
        elif label_name in atomic_groups_by_name:
            # If given a metahost name that isn't a Label, check to
            # see if the user was specifying an Atomic Group instead.
            atomic_group = atomic_groups_by_name[label_name]
            if atomic_group_name and atomic_group_name != atomic_group.name:
                raise model_logic.ValidationError({
                        'meta_hosts': (
                                'Label "%s" not found. If assumed to be an '
                                'atomic group it would conflict with the '
                                'supplied atomic group "%s".' % (
                                        label_name, atomic_group_name))})
            atomic_group_name = atomic_group.name
        else:
            raise model_logic.ValidationError(
                {'meta_hosts' : 'Label "%s" not found' % label_name})

    # Create and sanity check an AtomicGroup object if requested.
    if atomic_group_name:
        if one_time_hosts:
            raise model_logic.ValidationError(
                    {'one_time_hosts':
                     'One time hosts cannot be used with an Atomic Group.'})
        atomic_group = models.AtomicGroup.smart_get(atomic_group_name)
        if synch_count and synch_count > atomic_group.max_number_of_machines:
            raise model_logic.ValidationError(
                    {'atomic_group_name' :
                     'You have requested a synch_count (%d) greater than the '
                     'maximum machines in the requested Atomic Group (%d).' %
                     (synch_count, atomic_group.max_number_of_machines)})
    else:
        atomic_group = None

    # One-time hosts are created on the fly and appended to the host list.
    for host in one_time_hosts or []:
        this_host = models.Host.create_one_time_host(host)
        host_objects.append(this_host)

    options = dict(name=name,
                   priority=priority,
                   control_file=control_file,
                   control_type=control_type,
                   is_template=is_template,
                   timeout=timeout,
                   timeout_mins=timeout_mins,
                   max_runtime_mins=max_runtime_mins,
                   synch_count=synch_count,
                   run_verify=run_verify,
                   email_list=email_list,
                   dependencies=dependencies,
                   reboot_before=reboot_before,
                   reboot_after=reboot_after,
                   parse_failed_repair=parse_failed_repair,
                   keyvals=keyvals,
                   drone_set=drone_set,
                   parameterized_job=parameterized_job,
                   parent_job_id=parent_job_id,
                   test_retry=test_retry,
                   run_reset=run_reset)
    return create_new_job(owner=owner,
                          options=options,
                          host_objects=host_objects,
                          metahost_objects=metahost_objects,
                          atomic_group=atomic_group)
Simran Basib6ec8ae2014-04-23 12:05:08 -0700902
903
def encode_ascii(control_file):
    """Force a control file to only contain ascii characters.

    @param control_file: Control file to encode.

    @returns the control file in an ascii encoding.

    @raises error.ControlFileMalformed: if encoding fails.
    """
    try:
        return control_file.encode('ascii')
    except UnicodeError as e:
        # Catch UnicodeError, the common base class: a byte string with
        # non-ascii bytes raises UnicodeDecodeError (from the implicit
        # decode step), while a unicode string with non-ascii characters
        # raises UnicodeEncodeError. The previous clause caught only the
        # former, letting the latter escape as an unhandled exception.
        raise error.ControlFileMalformed(str(e))
917
918
def get_wmatrix_url():
    """Fetch the wmatrix url from the global config file.

    @returns the configured wmatrix url, or an empty string when unset.
    """
    return global_config.global_config.get_config_value(
            'AUTOTEST_WEB', 'wmatrix_url', default='')
Jiaxi Luo57bc1952014-07-22 15:27:30 -0700927
928
def inject_times_to_filter(start_time_key=None, end_time_key=None,
                           start_time_value=None, end_time_value=None,
                           **filter_data):
    """Inject the key value pairs of start and end time if provided.

    A time value is only injected when it is truthy; None (and other
    falsy values) leave filter_data untouched for that key.

    @param start_time_key: A string represents the filter key of start_time.
    @param end_time_key: A string represents the filter key of end_time.
    @param start_time_value: Start_time value.
    @param end_time_value: End_time value.

    @returns the injected filter_data.
    """
    injections = ((start_time_key, start_time_value),
                  (end_time_key, end_time_value))
    for key, value in injections:
        if value:
            filter_data[key] = value
    return filter_data
946
947
def inject_times_to_hqe_special_tasks_filters(filter_data_common,
                                              start_time, end_time):
    """Inject start and end time into hqe and special task filters.

    Queue entries filter on started_on while special tasks filter on
    time_started, so the same time window maps to different filter keys.

    @param filter_data_common: Common filter for hqe and special tasks.
    @param start_time: Start time value to inject (may be None).
    @param end_time: End time value to inject (may be None).

    @returns a pair (hqe_filter, special_tasks_filter).
    """
    special_tasks_data = filter_data_common.copy()
    hqe_filter = inject_times_to_filter(
            'started_on__gte', 'started_on__lte', start_time, end_time,
            **filter_data_common)
    special_tasks_filter = inject_times_to_filter(
            'time_started__gte', 'time_started__lte', start_time, end_time,
            **special_tasks_data)
    return hqe_filter, special_tasks_filter
964
965
def retrieve_shard(shard_hostname):
    """Look up the shard with the given hostname in the database.

    @param shard_hostname: Hostname of the shard to retrieve.

    @raises models.Shard.DoesNotExist: if no shard with this hostname
        was found.

    @returns: The matching Shard object.
    """
    shard = models.Shard.smart_get(shard_hostname)
    return shard
Jakob Juelich59cfe542014-09-02 16:37:46 -0700977
978
Jakob Juelich1b525742014-09-30 13:08:07 -0700979def find_records_for_shard(shard, known_job_ids, known_host_ids):
Jakob Juelich59cfe542014-09-02 16:37:46 -0700980 """Find records that should be sent to a shard.
981
Jakob Juelicha94efe62014-09-18 16:02:49 -0700982 @param shard: Shard to find records for.
Jakob Juelich1b525742014-09-30 13:08:07 -0700983 @param known_job_ids: List of ids of jobs the shard already has.
984 @param known_host_ids: List of ids of hosts the shard already has.
Jakob Juelicha94efe62014-09-18 16:02:49 -0700985
986 @returns: Tuple of two lists for hosts and jobs: (hosts, jobs).
Jakob Juelich59cfe542014-09-02 16:37:46 -0700987 """
Jakob Juelich1b525742014-09-30 13:08:07 -0700988 hosts = models.Host.assign_to_shard(shard, known_host_ids)
989 jobs = models.Job.assign_to_shard(shard, known_job_ids)
Jakob Juelich59cfe542014-09-02 16:37:46 -0700990
991 return hosts, jobs
Jakob Juelicha94efe62014-09-18 16:02:49 -0700992
993
def _persist_records_with_type_sent_from_shard(
        shard, records, record_type, *args, **kwargs):
    """
    Handle records of one type that a shard uploaded to the master.

    Looks up the master's copy of every record, runs the model's sanity
    checks against the serialized data, then applies the update.

    @param shard: The shard the records were sent from.
    @param records: The records sent in their serialized format.
    @param record_type: Type of the objects represented by records.
    @param args: Additional arguments forwarded to the sanity checks.
    @param kwargs: Additional arguments forwarded to the sanity checks.

    @raises error.UnallowedRecordsSentToMaster if any of the sanity checks
        fail.

    @returns: List of primary keys of the processed records.
    """
    persisted_pks = []
    for serialized in records:
        record_pk = serialized['id']
        try:
            master_record = record_type.objects.get(pk=record_pk)
        except record_type.DoesNotExist:
            # Shards may only update records the master already knows about.
            raise error.UnallowedRecordsSentToMaster(
                'Object with pk %s of type %s does not exist on master.' % (
                    record_pk, record_type))

        master_record.sanity_check_update_from_shard(
            shard, serialized, *args, **kwargs)

        master_record.update_from_serialized(serialized)
        persisted_pks.append(record_pk)
    return persisted_pks
1027
1028
def persist_records_sent_from_shard(shard, jobs, hqes):
    """
    Sanity check then save serialized records sent to master from a shard.

    During heartbeats shards upload jobs and hostqueueentries. This performs
    some sanity checks on these and then updates the existing records for
    those entries with the updated ones from the heartbeat.

    The sanity checks include:
    - Checking if the objects sent already exist on the master.
    - Checking if the objects sent were assigned to this shard.
    - hostqueueentries must be sent together with their jobs.

    @param shard: The shard the records were sent from.
    @param jobs: The jobs the shard sent.
    @param hqes: The hostqueueentries the shard sent.

    @raises error.UnallowedRecordsSentToMaster if any of the sanity checks
        fail.
    """
    job_ids = _persist_records_with_type_sent_from_shard(
            shard, jobs, models.Job)

    # HQEs must reference jobs uploaded in the same heartbeat.
    _persist_records_with_type_sent_from_shard(
            shard, hqes, models.HostQueueEntry, job_ids_sent=job_ids)
Jakob Juelich50e91f72014-10-01 12:43:23 -07001053
1054
Jakob Juelich50e91f72014-10-01 12:43:23 -07001055def forward_single_host_rpc_to_shard(func):
1056 """This decorator forwards rpc calls that modify a host to a shard.
1057
1058 If a host is assigned to a shard, rpcs that change his attributes should be
1059 forwarded to the shard.
1060
1061 This assumes the first argument of the function represents a host id.
1062
1063 @param func: The function to decorate
1064
1065 @returns: The function to replace func with.
1066 """
1067 def replacement(**kwargs):
1068 # Only keyword arguments can be accepted here, as we need the argument
1069 # names to send the rpc. serviceHandler always provides arguments with
1070 # their keywords, so this is not a problem.
1071 host = models.Host.smart_get(kwargs['id'])
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -08001072 if host.shard and not server_utils.is_shard():
Jakob Juelich50e91f72014-10-01 12:43:23 -07001073 run_rpc_on_multiple_hostnames(func.func_name, [host.shard.hostname],
1074 **kwargs)
1075 return func(**kwargs)
1076
1077 return replacement
1078
1079
Prashanth Balasubramanian744898f2015-01-13 05:04:16 -08001080def fanout_rpc(host_objs, rpc_name, include_hostnames=True, **kwargs):
1081 """Fanout the give rpc to all shards.
1082
1083 @param host_objs: Host objects for the rpc.
1084 @param rpc_name: The name of the rpc.
1085 @param include_hostnames: If True, include the hostnames in the kwargs.
1086 Hostnames are not always necessary, this functions is designed to
1087 send rpcs to the shard a host is on, the rpcs themselves could be
1088 related to labels, acls etc.
1089 @param kwargs: The kwargs for the rpc.
1090 """
1091 # Fanout should only happen from the master to the shards.
1092 if server_utils.is_shard():
1093 return
1094
1095 # Figure out which hosts are on which shards.
1096 shard_host_map = bucket_hosts_by_shard(
1097 host_objs, rpc_hostnames=True)
1098
1099 # Execute the rpc against the appropriate shards.
1100 for shard, hostnames in shard_host_map.iteritems():
1101 if include_hostnames:
1102 kwargs['hosts'] = hostnames
1103 run_rpc_on_multiple_hostnames(rpc_name, [shard], **kwargs)
1104
1105
Prashanth Balasubramanian5949b4a2014-11-23 12:58:30 -08001106def forward_multi_host_rpc_to_shards(func):
1107 """This decorator forwards rpc calls that modify multiple hosts.
1108
1109 If a host is assigned to a shard, rpcs that change his attributes should be
1110 forwarded to the shard. Some calls however, take a list of hosts and a
1111 single id to modify, eg: label_add_hosts. This wrapper will sift through
1112 the list of hosts, find each of their shards, and forward the rpc for
1113 those hosts to that shard before calling the local version of the given rpc.
1114
1115 This assumes:
1116 1. The rpc call uses `smart_get` to retrieve host objects, not the
1117 stock django `get` call. This is true for most, if not all rpcs in
1118 the rpc_interface.
1119 2. The kwargs to the function contain either a list of host ids or
1120 hostnames, keyed under 'hosts'. This is true for all the rpc
1121 functions that use 'smart_get'.
1122
1123 @param func: The function to decorate
1124
1125 @returns: The function to replace func with.
1126 """
1127 def replacement(**kwargs):
Prashanth Balasubramanian744898f2015-01-13 05:04:16 -08001128 fanout_rpc(
1129 models.Host.smart_get_bulk(kwargs['hosts']),
1130 func.func_name, **kwargs)
Prashanth Balasubramanian5949b4a2014-11-23 12:58:30 -08001131 return func(**kwargs)
1132
1133 return replacement
1134
1135
Jakob Juelich50e91f72014-10-01 12:43:23 -07001136def run_rpc_on_multiple_hostnames(rpc_call, shard_hostnames, **kwargs):
1137 """Runs an rpc to multiple AFEs
1138
1139 This is i.e. used to propagate changes made to hosts after they are assigned
1140 to a shard.
1141
1142 @param rpc_call: Name of the rpc endpoint to call.
1143 @param shard_hostnames: List of hostnames to run the rpcs on.
1144 @param **kwargs: Keyword arguments to pass in the rpcs.
1145 """
1146 for shard_hostname in shard_hostnames:
1147 afe = frontend.AFE(server=shard_hostname)
1148 afe.run(rpc_call, **kwargs)
Prashanth Balasubramanian744898f2015-01-13 05:04:16 -08001149
1150
def get_label(name):
    """Look up a label object by name.

    @param name: Label name.

    @return: The label object matching the given name, or None when no
        such label exists (models.Label.DoesNotExist is swallowed).
    """
    try:
        return models.Label.smart_get(name)
    except models.Label.DoesNotExist:
        return None