blob: 1c143507ed3c055f3a0a600a6372eb1f3ed8d99d [file] [log] [blame]
Aviv Keshet18308922013-02-19 17:49:49 -08001#pylint: disable-msg=C0111
mblighe8819cd2008-02-15 16:48:40 +00002"""\
3Utility functions for rpc_interface.py. We keep them in a separate file so that
4only RPC interface functions go into that file.
5"""
6
7__author__ = 'showard@google.com (Steve Howard)'
8
MK Ryu84573e12015-02-18 15:54:09 -08009import datetime
MK Ryufbb002c2015-06-08 14:13:16 -070010from functools import wraps
MK Ryu84573e12015-02-18 15:54:09 -080011import inspect
12import os
13import sys
showard3d6ae112009-05-02 00:45:48 +000014import django.http
Dan Shi07e09af2013-04-12 09:31:29 -070015from autotest_lib.frontend.afe import models, model_logic
Alex Miller4a193692013-08-21 13:59:01 -070016from autotest_lib.client.common_lib import control_data, error
Jiaxi Luo421608e2014-07-07 14:38:00 -070017from autotest_lib.client.common_lib import global_config, priorities
MK Ryu0c1a37d2015-04-30 12:00:55 -070018from autotest_lib.client.common_lib import time_utils
MK Ryu509516b2015-05-18 12:00:47 -070019from autotest_lib.client.common_lib.cros.graphite import autotest_stats
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080020from autotest_lib.server import utils as server_utils
MK Ryu9651ca52015-06-08 17:48:22 -070021from autotest_lib.server.cros import provision
22from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
mblighe8819cd2008-02-15 16:48:40 +000023
# Sentinel "null" timestamp values: datetime.max/date.max are stored where no
# real value applies; _prepare_data() serializes these sentinels as None.
NULL_DATETIME = datetime.datetime.max
NULL_DATE = datetime.date.max
26
def prepare_for_serialization(objects):
    """
    Prepare Python objects to be returned via RPC.
    @param objects: objects to be prepared.
    """
    # A list of object dicts (recognized by an 'id' key in the first
    # element) is first de-duplicated by id before serialization.
    looks_like_object_dicts = (isinstance(objects, list) and len(objects) and
                               isinstance(objects[0], dict)
                               and 'id' in objects[0])
    if looks_like_object_dicts:
        objects = gather_unique_dicts(objects)
    return _prepare_data(objects)
showardb8d34242008-04-25 18:11:16 +000036
37
def prepare_rows_as_nested_dicts(query, nested_dict_column_names):
    """
    Prepare a Django query to be returned via RPC as a sequence of nested
    dictionaries.

    @param query - A Django model query object with a select_related() method.
    @param nested_dict_column_names - A list of column/attribute names for the
            rows returned by query to expand into nested dictionaries using
            their get_object_dict() method when not None.

    @returns An list suitable to returned in an RPC.
    """
    prepared_rows = []
    for row in query.select_related():
        as_dict = row.get_object_dict()
        # Expand each requested (non-null) related object in place.
        for column_name in nested_dict_column_names:
            if as_dict[column_name] is not None:
                related = getattr(row, column_name)
                as_dict[column_name] = related.get_object_dict()
        prepared_rows.append(as_dict)
    return prepare_for_serialization(prepared_rows)
58
59
showardb8d34242008-04-25 18:11:16 +000060def _prepare_data(data):
jadmanski0afbb632008-06-06 21:10:57 +000061 """
62 Recursively process data structures, performing necessary type
63 conversions to values in data to allow for RPC serialization:
64 -convert datetimes to strings
showard2b9a88b2008-06-13 20:55:03 +000065 -convert tuples and sets to lists
jadmanski0afbb632008-06-06 21:10:57 +000066 """
67 if isinstance(data, dict):
68 new_data = {}
69 for key, value in data.iteritems():
70 new_data[key] = _prepare_data(value)
71 return new_data
showard2b9a88b2008-06-13 20:55:03 +000072 elif (isinstance(data, list) or isinstance(data, tuple) or
73 isinstance(data, set)):
jadmanski0afbb632008-06-06 21:10:57 +000074 return [_prepare_data(item) for item in data]
showard98659972008-07-17 17:00:07 +000075 elif isinstance(data, datetime.date):
showarda62866b2008-07-28 21:27:41 +000076 if data is NULL_DATETIME or data is NULL_DATE:
77 return None
jadmanski0afbb632008-06-06 21:10:57 +000078 return str(data)
79 else:
80 return data
mblighe8819cd2008-02-15 16:48:40 +000081
82
def fetchall_as_list_of_dicts(cursor):
    """
    Converts each row in the cursor to a dictionary so that values can be read
    by using the column name.
    @param cursor: The database cursor to read from.
    @returns: A list of each row in the cursor as a dictionary.
    """
    column_names = [column[0] for column in cursor.description]
    return [dict(zip(column_names, row)) for row in cursor.fetchall()]
93
94
def raw_http_response(response_data, content_type=None):
    """Wrap raw data in an HttpResponse with an explicit Content-length."""
    http_response = django.http.HttpResponse(response_data,
                                             mimetype=content_type)
    http_response['Content-length'] = str(len(http_response.content))
    return http_response
99
100
def gather_unique_dicts(dict_iterable):
    """\
    Pick out unique objects (by ID) from an iterable of object dicts.
    """
    seen_ids = set()
    unique_dicts = []
    for obj_dict in dict_iterable:
        obj_id = obj_dict['id']
        if obj_id in seen_ids:
            continue
        seen_ids.add(obj_id)
        unique_dicts.append(obj_dict)
    return unique_dicts
showardb0dfb9f2008-06-06 18:08:02 +0000112
113
def extra_job_status_filters(not_yet_run=False, running=False, finished=False):
    """\
    Generate a SQL WHERE clause for job status filtering, and return it in
    a dict of keyword args to pass to query.extra(). No more than one of
    the parameters should be passed as True.
    * not_yet_run: all HQEs are Queued
    * finished: all HQEs are complete
    * running: everything else
    """
    assert not ((not_yet_run and running) or
                (not_yet_run and finished) or
                (running and finished)), ('Cannot specify more than one '
                                          'filter to this function')

    # Subquery of jobs with at least one HQE past Queued.
    not_queued = ('(SELECT job_id FROM afe_host_queue_entries '
                  'WHERE status != "%s")'
                  % models.HostQueueEntry.Status.QUEUED)
    # Subquery of jobs with at least one incomplete HQE.
    not_finished = ('(SELECT job_id FROM afe_host_queue_entries '
                    'WHERE not complete)')

    if not_yet_run:
        return {'where': ['id NOT IN ' + not_queued]}
    if running:
        return {'where': ['(id IN %s) AND (id IN %s)'
                          % (not_queued, not_finished)]}
    if finished:
        return {'where': ['id NOT IN ' + not_finished]}
    return {}
mblighe8819cd2008-02-15 16:48:40 +0000143
144
def extra_job_type_filters(extra_args, suite=False,
                           sub=False, standalone=False):
    """\
    Generate a SQL WHERE clause for job status filtering, and return it in
    a dict of keyword args to pass to query.extra().

    param extra_args: a dict of existing extra_args.

    No more than one of the parameters should be passed as True:
    * suite: job which is parent of other jobs
    * sub: job with a parent job
    * standalone: job with no child or parent jobs
    """
    assert not ((suite and sub) or
                (suite and standalone) or
                (sub and standalone)), ('Cannot specify more than one '
                                        'filter to this function')

    if not (suite or sub or standalone):
        # No filter requested: leave extra_args untouched.
        return extra_args

    where_clauses = extra_args.get('where', [])
    subquery_template = ('(SELECT %s FROM afe_jobs '
                         'WHERE parent_job_id IS NOT NULL)')

    if suite:
        where_clauses.append(
            'id IN ' + subquery_template % 'DISTINCT parent_job_id')
    elif sub:
        where_clauses.append('id IN ' + subquery_template % 'id')
    else:  # standalone
        where_clauses.append(
            'NOT EXISTS (SELECT 1 from afe_jobs AS sub_query '
            'WHERE parent_job_id IS NOT NULL'
            ' AND (sub_query.parent_job_id=afe_jobs.id'
            ' OR sub_query.id=afe_jobs.id))')

    extra_args['where'] = where_clauses
    return extra_args
