#pylint: disable-msg=C0111
"""\
Utility functions for rpc_interface.py. We keep them in a separate file so that
only RPC interface functions go into that file.
"""

__author__ = 'showard@google.com (Steve Howard)'

import datetime
from functools import wraps
import inspect
import os
import sys
import django.http

from autotest_lib.frontend import thread_local
from autotest_lib.frontend.afe import models, model_logic
from autotest_lib.client.common_lib import control_data, error
from autotest_lib.client.common_lib import global_config, priorities
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.server import utils as server_utils
from autotest_lib.server.cros import provision
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers

NULL_DATETIME = datetime.datetime.max
NULL_DATE = datetime.date.max

def prepare_for_serialization(objects):
    """
    Prepare Python objects to be returned via RPC.
    @param objects: objects to be prepared.
    """
    if (isinstance(objects, list) and len(objects) and
            isinstance(objects[0], dict) and 'id' in objects[0]):
        objects = gather_unique_dicts(objects)
    return _prepare_data(objects)
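
# Illustrative sketch (kept in a comment so nothing executes at import time):
# dicts that look like serialized rows are de-duplicated by 'id' and their
# datetimes are stringified, e.g.
#
#   rows = [{'id': 1, 'created': datetime.datetime(2015, 1, 2)},
#           {'id': 1, 'created': datetime.datetime(2015, 1, 2)}]
#   prepare_for_serialization(rows)
#   # -> [{'id': 1, 'created': '2015-01-02 00:00:00'}]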


def prepare_rows_as_nested_dicts(query, nested_dict_column_names):
    """
    Prepare a Django query to be returned via RPC as a sequence of nested
    dictionaries.

    @param query - A Django model query object with a select_related() method.
    @param nested_dict_column_names - A list of column/attribute names for the
            rows returned by query to expand into nested dictionaries using
            their get_object_dict() method when not None.

    @returns A list suitable to be returned in an RPC.
    """
    all_dicts = []
    for row in query.select_related():
        row_dict = row.get_object_dict()
        for column in nested_dict_column_names:
            if row_dict[column] is not None:
                row_dict[column] = getattr(row, column).get_object_dict()
        all_dicts.append(row_dict)
    return prepare_for_serialization(all_dicts)


def _prepare_data(data):
    """
    Recursively process data structures, performing necessary type
    conversions to values in data to allow for RPC serialization:
    - convert datetimes to strings
    - convert tuples and sets to lists
    """
    if isinstance(data, dict):
        new_data = {}
        for key, value in data.iteritems():
            new_data[key] = _prepare_data(value)
        return new_data
    elif (isinstance(data, list) or isinstance(data, tuple) or
          isinstance(data, set)):
        return [_prepare_data(item) for item in data]
    elif isinstance(data, datetime.date):
        if data is NULL_DATETIME or data is NULL_DATE:
            return None
        return str(data)
    else:
        return data


def fetchall_as_list_of_dicts(cursor):
    """
    Converts each row in the cursor to a dictionary so that values can be read
    by using the column name.
    @param cursor: The database cursor to read from.
    @returns: A list of each row in the cursor as a dictionary.
    """
    desc = cursor.description
    return [dict(zip([col[0] for col in desc], row))
            for row in cursor.fetchall()]
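
# Illustrative usage sketch (assumes a raw DB-API cursor, e.g. from Django's
# connection.cursor(); the query and rows are hypothetical):
#
#   cursor.execute('SELECT id, hostname FROM afe_hosts')
#   fetchall_as_list_of_dicts(cursor)
#   # -> [{'id': 1, 'hostname': 'host1'}, {'id': 2, 'hostname': 'host2'}]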


def raw_http_response(response_data, content_type=None):
    response = django.http.HttpResponse(response_data, mimetype=content_type)
    response['Content-length'] = str(len(response.content))
    return response


def gather_unique_dicts(dict_iterable):
    """\
    Pick out unique objects (by ID) from an iterable of object dicts.
    """
    id_set = set()
    result = []
    for obj in dict_iterable:
        if obj['id'] not in id_set:
            id_set.add(obj['id'])
            result.append(obj)
    return result
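
# Example sketch: the first occurrence of each 'id' wins, and input order is
# otherwise preserved:
#
#   gather_unique_dicts([{'id': 1, 'v': 'a'}, {'id': 2}, {'id': 1, 'v': 'b'}])
#   # -> [{'id': 1, 'v': 'a'}, {'id': 2}]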


def extra_job_status_filters(not_yet_run=False, running=False, finished=False):
    """\
    Generate a SQL WHERE clause for job status filtering, and return it in
    a dict of keyword args to pass to query.extra().
    * not_yet_run: all HQEs are Queued
    * finished: all HQEs are complete
    * running: everything else
    """
    if not (not_yet_run or running or finished):
        return {}
    not_queued = ('(SELECT job_id FROM afe_host_queue_entries '
                  'WHERE status != "%s")'
                  % models.HostQueueEntry.Status.QUEUED)
    not_finished = ('(SELECT job_id FROM afe_host_queue_entries '
                    'WHERE not complete)')

    where = []
    if not_yet_run:
        where.append('id NOT IN ' + not_queued)
    if running:
        where.append('(id IN %s) AND (id IN %s)' % (not_queued, not_finished))
    if finished:
        where.append('id NOT IN ' + not_finished)
    return {'where': [' OR '.join(['(%s)' % x for x in where])]}
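
# The returned dict plugs straight into Django's QuerySet.extra(). A sketch
# (the subqueries are elided here for brevity):
#
#   filters = extra_job_status_filters(not_yet_run=True, finished=True)
#   # filters == {'where': ['(id NOT IN (...)) OR (id NOT IN (...))']}
#   jobs = models.Job.objects.extra(**filters)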


def extra_job_type_filters(extra_args, suite=False,
                           sub=False, standalone=False):
    """\
    Generate a SQL WHERE clause for job type filtering, and return it in
    a dict of keyword args to pass to query.extra().

    @param extra_args: a dict of existing extra_args.

    No more than one of the parameters should be passed as True:
    * suite: job which is parent of other jobs
    * sub: job with a parent job
    * standalone: job with no child or parent jobs
    """
    assert not ((suite and sub) or
                (suite and standalone) or
                (sub and standalone)), ('Cannot specify more than one '
                                        'filter to this function')

    where = extra_args.get('where', [])
    parent_job_id = 'DISTINCT parent_job_id'
    child_job_id = 'id'
    filter_common = ('(SELECT %s FROM afe_jobs '
                     'WHERE parent_job_id IS NOT NULL)')

    if suite:
        where.append('id IN ' + filter_common % parent_job_id)
    elif sub:
        where.append('id IN ' + filter_common % child_job_id)
    elif standalone:
        where.append('NOT EXISTS (SELECT 1 from afe_jobs AS sub_query '
                     'WHERE parent_job_id IS NOT NULL'
                     ' AND (sub_query.parent_job_id=afe_jobs.id'
                     ' OR sub_query.id=afe_jobs.id))')
    else:
        return extra_args

    extra_args['where'] = where
    return extra_args
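
# Illustrative sketch: the generated clause composes with any existing
# 'where' entries in extra_args, e.g. to select only suite (parent) jobs:
#
#   extra_args = extra_job_type_filters({}, suite=True)
#   suite_jobs = models.Job.objects.extra(**extra_args)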


def extra_host_filters(multiple_labels=()):
    """\
    Generate SQL WHERE clauses for matching hosts in an intersection of
    labels.
    """
    extra_args = {}
    where_str = ('afe_hosts.id in (select host_id from afe_hosts_labels '
                 'where label_id=%s)')
    extra_args['where'] = [where_str] * len(multiple_labels)
    extra_args['params'] = [models.Label.smart_get(label).id
                            for label in multiple_labels]
    return extra_args


def get_host_query(multiple_labels, exclude_only_if_needed_labels,
                   exclude_atomic_group_hosts, valid_only, filter_data):
    if valid_only:
        query = models.Host.valid_objects.all()
    else:
        query = models.Host.objects.all()

    if exclude_only_if_needed_labels:
        only_if_needed_labels = models.Label.valid_objects.filter(
            only_if_needed=True)
        if only_if_needed_labels.count() > 0:
            only_if_needed_ids = ','.join(
                    str(label['id'])
                    for label in only_if_needed_labels.values('id'))
            query = models.Host.objects.add_join(
                query, 'afe_hosts_labels', join_key='host_id',
                join_condition=('afe_hosts_labels_exclude_OIN.label_id IN (%s)'
                                % only_if_needed_ids),
                suffix='_exclude_OIN', exclude=True)

    if exclude_atomic_group_hosts:
        atomic_group_labels = models.Label.valid_objects.filter(
            atomic_group__isnull=False)
        if atomic_group_labels.count() > 0:
            atomic_group_label_ids = ','.join(
                    str(atomic_group['id'])
                    for atomic_group in atomic_group_labels.values('id'))
            query = models.Host.objects.add_join(
                query, 'afe_hosts_labels', join_key='host_id',
                join_condition=(
                    'afe_hosts_labels_exclude_AG.label_id IN (%s)'
                    % atomic_group_label_ids),
                suffix='_exclude_AG', exclude=True)
    try:
        assert 'extra_args' not in filter_data
        filter_data['extra_args'] = extra_host_filters(multiple_labels)
        return models.Host.query_objects(filter_data, initial_query=query)
    except models.Label.DoesNotExist:
        return models.Host.objects.none()


class InconsistencyException(Exception):
    'Raised when a list of objects does not have a consistent value'


def get_consistent_value(objects, field):
    if not objects:
        # well a list of nothing is consistent
        return None

    value = getattr(objects[0], field)
    for obj in objects:
        this_value = getattr(obj, field)
        if this_value != value:
            raise InconsistencyException(objects[0], obj)
    return value


def afe_test_dict_to_test_object(test_dict):
    if not isinstance(test_dict, dict):
        return test_dict

    numerized_dict = {}
    for key, value in test_dict.iteritems():
        try:
            numerized_dict[key] = int(value)
        except (ValueError, TypeError):
            numerized_dict[key] = value

    return type('TestObject', (object,), numerized_dict)
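
# Example sketch: values that parse as integers are coerced, everything else
# is kept verbatim, and the result is attribute-addressable:
#
#   test = afe_test_dict_to_test_object({'name': 'dummy', 'sync_count': '2'})
#   test.sync_count  # -> 2
#   test.name        # -> 'dummy'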


def prepare_generate_control_file(tests, kernel, label, profilers,
                                  db_tests=True):
    if db_tests:
        test_objects = [models.Test.smart_get(test) for test in tests]
    else:
        test_objects = [afe_test_dict_to_test_object(test) for test in tests]

    profiler_objects = [models.Profiler.smart_get(profiler)
                        for profiler in profilers]
    # ensure tests are all the same type
    try:
        test_type = get_consistent_value(test_objects, 'test_type')
    except InconsistencyException as exc:
        test1, test2 = exc.args
        raise model_logic.ValidationError(
            {'tests' : 'You cannot run both test_suites and server-side '
                       'tests together (tests %s and %s differ).' % (
                        test1.name, test2.name)})

    is_server = (test_type == control_data.CONTROL_TYPE.SERVER)
    if test_objects:
        synch_count = max(test.sync_count for test in test_objects)
    else:
        synch_count = 1
    if label:
        label = models.Label.smart_get(label)

    if db_tests:
        dependencies = set(label.name for label
                           in models.Label.objects.filter(test__in=test_objects))
    else:
        dependencies = reduce(
                set.union, [set(test.dependencies) for test in test_objects])

    cf_info = dict(is_server=is_server, synch_count=synch_count,
                   dependencies=list(dependencies))
    return cf_info, test_objects, profiler_objects, label


def check_job_dependencies(host_objects, job_dependencies):
    """
    Check that a set of machines satisfies a job's dependencies.
    host_objects: list of models.Host objects
    job_dependencies: list of names of labels
    """
    # check that hosts satisfy dependencies
    host_ids = [host.id for host in host_objects]
    hosts_in_job = models.Host.objects.filter(id__in=host_ids)
    ok_hosts = hosts_in_job
    for dependency in job_dependencies:
        if not provision.is_for_special_action(dependency):
            ok_hosts = ok_hosts.filter(labels__name=dependency)
    failing_hosts = (set(host.hostname for host in host_objects) -
                     set(host.hostname for host in ok_hosts))
    if failing_hosts:
        raise model_logic.ValidationError(
            {'hosts' : 'Host(s) failed to meet job dependencies (' +
                       (', '.join(job_dependencies)) + '): ' +
                       (', '.join(failing_hosts))})


def check_job_metahost_dependencies(metahost_objects, job_dependencies):
    """
    Check that at least one machine within the metahost spec satisfies the job's
    dependencies.

    @param metahost_objects A list of label objects representing the metahosts.
    @param job_dependencies A list of strings of the required label names.
    @raises NoEligibleHostException If a metahost cannot run the job.
    """
    for metahost in metahost_objects:
        hosts = models.Host.objects.filter(labels=metahost)
        for label_name in job_dependencies:
            if not provision.is_for_special_action(label_name):
                hosts = hosts.filter(labels__name=label_name)
        if not any(hosts):
            raise error.NoEligibleHostException("No hosts within %s satisfy %s."
                    % (metahost.name, ', '.join(job_dependencies)))


def _execution_key_for(host_queue_entry):
    return (host_queue_entry.job.id, host_queue_entry.execution_subdir)


def check_abort_synchronous_jobs(host_queue_entries):
    # ensure user isn't aborting part of a synchronous autoserv execution
    count_per_execution = {}
    for queue_entry in host_queue_entries:
        key = _execution_key_for(queue_entry)
        count_per_execution.setdefault(key, 0)
        count_per_execution[key] += 1

    for queue_entry in host_queue_entries:
        if not queue_entry.execution_subdir:
            continue
        execution_count = count_per_execution[_execution_key_for(queue_entry)]
        if execution_count < queue_entry.job.synch_count:
            raise model_logic.ValidationError(
                {'' : 'You cannot abort part of a synchronous job execution '
                      '(%d/%s), %d included, %d expected'
                      % (queue_entry.job.id, queue_entry.execution_subdir,
                         execution_count, queue_entry.job.synch_count)})


def check_atomic_group_create_job(synch_count, host_objects, metahost_objects,
                                  dependencies, atomic_group):
    """
    Attempt to reject create_job requests with an atomic group that
    will be impossible to schedule. The checks are not perfect but
    should catch the most obvious issues.

    @param synch_count - The job's minimum synch count.
    @param host_objects - A list of models.Host instances.
    @param metahost_objects - A list of models.Label instances.
    @param dependencies - A list of job dependency label names.
    @param atomic_group - The models.AtomicGroup instance to schedule on.

    @raises model_logic.ValidationError - When an issue is found.
    """
    # If specific host objects were supplied with an atomic group, verify
    # that there are enough to satisfy the synch_count.
    minimum_required = synch_count or 1
    if (host_objects and not metahost_objects and
            len(host_objects) < minimum_required):
        raise model_logic.ValidationError(
                {'hosts':
                 'only %d hosts provided for job with synch_count = %d' %
                 (len(host_objects), synch_count)})

    # Check that the atomic group has a hope of running this job
    # given any supplied metahosts and dependencies that may limit it.

    # Get a set of hostnames in the atomic group.
    possible_hosts = set()
    for label in atomic_group.label_set.all():
        possible_hosts.update(h.hostname for h in label.host_set.all())

    # Filter out hosts that don't match all of the job dependency labels.
    for label in models.Label.objects.filter(name__in=dependencies):
        hosts_in_label = (h.hostname for h in label.host_set.all())
        possible_hosts.intersection_update(hosts_in_label)

    if not host_objects and not metahost_objects:
        # No hosts or metahosts are required to queue an atomic group Job.
        # However, if they are given, we respect them below.
        host_set = possible_hosts
    else:
        host_set = set(host.hostname for host in host_objects)
        unusable_host_set = host_set.difference(possible_hosts)
        if unusable_host_set:
            raise model_logic.ValidationError(
                {'hosts': 'Hosts "%s" are not in Atomic Group "%s"' %
                 (', '.join(sorted(unusable_host_set)), atomic_group.name)})

    # Look up hosts provided by each meta host and merge them into the
    # host_set for final counting.
    for meta_host in metahost_objects:
        meta_possible = possible_hosts.copy()
        hosts_in_meta_host = (h.hostname for h in meta_host.host_set.all())
        meta_possible.intersection_update(hosts_in_meta_host)

        # Count all hosts that this meta_host will provide.
        host_set.update(meta_possible)

    if len(host_set) < minimum_required:
        raise model_logic.ValidationError(
                {'atomic_group_name':
                 'Insufficient hosts in Atomic Group "%s" with the'
                 ' supplied dependencies and meta_hosts.' %
                 (atomic_group.name,)})


def check_modify_host(update_data):
    """
    Sanity check modify_host* requests.

    @param update_data: A dictionary with the changes to make to a host
            or hosts.
    """
    # Only the scheduler (monitor_db) is allowed to modify Host status.
    # Otherwise race conditions happen as a host's state is changed out from
    # beneath tasks being run on a host.
    if 'status' in update_data:
        raise model_logic.ValidationError({
                'status': 'Host status can not be modified by the frontend.'})


def check_modify_host_locking(host, update_data):
    """
    Check, when locking or unlocking has been requested, whether the host is
    already in the requested state.

    @param host: models.Host object to be modified
    @param update_data: A dictionary with the changes to make to the host.
    """
    locked = update_data.get('locked', None)
    lock_reason = update_data.get('lock_reason', None)
    if locked is not None:
        if locked and host.locked:
            raise model_logic.ValidationError({
                    'locked': 'Host already locked by %s on %s.' %
                    (host.locked_by, host.lock_time)})
        if not locked and not host.locked:
            raise model_logic.ValidationError({
                    'locked': 'Host already unlocked.'})
        if locked and not lock_reason and not host.locked:
            raise model_logic.ValidationError({
                    'locked': 'Please provide a reason for locking.'})


def get_motd():
    dirname = os.path.dirname(__file__)
    filename = os.path.join(dirname, "..", "..", "motd.txt")
    text = ''
    try:
        fp = open(filename, "r")
        try:
            text = fp.read()
        finally:
            fp.close()
    except IOError:
        # The motd file is optional; return an empty string if it is missing
        # or unreadable.
        pass

    return text


def _get_metahost_counts(metahost_objects):
    metahost_counts = {}
    for metahost in metahost_objects:
        metahost_counts.setdefault(metahost, 0)
        metahost_counts[metahost] += 1
    return metahost_counts
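
# Example sketch: repeated metahost labels collapse into counts (label_a and
# label_b stand in for models.Label instances):
#
#   _get_metahost_counts([label_a, label_a, label_b])
#   # -> {label_a: 2, label_b: 1}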


def get_job_info(job, preserve_metahosts=False, queue_entry_filter_data=None):
    hosts = []
    one_time_hosts = []
    meta_hosts = []
    atomic_group = None
    hostless = False

    queue_entries = job.hostqueueentry_set.all()
    if queue_entry_filter_data:
        queue_entries = models.HostQueueEntry.query_objects(
            queue_entry_filter_data, initial_query=queue_entries)

    for queue_entry in queue_entries:
        if (queue_entry.host and (preserve_metahosts or
                                  not queue_entry.meta_host)):
            if queue_entry.deleted:
                continue
            if queue_entry.host.invalid:
                one_time_hosts.append(queue_entry.host)
            else:
                hosts.append(queue_entry.host)
        elif queue_entry.meta_host:
            meta_hosts.append(queue_entry.meta_host)
        else:
            hostless = True

        if atomic_group is None:
            if queue_entry.atomic_group is not None:
                atomic_group = queue_entry.atomic_group
        else:
            assert atomic_group.name == queue_entry.atomic_group.name, (
                'DB inconsistency. HostQueueEntries with multiple atomic'
                ' groups on job %s: %s != %s' % (
                    job.id, atomic_group.name, queue_entry.atomic_group.name))

    meta_host_counts = _get_metahost_counts(meta_hosts)

    info = dict(dependencies=[label.name for label
                              in job.dependency_labels.all()],
                hosts=hosts,
                meta_hosts=meta_hosts,
                meta_host_counts=meta_host_counts,
                one_time_hosts=one_time_hosts,
                atomic_group=atomic_group,
                hostless=hostless)
    return info


def check_for_duplicate_hosts(host_objects):
    host_ids = set()
    duplicate_hostnames = set()
    for host in host_objects:
        if host.id in host_ids:
            duplicate_hostnames.add(host.hostname)
        host_ids.add(host.id)

    if duplicate_hostnames:
        raise model_logic.ValidationError(
                {'hosts' : 'Duplicate hosts: %s'
                 % ', '.join(duplicate_hostnames)})


def create_new_job(owner, options, host_objects, metahost_objects,
                   atomic_group=None):
    all_host_objects = host_objects + metahost_objects
    dependencies = options.get('dependencies', [])
    synch_count = options.get('synch_count')

    if atomic_group:
        check_atomic_group_create_job(
                synch_count, host_objects, metahost_objects,
                dependencies, atomic_group)
    else:
        if synch_count is not None and synch_count > len(all_host_objects):
            raise model_logic.ValidationError(
                    {'hosts':
                     'only %d hosts provided for job with synch_count = %d' %
                     (len(all_host_objects), synch_count)})
        atomic_hosts = models.Host.objects.filter(
                id__in=[host.id for host in host_objects],
                labels__atomic_group=True)
        unusable_host_names = [host.hostname for host in atomic_hosts]
        if unusable_host_names:
            raise model_logic.ValidationError(
                    {'hosts':
                     'Host(s) "%s" are atomic group hosts but no '
                     'atomic group was specified for this job.' %
                     (', '.join(unusable_host_names),)})

    check_for_duplicate_hosts(host_objects)

    for label_name in dependencies:
        if provision.is_for_special_action(label_name):
            # TODO: We could save a few queries
            # if we had a bulk ensure-label-exists function, which used
            # a bulk .get() call. The win is probably very small.
            _ensure_label_exists(label_name)

    # This only checks targeted hosts, not hosts eligible due to the metahost.
    check_job_dependencies(host_objects, dependencies)
    check_job_metahost_dependencies(metahost_objects, dependencies)

    options['dependencies'] = list(
            models.Label.objects.filter(name__in=dependencies))

    for label in metahost_objects + options['dependencies']:
        if label.atomic_group and not atomic_group:
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'Dependency %r requires an atomic group but no '
                     'atomic_group_name or meta_host in an atomic group was '
                     'specified for this job.' % label.name})
        elif (label.atomic_group and
              label.atomic_group.name != atomic_group.name):
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'meta_hosts or dependency %r requires atomic group '
                     '%r instead of the supplied atomic_group_name=%r.' %
                     (label.name, label.atomic_group.name, atomic_group.name)})

    job = models.Job.create(owner=owner, options=options,
                            hosts=all_host_objects)
    job.queue(all_host_objects, atomic_group=atomic_group,
              is_template=options.get('is_template', False))
    return job.id


def _ensure_label_exists(name):
    """
    Ensure that a label called |name| exists in the Django models.

    This function is to be called from within afe rpcs only, as an
    alternative to server.cros.provision.ensure_label_exists(...). It works
    by Django model manipulation, rather than by making another create_label
    rpc call.

    @param name: the label to check for/create.
    @raises ValidationError: There was an error in the response that was
            not because the label already existed.
    @returns True if a label was created, False otherwise.
    """
    # Make sure this function is not called on shards but only on master.
    assert not server_utils.is_shard()
    try:
        models.Label.objects.get(name=name)
    except models.Label.DoesNotExist:
        new_label = models.Label.objects.create(name=name)
        new_label.save()
        return True
    return False


def find_platform_and_atomic_group(host):
    """
    Figure out the platform name and atomic group name for the given host
    object. If none, the return value for either will be None.

    @returns (platform name, atomic group name) for the given host.
    """
    platforms = [label.name for label in host.label_list if label.platform]
    if not platforms:
        platform = None
    else:
        platform = platforms[0]
    if len(platforms) > 1:
        raise ValueError('Host %s has more than one platform: %s' %
                         (host.hostname, ', '.join(platforms)))
    for label in host.label_list:
        if label.atomic_group:
            atomic_group_name = label.atomic_group.name
            break
    else:
        atomic_group_name = None
    # Don't check for multiple atomic groups on a host here. That is an
    # error but should not trip up the RPC interface. monitor_db_cleanup
    # deals with it. This just returns the first one found.
    return platform, atomic_group_name


# support for get_host_queue_entries_and_special_tasks()

def _common_entry_to_dict(entry, type, job_dict, exec_path, status, started_on):
    return dict(type=type,
                host=entry['host'],
                job=job_dict,
                execution_path=exec_path,
                status=status,
                started_on=started_on,
                id=str(entry['id']) + type,
                oid=entry['id'])
showardc0ac3a72009-07-08 21:14:45 +0000694
695
MK Ryu0c1a37d2015-04-30 12:00:55 -0700696def _special_task_to_dict(task, queue_entries):
697 """Transforms a special task dictionary to another form of dictionary.
698
699 @param task Special task as a dictionary type
700 @param queue_entries Host queue entries as a list of dictionaries.
701
702 @return Transformed dictionary for a special task.
703 """
showardc0ac3a72009-07-08 21:14:45 +0000704 job_dict = None
MK Ryu0c1a37d2015-04-30 12:00:55 -0700705 if task['queue_entry']:
706 # Scan queue_entries to get the job detail info.
707 for qentry in queue_entries:
708 if task['queue_entry']['id'] == qentry['id']:
709 job_dict = qentry['job']
710 break
711 # If not found, get it from DB.
712 if job_dict is None:
713 job = models.Job.objects.get(id=task['queue_entry']['job'])
714 job_dict = job.get_object_dict()
715
716 exec_path = server_utils.get_special_task_exec_path(
717 task['host']['hostname'], task['id'], task['task'],
718 time_utils.time_string_to_datetime(task['time_requested']))
719 status = server_utils.get_special_task_status(
720 task['is_complete'], task['success'], task['is_active'])
721 return _common_entry_to_dict(task, task['task'], job_dict,
722 exec_path, status, task['time_started'])


def _queue_entry_to_dict(queue_entry):
    job_dict = queue_entry['job']
    tag = server_utils.get_job_tag(job_dict['id'], job_dict['owner'])
    exec_path = server_utils.get_hqe_exec_path(tag,
                                               queue_entry['execution_subdir'])
    return _common_entry_to_dict(queue_entry, 'Job', job_dict, exec_path,
            queue_entry['status'], queue_entry['started_on'])


def prepare_host_queue_entries_and_special_tasks(interleaved_entries,
                                                 queue_entries):
    """
    Prepare for serialization the interleaved entries of host queue entries
    and special tasks.
    Each element in the entries is a dictionary. A special task dictionary
    carries only the id of its job and lacks the job's details, while a host
    queue entry dictionary includes them; queue_entries is used to look up
    the job detail info.

    @param interleaved_entries Host queue entries and special tasks as a list
            of dictionaries.
    @param queue_entries Host queue entries as a list of dictionaries.

    @return A post-processed list of dictionaries that is to be serialized.
    """
    dict_list = []
    for e in interleaved_entries:
        # Distinguish the two mixed entry types by the presence of the key
        # "task": if an entry has it, the entry is a special task; otherwise
        # it is a host queue entry.
        if 'task' in e:
            dict_list.append(_special_task_to_dict(e, queue_entries))
        else:
            dict_list.append(_queue_entry_to_dict(e))
    return prepare_for_serialization(dict_list)


def _compute_next_job_for_tasks(queue_entries, special_tasks):
    """
    For each task, try to figure out the next job that ran after that task.
    This is done using the following pieces of information:
    * if the task has a queue entry, we can use that entry's job ID.
    * if the task has a time_started, we can try to compare that against the
      started_on field of queue_entries. This isn't guaranteed to work perfectly
      since queue_entries may also have null started_on values.
    * if the task has neither, or if use of time_started fails, just use the
      last computed job ID.

    @param queue_entries Host queue entries as a list of dictionaries.
    @param special_tasks Special tasks as a list of dictionaries.
    """
    next_job_id = None    # most recently computed next job
    hqe_index = 0         # index for scanning by started_on times
    for task in special_tasks:
        if task['queue_entry']:
            next_job_id = task['queue_entry']['job']
        elif task['time_started'] is not None:
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry['started_on'] is None:
                    continue
                t1 = time_utils.time_string_to_datetime(
                        queue_entry['started_on'])
                t2 = time_utils.time_string_to_datetime(task['time_started'])
                if t1 < t2:
                    break
                next_job_id = queue_entry['job']['id']

        task['next_job_id'] = next_job_id

        # advance hqe_index to just after next_job_id
        if next_job_id is not None:
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry['job']['id'] < next_job_id:
                    break
                hqe_index += 1


def interleave_entries(queue_entries, special_tasks):
    """
    Both lists should be ordered by descending ID.
    """
    _compute_next_job_for_tasks(queue_entries, special_tasks)

    # start with all special tasks that've run since the last job
    interleaved_entries = []
    for task in special_tasks:
        if task['next_job_id'] is not None:
            break
        interleaved_entries.append(task)

    # now interleave queue entries with the remaining special tasks
    special_task_index = len(interleaved_entries)
    for queue_entry in queue_entries:
        interleaved_entries.append(queue_entry)
        # add all tasks that ran between this job and the previous one
        for task in special_tasks[special_task_index:]:
            if task['next_job_id'] < queue_entry['job']['id']:
                break
            interleaved_entries.append(task)
            special_task_index += 1

    return interleaved_entries
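
# Illustrative ordering sketch: with both lists sorted by descending ID, the
# result starts with the tasks newer than any job, then each job followed by
# the tasks that ran between it and the previous (older) job, e.g.
#
#   # -> [task_c, job_2, task_b, task_a, job_1]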


def bucket_hosts_by_shard(host_objs, rpc_hostnames=False):
    """Figure out which hosts are on which shards.

    @param host_objs: A list of host objects.
    @param rpc_hostnames: If True, the rpc_hostnames of a shard are returned
        instead of the 'real' shard hostnames. This only matters for testing
        environments.

    @return: A map of shard hostname: list of hosts on the shard.
    """
    shard_host_map = {}
    for host in host_objs:
        if host.shard:
            shard_name = (host.shard.rpc_hostname() if rpc_hostnames
                          else host.shard.hostname)
            shard_host_map.setdefault(shard_name, []).append(host.hostname)
    return shard_host_map
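
# Example sketch (hostnames hypothetical): hosts with no shard assigned are
# simply left out of the map.
#
#   bucket_hosts_by_shard(host_objs)
#   # -> {'shard1.example.com': ['host1', 'host2'],
#   #     'shard2.example.com': ['host3']}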


def get_create_job_common_args(local_args):
    """
    Returns a dict containing only the args that apply for create_job_common.

    Returns a subset of local_args, which contains only the arguments that can
    be passed in to create_job_common().
    """
    # This code is only here to not kill suites scheduling tests when priority
    # becomes an int instead of a string.
    if isinstance(local_args['priority'], str):
        local_args['priority'] = priorities.Priority.DEFAULT
    # </migration hack>
    arg_names, _, _, _ = inspect.getargspec(create_job_common)
    return dict(item for item in local_args.iteritems() if item[0] in arg_names)


def create_job_common(name, priority, control_type, control_file=None,
                      hosts=(), meta_hosts=(), one_time_hosts=(),
                      atomic_group_name=None, synch_count=None,
                      is_template=False, timeout=None, timeout_mins=None,
                      max_runtime_mins=None, run_verify=True, email_list='',
                      dependencies=(), reboot_before=None, reboot_after=None,
                      parse_failed_repair=None, hostless=False, keyvals=None,
                      drone_set=None, parameterized_job=None,
                      parent_job_id=None, test_retry=0, run_reset=True,
                      require_ssp=None):
    #pylint: disable-msg=C0111
    """
    Common code between creating "standard" jobs and creating parameterized
    jobs.
    """
    user = models.User.current_user()
    owner = user.login

    # input validation
    if not (hosts or meta_hosts or one_time_hosts or atomic_group_name
            or hostless):
        raise model_logic.ValidationError({
            'arguments' : "You must pass at least one of 'hosts', "
                          "'meta_hosts', 'one_time_hosts', "
                          "'atomic_group_name', or 'hostless'"
            })

    if hostless:
        if hosts or meta_hosts or one_time_hosts or atomic_group_name:
            raise model_logic.ValidationError({
                    'hostless': 'Hostless jobs cannot include any hosts!'})
        server_type = control_data.CONTROL_TYPE_NAMES.SERVER
        if control_type != server_type:
            raise model_logic.ValidationError({
                    'control_type': 'Hostless jobs cannot use client-side '
                                    'control files'})

    atomic_groups_by_name = dict((ag.name, ag)
                                 for ag in models.AtomicGroup.objects.all())
    label_objects = list(models.Label.objects.filter(name__in=meta_hosts))

    # Schedule on an atomic group automagically if one of the labels given
    # is an atomic group label and no explicit atomic_group_name was supplied.
    if not atomic_group_name:
        for label in label_objects:
            if label and label.atomic_group:
                atomic_group_name = label.atomic_group.name
                break
    # convert hostnames & meta hosts to host/label objects
    host_objects = models.Host.smart_get_bulk(hosts)
    if not server_utils.is_shard():
        shard_host_map = bucket_hosts_by_shard(host_objects)
        num_shards = len(shard_host_map)
        if (num_shards > 1 or (num_shards == 1 and
                len(shard_host_map.values()[0]) != len(host_objects))):
            # We disallow the following jobs on master:
            #   num_shards > 1: this is a job spanning across multiple shards.
            #   num_shards == 1 but number of hosts on shard is less
            #   than total number of hosts: this is a job that spans across
            #   one shard and the master.
            raise ValueError(
                    'The following hosts are on shard(s), please create '
                    'separate jobs for hosts on each shard: %s ' %
                    shard_host_map)
    metahost_objects = []
    meta_host_labels_by_name = {label.name: label for label in label_objects}
    for label_name in meta_hosts or []:
        if label_name in meta_host_labels_by_name:
            metahost_objects.append(meta_host_labels_by_name[label_name])
        elif label_name in atomic_groups_by_name:
            # If given a metahost name that isn't a Label, check to
            # see if the user was specifying an Atomic Group instead.
            atomic_group = atomic_groups_by_name[label_name]
            if atomic_group_name and atomic_group_name != atomic_group.name:
                raise model_logic.ValidationError({
                        'meta_hosts': (
                                'Label "%s" not found. If assumed to be an '
                                'atomic group it would conflict with the '
                                'supplied atomic group "%s".' % (
                                        label_name, atomic_group_name))})
            atomic_group_name = atomic_group.name
        else:
            raise model_logic.ValidationError(
                {'meta_hosts' : 'Label "%s" not found' % label_name})

    # Create and sanity check an AtomicGroup object if requested.
    if atomic_group_name:
        if one_time_hosts:
            raise model_logic.ValidationError(
                    {'one_time_hosts':
                     'One time hosts cannot be used with an Atomic Group.'})
        atomic_group = models.AtomicGroup.smart_get(atomic_group_name)
        if synch_count and synch_count > atomic_group.max_number_of_machines:
            raise model_logic.ValidationError(
                    {'atomic_group_name' :
                     'You have requested a synch_count (%d) greater than the '
                     'maximum machines in the requested Atomic Group (%d).' %
                     (synch_count, atomic_group.max_number_of_machines)})
    else:
        atomic_group = None

    for host in one_time_hosts or []:
        this_host = models.Host.create_one_time_host(host)
        host_objects.append(this_host)

    options = dict(name=name,
                   priority=priority,
                   control_file=control_file,
                   control_type=control_type,
                   is_template=is_template,
                   timeout=timeout,
                   timeout_mins=timeout_mins,
                   max_runtime_mins=max_runtime_mins,
                   synch_count=synch_count,
                   run_verify=run_verify,
                   email_list=email_list,
                   dependencies=dependencies,
                   reboot_before=reboot_before,
                   reboot_after=reboot_after,
                   parse_failed_repair=parse_failed_repair,
                   keyvals=keyvals,
                   drone_set=drone_set,
                   parameterized_job=parameterized_job,
                   parent_job_id=parent_job_id,
                   test_retry=test_retry,
                   run_reset=run_reset,
                   require_ssp=require_ssp)
    return create_new_job(owner=owner,
                          options=options,
                          host_objects=host_objects,
                          metahost_objects=metahost_objects,
                          atomic_group=atomic_group)


def encode_ascii(control_file):
    """Force a control file to only contain ascii characters.

    @param control_file: Control file to encode.

    @returns the control file in an ascii encoding.

    @raises error.ControlFileMalformed: if encoding fails.
    """
    try:
        return control_file.encode('ascii')
    except UnicodeDecodeError as e:
        raise error.ControlFileMalformed(str(e))


def get_wmatrix_url():
    """Get wmatrix url from config file.

    @returns the wmatrix url or an empty string.
    """
    return global_config.global_config.get_config_value('AUTOTEST_WEB',
                                                        'wmatrix_url',
                                                        default='')


def inject_times_to_filter(start_time_key=None, end_time_key=None,
                           start_time_value=None, end_time_value=None,
                           **filter_data):
    """Inject the key value pairs of start and end time if provided.

    @param start_time_key: A string that represents the filter key of
                           start_time.
    @param end_time_key: A string that represents the filter key of end_time.
    @param start_time_value: Start_time value.
    @param end_time_value: End_time value.

    @returns the injected filter_data.
    """
    if start_time_value:
        filter_data[start_time_key] = start_time_value
    if end_time_value:
        filter_data[end_time_key] = end_time_value
    return filter_data
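
# Example sketch with hypothetical filter keys and values:
#
#   inject_times_to_filter('started_on__gte', 'started_on__lte',
#                          '2015-01-01', '2015-02-01',
#                          job__name__startswith='dummy')
#   # -> {'job__name__startswith': 'dummy',
#   #     'started_on__gte': '2015-01-01',
#   #     'started_on__lte': '2015-02-01'}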


def inject_times_to_hqe_special_tasks_filters(filter_data_common,
                                              start_time, end_time):
    """Inject start and end time to hqe and special tasks filters.

    @param filter_data_common: Common filter for hqe and special tasks.
    @param start_time: Start time.
    @param end_time: End time.

    @returns a pair of hqe and special tasks filters.
    """
    filter_data_special_tasks = filter_data_common.copy()
    return (inject_times_to_filter('started_on__gte', 'started_on__lte',
                                   start_time, end_time, **filter_data_common),
            inject_times_to_filter('time_started__gte', 'time_started__lte',
                                   start_time, end_time,
                                   **filter_data_special_tasks))


def retrieve_shard(shard_hostname):
    """
    Retrieves the shard with the given hostname from the database.

    @param shard_hostname: Hostname of the shard to retrieve

    @raises models.Shard.DoesNotExist, if no shard with this hostname was found.

    @returns: Shard object
    """
    timer = autotest_stats.Timer('shard_heartbeat.retrieve_shard')
    with timer:
        return models.Shard.smart_get(shard_hostname)


def find_records_for_shard(shard, known_job_ids, known_host_ids):
    """Find records that should be sent to a shard.

    @param shard: Shard to find records for.
    @param known_job_ids: List of ids of jobs the shard already has.
    @param known_host_ids: List of ids of hosts the shard already has.

    @returns: Tuple of three lists for hosts, jobs, and suite job keyvals:
              (hosts, jobs, suite_job_keyvals).
    """
    timer = autotest_stats.Timer('shard_heartbeat')
    with timer.get_client('find_hosts'):
        hosts = models.Host.assign_to_shard(shard, known_host_ids)
    with timer.get_client('find_jobs'):
        jobs = models.Job.assign_to_shard(shard, known_job_ids)
    with timer.get_client('find_suite_job_keyvals'):
        parent_job_ids = [job.parent_job_id for job in jobs]
        suite_job_keyvals = models.JobKeyval.objects.filter(
                job_id__in=parent_job_ids)
    return hosts, jobs, suite_job_keyvals


def _persist_records_with_type_sent_from_shard(
        shard, records, record_type, *args, **kwargs):
    """
    Handle records of a specified type that were sent to the shard master.

    @param shard: The shard the records were sent from.
    @param records: The records sent in their serialized format.
    @param record_type: Type of the objects represented by records.
    @param args: Additional arguments that will be passed on to the sanity
                 checks.
    @param kwargs: Additional arguments that will be passed on to the sanity
                   checks.

    @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.

    @returns: List of primary keys of the processed records.
    """
    pks = []
    for serialized_record in records:
        pk = serialized_record['id']
        try:
            current_record = record_type.objects.get(pk=pk)
        except record_type.DoesNotExist:
            raise error.UnallowedRecordsSentToMaster(
                'Object with pk %s of type %s does not exist on master.' % (
                    pk, record_type))

        current_record.sanity_check_update_from_shard(
            shard, serialized_record, *args, **kwargs)

        current_record.update_from_serialized(serialized_record)
        pks.append(pk)
    return pks


def persist_records_sent_from_shard(shard, jobs, hqes):
    """
    Sanity checking then saving serialized records sent to master from shard.

    During heartbeats shards upload jobs and hostqueueentries. This performs
    some sanity checks on these and then updates the existing records for those
    entries with the updated ones from the heartbeat.

    The sanity checks include:
    - Checking if the objects sent already exist on the master.
    - Checking if the objects sent were assigned to this shard.
    - hostqueueentries must be sent together with their jobs.

    @param shard: The shard the records were sent from.
    @param jobs: The jobs the shard sent.
    @param hqes: The hostqueueentries the shard sent.

    @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.
    """
    timer = autotest_stats.Timer('shard_heartbeat')
    with timer.get_client('persist_jobs'):
        job_ids_sent = _persist_records_with_type_sent_from_shard(
                shard, jobs, models.Job)

    with timer.get_client('persist_hqes'):
        _persist_records_with_type_sent_from_shard(
                shard, hqes, models.HostQueueEntry, job_ids_sent=job_ids_sent)


def forward_single_host_rpc_to_shard(func):
    """This decorator forwards rpc calls that modify a host to a shard.

    If a host is assigned to a shard, rpcs that change its attributes should be
    forwarded to the shard.

    This assumes the first argument of the function represents a host id.

    @param func: The function to decorate

    @returns: The function to replace func with.
    """
    def replacement(**kwargs):
        # Only keyword arguments can be accepted here, as we need the argument
        # names to send the rpc. serviceHandler always provides arguments with
        # their keywords, so this is not a problem.
        host = models.Host.smart_get(kwargs['id'])
        if host.shard and not server_utils.is_shard():
            run_rpc_on_multiple_hostnames(func.func_name,
                                          [host.shard.rpc_hostname()],
                                          **kwargs)
        return func(**kwargs)

    return replacement


def forward_multi_host_rpc_to_shards(func):
    """This decorator forwards rpc calls that modify multiple hosts.

    If a host is assigned to a shard, rpcs that change its attributes should be
    forwarded to the shard. Some calls, however, take a list of hosts and a
    single id to modify, eg: label_add_hosts. This wrapper will sift through
    the list of hosts, find each of their shards, and forward the rpc for
    those hosts to that shard before calling the local version of the given rpc.

    This assumes:
    1. The rpc call uses `smart_get` to retrieve host objects, not the
       stock django `get` call. This is true for most, if not all rpcs in
       the rpc_interface.
    2. The kwargs to the function contain either a list of host ids or
       hostnames, keyed under 'hosts'. This is true for all the rpc
       functions that use 'smart_get'.

    @param func: The function to decorate

    @returns: The function to replace func with.
    """
    def replacement(**kwargs):
        fanout_rpc(
                models.Host.smart_get_bulk(kwargs['hosts']),
                func.func_name, **kwargs)
        return func(**kwargs)

    return replacement


def fanout_rpc(host_objs, rpc_name, include_hostnames=True, **kwargs):
    """Fan out the given rpc to the shards of the given hosts.

    @param host_objs: Host objects for the rpc.
    @param rpc_name: The name of the rpc.
    @param include_hostnames: If True, include the hostnames in the kwargs.
        Hostnames are not always necessary; this function is designed to
        send rpcs to the shard a host is on, while the rpcs themselves could
        be related to labels, acls etc.
    @param kwargs: The kwargs for the rpc.
    """
    # Figure out which hosts are on which shards.
    shard_host_map = bucket_hosts_by_shard(
            host_objs, rpc_hostnames=True)

    # Execute the rpc against the appropriate shards.
    for shard, hostnames in shard_host_map.iteritems():
        if include_hostnames:
            kwargs['hosts'] = hostnames
        try:
            run_rpc_on_multiple_hostnames(rpc_name, [shard], **kwargs)
        except:
            ei = sys.exc_info()
            new_exc = error.RPCException('RPC %s failed on shard %s due to '
                    '%s: %s' % (rpc_name, shard, ei[0].__name__, ei[1]))
            raise new_exc.__class__, new_exc, ei[2]
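
# Illustrative call sketch (the arguments are hypothetical; label_add_hosts
# is the example rpc named in forward_multi_host_rpc_to_shards above):
#
#   fanout_rpc(models.Host.smart_get_bulk(['host1', 'host2']),
#              'label_add_hosts', id=some_label_id)
#   # Each shard owning one of the hosts receives label_add_hosts with
#   # kwargs['hosts'] rewritten to just the hostnames it owns.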


def run_rpc_on_multiple_hostnames(rpc_call, shard_hostnames, **kwargs):
    """Runs an rpc on multiple AFEs.

    This is used, for example, to propagate changes made to hosts after they
    are assigned to a shard.

    @param rpc_call: Name of the rpc endpoint to call.
    @param shard_hostnames: List of hostnames to run the rpcs on.
    @param **kwargs: Keyword arguments to pass in the rpcs.
    """
    # Make sure this function is not called on shards but only on master.
    assert not server_utils.is_shard()
    for shard_hostname in shard_hostnames:
        afe = frontend_wrappers.RetryingAFE(server=shard_hostname,
                                            user=thread_local.get_user())
        afe.run(rpc_call, **kwargs)


def get_label(name):
    """Gets a label object using a given name.

    @param name: Label name.
    @return: a label object matching the given name, or None if no such
            label exists.
    """
    try:
        label = models.Label.smart_get(name)
    except models.Label.DoesNotExist:
        return None
    return label


def get_global_afe_hostname():
    """Read the hostname of the global AFE from the global configuration."""
    return global_config.global_config.get_config_value(
            'SERVER', 'global_afe_hostname')


def route_rpc_to_master(func):
    """Route RPC to master AFE.

    When a shard receives an RPC decorated by this, the RPC is just
    forwarded to the master.
    When the master gets the RPC, the RPC function is executed.

    @param func: An RPC function to decorate

    @returns: A function replacing the RPC func.
    """
    @wraps(func)
    def replacement(*args, **kwargs):
        """
        We need special care when decorating an RPC that can be called
        directly using positional arguments. One example is
        rpc_interface.create_job().
        rpc_interface.create_job_page_handler() calls the function using
        positional and keyword arguments.
        Since frontend.RpcClient.run() takes only keyword arguments for
        an RPC, positional arguments of the RPC function need to be
        transformed into key-value pairs (dictionary type).

        inspect.getcallargs() is a useful utility to achieve the goal,
        however, we need an additional effort when an RPC function has
        a **kwargs argument.
        Let's say we have the following form of RPC function.

        def rpcfunc(a, b, **kwargs)

        When we call the function like "rpcfunc(1, 2, id=3, name='mk')",
        inspect.getcallargs() returns a dictionary like below.

        {'a':1, 'b':2, 'kwargs': {'id':3, 'name':'mk'}}

        This is an incorrect form of arguments to pass to the rpc function.
        Instead, the dictionary should be like this.

        {'a':1, 'b':2, 'id':3, 'name':'mk'}
        """
        argspec = inspect.getargspec(func)
        if argspec.varargs is not None:
            raise Exception('RPC function must not have *args.')
        funcargs = inspect.getcallargs(func, *args, **kwargs)
        kwargs = dict()
        for k, v in funcargs.iteritems():
            # argspec.keywords is the name of the **kwargs parameter (or
            # None); compare for equality so that ordinary parameters whose
            # names happen to be substrings of it are not flattened.
            if argspec.keywords and k == argspec.keywords:
                kwargs.update(v)
            else:
                kwargs[k] = v

        if server_utils.is_shard():
            afe = frontend_wrappers.RetryingAFE(
                    server=get_global_afe_hostname(),
                    user=thread_local.get_user())
            return afe.run(func.func_name, **kwargs)
        return func(**kwargs)
    return replacement