blob: 389e494aad6e29b83c5db6c781cd6eecd50a2ccc [file] [log] [blame]
Aviv Keshet18308922013-02-19 17:49:49 -08001#pylint: disable-msg=C0111
mblighe8819cd2008-02-15 16:48:40 +00002"""\
3Utility functions for rpc_interface.py. We keep them in a separate file so that
4only RPC interface functions go into that file.
5"""
6
7__author__ = 'showard@google.com (Steve Howard)'
8
MK Ryu84573e12015-02-18 15:54:09 -08009import datetime
10import inspect
11import os
12import sys
showard3d6ae112009-05-02 00:45:48 +000013import django.http
Dan Shi07e09af2013-04-12 09:31:29 -070014from autotest_lib.frontend.afe import models, model_logic
Alex Miller4a193692013-08-21 13:59:01 -070015from autotest_lib.client.common_lib import control_data, error
Jiaxi Luo421608e2014-07-07 14:38:00 -070016from autotest_lib.client.common_lib import global_config, priorities
MK Ryu0c1a37d2015-04-30 12:00:55 -070017from autotest_lib.client.common_lib import time_utils
Aviv Keshetc68807e2013-07-31 16:13:01 -070018from autotest_lib.server.cros import provision
Jakob Juelich50e91f72014-10-01 12:43:23 -070019from autotest_lib.server import frontend
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080020from autotest_lib.server import utils as server_utils
mblighe8819cd2008-02-15 16:48:40 +000021
# Sentinel "null" date values: datetime.max / date.max stand in for "no date
# set".  _prepare_data() below serializes these sentinels as None when
# preparing RPC responses.
NULL_DATETIME = datetime.datetime.max
NULL_DATE = datetime.date.max
24
def prepare_for_serialization(objects):
    """
    Prepare Python objects to be returned via RPC.

    Lists of object dicts (recognized by every element having an 'id' key)
    are first de-duplicated by id before serialization.

    @param objects: objects to be prepared.
    """
    looks_like_object_dicts = (isinstance(objects, list) and len(objects) and
                               isinstance(objects[0], dict) and
                               'id' in objects[0])
    if looks_like_object_dicts:
        objects = gather_unique_dicts(objects)
    return _prepare_data(objects)
showardb8d34242008-04-25 18:11:16 +000034
35
def prepare_rows_as_nested_dicts(query, nested_dict_column_names):
    """
    Prepare a Django query to be returned via RPC as a sequence of nested
    dictionaries.

    @param query - A Django model query object with a select_related() method.
    @param nested_dict_column_names - A list of column/attribute names for the
            rows returned by query to expand into nested dictionaries using
            their get_object_dict() method when not None.

    @returns An list suitable to returned in an RPC.
    """
    results = []
    for row in query.select_related():
        row_as_dict = row.get_object_dict()
        for column_name in nested_dict_column_names:
            # Expand non-null foreign-key columns into full object dicts.
            if row_as_dict[column_name] is not None:
                nested_object = getattr(row, column_name)
                row_as_dict[column_name] = nested_object.get_object_dict()
        results.append(row_as_dict)
    return prepare_for_serialization(results)
56
57
showardb8d34242008-04-25 18:11:16 +000058def _prepare_data(data):
jadmanski0afbb632008-06-06 21:10:57 +000059 """
60 Recursively process data structures, performing necessary type
61 conversions to values in data to allow for RPC serialization:
62 -convert datetimes to strings
showard2b9a88b2008-06-13 20:55:03 +000063 -convert tuples and sets to lists
jadmanski0afbb632008-06-06 21:10:57 +000064 """
65 if isinstance(data, dict):
66 new_data = {}
67 for key, value in data.iteritems():
68 new_data[key] = _prepare_data(value)
69 return new_data
showard2b9a88b2008-06-13 20:55:03 +000070 elif (isinstance(data, list) or isinstance(data, tuple) or
71 isinstance(data, set)):
jadmanski0afbb632008-06-06 21:10:57 +000072 return [_prepare_data(item) for item in data]
showard98659972008-07-17 17:00:07 +000073 elif isinstance(data, datetime.date):
showarda62866b2008-07-28 21:27:41 +000074 if data is NULL_DATETIME or data is NULL_DATE:
75 return None
jadmanski0afbb632008-06-06 21:10:57 +000076 return str(data)
77 else:
78 return data
mblighe8819cd2008-02-15 16:48:40 +000079
80
def fetchall_as_list_of_dicts(cursor):
    """
    Converts each row in the cursor to a dictionary so that values can be read
    by using the column name.
    @param cursor: The database cursor to read from.
    @returns: A list of each row in the cursor as a dictionary.
    """
    column_names = [column[0] for column in cursor.description]
    return [dict(zip(column_names, row)) for row in cursor.fetchall()]
91
92
def raw_http_response(response_data, content_type=None):
    """Wrap raw payload data in an HttpResponse with explicit Content-length.

    @param response_data: raw bytes/string for the response body.
    @param content_type: optional MIME type for the response.
    @returns a django.http.HttpResponse carrying response_data.
    """
    http_response = django.http.HttpResponse(response_data,
                                             mimetype=content_type)
    http_response['Content-length'] = str(len(http_response.content))
    return http_response
97
98
def gather_unique_dicts(dict_iterable):
    """\
    Pick out unique objects (by ID) from an iterable of object dicts.

    Order of first appearance is preserved.
    """
    seen_ids = set()
    unique_dicts = []
    for object_dict in dict_iterable:
        object_id = object_dict['id']
        if object_id not in seen_ids:
            seen_ids.add(object_id)
            unique_dicts.append(object_dict)
    return unique_dicts
showardb0dfb9f2008-06-06 18:08:02 +0000110
111
def extra_job_status_filters(not_yet_run=False, running=False, finished=False):
    """\
    Generate a SQL WHERE clause for job status filtering, and return it in
    a dict of keyword args to pass to query.extra(). No more than one of
    the parameters should be passed as True.
    * not_yet_run: all HQEs are Queued
    * finished: all HQEs are complete
    * running: everything else
    """
    assert sum(bool(flag) for flag in
               (not_yet_run, running, finished)) <= 1, (
            'Cannot specify more than one filter to this function')

    not_queued = ('(SELECT job_id FROM afe_host_queue_entries '
                  'WHERE status != "%s")'
                  % models.HostQueueEntry.Status.QUEUED)
    not_finished = ('(SELECT job_id FROM afe_host_queue_entries '
                    'WHERE not complete)')

    if not_yet_run:
        return {'where': ['id NOT IN ' + not_queued]}
    if running:
        return {'where': ['(id IN %s) AND (id IN %s)'
                          % (not_queued, not_finished)]}
    if finished:
        return {'where': ['id NOT IN ' + not_finished]}
    # No filter requested: leave the query unconstrained.
    return {}
