blob: 3ff81cdd20aa2908519cf725b72d4fe30db515cb [file] [log] [blame]
Aviv Keshet18308922013-02-19 17:49:49 -08001#pylint: disable-msg=C0111
xixuanba232a32016-08-25 17:01:59 -07002"""
mblighe8819cd2008-02-15 16:48:40 +00003Utility functions for rpc_interface.py. We keep them in a separate file so that
4only RPC interface functions go into that file.
5"""
6
7__author__ = 'showard@google.com (Steve Howard)'
8
MK Ryu84573e12015-02-18 15:54:09 -08009import datetime
MK Ryufbb002c2015-06-08 14:13:16 -070010from functools import wraps
MK Ryu84573e12015-02-18 15:54:09 -080011import inspect
12import os
13import sys
Fang Deng7051fe42015-10-20 14:57:28 -070014import django.db.utils
showard3d6ae112009-05-02 00:45:48 +000015import django.http
MK Ryu0a9c82e2015-09-17 17:54:01 -070016
17from autotest_lib.frontend import thread_local
Dan Shi07e09af2013-04-12 09:31:29 -070018from autotest_lib.frontend.afe import models, model_logic
Alex Miller4a193692013-08-21 13:59:01 -070019from autotest_lib.client.common_lib import control_data, error
Jiaxi Luo421608e2014-07-07 14:38:00 -070020from autotest_lib.client.common_lib import global_config, priorities
MK Ryu0c1a37d2015-04-30 12:00:55 -070021from autotest_lib.client.common_lib import time_utils
Aviv Keshet14cac442016-11-20 21:44:11 -080022# TODO(akeshet): Replace with monarch once we know how to instrument rpc server
23# with ts_mon.
MK Ryu509516b2015-05-18 12:00:47 -070024from autotest_lib.client.common_lib.cros.graphite import autotest_stats
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080025from autotest_lib.server import utils as server_utils
MK Ryu9651ca52015-06-08 17:48:22 -070026from autotest_lib.server.cros import provision
27from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
mblighe8819cd2008-02-15 16:48:40 +000028
# Sentinel "no value" date/datetime markers stored in the database;
# _prepare_data() serializes these as None rather than as max-date strings.
NULL_DATETIME = datetime.datetime.max
NULL_DATE = datetime.date.max
# Substring of the MySQL IntegrityError message raised on duplicate-key
# inserts; used by _ensure_label_exists() to detect label-creation races.
DUPLICATE_KEY_MSG = 'Duplicate entry'
showarda62866b2008-07-28 21:27:41 +000032
def prepare_for_serialization(objects):
    """
    Prepare Python objects to be returned via RPC.

    A list of object dicts (dicts carrying an 'id' key) is first
    de-duplicated by id; the result is then converted to RPC-serializable
    form.

    @param objects: objects to be prepared.
    """
    looks_like_object_dicts = (
            isinstance(objects, list) and len(objects) and
            isinstance(objects[0], dict) and 'id' in objects[0])
    if looks_like_object_dicts:
        objects = gather_unique_dicts(objects)
    return _prepare_data(objects)
showardb8d34242008-04-25 18:11:16 +000042
43
def prepare_rows_as_nested_dicts(query, nested_dict_column_names):
    """
    Prepare a Django query to be returned via RPC as a sequence of nested
    dictionaries.

    @param query - A Django model query object with a select_related() method.
    @param nested_dict_column_names - A list of column/attribute names for the
            rows returned by query to expand into nested dictionaries using
            their get_object_dict() method when not None.

    @returns An list suitable to returned in an RPC.
    """
    def expand_row(row):
        # Expand each requested, non-None column into its own object dict.
        row_dict = row.get_object_dict()
        for column_name in nested_dict_column_names:
            if row_dict[column_name] is not None:
                row_dict[column_name] = getattr(
                        row, column_name).get_object_dict()
        return row_dict

    all_dicts = [expand_row(row) for row in query.select_related()]
    return prepare_for_serialization(all_dicts)
64
65
showardb8d34242008-04-25 18:11:16 +000066def _prepare_data(data):
jadmanski0afbb632008-06-06 21:10:57 +000067 """
68 Recursively process data structures, performing necessary type
69 conversions to values in data to allow for RPC serialization:
70 -convert datetimes to strings
showard2b9a88b2008-06-13 20:55:03 +000071 -convert tuples and sets to lists
jadmanski0afbb632008-06-06 21:10:57 +000072 """
73 if isinstance(data, dict):
74 new_data = {}
75 for key, value in data.iteritems():
76 new_data[key] = _prepare_data(value)
77 return new_data
showard2b9a88b2008-06-13 20:55:03 +000078 elif (isinstance(data, list) or isinstance(data, tuple) or
79 isinstance(data, set)):
jadmanski0afbb632008-06-06 21:10:57 +000080 return [_prepare_data(item) for item in data]
showard98659972008-07-17 17:00:07 +000081 elif isinstance(data, datetime.date):
showarda62866b2008-07-28 21:27:41 +000082 if data is NULL_DATETIME or data is NULL_DATE:
83 return None
jadmanski0afbb632008-06-06 21:10:57 +000084 return str(data)
85 else:
86 return data
mblighe8819cd2008-02-15 16:48:40 +000087
88
def fetchall_as_list_of_dicts(cursor):
    """
    Converts each row in the cursor to a dictionary so that values can be read
    by using the column name.
    @param cursor: The database cursor to read from.
    @returns: A list of each row in the cursor as a dictionary.
    """
    column_names = [column[0] for column in cursor.description]
    return [dict(zip(column_names, row)) for row in cursor.fetchall()]
99
100
def raw_http_response(response_data, content_type=None):
    """Build a raw HTTP response carrying |response_data|.

    @param response_data: body of the response.
    @param content_type: optional MIME type for the response.
    @returns a django.http.HttpResponse with an explicit Content-length
            header.
    """
    # NOTE(review): 'mimetype' is the legacy pre-Django-1.7 keyword; newer
    # Django versions renamed it to content_type — confirm Django version
    # before upgrading.
    response = django.http.HttpResponse(response_data, mimetype=content_type)
    response['Content-length'] = str(len(response.content))
    return response
105
106
def gather_unique_dicts(dict_iterable):
    """\
    Pick out unique objects (by ID) from an iterable of object dicts.

    The first occurrence of each id wins; relative order is preserved.
    """
    seen_ids = set()
    unique_dicts = []
    for candidate in dict_iterable:
        candidate_id = candidate['id']
        if candidate_id in seen_ids:
            continue
        seen_ids.add(candidate_id)
        unique_dicts.append(candidate)
    return unique_dicts
showardb0dfb9f2008-06-06 18:08:02 +0000118
119
def extra_job_status_filters(not_yet_run=False, running=False, finished=False):
    """\
    Generate a SQL WHERE clause for job status filtering, and return it in
    a dict of keyword args to pass to query.extra().
    * not_yet_run: all HQEs are Queued
    * finished: all HQEs are complete
    * running: everything else
    """
    if not (not_yet_run or running or finished):
        return {}

    # Subqueries selecting jobs that have at least one non-queued /
    # non-complete HQE, respectively.
    not_queued = ('(SELECT job_id FROM afe_host_queue_entries '
                  'WHERE status != "%s")'
                  % models.HostQueueEntry.Status.QUEUED)
    not_finished = ('(SELECT job_id FROM afe_host_queue_entries '
                    'WHERE not complete)')

    clauses = []
    if not_yet_run:
        clauses.append('id NOT IN ' + not_queued)
    if running:
        clauses.append('(id IN %s) AND (id IN %s)'
                       % (not_queued, not_finished))
    if finished:
        clauses.append('id NOT IN ' + not_finished)
    return {'where': [' OR '.join('(%s)' % clause for clause in clauses)]}
