blob: deb8ba994c98b4ceb7b16bc2bf4b110628e2779a [file] [log] [blame]
Aviv Keshet18308922013-02-19 17:49:49 -08001#pylint: disable-msg=C0111
mblighe8819cd2008-02-15 16:48:40 +00002"""\
3Utility functions for rpc_interface.py. We keep them in a separate file so that
4only RPC interface functions go into that file.
5"""
6
7__author__ = 'showard@google.com (Steve Howard)'
8
Aviv Keshet18308922013-02-19 17:49:49 -08009import datetime, os, inspect
showard3d6ae112009-05-02 00:45:48 +000010import django.http
Dan Shi07e09af2013-04-12 09:31:29 -070011from autotest_lib.frontend.afe import models, model_logic
Alex Miller4a193692013-08-21 13:59:01 -070012from autotest_lib.client.common_lib import control_data, error
Jiaxi Luo421608e2014-07-07 14:38:00 -070013from autotest_lib.client.common_lib import global_config, priorities
Aviv Keshetc68807e2013-07-31 16:13:01 -070014from autotest_lib.server.cros import provision
Jakob Juelich50e91f72014-10-01 12:43:23 -070015from autotest_lib.server import frontend
mblighe8819cd2008-02-15 16:48:40 +000016
# Sentinel "no date" values stored in the DB: datetime.max / date.max.
# _prepare_data() serializes these as None when returning data over RPC.
NULL_DATETIME = datetime.datetime.max
NULL_DATE = datetime.date.max
19
def prepare_for_serialization(objects):
    """
    Prepare Python objects to be returned via RPC.

    Lists of object dicts (dicts carrying an 'id' key) are de-duplicated
    by id before serialization.

    @param objects: objects to be prepared.
    """
    looks_like_object_dicts = (
            isinstance(objects, list) and len(objects) and
            isinstance(objects[0], dict) and 'id' in objects[0])
    if looks_like_object_dicts:
        objects = gather_unique_dicts(objects)
    return _prepare_data(objects)
showardb8d34242008-04-25 18:11:16 +000029
30
def prepare_rows_as_nested_dicts(query, nested_dict_column_names):
    """
    Prepare a Django query to be returned via RPC as a sequence of nested
    dictionaries.

    @param query - A Django model query object with a select_related() method.
    @param nested_dict_column_names - A list of column/attribute names for the
            rows returned by query to expand into nested dictionaries using
            their get_object_dict() method when not None.

    @returns An list suitable to returned in an RPC.
    """
    results = []
    for row in query.select_related():
        row_dict = row.get_object_dict()
        for column_name in nested_dict_column_names:
            if row_dict[column_name] is not None:
                nested_object = getattr(row, column_name)
                row_dict[column_name] = nested_object.get_object_dict()
        results.append(row_dict)
    return prepare_for_serialization(results)
51
52
showardb8d34242008-04-25 18:11:16 +000053def _prepare_data(data):
jadmanski0afbb632008-06-06 21:10:57 +000054 """
55 Recursively process data structures, performing necessary type
56 conversions to values in data to allow for RPC serialization:
57 -convert datetimes to strings
showard2b9a88b2008-06-13 20:55:03 +000058 -convert tuples and sets to lists
jadmanski0afbb632008-06-06 21:10:57 +000059 """
60 if isinstance(data, dict):
61 new_data = {}
62 for key, value in data.iteritems():
63 new_data[key] = _prepare_data(value)
64 return new_data
showard2b9a88b2008-06-13 20:55:03 +000065 elif (isinstance(data, list) or isinstance(data, tuple) or
66 isinstance(data, set)):
jadmanski0afbb632008-06-06 21:10:57 +000067 return [_prepare_data(item) for item in data]
showard98659972008-07-17 17:00:07 +000068 elif isinstance(data, datetime.date):
showarda62866b2008-07-28 21:27:41 +000069 if data is NULL_DATETIME or data is NULL_DATE:
70 return None
jadmanski0afbb632008-06-06 21:10:57 +000071 return str(data)
72 else:
73 return data
mblighe8819cd2008-02-15 16:48:40 +000074
75
def fetchall_as_list_of_dicts(cursor):
    """
    Converts each row in the cursor to a dictionary so that values can be read
    by using the column name.
    @param cursor: The database cursor to read from.
    @returns: A list of each row in the cursor as a dictionary.
    """
    column_names = [column[0] for column in cursor.description]
    return [dict(zip(column_names, row)) for row in cursor.fetchall()]
86
87
def raw_http_response(response_data, content_type=None):
    """Wrap raw data in an HttpResponse with an explicit Content-length.

    @param response_data: body bytes/string for the response.
    @param content_type: optional MIME type passed through to HttpResponse.
    @returns a django.http.HttpResponse.
    """
    http_response = django.http.HttpResponse(response_data,
                                             mimetype=content_type)
    http_response['Content-length'] = str(len(http_response.content))
    return http_response
92
93
def gather_unique_dicts(dict_iterable):
    """\
    Pick out unique objects (by ID) from an iterable of object dicts.
    Order of first appearance is preserved.
    """
    seen_ids = set()
    unique_dicts = []
    for candidate in dict_iterable:
        candidate_id = candidate['id']
        if candidate_id in seen_ids:
            continue
        seen_ids.add(candidate_id)
        unique_dicts.append(candidate)
    return unique_dicts
showardb0dfb9f2008-06-06 18:08:02 +0000105
106
def extra_job_status_filters(not_yet_run=False, running=False, finished=False):
    """\
    Generate a SQL WHERE clause for job status filtering, and return it in
    a dict of keyword args to pass to query.extra(). No more than one of
    the parameters should be passed as True.
    * not_yet_run: all HQEs are Queued
    * finished: all HQEs are complete
    * running: everything else
    """
    # Explicit raise instead of `assert` so the check survives python -O.
    if (not_yet_run and running) or (not_yet_run and finished) or \
            (running and finished):
        raise AssertionError('Cannot specify more than one '
                             'filter to this function')

    not_queued = ('(SELECT job_id FROM afe_host_queue_entries '
                  'WHERE status != "%s")'
                  % models.HostQueueEntry.Status.QUEUED)
    not_finished = ('(SELECT job_id FROM afe_host_queue_entries '
                    'WHERE not complete)')

    if not_yet_run:
        where = ['id NOT IN ' + not_queued]
    elif running:
        where = ['(id IN %s) AND (id IN %s)' % (not_queued, not_finished)]
    elif finished:
        where = ['id NOT IN ' + not_finished]
    else:
        # No filter requested: return empty kwargs for query.extra().
        return {}
    return {'where': where}
