blob: 9e96956027a1914e94fa722880111dce935237b7 [file] [log] [blame]
Aviv Keshet18308922013-02-19 17:49:49 -08001#pylint: disable-msg=C0111
mblighe8819cd2008-02-15 16:48:40 +00002"""\
3Utility functions for rpc_interface.py. We keep them in a separate file so that
4only RPC interface functions go into that file.
5"""
6
7__author__ = 'showard@google.com (Steve Howard)'
8
MK Ryu84573e12015-02-18 15:54:09 -08009import datetime
MK Ryufbb002c2015-06-08 14:13:16 -070010from functools import wraps
MK Ryu84573e12015-02-18 15:54:09 -080011import inspect
12import os
13import sys
showard3d6ae112009-05-02 00:45:48 +000014import django.http
MK Ryu0a9c82e2015-09-17 17:54:01 -070015
16from autotest_lib.frontend import thread_local
Dan Shi07e09af2013-04-12 09:31:29 -070017from autotest_lib.frontend.afe import models, model_logic
Alex Miller4a193692013-08-21 13:59:01 -070018from autotest_lib.client.common_lib import control_data, error
Jiaxi Luo421608e2014-07-07 14:38:00 -070019from autotest_lib.client.common_lib import global_config, priorities
MK Ryu0c1a37d2015-04-30 12:00:55 -070020from autotest_lib.client.common_lib import time_utils
MK Ryu509516b2015-05-18 12:00:47 -070021from autotest_lib.client.common_lib.cros.graphite import autotest_stats
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080022from autotest_lib.server import utils as server_utils
MK Ryu9651ca52015-06-08 17:48:22 -070023from autotest_lib.server.cros import provision
24from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
mblighe8819cd2008-02-15 16:48:40 +000025
showarda62866b2008-07-28 21:27:41 +000026NULL_DATETIME = datetime.datetime.max
27NULL_DATE = datetime.date.max
28
def prepare_for_serialization(objects):
    """
    Prepare Python objects to be returned via RPC.
    @param objects: objects to be prepared.
    """
    is_list_of_id_dicts = (isinstance(objects, list) and objects and
                           isinstance(objects[0], dict) and 'id' in objects[0])
    if is_list_of_id_dicts:
        objects = gather_unique_dicts(objects)
    return _prepare_data(objects)
showardb8d34242008-04-25 18:11:16 +000038
39
def prepare_rows_as_nested_dicts(query, nested_dict_column_names):
    """
    Prepare a Django query to be returned via RPC as a sequence of nested
    dictionaries.

    @param query - A Django model query object with a select_related() method.
    @param nested_dict_column_names - A list of column/attribute names for the
            rows returned by query to expand into nested dictionaries using
            their get_object_dict() method when not None.

    @returns An list suitable to returned in an RPC.
    """
    rows = []
    for model_row in query.select_related():
        as_dict = model_row.get_object_dict()
        for column_name in nested_dict_column_names:
            if as_dict[column_name] is not None:
                as_dict[column_name] = getattr(
                        model_row, column_name).get_object_dict()
        rows.append(as_dict)
    return prepare_for_serialization(rows)
60
61
showardb8d34242008-04-25 18:11:16 +000062def _prepare_data(data):
jadmanski0afbb632008-06-06 21:10:57 +000063 """
64 Recursively process data structures, performing necessary type
65 conversions to values in data to allow for RPC serialization:
66 -convert datetimes to strings
showard2b9a88b2008-06-13 20:55:03 +000067 -convert tuples and sets to lists
jadmanski0afbb632008-06-06 21:10:57 +000068 """
69 if isinstance(data, dict):
70 new_data = {}
71 for key, value in data.iteritems():
72 new_data[key] = _prepare_data(value)
73 return new_data
showard2b9a88b2008-06-13 20:55:03 +000074 elif (isinstance(data, list) or isinstance(data, tuple) or
75 isinstance(data, set)):
jadmanski0afbb632008-06-06 21:10:57 +000076 return [_prepare_data(item) for item in data]
showard98659972008-07-17 17:00:07 +000077 elif isinstance(data, datetime.date):
showarda62866b2008-07-28 21:27:41 +000078 if data is NULL_DATETIME or data is NULL_DATE:
79 return None
jadmanski0afbb632008-06-06 21:10:57 +000080 return str(data)
81 else:
82 return data
mblighe8819cd2008-02-15 16:48:40 +000083
84
def fetchall_as_list_of_dicts(cursor):
    """
    Converts each row in the cursor to a dictionary so that values can be read
    by using the column name.
    @param cursor: The database cursor to read from.
    @returns: A list of each row in the cursor as a dictionary.
    """
    column_names = [column[0] for column in cursor.description]
    return [dict(zip(column_names, row)) for row in cursor.fetchall()]
95
96
def raw_http_response(response_data, content_type=None):
    """Wrap raw data in a Django HTTP response with an explicit length.

    @param response_data: raw string/bytes to use as the response body.
    @param content_type: MIME type for the response; None lets Django
            apply its default.
    @returns a django.http.HttpResponse carrying a Content-length header.
    """
    # NOTE(review): 'mimetype' was renamed to 'content_type' and removed in
    # newer Django releases -- this assumes the old keyword is still valid.
    response = django.http.HttpResponse(response_data, mimetype=content_type)
    response['Content-length'] = str(len(response.content))
    return response
101
102
def gather_unique_dicts(dict_iterable):
    """\
    Pick out unique objects (by ID) from an iterable of object dicts.
    """
    seen_ids = set()
    unique = []
    for candidate in dict_iterable:
        obj_id = candidate['id']
        if obj_id in seen_ids:
            continue
        seen_ids.add(obj_id)
        unique.append(candidate)
    return unique
showardb0dfb9f2008-06-06 18:08:02 +0000114
115
def extra_job_status_filters(not_yet_run=False, running=False, finished=False):
    """\
    Generate a SQL WHERE clause for job status filtering, and return it in
    a dict of keyword args to pass to query.extra(). No more than one of
    the parameters should be passed as True.
    * not_yet_run: all HQEs are Queued
    * finished: all HQEs are complete
    * running: everything else
    """
    assert sum(map(bool, (not_yet_run, running, finished))) <= 1, (
            'Cannot specify more than one filter to this function')

    not_queued = ('(SELECT job_id FROM afe_host_queue_entries '
                  'WHERE status != "%s")'
                  % models.HostQueueEntry.Status.QUEUED)
    not_finished = ('(SELECT job_id FROM afe_host_queue_entries '
                    'WHERE not complete)')

    if not_yet_run:
        where = ['id NOT IN ' + not_queued]
    elif running:
        where = ['(id IN %s) AND (id IN %s)' % (not_queued, not_finished)]
    elif finished:
        where = ['id NOT IN ' + not_finished]
    else:
        # No filter requested: leave the query unrestricted.
        return {}
    return {'where': where}