mblighe8819cd2008-02-15 16:48:40 +0000144
145
def extra_job_type_filters(extra_args, suite=False,
                           sub=False, standalone=False):
    """\
    Generate a SQL WHERE clause for job status filtering, and return it in
    a dict of keyword args to pass to query.extra().

    param extra_args: a dict of existing extra_args.

    No more than one of the parameters should be passed as True:
    * suite: job which is parent of other jobs
    * sub: job with a parent job
    * standalone: job with no child or parent jobs
    """
    assert sum(map(bool, (suite, sub, standalone))) <= 1, (
            'Cannot specify more than one filter to this function')

    where = extra_args.get('where', [])
    filter_common = ('(SELECT %s FROM afe_jobs '
                     'WHERE parent_job_id IS NOT NULL)')

    if suite:
        # Jobs that appear as some other job's parent.
        where.append('id IN ' + filter_common % 'DISTINCT parent_job_id')
    elif sub:
        # Jobs that have a parent themselves.
        where.append('id IN ' + filter_common % 'id')
    elif standalone:
        # Jobs that take part in no parent/child relationship at all.
        where.append('NOT EXISTS (SELECT 1 from afe_jobs AS sub_query '
                     'WHERE parent_job_id IS NOT NULL'
                     ' AND (sub_query.parent_job_id=afe_jobs.id'
                     ' OR sub_query.id=afe_jobs.id))')
    else:
        return extra_args

    extra_args['where'] = where
    return extra_args
184
185
186
def extra_host_filters(multiple_labels=()):
    """\
    Generate SQL WHERE clauses for matching hosts in an intersection of
    labels.
    """
    where_str = ('afe_hosts.id in (select host_id from afe_hosts_labels '
                 'where label_id=%s)')
    # One identical clause per label; the label ids are bound as params.
    label_ids = [models.Label.smart_get(label).id
                 for label in multiple_labels]
    return {'where': [where_str] * len(multiple_labels),
            'params': label_ids}
showard8e3aa5e2008-04-08 19:42:32 +0000199
200
def get_host_query(multiple_labels, exclude_only_if_needed_labels,
                   exclude_atomic_group_hosts, valid_only, filter_data):
    """Build a Host queryset constrained by labels and filter data.

    @param multiple_labels: label names the hosts must all carry.
    @param exclude_only_if_needed_labels: when true, exclude hosts carrying
            any only_if_needed label.
    @param exclude_atomic_group_hosts: when true, exclude hosts carrying any
            atomic-group label.
    @param valid_only: restrict the query to valid hosts.
    @param filter_data: extra filter kwargs for Host.query_objects(); must
            not already contain 'extra_args'.
    @returns a Django queryset of matching hosts; empty queryset when a
            label name in multiple_labels does not exist.
    """
    if valid_only:
        query = models.Host.valid_objects.all()
    else:
        query = models.Host.objects.all()

    if exclude_only_if_needed_labels:
        only_if_needed_labels = models.Label.valid_objects.filter(
            only_if_needed=True)
        if only_if_needed_labels.count() > 0:
            only_if_needed_ids = ','.join(
                    str(label['id'])
                    for label in only_if_needed_labels.values('id'))
            query = models.Host.objects.add_join(
                query, 'afe_hosts_labels', join_key='host_id',
                join_condition=('afe_hosts_labels_exclude_OIN.label_id IN (%s)'
                                % only_if_needed_ids),
                suffix='_exclude_OIN', exclude=True)

    if exclude_atomic_group_hosts:
        atomic_group_labels = models.Label.valid_objects.filter(
            atomic_group__isnull=False)
        if atomic_group_labels.count() > 0:
            atomic_group_label_ids = ','.join(
                    str(atomic_group['id'])
                    for atomic_group in atomic_group_labels.values('id'))
            query = models.Host.objects.add_join(
                query, 'afe_hosts_labels', join_key='host_id',
                join_condition=(
                    'afe_hosts_labels_exclude_AG.label_id IN (%s)'
                    % atomic_group_label_ids),
                suffix='_exclude_AG', exclude=True)

    assert 'extra_args' not in filter_data
    try:
        # smart_get() inside extra_host_filters() raises Label.DoesNotExist
        # for an unknown label name; treat that as "no matching hosts".
        filter_data['extra_args'] = extra_host_filters(multiple_labels)
        return models.Host.query_objects(filter_data, initial_query=query)
    except models.Label.DoesNotExist:
        return models.Host.objects.none()
showard43a3d262008-11-12 18:17:05 +0000240
241
class InconsistencyException(Exception):
    """Raised when a list of objects does not have a consistent value.

    args carries the two conflicting objects (see get_consistent_value).
    """
showard8fd58242008-03-10 21:29:07 +0000244
245
def get_consistent_value(objects, field):
    """Return the value of |field| shared by every object in |objects|.

    @param objects: sequence of objects to inspect.
    @param field: attribute name to read from each object.
    @returns the common attribute value, or None for an empty sequence.
    @raises InconsistencyException: if any two objects disagree.
    """
    if not objects:
        # well a list of nothing is consistent
        return None

    expected = getattr(objects[0], field)
    for candidate in objects[1:]:
        if getattr(candidate, field) != expected:
            raise InconsistencyException(objects[0], candidate)
    return expected
showard8fd58242008-03-10 21:29:07 +0000257
258
def afe_test_dict_to_test_object(test_dict):
    """Convert a test dictionary into an anonymous test object.

    Values that parse as integers are converted to int; everything else is
    kept verbatim. Non-dict input is returned unchanged.

    @param test_dict: dict of test attributes.
    @returns a new object exposing the dict entries as attributes, or
            test_dict itself when it is not a dict.
    """
    if not isinstance(test_dict, dict):
        return test_dict

    numerized_dict = {}
    # items() behaves identically to the old iteritems() here and is
    # forward-compatible.
    for key, value in test_dict.items():
        try:
            numerized_dict[key] = int(value)
        except (ValueError, TypeError):
            numerized_dict[key] = value

    return type('TestObject', (object,), numerized_dict)
271
272
def _check_is_server_test(test_type):
    """Checks if the test type is a server test.

    @param test_type The test type in enum integer or string.

    @returns A boolean to identify if the test type is server test.
    """
    if test_type is None:
        return False
    if isinstance(test_type, basestring):
        # Map a string name onto the enum value; unknown names are not
        # server tests.
        try:
            test_type = control_data.CONTROL_TYPE.get_value(test_type)
        except AttributeError:
            return False
    return test_type == control_data.CONTROL_TYPE.SERVER
288
289
def prepare_generate_control_file(tests, profilers, db_tests=True):
    """Resolve tests/profilers and compute control file metadata.

    @param tests: test names/ids (db_tests=True) or test dicts.
    @param profilers: profiler names/ids to resolve.
    @param db_tests: when True, look tests up in the database; otherwise
            convert the given dicts with afe_test_dict_to_test_object().
    @returns (cf_info, test_objects, profiler_objects) where cf_info holds
            is_server, synch_count and dependencies for the job.
    @raises model_logic.ValidationError: if the tests mix client and server
            test types.
    """
    if db_tests:
        test_objects = [models.Test.smart_get(test) for test in tests]
    else:
        test_objects = [afe_test_dict_to_test_object(test) for test in tests]

    profiler_objects = [models.Profiler.smart_get(profiler)
                        for profiler in profilers]
    # ensure tests are all the same type
    try:
        test_type = get_consistent_value(test_objects, 'test_type')
    except InconsistencyException as exc:
        test1, test2 = exc.args
        raise model_logic.ValidationError(
            {'tests' : 'You cannot run both test_suites and server-side '
                       'tests together (tests %s and %s differ)' % (
                       test1.name, test2.name)})

    is_server = _check_is_server_test(test_type)
    if test_objects:
        synch_count = max(test.sync_count for test in test_objects)
    else:
        synch_count = 1

    if db_tests:
        dependencies = set(label.name for label
                           in models.Label.objects.filter(
                                   test__in=test_objects))
    else:
        # Seed reduce() with an empty set so an empty test list is handled
        # instead of raising TypeError.
        dependencies = reduce(
            set.union, [set(test.dependencies) for test in test_objects],
            set())

    cf_info = dict(is_server=is_server, synch_count=synch_count,
                   dependencies=list(dependencies))
    return cf_info, test_objects, profiler_objects