mblighe8819cd2008-02-15 16:48:40 +0000141
142
def extra_job_type_filters(extra_args, suite=False,
                           sub=False, standalone=False):
    """\
    Generate a SQL WHERE clause for job status filtering, and return it in
    a dict of keyword args to pass to query.extra().

    param extra_args: a dict of existing extra_args.

    No more than one of the parameters should be passed as True:
    * suite: job which is parent of other jobs
    * sub: job with a parent job
    * standalone: job with no child or parent jobs
    """
    assert sum(bool(flag) for flag in (suite, sub, standalone)) <= 1, (
            'Cannot specify more than one filter to this function')

    filter_common = ('(SELECT %s FROM afe_jobs '
                     'WHERE parent_job_id IS NOT NULL)')

    if suite:
        clause = 'id IN ' + filter_common % 'DISTINCT parent_job_id'
    elif sub:
        clause = 'id IN ' + filter_common % 'id'
    elif standalone:
        clause = ('NOT EXISTS (SELECT 1 from afe_jobs AS sub_query '
                  'WHERE parent_job_id IS NOT NULL'
                  ' AND (sub_query.parent_job_id=afe_jobs.id'
                  ' OR sub_query.id=afe_jobs.id))')
    else:
        # No job-type filter requested; pass extra_args through untouched.
        return extra_args

    where = extra_args.get('where', [])
    where.append(clause)
    extra_args['where'] = where
    return extra_args
181
182
183
def extra_host_filters(multiple_labels=()):
    """\
    Generate SQL WHERE clauses for matching hosts in an intersection of
    labels.

    One parameterized subquery clause is emitted per requested label.
    """
    where_clause = ('afe_hosts.id in (select host_id from afe_hosts_labels '
                    'where label_id=%s)')
    label_ids = [models.Label.smart_get(label).id
                 for label in multiple_labels]
    return {'where': [where_clause] * len(label_ids),
            'params': label_ids}
showard8e3aa5e2008-04-08 19:42:32 +0000196
197
def get_host_query(multiple_labels, exclude_only_if_needed_labels,
                   exclude_atomic_group_hosts, valid_only, filter_data):
    """Build a Host queryset constrained by labels and arbitrary filters.

    @param multiple_labels: label names the hosts must all carry.
    @param exclude_only_if_needed_labels: when true, exclude hosts that have
            any label marked only_if_needed.
    @param exclude_atomic_group_hosts: when true, exclude hosts that carry a
            label tied to an atomic group.
    @param valid_only: restrict to valid (non-deleted) hosts.
    @param filter_data: extra filter kwargs for models.Host.query_objects();
            must not already contain 'extra_args'.
    @returns a Django queryset of matching hosts; an empty queryset when one
            of multiple_labels does not exist.
    """
    if valid_only:
        query = models.Host.valid_objects.all()
    else:
        query = models.Host.objects.all()

    if exclude_only_if_needed_labels:
        only_if_needed_labels = models.Label.valid_objects.filter(
            only_if_needed=True)
        if only_if_needed_labels.count() > 0:
            only_if_needed_ids = ','.join(
                    str(label['id'])
                    for label in only_if_needed_labels.values('id'))
            # Exclude (via exclude=True) hosts carrying any only_if_needed
            # label by joining afe_hosts_labels under a distinct suffix.
            query = models.Host.objects.add_join(
                query, 'afe_hosts_labels', join_key='host_id',
                join_condition=('afe_hosts_labels_exclude_OIN.label_id IN (%s)'
                                % only_if_needed_ids),
                suffix='_exclude_OIN', exclude=True)

    if exclude_atomic_group_hosts:
        atomic_group_labels = models.Label.valid_objects.filter(
            atomic_group__isnull=False)
        if atomic_group_labels.count() > 0:
            atomic_group_label_ids = ','.join(
                    str(atomic_group['id'])
                    for atomic_group in atomic_group_labels.values('id'))
            # Same exclusion-join technique as above, for atomic-group labels.
            query = models.Host.objects.add_join(
                    query, 'afe_hosts_labels', join_key='host_id',
                    join_condition=(
                            'afe_hosts_labels_exclude_AG.label_id IN (%s)'
                            % atomic_group_label_ids),
                    suffix='_exclude_AG', exclude=True)
    try:
        assert 'extra_args' not in filter_data
        filter_data['extra_args'] = extra_host_filters(multiple_labels)
        return models.Host.query_objects(filter_data, initial_query=query)
    except models.Label.DoesNotExist as e:
        # A non-existent label in multiple_labels matches no hosts.
        return models.Host.objects.none()
showard43a3d262008-11-12 18:17:05 +0000237
238
class InconsistencyException(Exception):
    """Raised when a list of objects does not have a consistent value.

    args carries the two objects whose values were found to differ.
    """
showard8fd58242008-03-10 21:29:07 +0000241
242
def get_consistent_value(objects, field):
    """Return the value of `field` shared by every object in `objects`.

    @param objects: sequence of objects to inspect.
    @param field: attribute name to read from each object.
    @returns the common attribute value, or None for an empty sequence.
    @raises InconsistencyException: if any two objects disagree; carries the
            first object and the first disagreeing object.
    """
    if not objects:
        # well a list of nothing is consistent
        return None

    expected = getattr(objects[0], field)
    for obj in objects[1:]:
        if getattr(obj, field) != expected:
            raise InconsistencyException(objects[0], obj)
    return expected
showard8fd58242008-03-10 21:29:07 +0000254
255
def prepare_generate_control_file(tests, kernel, label, profilers):
    """Look up the model objects needed to generate a control file.

    @param tests: test names/ids to include.
    @param kernel: kernel info; not used here but kept for the caller's
            signature (passed through by rpc_interface).
    @param label: optional label name/id; resolved to a models.Label.
    @param profilers: profiler names/ids to include.
    @returns (cf_info dict, test objects, profiler objects, label object).
    @raises model_logic.ValidationError: when server- and client-side tests
            are mixed.
    """
    test_objects = [models.Test.smart_get(test) for test in tests]
    profiler_objects = [models.Profiler.smart_get(profiler)
                        for profiler in profilers]
    # ensure tests are all the same type
    try:
        test_type = get_consistent_value(test_objects, 'test_type')
    # Use the `except ... as ...` form (consistent with the rest of this
    # file) instead of the legacy comma form.
    except InconsistencyException as exc:
        test1, test2 = exc.args
        raise model_logic.ValidationError(
            {'tests' : 'You cannot run both server- and client-side '
             'tests together (tests %s and %s differ)' % (
            test1.name, test2.name)})

    is_server = (test_type == control_data.CONTROL_TYPE.SERVER)
    if test_objects:
        synch_count = max(test.sync_count for test in test_objects)
    else:
        synch_count = 1
    if label:
        label = models.Label.smart_get(label)

    dependencies = set(label.name for label
                       in models.Label.objects.filter(test__in=test_objects))

    cf_info = dict(is_server=is_server, synch_count=synch_count,
                   dependencies=list(dependencies))
    return cf_info, test_objects, profiler_objects, label
showard989f25d2008-10-01 11:38:11 +0000284
285
def check_job_dependencies(host_objects, job_dependencies):
    """
    Check that a set of machines satisfies a job's dependencies.
    host_objects: list of models.Host objects
    job_dependencies: list of names of labels

    @raises model_logic.ValidationError: naming every host that fails to
            carry all (non-special-action) dependency labels.
    """
    # check that hosts satisfy dependencies
    host_ids = [host.id for host in host_objects]
    ok_hosts = models.Host.objects.filter(id__in=host_ids)
    for dependency in job_dependencies:
        # Special-action labels (e.g. provisioning) are not real host labels.
        if not provision.is_for_special_action(dependency):
            ok_hosts = ok_hosts.filter(labels__name=dependency)
    failing_hosts = (set(host.hostname for host in host_objects) -
                     set(host.hostname for host in ok_hosts))
    if failing_hosts:
        raise model_logic.ValidationError(
            {'hosts' : 'Host(s) failed to meet job dependencies (' +
                       (', '.join(job_dependencies)) + '): ' +
                       (', '.join(failing_hosts))})