mblighe8819cd2008-02-15 16:48:40 +0000136
137
def extra_job_type_filters(extra_args, suite=False,
                           sub=False, standalone=False):
    """\
    Generate a SQL WHERE clause for job status filtering, and return it in
    a dict of keyword args to pass to query.extra().

    param extra_args: a dict of existing extra_args.

    No more than one of the parameters should be passed as True:
    * suite: job which is parent of other jobs
    * sub: job with a parent job
    * standalone: job with no child or parent jobs
    """
    assert not ((suite and sub) or
                (suite and standalone) or
                (sub and standalone)), ('Cannot specify more than one '
                                        'filter to this function')

    jobs_with_parent = ('(SELECT %s FROM afe_jobs '
                        'WHERE parent_job_id IS NOT NULL)')

    if suite:
        clause = 'id IN ' + jobs_with_parent % 'DISTINCT parent_job_id'
    elif sub:
        clause = 'id IN ' + jobs_with_parent % 'id'
    elif standalone:
        clause = ('NOT EXISTS (SELECT 1 from afe_jobs AS sub_query '
                  'WHERE parent_job_id IS NOT NULL'
                  ' AND (sub_query.parent_job_id=afe_jobs.id'
                  ' OR sub_query.id=afe_jobs.id))')
    else:
        # No job-type filter requested; hand back extra_args untouched.
        return extra_args

    where = extra_args.get('where', [])
    where.append(clause)
    extra_args['where'] = where
    return extra_args
176
177
178
def extra_host_filters(multiple_labels=()):
    """\
    Generate SQL WHERE clauses for matching hosts in an intersection of
    labels.
    """
    where_str = ('afe_hosts.id in (select host_id from afe_hosts_labels '
                 'where label_id=%s)')
    return {
        'where': [where_str] * len(multiple_labels),
        'params': [models.Label.smart_get(label).id
                   for label in multiple_labels],
    }
showard8e3aa5e2008-04-08 19:42:32 +0000191
192
def get_host_query(multiple_labels, exclude_only_if_needed_labels,
                   exclude_atomic_group_hosts, valid_only, filter_data):
    """Build a models.Host query restricted by labels and filter_data.

    @param multiple_labels: label names whose intersection hosts must match.
    @param exclude_only_if_needed_labels: if True, exclude hosts carrying any
            only_if_needed label.
    @param exclude_atomic_group_hosts: if True, exclude hosts carrying any
            atomic-group label.
    @param valid_only: restrict to valid (non-deleted) hosts.
    @param filter_data: extra query_objects() filter dict; must not already
            contain 'extra_args' (it is populated here).
    @returns a Host queryset; empty queryset when a named label is missing.
    """
    if valid_only:
        query = models.Host.valid_objects.all()
    else:
        query = models.Host.objects.all()

    if exclude_only_if_needed_labels:
        only_if_needed_labels = models.Label.valid_objects.filter(
            only_if_needed=True)
        if only_if_needed_labels.count() > 0:
            only_if_needed_ids = ','.join(
                    str(label['id'])
                    for label in only_if_needed_labels.values('id'))
            query = models.Host.objects.add_join(
                query, 'afe_hosts_labels', join_key='host_id',
                join_condition=('afe_hosts_labels_exclude_OIN.label_id IN (%s)'
                                % only_if_needed_ids),
                suffix='_exclude_OIN', exclude=True)

    if exclude_atomic_group_hosts:
        atomic_group_labels = models.Label.valid_objects.filter(
                atomic_group__isnull=False)
        if atomic_group_labels.count() > 0:
            atomic_group_label_ids = ','.join(
                    str(atomic_group['id'])
                    for atomic_group in atomic_group_labels.values('id'))
            query = models.Host.objects.add_join(
                    query, 'afe_hosts_labels', join_key='host_id',
                    join_condition=(
                            'afe_hosts_labels_exclude_AG.label_id IN (%s)'
                            % atomic_group_label_ids),
                    suffix='_exclude_AG', exclude=True)
    try:
        assert 'extra_args' not in filter_data
        filter_data['extra_args'] = extra_host_filters(multiple_labels)
        return models.Host.query_objects(filter_data, initial_query=query)
    except models.Label.DoesNotExist:
        # An unknown label name matches no hosts. (The bound exception
        # variable was unused and has been dropped.)
        return models.Host.objects.none()
showard43a3d262008-11-12 18:17:05 +0000232
233
class InconsistencyException(Exception):
    """Raised when a list of objects does not have a consistent value."""
showard8fd58242008-03-10 21:29:07 +0000236
237
def get_consistent_value(objects, field):
    """Return the value of `field` shared by every object in `objects`.

    @param objects: sequence of objects to inspect.
    @param field: attribute name to read from each object.
    @returns the common value, or None for an empty sequence.
    @raises InconsistencyException when two objects disagree on the field.
    """
    if not objects:
        # well a list of nothing is consistent
        return None

    expected = getattr(objects[0], field)
    for obj in objects:
        if getattr(obj, field) != expected:
            raise InconsistencyException(objects[0], obj)
    return expected