showard989f25d2008-10-01 11:38:11 +0000324
325
def check_job_dependencies(host_objects, job_dependencies):
    """
    Check that a set of machines satisfies a job's dependencies.

    @param host_objects: list of models.Host objects.
    @param job_dependencies: list of names of labels.
    @raises model_logic.ValidationError: if any host lacks a required label.
    """
    # check that hosts satisfy dependencies
    host_ids = [host.id for host in host_objects]
    ok_hosts = models.Host.objects.filter(id__in=host_ids)
    # Was enumerate(); the index was never used.
    for dependency in job_dependencies:
        # Labels handled by a special action (per provision) are not
        # required to be present on the host up front.
        if not provision.is_for_special_action(dependency):
            ok_hosts = ok_hosts.filter(labels__name=dependency)
    failing_hosts = (set(host.hostname for host in host_objects) -
                     set(host.hostname for host in ok_hosts))
    if failing_hosts:
        raise model_logic.ValidationError(
            {'hosts' : 'Host(s) failed to meet job dependencies (' +
                       (', '.join(job_dependencies)) + '): ' +
                       (', '.join(failing_hosts))})
346
showard989f25d2008-10-01 11:38:11 +0000347
def check_job_metahost_dependencies(metahost_objects, job_dependencies):
    """
    Check that at least one machine within the metahost spec satisfies the job's
    dependencies.

    @param metahost_objects A list of label objects representing the metahosts.
    @param job_dependencies A list of strings of the required label names.
    @raises NoEligibleHostException If a metahost cannot run the job.
    """
    for metahost in metahost_objects:
        eligible = models.Host.objects.filter(labels=metahost)
        for label_name in job_dependencies:
            # Special-action labels (per provision) are not required up
            # front.
            if provision.is_for_special_action(label_name):
                continue
            eligible = eligible.filter(labels__name=label_name)
        if not any(eligible):
            raise error.NoEligibleHostException("No hosts within %s satisfy %s."
                    % (metahost.name, ', '.join(job_dependencies)))
365
showard2bab8f42008-11-12 18:15:22 +0000366
367def _execution_key_for(host_queue_entry):
368 return (host_queue_entry.job.id, host_queue_entry.execution_subdir)
369
370
def check_abort_synchronous_jobs(host_queue_entries):
    """Disallow aborting only part of a synchronous autoserv execution.

    @param host_queue_entries: HostQueueEntry objects the user wants aborted.
    @raises model_logic.ValidationError: if the request covers fewer entries
            of an execution than the job's synch_count requires.
    """
    # Count how many of the to-be-aborted entries belong to each execution.
    count_per_execution = {}
    for queue_entry in host_queue_entries:
        key = _execution_key_for(queue_entry)
        count_per_execution[key] = count_per_execution.get(key, 0) + 1

    for queue_entry in host_queue_entries:
        if not queue_entry.execution_subdir:
            continue
        execution_count = count_per_execution[_execution_key_for(queue_entry)]
        if execution_count < queue_entry.job.synch_count:
            raise model_logic.ValidationError(
                    {'' : 'You cannot abort part of a synchronous job execution '
                          '(%d/%s), %d included, %d expected'
                          % (queue_entry.job.id, queue_entry.execution_subdir,
                             execution_count, queue_entry.job.synch_count)})
showard8fbae652009-01-20 23:23:10 +0000389
390
def check_atomic_group_create_job(synch_count, host_objects, metahost_objects,
                                  dependencies, atomic_group):
    """
    Attempt to reject create_job requests with an atomic group that
    will be impossible to schedule. The checks are not perfect but
    should catch the most obvious issues.

    @param synch_count - The job's minimum synch count.
    @param host_objects - A list of models.Host instances.
    @param metahost_objects - A list of models.Label instances.
    @param dependencies - A list of job dependency label names.
    @param atomic_group - The atomic group model instance the job targets;
            its label_set defines the pool of candidate hosts.

    @raises model_logic.ValidationError - When an issue is found.
    """
    # If specific host objects were supplied with an atomic group, verify
    # that there are enough to satisfy the synch_count.
    minimum_required = synch_count or 1
    if (host_objects and not metahost_objects and
        len(host_objects) < minimum_required):
        raise model_logic.ValidationError(
                {'hosts':
                 'only %d hosts provided for job with synch_count = %d' %
                 (len(host_objects), synch_count)})

    # Check that the atomic group has a hope of running this job
    # given any supplied metahosts and dependancies that may limit.

    # Get a set of hostnames in the atomic group.
    possible_hosts = set()
    for label in atomic_group.label_set.all():
        possible_hosts.update(h.hostname for h in label.host_set.all())

    # Filter out hosts that don't match all of the job dependency labels.
    for label in models.Label.objects.filter(name__in=dependencies):
        hosts_in_label = (h.hostname for h in label.host_set.all())
        possible_hosts.intersection_update(hosts_in_label)

    if not host_objects and not metahost_objects:
        # No hosts or metahosts are required to queue an atomic group Job.
        # However, if they are given, we respect them below.
        host_set = possible_hosts
    else:
        host_set = set(host.hostname for host in host_objects)
        unusable_host_set = host_set.difference(possible_hosts)
        if unusable_host_set:
            raise model_logic.ValidationError(
                {'hosts': 'Hosts "%s" are not in Atomic Group "%s"' %
                 (', '.join(sorted(unusable_host_set)), atomic_group.name)})

    # Lookup hosts provided by each meta host and merge them into the
    # host_set for final counting.
    for meta_host in metahost_objects:
        meta_possible = possible_hosts.copy()
        hosts_in_meta_host = (h.hostname for h in meta_host.host_set.all())
        meta_possible.intersection_update(hosts_in_meta_host)

        # Count all hosts that this meta_host will provide.
        host_set.update(meta_possible)

    if len(host_set) < minimum_required:
        raise model_logic.ValidationError(
                {'atomic_group_name':
                 'Insufficient hosts in Atomic Group "%s" with the'
                 ' supplied dependencies and meta_hosts.' %
                 (atomic_group.name,)})
458
459
def check_modify_host(update_data):
    """
    Sanity check modify_host* requests.

    @param update_data: A dictionary with the changes to make to a host
            or hosts.
    @raises model_logic.ValidationError: if the request touches a field the
            frontend may not modify.
    """
    # Only the scheduler (monitor_db) is allowed to modify Host status.
    # Otherwise race conditions happen as a hosts state is changed out from
    # beneath tasks being run on a host.
    if 'status' not in update_data:
        return
    raise model_logic.ValidationError({
            'status': 'Host status can not be modified by the frontend.'})
473
474
def check_modify_host_locking(host, update_data):
    """
    Checks when locking/unlocking has been requested if the host is already
    locked/unlocked.

    @param host: models.Host object to be modified
    @param update_data: A dictionary with the changes to make to the host.
    @raises model_logic.ValidationError: on a redundant lock/unlock request,
            or a lock request with no lock_reason.
    """
    locked = update_data.get('locked', None)
    lock_reason = update_data.get('lock_reason', None)
    if locked is None:
        # No lock state change requested; nothing to validate.
        return
    if locked:
        if host.locked:
            raise model_logic.ValidationError({
                'locked': 'Host %s already locked by %s on %s.' %
                          (host.hostname, host.locked_by, host.lock_time)})
        if not lock_reason:
            raise model_logic.ValidationError({
                'locked': 'Please provide a reason for locking Host %s' %
                          host.hostname})
    elif not host.locked:
        raise model_logic.ValidationError({
            'locked': 'Host %s already unlocked.' % host.hostname})