mblighe8819cd2008-02-15 16:48:40 +0000145
146
def extra_job_type_filters(extra_args, suite=False,
                           sub=False, standalone=False):
    """\
    Generate a SQL WHERE clause for job status filtering, and return it in
    a dict of keyword args to pass to query.extra().

    param extra_args: a dict of existing extra_args.

    No more than one of the parameters should be passed as True:
    * suite: job which is parent of other jobs
    * sub: job with a parent job
    * standalone: job with no child or parent jobs
    """
    assert sum(map(bool, (suite, sub, standalone))) <= 1, (
            'Cannot specify more than one filter to this function')

    where = extra_args.get('where', [])
    filter_common = ('(SELECT %s FROM afe_jobs '
                     'WHERE parent_job_id IS NOT NULL)')

    if suite:
        # Parents: ids that appear as some other job's parent_job_id.
        where.append('id IN ' + filter_common % 'DISTINCT parent_job_id')
    elif sub:
        # Children: jobs that themselves have a parent_job_id.
        where.append('id IN ' + filter_common % 'id')
    elif standalone:
        where.append('NOT EXISTS (SELECT 1 from afe_jobs AS sub_query '
                     'WHERE parent_job_id IS NOT NULL'
                     ' AND (sub_query.parent_job_id=afe_jobs.id'
                     ' OR sub_query.id=afe_jobs.id))')
    else:
        return extra_args

    extra_args['where'] = where
    return extra_args
185
186
187
def extra_host_filters(multiple_labels=()):
    """\
    Generate SQL WHERE clauses for matching hosts in an intersection of
    labels.
    """
    label_clause = ('afe_hosts.id in (select host_id from afe_hosts_labels '
                    'where label_id=%s)')
    return {
        'where': [label_clause for _ in multiple_labels],
        'params': [models.Label.smart_get(label).id
                   for label in multiple_labels],
    }
showard8e3aa5e2008-04-08 19:42:32 +0000200
201
def get_host_query(multiple_labels, exclude_only_if_needed_labels,
                   exclude_atomic_group_hosts, valid_only, filter_data):
    """Build a Django query over Host objects matching the given filters.

    @param multiple_labels: label names; hosts must carry all of them.
    @param exclude_only_if_needed_labels: if true, drop hosts carrying any
            label flagged only_if_needed.
    @param exclude_atomic_group_hosts: if true, drop hosts carrying any
            label tied to an atomic group.
    @param valid_only: if true, start from valid (non-deleted) hosts only.
    @param filter_data: additional filters for Host.query_objects(); must
            not already contain 'extra_args' (asserted below).
    @returns a query of matching hosts, or an empty queryset if a label in
            multiple_labels does not exist.
    """
    if valid_only:
        query = models.Host.valid_objects.all()
    else:
        query = models.Host.objects.all()

    if exclude_only_if_needed_labels:
        only_if_needed_labels = models.Label.valid_objects.filter(
            only_if_needed=True)
        if only_if_needed_labels.count() > 0:
            only_if_needed_ids = ','.join(
                    str(label['id'])
                    for label in only_if_needed_labels.values('id'))
            # Anti-join (exclude=True): remove hosts that carry any of the
            # only-if-needed labels.
            query = models.Host.objects.add_join(
                query, 'afe_hosts_labels', join_key='host_id',
                join_condition=('afe_hosts_labels_exclude_OIN.label_id IN (%s)'
                                % only_if_needed_ids),
                suffix='_exclude_OIN', exclude=True)

    if exclude_atomic_group_hosts:
        atomic_group_labels = models.Label.valid_objects.filter(
            atomic_group__isnull=False)
        if atomic_group_labels.count() > 0:
            atomic_group_label_ids = ','.join(
                    str(atomic_group['id'])
                    for atomic_group in atomic_group_labels.values('id'))
            # Anti-join (exclude=True): remove hosts attached to any
            # atomic-group label.
            query = models.Host.objects.add_join(
                    query, 'afe_hosts_labels', join_key='host_id',
                    join_condition=(
                            'afe_hosts_labels_exclude_AG.label_id IN (%s)'
                            % atomic_group_label_ids),
                    suffix='_exclude_AG', exclude=True)
    try:
        assert 'extra_args' not in filter_data
        filter_data['extra_args'] = extra_host_filters(multiple_labels)
        return models.Host.query_objects(filter_data, initial_query=query)
    except models.Label.DoesNotExist as e:
        # An unknown label name means no host can possibly match.
        return models.Host.objects.none()
showard43a3d262008-11-12 18:17:05 +0000241
242
class InconsistencyException(Exception):
    """Raised when a list of objects does not have a consistent value."""
showard8fd58242008-03-10 21:29:07 +0000245
246
def get_consistent_value(objects, field):
    """Return the value of |field| shared by every object in |objects|.

    @param objects: sequence of objects to inspect.
    @param field: attribute name read from each object.
    @returns the common value, or None for an empty sequence.
    @raises InconsistencyException: if two objects disagree on the value.
    """
    if not objects:
        # well a list of nothing is consistent
        return None

    expected = getattr(objects[0], field)
    for candidate in objects:
        if getattr(candidate, field) != expected:
            raise InconsistencyException(objects[0], candidate)
    return expected
showard8fd58242008-03-10 21:29:07 +0000258
259
def afe_test_dict_to_test_object(test_dict):
    """Convert an AFE test dict into an attribute-accessible test object.

    @param test_dict: dict describing a test; values that parse as integers
            are converted to int. Non-dict input is passed through untouched.
    @returns a new 'TestObject' class whose attributes are the (numerized)
            dict entries, or test_dict itself when it is not a dict.
    """
    if not isinstance(test_dict, dict):
        return test_dict

    numerized_dict = {}
    # items() instead of iteritems() so this also runs under Python 3;
    # behavior is identical for this use.
    for key, value in test_dict.items():
        try:
            numerized_dict[key] = int(value)
        except (ValueError, TypeError):
            numerized_dict[key] = value

    return type('TestObject', (object,), numerized_dict)
272
273
def prepare_generate_control_file(tests, kernel, label, profilers,
                                  db_tests=True):
    """Resolve raw RPC arguments into objects for control-file generation.

    @param tests: test names/ids (db_tests=True) or test dicts.
    @param kernel: accepted for the RPC signature; not used in this function.
    @param label: label name/id to resolve via smart_get, or falsy for none.
    @param profilers: profiler names/ids to resolve.
    @param db_tests: whether tests come from the DB (True) or are raw dicts.
    @returns (cf_info dict, test objects, profiler objects, resolved label).
    @raises model_logic.ValidationError: if the tests mix test types.
    """
    if db_tests:
        test_objects = [models.Test.smart_get(test) for test in tests]
    else:
        test_objects = [afe_test_dict_to_test_object(test) for test in tests]

    profiler_objects = [models.Profiler.smart_get(profiler)
                        for profiler in profilers]
    # ensure tests are all the same type
    try:
        test_type = get_consistent_value(test_objects, 'test_type')
    except InconsistencyException, exc:
        test1, test2 = exc.args
        raise model_logic.ValidationError(
                {'tests' : 'You cannot run both test_suites and server-side '
                 'tests together (tests %s and %s differ' % (
                test1.name, test2.name)})

    is_server = (test_type == control_data.CONTROL_TYPE.SERVER)
    if test_objects:
        synch_count = max(test.sync_count for test in test_objects)
    else:
        synch_count = 1
    if label:
        label = models.Label.smart_get(label)

    if db_tests:
        dependencies = set(label.name for label
                           in models.Label.objects.filter(test__in=test_objects))
    else:
        # NOTE(review): reduce() over an empty list raises TypeError, so
        # db_tests=False with no tests would fail here -- confirm callers
        # always pass at least one test in that mode.
        dependencies = reduce(
                set.union, [set(test.dependencies) for test in test_objects])

    cf_info = dict(is_server=is_server, synch_count=synch_count,
                   dependencies=list(dependencies))
    return cf_info, test_objects, profiler_objects, label