showard8fd58242008-03-10 21:29:07 +0000249
250
def prepare_generate_control_file(tests, kernel, label, profilers):
    """Resolve RPC arguments into the objects needed to build a control file.

    @param tests: test names/ids resolvable by models.Test.smart_get.
    @param kernel: kernel spec (passed through by callers; unused here).
    @param label: optional label name/id for the job.
    @param profilers: profiler names/ids resolvable by Profiler.smart_get.
    @returns (cf_info dict, test objects, profiler objects, label object).
    @raises model_logic.ValidationError when tests mix server/client types.
    """
    test_objects = [models.Test.smart_get(test) for test in tests]
    profiler_objects = [models.Profiler.smart_get(profiler)
                        for profiler in profilers]
    # ensure tests are all the same type
    try:
        test_type = get_consistent_value(test_objects, 'test_type')
    # `except X as exc` (not the Python-2-only `except X, exc`) so this
    # file stays parseable by Python 2.6+ and Python 3.
    except InconsistencyException as exc:
        test1, test2 = exc.args
        raise model_logic.ValidationError(
            {'tests' : 'You cannot run both server- and client-side '
             'tests together (tests %s and %s differ' % (
            test1.name, test2.name)})

    is_server = (test_type == control_data.CONTROL_TYPE.SERVER)
    if test_objects:
        synch_count = max(test.sync_count for test in test_objects)
    else:
        synch_count = 1
    if label:
        label = models.Label.smart_get(label)

    dependencies = set(label.name for label
                       in models.Label.objects.filter(test__in=test_objects))

    cf_info = dict(is_server=is_server, synch_count=synch_count,
                   dependencies=list(dependencies))
    return cf_info, test_objects, profiler_objects, label
showard989f25d2008-10-01 11:38:11 +0000279
280
def check_job_dependencies(host_objects, job_dependencies):
    """
    Check that a set of machines satisfies a job's dependencies.
    host_objects: list of models.Host objects
    job_dependencies: list of names of labels
    @raises model_logic.ValidationError when any host lacks a dependency.
    """
    # check that hosts satisfy dependencies
    host_ids = [host.id for host in host_objects]
    ok_hosts = models.Host.objects.filter(id__in=host_ids)
    # Plain iteration: the old enumerate() index was never used.
    for dependency in job_dependencies:
        # Provisioning labels (e.g. cros-version) are applied on demand,
        # so hosts need not already carry them.
        if not provision.is_for_special_action(dependency):
            ok_hosts = ok_hosts.filter(labels__name=dependency)
    failing_hosts = (set(host.hostname for host in host_objects) -
                     set(host.hostname for host in ok_hosts))
    if failing_hosts:
        raise model_logic.ValidationError(
            {'hosts' : 'Host(s) failed to meet job dependencies (' +
                       (', '.join(job_dependencies)) + '): ' +
                       (', '.join(failing_hosts))})
301
showard989f25d2008-10-01 11:38:11 +0000302
def check_job_metahost_dependencies(metahost_objects, job_dependencies):
    """
    Check that at least one machine within the metahost spec satisfies the job's
    dependencies.

    @param metahost_objects A list of label objects representing the metahosts.
    @param job_dependencies A list of strings of the required label names.
    @raises NoEligibleHostException If a metahost cannot run the job.
    """
    for metahost in metahost_objects:
        candidates = models.Host.objects.filter(labels=metahost)
        for label_name in job_dependencies:
            # Provisionable labels are applied on demand and not required.
            if not provision.is_for_special_action(label_name):
                candidates = candidates.filter(labels__name=label_name)
        if not any(candidates):
            raise error.NoEligibleHostException("No hosts within %s satisfy %s."
                    % (metahost.name, ', '.join(job_dependencies)))
320
showard2bab8f42008-11-12 18:15:22 +0000321
322def _execution_key_for(host_queue_entry):
323 return (host_queue_entry.job.id, host_queue_entry.execution_subdir)
324
325
def check_abort_synchronous_jobs(host_queue_entries):
    """Reject aborts that would cancel only part of a synchronous execution.

    @raises model_logic.ValidationError when the entries to abort cover
            fewer HQEs of an execution than the job's synch_count.
    """
    # ensure user isn't aborting part of a synchronous autoserv execution
    count_per_execution = {}
    for queue_entry in host_queue_entries:
        key = _execution_key_for(queue_entry)
        count_per_execution[key] = count_per_execution.get(key, 0) + 1

    for queue_entry in host_queue_entries:
        if not queue_entry.execution_subdir:
            continue
        execution_count = count_per_execution[_execution_key_for(queue_entry)]
        if execution_count < queue_entry.job.synch_count:
            raise model_logic.ValidationError(
                    {'' : 'You cannot abort part of a synchronous job execution '
                          '(%d/%s), %d included, %d expected'
                          % (queue_entry.job.id, queue_entry.execution_subdir,
                             execution_count, queue_entry.job.synch_count)})
showard8fbae652009-01-20 23:23:10 +0000344
345
def check_atomic_group_create_job(synch_count, host_objects, metahost_objects,
                                  dependencies, atomic_group):
    """
    Attempt to reject create_job requests with an atomic group that
    will be impossible to schedule. The checks are not perfect but
    should catch the most obvious issues.

    @param synch_count - The job's minimum synch count.
    @param host_objects - A list of models.Host instances.
    @param metahost_objects - A list of models.Label instances.
    @param dependencies - A list of job dependency label names.
    @param atomic_group - The atomic group instance the job was created with.

    @raises model_logic.ValidationError - When an issue is found.
    """
    minimum_required = synch_count or 1
    # If specific host objects were supplied with an atomic group, verify
    # that there are enough to satisfy the synch_count.
    if (host_objects and not metahost_objects and
            len(host_objects) < minimum_required):
        raise model_logic.ValidationError(
                {'hosts':
                 'only %d hosts provided for job with synch_count = %d' %
                 (len(host_objects), synch_count)})

    # Check that the atomic group has a hope of running this job
    # given any supplied metahosts and dependancies that may limit.

    # Get a set of hostnames in the atomic group.
    possible_hosts = set()
    for label in atomic_group.label_set.all():
        possible_hosts.update(h.hostname for h in label.host_set.all())

    # Filter out hosts that don't match all of the job dependency labels.
    for label in models.Label.objects.filter(name__in=dependencies):
        hosts_in_label = (h.hostname for h in label.host_set.all())
        possible_hosts.intersection_update(hosts_in_label)

    if not host_objects and not metahost_objects:
        # No hosts or metahosts are required to queue an atomic group Job.
        # However, if they are given, we respect them below.
        host_set = possible_hosts
    else:
        host_set = set(host.hostname for host in host_objects)
        unusable_host_set = host_set.difference(possible_hosts)
        if unusable_host_set:
            raise model_logic.ValidationError(
                {'hosts': 'Hosts "%s" are not in Atomic Group "%s"' %
                 (', '.join(sorted(unusable_host_set)), atomic_group.name)})

    # Lookup hosts provided by each meta host and merge them into the
    # host_set for final counting.
    for meta_host in metahost_objects:
        meta_possible = possible_hosts.copy()
        hosts_in_meta_host = (h.hostname for h in meta_host.host_set.all())
        meta_possible.intersection_update(hosts_in_meta_host)

        # Count all hosts that this meta_host will provide.
        host_set.update(meta_possible)

    if len(host_set) < minimum_required:
        raise model_logic.ValidationError(
                {'atomic_group_name':
                 'Insufficient hosts in Atomic Group "%s" with the'
                 ' supplied dependencies and meta_hosts.' %
                 (atomic_group.name,)})