showardce7c0922009-09-11 18:39:24 +0000497
498
def get_motd():
    """Return the message-of-the-day text, or '' when unavailable.

    Reads motd.txt from two directories above this file. A missing or
    unreadable motd is not an error; the caller just gets an empty string.
    """
    dirname = os.path.dirname(__file__)
    filename = os.path.join(dirname, "..", "..", "motd.txt")
    try:
        # 'with' guarantees the file handle is closed even if read() fails.
        with open(filename, "r") as fp:
            return fp.read()
    except (IOError, OSError):
        # Was a bare 'except: pass'; narrowed to the I/O errors open/read
        # can actually raise so programming errors are no longer hidden.
        return ''
showard29f7cd22009-04-29 21:16:24 +0000513
514
515def _get_metahost_counts(metahost_objects):
516 metahost_counts = {}
517 for metahost in metahost_objects:
518 metahost_counts.setdefault(metahost, 0)
519 metahost_counts[metahost] += 1
520 return metahost_counts
521
522
def get_job_info(job, preserve_metahosts=False, queue_entry_filter_data=None):
    """Collect host/metahost/atomic-group info for a job's queue entries.

    @param job: models.Job instance to inspect.
    @param preserve_metahosts: when True, an entry created from a metahost
            still reports its assigned host instead of the metahost.
    @param queue_entry_filter_data: optional filter kwargs used to narrow
            the job's queue entries.
    @returns dict with keys: dependencies, hosts, meta_hosts,
            meta_host_counts, one_time_hosts, atomic_group, hostless.
    """
    hosts = []
    one_time_hosts = []
    meta_hosts = []
    atomic_group = None
    hostless = False

    queue_entries = job.hostqueueentry_set.all()
    if queue_entry_filter_data:
        queue_entries = models.HostQueueEntry.query_objects(
            queue_entry_filter_data, initial_query=queue_entries)

    for queue_entry in queue_entries:
        if (queue_entry.host and (preserve_metahosts or
                                  not queue_entry.meta_host)):
            if queue_entry.deleted:
                continue
            # Invalid hosts were one-time hosts created just for this job.
            if queue_entry.host.invalid:
                one_time_hosts.append(queue_entry.host)
            else:
                hosts.append(queue_entry.host)
        elif queue_entry.meta_host:
            meta_hosts.append(queue_entry.meta_host)
        else:
            hostless = True

        if atomic_group is None:
            if queue_entry.atomic_group is not None:
                atomic_group = queue_entry.atomic_group
        else:
            # Bug fix: the message previously formatted the id() builtin
            # instead of the job's id.
            assert atomic_group.name == queue_entry.atomic_group.name, (
                'DB inconsistency. HostQueueEntries with multiple atomic'
                ' groups on job %s: %s != %s' % (
                    job.id, atomic_group.name, queue_entry.atomic_group.name))

    meta_host_counts = _get_metahost_counts(meta_hosts)

    info = dict(dependencies=[label.name for label
                              in job.dependency_labels.all()],
                hosts=hosts,
                meta_hosts=meta_hosts,
                meta_host_counts=meta_host_counts,
                one_time_hosts=one_time_hosts,
                atomic_group=atomic_group,
                hostless=hostless)
    return info
569
570
def check_for_duplicate_hosts(host_objects):
    """Reject host lists that name the same host more than once.

    @param host_objects: models.Host instances requested for a job.
    @raises model_logic.ValidationError: if any host id appears twice.
    """
    seen_ids = set()
    duplicate_hostnames = set()
    for host in host_objects:
        if host.id in seen_ids:
            duplicate_hostnames.add(host.hostname)
        else:
            seen_ids.add(host.id)

    if duplicate_hostnames:
        raise model_logic.ValidationError(
                {'hosts' : 'Duplicate hosts: %s'
                 % ', '.join(duplicate_hostnames)})
583
584
def create_new_job(owner, options, host_objects, metahost_objects,
                   atomic_group=None):
    """Validate a job request and create the job plus its queue entries.

    @param owner: username owning the new job.
    @param options: dict of job options. 'dependencies', 'synch_count' and
            'is_template' are read here; as a side effect,
            options['dependencies'] is replaced with the matching
            models.Label objects before the job is created.
    @param host_objects: list of models.Host to run on.
    @param metahost_objects: list of models.Label used as metahosts.
    @param atomic_group: optional atomic group the job runs against.
    @returns the id of the newly created job.
    @raises model_logic.ValidationError: when hosts, dependencies or
            atomic-group constraints make the job invalid.
    """
    all_host_objects = host_objects + metahost_objects
    dependencies = options.get('dependencies', [])
    synch_count = options.get('synch_count')

    if atomic_group:
        check_atomic_group_create_job(
                synch_count, host_objects, metahost_objects,
                dependencies, atomic_group)
    else:
        # Without an atomic group, synch_count may not exceed the number of
        # hosts, and no supplied host may belong to an atomic group.
        if synch_count is not None and synch_count > len(all_host_objects):
            raise model_logic.ValidationError(
                    {'hosts':
                     'only %d hosts provided for job with synch_count = %d' %
                     (len(all_host_objects), synch_count)})
        atomic_hosts = models.Host.objects.filter(
                id__in=[host.id for host in host_objects],
                labels__atomic_group=True)
        unusable_host_names = [host.hostname for host in atomic_hosts]
        if unusable_host_names:
            raise model_logic.ValidationError(
                    {'hosts':
                     'Host(s) "%s" are atomic group hosts but no '
                     'atomic group was specified for this job.' %
                     (', '.join(unusable_host_names),)})

    check_for_duplicate_hosts(host_objects)

    # Dependencies handled by special actions are auto-created if missing.
    for label_name in dependencies:
        if provision.is_for_special_action(label_name):
            # TODO: We could save a few queries
            # if we had a bulk ensure-label-exists function, which used
            # a bulk .get() call. The win is probably very small.
            _ensure_label_exists(label_name)

    # This only checks targeted hosts, not hosts eligible due to the metahost
    check_job_dependencies(host_objects, dependencies)
    check_job_metahost_dependencies(metahost_objects, dependencies)

    options['dependencies'] = list(
            models.Label.objects.filter(name__in=dependencies))

    # Any label tied to an atomic group must match the supplied atomic group.
    for label in metahost_objects + options['dependencies']:
        if label.atomic_group and not atomic_group:
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'Dependency %r requires an atomic group but no '
                     'atomic_group_name or meta_host in an atomic group was '
                     'specified for this job.' % label.name})
        elif (label.atomic_group and
              label.atomic_group.name != atomic_group.name):
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'meta_hosts or dependency %r requires atomic group '
                     '%r instead of the supplied atomic_group_name=%r.' %
                     (label.name, label.atomic_group.name, atomic_group.name)})

    job = models.Job.create(owner=owner, options=options,
                            hosts=all_host_objects)
    job.queue(all_host_objects, atomic_group=atomic_group,
              is_template=options.get('is_template', False))
    return job.id
showard0957a842009-05-11 19:25:08 +0000648
649
def _ensure_label_exists(name):
    """
    Ensure that a label called |name| exists in the Django models.

    This function is to be called from within afe rpcs only, as an
    alternative to server.cros.provision.ensure_label_exists(...). It works
    by Django model manipulation, rather than by making another create_label
    rpc call.

    @param name: the label to check for/create.
    @raises ValidationError: There was an error in the response that was
                             not because the label already existed.
    @returns True if a label was created, False otherwise.
    """
    # Make sure this function is not called on shards but only on master.
    assert not server_utils.is_shard()
    try:
        models.Label.objects.get(name=name)
    except models.Label.DoesNotExist:
        try:
            # Manager.create() instantiates AND saves the object; the
            # explicit save() call that used to follow was redundant.
            models.Label.objects.create(name=name)
            return True
        except django.db.utils.IntegrityError as e:
            # It is possible that another suite/test already
            # created the label between the check and save.
            if DUPLICATE_KEY_MSG in str(e):
                return False
            raise
    return False