183
184
185
def extra_host_filters(multiple_labels=()):
    """\
    Generate SQL WHERE clauses for matching hosts in an intersection of
    labels.
    """
    # One identical WHERE clause per label; the label ids are supplied
    # separately as query parameters, in matching order.
    label_clause = ('afe_hosts.id in (select host_id from afe_hosts_labels '
                    'where label_id=%s)')
    return {
        'where': [label_clause] * len(multiple_labels),
        'params': [models.Label.smart_get(label).id
                   for label in multiple_labels],
    }
showard8e3aa5e2008-04-08 19:42:32 +0000198
199
def get_host_query(multiple_labels, exclude_only_if_needed_labels,
                   exclude_atomic_group_hosts, valid_only, filter_data):
    """
    Build a filtered query over Host objects.

    @param multiple_labels: label names the hosts must all carry.
    @param exclude_only_if_needed_labels: when True, exclude hosts that have
            any label marked only_if_needed.
    @param exclude_atomic_group_hosts: when True, exclude hosts that have any
            label tied to an atomic group.
    @param valid_only: when True, start from valid (non-deleted) hosts only.
    @param filter_data: extra filter dict passed to models.Host.query_objects;
            must not already contain 'extra_args'.

    @returns A Django queryset of matching hosts; an empty queryset if any
            requested label does not exist.
    """
    if valid_only:
        query = models.Host.valid_objects.all()
    else:
        query = models.Host.objects.all()

    if exclude_only_if_needed_labels:
        only_if_needed_labels = models.Label.valid_objects.filter(
            only_if_needed=True)
        if only_if_needed_labels.count() > 0:
            only_if_needed_ids = ','.join(
                    str(label['id'])
                    for label in only_if_needed_labels.values('id'))
            # Anti-join: drop hosts carrying any only_if_needed label.
            query = models.Host.objects.add_join(
                query, 'afe_hosts_labels', join_key='host_id',
                join_condition=('afe_hosts_labels_exclude_OIN.label_id IN (%s)'
                                % only_if_needed_ids),
                suffix='_exclude_OIN', exclude=True)

    if exclude_atomic_group_hosts:
        atomic_group_labels = models.Label.valid_objects.filter(
            atomic_group__isnull=False)
        if atomic_group_labels.count() > 0:
            atomic_group_label_ids = ','.join(
                    str(atomic_group['id'])
                    for atomic_group in atomic_group_labels.values('id'))
            # Anti-join: drop hosts carrying any atomic-group label.
            query = models.Host.objects.add_join(
                    query, 'afe_hosts_labels', join_key='host_id',
                    join_condition=(
                            'afe_hosts_labels_exclude_AG.label_id IN (%s)'
                            % atomic_group_label_ids),
                    suffix='_exclude_AG', exclude=True)
    try:
        assert 'extra_args' not in filter_data
        filter_data['extra_args'] = extra_host_filters(multiple_labels)
        return models.Host.query_objects(filter_data, initial_query=query)
    except models.Label.DoesNotExist:
        # A nonexistent label can match no hosts.
        return models.Host.objects.none()
showard43a3d262008-11-12 18:17:05 +0000239
240
class InconsistencyException(Exception):
    """Raised when a list of objects does not have a consistent value.

    Raised with the two conflicting objects as args; see
    get_consistent_value() below.
    """
showard8fd58242008-03-10 21:29:07 +0000243
244
def get_consistent_value(objects, field):
    """Return the value of |field| shared by every object in |objects|.

    @raises InconsistencyException if two objects disagree on the value;
            the exception args are the first object and the offender.
    """
    if not objects:
        # well a list of nothing is consistent
        return None

    expected = getattr(objects[0], field)
    for candidate in objects:
        if getattr(candidate, field) != expected:
            raise InconsistencyException(objects[0], candidate)
    return expected
showard8fd58242008-03-10 21:29:07 +0000256
257
def afe_test_dict_to_test_object(test_dict):
    """Turn a test dict into an anonymous object with the same attributes.

    Values that look like integers are coerced to int; anything that is not
    a dict is returned unchanged.
    """
    if not isinstance(test_dict, dict):
        return test_dict

    coerced_attributes = {}
    for attribute, value in test_dict.iteritems():
        try:
            coerced_attributes[attribute] = int(value)
        except (ValueError, TypeError):
            coerced_attributes[attribute] = value

    return type('TestObject', (object,), coerced_attributes)
270
271
def prepare_generate_control_file(tests, kernel, label, profilers,
                                  db_tests=True):
    """Resolve control-file generation arguments into concrete objects.

    @param tests: test names/ids (db_tests=True) or test dicts (db_tests=False).
    @param kernel: not referenced in this body — presumably consumed by the
            caller; TODO(review) confirm against rpc_interface.py.
    @param label: a label name/id to resolve via smart_get, or falsy for none.
    @param profilers: profiler names/ids to resolve.
    @param db_tests: when True, look tests up in the database; otherwise treat
            each test as a dict and wrap it via afe_test_dict_to_test_object.

    @returns (cf_info dict with is_server/synch_count/dependencies,
            test objects, profiler objects, resolved label or original falsy
            value).
    @raises model_logic.ValidationError if the tests mix client and server
            test types.
    """
    if db_tests:
        test_objects = [models.Test.smart_get(test) for test in tests]
    else:
        test_objects = [afe_test_dict_to_test_object(test) for test in tests]

    profiler_objects = [models.Profiler.smart_get(profiler)
                        for profiler in profilers]
    # ensure tests are all the same type
    try:
        test_type = get_consistent_value(test_objects, 'test_type')
    except InconsistencyException, exc:
        test1, test2 = exc.args
        raise model_logic.ValidationError(
            {'tests' : 'You cannot run both test_suites and server-side '
             'tests together (tests %s and %s differ' % (
            test1.name, test2.name)})

    is_server = (test_type == control_data.CONTROL_TYPE.SERVER)
    # The job's synch_count is the largest any selected test requires.
    if test_objects:
        synch_count = max(test.sync_count for test in test_objects)
    else:
        synch_count = 1
    if label:
        label = models.Label.smart_get(label)

    # Dependencies come from the DB for db tests, or from the per-test
    # 'dependencies' attribute otherwise.
    if db_tests:
        dependencies = set(label.name for label
                           in models.Label.objects.filter(test__in=test_objects))
    else:
        dependencies = reduce(
                set.union, [set(test.dependencies) for test in test_objects])

    cf_info = dict(is_server=is_server, synch_count=synch_count,
                   dependencies=list(dependencies))
    return cf_info, test_objects, profiler_objects, label