413
414
def check_modify_host(update_data):
    """
    Sanity check modify_host* requests.

    @param update_data: A dictionary with the changes to make to a host
            or hosts.
    """
    if 'status' not in update_data:
        return
    # Only the scheduler (monitor_db) is allowed to modify Host status.
    # Otherwise race conditions happen as a hosts state is changed out from
    # beneath tasks being run on a host.
    raise model_logic.ValidationError({
            'status': 'Host status can not be modified by the frontend.'})
428
429
def check_modify_host_locking(host, update_data):
    """
    Checks when locking/unlocking has been requested if the host is already
    locked/unlocked.

    @param host: models.Host object to be modified
    @param update_data: A dictionary with the changes to make to the host.
    """
    locked = update_data.get('locked', None)
    if locked is None:
        # No lock change requested; nothing to validate.
        return
    if locked and host.locked:
        raise model_logic.ValidationError({
                'locked': 'Host already locked by %s on %s.' %
                (host.locked_by, host.lock_time)})
    if not locked and not host.locked:
        raise model_logic.ValidationError({
                'locked': 'Host already unlocked.'})
447
448
def get_motd():
    """Return the message-of-the-day text, or '' if it cannot be read.

    Looks for motd.txt two directories above this module. A missing or
    unreadable file is not an error; the frontend simply shows no message.
    """
    dirname = os.path.dirname(__file__)
    filename = os.path.join(dirname, "..", "..", "motd.txt")
    try:
        # `with` guarantees the file is closed; the old bare `except:`
        # swallowed every exception (even KeyboardInterrupt), so narrow
        # the handler to I/O errors only.
        with open(filename, "r") as fp:
            return fp.read()
    except IOError:
        return ''
showard29f7cd22009-04-29 21:16:24 +0000463
464
465def _get_metahost_counts(metahost_objects):
466 metahost_counts = {}
467 for metahost in metahost_objects:
468 metahost_counts.setdefault(metahost, 0)
469 metahost_counts[metahost] += 1
470 return metahost_counts
471
472
def get_job_info(job, preserve_metahosts=False, queue_entry_filter_data=None):
    """Collect host/metahost/atomic-group information for a job's HQEs.

    @param job: models.Job instance.
    @param preserve_metahosts: if True, metahost entries that already have a
            host keep the host rather than being reported as metahosts.
    @param queue_entry_filter_data: optional query_objects() filter applied
            to the job's queue entries.
    @returns dict with keys dependencies, hosts, meta_hosts,
            meta_host_counts, one_time_hosts, atomic_group, hostless.
    """
    hosts = []
    one_time_hosts = []
    meta_hosts = []
    atomic_group = None
    hostless = False

    queue_entries = job.hostqueueentry_set.all()
    if queue_entry_filter_data:
        queue_entries = models.HostQueueEntry.query_objects(
            queue_entry_filter_data, initial_query=queue_entries)

    for queue_entry in queue_entries:
        if (queue_entry.host and (preserve_metahosts or
                                  not queue_entry.meta_host)):
            if queue_entry.deleted:
                continue
            if queue_entry.host.invalid:
                one_time_hosts.append(queue_entry.host)
            else:
                hosts.append(queue_entry.host)
        elif queue_entry.meta_host:
            meta_hosts.append(queue_entry.meta_host)
        else:
            hostless = True

        if atomic_group is None:
            if queue_entry.atomic_group is not None:
                atomic_group = queue_entry.atomic_group
        else:
            # Fix: the message previously interpolated the builtin `id`
            # function instead of the job's id.
            assert atomic_group.name == queue_entry.atomic_group.name, (
                'DB inconsistency. HostQueueEntries with multiple atomic'
                ' groups on job %s: %s != %s' % (
                    job.id, atomic_group.name, queue_entry.atomic_group.name))

    meta_host_counts = _get_metahost_counts(meta_hosts)

    info = dict(dependencies=[label.name for label
                              in job.dependency_labels.all()],
                hosts=hosts,
                meta_hosts=meta_hosts,
                meta_host_counts=meta_host_counts,
                one_time_hosts=one_time_hosts,
                atomic_group=atomic_group,
                hostless=hostless)
    return info
519
520
def check_for_duplicate_hosts(host_objects):
    """Raise ValidationError if any host appears twice in host_objects."""
    seen_ids = set()
    duplicate_hostnames = set()
    for host in host_objects:
        if host.id in seen_ids:
            duplicate_hostnames.add(host.hostname)
        seen_ids.add(host.id)

    if duplicate_hostnames:
        raise model_logic.ValidationError(
                {'hosts' : 'Duplicate hosts: %s'
                 % ', '.join(duplicate_hostnames)})