681
682
def find_platform_and_atomic_group(host):
    """
    Figure out the platform name and atomic group name for the given host
    object. If none, the return value for either will be None.

    @returns (platform name, atomic group name) for the given host.
    """
    platforms = [label.name for label in host.label_list if label.platform]
    if len(platforms) > 1:
        raise ValueError('Host %s has more than one platform: %s' %
                         (host.hostname, ', '.join(platforms)))
    platform = platforms[0] if platforms else None

    # Don't check for multiple atomic groups on a host here. That is an
    # error but should not trip up the RPC interface. monitor_db_cleanup
    # deals with it. This just returns the first one found.
    atomic_group_name = None
    for label in host.label_list:
        if label.atomic_group:
            atomic_group_name = label.atomic_group.name
            break

    return platform, atomic_group_name
showardc0ac3a72009-07-08 21:14:45 +0000708
709
710# support for get_host_queue_entries_and_special_tasks()
711
MK Ryu0c1a37d2015-04-30 12:00:55 -0700712def _common_entry_to_dict(entry, type, job_dict, exec_path, status, started_on):
showardc0ac3a72009-07-08 21:14:45 +0000713 return dict(type=type,
MK Ryu0c1a37d2015-04-30 12:00:55 -0700714 host=entry['host'],
showardc0ac3a72009-07-08 21:14:45 +0000715 job=job_dict,
MK Ryu0c1a37d2015-04-30 12:00:55 -0700716 execution_path=exec_path,
717 status=status,
718 started_on=started_on,
719 id=str(entry['id']) + type,
720 oid=entry['id'])
showardc0ac3a72009-07-08 21:14:45 +0000721
722
MK Ryu0c1a37d2015-04-30 12:00:55 -0700723def _special_task_to_dict(task, queue_entries):
724 """Transforms a special task dictionary to another form of dictionary.
725
726 @param task Special task as a dictionary type
727 @param queue_entries Host queue entries as a list of dictionaries.
728
729 @return Transformed dictionary for a special task.
730 """
showardc0ac3a72009-07-08 21:14:45 +0000731 job_dict = None
MK Ryu0c1a37d2015-04-30 12:00:55 -0700732 if task['queue_entry']:
733 # Scan queue_entries to get the job detail info.
734 for qentry in queue_entries:
735 if task['queue_entry']['id'] == qentry['id']:
736 job_dict = qentry['job']
737 break
738 # If not found, get it from DB.
739 if job_dict is None:
740 job = models.Job.objects.get(id=task['queue_entry']['job'])
741 job_dict = job.get_object_dict()
742
743 exec_path = server_utils.get_special_task_exec_path(
744 task['host']['hostname'], task['id'], task['task'],
745 time_utils.time_string_to_datetime(task['time_requested']))
746 status = server_utils.get_special_task_status(
747 task['is_complete'], task['success'], task['is_active'])
748 return _common_entry_to_dict(task, task['task'], job_dict,
749 exec_path, status, task['time_started'])
showardc0ac3a72009-07-08 21:14:45 +0000750
751
def _queue_entry_to_dict(queue_entry):
    """Transform a host queue entry dict into its serializable form.

    @param queue_entry: Host queue entry as a dictionary.

    @return Transformed dictionary for the host queue entry.
    """
    job_dict = queue_entry['job']
    job_tag = server_utils.get_job_tag(job_dict['id'], job_dict['owner'])
    exec_path = server_utils.get_hqe_exec_path(
            job_tag, queue_entry['execution_subdir'])
    return _common_entry_to_dict(queue_entry, 'Job', job_dict, exec_path,
                                 queue_entry['status'],
                                 queue_entry['started_on'])
759
760
def prepare_host_queue_entries_and_special_tasks(interleaved_entries,
                                                 queue_entries):
    """
    Prepare for serialization the interleaved entries of host queue entries
    and special tasks.
    Each element in the entries is a dictionary type.
    The special task dictionary has only a job id for a job and lacks
    the detail of the job while the host queue entry dictionary has.
    queue_entries is used to look up the job detail info.

    @param interleaved_entries Host queue entries and special tasks as a list
                               of dictionaries.
    @param queue_entries Host queue entries as a list of dictionaries.

    @return A post-processed list of dictionaries that is to be serialized.
    """
    # The two entry kinds are distinguished by the presence of the 'task'
    # key: special tasks have it, host queue entries do not.
    converted = [_special_task_to_dict(entry, queue_entries)
                 if 'task' in entry
                 else _queue_entry_to_dict(entry)
                 for entry in interleaved_entries]
    return prepare_for_serialization(converted)
showardc0ac3a72009-07-08 21:14:45 +0000787
788
def _compute_next_job_for_tasks(queue_entries, special_tasks):
    """
    For each task, try to figure out the next job that ran after that task.
    This is done using two pieces of information:
    * if the task has a queue entry, we can use that entry's job ID.
    * if the task has a time_started, we can try to compare that against the
    started_on field of queue_entries. this isn't guaranteed to work perfectly
    since queue_entries may also have null started_on values.
    * if the task has neither, or if use of time_started fails, just use the
    last computed job ID.

    NOTE: This mutates special_tasks in place: every task dict gets a
    'next_job_id' key. Both lists are assumed to be ordered by descending
    ID, as required by interleave_entries() — TODO confirm for all callers.

    @param queue_entries Host queue entries as a list of dictionaries.
    @param special_tasks Special tasks as a list of dictionaries.
    """
    next_job_id = None # most recently computed next job
    hqe_index = 0 # index for scanning by started_on times
    for task in special_tasks:
        if task['queue_entry']:
            next_job_id = task['queue_entry']['job']
        elif task['time_started'] is not None:
            # Scan forward (i.e. back in time, given descending order) for
            # the last queue entry that started at or after this task.
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry['started_on'] is None:
                    continue
                t1 = time_utils.time_string_to_datetime(
                        queue_entry['started_on'])
                t2 = time_utils.time_string_to_datetime(task['time_started'])
                if t1 < t2:
                    break
                next_job_id = queue_entry['job']['id']

        task['next_job_id'] = next_job_id

        # advance hqe_index to just after next_job_id
        if next_job_id is not None:
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry['job']['id'] < next_job_id:
                    break
                hqe_index += 1
827
828
def interleave_entries(queue_entries, special_tasks):
    """
    Both lists should be ordered by descending ID.

    Merges host queue entries and special tasks into a single list, ordered
    so that each queue entry is followed by the special tasks that ran
    between it and the previous (older) job.

    @param queue_entries: Host queue entries as a list of dictionaries.
    @param special_tasks: Special tasks as a list of dictionaries; each
            task dict gains a 'next_job_id' key as a side effect (see
            _compute_next_job_for_tasks).

    @returns a single interleaved list of entry dictionaries.
    """
    _compute_next_job_for_tasks(queue_entries, special_tasks)

    # start with all special tasks that've run since the last job
    interleaved_entries = []
    for task in special_tasks:
        if task['next_job_id'] is not None:
            break
        interleaved_entries.append(task)

    # now interleave queue entries with the remaining special tasks
    special_task_index = len(interleaved_entries)
    for queue_entry in queue_entries:
        interleaved_entries.append(queue_entry)
        # add all tasks that ran between this job and the previous one
        for task in special_tasks[special_task_index:]:
            if task['next_job_id'] < queue_entry['job']['id']:
                break
            interleaved_entries.append(task)
            special_task_index += 1

    return interleaved_entries