306
showard989f25d2008-10-01 11:38:11 +0000307
def check_job_metahost_dependencies(metahost_objects, job_dependencies):
    """
    Check that at least one machine within the metahost spec satisfies the job's
    dependencies.

    @param metahost_objects A list of label objects representing the metahosts.
    @param job_dependencies A list of strings of the required label names.
    @raises NoEligibleHostException If a metahost cannot run the job.
    """
    for metahost in metahost_objects:
        candidates = models.Host.objects.filter(labels=metahost)
        for label_name in job_dependencies:
            # Skip special-action labels; they are not host labels.
            if provision.is_for_special_action(label_name):
                continue
            candidates = candidates.filter(labels__name=label_name)
        if not any(candidates):
            raise error.NoEligibleHostException("No hosts within %s satisfy %s."
                    % (metahost.name, ', '.join(job_dependencies)))
325
showard2bab8f42008-11-12 18:15:22 +0000326
327def _execution_key_for(host_queue_entry):
328 return (host_queue_entry.job.id, host_queue_entry.execution_subdir)
329
330
def check_abort_synchronous_jobs(host_queue_entries):
    """Reject aborts that would cancel only part of a synchronous execution.

    @param host_queue_entries: HQEs the user wants to abort.
    @raises model_logic.ValidationError: if any synchronous execution is
            covered by fewer entries than its job's synch_count.
    """
    # ensure user isn't aborting part of a synchronous autoserv execution
    count_per_execution = {}
    for queue_entry in host_queue_entries:
        key = (queue_entry.job.id, queue_entry.execution_subdir)
        count_per_execution[key] = count_per_execution.get(key, 0) + 1

    for queue_entry in host_queue_entries:
        if not queue_entry.execution_subdir:
            continue
        key = (queue_entry.job.id, queue_entry.execution_subdir)
        execution_count = count_per_execution[key]
        if execution_count < queue_entry.job.synch_count:
            raise model_logic.ValidationError(
                {'' : 'You cannot abort part of a synchronous job execution '
                      '(%d/%s), %d included, %d expected'
                      % (queue_entry.job.id, queue_entry.execution_subdir,
                         execution_count, queue_entry.job.synch_count)})
showard8fbae652009-01-20 23:23:10 +0000349
350
def check_atomic_group_create_job(synch_count, host_objects, metahost_objects,
                                  dependencies, atomic_group):
    """
    Attempt to reject create_job requests with an atomic group that
    will be impossible to schedule. The checks are not perfect but
    should catch the most obvious issues.

    @param synch_count - The job's minimum synch count.
    @param host_objects - A list of models.Host instances.
    @param metahost_objects - A list of models.Label instances.
    @param dependencies - A list of job dependency label names.
    @param atomic_group - The atomic group the job targets; its label_set
            and name are consulted here.

    @raises model_logic.ValidationError - When an issue is found.
    """
    # If specific host objects were supplied with an atomic group, verify
    # that there are enough to satisfy the synch_count.
    minimum_required = synch_count or 1
    if (host_objects and not metahost_objects and
        len(host_objects) < minimum_required):
        raise model_logic.ValidationError(
                {'hosts':
                 'only %d hosts provided for job with synch_count = %d' %
                 (len(host_objects), synch_count)})

    # Check that the atomic group has a hope of running this job
    # given any supplied metahosts and dependancies that may limit.

    # Get a set of hostnames in the atomic group.
    possible_hosts = set()
    for label in atomic_group.label_set.all():
        possible_hosts.update(h.hostname for h in label.host_set.all())

    # Filter out hosts that don't match all of the job dependency labels.
    for label in models.Label.objects.filter(name__in=dependencies):
        hosts_in_label = (h.hostname for h in label.host_set.all())
        possible_hosts.intersection_update(hosts_in_label)

    if not host_objects and not metahost_objects:
        # No hosts or metahosts are required to queue an atomic group Job.
        # However, if they are given, we respect them below.
        host_set = possible_hosts
    else:
        host_set = set(host.hostname for host in host_objects)
        unusable_host_set = host_set.difference(possible_hosts)
        if unusable_host_set:
            raise model_logic.ValidationError(
                {'hosts': 'Hosts "%s" are not in Atomic Group "%s"' %
                 (', '.join(sorted(unusable_host_set)), atomic_group.name)})

    # Lookup hosts provided by each meta host and merge them into the
    # host_set for final counting.
    for meta_host in metahost_objects:
        meta_possible = possible_hosts.copy()
        hosts_in_meta_host = (h.hostname for h in meta_host.host_set.all())
        meta_possible.intersection_update(hosts_in_meta_host)

        # Count all hosts that this meta_host will provide.
        host_set.update(meta_possible)

    if len(host_set) < minimum_required:
        raise model_logic.ValidationError(
                {'atomic_group_name':
                 'Insufficient hosts in Atomic Group "%s" with the'
                 ' supplied dependencies and meta_hosts.' %
                 (atomic_group.name,)})
418
419
def check_modify_host(update_data):
    """
    Sanity check modify_host* requests.

    @param update_data: A dictionary with the changes to make to a host
            or hosts.
    @raises model_logic.ValidationError: if the request tries to change a
            field only the scheduler may touch.
    """
    # Only the scheduler (monitor_db) is allowed to modify Host status.
    # Otherwise race conditions happen as a hosts state is changed out from
    # beneath tasks being run on a host.
    if 'status' not in update_data:
        return
    raise model_logic.ValidationError({
            'status': 'Host status can not be modified by the frontend.'})
433
434
def check_modify_host_locking(host, update_data):
    """
    Checks when locking/unlocking has been requested if the host is already
    locked/unlocked.

    @param host: models.Host object to be modified
    @param update_data: A dictionary with the changes to make to the host.
    @raises model_logic.ValidationError: on redundant lock/unlock requests,
            or when locking without a lock_reason.
    """
    locked = update_data.get('locked', None)
    lock_reason = update_data.get('lock_reason', None)
    if locked is None:
        # No lock-state change requested; nothing to validate.
        return
    if locked:
        if host.locked:
            raise model_logic.ValidationError({
                    'locked': 'Host already locked by %s on %s.' %
                              (host.locked_by, host.lock_time)})
        if not lock_reason:
            raise model_logic.ValidationError({
                    'locked': 'Please provide a reason for locking'})
    elif not host.locked:
        raise model_logic.ValidationError({
                'locked': 'Host already unlocked.'})
showardce7c0922009-09-11 18:39:24 +0000456
457
def get_motd():
    """Return the contents of the server's motd.txt, or '' if unreadable.

    The file is looked up two directories above this module.  A missing or
    unreadable file is expected and not an error.
    """
    dirname = os.path.dirname(__file__)
    filename = os.path.join(dirname, "..", "..", "motd.txt")
    try:
        # `with` guarantees the file is closed; catch only I/O failures
        # rather than a bare except that would hide genuine bugs.
        with open(filename, "r") as motd_file:
            return motd_file.read()
    except IOError:
        return ''