showard989f25d2008-10-01 11:38:11 +0000309
310
def check_job_dependencies(host_objects, job_dependencies):
    """
    Check that a set of machines satisfies a job's dependencies.

    @param host_objects: list of models.Host objects.
    @param job_dependencies: list of names of labels.

    @raises model_logic.ValidationError if any host lacks a required
            dependency label.
    """
    # check that hosts satisfy dependencies
    host_ids = [host.id for host in host_objects]
    ok_hosts = models.Host.objects.filter(id__in=host_ids)
    # Labels handled by special actions (e.g. provisioning) need not be
    # present on the host already.
    # NOTE: the original code used enumerate() here; the index was unused.
    for dependency in job_dependencies:
        if not provision.is_for_special_action(dependency):
            ok_hosts = ok_hosts.filter(labels__name=dependency)
    failing_hosts = (set(host.hostname for host in host_objects) -
                     set(host.hostname for host in ok_hosts))
    if failing_hosts:
        raise model_logic.ValidationError(
            {'hosts' : 'Host(s) failed to meet job dependencies (' +
                       (', '.join(job_dependencies)) + '): ' +
                       (', '.join(failing_hosts))})
331
showard989f25d2008-10-01 11:38:11 +0000332
def check_job_metahost_dependencies(metahost_objects, job_dependencies):
    """
    Check that at least one machine within the metahost spec satisfies the job's
    dependencies.

    @param metahost_objects A list of label objects representing the metahosts.
    @param job_dependencies A list of strings of the required label names.
    @raises NoEligibleHostException If a metahost cannot run the job.
    """
    for metahost in metahost_objects:
        candidate_hosts = models.Host.objects.filter(labels=metahost)
        # Narrow to hosts carrying every non-special-action dependency.
        for label_name in job_dependencies:
            if provision.is_for_special_action(label_name):
                continue
            candidate_hosts = candidate_hosts.filter(labels__name=label_name)
        if not any(candidate_hosts):
            raise error.NoEligibleHostException("No hosts within %s satisfy %s."
                    % (metahost.name, ', '.join(job_dependencies)))
350
showard2bab8f42008-11-12 18:15:22 +0000351
352def _execution_key_for(host_queue_entry):
353 return (host_queue_entry.job.id, host_queue_entry.execution_subdir)
354
355
def check_abort_synchronous_jobs(host_queue_entries):
    """Reject aborts that would cancel only part of a synchronous execution.

    @raises model_logic.ValidationError if fewer entries of an execution are
            being aborted than its job's synch_count requires.
    """
    # ensure user isn't aborting part of a synchronous autoserv execution
    entries_per_execution = {}
    for entry in host_queue_entries:
        key = _execution_key_for(entry)
        entries_per_execution[key] = entries_per_execution.get(key, 0) + 1

    for entry in host_queue_entries:
        if not entry.execution_subdir:
            continue
        execution_count = entries_per_execution[_execution_key_for(entry)]
        if execution_count < entry.job.synch_count:
            raise model_logic.ValidationError(
                {'' : 'You cannot abort part of a synchronous job execution '
                      '(%d/%s), %d included, %d expected'
                      % (entry.job.id, entry.execution_subdir,
                         execution_count, entry.job.synch_count)})
showard8fbae652009-01-20 23:23:10 +0000374
375
def check_atomic_group_create_job(synch_count, host_objects, metahost_objects,
                                  dependencies, atomic_group):
    """
    Attempt to reject create_job requests with an atomic group that
    will be impossible to schedule. The checks are not perfect but
    should catch the most obvious issues.

    @param synch_count - The job's minimum synch count.
    @param host_objects - A list of models.Host instances.
    @param metahost_objects - A list of models.Label instances.
    @param dependencies - A list of job dependency label names.
    @param atomic_group - The models.AtomicGroup instance the job targets.

    @raises model_logic.ValidationError - When an issue is found.
    """
    # If specific host objects were supplied with an atomic group, verify
    # that there are enough to satisfy the synch_count.
    minimum_required = synch_count or 1
    if (host_objects and not metahost_objects and
        len(host_objects) < minimum_required):
        raise model_logic.ValidationError(
                {'hosts':
                 'only %d hosts provided for job with synch_count = %d' %
                 (len(host_objects), synch_count)})

    # Check that the atomic group has a hope of running this job
    # given any supplied metahosts and dependancies that may limit.

    # Get a set of hostnames in the atomic group.
    possible_hosts = set()
    for label in atomic_group.label_set.all():
        possible_hosts.update(h.hostname for h in label.host_set.all())

    # Filter out hosts that don't match all of the job dependency labels.
    for label in models.Label.objects.filter(name__in=dependencies):
        hosts_in_label = (h.hostname for h in label.host_set.all())
        possible_hosts.intersection_update(hosts_in_label)

    if not host_objects and not metahost_objects:
        # No hosts or metahosts are required to queue an atomic group Job.
        # However, if they are given, we respect them below.
        host_set = possible_hosts
    else:
        host_set = set(host.hostname for host in host_objects)
        unusable_host_set = host_set.difference(possible_hosts)
        if unusable_host_set:
            raise model_logic.ValidationError(
                {'hosts': 'Hosts "%s" are not in Atomic Group "%s"' %
                 (', '.join(sorted(unusable_host_set)), atomic_group.name)})

    # Lookup hosts provided by each meta host and merge them into the
    # host_set for final counting.
    for meta_host in metahost_objects:
        meta_possible = possible_hosts.copy()
        hosts_in_meta_host = (h.hostname for h in meta_host.host_set.all())
        meta_possible.intersection_update(hosts_in_meta_host)

        # Count all hosts that this meta_host will provide.
        host_set.update(meta_possible)

    if len(host_set) < minimum_required:
        raise model_logic.ValidationError(
                {'atomic_group_name':
                 'Insufficient hosts in Atomic Group "%s" with the'
                 ' supplied dependencies and meta_hosts.' %
                 (atomic_group.name,)})
443
444
def check_modify_host(update_data):
    """
    Sanity check modify_host* requests.

    @param update_data: A dictionary with the changes to make to a host
            or hosts.
    @raises model_logic.ValidationError if the request tries to set status.
    """
    if 'status' not in update_data:
        return
    # Only the scheduler (monitor_db) is allowed to modify Host status.
    # Otherwise race conditions happen as a hosts state is changed out from
    # beneath tasks being run on a host.
    raise model_logic.ValidationError({
            'status': 'Host status can not be modified by the frontend.'})
458
459
def check_modify_host_locking(host, update_data):
    """
    Checks when locking/unlocking has been requested if the host is already
    locked/unlocked.

    @param host: models.Host object to be modified
    @param update_data: A dictionary with the changes to make to the host.
    @raises model_logic.ValidationError on redundant lock/unlock or a lock
            request with no reason.
    """
    locked = update_data.get('locked', None)
    lock_reason = update_data.get('lock_reason', None)
    if locked is None:
        # No lock state change requested; nothing to validate.
        return
    if locked:
        if host.locked:
            raise model_logic.ValidationError({
                    'locked': 'Host already locked by %s on %s.' %
                    (host.locked_by, host.lock_time)})
        if not lock_reason:
            raise model_logic.ValidationError({
                    'locked': 'Please provide a reason for locking'})
    elif not host.locked:
        raise model_logic.ValidationError({
                'locked': 'Host already unlocked.'})
showardce7c0922009-09-11 18:39:24 +0000481
482
def get_motd():
    """
    Return the message-of-the-day text, or '' when unavailable.

    Reads motd.txt from two directories above this module. Any I/O failure
    (most commonly a missing file) is treated as "no message" rather than an
    error, preserving the original best-effort behavior — but we now catch
    only I/O errors instead of a bare except that hid real bugs.
    """
    dirname = os.path.dirname(__file__)
    filename = os.path.join(dirname, "..", "..", "motd.txt")
    try:
        with open(filename, "r") as motd_file:
            return motd_file.read()
    except (IOError, OSError):
        return ''