jamesren4a41e012010-07-16 22:33:48 +0000854
855
Prashanth Balasubramanian6edaaf92014-11-24 16:36:25 -0800856def bucket_hosts_by_shard(host_objs, rpc_hostnames=False):
857 """Figure out which hosts are on which shards.
858
859 @param host_objs: A list of host objects.
860 @param rpc_hostnames: If True, the rpc_hostnames of a shard are returned
861 instead of the 'real' shard hostnames. This only matters for testing
862 environments.
863
864 @return: A map of shard hostname: list of hosts on the shard.
865 """
866 shard_host_map = {}
867 for host in host_objs:
868 if host.shard:
869 shard_name = (host.shard.rpc_hostname() if rpc_hostnames
870 else host.shard.hostname)
871 shard_host_map.setdefault(shard_name, []).append(host.hostname)
872 return shard_host_map
873
874
jamesren4a41e012010-07-16 22:33:48 +0000875def get_create_job_common_args(local_args):
876 """
877 Returns a dict containing only the args that apply for create_job_common
878
879 Returns a subset of local_args, which contains only the arguments that can
880 be passed in to create_job_common().
881 """
Alex Miller7d658cf2013-09-04 16:00:35 -0700882 # This code is only here to not kill suites scheduling tests when priority
883 # becomes an int instead of a string.
884 if isinstance(local_args['priority'], str):
885 local_args['priority'] = priorities.Priority.DEFAULT
886 # </migration hack>
jamesren4a41e012010-07-16 22:33:48 +0000887 arg_names, _, _, _ = inspect.getargspec(create_job_common)
888 return dict(item for item in local_args.iteritems() if item[0] in arg_names)
889
890
def create_job_common(name, priority, control_type, control_file=None,
                      hosts=(), meta_hosts=(), one_time_hosts=(),
                      atomic_group_name=None, synch_count=None,
                      is_template=False, timeout=None, timeout_mins=None,
                      max_runtime_mins=None, run_verify=True, email_list='',
                      dependencies=(), reboot_before=None, reboot_after=None,
                      parse_failed_repair=None, hostless=False, keyvals=None,
                      drone_set=None, parameterized_job=None,
                      parent_job_id=None, test_retry=0, run_reset=True,
                      require_ssp=None):
    #pylint: disable-msg=C0111
    """
    Common code between creating "standard" jobs and creating parameterized jobs

    The job is created on behalf of the currently logged-in user. At least
    one of hosts, meta_hosts, one_time_hosts, atomic_group_name or hostless
    must be supplied.

    @returns whatever create_new_job() returns for the created job.

    @raises model_logic.ValidationError: on inconsistent host/meta-host/
            atomic-group arguments, or unknown label names.
    @raises ValueError: if requested hosts span shards, or span one shard
            and the master (such jobs are disallowed on the master).
    """
    user = models.User.current_user()
    owner = user.login

    # input validation
    if not (hosts or meta_hosts or one_time_hosts or atomic_group_name
            or hostless):
        raise model_logic.ValidationError({
            'arguments' : "You must pass at least one of 'hosts', "
                          "'meta_hosts', 'one_time_hosts', "
                          "'atomic_group_name', or 'hostless'"
            })

    if hostless:
        # Hostless jobs are exclusive: no host arguments, server-side only.
        if hosts or meta_hosts or one_time_hosts or atomic_group_name:
            raise model_logic.ValidationError({
                    'hostless': 'Hostless jobs cannot include any hosts!'})
        server_type = control_data.CONTROL_TYPE_NAMES.SERVER
        if control_type != server_type:
            raise model_logic.ValidationError({
                    'control_type': 'Hostless jobs cannot use client-side '
                                    'control files'})

    atomic_groups_by_name = dict((ag.name, ag)
                                 for ag in models.AtomicGroup.objects.all())
    label_objects = list(models.Label.objects.filter(name__in=meta_hosts))

    # Schedule on an atomic group automagically if one of the labels given
    # is an atomic group label and no explicit atomic_group_name was supplied.
    if not atomic_group_name:
        for label in label_objects:
            if label and label.atomic_group:
                atomic_group_name = label.atomic_group.name
                break
    # convert hostnames & meta hosts to host/label objects
    host_objects = models.Host.smart_get_bulk(hosts)
    if not server_utils.is_shard():
        shard_host_map = bucket_hosts_by_shard(host_objects)
        num_shards = len(shard_host_map)
        if (num_shards > 1 or (num_shards == 1 and
                len(shard_host_map.values()[0]) != len(host_objects))):
            # We disallow the following jobs on master:
            #   num_shards > 1: this is a job spanning across multiple shards.
            #   num_shards == 1 but number of hosts on shard is less
            #   than total number of hosts: this is a job that spans across
            #   one shard and the master.
            raise ValueError(
                    'The following hosts are on shard(s), please create '
                    'seperate jobs for hosts on each shard: %s ' %
                    shard_host_map)
    metahost_objects = []
    meta_host_labels_by_name = {label.name: label for label in label_objects}
    for label_name in meta_hosts or []:
        if label_name in meta_host_labels_by_name:
            metahost_objects.append(meta_host_labels_by_name[label_name])
        elif label_name in atomic_groups_by_name:
            # If given a metahost name that isn't a Label, check to
            # see if the user was specifying an Atomic Group instead.
            atomic_group = atomic_groups_by_name[label_name]
            if atomic_group_name and atomic_group_name != atomic_group.name:
                raise model_logic.ValidationError({
                        'meta_hosts': (
                            'Label "%s" not found. If assumed to be an '
                            'atomic group it would conflict with the '
                            'supplied atomic group "%s".' % (
                                label_name, atomic_group_name))})
            atomic_group_name = atomic_group.name
        else:
            raise model_logic.ValidationError(
                {'meta_hosts' : 'Label "%s" not found' % label_name})

    # Create and sanity check an AtomicGroup object if requested.
    if atomic_group_name:
        if one_time_hosts:
            raise model_logic.ValidationError(
                {'one_time_hosts':
                 'One time hosts cannot be used with an Atomic Group.'})
        atomic_group = models.AtomicGroup.smart_get(atomic_group_name)
        if synch_count and synch_count > atomic_group.max_number_of_machines:
            raise model_logic.ValidationError(
                {'atomic_group_name' :
                 'You have requested a synch_count (%d) greater than the '
                 'maximum machines in the requested Atomic Group (%d).' %
                 (synch_count, atomic_group.max_number_of_machines)})
    else:
        atomic_group = None

    for host in one_time_hosts or []:
        this_host = models.Host.create_one_time_host(host)
        host_objects.append(this_host)

    options = dict(name=name,
                   priority=priority,
                   control_file=control_file,
                   control_type=control_type,
                   is_template=is_template,
                   timeout=timeout,
                   timeout_mins=timeout_mins,
                   max_runtime_mins=max_runtime_mins,
                   synch_count=synch_count,
                   run_verify=run_verify,
                   email_list=email_list,
                   dependencies=dependencies,
                   reboot_before=reboot_before,
                   reboot_after=reboot_after,
                   parse_failed_repair=parse_failed_repair,
                   keyvals=keyvals,
                   drone_set=drone_set,
                   parameterized_job=parameterized_job,
                   parent_job_id=parent_job_id,
                   test_retry=test_retry,
                   run_reset=run_reset,
                   require_ssp=require_ssp)
    return create_new_job(owner=owner,
                          options=options,
                          host_objects=host_objects,
                          metahost_objects=metahost_objects,
                          atomic_group=atomic_group)
Simran Basib6ec8ae2014-04-23 12:05:08 -07001022
1023
def encode_ascii(control_file):
    """Force a control file to only contain ascii characters.

    @param control_file: Control file to encode.

    @returns the control file in an ascii encoding.

    @raises error.ControlFileMalformed: if encoding fails.
    """
    try:
        return control_file.encode('ascii')
    except UnicodeError as e:
        # A byte string with non-ascii bytes raises UnicodeDecodeError
        # (implicit ascii decode happens first), but a unicode string with
        # non-ascii characters raises UnicodeEncodeError. Catch their
        # common base class so both are reported as ControlFileMalformed.
        raise error.ControlFileMalformed(str(e))
1037
1038
def get_wmatrix_url():
    """Get wmatrix url from config file.

    @returns the wmatrix url or an empty string.
    """
    # Falls back to '' when AUTOTEST_WEB/wmatrix_url is not configured.
    config = global_config.global_config
    return config.get_config_value('AUTOTEST_WEB', 'wmatrix_url', default='')