showard989f25d2008-10-01 11:38:11 +0000311
312
def check_job_dependencies(host_objects, job_dependencies):
    """
    Check that a set of machines satisfies a job's dependencies.

    @param host_objects: list of models.Host objects.
    @param job_dependencies: list of names of labels.
    @raises model_logic.ValidationError: naming the hosts that do not carry
            every required dependency label.
    """
    # check that hosts satisfy dependencies
    host_ids = [host.id for host in host_objects]
    ok_hosts = models.Host.objects.filter(id__in=host_ids)
    # (Previously used enumerate() with an unused index variable.)
    for dependency in job_dependencies:
        # Labels handled by a special action (per provision) need not
        # already be present on the host, so they are not filtered on.
        if not provision.is_for_special_action(dependency):
            ok_hosts = ok_hosts.filter(labels__name=dependency)
    failing_hosts = (set(host.hostname for host in host_objects) -
                     set(host.hostname for host in ok_hosts))
    if failing_hosts:
        raise model_logic.ValidationError(
                {'hosts' : 'Host(s) failed to meet job dependencies (' +
                           (', '.join(job_dependencies)) + '): ' +
                           (', '.join(failing_hosts))})
showard989f25d2008-10-01 11:38:11 +0000334
def check_job_metahost_dependencies(metahost_objects, job_dependencies):
    """
    Check that at least one machine within the metahost spec satisfies the job's
    dependencies.

    @param metahost_objects A list of label objects representing the metahosts.
    @param job_dependencies A list of strings of the required label names.
    @raises NoEligibleHostException If a metahost cannot run the job.
    """
    for metahost in metahost_objects:
        candidates = models.Host.objects.filter(labels=metahost)
        for label_name in job_dependencies:
            if not provision.is_for_special_action(label_name):
                candidates = candidates.filter(labels__name=label_name)
        if not any(candidates):
            raise error.NoEligibleHostException(
                    "No hosts within %s satisfy %s."
                    % (metahost.name, ', '.join(job_dependencies)))
352
showard2bab8f42008-11-12 18:15:22 +0000353
354def _execution_key_for(host_queue_entry):
355 return (host_queue_entry.job.id, host_queue_entry.execution_subdir)
356
357
def check_abort_synchronous_jobs(host_queue_entries):
    """Reject abort requests that cover only part of a synchronous run.

    @param host_queue_entries: HQEs the caller wants to abort.
    @raises model_logic.ValidationError: if a synchronous execution would be
            only partially aborted.
    """
    # ensure user isn't aborting part of a synchronous autoserv execution
    count_per_execution = {}
    for queue_entry in host_queue_entries:
        key = _execution_key_for(queue_entry)
        count_per_execution[key] = count_per_execution.get(key, 0) + 1

    for queue_entry in host_queue_entries:
        if not queue_entry.execution_subdir:
            continue
        execution_count = count_per_execution[_execution_key_for(queue_entry)]
        if execution_count < queue_entry.job.synch_count:
            raise model_logic.ValidationError(
                    {'' : 'You cannot abort part of a synchronous job execution '
                          '(%d/%s), %d included, %d expected'
                          % (queue_entry.job.id, queue_entry.execution_subdir,
                             execution_count, queue_entry.job.synch_count)})
showard8fbae652009-01-20 23:23:10 +0000376
377
def check_atomic_group_create_job(synch_count, host_objects, metahost_objects,
                                  dependencies, atomic_group):
    """
    Attempt to reject create_job requests with an atomic group that
    will be impossible to schedule. The checks are not perfect but
    should catch the most obvious issues.

    @param synch_count - The job's minimum synch count.
    @param host_objects - A list of models.Host instances.
    @param metahost_objects - A list of models.Label instances.
    @param dependencies - A list of job dependency label names.
    @param atomic_group - The atomic group model instance the job targets.

    @raises model_logic.ValidationError - When an issue is found.
    """
    # If specific host objects were supplied with an atomic group, verify
    # that there are enough to satisfy the synch_count.
    minimum_required = synch_count or 1
    if (host_objects and not metahost_objects and
        len(host_objects) < minimum_required):
        raise model_logic.ValidationError(
                {'hosts':
                 'only %d hosts provided for job with synch_count = %d' %
                 (len(host_objects), synch_count)})

    # Check that the atomic group has a hope of running this job
    # given any supplied metahosts and dependancies that may limit.

    # Get a set of hostnames in the atomic group.
    possible_hosts = set()
    for label in atomic_group.label_set.all():
        possible_hosts.update(h.hostname for h in label.host_set.all())

    # Filter out hosts that don't match all of the job dependency labels.
    for label in models.Label.objects.filter(name__in=dependencies):
        hosts_in_label = (h.hostname for h in label.host_set.all())
        possible_hosts.intersection_update(hosts_in_label)

    if not host_objects and not metahost_objects:
        # No hosts or metahosts are required to queue an atomic group Job.
        # However, if they are given, we respect them below.
        host_set = possible_hosts
    else:
        host_set = set(host.hostname for host in host_objects)
        unusable_host_set = host_set.difference(possible_hosts)
        if unusable_host_set:
            raise model_logic.ValidationError(
                {'hosts': 'Hosts "%s" are not in Atomic Group "%s"' %
                 (', '.join(sorted(unusable_host_set)), atomic_group.name)})

    # Lookup hosts provided by each meta host and merge them into the
    # host_set for final counting.
    for meta_host in metahost_objects:
        meta_possible = possible_hosts.copy()
        hosts_in_meta_host = (h.hostname for h in meta_host.host_set.all())
        meta_possible.intersection_update(hosts_in_meta_host)

        # Count all hosts that this meta_host will provide.
        host_set.update(meta_possible)

    if len(host_set) < minimum_required:
        raise model_logic.ValidationError(
                {'atomic_group_name':
                 'Insufficient hosts in Atomic Group "%s" with the'
                 ' supplied dependencies and meta_hosts.' %
                 (atomic_group.name,)})
445
446
def check_modify_host(update_data):
    """
    Sanity check modify_host* requests.

    @param update_data: A dictionary with the changes to make to a host
            or hosts.
    """
    if 'status' not in update_data:
        return
    # Only the scheduler (monitor_db) is allowed to modify Host status.
    # Otherwise race conditions happen as a hosts state is changed out from
    # beneath tasks being run on a host.
    raise model_logic.ValidationError({
            'status': 'Host status can not be modified by the frontend.'})
460
461
def check_modify_host_locking(host, update_data):
    """
    Checks when locking/unlocking has been requested if the host is already
    locked/unlocked.

    @param host: models.Host object to be modified
    @param update_data: A dictionary with the changes to make to the host.
    """
    locked = update_data.get('locked', None)
    lock_reason = update_data.get('lock_reason', None)
    if locked is None:
        # No lock state change requested; nothing to validate.
        return
    if locked and host.locked:
        raise model_logic.ValidationError({
                'locked': 'Host already locked by %s on %s.' %
                          (host.locked_by, host.lock_time)})
    if not locked and not host.locked:
        raise model_logic.ValidationError({
                'locked': 'Host already unlocked.'})
    if locked and not lock_reason and not host.locked:
        raise model_logic.ValidationError({
                'locked': 'Please provide a reason for locking'})