533
534
def create_new_job(owner, options, host_objects, metahost_objects,
                   atomic_group=None):
    """Validate and create a Job, queueing it on the given hosts/metahosts.

    @param owner: username owning the job.
    @param options: dict of job options ('dependencies', 'synch_count',
            'is_template', ...); 'dependencies' is rewritten in place to a
            list of Label objects.
    @param host_objects: list of models.Host instances.
    @param metahost_objects: list of models.Label instances (metahosts).
    @param atomic_group: optional atomic group the job targets.
    @returns the new job's id.
    @raises model_logic.ValidationError on any validation failure.
    """
    all_host_objects = host_objects + metahost_objects
    dependencies = options.get('dependencies', [])
    synch_count = options.get('synch_count')
    # (An unused `metahost_counts` local was removed.)

    if atomic_group:
        check_atomic_group_create_job(
                synch_count, host_objects, metahost_objects,
                dependencies, atomic_group)
    else:
        if synch_count is not None and synch_count > len(all_host_objects):
            raise model_logic.ValidationError(
                    {'hosts':
                     'only %d hosts provided for job with synch_count = %d' %
                     (len(all_host_objects), synch_count)})
        atomic_hosts = models.Host.objects.filter(
                id__in=[host.id for host in host_objects],
                labels__atomic_group=True)
        unusable_host_names = [host.hostname for host in atomic_hosts]
        if unusable_host_names:
            raise model_logic.ValidationError(
                    {'hosts':
                     'Host(s) "%s" are atomic group hosts but no '
                     'atomic group was specified for this job.' %
                     (', '.join(unusable_host_names),)})

    check_for_duplicate_hosts(host_objects)

    for label_name in dependencies:
        if provision.is_for_special_action(label_name):
            # TODO: We could save a few queries
            # if we had a bulk ensure-label-exists function, which used
            # a bulk .get() call. The win is probably very small.
            _ensure_label_exists(label_name)

    # This only checks targeted hosts, not hosts eligible due to the metahost
    check_job_dependencies(host_objects, dependencies)
    check_job_metahost_dependencies(metahost_objects, dependencies)

    options['dependencies'] = list(
            models.Label.objects.filter(name__in=dependencies))

    for label in metahost_objects + options['dependencies']:
        if label.atomic_group and not atomic_group:
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'Dependency %r requires an atomic group but no '
                     'atomic_group_name or meta_host in an atomic group was '
                     'specified for this job.' % label.name})
        elif (label.atomic_group and
              label.atomic_group.name != atomic_group.name):
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'meta_hosts or dependency %r requires atomic group '
                     '%r instead of the supplied atomic_group_name=%r.' %
                     (label.name, label.atomic_group.name, atomic_group.name)})

    job = models.Job.create(owner=owner, options=options,
                            hosts=all_host_objects)
    job.queue(all_host_objects, atomic_group=atomic_group,
              is_template=options.get('is_template', False))
    return job.id
showard0957a842009-05-11 19:25:08 +0000599
600
def _ensure_label_exists(name):
    """
    Ensure that a label called |name| exists in the Django models.

    This function is to be called from within afe rpcs only, as an
    alternative to server.cros.provision.ensure_label_exists(...). It works
    by Django model manipulation, rather than by making another create_label
    rpc call.

    @param name: the label to check for/create.
    @raises ValidationError: There was an error in the response that was
                             not because the label already existed.
    @returns True is a label was created, False otherwise.
    """
    try:
        models.Label.objects.get(name=name)
        return False
    except models.Label.DoesNotExist:
        new_label = models.Label.objects.create(name=name)
        # NOTE(review): objects.create() normally saves already; this extra
        # save() is kept to preserve existing behavior — confirm if needed.
        new_label.save()
        return True
622
623
def find_platform_and_atomic_group(host):
    """
    Figure out the platform name and atomic group name for the given host
    object. If none, the return value for either will be None.

    @returns (platform name, atomic group name) for the given host.
    """
    platform_names = [label.name for label in host.label_list
                      if label.platform]
    if len(platform_names) > 1:
        raise ValueError('Host %s has more than one platform: %s' %
                         (host.hostname, ', '.join(platform_names)))
    platform = platform_names[0] if platform_names else None

    atomic_group_name = None
    # Don't check for multiple atomic groups on a host here. That is an
    # error but should not trip up the RPC interface. monitor_db_cleanup
    # deals with it. This just returns the first one found.
    for label in host.label_list:
        if label.atomic_group:
            atomic_group_name = label.atomic_group.name
            break
    return platform, atomic_group_name
showardc0ac3a72009-07-08 21:14:45 +0000649
650
651# support for get_host_queue_entries_and_special_tasks()
652
653def _common_entry_to_dict(entry, type, job_dict):
654 return dict(type=type,
655 host=entry.host.get_object_dict(),
656 job=job_dict,
657 execution_path=entry.execution_path(),
658 status=entry.status,
659 started_on=entry.started_on,
Jiaxi Luocb91d2e2014-06-30 10:37:22 -0700660 id=str(entry.id) + type,
661 oid=entry.id)
showardc0ac3a72009-07-08 21:14:45 +0000662
663
def _special_task_to_dict(special_task):
    """Serialize a special task row, including its job when one is attached."""
    if special_task.queue_entry:
        job_dict = special_task.queue_entry.job.get_object_dict()
    else:
        job_dict = None
    return _common_entry_to_dict(special_task, special_task.task, job_dict)
669
670
def _queue_entry_to_dict(queue_entry):
    """Serialize a host queue entry for the frontend history view."""
    job_dict = queue_entry.job.get_object_dict()
    return _common_entry_to_dict(queue_entry, 'Job', job_dict)