Jiaxi Luo57bc1952014-07-22 15:27:30 -07001047
1048
def inject_times_to_filter(start_time_key=None, end_time_key=None,
                           start_time_value=None, end_time_value=None,
                           **filter_data):
    """Inject the key value pairs of start and end time if provided.

    @param start_time_key: A string represents the filter key of start_time.
    @param end_time_key: A string represents the filter key of end_time.
    @param start_time_value: Start_time value.
    @param end_time_value: End_time value.

    @returns the injected filter_data.
    """
    # Only truthy time values are injected; a missing/empty time leaves
    # the corresponding key out of the filter entirely.
    time_bounds = ((start_time_key, start_time_value),
                   (end_time_key, end_time_value))
    for key, value in time_bounds:
        if value:
            filter_data[key] = value
    return filter_data
1066
1067
def inject_times_to_hqe_special_tasks_filters(filter_data_common,
                                              start_time, end_time):
    """Inject start and end time to hqe and special tasks filters.

    @param filter_data_common: Common filter for hqe and special tasks.
    @param start_time: Start time value to inject, if provided.
    @param end_time: End time value to inject, if provided.

    @returns a pair of hqe and special tasks filters.
    """
    # HQEs and special tasks use different column names for their start
    # times, so build a filter for each from the same common data.
    special_tasks_data = filter_data_common.copy()
    hqe_filter = inject_times_to_filter(
            'started_on__gte', 'started_on__lte', start_time, end_time,
            **filter_data_common)
    special_tasks_filter = inject_times_to_filter(
            'time_started__gte', 'time_started__lte', start_time, end_time,
            **special_tasks_data)
    return hqe_filter, special_tasks_filter
1084
1085
def retrieve_shard(shard_hostname):
    """
    Retrieves the shard with the given hostname from the database.

    @param shard_hostname: Hostname of the shard to retrieve

    @raises models.Shard.DoesNotExist, if no shard with this hostname was found.

    @returns: Shard object
    """
    # Time the lookup so shard heartbeat latency can be monitored.
    with autotest_stats.Timer('shard_heartbeat.retrieve_shard'):
        return models.Shard.smart_get(shard_hostname)
Jakob Juelich59cfe542014-09-02 16:37:46 -07001099
1100
Jakob Juelich1b525742014-09-30 13:08:07 -07001101def find_records_for_shard(shard, known_job_ids, known_host_ids):
Jakob Juelich59cfe542014-09-02 16:37:46 -07001102 """Find records that should be sent to a shard.
1103
Jakob Juelicha94efe62014-09-18 16:02:49 -07001104 @param shard: Shard to find records for.
Jakob Juelich1b525742014-09-30 13:08:07 -07001105 @param known_job_ids: List of ids of jobs the shard already has.
1106 @param known_host_ids: List of ids of hosts the shard already has.
Jakob Juelicha94efe62014-09-18 16:02:49 -07001107
Fang Dengf3705992014-12-16 17:32:18 -08001108 @returns: Tuple of three lists for hosts, jobs, and suite job keyvals:
1109 (hosts, jobs, suite_job_keyvals).
Jakob Juelich59cfe542014-09-02 16:37:46 -07001110 """
MK Ryu509516b2015-05-18 12:00:47 -07001111 timer = autotest_stats.Timer('shard_heartbeat')
1112 with timer.get_client('find_hosts'):
1113 hosts = models.Host.assign_to_shard(shard, known_host_ids)
1114 with timer.get_client('find_jobs'):
1115 jobs = models.Job.assign_to_shard(shard, known_job_ids)
1116 with timer.get_client('find_suite_job_keyvals'):
1117 parent_job_ids = [job.parent_job_id for job in jobs]
1118 suite_job_keyvals = models.JobKeyval.objects.filter(
1119 job_id__in=parent_job_ids)
Fang Dengf3705992014-12-16 17:32:18 -08001120 return hosts, jobs, suite_job_keyvals
Jakob Juelicha94efe62014-09-18 16:02:49 -07001121
1122
def _persist_records_with_type_sent_from_shard(
        shard, records, record_type, *args, **kwargs):
    """
    Handle records of a specified type that were sent to the shard master.

    @param shard: The shard the records were sent from.
    @param records: The records sent in their serialized format.
    @param record_type: Type of the objects represented by records.
    @param args: Additional arguments that will be passed on to the sanity
                 checks.
    @param kwargs: Additional arguments that will be passed on to the sanity
                   checks.

    @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.

    @returns: List of primary keys of the processed records.
    """
    persisted_pks = []
    for serialized in records:
        pk = serialized['id']
        try:
            current_record = record_type.objects.get(pk=pk)
        except record_type.DoesNotExist:
            raise error.UnallowedRecordsSentToMaster(
                'Object with pk %s of type %s does not exist on master.' % (
                    pk, record_type))

        # Reject updates the shard is not allowed to make before touching
        # the existing record.
        current_record.sanity_check_update_from_shard(
            shard, serialized, *args, **kwargs)

        current_record.update_from_serialized(serialized)
        persisted_pks.append(pk)
    return persisted_pks
1156
1157
def persist_records_sent_from_shard(shard, jobs, hqes):
    """
    Sanity checking then saving serialized records sent to master from shard.

    During heartbeats shards upload jobs and hostqueuentries. This performs
    some sanity checks on these and then updates the existing records for those
    entries with the updated ones from the heartbeat.

    The sanity checks include:
    - Checking if the objects sent already exist on the master.
    - Checking if the objects sent were assigned to this shard.
    - hostqueueentries must be sent together with their jobs.

    @param shard: The shard the records were sent from.
    @param jobs: The jobs the shard sent.
    @param hqes: The hostqueuentries the shart sent.

    @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.
    """
    heartbeat_timer = autotest_stats.Timer('shard_heartbeat')
    with heartbeat_timer.get_client('persist_jobs'):
        job_ids_sent = _persist_records_with_type_sent_from_shard(
                shard, jobs, models.Job)

    with heartbeat_timer.get_client('persist_hqes'):
        # HQEs are only accepted when their job arrived in the same
        # heartbeat, hence job_ids_sent is passed to the sanity check.
        _persist_records_with_type_sent_from_shard(
                shard, hqes, models.HostQueueEntry, job_ids_sent=job_ids_sent)