showard29f7cd22009-04-29 21:16:24 +0000497
498
499def _get_metahost_counts(metahost_objects):
500 metahost_counts = {}
501 for metahost in metahost_objects:
502 metahost_counts.setdefault(metahost, 0)
503 metahost_counts[metahost] += 1
504 return metahost_counts
505
506
def get_job_info(job, preserve_metahosts=False, queue_entry_filter_data=None):
    """Summarize a job's hosts, metahosts, atomic group and hostless state.

    @param job: a models.Job instance.
    @param preserve_metahosts: when True, an HQE that has both a host and a
            meta_host is still counted under hosts.
    @param queue_entry_filter_data: optional filter dict applied to the
            job's HostQueueEntries before summarizing.

    @returns dict with keys: dependencies, hosts, meta_hosts,
            meta_host_counts, one_time_hosts, atomic_group, hostless.
    """
    hosts = []
    one_time_hosts = []
    meta_hosts = []
    atomic_group = None
    hostless = False

    queue_entries = job.hostqueueentry_set.all()
    if queue_entry_filter_data:
        queue_entries = models.HostQueueEntry.query_objects(
            queue_entry_filter_data, initial_query=queue_entries)

    for queue_entry in queue_entries:
        if (queue_entry.host and (preserve_metahosts or
                                  not queue_entry.meta_host)):
            if queue_entry.deleted:
                continue
            # Invalid hosts were one-time hosts added just for this job.
            if queue_entry.host.invalid:
                one_time_hosts.append(queue_entry.host)
            else:
                hosts.append(queue_entry.host)
        elif queue_entry.meta_host:
            meta_hosts.append(queue_entry.meta_host)
        else:
            hostless = True

        # All HQEs of a job must share one atomic group (or none).
        if atomic_group is None:
            if queue_entry.atomic_group is not None:
                atomic_group = queue_entry.atomic_group
        else:
            # NOTE(review): 'id' in this message is the builtin function, not
            # the job's id — the message likely intended job.id; confirm.
            assert atomic_group.name == queue_entry.atomic_group.name, (
                'DB inconsistency. HostQueueEntries with multiple atomic'
                ' groups on job %s: %s != %s' % (
                    id, atomic_group.name, queue_entry.atomic_group.name))

    meta_host_counts = _get_metahost_counts(meta_hosts)

    info = dict(dependencies=[label.name for label
                              in job.dependency_labels.all()],
                hosts=hosts,
                meta_hosts=meta_hosts,
                meta_host_counts=meta_host_counts,
                one_time_hosts=one_time_hosts,
                atomic_group=atomic_group,
                hostless=hostless)
    return info
553
554
def check_for_duplicate_hosts(host_objects):
    """Raise ValidationError if any host (by id) appears more than once."""
    seen_host_ids = set()
    duplicate_hostnames = set()
    for host in host_objects:
        if host.id in seen_host_ids:
            duplicate_hostnames.add(host.hostname)
        else:
            seen_host_ids.add(host.id)

    if duplicate_hostnames:
        raise model_logic.ValidationError(
                {'hosts' : 'Duplicate hosts: %s'
                 % ', '.join(duplicate_hostnames)})
567
568
def create_new_job(owner, options, host_objects, metahost_objects,
                   atomic_group=None):
    """Validate job options/hosts, then create and enqueue the job.

    @param owner: username of the job owner.
    @param options: dict of job options; 'dependencies', 'synch_count' and
            'is_template' are read here, everything else is handed to
            models.Job.create(). NOTE: options['dependencies'] is replaced
            in-place with a list of models.Label objects.
    @param host_objects: list of models.Host instances to run on.
    @param metahost_objects: list of models.Label instances used as metahosts.
    @param atomic_group: optional atomic group instance for the job.

    @returns the id of the created job.
    @raises model_logic.ValidationError on invalid host/group combinations.
    """
    all_host_objects = host_objects + metahost_objects
    dependencies = options.get('dependencies', [])
    synch_count = options.get('synch_count')

    if atomic_group:
        check_atomic_group_create_job(
                synch_count, host_objects, metahost_objects,
                dependencies, atomic_group)
    else:
        if synch_count is not None and synch_count > len(all_host_objects):
            raise model_logic.ValidationError(
                    {'hosts':
                     'only %d hosts provided for job with synch_count = %d' %
                     (len(all_host_objects), synch_count)})
        # Hosts that belong to an atomic group may not be used without one.
        atomic_hosts = models.Host.objects.filter(
                id__in=[host.id for host in host_objects],
                labels__atomic_group=True)
        unusable_host_names = [host.hostname for host in atomic_hosts]
        if unusable_host_names:
            raise model_logic.ValidationError(
                    {'hosts':
                     'Host(s) "%s" are atomic group hosts but no '
                     'atomic group was specified for this job.' %
                     (', '.join(unusable_host_names),)})

    check_for_duplicate_hosts(host_objects)

    # Special-action labels (e.g. provisioning) are auto-created on demand.
    for label_name in dependencies:
        if provision.is_for_special_action(label_name):
            # TODO: We could save a few queries
            # if we had a bulk ensure-label-exists function, which used
            # a bulk .get() call. The win is probably very small.
            _ensure_label_exists(label_name)

    # This only checks targeted hosts, not hosts eligible due to the metahost
    check_job_dependencies(host_objects, dependencies)
    check_job_metahost_dependencies(metahost_objects, dependencies)

    options['dependencies'] = list(
            models.Label.objects.filter(name__in=dependencies))

    # Any atomic-group label among metahosts/dependencies must agree with
    # the atomic group supplied for the job.
    for label in metahost_objects + options['dependencies']:
        if label.atomic_group and not atomic_group:
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'Dependency %r requires an atomic group but no '
                     'atomic_group_name or meta_host in an atomic group was '
                     'specified for this job.' % label.name})
        elif (label.atomic_group and
              label.atomic_group.name != atomic_group.name):
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'meta_hosts or dependency %r requires atomic group '
                     '%r instead of the supplied atomic_group_name=%r.' %
                     (label.name, label.atomic_group.name, atomic_group.name)})

    job = models.Job.create(owner=owner, options=options,
                            hosts=all_host_objects)
    job.queue(all_host_objects, atomic_group=atomic_group,
              is_template=options.get('is_template', False))
    return job.id
showard0957a842009-05-11 19:25:08 +0000632
633
def _ensure_label_exists(name):
    """
    Ensure that a label called |name| exists in the Django models.

    This function is to be called from within afe rpcs only, as an
    alternative to server.cros.provision.ensure_label_exists(...). It works
    by Django model manipulation, rather than by making another create_label
    rpc call.

    @param name: the label to check for/create.
    @raises ValidationError: There was an error in the response that was
        not because the label already existed.
    @returns True is a label was created, False otherwise.
    """
    # Make sure this function is not called on shards but only on master.
    assert not server_utils.is_shard()
    try:
        models.Label.objects.get(name=name)
    except models.Label.DoesNotExist:
        new_label = models.Label.objects.create(name=name)
        # NOTE(review): objects.create() already saves in standard Django;
        # this explicit save() looks redundant (harmless) — confirm intent.
        new_label.save()
        return True
    return False
657
658
def find_platform_and_atomic_group(host):
    """
    Figure out the platform name and atomic group name for the given host
    object. If none, the return value for either will be None.

    @returns (platform name, atomic group name) for the given host.
    """
    platform_names = [label.name for label in host.label_list
                      if label.platform]
    platform = platform_names[0] if platform_names else None
    if len(platform_names) > 1:
        raise ValueError('Host %s has more than one platform: %s' %
                         (host.hostname, ', '.join(platform_names)))

    atomic_group_name = None
    for label in host.label_list:
        if label.atomic_group:
            atomic_group_name = label.atomic_group.name
            break
    # Don't check for multiple atomic groups on a host here. That is an
    # error but should not trip up the RPC interface. monitor_db_cleanup
    # deals with it. This just returns the first one found.
    return platform, atomic_group_name