showard29f7cd22009-04-29 21:16:24 +0000472
473
474def _get_metahost_counts(metahost_objects):
475 metahost_counts = {}
476 for metahost in metahost_objects:
477 metahost_counts.setdefault(metahost, 0)
478 metahost_counts[metahost] += 1
479 return metahost_counts
480
481
def get_job_info(job, preserve_metahosts=False, queue_entry_filter_data=None):
    """Collect the hosts, metahosts and atomic group a job was created with.

    @param job: models.Job instance to inspect.
    @param preserve_metahosts: when True, an entry that has been assigned a
            host but was created as a metahost is still counted as a host.
    @param queue_entry_filter_data: optional filter kwargs applied to the
            job's queue entries before inspection.
    @returns dict with keys: dependencies, hosts, meta_hosts,
            meta_host_counts, one_time_hosts, atomic_group, hostless.
    """
    hosts = []
    one_time_hosts = []
    meta_hosts = []
    atomic_group = None
    hostless = False

    queue_entries = job.hostqueueentry_set.all()
    if queue_entry_filter_data:
        queue_entries = models.HostQueueEntry.query_objects(
            queue_entry_filter_data, initial_query=queue_entries)

    for queue_entry in queue_entries:
        if (queue_entry.host and (preserve_metahosts or
                                  not queue_entry.meta_host)):
            if queue_entry.deleted:
                continue
            # Invalid hosts were added as one-time hosts.
            if queue_entry.host.invalid:
                one_time_hosts.append(queue_entry.host)
            else:
                hosts.append(queue_entry.host)
        elif queue_entry.meta_host:
            meta_hosts.append(queue_entry.meta_host)
        else:
            hostless = True

        if atomic_group is None:
            if queue_entry.atomic_group is not None:
                atomic_group = queue_entry.atomic_group
        else:
            assert atomic_group.name == queue_entry.atomic_group.name, (
                'DB inconsistency. HostQueueEntries with multiple atomic'
                ' groups on job %s: %s != %s' % (
                    # Fixed: this message previously formatted the builtin
                    # id() function instead of the job's id.
                    job.id, atomic_group.name, queue_entry.atomic_group.name))

    meta_host_counts = _get_metahost_counts(meta_hosts)

    info = dict(dependencies=[label.name for label
                              in job.dependency_labels.all()],
                hosts=hosts,
                meta_hosts=meta_hosts,
                meta_host_counts=meta_host_counts,
                one_time_hosts=one_time_hosts,
                atomic_group=atomic_group,
                hostless=hostless)
    return info
528
529
def check_for_duplicate_hosts(host_objects):
    """Raise if any host appears more than once in host_objects.

    @param host_objects: list of host objects with id/hostname attributes.
    @raises model_logic.ValidationError: listing each duplicated hostname.
    """
    seen_ids = set()
    duplicate_hostnames = set()
    for host in host_objects:
        if host.id in seen_ids:
            duplicate_hostnames.add(host.hostname)
        seen_ids.add(host.id)

    if duplicate_hostnames:
        raise model_logic.ValidationError(
                {'hosts' : 'Duplicate hosts: %s'
                 % ', '.join(duplicate_hostnames)})
542
543
def create_new_job(owner, options, host_objects, metahost_objects,
                   atomic_group=None):
    """Validate constraints, create a new job, and queue it on its hosts.

    @param owner: username of the job owner.
    @param options: dict of job options; 'dependencies' and 'synch_count'
            are consumed here ('dependencies' is rewritten to Label objects),
            the rest is passed through to models.Job.create().
    @param host_objects: list of models.Host instances to run on.
    @param metahost_objects: list of models.Label instances used as metahosts.
    @param atomic_group: optional atomic group the job must run within.
    @returns the id of the newly created models.Job.
    @raises model_logic.ValidationError: when host counts, duplicates,
            dependencies or atomic-group constraints cannot be satisfied.
    """
    all_host_objects = host_objects + metahost_objects
    metahost_counts = _get_metahost_counts(metahost_objects)
    dependencies = options.get('dependencies', [])
    synch_count = options.get('synch_count')

    if atomic_group:
        # Verify the atomic group can plausibly schedule this job.
        check_atomic_group_create_job(
            synch_count, host_objects, metahost_objects,
            dependencies, atomic_group)
    else:
        if synch_count is not None and synch_count > len(all_host_objects):
            raise model_logic.ValidationError(
                    {'hosts':
                     'only %d hosts provided for job with synch_count = %d' %
                     (len(all_host_objects), synch_count)})
        # Hosts that belong to an atomic group may only be used when an
        # atomic group was specified for the job.
        atomic_hosts = models.Host.objects.filter(
                id__in=[host.id for host in host_objects],
                labels__atomic_group=True)
        unusable_host_names = [host.hostname for host in atomic_hosts]
        if unusable_host_names:
            raise model_logic.ValidationError(
                    {'hosts':
                     'Host(s) "%s" are atomic group hosts but no '
                     'atomic group was specified for this job.' %
                     (', '.join(unusable_host_names),)})

    check_for_duplicate_hosts(host_objects)

    for label_name in dependencies:
        if provision.is_for_special_action(label_name):
            # TODO: We could save a few queries
            # if we had a bulk ensure-label-exists function, which used
            # a bulk .get() call. The win is probably very small.
            _ensure_label_exists(label_name)

    # This only checks targeted hosts, not hosts eligible due to the metahost
    check_job_dependencies(host_objects, dependencies)
    check_job_metahost_dependencies(metahost_objects, dependencies)

    options['dependencies'] = list(
            models.Label.objects.filter(name__in=dependencies))

    for label in metahost_objects + options['dependencies']:
        if label.atomic_group and not atomic_group:
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'Dependency %r requires an atomic group but no '
                     'atomic_group_name or meta_host in an atomic group was '
                     'specified for this job.' % label.name})
        elif (label.atomic_group and
              label.atomic_group.name != atomic_group.name):
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'meta_hosts or dependency %r requires atomic group '
                     '%r instead of the supplied atomic_group_name=%r.' %
                     (label.name, label.atomic_group.name, atomic_group.name)})

    job = models.Job.create(owner=owner, options=options,
                            hosts=all_host_objects)
    job.queue(all_host_objects, atomic_group=atomic_group,
              is_template=options.get('is_template', False))
    return job.id
showard0957a842009-05-11 19:25:08 +0000608
609
def _ensure_label_exists(name):
    """
    Ensure that a label called |name| exists in the Django models.

    This function is to be called from within afe rpcs only, as an
    alternative to server.cros.provision.ensure_label_exists(...). It works
    by Django model manipulation, rather than by making another create_label
    rpc call.

    @param name: the label to check for/create.
    @raises ValidationError: There was an error in the response that was
                             not because the label already existed.
    @returns True if a label was created, False otherwise.
    """
    try:
        models.Label.objects.get(name=name)
    except models.Label.DoesNotExist:
        # objects.create() already inserts and saves the new row; the
        # previous extra save() call only issued a redundant UPDATE.
        models.Label.objects.create(name=name)
        return True
    return False