Jakob Juelich50e91f72014-10-01 12:43:23 -07001185
1186
Jakob Juelich50e91f72014-10-01 12:43:23 -07001187def forward_single_host_rpc_to_shard(func):
1188 """This decorator forwards rpc calls that modify a host to a shard.
1189
1190 If a host is assigned to a shard, rpcs that change his attributes should be
1191 forwarded to the shard.
1192
1193 This assumes the first argument of the function represents a host id.
1194
1195 @param func: The function to decorate
1196
1197 @returns: The function to replace func with.
1198 """
1199 def replacement(**kwargs):
1200 # Only keyword arguments can be accepted here, as we need the argument
1201 # names to send the rpc. serviceHandler always provides arguments with
1202 # their keywords, so this is not a problem.
MK Ryu8e2c2d02016-01-06 15:24:38 -08001203
1204 # A host record (identified by kwargs['id']) can be deleted in
1205 # func(). Therefore, we should save the data that can be needed later
1206 # before func() is called.
1207 shard_hostname = None
Jakob Juelich50e91f72014-10-01 12:43:23 -07001208 host = models.Host.smart_get(kwargs['id'])
MK Ryu8e2c2d02016-01-06 15:24:38 -08001209 if host and host.shard:
1210 shard_hostname = host.shard.rpc_hostname()
1211 ret = func(**kwargs)
1212 if shard_hostname and not server_utils.is_shard():
MK Ryu26f0c932015-05-28 18:14:33 -07001213 run_rpc_on_multiple_hostnames(func.func_name,
MK Ryu8e2c2d02016-01-06 15:24:38 -08001214 [shard_hostname],
Jakob Juelich50e91f72014-10-01 12:43:23 -07001215 **kwargs)
MK Ryu8e2c2d02016-01-06 15:24:38 -08001216 return ret
Prashanth Balasubramanian5949b4a2014-11-23 12:58:30 -08001217
1218 return replacement
1219
1220
MK Ryufb5e3a82015-07-01 12:21:20 -07001221def fanout_rpc(host_objs, rpc_name, include_hostnames=True, **kwargs):
1222 """Fanout the given rpc to shards of given hosts.
1223
1224 @param host_objs: Host objects for the rpc.
1225 @param rpc_name: The name of the rpc.
1226 @param include_hostnames: If True, include the hostnames in the kwargs.
1227 Hostnames are not always necessary, this functions is designed to
1228 send rpcs to the shard a host is on, the rpcs themselves could be
1229 related to labels, acls etc.
1230 @param kwargs: The kwargs for the rpc.
1231 """
1232 # Figure out which hosts are on which shards.
1233 shard_host_map = bucket_hosts_by_shard(
1234 host_objs, rpc_hostnames=True)
1235
1236 # Execute the rpc against the appropriate shards.
1237 for shard, hostnames in shard_host_map.iteritems():
1238 if include_hostnames:
1239 kwargs['hosts'] = hostnames
1240 try:
1241 run_rpc_on_multiple_hostnames(rpc_name, [shard], **kwargs)
1242 except:
1243 ei = sys.exc_info()
1244 new_exc = error.RPCException('RPC %s failed on shard %s due to '
1245 '%s: %s' % (rpc_name, shard, ei[0].__name__, ei[1]))
1246 raise new_exc.__class__, new_exc, ei[2]
1247
1248
Jakob Juelich50e91f72014-10-01 12:43:23 -07001249def run_rpc_on_multiple_hostnames(rpc_call, shard_hostnames, **kwargs):
1250 """Runs an rpc to multiple AFEs
1251
1252 This is i.e. used to propagate changes made to hosts after they are assigned
1253 to a shard.
1254
1255 @param rpc_call: Name of the rpc endpoint to call.
1256 @param shard_hostnames: List of hostnames to run the rpcs on.
1257 @param **kwargs: Keyword arguments to pass in the rpcs.
1258 """
MK Ryufb5e3a82015-07-01 12:21:20 -07001259 # Make sure this function is not called on shards but only on master.
1260 assert not server_utils.is_shard()
Jakob Juelich50e91f72014-10-01 12:43:23 -07001261 for shard_hostname in shard_hostnames:
MK Ryu0a9c82e2015-09-17 17:54:01 -07001262 afe = frontend_wrappers.RetryingAFE(server=shard_hostname,
1263 user=thread_local.get_user())
Jakob Juelich50e91f72014-10-01 12:43:23 -07001264 afe.run(rpc_call, **kwargs)
MK Ryu9c5fbbe2015-02-11 15:46:22 -08001265
1266
def get_label(name):
    """Gets a label object using a given name.

    @param name: Label name.

    @return: a label object matching the given name, or None if no such
            label exists. (The DoesNotExist exception is caught here and
            never propagated to the caller.)
    """
    try:
        label = models.Label.smart_get(name)
    except models.Label.DoesNotExist:
        return None
    return label
1280
1281
xixuanba232a32016-08-25 17:01:59 -07001282# TODO: hide the following rpcs under is_moblab
def moblab_only(func):
    """Ensure moblab specific functions only run on Moblab devices.

    @param func: The RPC function to decorate.

    @returns: A wrapper that raises error.RPCException when called on a
            non-Moblab system and otherwise invokes func.
    """
    @wraps(func)
    def verify(*args, **kwargs):
        if not server_utils.is_moblab():
            # Interpolate the function name into the message here; it was
            # previously passed as a stray second constructor argument so
            # the '%s' placeholder was never filled in.
            raise error.RPCException('RPC: %s can only run on Moblab Systems!'
                                     % func.__name__)
        return func(*args, **kwargs)
    return verify
1291
1292
MK Ryufbb002c2015-06-08 14:13:16 -07001293def route_rpc_to_master(func):
J. Richard Barnettefdfcd662015-04-13 17:20:29 -07001294 """Route RPC to master AFE.
MK Ryu2d107562015-02-24 17:45:02 -08001295
MK Ryu6f5eadb2015-09-04 10:50:47 -07001296 When a shard receives an RPC decorated by this, the RPC is just
1297 forwarded to the master.
1298 When the master gets the RPC, the RPC function is executed.
J. Richard Barnettefdfcd662015-04-13 17:20:29 -07001299
MK Ryu6f5eadb2015-09-04 10:50:47 -07001300 @param func: An RPC function to decorate
1301
1302 @returns: A function replacing the RPC func.
MK Ryu2d107562015-02-24 17:45:02 -08001303 """
MK Ryufbb002c2015-06-08 14:13:16 -07001304 @wraps(func)
MK Ryuf6ab8a72015-07-06 10:19:48 -07001305 def replacement(*args, **kwargs):
MK Ryu6f5eadb2015-09-04 10:50:47 -07001306 """
1307 We need a special care when decorating an RPC that can be called
1308 directly using positional arguments. One example is
1309 rpc_interface.create_job().
1310 rpc_interface.create_job_page_handler() calls the function using
1311 positional and keyword arguments.
1312 Since frontend.RpcClient.run() takes only keyword arguments for
1313 an RPC, positional arguments of the RPC function need to be
1314 transformed to key-value pair (dictionary type).
1315
1316 inspect.getcallargs() is a useful utility to achieve the goal,
1317 however, we need an additional effort when an RPC function has
1318 **kwargs argument.
1319 Let's say we have a following form of RPC function.
1320
1321 def rpcfunc(a, b, **kwargs)
1322
1323 When we call the function like "rpcfunc(1, 2, id=3, name='mk')",
1324 inspect.getcallargs() returns a dictionary like below.
1325
1326 {'a':1, 'b':2, 'kwargs': {'id':3, 'name':'mk'}}
1327
1328 This is an incorrect form of arguments to pass to the rpc function.
1329 Instead, the dictionary should be like this.
1330
1331 {'a':1, 'b':2, 'id':3, 'name':'mk'}
1332 """
1333 argspec = inspect.getargspec(func)
1334 if argspec.varargs is not None:
1335 raise Exception('RPC function must not have *args.')
1336 funcargs = inspect.getcallargs(func, *args, **kwargs)
1337 kwargs = dict()
1338 for k, v in funcargs.iteritems():
MK Ryu2c9af7b2015-10-22 16:19:08 -07001339 if argspec.keywords and k == argspec.keywords:
MK Ryu6f5eadb2015-09-04 10:50:47 -07001340 kwargs.update(v)
1341 else:
1342 kwargs[k] = v
1343
MK Ryufbb002c2015-06-08 14:13:16 -07001344 if server_utils.is_shard():
MK Ryu9651ca52015-06-08 17:48:22 -07001345 afe = frontend_wrappers.RetryingAFE(
Fang Deng0cb2a3b2015-12-10 17:59:00 -08001346 server=server_utils.get_global_afe_hostname(),
MK Ryu0a9c82e2015-09-17 17:54:01 -07001347 user=thread_local.get_user())
MK Ryu9651ca52015-06-08 17:48:22 -07001348 return afe.run(func.func_name, **kwargs)
MK Ryufbb002c2015-06-08 14:13:16 -07001349 return func(**kwargs)
1350 return replacement
Dan Shi5e8fa182016-04-15 11:04:36 -07001351
1352
def get_sample_dut(board, pool):
    """Get a dut with the given board and pool.

    This method is used to help to locate a dut with the given board and pool.
    The dut then can be used to identify a devserver in the same subnet.

    @param board: Name of the board.
    @param pool: Name of the pool.

    @return: Name of a dut with the given board and pool.
    """
    # Both criteria are required; bail out early otherwise.
    if not board or not pool:
        return None

    host_query = get_host_query(
            ('pool:%s' % pool, 'board:%s' % board), False, False, True, {})
    if not host_query:
        return None

    first_host = list(host_query)[0]
    return first_host.get_object_dict()['hostname']