showardc0ac3a72009-07-08 21:14:45 +0000684
685
686# support for get_host_queue_entries_and_special_tasks()
687
MK Ryu0c1a37d2015-04-30 12:00:55 -0700688def _common_entry_to_dict(entry, type, job_dict, exec_path, status, started_on):
showardc0ac3a72009-07-08 21:14:45 +0000689 return dict(type=type,
MK Ryu0c1a37d2015-04-30 12:00:55 -0700690 host=entry['host'],
showardc0ac3a72009-07-08 21:14:45 +0000691 job=job_dict,
MK Ryu0c1a37d2015-04-30 12:00:55 -0700692 execution_path=exec_path,
693 status=status,
694 started_on=started_on,
695 id=str(entry['id']) + type,
696 oid=entry['id'])
showardc0ac3a72009-07-08 21:14:45 +0000697
698
def _special_task_to_dict(task, queue_entries):
    """Transforms a special task dictionary to another form of dictionary.

    @param task Special task as a dictionary type
    @param queue_entries Host queue entries as a list of dictionaries.

    @return Transformed dictionary for a special task.
    """
    job_dict = None
    task_entry = task['queue_entry']
    if task_entry:
        # Prefer job detail already present in the supplied queue_entries.
        for queue_entry in queue_entries:
            if queue_entry['id'] == task_entry['id']:
                job_dict = queue_entry['job']
                break
        # Fall back to the database when the entry was not in the list.
        if job_dict is None:
            job_dict = models.Job.objects.get(
                    id=task_entry['job']).get_object_dict()

    exec_path = server_utils.get_special_task_exec_path(
            task['host']['hostname'], task['id'], task['task'],
            time_utils.time_string_to_datetime(task['time_requested']))
    status = server_utils.get_special_task_status(
            task['is_complete'], task['success'], task['is_active'])
    return _common_entry_to_dict(task, task['task'], job_dict,
                                 exec_path, status, task['time_started'])
showardc0ac3a72009-07-08 21:14:45 +0000726
727
def _queue_entry_to_dict(queue_entry):
    """Transforms a host queue entry dictionary for display.

    @param queue_entry: Host queue entry as a dictionary.

    @return Transformed dictionary for the host queue entry.
    """
    job_dict = queue_entry['job']
    job_tag = server_utils.get_job_tag(job_dict['id'], job_dict['owner'])
    exec_path = server_utils.get_hqe_exec_path(
            job_tag, queue_entry['execution_subdir'])
    return _common_entry_to_dict(queue_entry, 'Job', job_dict, exec_path,
                                 queue_entry['status'],
                                 queue_entry['started_on'])
735
736
def prepare_host_queue_entries_and_special_tasks(interleaved_entries,
                                                 queue_entries):
    """
    Prepare for serialization the interleaved entries of host queue entries
    and special tasks.
    Each element in the entries is a dictionary type.
    The special task dictionary has only a job id for a job and lacks
    the detail of the job while the host queue entry dictionary has.
    queue_entries is used to look up the job detail info.

    @param interleaved_entries Host queue entries and special tasks as a list
                               of dictionaries.
    @param queue_entries Host queue entries as a list of dictionaries.

    @return A post-processed list of dictionaries that is to be serialized.
    """
    # The two entry flavors are distinguished by the presence of the
    # key "task": special tasks have it, host queue entries do not.
    converted = [_special_task_to_dict(entry, queue_entries)
                 if 'task' in entry else _queue_entry_to_dict(entry)
                 for entry in interleaved_entries]
    return prepare_for_serialization(converted)
showardc0ac3a72009-07-08 21:14:45 +0000763
764
def _compute_next_job_for_tasks(queue_entries, special_tasks):
    """
    For each task, try to figure out the next job that ran after that task.
    This is done using two pieces of information:
    * if the task has a queue entry, we can use that entry's job ID.
    * if the task has a time_started, we can try to compare that against the
    started_on field of queue_entries. this isn't guaranteed to work perfectly
    since queue_entries may also have null started_on values.
    * if the task has neither, or if use of time_started fails, just use the
    last computed job ID.

    Mutates each task dict in place, adding a 'next_job_id' key.
    NOTE(review): both lists are expected in descending ID order (see
    interleave_entries) -- confirm at call sites.

    @param queue_entries Host queue entries as a list of dictionaries.
    @param special_tasks Special tasks as a list of dictionaries.
    """
    next_job_id = None # most recently computed next job
    hqe_index = 0 # index for scanning by started_on times
    for task in special_tasks:
        if task['queue_entry']:
            # Easy case: the task is tied to a queue entry, so that
            # entry's job is its "next job".
            next_job_id = task['queue_entry']['job']
        elif task['time_started'] is not None:
            # No queue entry: scan (newest first) for entries that started
            # at or after this task; the last one scanned before the break
            # is the oldest such entry, and its job is taken as the next job.
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry['started_on'] is None:
                    continue
                t1 = time_utils.time_string_to_datetime(
                        queue_entry['started_on'])
                t2 = time_utils.time_string_to_datetime(task['time_started'])
                if t1 < t2:
                    break
                next_job_id = queue_entry['job']['id']

        # Fall-through: if neither source applied, the previous task's
        # next_job_id (possibly None) is reused.
        task['next_job_id'] = next_job_id

        # advance hqe_index to just after next_job_id, so later (older)
        # tasks never re-scan entries newer than this task's next job
        if next_job_id is not None:
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry['job']['id'] < next_job_id:
                    break
                hqe_index += 1
803
804
def interleave_entries(queue_entries, special_tasks):
    """
    Merge queue entries and special tasks into one chronological list.

    Both lists should be ordered by descending ID.
    """
    _compute_next_job_for_tasks(queue_entries, special_tasks)

    interleaved = []
    num_tasks = len(special_tasks)

    # Special tasks that ran after the most recent job come first.
    task_index = 0
    while (task_index < num_tasks and
           special_tasks[task_index]['next_job_id'] is None):
        interleaved.append(special_tasks[task_index])
        task_index += 1

    # Now interleave queue entries with the remaining special tasks.
    for queue_entry in queue_entries:
        interleaved.append(queue_entry)
        current_job_id = queue_entry['job']['id']
        # Add all tasks that ran between this job and the previous one.
        while (task_index < num_tasks and
               special_tasks[task_index]['next_job_id'] >= current_job_id):
            interleaved.append(special_tasks[task_index])
            task_index += 1

    return interleaved