674
675
676def _compute_next_job_for_tasks(queue_entries, special_tasks):
677 """
678 For each task, try to figure out the next job that ran after that task.
679 This is done using two pieces of information:
680 * if the task has a queue entry, we can use that entry's job ID.
681 * if the task has a time_started, we can try to compare that against the
682 started_on field of queue_entries. this isn't guaranteed to work perfectly
683 since queue_entries may also have null started_on values.
684 * if the task has neither, or if use of time_started fails, just use the
685 last computed job ID.
686 """
687 next_job_id = None # most recently computed next job
688 hqe_index = 0 # index for scanning by started_on times
689 for task in special_tasks:
690 if task.queue_entry:
691 next_job_id = task.queue_entry.job.id
692 elif task.time_started is not None:
693 for queue_entry in queue_entries[hqe_index:]:
694 if queue_entry.started_on is None:
695 continue
696 if queue_entry.started_on < task.time_started:
697 break
698 next_job_id = queue_entry.job.id
699
700 task.next_job_id = next_job_id
701
702 # advance hqe_index to just after next_job_id
703 if next_job_id is not None:
704 for queue_entry in queue_entries[hqe_index:]:
705 if queue_entry.job.id < next_job_id:
706 break
707 hqe_index += 1
708
709
def interleave_entries(queue_entries, special_tasks):
    """
    Merge queue entries and special tasks into one chronological list.

    Both lists should be ordered by descending ID.
    """
    _compute_next_job_for_tasks(queue_entries, special_tasks)

    # start with all special tasks that've run since the last job
    merged = []
    for task in special_tasks:
        if task.next_job_id is not None:
            break
        merged.append(_special_task_to_dict(task))

    # now interleave queue entries with the remaining special tasks
    task_index = len(merged)
    for queue_entry in queue_entries:
        merged.append(_queue_entry_to_dict(queue_entry))
        # add all tasks that ran between this job and the previous one
        for task in special_tasks[task_index:]:
            if task.next_job_id < queue_entry.job.id:
                break
            merged.append(_special_task_to_dict(task))
            task_index += 1

    return merged
jamesren4a41e012010-07-16 22:33:48 +0000735
736
def bucket_hosts_by_shard(host_objs, rpc_hostnames=False):
    """Partition hosts by the shard each one is assigned to.

    @param host_objs: A list of host objects.
    @param rpc_hostnames: If True, key the result by each shard's
        rpc_hostname() instead of its 'real' hostname. This only matters
        for testing environments.

    @return: A map of shard hostname: list of hostnames of hosts on that
        shard. Hosts without a shard are omitted.
    """
    shard_host_map = {}
    for host in host_objs:
        shard = host.shard
        if not shard:
            continue
        key = shard.rpc_hostname() if rpc_hostnames else shard.hostname
        shard_host_map.setdefault(key, []).append(host.hostname)
    return shard_host_map
754
755
def get_create_job_common_args(local_args):
    """Extract the arguments accepted by create_job_common().

    @param local_args: A dict of argument name to value (typically the
        caller's locals()).

    @returns A subset of local_args containing only keys that appear in
        create_job_common()'s signature.
    """
    # This code is only here to not kill suites scheduling tests when priority
    # becomes an int instead of a string.
    if isinstance(local_args['priority'], str):
        local_args['priority'] = priorities.Priority.DEFAULT
    # </migration hack>
    accepted_names = inspect.getargspec(create_job_common)[0]
    return {name: value for name, value in local_args.iteritems()
            if name in accepted_names}
770
771
def create_job_common(name, priority, control_type, control_file=None,
                      hosts=(), meta_hosts=(), one_time_hosts=(),
                      atomic_group_name=None, synch_count=None,
                      is_template=False, timeout=None, timeout_mins=None,
                      max_runtime_mins=None, run_verify=True, email_list='',
                      dependencies=(), reboot_before=None, reboot_after=None,
                      parse_failed_repair=None, hostless=False, keyvals=None,
                      drone_set=None, parameterized_job=None,
                      parent_job_id=None, test_retry=0, run_reset=True):
    #pylint: disable-msg=C0111
    """
    Common code between creating "standard" jobs and creating parameterized
    jobs.

    @param name: Name of the job.
    @param priority: Priority of the job.
    @param control_type: Type of the control file; hostless jobs must use
        control_data.CONTROL_TYPE_NAMES.SERVER.
    @param control_file: Contents of the control file, if any.
    @param hosts: Hostnames to run on; resolved via Host.smart_get_bulk().
    @param meta_hosts: Label names to schedule metahost entries against.
    @param one_time_hosts: Hostnames to create as one-time hosts; cannot be
        combined with an atomic group.
    @param atomic_group_name: Atomic group to schedule on; if omitted it is
        inferred from any meta_host label that has an atomic group.
    @param synch_count: Number of machines; checked against the atomic
        group's max_number_of_machines when an atomic group is used.
    @param hostless: If True, create a job with no hosts at all; mutually
        exclusive with hosts/meta_hosts/one_time_hosts/atomic_group_name.

    The remaining keyword arguments are passed through unchanged to
    create_new_job() inside the options dict.

    @raises model_logic.ValidationError: For invalid host / metahost /
        atomic-group / hostless combinations.
    @raises ValueError: If, on the master, any requested host lives on a
        shard; such jobs must be created on the shard itself.

    @returns The result of create_new_job().
    """
    user = models.User.current_user()
    owner = user.login

    # input validation
    if not (hosts or meta_hosts or one_time_hosts or atomic_group_name
            or hostless):
        raise model_logic.ValidationError({
            'arguments' : "You must pass at least one of 'hosts', "
                          "'meta_hosts', 'one_time_hosts', "
                          "'atomic_group_name', or 'hostless'"
            })

    if hostless:
        # Hostless jobs take no hosts of any kind and must be server-side.
        if hosts or meta_hosts or one_time_hosts or atomic_group_name:
            raise model_logic.ValidationError({
                    'hostless': 'Hostless jobs cannot include any hosts!'})
        server_type = control_data.CONTROL_TYPE_NAMES.SERVER
        if control_type != server_type:
            raise model_logic.ValidationError({
                    'control_type': 'Hostless jobs cannot use client-side '
                                    'control files'})

    atomic_groups_by_name = dict((ag.name, ag)
                                 for ag in models.AtomicGroup.objects.all())
    label_objects = list(models.Label.objects.filter(name__in=meta_hosts))

    # Schedule on an atomic group automagically if one of the labels given
    # is an atomic group label and no explicit atomic_group_name was supplied.
    if not atomic_group_name:
        for label in label_objects:
            if label and label.atomic_group:
                atomic_group_name = label.atomic_group.name
                break

    # convert hostnames & meta hosts to host/label objects
    host_objects = models.Host.smart_get_bulk(hosts)
    # Don't ever create jobs against hosts on shards. Though we have a hook
    # in host scheduler that will prevent these jobs from even starting,
    # a perpetually queued job will lead to a lot of confusion.
    if not is_shard():
        shard_host_map = bucket_hosts_by_shard(host_objects)
        if shard_host_map:
            raise ValueError('The following hosts are on shards, please '
                             'follow the link to the shards and create jobs '
                             'there instead. %s.' % shard_host_map)

    metahost_objects = []
    meta_host_labels_by_name = {label.name: label for label in label_objects}
    for label_name in meta_hosts or []:
        if label_name in meta_host_labels_by_name:
            metahost_objects.append(meta_host_labels_by_name[label_name])
        elif label_name in atomic_groups_by_name:
            # If given a metahost name that isn't a Label, check to
            # see if the user was specifying an Atomic Group instead.
            atomic_group = atomic_groups_by_name[label_name]
            if atomic_group_name and atomic_group_name != atomic_group.name:
                raise model_logic.ValidationError({
                        'meta_hosts': (
                                'Label "%s" not found. If assumed to be an '
                                'atomic group it would conflict with the '
                                'supplied atomic group "%s".' % (
                                        label_name, atomic_group_name))})
            atomic_group_name = atomic_group.name
        else:
            raise model_logic.ValidationError(
                {'meta_hosts' : 'Label "%s" not found' % label_name})

    # Create and sanity check an AtomicGroup object if requested.
    if atomic_group_name:
        if one_time_hosts:
            raise model_logic.ValidationError(
                    {'one_time_hosts':
                     'One time hosts cannot be used with an Atomic Group.'})
        atomic_group = models.AtomicGroup.smart_get(atomic_group_name)
        if synch_count and synch_count > atomic_group.max_number_of_machines:
            raise model_logic.ValidationError(
                    {'atomic_group_name' :
                     'You have requested a synch_count (%d) greater than the '
                     'maximum machines in the requested Atomic Group (%d).' %
                     (synch_count, atomic_group.max_number_of_machines)})
    else:
        atomic_group = None

    # One-time hosts are created on the fly and appended to the host list.
    for host in one_time_hosts or []:
        this_host = models.Host.create_one_time_host(host)
        host_objects.append(this_host)

    options = dict(name=name,
                   priority=priority,
                   control_file=control_file,
                   control_type=control_type,
                   is_template=is_template,
                   timeout=timeout,
                   timeout_mins=timeout_mins,
                   max_runtime_mins=max_runtime_mins,
                   synch_count=synch_count,
                   run_verify=run_verify,
                   email_list=email_list,
                   dependencies=dependencies,
                   reboot_before=reboot_before,
                   reboot_after=reboot_after,
                   parse_failed_repair=parse_failed_repair,
                   keyvals=keyvals,
                   drone_set=drone_set,
                   parameterized_job=parameterized_job,
                   parent_job_id=parent_job_id,
                   test_retry=test_retry,
                   run_reset=run_reset)
    return create_new_job(owner=owner,
                          options=options,
                          host_objects=host_objects,
                          metahost_objects=metahost_objects,
                          atomic_group=atomic_group)