showardce7c0922009-09-11 18:39:24 +0000483
484
def get_motd():
    """Return the message-of-the-day text.

    @returns the contents of <autotest root>/motd.txt, or the empty string
            when the file is missing or unreadable.
    """
    dirname = os.path.dirname(__file__)
    filename = os.path.join(dirname, "..", "..", "motd.txt")
    try:
        # 'with' guarantees the file is closed even if read() fails; the
        # original open/close with a bare 'except: pass' could hide
        # unrelated bugs.
        with open(filename, "r") as fp:
            return fp.read()
    except (IOError, OSError):
        # A missing motd is normal -- just display nothing.
        return ''
showard29f7cd22009-04-29 21:16:24 +0000499
500
501def _get_metahost_counts(metahost_objects):
502 metahost_counts = {}
503 for metahost in metahost_objects:
504 metahost_counts.setdefault(metahost, 0)
505 metahost_counts[metahost] += 1
506 return metahost_counts
507
508
def get_job_info(job, preserve_metahosts=False, queue_entry_filter_data=None):
    """Collect scheduling info (hosts, metahosts, atomic group) for a job.

    @param job: models.Job instance to inspect.
    @param preserve_metahosts: if True, report a metahost-assigned HQE under
            its concrete host rather than under its metahost label.
    @param queue_entry_filter_data: optional filter dict restricting which
            of the job's HostQueueEntries are considered.
    @returns dict with keys: dependencies, hosts, meta_hosts,
            meta_host_counts, one_time_hosts, atomic_group, hostless.
    """
    hosts = []
    one_time_hosts = []
    meta_hosts = []
    atomic_group = None
    hostless = False

    queue_entries = job.hostqueueentry_set.all()
    if queue_entry_filter_data:
        queue_entries = models.HostQueueEntry.query_objects(
                queue_entry_filter_data, initial_query=queue_entries)

    for queue_entry in queue_entries:
        if (queue_entry.host and (preserve_metahosts or
                                  not queue_entry.meta_host)):
            if queue_entry.deleted:
                continue
            if queue_entry.host.invalid:
                # Invalid hosts here are one-time hosts added for this job.
                one_time_hosts.append(queue_entry.host)
            else:
                hosts.append(queue_entry.host)
        elif queue_entry.meta_host:
            meta_hosts.append(queue_entry.meta_host)
        else:
            hostless = True

        if atomic_group is None:
            if queue_entry.atomic_group is not None:
                atomic_group = queue_entry.atomic_group
        else:
            # All of a job's HQEs must agree on the atomic group.
            # Fixed: the message previously formatted the builtin id()
            # function instead of the job's id.
            assert atomic_group.name == queue_entry.atomic_group.name, (
                    'DB inconsistency. HostQueueEntries with multiple atomic'
                    ' groups on job %s: %s != %s' % (
                    job.id, atomic_group.name, queue_entry.atomic_group.name))

    meta_host_counts = _get_metahost_counts(meta_hosts)

    info = dict(dependencies=[label.name for label
                              in job.dependency_labels.all()],
                hosts=hosts,
                meta_hosts=meta_hosts,
                meta_host_counts=meta_host_counts,
                one_time_hosts=one_time_hosts,
                atomic_group=atomic_group,
                hostless=hostless)
    return info
555
556
def check_for_duplicate_hosts(host_objects):
    """Raise if any host appears more than once in the list.

    @param host_objects: list of Host model objects.
    @raises model_logic.ValidationError: listing the duplicated hostnames.
    """
    seen_ids = set()
    duplicate_hostnames = set()
    for host in host_objects:
        if host.id in seen_ids:
            duplicate_hostnames.add(host.hostname)
        seen_ids.add(host.id)

    if duplicate_hostnames:
        raise model_logic.ValidationError(
                {'hosts' : 'Duplicate hosts: %s'
                 % ', '.join(duplicate_hostnames)})
569
570
def create_new_job(owner, options, host_objects, metahost_objects,
                   atomic_group=None):
    """Validate and create a new job plus its host queue entries.

    @param owner: username that will own the job.
    @param options: dict of job options ('dependencies', 'synch_count',
            'is_template', ...) as accepted by models.Job.create; the
            'dependencies' entry is replaced with Label objects below.
    @param host_objects: list of models.Host instances to run on.
    @param metahost_objects: list of models.Label metahost instances.
    @param atomic_group: optional atomic group model instance for the job.
    @returns the id of the newly created job.
    @raises model_logic.ValidationError: when the host/label/atomic-group
            combination cannot be scheduled.
    """
    all_host_objects = host_objects + metahost_objects
    dependencies = options.get('dependencies', [])
    synch_count = options.get('synch_count')

    if atomic_group:
        check_atomic_group_create_job(
                synch_count, host_objects, metahost_objects,
                dependencies, atomic_group)
    else:
        if synch_count is not None and synch_count > len(all_host_objects):
            raise model_logic.ValidationError(
                    {'hosts':
                     'only %d hosts provided for job with synch_count = %d' %
                     (len(all_host_objects), synch_count)})
        # Hosts in an atomic group may only be used by atomic-group jobs.
        atomic_hosts = models.Host.objects.filter(
                id__in=[host.id for host in host_objects],
                labels__atomic_group=True)
        unusable_host_names = [host.hostname for host in atomic_hosts]
        if unusable_host_names:
            raise model_logic.ValidationError(
                    {'hosts':
                     'Host(s) "%s" are atomic group hosts but no '
                     'atomic group was specified for this job.' %
                     (', '.join(unusable_host_names),)})

    check_for_duplicate_hosts(host_objects)

    for label_name in dependencies:
        if provision.is_for_special_action(label_name):
            # TODO: We could save a few queries
            # if we had a bulk ensure-label-exists function, which used
            # a bulk .get() call. The win is probably very small.
            _ensure_label_exists(label_name)

    # This only checks targeted hosts, not hosts eligible due to the metahost
    check_job_dependencies(host_objects, dependencies)
    check_job_metahost_dependencies(metahost_objects, dependencies)

    # Replace dependency names with the actual Label objects.
    options['dependencies'] = list(
            models.Label.objects.filter(name__in=dependencies))

    for label in metahost_objects + options['dependencies']:
        if label.atomic_group and not atomic_group:
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'Dependency %r requires an atomic group but no '
                     'atomic_group_name or meta_host in an atomic group was '
                     'specified for this job.' % label.name})
        elif (label.atomic_group and
              label.atomic_group.name != atomic_group.name):
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'meta_hosts or dependency %r requires atomic group '
                     '%r instead of the supplied atomic_group_name=%r.' %
                     (label.name, label.atomic_group.name, atomic_group.name)})

    job = models.Job.create(owner=owner, options=options,
                            hosts=all_host_objects)
    job.queue(all_host_objects, atomic_group=atomic_group,
              is_template=options.get('is_template', False))
    return job.id
showard0957a842009-05-11 19:25:08 +0000634
635
def _ensure_label_exists(name):
    """
    Ensure that a label called |name| exists in the Django models.

    This function is to be called from within afe rpcs only, as an
    alternative to server.cros.provision.ensure_label_exists(...). It works
    by Django model manipulation, rather than by making another create_label
    rpc call.

    @param name: the label to check for/create.
    @raises ValidationError: There was an error in the response that was
                             not because the label already existed.
    @returns True is a label was created, False otherwise.
    """
    # Make sure this function is not called on shards but only on master.
    assert not server_utils.is_shard()
    try:
        models.Label.objects.get(name=name)
    except models.Label.DoesNotExist:
        # NOTE(review): get-then-create is racy under concurrent RPCs; a
        # simultaneous create would surface as a database error here.
        new_label = models.Label.objects.create(name=name)
        # NOTE(review): objects.create() already persists the row; this
        # extra save() looks redundant but is harmless.
        new_label.save()
        return True
    return False