jamesren4a41e012010-07-16 22:33:48 +0000830
831
Prashanth Balasubramanian6edaaf92014-11-24 16:36:25 -0800832def bucket_hosts_by_shard(host_objs, rpc_hostnames=False):
833 """Figure out which hosts are on which shards.
834
835 @param host_objs: A list of host objects.
836 @param rpc_hostnames: If True, the rpc_hostnames of a shard are returned
837 instead of the 'real' shard hostnames. This only matters for testing
838 environments.
839
840 @return: A map of shard hostname: list of hosts on the shard.
841 """
842 shard_host_map = {}
843 for host in host_objs:
844 if host.shard:
845 shard_name = (host.shard.rpc_hostname() if rpc_hostnames
846 else host.shard.hostname)
847 shard_host_map.setdefault(shard_name, []).append(host.hostname)
848 return shard_host_map
849
850
jamesren4a41e012010-07-16 22:33:48 +0000851def get_create_job_common_args(local_args):
852 """
853 Returns a dict containing only the args that apply for create_job_common
854
855 Returns a subset of local_args, which contains only the arguments that can
856 be passed in to create_job_common().
857 """
Alex Miller7d658cf2013-09-04 16:00:35 -0700858 # This code is only here to not kill suites scheduling tests when priority
859 # becomes an int instead of a string.
860 if isinstance(local_args['priority'], str):
861 local_args['priority'] = priorities.Priority.DEFAULT
862 # </migration hack>
jamesren4a41e012010-07-16 22:33:48 +0000863 arg_names, _, _, _ = inspect.getargspec(create_job_common)
864 return dict(item for item in local_args.iteritems() if item[0] in arg_names)
865
866
def create_job_common(name, priority, control_type, control_file=None,
                      hosts=(), meta_hosts=(), one_time_hosts=(),
                      atomic_group_name=None, synch_count=None,
                      is_template=False, timeout=None, timeout_mins=None,
                      max_runtime_mins=None, run_verify=True, email_list='',
                      dependencies=(), reboot_before=None, reboot_after=None,
                      parse_failed_repair=None, hostless=False, keyvals=None,
                      drone_set=None, parameterized_job=None,
                      parent_job_id=None, test_retry=0, run_reset=True,
                      require_ssp=None):
    #pylint: disable-msg=C0111
    """
    Common code between creating "standard" jobs and creating parameterized jobs

    The job is created on behalf of the currently logged-in user.  Exactly
    one of hosts/meta_hosts/one_time_hosts/atomic_group_name/hostless must
    be supplied (hostless excludes all of the others).

    @param name: Name of the new job.
    @param priority: Job priority; a str value is coerced by
            get_create_job_common_args() during the string->int migration.
    @param control_type: Control file type; hostless jobs must use
            control_data.CONTROL_TYPE_NAMES.SERVER.
    @param control_file: Contents of the control file, if any.
    @param hosts: Hostnames/ids to schedule the job on.
    @param meta_hosts: Label names; may also name an atomic group.
    @param one_time_hosts: Hostnames to create as one-time hosts.
    @param atomic_group_name: Name of an atomic group to schedule on.
    @param synch_count: Number of hosts to run synchronously; checked
            against the atomic group's max_number_of_machines.
    @param is_template: Whether this job is a template.
    @param timeout: Job timeout (legacy unit -- presumably hours; see
            timeout_mins -- TODO confirm).
    @param timeout_mins: Job timeout in minutes.
    @param max_runtime_mins: Maximum job runtime in minutes.
    @param run_verify: Whether to run verify before the job.
    @param email_list: Email addresses to notify.
    @param dependencies: Label names the job depends on.
    @param reboot_before: Reboot-before option, passed through to the job.
    @param reboot_after: Reboot-after option, passed through to the job.
    @param parse_failed_repair: Whether to parse failed repair results.
    @param hostless: If True, create a hostless (e.g. suite) job.
    @param keyvals: Keyvals to attach to the job.
    @param drone_set: Drone set to run the job on.
    @param parameterized_job: Parameterized job id, if any.
    @param parent_job_id: Id of the parent (e.g. suite) job, if any.
    @param test_retry: Number of test retries.
    @param run_reset: Whether to run reset before the job.
    @param require_ssp: Whether server-side packaging is required.

    @raises model_logic.ValidationError: on invalid argument combinations.
    @raises ValueError: if requested hosts span shards, or the master and a
            shard.

    @returns: the result of create_new_job() for the assembled options.
    """
    user = models.User.current_user()
    owner = user.login

    # input validation
    if not (hosts or meta_hosts or one_time_hosts or atomic_group_name
            or hostless):
        raise model_logic.ValidationError({
            'arguments' : "You must pass at least one of 'hosts', "
                          "'meta_hosts', 'one_time_hosts', "
                          "'atomic_group_name', or 'hostless'"
            })

    if hostless:
        # Hostless jobs are exclusive of every host-selection mechanism and
        # must run server-side (there is no client to run them on).
        if hosts or meta_hosts or one_time_hosts or atomic_group_name:
            raise model_logic.ValidationError({
                    'hostless': 'Hostless jobs cannot include any hosts!'})
        server_type = control_data.CONTROL_TYPE_NAMES.SERVER
        if control_type != server_type:
            raise model_logic.ValidationError({
                    'control_type': 'Hostless jobs cannot use client-side '
                                    'control files'})

    atomic_groups_by_name = dict((ag.name, ag)
                                 for ag in models.AtomicGroup.objects.all())
    label_objects = list(models.Label.objects.filter(name__in=meta_hosts))

    # Schedule on an atomic group automagically if one of the labels given
    # is an atomic group label and no explicit atomic_group_name was supplied.
    if not atomic_group_name:
        for label in label_objects:
            if label and label.atomic_group:
                atomic_group_name = label.atomic_group.name
                break
    # convert hostnames & meta hosts to host/label objects
    host_objects = models.Host.smart_get_bulk(hosts)
    if not server_utils.is_shard():
        # On the master, reject jobs whose hosts live on a shard; those
        # jobs have to be created on the owning shard instead.
        shard_host_map = bucket_hosts_by_shard(host_objects)
        num_shards = len(shard_host_map)
        if (num_shards > 1 or (num_shards == 1 and
                len(shard_host_map.values()[0]) != len(host_objects))):
            # We disallow the following jobs on master:
            #   num_shards > 1: this is a job spanning across multiple shards.
            #   num_shards == 1 but number of hosts on shard is less
            #   than total number of hosts: this is a job that spans across
            #   one shard and the master.
            raise ValueError(
                    'The following hosts are on shard(s), please create '
                    'seperate jobs for hosts on each shard: %s ' %
                    shard_host_map)
    metahost_objects = []
    meta_host_labels_by_name = {label.name: label for label in label_objects}
    for label_name in meta_hosts or []:
        if label_name in meta_host_labels_by_name:
            metahost_objects.append(meta_host_labels_by_name[label_name])
        elif label_name in atomic_groups_by_name:
            # If given a metahost name that isn't a Label, check to
            # see if the user was specifying an Atomic Group instead.
            atomic_group = atomic_groups_by_name[label_name]
            if atomic_group_name and atomic_group_name != atomic_group.name:
                raise model_logic.ValidationError({
                        'meta_hosts': (
                                'Label "%s" not found. If assumed to be an '
                                'atomic group it would conflict with the '
                                'supplied atomic group "%s".' % (
                                        label_name, atomic_group_name))})
            atomic_group_name = atomic_group.name
        else:
            raise model_logic.ValidationError(
                {'meta_hosts' : 'Label "%s" not found' % label_name})

    # Create and sanity check an AtomicGroup object if requested.
    if atomic_group_name:
        if one_time_hosts:
            raise model_logic.ValidationError(
                    {'one_time_hosts':
                     'One time hosts cannot be used with an Atomic Group.'})
        atomic_group = models.AtomicGroup.smart_get(atomic_group_name)
        if synch_count and synch_count > atomic_group.max_number_of_machines:
            raise model_logic.ValidationError(
                    {'atomic_group_name' :
                     'You have requested a synch_count (%d) greater than the '
                     'maximum machines in the requested Atomic Group (%d).' %
                     (synch_count, atomic_group.max_number_of_machines)})
    else:
        atomic_group = None

    for host in one_time_hosts or []:
        this_host = models.Host.create_one_time_host(host)
        host_objects.append(this_host)

    options = dict(name=name,
                   priority=priority,
                   control_file=control_file,
                   control_type=control_type,
                   is_template=is_template,
                   timeout=timeout,
                   timeout_mins=timeout_mins,
                   max_runtime_mins=max_runtime_mins,
                   synch_count=synch_count,
                   run_verify=run_verify,
                   email_list=email_list,
                   dependencies=dependencies,
                   reboot_before=reboot_before,
                   reboot_after=reboot_after,
                   parse_failed_repair=parse_failed_repair,
                   keyvals=keyvals,
                   drone_set=drone_set,
                   parameterized_job=parameterized_job,
                   parent_job_id=parent_job_id,
                   test_retry=test_retry,
                   run_reset=run_reset,
                   require_ssp=require_ssp)
    return create_new_job(owner=owner,
                          options=options,
                          host_objects=host_objects,
                          metahost_objects=metahost_objects,
                          atomic_group=atomic_group)