631
632
def find_platform_and_atomic_group(host):
    """
    Figure out the platform name and atomic group name for the given host
    object. If none, the return value for either will be None.

    @returns (platform name, atomic group name) for the given host.
    @raises ValueError: if the host carries more than one platform label.
    """
    platforms = [label.name for label in host.label_list if label.platform]
    platform = platforms[0] if platforms else None
    if len(platforms) > 1:
        raise ValueError('Host %s has more than one platform: %s' %
                         (host.hostname, ', '.join(platforms)))

    atomic_group_name = None
    for label in host.label_list:
        if label.atomic_group:
            atomic_group_name = label.atomic_group.name
            break
    # Don't check for multiple atomic groups on a host here. That is an
    # error but should not trip up the RPC interface. monitor_db_cleanup
    # deals with it. This just returns the first one found.
    return platform, atomic_group_name
showardc0ac3a72009-07-08 21:14:45 +0000658
659
660# support for get_host_queue_entries_and_special_tasks()
661
MK Ryu0c1a37d2015-04-30 12:00:55 -0700662def _common_entry_to_dict(entry, type, job_dict, exec_path, status, started_on):
showardc0ac3a72009-07-08 21:14:45 +0000663 return dict(type=type,
MK Ryu0c1a37d2015-04-30 12:00:55 -0700664 host=entry['host'],
showardc0ac3a72009-07-08 21:14:45 +0000665 job=job_dict,
MK Ryu0c1a37d2015-04-30 12:00:55 -0700666 execution_path=exec_path,
667 status=status,
668 started_on=started_on,
669 id=str(entry['id']) + type,
670 oid=entry['id'])
showardc0ac3a72009-07-08 21:14:45 +0000671
672
def _special_task_to_dict(task, queue_entries):
    """Transforms a special task dictionary to another form of dictionary.

    @param task Special task as a dictionary type
    @param queue_entries Host queue entries as a list of dictionaries.

    @return Transformed dictionary for a special task.
    """
    job_dict = None
    task_entry = task['queue_entry']
    if task_entry:
        # Scan queue_entries to get the job detail info.
        for qentry in queue_entries:
            if task_entry['id'] == qentry['id']:
                job_dict = qentry['job']
                break
        # If not found, get it from DB.
        if job_dict is None:
            job_dict = models.Job.objects.get(
                    id=task_entry['job']).get_object_dict()

    exec_path = server_utils.get_special_task_exec_path(
            task['host']['hostname'], task['id'], task['task'],
            time_utils.time_string_to_datetime(task['time_requested']))
    status = server_utils.get_special_task_status(
            task['is_complete'], task['success'], task['is_active'])
    return _common_entry_to_dict(task, task['task'], job_dict,
                                 exec_path, status, task['time_started'])
showardc0ac3a72009-07-08 21:14:45 +0000700
701
def _queue_entry_to_dict(queue_entry):
    """Transform a host queue entry dictionary for serialization.

    @param queue_entry: Host queue entry as a dictionary.

    @return Transformed dictionary for the host queue entry.
    """
    job_dict = queue_entry['job']
    job_tag = server_utils.get_job_tag(job_dict['id'], job_dict['owner'])
    execution_path = server_utils.get_hqe_exec_path(
            job_tag, queue_entry['execution_subdir'])
    return _common_entry_to_dict(
            queue_entry, 'Job', job_dict, execution_path,
            queue_entry['status'], queue_entry['started_on'])
709
710
def prepare_host_queue_entries_and_special_tasks(interleaved_entries,
                                                 queue_entries):
    """
    Prepare for serialization the interleaved entries of host queue entries
    and special tasks.
    Each element in the entries is a dictionary type.
    The special task dictionary has only a job id for a job and lacks
    the detail of the job while the host queue entry dictionary has.
    queue_entries is used to look up the job detail info.

    @param interleaved_entries Host queue entries and special tasks as a list
                               of dictionaries.
    @param queue_entries Host queue entries as a list of dictionaries.

    @return A post-processed list of dictionaries that is to be serialized.
    """
    def _transform(entry):
        # Entries carrying a 'task' key are special tasks; everything else
        # is a host queue entry.
        if 'task' in entry:
            return _special_task_to_dict(entry, queue_entries)
        return _queue_entry_to_dict(entry)

    return prepare_for_serialization(
            [_transform(entry) for entry in interleaved_entries])
showardc0ac3a72009-07-08 21:14:45 +0000737
738
def _compute_next_job_for_tasks(queue_entries, special_tasks):
    """
    For each task, try to figure out the next job that ran after that task.
    This is done using two pieces of information:
    * if the task has a queue entry, we can use that entry's job ID.
    * if the task has a time_started, we can try to compare that against the
    started_on field of queue_entries. this isn't guaranteed to work perfectly
    since queue_entries may also have null started_on values.
    * if the task has neither, or if use of time_started fails, just use the
    last computed job ID.

    Each task dictionary is annotated in place with a 'next_job_id' key
    (the id of the next job, or None if no job ran after the task).
    Both lists are expected in descending ID order (see interleave_entries).

    @param queue_entries Host queue entries as a list of dictionaries.
    @param special_tasks Special tasks as a list of dictionaries.
    """
    next_job_id = None # most recently computed next job
    hqe_index = 0 # index for scanning by started_on times
    for task in special_tasks:
        if task['queue_entry']:
            # Direct association: the task's queue entry names its job.
            next_job_id = task['queue_entry']['job']
        elif task['time_started'] is not None:
            # Scan the (descending) queue entries; every entry that started
            # at or after the task is a candidate, and the scan stops at the
            # first entry that started strictly before the task.
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry['started_on'] is None:
                    continue
                t1 = time_utils.time_string_to_datetime(
                        queue_entry['started_on'])
                t2 = time_utils.time_string_to_datetime(task['time_started'])
                if t1 < t2:
                    break
                next_job_id = queue_entry['job']['id']

        task['next_job_id'] = next_job_id

        # advance hqe_index to just after next_job_id, so later tasks never
        # rescan queue entries that are newer than this task's next job
        if next_job_id is not None:
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry['job']['id'] < next_job_id:
                    break
                hqe_index += 1
777
778
def interleave_entries(queue_entries, special_tasks):
    """
    Both lists should be ordered by descending ID.

    Merges host queue entries and special tasks into one chronological list
    (newest first): each special task is placed right after the queue entry
    of the job that ran next after it, and tasks newer than every job come
    first.

    @param queue_entries: Host queue entries as a list of dictionaries,
            ordered by descending ID.
    @param special_tasks: Special tasks as a list of dictionaries, ordered
            by descending ID. Annotated in place with 'next_job_id' by
            _compute_next_job_for_tasks().

    @returns: A single interleaved list of both kinds of dictionaries.
    """
    _compute_next_job_for_tasks(queue_entries, special_tasks)

    # start with all special tasks that've run since the last job
    interleaved_entries = []
    for task in special_tasks:
        if task['next_job_id'] is not None:
            break
        interleaved_entries.append(task)

    # now interleave queue entries with the remaining special tasks
    special_task_index = len(interleaved_entries)
    for queue_entry in queue_entries:
        interleaved_entries.append(queue_entry)
        # add all tasks that ran between this job and the previous one
        for task in special_tasks[special_task_index:]:
            if task['next_job_id'] < queue_entry['job']['id']:
                break
            interleaved_entries.append(task)
            special_task_index += 1

    return interleaved_entries