659
660
def find_platform_and_atomic_group(host):
    """
    Figure out the platform name and atomic group name for the given host
    object. If none, the return value for either will be None.

    @returns (platform name, atomic group name) for the given host.
    """
    platforms = [label.name for label in host.label_list if label.platform]
    platform = platforms[0] if platforms else None
    if len(platforms) > 1:
        raise ValueError('Host %s has more than one platform: %s' %
                         (host.hostname, ', '.join(platforms)))

    atomic_group_name = None
    for label in host.label_list:
        if label.atomic_group:
            atomic_group_name = label.atomic_group.name
            break
    # Don't check for multiple atomic groups on a host here. That is an
    # error but should not trip up the RPC interface. monitor_db_cleanup
    # deals with it. This just returns the first one found.
    return platform, atomic_group_name
showardc0ac3a72009-07-08 21:14:45 +0000686
687
688# support for get_host_queue_entries_and_special_tasks()
689
MK Ryu0c1a37d2015-04-30 12:00:55 -0700690def _common_entry_to_dict(entry, type, job_dict, exec_path, status, started_on):
showardc0ac3a72009-07-08 21:14:45 +0000691 return dict(type=type,
MK Ryu0c1a37d2015-04-30 12:00:55 -0700692 host=entry['host'],
showardc0ac3a72009-07-08 21:14:45 +0000693 job=job_dict,
MK Ryu0c1a37d2015-04-30 12:00:55 -0700694 execution_path=exec_path,
695 status=status,
696 started_on=started_on,
697 id=str(entry['id']) + type,
698 oid=entry['id'])
showardc0ac3a72009-07-08 21:14:45 +0000699
700
def _special_task_to_dict(task, queue_entries):
    """Transforms a special task dictionary to another form of dictionary.

    @param task Special task as a dictionary type
    @param queue_entries Host queue entries as a list of dictionaries.

    @return Transformed dictionary for a special task.
    """
    job_dict = None
    if task['queue_entry']:
        # Scan queue_entries to get the job detail info.
        for qentry in queue_entries:
            if task['queue_entry']['id'] == qentry['id']:
                job_dict = qentry['job']
                break
        # If not found, get it from DB.
        if job_dict is None:
            job = models.Job.objects.get(id=task['queue_entry']['job'])
            job_dict = job.get_object_dict()

    exec_path = server_utils.get_special_task_exec_path(
            task['host']['hostname'], task['id'], task['task'],
            time_utils.time_string_to_datetime(task['time_requested']))
    status = server_utils.get_special_task_status(
            task['is_complete'], task['success'], task['is_active'])
    # The task name (e.g. its 'task' field) doubles as the entry type.
    return _common_entry_to_dict(task, task['task'], job_dict,
                                 exec_path, status, task['time_started'])
showardc0ac3a72009-07-08 21:14:45 +0000728
729
def _queue_entry_to_dict(queue_entry):
    """Convert a host queue entry dict into the common serializable form.

    @param queue_entry: Host queue entry as a dictionary.

    @return Transformed dictionary for the host queue entry.
    """
    job = queue_entry['job']
    execution_path = server_utils.get_hqe_exec_path(
            server_utils.get_job_tag(job['id'], job['owner']),
            queue_entry['execution_subdir'])
    return _common_entry_to_dict(
            queue_entry, 'Job', job, execution_path,
            queue_entry['status'], queue_entry['started_on'])
737
738
def prepare_host_queue_entries_and_special_tasks(interleaved_entries,
                                                 queue_entries):
    """
    Prepare for serialization the interleaved entries of host queue entries
    and special tasks.
    Each element in the entries is a dictionary type.
    The special task dictionary has only a job id for a job and lacks
    the detail of the job while the host queue entry dictionary has.
    queue_entries is used to look up the job detail info.

    @param interleaved_entries Host queue entries and special tasks as a list
                               of dictionaries.
    @param queue_entries Host queue entries as a list of dictionaries.

    @return A post-processed list of dictionaries that is to be serialized.
    """
    # An entry carrying the key 'task' is a special task; anything else is
    # a host queue entry.
    converted = [_special_task_to_dict(entry, queue_entries)
                 if 'task' in entry
                 else _queue_entry_to_dict(entry)
                 for entry in interleaved_entries]
    return prepare_for_serialization(converted)
showardc0ac3a72009-07-08 21:14:45 +0000765
766
def _compute_next_job_for_tasks(queue_entries, special_tasks):
    """
    For each task, try to figure out the next job that ran after that task.
    This is done using two pieces of information:
    * if the task has a queue entry, we can use that entry's job ID.
    * if the task has a time_started, we can try to compare that against the
      started_on field of queue_entries. this isn't guaranteed to work perfectly
      since queue_entries may also have null started_on values.
    * if the task has neither, or if use of time_started fails, just use the
      last computed job ID.

    Mutates each task dict in place by setting its 'next_job_id' key; does
    not return anything. Both lists are expected to be ordered by
    descending ID (see interleave_entries).

    @param queue_entries Host queue entries as a list of dictionaries.
    @param special_tasks Special tasks as a list of dictionaries.
    """
    next_job_id = None # most recently computed next job
    hqe_index = 0 # index for scanning by started_on times
    for task in special_tasks:
        if task['queue_entry']:
            # Direct link: the task's queue entry names its job.
            next_job_id = task['queue_entry']['job']
        elif task['time_started'] is not None:
            # No queue entry: scan (never rewinding past hqe_index) for the
            # last queue entry that started at or after this task.
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry['started_on'] is None:
                    continue
                t1 = time_utils.time_string_to_datetime(
                        queue_entry['started_on'])
                t2 = time_utils.time_string_to_datetime(task['time_started'])
                if t1 < t2:
                    # Entries are newest-first, so everything beyond this
                    # point started before the task; stop scanning.
                    break
                next_job_id = queue_entry['job']['id']

        task['next_job_id'] = next_job_id

        # advance hqe_index to just after next_job_id
        if next_job_id is not None:
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry['job']['id'] < next_job_id:
                    break
                hqe_index += 1
805
806
def interleave_entries(queue_entries, special_tasks):
    """
    Merge queue entries and special tasks into one chronological list.

    Both lists should be ordered by descending ID. Each queue entry is
    followed by the special tasks that ran between it and the previous job.

    @param queue_entries: Host queue entries as a list of dictionaries.
    @param special_tasks: Special tasks as a list of dictionaries.

    @returns the combined list of entry/task dictionaries.
    """
    _compute_next_job_for_tasks(queue_entries, special_tasks)

    # Start with every special task that ran after the most recent job
    # (those have no next_job_id computed).
    combined = []
    task_cursor = 0
    for task in special_tasks:
        if task['next_job_id'] is not None:
            break
        combined.append(task)
        task_cursor += 1

    # Interleave queue entries with the remaining special tasks.
    for entry in queue_entries:
        combined.append(entry)
        # Append the tasks that ran between this job and the previous one.
        while task_cursor < len(special_tasks):
            task = special_tasks[task_cursor]
            if task['next_job_id'] < entry['job']['id']:
                break
            combined.append(task)
            task_cursor += 1

    return combined