Simran Basib6ec8ae2014-04-23 12:05:08 -0700998
999
def encode_ascii(control_file):
    """Force a control file to only contain ascii characters.

    @param control_file: Control file to encode.

    @returns the control file in an ascii encoding.

    @raises error.ControlFileMalformed: if encoding fails.
    """
    try:
        return control_file.encode('ascii')
    except UnicodeError as e:
        # UnicodeError is the common base of UnicodeDecodeError (a byte
        # string implicitly decoded before encoding) and UnicodeEncodeError
        # (a unicode string with non-ascii characters).  Catching only
        # UnicodeDecodeError, as before, let encode errors escape untranslated.
        raise error.ControlFileMalformed(str(e))
1013
1014
def get_wmatrix_url():
    """Get wmatrix url from config file.

    @returns the wmatrix url or an empty string.
    """
    config = global_config.global_config
    return config.get_config_value('AUTOTEST_WEB', 'wmatrix_url', default='')
Jiaxi Luo57bc1952014-07-22 15:27:30 -07001023
1024
def inject_times_to_filter(start_time_key=None, end_time_key=None,
                           start_time_value=None, end_time_value=None,
                           **filter_data):
    """Inject the key value pairs of start and end time if provided.

    @param start_time_key: A string represents the filter key of start_time.
    @param end_time_key: A string represents the filter key of end_time.
    @param start_time_value: Start_time value.
    @param end_time_value: End_time value.

    @returns the injected filter_data.
    """
    time_pairs = ((start_time_key, start_time_value),
                  (end_time_key, end_time_value))
    for key, value in time_pairs:
        # Only truthy values are injected; None/'' are dropped.
        if value:
            filter_data[key] = value
    return filter_data
1042
1043
def inject_times_to_hqe_special_tasks_filters(filter_data_common,
                                              start_time, end_time):
    """Inject start and end time to hqe and special tasks filters.

    HQEs and special tasks use different column names for the same notion
    ('started_on' vs 'time_started'), hence the two filter dicts.

    @param filter_data_common: Common filter for hqe and special tasks.
    @param start_time: Start time value to inject (may be None).
    @param end_time: End time value to inject (may be None).

    @returns a pair of hqe and special tasks filters.
    """
    filter_data_special_tasks = filter_data_common.copy()
    hqe_filter = inject_times_to_filter(
            'started_on__gte', 'started_on__lte', start_time, end_time,
            **filter_data_common)
    special_tasks_filter = inject_times_to_filter(
            'time_started__gte', 'time_started__lte', start_time, end_time,
            **filter_data_special_tasks)
    return hqe_filter, special_tasks_filter
1060
1061
def retrieve_shard(shard_hostname):
    """
    Retrieves the shard with the given hostname from the database.

    @param shard_hostname: Hostname of the shard to retrieve

    @raises models.Shard.DoesNotExist, if no shard with this hostname was found.

    @returns: Shard object
    """
    # Time the lookup for monitoring of heartbeat overhead.
    with autotest_stats.Timer('shard_heartbeat.retrieve_shard'):
        return models.Shard.smart_get(shard_hostname)
Jakob Juelich59cfe542014-09-02 16:37:46 -07001075
1076
Jakob Juelich1b525742014-09-30 13:08:07 -07001077def find_records_for_shard(shard, known_job_ids, known_host_ids):
Jakob Juelich59cfe542014-09-02 16:37:46 -07001078 """Find records that should be sent to a shard.
1079
Jakob Juelicha94efe62014-09-18 16:02:49 -07001080 @param shard: Shard to find records for.
Jakob Juelich1b525742014-09-30 13:08:07 -07001081 @param known_job_ids: List of ids of jobs the shard already has.
1082 @param known_host_ids: List of ids of hosts the shard already has.
Jakob Juelicha94efe62014-09-18 16:02:49 -07001083
Fang Dengf3705992014-12-16 17:32:18 -08001084 @returns: Tuple of three lists for hosts, jobs, and suite job keyvals:
1085 (hosts, jobs, suite_job_keyvals).
Jakob Juelich59cfe542014-09-02 16:37:46 -07001086 """
MK Ryu509516b2015-05-18 12:00:47 -07001087 timer = autotest_stats.Timer('shard_heartbeat')
1088 with timer.get_client('find_hosts'):
1089 hosts = models.Host.assign_to_shard(shard, known_host_ids)
1090 with timer.get_client('find_jobs'):
1091 jobs = models.Job.assign_to_shard(shard, known_job_ids)
1092 with timer.get_client('find_suite_job_keyvals'):
1093 parent_job_ids = [job.parent_job_id for job in jobs]
1094 suite_job_keyvals = models.JobKeyval.objects.filter(
1095 job_id__in=parent_job_ids)
Fang Dengf3705992014-12-16 17:32:18 -08001096 return hosts, jobs, suite_job_keyvals
Jakob Juelicha94efe62014-09-18 16:02:49 -07001097
1098
1099def _persist_records_with_type_sent_from_shard(
1100 shard, records, record_type, *args, **kwargs):
1101 """
1102 Handle records of a specified type that were sent to the shard master.
1103
1104 @param shard: The shard the records were sent from.
1105 @param records: The records sent in their serialized format.
1106 @param record_type: Type of the objects represented by records.
1107 @param args: Additional arguments that will be passed on to the sanity
1108 checks.
1109 @param kwargs: Additional arguments that will be passed on to the sanity
1110 checks.
1111
1112 @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.
1113
1114 @returns: List of primary keys of the processed records.
1115 """
1116 pks = []
1117 for serialized_record in records:
1118 pk = serialized_record['id']
1119 try:
1120 current_record = record_type.objects.get(pk=pk)
1121 except record_type.DoesNotExist:
1122 raise error.UnallowedRecordsSentToMaster(
1123 'Object with pk %s of type %s does not exist on master.' % (
1124 pk, record_type))
1125
1126 current_record.sanity_check_update_from_shard(
1127 shard, serialized_record, *args, **kwargs)
1128
1129 current_record.update_from_serialized(serialized_record)
1130 pks.append(pk)
1131 return pks
1132
1133
def persist_records_sent_from_shard(shard, jobs, hqes):
    """
    Sanity checking then saving serialized records sent to master from shard.

    During heartbeats shards upload jobs and hostqueuentries. This performs
    some sanity checks on these and then updates the existing records for those
    entries with the updated ones from the heartbeat.

    The sanity checks include:
    - Checking if the objects sent already exist on the master.
    - Checking if the objects sent were assigned to this shard.
    - hostqueueentries must be sent together with their jobs.

    @param shard: The shard the records were sent from.
    @param jobs: The jobs the shard sent.
    @param hqes: The hostqueuentries the shart sent.

    @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.
    """
    heartbeat_timer = autotest_stats.Timer('shard_heartbeat')
    with heartbeat_timer.get_client('persist_jobs'):
        job_ids_sent = _persist_records_with_type_sent_from_shard(
                shard, jobs, models.Job)

    with heartbeat_timer.get_client('persist_hqes'):
        # Jobs are persisted first: HQEs are validated against the job ids
        # that arrived in the same heartbeat.
        _persist_records_with_type_sent_from_shard(
                shard, hqes, models.HostQueueEntry, job_ids_sent=job_ids_sent)