Simran Basib6ec8ae2014-04-23 12:05:08 -0700898
899
def encode_ascii(control_file):
    """Force a control file to only contain ascii characters.

    @param control_file: Control file to encode. May be a byte string or a
        unicode string.

    @returns the control file in an ascii encoding.

    @raises error.ControlFileMalformed: if encoding fails.
    """
    try:
        return control_file.encode('ascii')
    except UnicodeError as e:
        # Catch the common base class: a Python 2 str with non-ascii bytes
        # raises UnicodeDecodeError (implicit decode step), while a unicode
        # string with non-ascii characters raises UnicodeEncodeError. The
        # previous code caught only UnicodeDecodeError, letting encode
        # failures on unicode input escape as raw UnicodeEncodeError.
        raise error.ControlFileMalformed(str(e))
913
914
def get_wmatrix_url():
    """Look up the wmatrix url in the global config.

    @returns the value of AUTOTEST_WEB/wmatrix_url, or an empty string
        when it is not configured.
    """
    return global_config.global_config.get_config_value(
            'AUTOTEST_WEB', 'wmatrix_url', default='')
Jiaxi Luo57bc1952014-07-22 15:27:30 -0700923
924
def inject_times_to_filter(start_time_key=None, end_time_key=None,
                           start_time_value=None, end_time_value=None,
                           **filter_data):
    """Add start/end time key-value pairs to a filter when values are given.

    @param start_time_key: A string representing the filter key of start_time.
    @param end_time_key: A string representing the filter key of end_time.
    @param start_time_value: Start_time value; skipped when falsy.
    @param end_time_value: End_time value; skipped when falsy.
    @param filter_data: The filter dict to augment.

    @returns the augmented filter_data.
    """
    for key, value in ((start_time_key, start_time_value),
                       (end_time_key, end_time_value)):
        if value:
            filter_data[key] = value
    return filter_data
942
943
def inject_times_to_hqe_special_tasks_filters(filter_data_common,
                                              start_time, end_time):
    """Build hqe and special-task filters with time bounds applied.

    @param filter_data_common: Common filter for hqes and special tasks.
    @param start_time: Lower time bound; skipped when falsy.
    @param end_time: Upper time bound; skipped when falsy.

    @returns a pair (hqe_filter, special_tasks_filter).
    """
    special_tasks_data = filter_data_common.copy()
    hqe_filter = inject_times_to_filter(
            'started_on__gte', 'started_on__lte', start_time, end_time,
            **filter_data_common)
    special_tasks_filter = inject_times_to_filter(
            'time_started__gte', 'time_started__lte', start_time, end_time,
            **special_tasks_data)
    return hqe_filter, special_tasks_filter
960
961
def retrieve_shard(shard_hostname):
    """Look up a shard by its hostname.

    @param shard_hostname: Hostname of the shard to retrieve.

    @raises models.Shard.DoesNotExist: if no shard with this hostname exists.

    @returns: The matching Shard object.
    """
    return models.Shard.smart_get(shard_hostname)