jamesren4a41e012010-07-16 22:33:48 +0000832
833
def bucket_hosts_by_shard(host_objs, rpc_hostnames=False):
    """Figure out which hosts are on which shards.

    Hosts without a shard are omitted from the result.

    @param host_objs: A list of host objects.
    @param rpc_hostnames: If True, the rpc_hostnames of a shard are returned
        instead of the 'real' shard hostnames. This only matters for testing
        environments.

    @return: A map of shard hostname: list of hosts on the shard.
    """
    shard_host_map = {}
    for host in host_objs:
        if not host.shard:
            continue
        if rpc_hostnames:
            key = host.shard.rpc_hostname()
        else:
            key = host.shard.hostname
        bucket = shard_host_map.get(key)
        if bucket is None:
            bucket = shard_host_map[key] = []
        bucket.append(host.hostname)
    return shard_host_map
851
852
def get_create_job_common_args(local_args):
    """
    Return only the entries of local_args accepted by create_job_common().

    @param local_args: Dictionary of argument name to value (typically the
            locals() of an RPC function). Mutated in place when 'priority'
            still holds a legacy string value.

    @returns a dict restricted to the keyword arguments that can be passed
            to create_job_common().
    """
    # This code is only here to not kill suites scheduling tests when priority
    # becomes an int instead of a string.
    if isinstance(local_args['priority'], str):
        local_args['priority'] = priorities.Priority.DEFAULT
    # </migration hack>
    accepted_names, _, _, _ = inspect.getargspec(create_job_common)
    return dict((name, value) for name, value in local_args.iteritems()
                if name in accepted_names)
867
868
def create_job_common(name, priority, control_type, control_file=None,
                      hosts=(), meta_hosts=(), one_time_hosts=(),
                      atomic_group_name=None, synch_count=None,
                      is_template=False, timeout=None, timeout_mins=None,
                      max_runtime_mins=None, run_verify=True, email_list='',
                      dependencies=(), reboot_before=None, reboot_after=None,
                      parse_failed_repair=None, hostless=False, keyvals=None,
                      drone_set=None, parameterized_job=None,
                      parent_job_id=None, test_retry=0, run_reset=True,
                      require_ssp=None):
    #pylint: disable-msg=C0111
    """
    Common code between creating "standard" jobs and creating parameterized jobs

    Validates the host/metahost/atomic-group arguments, resolves hostnames
    and label names to model objects, then delegates to create_new_job().

    @param name: Name of the job.
    @param priority: Priority of the job (legacy string values are handled
            by get_create_job_common_args()).
    @param control_type: Client- or server-side control file type; hostless
            jobs must be server-side.
    @param hosts: Hostnames to schedule on directly.
    @param meta_hosts: Label (or atomic group) names to schedule against.
    @param one_time_hosts: Hostnames to create as one-time hosts; not
            allowed together with an atomic group.
    @param hostless: If True, no hosts may be given at all.

    @raises model_logic.ValidationError: on any invalid argument combination.
    @raises ValueError: if any requested host lives on a shard while this
            call runs on the master (such jobs must go to the shard).

    @returns whatever create_new_job() returns for the created job.
    """
    user = models.User.current_user()
    owner = user.login

    # input validation
    if not (hosts or meta_hosts or one_time_hosts or atomic_group_name
            or hostless):
        raise model_logic.ValidationError({
            'arguments' : "You must pass at least one of 'hosts', "
                          "'meta_hosts', 'one_time_hosts', "
                          "'atomic_group_name', or 'hostless'"
            })

    if hostless:
        # Hostless jobs are mutually exclusive with every host argument and
        # must run a server-side control file.
        if hosts or meta_hosts or one_time_hosts or atomic_group_name:
            raise model_logic.ValidationError({
                    'hostless': 'Hostless jobs cannot include any hosts!'})
        server_type = control_data.CONTROL_TYPE_NAMES.SERVER
        if control_type != server_type:
            raise model_logic.ValidationError({
                    'control_type': 'Hostless jobs cannot use client-side '
                                    'control files'})

    atomic_groups_by_name = dict((ag.name, ag)
                                 for ag in models.AtomicGroup.objects.all())
    label_objects = list(models.Label.objects.filter(name__in=meta_hosts))

    # Schedule on an atomic group automagically if one of the labels given
    # is an atomic group label and no explicit atomic_group_name was supplied.
    if not atomic_group_name:
        for label in label_objects:
            if label and label.atomic_group:
                atomic_group_name = label.atomic_group.name
                break
    # convert hostnames & meta hosts to host/label objects
    host_objects = models.Host.smart_get_bulk(hosts)
    if not server_utils.is_shard():
        # On the master, reject jobs whose hosts live (partly) on a shard;
        # those must be created on the shard itself.
        shard_host_map = bucket_hosts_by_shard(host_objects)
        num_shards = len(shard_host_map)
        if (num_shards > 1 or (num_shards == 1 and
                len(shard_host_map.values()[0]) != len(host_objects))):
            # We disallow the following jobs on master:
            #   num_shards > 1: this is a job spanning across multiple shards.
            #   num_shards == 1 but number of hosts on shard is less
            #   than total number of hosts: this is a job that spans across
            #   one shard and the master.
            raise ValueError(
                    'The following hosts are on shard(s), please create '
                    'seperate jobs for hosts on each shard: %s ' %
                    shard_host_map)
    metahost_objects = []
    meta_host_labels_by_name = {label.name: label for label in label_objects}
    for label_name in meta_hosts or []:
        if label_name in meta_host_labels_by_name:
            metahost_objects.append(meta_host_labels_by_name[label_name])
        elif label_name in atomic_groups_by_name:
            # If given a metahost name that isn't a Label, check to
            # see if the user was specifying an Atomic Group instead.
            atomic_group = atomic_groups_by_name[label_name]
            if atomic_group_name and atomic_group_name != atomic_group.name:
                raise model_logic.ValidationError({
                        'meta_hosts': (
                                'Label "%s" not found. If assumed to be an '
                                'atomic group it would conflict with the '
                                'supplied atomic group "%s".' % (
                                        label_name, atomic_group_name))})
            atomic_group_name = atomic_group.name
        else:
            raise model_logic.ValidationError(
                {'meta_hosts' : 'Label "%s" not found' % label_name})

    # Create and sanity check an AtomicGroup object if requested.
    if atomic_group_name:
        if one_time_hosts:
            raise model_logic.ValidationError(
                    {'one_time_hosts':
                     'One time hosts cannot be used with an Atomic Group.'})
        atomic_group = models.AtomicGroup.smart_get(atomic_group_name)
        if synch_count and synch_count > atomic_group.max_number_of_machines:
            raise model_logic.ValidationError(
                    {'atomic_group_name' :
                     'You have requested a synch_count (%d) greater than the '
                     'maximum machines in the requested Atomic Group (%d).' %
                     (synch_count, atomic_group.max_number_of_machines)})
    else:
        atomic_group = None

    # One-time hosts are created on the fly and appended to the host list.
    for host in one_time_hosts or []:
        this_host = models.Host.create_one_time_host(host)
        host_objects.append(this_host)

    options = dict(name=name,
                   priority=priority,
                   control_file=control_file,
                   control_type=control_type,
                   is_template=is_template,
                   timeout=timeout,
                   timeout_mins=timeout_mins,
                   max_runtime_mins=max_runtime_mins,
                   synch_count=synch_count,
                   run_verify=run_verify,
                   email_list=email_list,
                   dependencies=dependencies,
                   reboot_before=reboot_before,
                   reboot_after=reboot_after,
                   parse_failed_repair=parse_failed_repair,
                   keyvals=keyvals,
                   drone_set=drone_set,
                   parameterized_job=parameterized_job,
                   parent_job_id=parent_job_id,
                   test_retry=test_retry,
                   run_reset=run_reset,
                   require_ssp=require_ssp)
    return create_new_job(owner=owner,
                          options=options,
                          host_objects=host_objects,
                          metahost_objects=metahost_objects,
                          atomic_group=atomic_group)
