#pylint: disable-msg=C0111
"""
Utility functions for rpc_interface.py. We keep them in a separate file so that
only RPC interface functions go into that file.
"""

__author__ = 'showard@google.com (Steve Howard)'

import datetime
from functools import wraps
import inspect
import os
import sys
import django.db.utils
import django.http

from autotest_lib.frontend import thread_local
from autotest_lib.frontend.afe import models, model_logic
from autotest_lib.client.common_lib import control_data, error
from autotest_lib.client.common_lib import global_config, priorities
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.server import utils as server_utils
from autotest_lib.server.cros import provision
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers

NULL_DATETIME = datetime.datetime.max
NULL_DATE = datetime.date.max
DUPLICATE_KEY_MSG = 'Duplicate entry'

def prepare_for_serialization(objects):
    """
    Prepare Python objects to be returned via RPC.
    @param objects: objects to be prepared.
    """
    if (isinstance(objects, list) and len(objects) and
            isinstance(objects[0], dict) and 'id' in objects[0]):
        objects = gather_unique_dicts(objects)
    return _prepare_data(objects)


def prepare_rows_as_nested_dicts(query, nested_dict_column_names):
    """
    Prepare a Django query to be returned via RPC as a sequence of nested
    dictionaries.

    @param query - A Django model query object with a select_related() method.
    @param nested_dict_column_names - A list of column/attribute names for the
            rows returned by query to expand into nested dictionaries using
            their get_object_dict() method when not None.

    @returns A list suitable to be returned in an RPC.
    """
    all_dicts = []
    for row in query.select_related():
        row_dict = row.get_object_dict()
        for column in nested_dict_column_names:
            if row_dict[column] is not None:
                row_dict[column] = getattr(row, column).get_object_dict()
        all_dicts.append(row_dict)
    return prepare_for_serialization(all_dicts)


def _prepare_data(data):
    """
    Recursively process data structures, performing necessary type
    conversions to values in data to allow for RPC serialization:
    -convert datetimes to strings
    -convert tuples and sets to lists
    """
    if isinstance(data, dict):
        new_data = {}
        for key, value in data.iteritems():
            new_data[key] = _prepare_data(value)
        return new_data
    elif (isinstance(data, list) or isinstance(data, tuple) or
          isinstance(data, set)):
        return [_prepare_data(item) for item in data]
    elif isinstance(data, datetime.date):
        if data is NULL_DATETIME or data is NULL_DATE:
            return None
        return str(data)
    else:
        return data
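
# Illustrative example of the conversions above (assumed input values):
#     _prepare_data({'created': datetime.datetime(2016, 1, 1, 12, 30),
#                    'hosts': ('host1', 'host2')})
#     => {'created': '2016-01-01 12:30:00', 'hosts': ['host1', 'host2']}
# The NULL_DATETIME/NULL_DATE sentinels are mapped to None instead of a string.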


def fetchall_as_list_of_dicts(cursor):
    """
    Converts each row in the cursor to a dictionary so that values can be read
    by using the column name.
    @param cursor: The database cursor to read from.
    @returns: A list of each row in the cursor as a dictionary.
    """
    desc = cursor.description
    return [dict(zip([col[0] for col in desc], row))
            for row in cursor.fetchall()]
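
# Sketch of typical usage (assumes an open database cursor):
#     cursor.execute('SELECT id, hostname FROM afe_hosts')
#     rows = fetchall_as_list_of_dicts(cursor)
#     # => [{'id': 1, 'hostname': 'host1'}, ...]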


def raw_http_response(response_data, content_type=None):
    response = django.http.HttpResponse(response_data, mimetype=content_type)
    response['Content-length'] = str(len(response.content))
    return response


def gather_unique_dicts(dict_iterable):
    """\
    Pick out unique objects (by ID) from an iterable of object dicts.
    """
    id_set = set()
    result = []
    for obj in dict_iterable:
        if obj['id'] not in id_set:
            id_set.add(obj['id'])
            result.append(obj)
    return result
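
# Example (assumed values): uniqueness is keyed on 'id' and the first
# occurrence wins:
#     gather_unique_dicts([{'id': 1, 'x': 'a'}, {'id': 1, 'x': 'b'},
#                          {'id': 2, 'x': 'c'}])
#     => [{'id': 1, 'x': 'a'}, {'id': 2, 'x': 'c'}]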


def extra_job_status_filters(not_yet_run=False, running=False, finished=False):
    """\
    Generate a SQL WHERE clause for job status filtering, and return it in
    a dict of keyword args to pass to query.extra().
    * not_yet_run: all HQEs are Queued
    * finished: all HQEs are complete
    * running: everything else
    """
    if not (not_yet_run or running or finished):
        return {}
    not_queued = ('(SELECT job_id FROM afe_host_queue_entries '
                  'WHERE status != "%s")'
                  % models.HostQueueEntry.Status.QUEUED)
    not_finished = ('(SELECT job_id FROM afe_host_queue_entries '
                    'WHERE not complete)')

    where = []
    if not_yet_run:
        where.append('id NOT IN ' + not_queued)
    if running:
        where.append('(id IN %s) AND (id IN %s)' % (not_queued, not_finished))
    if finished:
        where.append('id NOT IN ' + not_finished)
    return {'where': [' OR '.join(['(%s)' % x for x in where])]}
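
# Sketch of how the returned dict feeds a Django query:
#     extra = extra_job_status_filters(not_yet_run=True, finished=True)
#     jobs = models.Job.objects.all().extra(**extra)
# With more than one flag set, 'where' holds a single string ORing the
# individual clauses.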


def extra_job_type_filters(extra_args, suite=False,
                           sub=False, standalone=False):
    """\
    Generate a SQL WHERE clause for job type filtering, and return it in
    a dict of keyword args to pass to query.extra().

    @param extra_args: a dict of existing extra_args.

    No more than one of the parameters should be passed as True:
    * suite: job which is parent of other jobs
    * sub: job with a parent job
    * standalone: job with no child or parent jobs
    """
    assert not ((suite and sub) or
                (suite and standalone) or
                (sub and standalone)), ('Cannot specify more than one '
                                        'filter to this function')

    where = extra_args.get('where', [])
    parent_job_id = ('DISTINCT parent_job_id')
    child_job_id = ('id')
    filter_common = ('(SELECT %s FROM afe_jobs '
                     'WHERE parent_job_id IS NOT NULL)')

    if suite:
        where.append('id IN ' + filter_common % parent_job_id)
    elif sub:
        where.append('id IN ' + filter_common % child_job_id)
    elif standalone:
        where.append('NOT EXISTS (SELECT 1 from afe_jobs AS sub_query '
                     'WHERE parent_job_id IS NOT NULL'
                     ' AND (sub_query.parent_job_id=afe_jobs.id'
                     ' OR sub_query.id=afe_jobs.id))')
    else:
        return extra_args

    extra_args['where'] = where
    return extra_args
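
# Example: narrowing a query down to suite (parent) jobs only:
#     extra_args = extra_job_type_filters({}, suite=True)
#     suites = models.Job.objects.all().extra(**extra_args)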



def extra_host_filters(multiple_labels=()):
    """\
    Generate SQL WHERE clauses for matching hosts in an intersection of
    labels.
    """
    extra_args = {}
    where_str = ('afe_hosts.id in (select host_id from afe_hosts_labels '
                 'where label_id=%s)')
    extra_args['where'] = [where_str] * len(multiple_labels)
    extra_args['params'] = [models.Label.smart_get(label).id
                            for label in multiple_labels]
    return extra_args
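
# Sketch (assumed label names): each label contributes one parameterized
# subquery to 'where' and its id to 'params', so hosts must carry all of them:
#     filter_data = {'extra_args': extra_host_filters(('bluetooth', 'wifi'))}
#     hosts = models.Host.query_objects(filter_data)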


def get_host_query(multiple_labels, exclude_only_if_needed_labels,
                   exclude_atomic_group_hosts, valid_only, filter_data):
    if valid_only:
        query = models.Host.valid_objects.all()
    else:
        query = models.Host.objects.all()

    if exclude_only_if_needed_labels:
        only_if_needed_labels = models.Label.valid_objects.filter(
            only_if_needed=True)
        if only_if_needed_labels.count() > 0:
            only_if_needed_ids = ','.join(
                    str(label['id'])
                    for label in only_if_needed_labels.values('id'))
            query = models.Host.objects.add_join(
                query, 'afe_hosts_labels', join_key='host_id',
                join_condition=('afe_hosts_labels_exclude_OIN.label_id IN (%s)'
                                % only_if_needed_ids),
                suffix='_exclude_OIN', exclude=True)

    if exclude_atomic_group_hosts:
        atomic_group_labels = models.Label.valid_objects.filter(
            atomic_group__isnull=False)
        if atomic_group_labels.count() > 0:
            atomic_group_label_ids = ','.join(
                    str(atomic_group['id'])
                    for atomic_group in atomic_group_labels.values('id'))
            query = models.Host.objects.add_join(
                query, 'afe_hosts_labels', join_key='host_id',
                join_condition=(
                    'afe_hosts_labels_exclude_AG.label_id IN (%s)'
                    % atomic_group_label_ids),
                suffix='_exclude_AG', exclude=True)
    try:
        assert 'extra_args' not in filter_data
        filter_data['extra_args'] = extra_host_filters(multiple_labels)
        return models.Host.query_objects(filter_data, initial_query=query)
    except models.Label.DoesNotExist:
        return models.Host.objects.none()


class InconsistencyException(Exception):
    'Raised when a list of objects does not have a consistent value'


def get_consistent_value(objects, field):
    if not objects:
        # well a list of nothing is consistent
        return None

    value = getattr(objects[0], field)
    for obj in objects:
        this_value = getattr(obj, field)
        if this_value != value:
            raise InconsistencyException(objects[0], obj)
    return value
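
# Example: all objects must agree on the attribute, otherwise
# InconsistencyException is raised with the first and the offending object
# as its args:
#     test_type = get_consistent_value(test_objects, 'test_type')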


def afe_test_dict_to_test_object(test_dict):
    if not isinstance(test_dict, dict):
        return test_dict

    numerized_dict = {}
    for key, value in test_dict.iteritems():
        try:
            numerized_dict[key] = int(value)
        except (ValueError, TypeError):
            numerized_dict[key] = value

    return type('TestObject', (object,), numerized_dict)
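
# Example (assumed dict): numeric strings become ints and the result is an
# ad-hoc object usable with attribute access:
#     t = afe_test_dict_to_test_object({'name': 'dummy', 'sync_count': '2'})
#     t.name        # 'dummy'
#     t.sync_count  # 2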


def _check_is_server_test(test_type):
    """Checks if the test type is a server test.

    @param test_type The test type in enum integer or string.

    @returns A boolean to identify if the test type is server test.
    """
    if test_type is not None:
        if isinstance(test_type, basestring):
            try:
                test_type = control_data.CONTROL_TYPE.get_value(test_type)
            except AttributeError:
                return False
        return (test_type == control_data.CONTROL_TYPE.SERVER)
    return False
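
# Illustrative behavior (assuming 'Server' is a valid CONTROL_TYPE enum name):
#     _check_is_server_test('Server')                          # True
#     _check_is_server_test(control_data.CONTROL_TYPE.SERVER)  # True
#     _check_is_server_test(None)                              # False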


def prepare_generate_control_file(tests, profilers, db_tests=True):
    if db_tests:
        test_objects = [models.Test.smart_get(test) for test in tests]
    else:
        test_objects = [afe_test_dict_to_test_object(test) for test in tests]

    profiler_objects = [models.Profiler.smart_get(profiler)
                        for profiler in profilers]
    # ensure tests are all the same type
    try:
        test_type = get_consistent_value(test_objects, 'test_type')
    except InconsistencyException, exc:
        test1, test2 = exc.args
        raise model_logic.ValidationError(
            {'tests' : 'You cannot run both test_suites and server-side '
                       'tests together (tests %s and %s differ).' % (
                       test1.name, test2.name)})

    is_server = _check_is_server_test(test_type)
    if test_objects:
        synch_count = max(test.sync_count for test in test_objects)
    else:
        synch_count = 1

    if db_tests:
        dependencies = set(label.name for label
                           in models.Label.objects.filter(test__in=test_objects))
    else:
        dependencies = reduce(
            set.union, [set(test.dependencies) for test in test_objects])

    cf_info = dict(is_server=is_server, synch_count=synch_count,
                   dependencies=list(dependencies))
    return cf_info, test_objects, profiler_objects


def check_job_dependencies(host_objects, job_dependencies):
    """
    Check that a set of machines satisfies a job's dependencies.
    host_objects: list of models.Host objects
    job_dependencies: list of names of labels
    """
    # check that hosts satisfy dependencies
    host_ids = [host.id for host in host_objects]
    hosts_in_job = models.Host.objects.filter(id__in=host_ids)
    ok_hosts = hosts_in_job
    for index, dependency in enumerate(job_dependencies):
        if not provision.is_for_special_action(dependency):
            ok_hosts = ok_hosts.filter(labels__name=dependency)
    failing_hosts = (set(host.hostname for host in host_objects) -
                     set(host.hostname for host in ok_hosts))
    if failing_hosts:
        raise model_logic.ValidationError(
            {'hosts' : 'Host(s) failed to meet job dependencies (' +
                       (', '.join(job_dependencies)) + '): ' +
                       (', '.join(failing_hosts))})


def check_job_metahost_dependencies(metahost_objects, job_dependencies):
    """
    Check that at least one machine within the metahost spec satisfies the job's
    dependencies.

    @param metahost_objects A list of label objects representing the metahosts.
    @param job_dependencies A list of strings of the required label names.
    @raises NoEligibleHostException If a metahost cannot run the job.
    """
    for metahost in metahost_objects:
        hosts = models.Host.objects.filter(labels=metahost)
        for label_name in job_dependencies:
            if not provision.is_for_special_action(label_name):
                hosts = hosts.filter(labels__name=label_name)
        if not any(hosts):
            raise error.NoEligibleHostException("No hosts within %s satisfy %s."
                    % (metahost.name, ', '.join(job_dependencies)))


def _execution_key_for(host_queue_entry):
    return (host_queue_entry.job.id, host_queue_entry.execution_subdir)


def check_abort_synchronous_jobs(host_queue_entries):
    # ensure user isn't aborting part of a synchronous autoserv execution
    count_per_execution = {}
    for queue_entry in host_queue_entries:
        key = _execution_key_for(queue_entry)
        count_per_execution.setdefault(key, 0)
        count_per_execution[key] += 1

    for queue_entry in host_queue_entries:
        if not queue_entry.execution_subdir:
            continue
        execution_count = count_per_execution[_execution_key_for(queue_entry)]
        if execution_count < queue_entry.job.synch_count:
            raise model_logic.ValidationError(
                {'' : 'You cannot abort part of a synchronous job execution '
                      '(%d/%s), %d included, %d expected'
                      % (queue_entry.job.id, queue_entry.execution_subdir,
                         execution_count, queue_entry.job.synch_count)})


def check_atomic_group_create_job(synch_count, host_objects, metahost_objects,
                                  dependencies, atomic_group):
    """
    Attempt to reject create_job requests with an atomic group that
    will be impossible to schedule. The checks are not perfect but
    should catch the most obvious issues.

    @param synch_count - The job's minimum synch count.
    @param host_objects - A list of models.Host instances.
    @param metahost_objects - A list of models.Label instances.
    @param dependencies - A list of job dependency label names.
    @param atomic_group - The models.AtomicGroup instance to schedule within.

    @raises model_logic.ValidationError - When an issue is found.
    """
    # If specific host objects were supplied with an atomic group, verify
    # that there are enough to satisfy the synch_count.
    minimum_required = synch_count or 1
    if (host_objects and not metahost_objects and
            len(host_objects) < minimum_required):
        raise model_logic.ValidationError(
            {'hosts':
             'only %d hosts provided for job with synch_count = %d' %
             (len(host_objects), synch_count)})

    # Check that the atomic group has a hope of running this job
    # given any supplied metahosts and dependencies that may limit.

    # Get a set of hostnames in the atomic group.
    possible_hosts = set()
    for label in atomic_group.label_set.all():
        possible_hosts.update(h.hostname for h in label.host_set.all())

    # Filter out hosts that don't match all of the job dependency labels.
    for label in models.Label.objects.filter(name__in=dependencies):
        hosts_in_label = (h.hostname for h in label.host_set.all())
        possible_hosts.intersection_update(hosts_in_label)

    if not host_objects and not metahost_objects:
        # No hosts or metahosts are required to queue an atomic group Job.
        # However, if they are given, we respect them below.
        host_set = possible_hosts
    else:
        host_set = set(host.hostname for host in host_objects)
        unusable_host_set = host_set.difference(possible_hosts)
        if unusable_host_set:
            raise model_logic.ValidationError(
                {'hosts': 'Hosts "%s" are not in Atomic Group "%s"' %
                 (', '.join(sorted(unusable_host_set)), atomic_group.name)})

    # Lookup hosts provided by each meta host and merge them into the
    # host_set for final counting.
    for meta_host in metahost_objects:
        meta_possible = possible_hosts.copy()
        hosts_in_meta_host = (h.hostname for h in meta_host.host_set.all())
        meta_possible.intersection_update(hosts_in_meta_host)

        # Count all hosts that this meta_host will provide.
        host_set.update(meta_possible)

    if len(host_set) < minimum_required:
        raise model_logic.ValidationError(
            {'atomic_group_name':
             'Insufficient hosts in Atomic Group "%s" with the'
             ' supplied dependencies and meta_hosts.' %
             (atomic_group.name,)})


def check_modify_host(update_data):
    """
    Sanity check modify_host* requests.

    @param update_data: A dictionary with the changes to make to a host
            or hosts.
    """
    # Only the scheduler (monitor_db) is allowed to modify Host status.
    # Otherwise race conditions happen as a host's state is changed out from
    # beneath tasks being run on a host.
    if 'status' in update_data:
        raise model_logic.ValidationError({
            'status': 'Host status can not be modified by the frontend.'})


def check_modify_host_locking(host, update_data):
    """
    Checks when locking/unlocking has been requested if the host is already
    locked/unlocked.

    @param host: models.Host object to be modified
    @param update_data: A dictionary with the changes to make to the host.
    """
    locked = update_data.get('locked', None)
    lock_reason = update_data.get('lock_reason', None)
    if locked is not None:
        if locked and host.locked:
            raise model_logic.ValidationError({
                'locked': 'Host %s already locked by %s on %s.' %
                          (host.hostname, host.locked_by, host.lock_time)})
        if not locked and not host.locked:
            raise model_logic.ValidationError({
                'locked': 'Host %s already unlocked.' % host.hostname})
        if locked and not lock_reason and not host.locked:
            raise model_logic.ValidationError({
                'locked': 'Please provide a reason for locking Host %s' %
                          host.hostname})


def get_motd():
    dirname = os.path.dirname(__file__)
    filename = os.path.join(dirname, "..", "..", "motd.txt")
    text = ''
    try:
        fp = open(filename, "r")
        try:
            text = fp.read()
        finally:
            fp.close()
    except:
        pass

    return text


def _get_metahost_counts(metahost_objects):
    metahost_counts = {}
    for metahost in metahost_objects:
        metahost_counts.setdefault(metahost, 0)
        metahost_counts[metahost] += 1
    return metahost_counts
520
showarda965cef2009-05-15 23:17:41 +0000521def get_job_info(job, preserve_metahosts=False, queue_entry_filter_data=None):
showard29f7cd22009-04-29 21:16:24 +0000522 hosts = []
523 one_time_hosts = []
524 meta_hosts = []
525 atomic_group = None
jamesren2275ef12010-04-12 18:25:06 +0000526 hostless = False
showard29f7cd22009-04-29 21:16:24 +0000527
showard4d077562009-05-08 18:24:36 +0000528 queue_entries = job.hostqueueentry_set.all()
showarda965cef2009-05-15 23:17:41 +0000529 if queue_entry_filter_data:
530 queue_entries = models.HostQueueEntry.query_objects(
531 queue_entry_filter_data, initial_query=queue_entries)
showard4d077562009-05-08 18:24:36 +0000532
533 for queue_entry in queue_entries:
showard29f7cd22009-04-29 21:16:24 +0000534 if (queue_entry.host and (preserve_metahosts or
535 not queue_entry.meta_host)):
536 if queue_entry.deleted:
537 continue
538 if queue_entry.host.invalid:
539 one_time_hosts.append(queue_entry.host)
540 else:
541 hosts.append(queue_entry.host)
jamesren2275ef12010-04-12 18:25:06 +0000542 elif queue_entry.meta_host:
showard29f7cd22009-04-29 21:16:24 +0000543 meta_hosts.append(queue_entry.meta_host)
jamesren2275ef12010-04-12 18:25:06 +0000544 else:
545 hostless = True
546
showard29f7cd22009-04-29 21:16:24 +0000547 if atomic_group is None:
548 if queue_entry.atomic_group is not None:
549 atomic_group = queue_entry.atomic_group
550 else:
551 assert atomic_group.name == queue_entry.atomic_group.name, (
552 'DB inconsistency. HostQueueEntries with multiple atomic'
553 ' groups on job %s: %s != %s' % (
554 id, atomic_group.name, queue_entry.atomic_group.name))
555
556 meta_host_counts = _get_metahost_counts(meta_hosts)
557
558 info = dict(dependencies=[label.name for label
559 in job.dependency_labels.all()],
560 hosts=hosts,
561 meta_hosts=meta_hosts,
562 meta_host_counts=meta_host_counts,
563 one_time_hosts=one_time_hosts,
jamesren2275ef12010-04-12 18:25:06 +0000564 atomic_group=atomic_group,
565 hostless=hostless)
showard29f7cd22009-04-29 21:16:24 +0000566 return info
567
568
showard09d80f92009-11-19 01:01:19 +0000569def check_for_duplicate_hosts(host_objects):
570 host_ids = set()
571 duplicate_hostnames = set()
572 for host in host_objects:
573 if host.id in host_ids:
574 duplicate_hostnames.add(host.hostname)
575 host_ids.add(host.id)
576
577 if duplicate_hostnames:
578 raise model_logic.ValidationError(
579 {'hosts' : 'Duplicate hosts: %s'
580 % ', '.join(duplicate_hostnames)})
581
582
def create_new_job(owner, options, host_objects, metahost_objects,
                   atomic_group=None):
    all_host_objects = host_objects + metahost_objects
    dependencies = options.get('dependencies', [])
    synch_count = options.get('synch_count')

    if atomic_group:
        check_atomic_group_create_job(
            synch_count, host_objects, metahost_objects,
            dependencies, atomic_group)
    else:
        if synch_count is not None and synch_count > len(all_host_objects):
            raise model_logic.ValidationError(
                {'hosts':
                 'only %d hosts provided for job with synch_count = %d' %
                 (len(all_host_objects), synch_count)})
        atomic_hosts = models.Host.objects.filter(
            id__in=[host.id for host in host_objects],
            labels__atomic_group=True)
        unusable_host_names = [host.hostname for host in atomic_hosts]
        if unusable_host_names:
            raise model_logic.ValidationError(
                {'hosts':
                 'Host(s) "%s" are atomic group hosts but no '
                 'atomic group was specified for this job.' %
                 (', '.join(unusable_host_names),)})

    check_for_duplicate_hosts(host_objects)

    for label_name in dependencies:
        if provision.is_for_special_action(label_name):
            # TODO: We could save a few queries
            # if we had a bulk ensure-label-exists function, which used
            # a bulk .get() call. The win is probably very small.
            _ensure_label_exists(label_name)

    # This only checks targeted hosts, not hosts eligible due to the metahost
    check_job_dependencies(host_objects, dependencies)
    check_job_metahost_dependencies(metahost_objects, dependencies)

    options['dependencies'] = list(
        models.Label.objects.filter(name__in=dependencies))

    for label in metahost_objects + options['dependencies']:
        if label.atomic_group and not atomic_group:
            raise model_logic.ValidationError(
                {'atomic_group_name':
                 'Dependency %r requires an atomic group but no '
                 'atomic_group_name or meta_host in an atomic group was '
                 'specified for this job.' % label.name})
        elif (label.atomic_group and
              label.atomic_group.name != atomic_group.name):
            raise model_logic.ValidationError(
                {'atomic_group_name':
                 'meta_hosts or dependency %r requires atomic group '
                 '%r instead of the supplied atomic_group_name=%r.' %
                 (label.name, label.atomic_group.name, atomic_group.name)})

    job = models.Job.create(owner=owner, options=options,
                            hosts=all_host_objects)
    job.queue(all_host_objects, atomic_group=atomic_group,
              is_template=options.get('is_template', False))
    return job.id


649 """
650 Ensure that a label called |name| exists in the Django models.
651
652 This function is to be called from within afe rpcs only, as an
653 alternative to server.cros.provision.ensure_label_exists(...). It works
654 by Django model manipulation, rather than by making another create_label
655 rpc call.
656
657 @param name: the label to check for/create.
658 @raises ValidationError: There was an error in the response that was
659 not because the label already existed.
660 @returns True is a label was created, False otherwise.
661 """
MK Ryu73be9862015-07-06 12:25:00 -0700662 # Make sure this function is not called on shards but only on master.
663 assert not server_utils.is_shard()
Aviv Keshetc68807e2013-07-31 16:13:01 -0700664 try:
665 models.Label.objects.get(name=name)
666 except models.Label.DoesNotExist:
Fang Deng7051fe42015-10-20 14:57:28 -0700667 try:
668 new_label = models.Label.objects.create(name=name)
669 new_label.save()
670 return True
671 except django.db.utils.IntegrityError as e:
672 # It is possible that another suite/test already
673 # created the label between the check and save.
674 if DUPLICATE_KEY_MSG in str(e):
675 return False
676 else:
677 raise
Aviv Keshetc68807e2013-07-31 16:13:01 -0700678 return False
679
680
def find_platform_and_atomic_group(host):
    """
    Figure out the platform name and atomic group name for the given host
    object. If none, the return value for either will be None.

    @returns (platform name, atomic group name) for the given host.
    """
    platforms = [label.name for label in host.label_list if label.platform]
    if not platforms:
        platform = None
    else:
        platform = platforms[0]
    if len(platforms) > 1:
        raise ValueError('Host %s has more than one platform: %s' %
                         (host.hostname, ', '.join(platforms)))
    for label in host.label_list:
        if label.atomic_group:
            atomic_group_name = label.atomic_group.name
            break
    else:
        atomic_group_name = None
    # Don't check for multiple atomic groups on a host here. That is an
    # error but should not trip up the RPC interface. monitor_db_cleanup
    # deals with it. This just returns the first one found.
    return platform, atomic_group_name


# support for get_host_queue_entries_and_special_tasks()

def _common_entry_to_dict(entry, type, job_dict, exec_path, status, started_on):
    return dict(type=type,
                host=entry['host'],
                job=job_dict,
                execution_path=exec_path,
                status=status,
                started_on=started_on,
                id=str(entry['id']) + type,
                oid=entry['id'])
720
MK Ryu0c1a37d2015-04-30 12:00:55 -0700721def _special_task_to_dict(task, queue_entries):
722 """Transforms a special task dictionary to another form of dictionary.
723
724 @param task Special task as a dictionary type
725 @param queue_entries Host queue entries as a list of dictionaries.
726
727 @return Transformed dictionary for a special task.
728 """
showardc0ac3a72009-07-08 21:14:45 +0000729 job_dict = None
MK Ryu0c1a37d2015-04-30 12:00:55 -0700730 if task['queue_entry']:
731 # Scan queue_entries to get the job detail info.
732 for qentry in queue_entries:
733 if task['queue_entry']['id'] == qentry['id']:
734 job_dict = qentry['job']
735 break
736 # If not found, get it from DB.
737 if job_dict is None:
738 job = models.Job.objects.get(id=task['queue_entry']['job'])
739 job_dict = job.get_object_dict()
740
741 exec_path = server_utils.get_special_task_exec_path(
742 task['host']['hostname'], task['id'], task['task'],
743 time_utils.time_string_to_datetime(task['time_requested']))
744 status = server_utils.get_special_task_status(
745 task['is_complete'], task['success'], task['is_active'])
746 return _common_entry_to_dict(task, task['task'], job_dict,
747 exec_path, status, task['time_started'])
showardc0ac3a72009-07-08 21:14:45 +0000748
749
750def _queue_entry_to_dict(queue_entry):
MK Ryu0c1a37d2015-04-30 12:00:55 -0700751 job_dict = queue_entry['job']
752 tag = server_utils.get_job_tag(job_dict['id'], job_dict['owner'])
753 exec_path = server_utils.get_hqe_exec_path(tag,
754 queue_entry['execution_subdir'])
755 return _common_entry_to_dict(queue_entry, 'Job', job_dict, exec_path,
756 queue_entry['status'], queue_entry['started_on'])
757
758
759def prepare_host_queue_entries_and_special_tasks(interleaved_entries,
760 queue_entries):
761 """
762 Prepare for serialization the interleaved entries of host queue entries
763 and special tasks.
764 Each element in the entries is a dictionary type.
765 The special task dictionary has only a job id for a job and lacks
766 the detail of the job while the host queue entry dictionary has.
767 queue_entries is used to look up the job detail info.
768
769 @param interleaved_entries Host queue entries and special tasks as a list
770 of dictionaries.
771 @param queue_entries Host queue entries as a list of dictionaries.
772
773 @return A post-processed list of dictionaries that is to be serialized.
774 """
775 dict_list = []
776 for e in interleaved_entries:
777 # Distinguish the two mixed entries based on the existence of
778 # the key "task". If an entry has the key, the entry is for
779 # special task. Otherwise, host queue entry.
780 if 'task' in e:
781 dict_list.append(_special_task_to_dict(e, queue_entries))
782 else:
783 dict_list.append(_queue_entry_to_dict(e))
784 return prepare_for_serialization(dict_list)


def _compute_next_job_for_tasks(queue_entries, special_tasks):
    """
    For each task, try to figure out the next job that ran after that task.
    This is done using the following pieces of information:
    * if the task has a queue entry, we can use that entry's job ID.
    * if the task has a time_started, we can try to compare that against the
      started_on field of queue_entries. This isn't guaranteed to work
      perfectly since queue_entries may also have null started_on values.
    * if the task has neither, or if use of time_started fails, just use the
      last computed job ID.

    @param queue_entries Host queue entries as a list of dictionaries.
    @param special_tasks Special tasks as a list of dictionaries.
    """
    next_job_id = None      # most recently computed next job
    hqe_index = 0           # index for scanning by started_on times
    for task in special_tasks:
        if task['queue_entry']:
            next_job_id = task['queue_entry']['job']
        elif task['time_started'] is not None:
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry['started_on'] is None:
                    continue
                t1 = time_utils.time_string_to_datetime(
                    queue_entry['started_on'])
                t2 = time_utils.time_string_to_datetime(task['time_started'])
                if t1 < t2:
                    break
                next_job_id = queue_entry['job']['id']

        task['next_job_id'] = next_job_id

        # advance hqe_index to just after next_job_id
        if next_job_id is not None:
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry['job']['id'] < next_job_id:
                    break
                hqe_index += 1


def interleave_entries(queue_entries, special_tasks):
    """
    Both lists should be ordered by descending ID.
    """
    _compute_next_job_for_tasks(queue_entries, special_tasks)

    # start with all special tasks that've run since the last job
    interleaved_entries = []
    for task in special_tasks:
        if task['next_job_id'] is not None:
            break
        interleaved_entries.append(task)

    # now interleave queue entries with the remaining special tasks
    special_task_index = len(interleaved_entries)
    for queue_entry in queue_entries:
        interleaved_entries.append(queue_entry)
        # add all tasks that ran between this job and the previous one
        for task in special_tasks[special_task_index:]:
            if task['next_job_id'] < queue_entry['job']['id']:
                break
            interleaved_entries.append(task)
            special_task_index += 1

    return interleaved_entries


def bucket_hosts_by_shard(host_objs, rpc_hostnames=False):
    """Figure out which hosts are on which shards.

    @param host_objs: A list of host objects.
    @param rpc_hostnames: If True, the rpc_hostnames of a shard are returned
        instead of the 'real' shard hostnames. This only matters for testing
        environments.

    @return: A map of shard hostname: list of hosts on the shard.
    """
    shard_host_map = {}
    for host in host_objs:
        if host.shard:
            shard_name = (host.shard.rpc_hostname() if rpc_hostnames
                          else host.shard.hostname)
            shard_host_map.setdefault(shard_name, []).append(host.hostname)
    return shard_host_map
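
# Example shape of the result (assumed hosts and shard names):
#     bucket_hosts_by_shard(host_objs)
#     => {'shard1.example.com': ['host1', 'host2'],
#         'shard2.example.com': ['host3']}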


def get_create_job_common_args(local_args):
    """
    Returns a dict containing only the args that apply for create_job_common

    Returns a subset of local_args, which contains only the arguments that can
    be passed in to create_job_common().
    """
    # This code is only here to not kill suites scheduling tests when priority
    # becomes an int instead of a string.
    if isinstance(local_args['priority'], str):
        local_args['priority'] = priorities.Priority.DEFAULT
    # </migration hack>
    arg_names, _, _, _ = inspect.getargspec(create_job_common)
    return dict(item for item in local_args.iteritems() if item[0] in arg_names)


def create_job_common(name, priority, control_type, control_file=None,
                      hosts=(), meta_hosts=(), one_time_hosts=(),
                      atomic_group_name=None, synch_count=None,
                      is_template=False, timeout=None, timeout_mins=None,
                      max_runtime_mins=None, run_verify=True, email_list='',
                      dependencies=(), reboot_before=None, reboot_after=None,
                      parse_failed_repair=None, hostless=False, keyvals=None,
                      drone_set=None, parameterized_job=None,
                      parent_job_id=None, test_retry=0, run_reset=True,
                      require_ssp=None):
    #pylint: disable-msg=C0111
    """
    Common code between creating "standard" jobs and creating parameterized jobs
    """
    user = models.User.current_user()
    owner = user.login

    # input validation
    if not (hosts or meta_hosts or one_time_hosts or atomic_group_name
            or hostless):
        raise model_logic.ValidationError({
            'arguments' : "You must pass at least one of 'hosts', "
                          "'meta_hosts', 'one_time_hosts', "
                          "'atomic_group_name', or 'hostless'"
            })

    if hostless:
        if hosts or meta_hosts or one_time_hosts or atomic_group_name:
            raise model_logic.ValidationError({
                'hostless': 'Hostless jobs cannot include any hosts!'})
        server_type = control_data.CONTROL_TYPE_NAMES.SERVER
        if control_type != server_type:
            raise model_logic.ValidationError({
                'control_type': 'Hostless jobs cannot use client-side '
                                'control files'})

    atomic_groups_by_name = dict((ag.name, ag)
                                 for ag in models.AtomicGroup.objects.all())
    label_objects = list(models.Label.objects.filter(name__in=meta_hosts))

    # Schedule on an atomic group automagically if one of the labels given
    # is an atomic group label and no explicit atomic_group_name was supplied.
    if not atomic_group_name:
        for label in label_objects:
            if label and label.atomic_group:
                atomic_group_name = label.atomic_group.name
                break
    # convert hostnames & meta hosts to host/label objects
    host_objects = models.Host.smart_get_bulk(hosts)
    if not server_utils.is_shard():
        shard_host_map = bucket_hosts_by_shard(host_objects)
        num_shards = len(shard_host_map)
        if (num_shards > 1 or (num_shards == 1 and
                len(shard_host_map.values()[0]) != len(host_objects))):
            # We disallow the following jobs on master:
            #   num_shards > 1: this is a job spanning across multiple shards.
            #   num_shards == 1 but number of hosts on shard is less
            #   than total number of hosts: this is a job that spans across
            #   one shard and the master.
            raise ValueError(
                'The following hosts are on shard(s), please create '
                'separate jobs for hosts on each shard: %s ' %
                shard_host_map)
    metahost_objects = []
    meta_host_labels_by_name = {label.name: label for label in label_objects}
    for label_name in meta_hosts or []:
        if label_name in meta_host_labels_by_name:
            metahost_objects.append(meta_host_labels_by_name[label_name])
        elif label_name in atomic_groups_by_name:
            # If given a metahost name that isn't a Label, check to
            # see if the user was specifying an Atomic Group instead.
            atomic_group = atomic_groups_by_name[label_name]
            if atomic_group_name and atomic_group_name != atomic_group.name:
                raise model_logic.ValidationError({
                    'meta_hosts': (
                        'Label "%s" not found. If assumed to be an '
                        'atomic group it would conflict with the '
                        'supplied atomic group "%s".' % (
                            label_name, atomic_group_name))})
            atomic_group_name = atomic_group.name
        else:
            raise model_logic.ValidationError(
                {'meta_hosts' : 'Label "%s" not found' % label_name})

    # Create and sanity check an AtomicGroup object if requested.
    if atomic_group_name:
        if one_time_hosts:
            raise model_logic.ValidationError(
                {'one_time_hosts':
                 'One time hosts cannot be used with an Atomic Group.'})
        atomic_group = models.AtomicGroup.smart_get(atomic_group_name)
        if synch_count and synch_count > atomic_group.max_number_of_machines:
            raise model_logic.ValidationError(
                {'atomic_group_name' :
                 'You have requested a synch_count (%d) greater than the '
                 'maximum machines in the requested Atomic Group (%d).' %
                 (synch_count, atomic_group.max_number_of_machines)})
    else:
        atomic_group = None

    for host in one_time_hosts or []:
        this_host = models.Host.create_one_time_host(host)
        host_objects.append(this_host)

    options = dict(name=name,
                   priority=priority,
                   control_file=control_file,
                   control_type=control_type,
                   is_template=is_template,
                   timeout=timeout,
                   timeout_mins=timeout_mins,
                   max_runtime_mins=max_runtime_mins,
                   synch_count=synch_count,
                   run_verify=run_verify,
                   email_list=email_list,
                   dependencies=dependencies,
                   reboot_before=reboot_before,
                   reboot_after=reboot_after,
                   parse_failed_repair=parse_failed_repair,
                   keyvals=keyvals,
                   drone_set=drone_set,
                   parameterized_job=parameterized_job,
                   parent_job_id=parent_job_id,
                   test_retry=test_retry,
                   run_reset=run_reset,
                   require_ssp=require_ssp)
    return create_new_job(owner=owner,
                          options=options,
                          host_objects=host_objects,
                          metahost_objects=metahost_objects,
                          atomic_group=atomic_group)


def encode_ascii(control_file):
    """Force a control file to only contain ascii characters.

    @param control_file: Control file to encode.

    @returns the control file in an ascii encoding.

    @raises error.ControlFileMalformed: if encoding fails.
    """
    try:
        return control_file.encode('ascii')
    except UnicodeDecodeError as e:
        raise error.ControlFileMalformed(str(e))
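
# Example: an ascii-only control file round-trips unchanged, while non-ascii
# content raises error.ControlFileMalformed:
#     encode_ascii(u'job.run_test("dummy_Pass")')
#     => 'job.run_test("dummy_Pass")'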


def get_wmatrix_url():
    """Get wmatrix url from config file.

    @returns the wmatrix url or an empty string.
    """
    return global_config.global_config.get_config_value('AUTOTEST_WEB',
                                                        'wmatrix_url',
                                                        default='')


def inject_times_to_filter(start_time_key=None, end_time_key=None,
                           start_time_value=None, end_time_value=None,
                           **filter_data):
    """Inject the key value pairs of start and end time if provided.

    @param start_time_key: A string represents the filter key of start_time.
    @param end_time_key: A string represents the filter key of end_time.
    @param start_time_value: Start_time value.
    @param end_time_value: End_time value.

    @returns the injected filter_data.
    """
    if start_time_value:
        filter_data[start_time_key] = start_time_value
    if end_time_value:
        filter_data[end_time_key] = end_time_value
    return filter_data
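
# Example (assumed filter keys/values): only truthy values are injected:
#     inject_times_to_filter('started_on__gte', 'started_on__lte',
#                            '2016-01-01 00:00:00', None)
#     => {'started_on__gte': '2016-01-01 00:00:00'}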


def inject_times_to_hqe_special_tasks_filters(filter_data_common,
                                              start_time, end_time):
    """Inject start and end time to hqe and special tasks filters.

    @param filter_data_common: Common filter for hqe and special tasks.
    @param start_time: Start time value to filter on.
    @param end_time: End time value to filter on.

    @returns a pair of hqe and special tasks filters.
    """
    filter_data_special_tasks = filter_data_common.copy()
    return (inject_times_to_filter('started_on__gte', 'started_on__lte',
                                   start_time, end_time, **filter_data_common),
            inject_times_to_filter('time_started__gte', 'time_started__lte',
                                   start_time, end_time,
                                   **filter_data_special_tasks))


def retrieve_shard(shard_hostname):
    """
    Retrieves the shard with the given hostname from the database.

    @param shard_hostname: Hostname of the shard to retrieve

    @raises models.Shard.DoesNotExist, if no shard with this hostname was found.

    @returns: Shard object
    """
    timer = autotest_stats.Timer('shard_heartbeat.retrieve_shard')
    with timer:
        return models.Shard.smart_get(shard_hostname)


def find_records_for_shard(shard, known_job_ids, known_host_ids):
    """Find records that should be sent to a shard.

    @param shard: Shard to find records for.
    @param known_job_ids: List of ids of jobs the shard already has.
    @param known_host_ids: List of ids of hosts the shard already has.

    @returns: Tuple of three lists for hosts, jobs, and suite job keyvals:
              (hosts, jobs, suite_job_keyvals).
    """
    timer = autotest_stats.Timer('shard_heartbeat')
    with timer.get_client('find_hosts'):
        hosts = models.Host.assign_to_shard(shard, known_host_ids)
    with timer.get_client('find_jobs'):
        jobs = models.Job.assign_to_shard(shard, known_job_ids)
    with timer.get_client('find_suite_job_keyvals'):
        parent_job_ids = [job.parent_job_id for job in jobs]
        suite_job_keyvals = models.JobKeyval.objects.filter(
            job_id__in=parent_job_ids)
    return hosts, jobs, suite_job_keyvals


def _persist_records_with_type_sent_from_shard(
        shard, records, record_type, *args, **kwargs):
    """
    Handle records of a specified type that were sent to the shard master.

    @param shard: The shard the records were sent from.
    @param records: The records sent in their serialized format.
    @param record_type: Type of the objects represented by records.
    @param args: Additional arguments that will be passed on to the sanity
                 checks.
    @param kwargs: Additional arguments that will be passed on to the sanity
                   checks.

    @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.

    @returns: List of primary keys of the processed records.
    """
    pks = []
    for serialized_record in records:
        pk = serialized_record['id']
        try:
            current_record = record_type.objects.get(pk=pk)
        except record_type.DoesNotExist:
            raise error.UnallowedRecordsSentToMaster(
                'Object with pk %s of type %s does not exist on master.' % (
                    pk, record_type))

        current_record.sanity_check_update_from_shard(
            shard, serialized_record, *args, **kwargs)

        current_record.update_from_serialized(serialized_record)
        pks.append(pk)
    return pks


def persist_records_sent_from_shard(shard, jobs, hqes):
    """
    Sanity checking then saving serialized records sent to master from shard.

    During heartbeats shards upload jobs and host queue entries. This performs
    some sanity checks on these and then updates the existing records for those
    entries with the updated ones from the heartbeat.

    The sanity checks include:
    - Checking if the objects sent already exist on the master.
    - Checking if the objects sent were assigned to this shard.
    - Host queue entries must be sent together with their jobs.

    @param shard: The shard the records were sent from.
    @param jobs: The jobs the shard sent.
    @param hqes: The host queue entries the shard sent.

    @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.
    """
    timer = autotest_stats.Timer('shard_heartbeat')
    with timer.get_client('persist_jobs'):
        job_ids_sent = _persist_records_with_type_sent_from_shard(
            shard, jobs, models.Job)

    with timer.get_client('persist_hqes'):
        _persist_records_with_type_sent_from_shard(
            shard, hqes, models.HostQueueEntry, job_ids_sent=job_ids_sent)


def forward_single_host_rpc_to_shard(func):
    """This decorator forwards rpc calls that modify a host to a shard.

    If a host is assigned to a shard, rpcs that change its attributes should be
    forwarded to the shard.

    This assumes the first argument of the function represents a host id.

    @param func: The function to decorate

    @returns: The function to replace func with.
    """
    def replacement(**kwargs):
        # Only keyword arguments can be accepted here, as we need the argument
        # names to send the rpc. serviceHandler always provides arguments with
        # their keywords, so this is not a problem.

        # A host record (identified by kwargs['id']) can be deleted in
        # func(). Therefore, we should save the data that can be needed later
        # before func() is called.
        shard_hostname = None
        host = models.Host.smart_get(kwargs['id'])
        if host and host.shard:
            shard_hostname = host.shard.rpc_hostname()
        ret = func(**kwargs)
        if shard_hostname and not server_utils.is_shard():
            run_rpc_on_multiple_hostnames(func.func_name,
                                          [shard_hostname],
                                          **kwargs)
        return ret

    return replacement
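
# Sketch of intended use (hypothetical host RPC; the host id must be passed
# as the keyword argument 'id'):
#     @forward_single_host_rpc_to_shard
#     def modify_host(id=None, **update_data):
#         ...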


def fanout_rpc(host_objs, rpc_name, include_hostnames=True, **kwargs):
    """Fanout the given rpc to shards of given hosts.

    @param host_objs: Host objects for the rpc.
    @param rpc_name: The name of the rpc.
    @param include_hostnames: If True, include the hostnames in the kwargs.
        Hostnames are not always necessary; this function is designed to
        send rpcs to the shard a host is on, and the rpcs themselves could be
        related to labels, acls etc.
    @param kwargs: The kwargs for the rpc.
    """
    # Figure out which hosts are on which shards.
    shard_host_map = bucket_hosts_by_shard(
        host_objs, rpc_hostnames=True)

    # Execute the rpc against the appropriate shards.
    for shard, hostnames in shard_host_map.iteritems():
        if include_hostnames:
            kwargs['hosts'] = hostnames
        try:
            run_rpc_on_multiple_hostnames(rpc_name, [shard], **kwargs)
        except:
            ei = sys.exc_info()
            new_exc = error.RPCException('RPC %s failed on shard %s due to '
                    '%s: %s' % (rpc_name, shard, ei[0].__name__, ei[1]))
            raise new_exc.__class__, new_exc, ei[2]
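
# Illustrative call (assumed host objects and rpc name): runs the named RPC
# once per shard that owns any of the given hosts:
#     fanout_rpc(host_objs, 'set_host_attribute', attribute='foo', value='bar')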


def run_rpc_on_multiple_hostnames(rpc_call, shard_hostnames, **kwargs):
    """Runs an rpc on multiple AFEs.

    This is used, e.g., to propagate changes made to hosts after they are
    assigned to a shard.

    @param rpc_call: Name of the rpc endpoint to call.
    @param shard_hostnames: List of hostnames to run the rpcs on.
    @param **kwargs: Keyword arguments to pass in the rpcs.
    """
    # Make sure this function is not called on shards but only on master.
    assert not server_utils.is_shard()
    for shard_hostname in shard_hostnames:
        afe = frontend_wrappers.RetryingAFE(server=shard_hostname,
                                            user=thread_local.get_user())
        afe.run(rpc_call, **kwargs)


def get_label(name):
    """Gets a label object using a given name.

    @param name: Label name.
    @return: a label object matching the given name, or None if no such
            label exists.
    """
    try:
        label = models.Label.smart_get(name)
    except models.Label.DoesNotExist:
        return None
    return label


# TODO: hide the following rpcs under is_moblab
def moblab_only(func):
    """Ensure moblab specific functions only run on Moblab devices."""
    def verify(*args, **kwargs):
        if not server_utils.is_moblab():
            raise error.RPCException('RPC: %s can only run on Moblab Systems!'
                                     % func.__name__)
        return func(*args, **kwargs)
    return verify
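
# Sketch of intended use (hypothetical RPC): the wrapped function only runs
# on Moblab systems and raises error.RPCException elsewhere:
#     @moblab_only
#     def update_config_settings(**kwargs):
#         ...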


def route_rpc_to_master(func):
    """Route RPC to master AFE.

    When a shard receives an RPC decorated by this, the RPC is just
    forwarded to the master.
    When the master gets the RPC, the RPC function is executed.

    @param func: An RPC function to decorate

    @returns: A function replacing the RPC func.
    """
    @wraps(func)
    def replacement(*args, **kwargs):
        """
        We need special care when decorating an RPC that can be called
        directly using positional arguments. One example is
        rpc_interface.create_job().
        rpc_interface.create_job_page_handler() calls the function using
        positional and keyword arguments.
        Since frontend.RpcClient.run() takes only keyword arguments for
        an RPC, positional arguments of the RPC function need to be
        transformed to key-value pairs (dictionary type).

        inspect.getcallargs() is a useful utility to achieve the goal;
        however, we need an additional effort when an RPC function has
        a **kwargs argument.
        Let's say we have the following form of RPC function.

        def rpcfunc(a, b, **kwargs)

        When we call the function like "rpcfunc(1, 2, id=3, name='mk')",
        inspect.getcallargs() returns a dictionary like below.

        {'a':1, 'b':2, 'kwargs': {'id':3, 'name':'mk'}}

        This is an incorrect form of arguments to pass to the rpc function.
        Instead, the dictionary should be like this.

        {'a':1, 'b':2, 'id':3, 'name':'mk'}
        """
        argspec = inspect.getargspec(func)
        if argspec.varargs is not None:
            raise Exception('RPC function must not have *args.')
        funcargs = inspect.getcallargs(func, *args, **kwargs)
        kwargs = dict()
        for k, v in funcargs.iteritems():
            if argspec.keywords and k == argspec.keywords:
                kwargs.update(v)
            else:
                kwargs[k] = v

        if server_utils.is_shard():
            afe = frontend_wrappers.RetryingAFE(
                server=server_utils.get_global_afe_hostname(),
                user=thread_local.get_user())
            return afe.run(func.func_name, **kwargs)
        return func(**kwargs)
    return replacement


def get_sample_dut(board, pool):
    """Get a dut with the given board and pool.

    This method is used to help to locate a dut with the given board and pool.
    The dut then can be used to identify a devserver in the same subnet.

    @param board: Name of the board.
    @param pool: Name of the pool.

    @return: Name of a dut with the given board and pool.
    """
    if not board or not pool:
        return None

    hosts = get_host_query(
        ('pool:%s' % pool, 'board:%s' % board), False, False, True, {})
    if not hosts:
        return None

    return list(hosts)[0].get_object_dict()['hostname']
1370 return list(hosts)[0].get_object_dict()['hostname']