Jakob Juelich59cfe542014-09-02 16:37:46 -0700973
974
def find_records_for_shard(shard, known_job_ids, known_host_ids):
    """Find records that should be sent to a shard.

    @param shard: Shard to find records for.
    @param known_job_ids: List of ids of jobs the shard already has.
    @param known_host_ids: List of ids of hosts the shard already has.

    @returns: Tuple of two lists for hosts and jobs: (hosts, jobs).
    """
    # Hosts are assigned first, then jobs, matching the heartbeat protocol.
    return (models.Host.assign_to_shard(shard, known_host_ids),
            models.Job.assign_to_shard(shard, known_job_ids))
Jakob Juelicha94efe62014-09-18 16:02:49 -0700988
989
990def _persist_records_with_type_sent_from_shard(
991 shard, records, record_type, *args, **kwargs):
992 """
993 Handle records of a specified type that were sent to the shard master.
994
995 @param shard: The shard the records were sent from.
996 @param records: The records sent in their serialized format.
997 @param record_type: Type of the objects represented by records.
998 @param args: Additional arguments that will be passed on to the sanity
999 checks.
1000 @param kwargs: Additional arguments that will be passed on to the sanity
1001 checks.
1002
1003 @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.
1004
1005 @returns: List of primary keys of the processed records.
1006 """
1007 pks = []
1008 for serialized_record in records:
1009 pk = serialized_record['id']
1010 try:
1011 current_record = record_type.objects.get(pk=pk)
1012 except record_type.DoesNotExist:
1013 raise error.UnallowedRecordsSentToMaster(
1014 'Object with pk %s of type %s does not exist on master.' % (
1015 pk, record_type))
1016
1017 current_record.sanity_check_update_from_shard(
1018 shard, serialized_record, *args, **kwargs)
1019
1020 current_record.update_from_serialized(serialized_record)
1021 pks.append(pk)
1022 return pks
1023
1024
def persist_records_sent_from_shard(shard, jobs, hqes):
    """
    Sanity checking then saving serialized records sent to master from shard.

    During heartbeats shards upload jobs and hostqueuentries. This performs
    some sanity checks on these and then updates the existing records for
    those entries with the updated ones from the heartbeat.

    The sanity checks include:
    - Checking if the objects sent already exist on the master.
    - Checking if the objects sent were assigned to this shard.
    - hostqueueentries must be sent together with their jobs.

    @param shard: The shard the records were sent from.
    @param jobs: The jobs the shard sent.
    @param hqes: The hostqueuentries the shart sent.

    @raises error.UnallowedRecordsSentToMaster if any of the sanity checks
            fail.
    """
    sent_job_ids = _persist_records_with_type_sent_from_shard(
            shard, jobs, models.Job)

    # Pass the accepted job ids along so the hqe sanity check can verify
    # each hqe arrived together with its job.
    _persist_records_with_type_sent_from_shard(
            shard, hqes, models.HostQueueEntry, job_ids_sent=sent_job_ids)
Jakob Juelich50e91f72014-10-01 12:43:23 -07001049
1050
def is_shard():
    """Determines if this instance is running as a shard.

    Reads the global_config value shard_hostname in the section SHARD.

    @return True, if shard_hostname is set, False otherwise.
    """
    return bool(global_config.global_config.get_config_value(
            'SHARD', 'shard_hostname', default=None))
1061
1062
def forward_single_host_rpc_to_shard(func):
    """This decorator forwards rpc calls that modify a host to a shard.

    If a host is assigned to a shard, rpcs that change its attributes should
    be forwarded to the shard as well as being applied locally.

    This assumes the first argument of the function represents a host id.

    @param func: The function to decorate

    @returns: The function to replace func with.
    """
    def forwarder(**kwargs):
        # serviceHandler always supplies arguments by keyword, so requiring
        # kwargs here is safe and lets us re-send them verbatim to the shard.
        host = models.Host.smart_get(kwargs['id'])
        if host.shard and not is_shard():
            run_rpc_on_multiple_hostnames(func.func_name,
                                          [host.shard.hostname], **kwargs)
        return func(**kwargs)

    return forwarder
1086
1087
def forward_multi_host_rpc_to_shards(func):
    """This decorator forwards rpc calls that modify multiple hosts.

    If a host is assigned to a shard, rpcs that change its attributes should
    be forwarded to the shard. Some calls however, take a list of hosts and a
    single id to modify, eg: label_add_hosts. This wrapper will sift through
    the list of hosts, find each of their shards, and forward the rpc for
    those hosts to that shard before calling the local version of the given
    rpc.

    This assumes:
        1. The rpc call uses `smart_get` to retrieve host objects, not the
            stock django `get` call. This is true for most, if not all rpcs
            in the rpc_interface.
        2. The kwargs to the function contain either a list of host ids or
            hostnames, keyed under 'hosts'. This is true for all the rpc
            functions that use 'smart_get'.

    @param func: The function to decorate

    @returns: The function to replace func with.
    """
    def forwarder(**kwargs):
        if not is_shard():
            # Figure out which hosts live on which shards.
            host_objs = models.Host.smart_get_bulk(kwargs['hosts'])
            shard_host_map = bucket_hosts_by_shard(host_objs,
                                                   rpc_hostnames=True)

            # Re-issue the rpc against each shard with only its own hosts.
            for shard_hostname, hostnames in shard_host_map.iteritems():
                kwargs['hosts'] = hostnames
                run_rpc_on_multiple_hostnames(func.func_name,
                                              [shard_hostname], **kwargs)
        return func(**kwargs)

    return forwarder
1125
1126
def run_rpc_on_multiple_hostnames(rpc_call, shard_hostnames, **kwargs):
    """Runs an rpc against several AFE instances.

    This is e.g. used to propagate changes made to hosts after they are
    assigned to a shard.

    @param rpc_call: Name of the rpc endpoint to call.
    @param shard_hostnames: List of hostnames to run the rpcs on.
    @param **kwargs: Keyword arguments to pass in the rpcs.
    """
    for hostname in shard_hostnames:
        frontend.AFE(server=hostname).run(rpc_call, **kwargs)
1139 afe.run(rpc_call, **kwargs)