Simran Basib6ec8ae2014-04-23 12:05:08 -07001000
1001
def encode_ascii(control_file):
    """Force a control file to only contain ascii characters.

    @param control_file: Control file to encode.

    @returns the control file in an ascii encoding.

    @raises error.ControlFileMalformed: if encoding fails.
    """
    try:
        return control_file.encode('ascii')
    except UnicodeError as e:
        # UnicodeError is the common base of UnicodeDecodeError (raised when
        # a byte string fails the implicit ascii decode) and
        # UnicodeEncodeError (raised when a unicode string holds non-ascii
        # characters). Catching only UnicodeDecodeError, as before, let
        # UnicodeEncodeError escape unwrapped for unicode control files.
        raise error.ControlFileMalformed(str(e))
1015
1016
def get_wmatrix_url():
    """Get wmatrix url from config file.

    @returns the wmatrix url or an empty string.
    """
    config = global_config.global_config
    return config.get_config_value('AUTOTEST_WEB', 'wmatrix_url', default='')
Jiaxi Luo57bc1952014-07-22 15:27:30 -07001025
1026
def inject_times_to_filter(start_time_key=None, end_time_key=None,
                           start_time_value=None, end_time_value=None,
                           **filter_data):
    """Inject the key value pairs of start and end time if provided.

    @param start_time_key: A string represents the filter key of start_time.
    @param end_time_key: A string represents the filter key of end_time.
    @param start_time_value: Start_time value; skipped when falsy.
    @param end_time_value: End_time value; skipped when falsy.

    @returns the injected filter_data.
    """
    for key, value in ((start_time_key, start_time_value),
                       (end_time_key, end_time_value)):
        if value:
            filter_data[key] = value
    return filter_data
1044
1045
def inject_times_to_hqe_special_tasks_filters(filter_data_common,
                                              start_time, end_time):
    """Inject start and end time to hqe and special tasks filters.

    @param filter_data_common: Common filter for hqe and special tasks.
    @param start_time: Start time to inject into both filters.
    @param end_time: End time to inject into both filters.

    @returns a pair of hqe and special tasks filters.
    """
    # Copy first: the HQE injection below consumes filter_data_common via
    # **kwargs, so both filters start from the same common data.
    special_tasks_data = filter_data_common.copy()
    hqe_filter = inject_times_to_filter(
            'started_on__gte', 'started_on__lte',
            start_time, end_time, **filter_data_common)
    special_tasks_filter = inject_times_to_filter(
            'time_started__gte', 'time_started__lte',
            start_time, end_time, **special_tasks_data)
    return hqe_filter, special_tasks_filter
1062
1063
def retrieve_shard(shard_hostname):
    """
    Retrieves the shard with the given hostname from the database.

    @param shard_hostname: Hostname of the shard to retrieve

    @raises models.Shard.DoesNotExist, if no shard with this hostname was found.

    @returns: Shard object
    """
    # Time the lookup so shard heartbeat latency is visible in stats.
    with autotest_stats.Timer('shard_heartbeat.retrieve_shard'):
        return models.Shard.smart_get(shard_hostname)
Jakob Juelich59cfe542014-09-02 16:37:46 -07001077
1078
def find_records_for_shard(shard, known_job_ids, known_host_ids):
    """Find records that should be sent to a shard.

    @param shard: Shard to find records for.
    @param known_job_ids: List of ids of jobs the shard already has.
    @param known_host_ids: List of ids of hosts the shard already has.

    @returns: Tuple of three lists for hosts, jobs, and suite job keyvals:
              (hosts, jobs, suite_job_keyvals).
    """
    heartbeat_timer = autotest_stats.Timer('shard_heartbeat')
    with heartbeat_timer.get_client('find_hosts'):
        shard_hosts = models.Host.assign_to_shard(shard, known_host_ids)
    with heartbeat_timer.get_client('find_jobs'):
        shard_jobs = models.Job.assign_to_shard(shard, known_job_ids)
    with heartbeat_timer.get_client('find_suite_job_keyvals'):
        # Suite (parent) job keyvals are shipped alongside their child jobs.
        parent_ids = [job.parent_job_id for job in shard_jobs]
        suite_job_keyvals = models.JobKeyval.objects.filter(
                job_id__in=parent_ids)
    return shard_hosts, shard_jobs, suite_job_keyvals
Jakob Juelicha94efe62014-09-18 16:02:49 -07001099
1100
def _persist_records_with_type_sent_from_shard(
    shard, records, record_type, *args, **kwargs):
    """
    Handle records of a specified type that were sent to the shard master.

    @param shard: The shard the records were sent from.
    @param records: The records sent in their serialized format.
    @param record_type: Type of the objects represented by records.
    @param args: Additional arguments that will be passed on to the sanity
                 checks.
    @param kwargs: Additional arguments that will be passed on to the sanity
                   checks.

    @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.

    @returns: List of primary keys of the processed records.
    """
    primary_keys = []
    for serialized in records:
        pk = serialized['id']
        try:
            existing = record_type.objects.get(pk=pk)
        except record_type.DoesNotExist:
            # A shard may only update records the master already knows about.
            raise error.UnallowedRecordsSentToMaster(
                'Object with pk %s of type %s does not exist on master.' % (
                    pk, record_type))

        existing.sanity_check_update_from_shard(
            shard, serialized, *args, **kwargs)

        existing.update_from_serialized(serialized)
        primary_keys.append(pk)
    return primary_keys
1134
1135
def persist_records_sent_from_shard(shard, jobs, hqes):
    """
    Sanity checking then saving serialized records sent to master from shard.

    During heartbeats shards upload jobs and hostqueuentries. This performs
    some sanity checks on these and then updates the existing records for those
    entries with the updated ones from the heartbeat.

    The sanity checks include:
    - Checking if the objects sent already exist on the master.
    - Checking if the objects sent were assigned to this shard.
    - hostqueueentries must be sent together with their jobs.

    @param shard: The shard the records were sent from.
    @param jobs: The jobs the shard sent.
    @param hqes: The hostqueuentries the shart sent.

    @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.
    """
    heartbeat_timer = autotest_stats.Timer('shard_heartbeat')
    # Jobs must be persisted first so HQEs can be checked against them.
    with heartbeat_timer.get_client('persist_jobs'):
        job_ids_sent = _persist_records_with_type_sent_from_shard(
                shard, jobs, models.Job)
    with heartbeat_timer.get_client('persist_hqes'):
        _persist_records_with_type_sent_from_shard(
                shard, hqes, models.HostQueueEntry, job_ids_sent=job_ids_sent)