jamesren4a41e012010-07-16 22:33:48 +0000804
805
Prashanth Balasubramanian6edaaf92014-11-24 16:36:25 -0800806def bucket_hosts_by_shard(host_objs, rpc_hostnames=False):
807 """Figure out which hosts are on which shards.
808
809 @param host_objs: A list of host objects.
810 @param rpc_hostnames: If True, the rpc_hostnames of a shard are returned
811 instead of the 'real' shard hostnames. This only matters for testing
812 environments.
813
814 @return: A map of shard hostname: list of hosts on the shard.
815 """
816 shard_host_map = {}
817 for host in host_objs:
818 if host.shard:
819 shard_name = (host.shard.rpc_hostname() if rpc_hostnames
820 else host.shard.hostname)
821 shard_host_map.setdefault(shard_name, []).append(host.hostname)
822 return shard_host_map
823
824
jamesren4a41e012010-07-16 22:33:48 +0000825def get_create_job_common_args(local_args):
826 """
827 Returns a dict containing only the args that apply for create_job_common
828
829 Returns a subset of local_args, which contains only the arguments that can
830 be passed in to create_job_common().
831 """
Alex Miller7d658cf2013-09-04 16:00:35 -0700832 # This code is only here to not kill suites scheduling tests when priority
833 # becomes an int instead of a string.
834 if isinstance(local_args['priority'], str):
835 local_args['priority'] = priorities.Priority.DEFAULT
836 # </migration hack>
jamesren4a41e012010-07-16 22:33:48 +0000837 arg_names, _, _, _ = inspect.getargspec(create_job_common)
838 return dict(item for item in local_args.iteritems() if item[0] in arg_names)
839
840
def create_job_common(name, priority, control_type, control_file=None,
                      hosts=(), meta_hosts=(), one_time_hosts=(),
                      atomic_group_name=None, synch_count=None,
                      is_template=False, timeout=None, timeout_mins=None,
                      max_runtime_mins=None, run_verify=True, email_list='',
                      dependencies=(), reboot_before=None, reboot_after=None,
                      parse_failed_repair=None, hostless=False, keyvals=None,
                      drone_set=None, parameterized_job=None,
                      parent_job_id=None, test_retry=0, run_reset=True,
                      require_ssp=None):
    #pylint: disable-msg=C0111
    """
    Common code between creating "standard" jobs and creating parameterized
    jobs.

    Validates the host/metahost/atomic-group arguments, resolves them to
    model objects, and delegates actual job creation to create_new_job().

    @param name: Name of the job.
    @param priority: Priority of the job.
    @param control_type: Control type of the job; hostless jobs must use the
            server-side control type.
    @param control_file: Contents of the control file.
    @param hosts: Hosts to run on, resolved via models.Host.smart_get_bulk().
            Hosts assigned to a shard may not be mixed with master hosts.
    @param meta_hosts: Label names to schedule on; a name that is not a label
            may instead name an atomic group.
    @param one_time_hosts: Hostnames to create as one-time hosts; not allowed
            together with an atomic group.
    @param atomic_group_name: Name of an atomic group to schedule on. If not
            given, it is inferred from the first metahost label that has an
            atomic group.
    @param synch_count: Number of machines required; checked against the
            atomic group's max_number_of_machines when one is used.
    @param is_template: Whether this job is a template.
    @param timeout: Job timeout (hours; confirm units with callers).
    @param timeout_mins: Job timeout in minutes.
    @param max_runtime_mins: Maximum job runtime in minutes.
    @param run_verify: Whether to run verify on the hosts.
    @param email_list: Comma-separated email list for notifications.
    @param dependencies: Label dependencies of the job.
    @param reboot_before: Reboot-before option.
    @param reboot_after: Reboot-after option.
    @param parse_failed_repair: Whether to parse failed repair results.
    @param hostless: If True, create a hostless (e.g. suite) job; no hosts of
            any kind may be supplied then.
    @param keyvals: Keyvals to attach to the job.
    @param drone_set: Drone set to run the job on.
    @param parameterized_job: Id of an associated parameterized job, if any.
    @param parent_job_id: Id of the parent job, if any.
    @param test_retry: Number of times to retry tests.
    @param run_reset: Whether to run reset on the hosts.
    @param require_ssp: Whether server-side packaging is required.

    @returns: The return value of create_new_job().

    @raises model_logic.ValidationError: on invalid argument combinations.
    @raises ValueError: if the given hosts span shards and/or the master.
    """
    user = models.User.current_user()
    owner = user.login

    # input validation
    if not (hosts or meta_hosts or one_time_hosts or atomic_group_name
            or hostless):
        raise model_logic.ValidationError({
            'arguments' : "You must pass at least one of 'hosts', "
                          "'meta_hosts', 'one_time_hosts', "
                          "'atomic_group_name', or 'hostless'"
            })

    if hostless:
        if hosts or meta_hosts or one_time_hosts or atomic_group_name:
            raise model_logic.ValidationError({
                    'hostless': 'Hostless jobs cannot include any hosts!'})
        server_type = control_data.CONTROL_TYPE_NAMES.SERVER
        if control_type != server_type:
            raise model_logic.ValidationError({
                    'control_type': 'Hostless jobs cannot use client-side '
                                    'control files'})

    atomic_groups_by_name = dict((ag.name, ag)
                                 for ag in models.AtomicGroup.objects.all())
    label_objects = list(models.Label.objects.filter(name__in=meta_hosts))

    # Schedule on an atomic group automagically if one of the labels given
    # is an atomic group label and no explicit atomic_group_name was supplied.
    if not atomic_group_name:
        for label in label_objects:
            if label and label.atomic_group:
                atomic_group_name = label.atomic_group.name
                break
    # convert hostnames & meta hosts to host/label objects
    host_objects = models.Host.smart_get_bulk(hosts)
    if not server_utils.is_shard():
        shard_host_map = bucket_hosts_by_shard(host_objects)
        num_shards = len(shard_host_map)
        if (num_shards > 1 or (num_shards == 1 and
                len(shard_host_map.values()[0]) != len(host_objects))):
            # We disallow the following jobs on master:
            #   num_shards > 1: this is a job spanning across multiple shards.
            #   num_shards == 1 but number of hosts on shard is less
            #   than total number of hosts: this is a job that spans across
            #   one shard and the master.
            raise ValueError(
                    'The following hosts are on shard(s), please create '
                    'seperate jobs for hosts on each shard: %s ' %
                    shard_host_map)
    metahost_objects = []
    meta_host_labels_by_name = {label.name: label for label in label_objects}
    for label_name in meta_hosts or []:
        if label_name in meta_host_labels_by_name:
            metahost_objects.append(meta_host_labels_by_name[label_name])
        elif label_name in atomic_groups_by_name:
            # If given a metahost name that isn't a Label, check to
            # see if the user was specifying an Atomic Group instead.
            atomic_group = atomic_groups_by_name[label_name]
            if atomic_group_name and atomic_group_name != atomic_group.name:
                raise model_logic.ValidationError({
                        'meta_hosts': (
                                'Label "%s" not found. If assumed to be an '
                                'atomic group it would conflict with the '
                                'supplied atomic group "%s".' % (
                                        label_name, atomic_group_name))})
            atomic_group_name = atomic_group.name
        else:
            raise model_logic.ValidationError(
                {'meta_hosts' : 'Label "%s" not found' % label_name})

    # Create and sanity check an AtomicGroup object if requested.
    if atomic_group_name:
        if one_time_hosts:
            raise model_logic.ValidationError(
                    {'one_time_hosts':
                     'One time hosts cannot be used with an Atomic Group.'})
        atomic_group = models.AtomicGroup.smart_get(atomic_group_name)
        if synch_count and synch_count > atomic_group.max_number_of_machines:
            raise model_logic.ValidationError(
                    {'atomic_group_name' :
                     'You have requested a synch_count (%d) greater than the '
                     'maximum machines in the requested Atomic Group (%d).' %
                     (synch_count, atomic_group.max_number_of_machines)})
    else:
        atomic_group = None

    for host in one_time_hosts or []:
        this_host = models.Host.create_one_time_host(host)
        host_objects.append(this_host)

    options = dict(name=name,
                   priority=priority,
                   control_file=control_file,
                   control_type=control_type,
                   is_template=is_template,
                   timeout=timeout,
                   timeout_mins=timeout_mins,
                   max_runtime_mins=max_runtime_mins,
                   synch_count=synch_count,
                   run_verify=run_verify,
                   email_list=email_list,
                   dependencies=dependencies,
                   reboot_before=reboot_before,
                   reboot_after=reboot_after,
                   parse_failed_repair=parse_failed_repair,
                   keyvals=keyvals,
                   drone_set=drone_set,
                   parameterized_job=parameterized_job,
                   parent_job_id=parent_job_id,
                   test_retry=test_retry,
                   run_reset=run_reset,
                   require_ssp=require_ssp)
    return create_new_job(owner=owner,
                          options=options,
                          host_objects=host_objects,
                          metahost_objects=metahost_objects,
                          atomic_group=atomic_group)