Jakob Juelich50e91f72014-10-01 12:43:23 -07001161
1162
Jakob Juelich50e91f72014-10-01 12:43:23 -07001163def forward_single_host_rpc_to_shard(func):
1164 """This decorator forwards rpc calls that modify a host to a shard.
1165
1166 If a host is assigned to a shard, rpcs that change his attributes should be
1167 forwarded to the shard.
1168
1169 This assumes the first argument of the function represents a host id.
1170
1171 @param func: The function to decorate
1172
1173 @returns: The function to replace func with.
1174 """
1175 def replacement(**kwargs):
1176 # Only keyword arguments can be accepted here, as we need the argument
1177 # names to send the rpc. serviceHandler always provides arguments with
1178 # their keywords, so this is not a problem.
1179 host = models.Host.smart_get(kwargs['id'])
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -08001180 if host.shard and not server_utils.is_shard():
MK Ryu26f0c932015-05-28 18:14:33 -07001181 run_rpc_on_multiple_hostnames(func.func_name,
1182 [host.shard.rpc_hostname()],
Jakob Juelich50e91f72014-10-01 12:43:23 -07001183 **kwargs)
1184 return func(**kwargs)
1185
1186 return replacement
1187
1188
Prashanth Balasubramanian5949b4a2014-11-23 12:58:30 -08001189def forward_multi_host_rpc_to_shards(func):
1190 """This decorator forwards rpc calls that modify multiple hosts.
1191
1192 If a host is assigned to a shard, rpcs that change his attributes should be
1193 forwarded to the shard. Some calls however, take a list of hosts and a
1194 single id to modify, eg: label_add_hosts. This wrapper will sift through
1195 the list of hosts, find each of their shards, and forward the rpc for
1196 those hosts to that shard before calling the local version of the given rpc.
1197
1198 This assumes:
1199 1. The rpc call uses `smart_get` to retrieve host objects, not the
1200 stock django `get` call. This is true for most, if not all rpcs in
1201 the rpc_interface.
1202 2. The kwargs to the function contain either a list of host ids or
1203 hostnames, keyed under 'hosts'. This is true for all the rpc
1204 functions that use 'smart_get'.
1205
1206 @param func: The function to decorate
1207
1208 @returns: The function to replace func with.
1209 """
1210 def replacement(**kwargs):
MK Ryu9c5fbbe2015-02-11 15:46:22 -08001211 fanout_rpc(
1212 models.Host.smart_get_bulk(kwargs['hosts']),
1213 func.func_name, **kwargs)
Prashanth Balasubramanian5949b4a2014-11-23 12:58:30 -08001214 return func(**kwargs)
1215
1216 return replacement
1217
1218
MK Ryufb5e3a82015-07-01 12:21:20 -07001219def fanout_rpc(host_objs, rpc_name, include_hostnames=True, **kwargs):
1220 """Fanout the given rpc to shards of given hosts.
1221
1222 @param host_objs: Host objects for the rpc.
1223 @param rpc_name: The name of the rpc.
1224 @param include_hostnames: If True, include the hostnames in the kwargs.
1225 Hostnames are not always necessary, this functions is designed to
1226 send rpcs to the shard a host is on, the rpcs themselves could be
1227 related to labels, acls etc.
1228 @param kwargs: The kwargs for the rpc.
1229 """
1230 # Figure out which hosts are on which shards.
1231 shard_host_map = bucket_hosts_by_shard(
1232 host_objs, rpc_hostnames=True)
1233
1234 # Execute the rpc against the appropriate shards.
1235 for shard, hostnames in shard_host_map.iteritems():
1236 if include_hostnames:
1237 kwargs['hosts'] = hostnames
1238 try:
1239 run_rpc_on_multiple_hostnames(rpc_name, [shard], **kwargs)
1240 except:
1241 ei = sys.exc_info()
1242 new_exc = error.RPCException('RPC %s failed on shard %s due to '
1243 '%s: %s' % (rpc_name, shard, ei[0].__name__, ei[1]))
1244 raise new_exc.__class__, new_exc, ei[2]
1245
1246
Jakob Juelich50e91f72014-10-01 12:43:23 -07001247def run_rpc_on_multiple_hostnames(rpc_call, shard_hostnames, **kwargs):
1248 """Runs an rpc to multiple AFEs
1249
1250 This is i.e. used to propagate changes made to hosts after they are assigned
1251 to a shard.
1252
1253 @param rpc_call: Name of the rpc endpoint to call.
1254 @param shard_hostnames: List of hostnames to run the rpcs on.
1255 @param **kwargs: Keyword arguments to pass in the rpcs.
1256 """
MK Ryufb5e3a82015-07-01 12:21:20 -07001257 # Make sure this function is not called on shards but only on master.
1258 assert not server_utils.is_shard()
Jakob Juelich50e91f72014-10-01 12:43:23 -07001259 for shard_hostname in shard_hostnames:
MK Ryu9651ca52015-06-08 17:48:22 -07001260 afe = frontend_wrappers.RetryingAFE(server=shard_hostname)
Jakob Juelich50e91f72014-10-01 12:43:23 -07001261 afe.run(rpc_call, **kwargs)
MK Ryu9c5fbbe2015-02-11 15:46:22 -08001262
1263
def get_label(name):
    """Gets a label object using a given name.

    Unlike models.Label.smart_get, a missing label is reported by returning
    None rather than by raising models.Label.DoesNotExist.

    @param name: Label name.
    @return: a label object matching the given name, or None if no label
            with that name exists.
    """
    try:
        label = models.Label.smart_get(name)
    except models.Label.DoesNotExist:
        return None
    return label
1277
1278
def get_global_afe_hostname():
    """Read the hostname of the global AFE from the global configuration."""
    config = global_config.global_config
    return config.get_config_value('SERVER', 'global_afe_hostname')
MK Ryu9c5fbbe2015-02-11 15:46:22 -08001283
1284
MK Ryufbb002c2015-06-08 14:13:16 -07001285def route_rpc_to_master(func):
J. Richard Barnettefdfcd662015-04-13 17:20:29 -07001286 """Route RPC to master AFE.
MK Ryu2d107562015-02-24 17:45:02 -08001287
MK Ryufbb002c2015-06-08 14:13:16 -07001288 @param func: The function to decorate
J. Richard Barnettefdfcd662015-04-13 17:20:29 -07001289
MK Ryufbb002c2015-06-08 14:13:16 -07001290 @returns: The function to replace func with.
MK Ryu2d107562015-02-24 17:45:02 -08001291 """
MK Ryufbb002c2015-06-08 14:13:16 -07001292 @wraps(func)
MK Ryuf6ab8a72015-07-06 10:19:48 -07001293 def replacement(*args, **kwargs):
1294 kwargs = inspect.getcallargs(func, *args, **kwargs)
MK Ryufbb002c2015-06-08 14:13:16 -07001295 if server_utils.is_shard():
MK Ryu9651ca52015-06-08 17:48:22 -07001296 afe = frontend_wrappers.RetryingAFE(
1297 server=get_global_afe_hostname())
1298 return afe.run(func.func_name, **kwargs)
MK Ryufbb002c2015-06-08 14:13:16 -07001299 return func(**kwargs)
1300 return replacement