Jakob Juelich50e91f72014-10-01 12:43:23 -07001163
1164
def forward_single_host_rpc_to_shard(func):
    """This decorator forwards rpc calls that modify a host to a shard.

    If a host is assigned to a shard, rpcs that change its attributes should be
    forwarded to the shard.

    This assumes the first argument of the function represents a host id.

    @param func: The function to decorate

    @returns: The function to replace func with.
    """
    # @wraps keeps the wrapped RPC's name/docstring, matching the other
    # decorators in this module (e.g. route_rpc_to_master).
    @wraps(func)
    def replacement(**kwargs):
        # Only keyword arguments can be accepted here, as we need the argument
        # names to send the rpc. serviceHandler always provides arguments with
        # their keywords, so this is not a problem.
        host = models.Host.smart_get(kwargs['id'])
        # Forward only from the master; shards apply the change locally.
        if host.shard and not server_utils.is_shard():
            run_rpc_on_multiple_hostnames(func.func_name,
                                          [host.shard.rpc_hostname()],
                                          **kwargs)
        return func(**kwargs)

    return replacement
1189
1190
def forward_multi_host_rpc_to_shards(func):
    """This decorator forwards rpc calls that modify multiple hosts.

    If a host is assigned to a shard, rpcs that change his attributes should be
    forwarded to the shard. Some calls however, take a list of hosts and a
    single id to modify, eg: label_add_hosts. This wrapper will sift through
    the list of hosts, find each of their shards, and forward the rpc for
    those hosts to that shard before calling the local version of the given rpc.

    This assumes:
        1. The rpc call uses `smart_get` to retrieve host objects, not the
            stock django `get` call. This is true for most, if not all rpcs in
            the rpc_interface.
        2. The kwargs to the function contain either a list of host ids or
            hostnames, keyed under 'hosts'. This is true for all the rpc
            functions that use 'smart_get'.

    @param func: The function to decorate

    @returns: The function to replace func with.
    """
    # @wraps keeps the wrapped RPC's name/docstring, matching the other
    # decorators in this module (e.g. route_rpc_to_master).
    @wraps(func)
    def replacement(**kwargs):
        fanout_rpc(
                models.Host.smart_get_bulk(kwargs['hosts']),
                func.func_name, **kwargs)
        return func(**kwargs)

    return replacement
1219
1220
def fanout_rpc(host_objs, rpc_name, include_hostnames=True, **kwargs):
    """Fanout the given rpc to shards of given hosts.

    @param host_objs: Host objects for the rpc.
    @param rpc_name: The name of the rpc.
    @param include_hostnames: If True, include the hostnames in the kwargs.
        Hostnames are not always necessary, this function is designed to
        send rpcs to the shard a host is on, the rpcs themselves could be
        related to labels, acls etc.
    @param kwargs: The kwargs for the rpc.

    @raises error.RPCException: wrapping whatever the per-shard rpc raised.
    """
    # Figure out which hosts are on which shards.
    shard_host_map = bucket_hosts_by_shard(
            host_objs, rpc_hostnames=True)

    # Execute the rpc against the appropriate shards.
    for shard, hostnames in shard_host_map.iteritems():
        if include_hostnames:
            # Restrict the forwarded call to the hosts on this shard.
            kwargs['hosts'] = hostnames
        try:
            run_rpc_on_multiple_hostnames(rpc_name, [shard], **kwargs)
        except:
            # Re-wrap any failure as an RPCException naming the shard, but
            # keep the original traceback via the 3-argument raise
            # (Python 2 syntax).
            ei = sys.exc_info()
            new_exc = error.RPCException('RPC %s failed on shard %s due to '
                    '%s: %s' % (rpc_name, shard, ei[0].__name__, ei[1]))
            raise new_exc.__class__, new_exc, ei[2]
1247
1248
def run_rpc_on_multiple_hostnames(rpc_call, shard_hostnames, **kwargs):
    """Runs an rpc to multiple AFEs

    This is i.e. used to propagate changes made to hosts after they are assigned
    to a shard.

    @param rpc_call: Name of the rpc endpoint to call.
    @param shard_hostnames: List of hostnames to run the rpcs on.
    @param **kwargs: Keyword arguments to pass in the rpcs.
    """
    # Make sure this function is not called on shards but only on master.
    assert not server_utils.is_shard()
    for hostname in shard_hostnames:
        shard_afe = frontend_wrappers.RetryingAFE(
                server=hostname, user=thread_local.get_user())
        shard_afe.run(rpc_call, **kwargs)
MK Ryu9c5fbbe2015-02-11 15:46:22 -08001265
1266
def get_label(name):
    """Gets a label object using a given name.

    @param name: Label name.
    @return: a label object matching the given name, or None when no such
            label exists (models.Label.DoesNotExist is caught here).
    """
    try:
        return models.Label.smart_get(name)
    except models.Label.DoesNotExist:
        return None
1280
1281
def get_global_afe_hostname():
    """Read the hostname of the global AFE from the global configuration."""
    config = global_config.global_config
    return config.get_config_value('SERVER', 'global_afe_hostname')
MK Ryu9c5fbbe2015-02-11 15:46:22 -08001286
1287
def route_rpc_to_master(func):
    """Route RPC to master AFE.

    When a shard receives an RPC decorated by this, the RPC is just
    forwarded to the master.
    When the master gets the RPC, the RPC function is executed.

    @param func: An RPC function to decorate

    @returns: A function replacing the RPC func.
    """
    @wraps(func)
    def replacement(*args, **kwargs):
        """
        We need a special care when decorating an RPC that can be called
        directly using positional arguments. One example is
        rpc_interface.create_job().
        rpc_interface.create_job_page_handler() calls the function using
        positional and keyword arguments.
        Since frontend.RpcClient.run() takes only keyword arguments for
        an RPC, positional arguments of the RPC function need to be
        transformed to key-value pair (dictionary type).

        inspect.getcallargs() is a useful utility to achieve the goal,
        however, we need an additional effort when an RPC function has
        **kwargs argument.
        Let's say we have a following form of RPC function.

        def rpcfunc(a, b, **kwargs)

        When we call the function like "rpcfunc(1, 2, id=3, name='mk')",
        inspect.getcallargs() returns a dictionary like below.

        {'a':1, 'b':2, 'kwargs': {'id':3, 'name':'mk'}}

        This is an incorrect form of arguments to pass to the rpc function.
        Instead, the dictionary should be like this.

        {'a':1, 'b':2, 'id':3, 'name':'mk'}
        """
        argspec = inspect.getargspec(func)
        if argspec.varargs is not None:
            raise Exception('RPC function must not have *args.')
        funcargs = inspect.getcallargs(func, *args, **kwargs)
        kwargs = dict()
        for k, v in funcargs.iteritems():
            # argspec.keywords is the *name* of the **kwargs parameter
            # (a string) or None. Compare with '==': the previous 'in'
            # check was a substring test, so any argument whose name was
            # a substring of e.g. 'kwargs' would wrongly be flattened via
            # kwargs.update(v).
            if argspec.keywords and k == argspec.keywords:
                kwargs.update(v)
            else:
                kwargs[k] = v

        if server_utils.is_shard():
            afe = frontend_wrappers.RetryingAFE(
                    server=get_global_afe_hostname(),
                    user=thread_local.get_user())
            return afe.run(func.func_name, **kwargs)
        return func(**kwargs)
    return replacement