Simran Basib6ec8ae2014-04-23 12:05:08 -0700972
973
def encode_ascii(control_file):
    """Force a control file to only contain ascii characters.

    @param control_file: Control file to encode.

    @returns the control file in an ascii encoding.

    @raises error.ControlFileMalformed: if encoding fails.
    """
    try:
        return control_file.encode('ascii')
    except UnicodeError as e:
        # UnicodeError is the common base of UnicodeDecodeError (raised when
        # a byte string with non-ascii bytes is implicitly decoded first) and
        # UnicodeEncodeError (raised when a unicode string contains non-ascii
        # characters). The previous code caught only UnicodeDecodeError, so
        # encode failures on unicode input escaped as raw exceptions.
        raise error.ControlFileMalformed(str(e))
987
988
def get_wmatrix_url():
    """Get wmatrix url from config file.

    @returns the wmatrix url, or an empty string if it is not configured.
    """
    config = global_config.global_config
    return config.get_config_value('AUTOTEST_WEB', 'wmatrix_url', default='')
Jiaxi Luo57bc1952014-07-22 15:27:30 -0700997
998
999def inject_times_to_filter(start_time_key=None, end_time_key=None,
1000 start_time_value=None, end_time_value=None,
1001 **filter_data):
1002 """Inject the key value pairs of start and end time if provided.
1003
1004 @param start_time_key: A string represents the filter key of start_time.
1005 @param end_time_key: A string represents the filter key of end_time.
1006 @param start_time_value: Start_time value.
1007 @param end_time_value: End_time value.
1008
1009 @returns the injected filter_data.
1010 """
1011 if start_time_value:
1012 filter_data[start_time_key] = start_time_value
1013 if end_time_value:
1014 filter_data[end_time_key] = end_time_value
1015 return filter_data
1016
1017
def inject_times_to_hqe_special_tasks_filters(filter_data_common,
                                              start_time, end_time):
    """Inject start and end time to hqe and special tasks filters.

    HQEs and special tasks use different column names for their start time
    ('started_on' vs 'time_started'), so the same bounds are injected under
    different keys.

    @param filter_data_common: Common filter for hqe and special tasks.
    @param start_time: Start time value to inject into both filters.
    @param end_time: End time value to inject into both filters.

    @returns a pair of hqe and special tasks filters.
    """
    filter_data_special_tasks = filter_data_common.copy()
    return (inject_times_to_filter('started_on__gte', 'started_on__lte',
                                   start_time, end_time, **filter_data_common),
            inject_times_to_filter('time_started__gte', 'time_started__lte',
                                   start_time, end_time,
                                   **filter_data_special_tasks))
1034
1035
def retrieve_shard(shard_hostname):
    """
    Retrieves the shard with the given hostname from the database.

    @param shard_hostname: Hostname of the shard to retrieve

    @raises models.Shard.DoesNotExist, if no shard with this hostname was found.

    @returns: Shard object
    """
    # Lookup is delegated to smart_get; a missing shard surfaces as
    # models.Shard.DoesNotExist as documented above.
    return models.Shard.smart_get(shard_hostname)
Jakob Juelich59cfe542014-09-02 16:37:46 -07001047
1048
Jakob Juelich1b525742014-09-30 13:08:07 -07001049def find_records_for_shard(shard, known_job_ids, known_host_ids):
Jakob Juelich59cfe542014-09-02 16:37:46 -07001050 """Find records that should be sent to a shard.
1051
Jakob Juelicha94efe62014-09-18 16:02:49 -07001052 @param shard: Shard to find records for.
Jakob Juelich1b525742014-09-30 13:08:07 -07001053 @param known_job_ids: List of ids of jobs the shard already has.
1054 @param known_host_ids: List of ids of hosts the shard already has.
Jakob Juelicha94efe62014-09-18 16:02:49 -07001055
Fang Dengf3705992014-12-16 17:32:18 -08001056 @returns: Tuple of three lists for hosts, jobs, and suite job keyvals:
1057 (hosts, jobs, suite_job_keyvals).
Jakob Juelich59cfe542014-09-02 16:37:46 -07001058 """
Jakob Juelich1b525742014-09-30 13:08:07 -07001059 hosts = models.Host.assign_to_shard(shard, known_host_ids)
1060 jobs = models.Job.assign_to_shard(shard, known_job_ids)
Fang Dengf3705992014-12-16 17:32:18 -08001061 parent_job_ids = [job.parent_job_id for job in jobs]
1062 suite_job_keyvals = models.JobKeyval.objects.filter(
1063 job_id__in=parent_job_ids)
1064 return hosts, jobs, suite_job_keyvals
Jakob Juelicha94efe62014-09-18 16:02:49 -07001065
1066
1067def _persist_records_with_type_sent_from_shard(
1068 shard, records, record_type, *args, **kwargs):
1069 """
1070 Handle records of a specified type that were sent to the shard master.
1071
1072 @param shard: The shard the records were sent from.
1073 @param records: The records sent in their serialized format.
1074 @param record_type: Type of the objects represented by records.
1075 @param args: Additional arguments that will be passed on to the sanity
1076 checks.
1077 @param kwargs: Additional arguments that will be passed on to the sanity
1078 checks.
1079
1080 @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.
1081
1082 @returns: List of primary keys of the processed records.
1083 """
1084 pks = []
1085 for serialized_record in records:
1086 pk = serialized_record['id']
1087 try:
1088 current_record = record_type.objects.get(pk=pk)
1089 except record_type.DoesNotExist:
1090 raise error.UnallowedRecordsSentToMaster(
1091 'Object with pk %s of type %s does not exist on master.' % (
1092 pk, record_type))
1093
1094 current_record.sanity_check_update_from_shard(
1095 shard, serialized_record, *args, **kwargs)
1096
1097 current_record.update_from_serialized(serialized_record)
1098 pks.append(pk)
1099 return pks
1100
1101
def persist_records_sent_from_shard(shard, jobs, hqes):
    """
    Sanity checking then saving serialized records sent to master from shard.

    During heartbeats shards upload jobs and hostqueuentries. This performs
    some sanity checks on these and then updates the existing records for those
    entries with the updated ones from the heartbeat.

    The sanity checks include:
    - Checking if the objects sent already exist on the master.
    - Checking if the objects sent were assigned to this shard.
    - hostqueueentries must be sent together with their jobs.

    @param shard: The shard the records were sent from.
    @param jobs: The jobs the shard sent.
    @param hqes: The hostqueuentries the shard sent.

    @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.
    """
    # Persist jobs first: the HQE sanity check receives the list of job ids
    # uploaded in this heartbeat so it can enforce that every HQE was sent
    # together with its job.
    job_ids_sent = _persist_records_with_type_sent_from_shard(
            shard, jobs, models.Job)

    _persist_records_with_type_sent_from_shard(
            shard, hqes, models.HostQueueEntry, job_ids_sent=job_ids_sent)
Jakob Juelich50e91f72014-10-01 12:43:23 -07001126
1127
Jakob Juelich50e91f72014-10-01 12:43:23 -07001128def forward_single_host_rpc_to_shard(func):
1129 """This decorator forwards rpc calls that modify a host to a shard.
1130
1131 If a host is assigned to a shard, rpcs that change his attributes should be
1132 forwarded to the shard.
1133
1134 This assumes the first argument of the function represents a host id.
1135
1136 @param func: The function to decorate
1137
1138 @returns: The function to replace func with.
1139 """
1140 def replacement(**kwargs):
1141 # Only keyword arguments can be accepted here, as we need the argument
1142 # names to send the rpc. serviceHandler always provides arguments with
1143 # their keywords, so this is not a problem.
1144 host = models.Host.smart_get(kwargs['id'])
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -08001145 if host.shard and not server_utils.is_shard():
Jakob Juelich50e91f72014-10-01 12:43:23 -07001146 run_rpc_on_multiple_hostnames(func.func_name, [host.shard.hostname],
1147 **kwargs)
1148 return func(**kwargs)
1149
1150 return replacement
1151
1152
MK Ryu9c5fbbe2015-02-11 15:46:22 -08001153def fanout_rpc(host_objs, rpc_name, include_hostnames=True, **kwargs):
1154 """Fanout the given rpc to all shards.
1155
1156 @param host_objs: Host objects for the rpc.
1157 @param rpc_name: The name of the rpc.
1158 @param include_hostnames: If True, include the hostnames in the kwargs.
1159 Hostnames are not always necessary, this functions is designed to
1160 send rpcs to the shard a host is on, the rpcs themselves could be
1161 related to labels, acls etc.
1162 @param kwargs: The kwargs for the rpc.
1163 """
1164 # Fanout should only happen from the master to the shards.
1165 if server_utils.is_shard():
1166 return
1167
1168 # Figure out which hosts are on which shards.
1169 shard_host_map = bucket_hosts_by_shard(
1170 host_objs, rpc_hostnames=True)
1171
1172 # Execute the rpc against the appropriate shards.
1173 for shard, hostnames in shard_host_map.iteritems():
1174 if include_hostnames:
1175 kwargs['hosts'] = hostnames
MK Ryu84573e12015-02-18 15:54:09 -08001176 try:
1177 run_rpc_on_multiple_hostnames(rpc_name, [shard], **kwargs)
1178 except:
1179 ei = sys.exc_info()
1180 new_exc = error.RPCException('RPC %s failed on shard %s due to '
1181 '%s: %s' % (rpc_name, shard, ei[0].__name__, ei[1]))
1182 raise new_exc.__class__, new_exc, ei[2]
MK Ryu9c5fbbe2015-02-11 15:46:22 -08001183
1184
Prashanth Balasubramanian5949b4a2014-11-23 12:58:30 -08001185def forward_multi_host_rpc_to_shards(func):
1186 """This decorator forwards rpc calls that modify multiple hosts.
1187
1188 If a host is assigned to a shard, rpcs that change his attributes should be
1189 forwarded to the shard. Some calls however, take a list of hosts and a
1190 single id to modify, eg: label_add_hosts. This wrapper will sift through
1191 the list of hosts, find each of their shards, and forward the rpc for
1192 those hosts to that shard before calling the local version of the given rpc.
1193
1194 This assumes:
1195 1. The rpc call uses `smart_get` to retrieve host objects, not the
1196 stock django `get` call. This is true for most, if not all rpcs in
1197 the rpc_interface.
1198 2. The kwargs to the function contain either a list of host ids or
1199 hostnames, keyed under 'hosts'. This is true for all the rpc
1200 functions that use 'smart_get'.
1201
1202 @param func: The function to decorate
1203
1204 @returns: The function to replace func with.
1205 """
1206 def replacement(**kwargs):
MK Ryu9c5fbbe2015-02-11 15:46:22 -08001207 fanout_rpc(
1208 models.Host.smart_get_bulk(kwargs['hosts']),
1209 func.func_name, **kwargs)
Prashanth Balasubramanian5949b4a2014-11-23 12:58:30 -08001210 return func(**kwargs)
1211
1212 return replacement
1213
1214
Jakob Juelich50e91f72014-10-01 12:43:23 -07001215def run_rpc_on_multiple_hostnames(rpc_call, shard_hostnames, **kwargs):
1216 """Runs an rpc to multiple AFEs
1217
1218 This is i.e. used to propagate changes made to hosts after they are assigned
1219 to a shard.
1220
1221 @param rpc_call: Name of the rpc endpoint to call.
1222 @param shard_hostnames: List of hostnames to run the rpcs on.
1223 @param **kwargs: Keyword arguments to pass in the rpcs.
1224 """
1225 for shard_hostname in shard_hostnames:
1226 afe = frontend.AFE(server=shard_hostname)
1227 afe.run(rpc_call, **kwargs)
MK Ryu9c5fbbe2015-02-11 15:46:22 -08001228
1229
def get_label(name):
    """Gets a label object using a given name.

    @param name: Label name.
    @raises model.Label.DoesNotExist: when there is no label matching
            the given name.
    @return: a label object matching the given name, or None when no such
            label exists.
    """
    try:
        return models.Label.smart_get(name)
    except models.Label.DoesNotExist:
        return None
1243
1244
def get_global_afe_hostname():
    """Read the hostname of the global AFE from the global configuration."""
    config = global_config.global_config
    return config.get_config_value('SHARD', 'global_afe_hostname')
1249
1250
def route_rpc_to_master(rpc_name, **kwargs):
    """Route RPC to master AFE.

    @param rpc_name: The name of the rpc.
    @param **kwargs: The kwargs for the rpc.

    @returns: The result of running the rpc on the master AFE.
    """
    master_hostname = get_global_afe_hostname()
    return frontend.AFE(server=master_hostname).run(rpc_name, **kwargs)