blob: 897d8fefc21b4bc949e64b4a87ee8f258eee1db6 [file] [log] [blame]
Aviv Keshet18308922013-02-19 17:49:49 -08001#pylint: disable-msg=C0111
xixuanba232a32016-08-25 17:01:59 -07002"""
mblighe8819cd2008-02-15 16:48:40 +00003Utility functions for rpc_interface.py. We keep them in a separate file so that
4only RPC interface functions go into that file.
5"""
6
7__author__ = 'showard@google.com (Steve Howard)'
8
Allen Li3d4e6112016-12-28 11:10:25 -08009import collections
MK Ryu84573e12015-02-18 15:54:09 -080010import datetime
MK Ryufbb002c2015-06-08 14:13:16 -070011from functools import wraps
MK Ryu84573e12015-02-18 15:54:09 -080012import inspect
13import os
14import sys
Fang Deng7051fe42015-10-20 14:57:28 -070015import django.db.utils
showard3d6ae112009-05-02 00:45:48 +000016import django.http
MK Ryu0a9c82e2015-09-17 17:54:01 -070017
18from autotest_lib.frontend import thread_local
Dan Shi07e09af2013-04-12 09:31:29 -070019from autotest_lib.frontend.afe import models, model_logic
Alex Miller4a193692013-08-21 13:59:01 -070020from autotest_lib.client.common_lib import control_data, error
Allen Li352b86a2016-12-14 12:11:27 -080021from autotest_lib.client.common_lib import global_config
MK Ryu0c1a37d2015-04-30 12:00:55 -070022from autotest_lib.client.common_lib import time_utils
Allen Li3d43e602016-12-08 15:09:51 -080023from autotest_lib.client.common_lib.cros import dev_server
Aviv Keshet14cac442016-11-20 21:44:11 -080024# TODO(akeshet): Replace with monarch once we know how to instrument rpc server
25# with ts_mon.
MK Ryu509516b2015-05-18 12:00:47 -070026from autotest_lib.client.common_lib.cros.graphite import autotest_stats
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080027from autotest_lib.server import utils as server_utils
MK Ryu9651ca52015-06-08 17:48:22 -070028from autotest_lib.server.cros import provision
29from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
mblighe8819cd2008-02-15 16:48:40 +000030
showarda62866b2008-07-28 21:27:41 +000031NULL_DATETIME = datetime.datetime.max
32NULL_DATE = datetime.date.max
Fang Deng7051fe42015-10-20 14:57:28 -070033DUPLICATE_KEY_MSG = 'Duplicate entry'
showarda62866b2008-07-28 21:27:41 +000034
def prepare_for_serialization(objects):
    """Prepare Python objects for return via RPC.

    Lists of id-keyed dicts are de-duplicated by id first; everything is
    then converted to RPC-serializable types.

    @param objects: objects to be prepared.
    """
    looks_like_dict_list = (isinstance(objects, list) and objects and
                            isinstance(objects[0], dict) and
                            'id' in objects[0])
    if looks_like_dict_list:
        objects = gather_unique_dicts(objects)
    return _prepare_data(objects)
showardb8d34242008-04-25 18:11:16 +000044
45
def prepare_rows_as_nested_dicts(query, nested_dict_column_names):
    """Serialize the rows of a Django query as nested dictionaries.

    @param query - A Django model query object with a select_related() method.
    @param nested_dict_column_names - Column/attribute names on each row that,
            when not None, are expanded into nested dictionaries using their
            get_object_dict() method.

    @returns A list suitable to be returned in an RPC.
    """
    results = []
    for row in query.select_related():
        row_dict = row.get_object_dict()
        for column_name in nested_dict_column_names:
            if row_dict[column_name] is not None:
                nested = getattr(row, column_name)
                row_dict[column_name] = nested.get_object_dict()
        results.append(row_dict)
    return prepare_for_serialization(results)
66
67
def _prepare_data(data):
    """Recursively convert values so they can be serialized over RPC.

    Datetimes and dates become strings (the NULL sentinel values become
    None); tuples and sets become lists.  Dicts and sequences are processed
    recursively; all other values pass through unchanged.
    """
    if isinstance(data, dict):
        return {key: _prepare_data(value) for key, value in data.items()}
    elif isinstance(data, (list, tuple, set)):
        return [_prepare_data(item) for item in data]
    elif isinstance(data, datetime.date):
        # The max datetime/date values are used as "no value" sentinels.
        if data is NULL_DATETIME or data is NULL_DATE:
            return None
        return str(data)
    else:
        return data
mblighe8819cd2008-02-15 16:48:40 +000089
90
def fetchall_as_list_of_dicts(cursor):
    """Fetch every row from a DB cursor as a column-name-keyed dict.

    @param cursor: The database cursor to read from.
    @returns: A list with one dict per row, keyed by column name.
    """
    column_names = [column[0] for column in cursor.description]
    return [dict(zip(column_names, row)) for row in cursor.fetchall()]
101
102
def raw_http_response(response_data, content_type=None):
    """Wrap raw data in an HttpResponse with an explicit Content-length.

    @param response_data: raw body to send back.
    @param content_type: optional MIME type for the response.

    @returns a django.http.HttpResponse carrying response_data.
    """
    http_response = django.http.HttpResponse(response_data,
                                             mimetype=content_type)
    http_response['Content-length'] = str(len(http_response.content))
    return http_response
107
108
def gather_unique_dicts(dict_iterable):
    """Pick out the unique object dicts (by their 'id') from an iterable.

    The first occurrence of each id wins; relative order is preserved.
    """
    seen_ids = set()
    unique_dicts = []
    for obj in dict_iterable:
        obj_id = obj['id']
        if obj_id not in seen_ids:
            seen_ids.add(obj_id)
            unique_dicts.append(obj)
    return unique_dicts
showardb0dfb9f2008-06-06 18:08:02 +0000120
121
def extra_job_status_filters(not_yet_run=False, running=False, finished=False):
    """Build query.extra() keyword args that filter jobs by status category.

    * not_yet_run: all HQEs are Queued
    * finished: all HQEs are complete
    * running: everything else

    @returns a dict of keyword args to pass to query.extra(); empty when no
            category was requested.
    """
    if not any((not_yet_run, running, finished)):
        return {}
    not_queued = ('(SELECT job_id FROM afe_host_queue_entries '
                  'WHERE status != "%s")'
                  % models.HostQueueEntry.Status.QUEUED)
    not_finished = ('(SELECT job_id FROM afe_host_queue_entries '
                    'WHERE not complete)')

    conditions = []
    if not_yet_run:
        conditions.append('id NOT IN ' + not_queued)
    if running:
        conditions.append('(id IN %s) AND (id IN %s)'
                          % (not_queued, not_finished))
    if finished:
        conditions.append('id NOT IN ' + not_finished)
    return {'where': [' OR '.join('(%s)' % cond for cond in conditions)]}
mblighe8819cd2008-02-15 16:48:40 +0000146
147
def extra_job_type_filters(extra_args, suite=False,
                           sub=False, standalone=False):
    """Append a WHERE clause to extra_args selecting jobs by family role.

    At most one of the boolean flags may be passed as True:
    * suite: job which is parent of other jobs
    * sub: job with a parent job
    * standalone: job with no child or parent jobs

    @param extra_args: a dict of existing extra_args; its 'where' list is
            extended in place when a filter is requested.

    @returns the extra_args dict (unchanged if no flag was given).
    """
    assert not ((suite and sub) or
                (suite and standalone) or
                (sub and standalone)), ('Cannot specify more than one '
                                        'filter to this function')

    where = extra_args.get('where', [])
    # Template shared by the suite/sub subqueries; only the selected column
    # differs.
    filter_common = ('(SELECT %s FROM afe_jobs '
                     'WHERE parent_job_id IS NOT NULL)')

    if suite:
        where.append('id IN ' + filter_common % 'DISTINCT parent_job_id')
    elif sub:
        where.append('id IN ' + filter_common % 'id')
    elif standalone:
        where.append('NOT EXISTS (SELECT 1 from afe_jobs AS sub_query '
                     'WHERE parent_job_id IS NOT NULL'
                     ' AND (sub_query.parent_job_id=afe_jobs.id'
                     ' OR sub_query.id=afe_jobs.id))')
    else:
        return extra_args

    extra_args['where'] = where
    return extra_args
186
187
188
def extra_host_filters(multiple_labels=()):
    """Build WHERE clauses matching hosts in an intersection of labels.

    @param multiple_labels: iterable of labels; one membership subquery is
            generated per label.
    @returns a dict of keyword args ('where' and 'params') for query.extra().
    """
    label_subquery = ('afe_hosts.id in (select host_id from afe_hosts_labels '
                      'where label_id=%s)')
    return {
        'where': [label_subquery] * len(multiple_labels),
        'params': [models.Label.smart_get(label).id
                   for label in multiple_labels],
    }
showard8e3aa5e2008-04-08 19:42:32 +0000201
202
def get_host_query(multiple_labels, exclude_only_if_needed_labels,
                   exclude_atomic_group_hosts, valid_only, filter_data):
    """Build a filtered Django query over Host objects.

    @param multiple_labels: labels the hosts must all carry (applied via
            filter_data's extra_args).
    @param exclude_only_if_needed_labels: when True, exclude hosts carrying
            any label marked only_if_needed.
    @param exclude_atomic_group_hosts: when True, exclude hosts carrying a
            label bound to an atomic group.
    @param valid_only: when True, start from valid hosts only instead of all.
    @param filter_data: extra filter args for models.Host.query_objects; must
            not already contain 'extra_args' (it is filled in here).

    @returns a query of matching hosts; an empty query when a requested
            label does not exist.
    """
    if valid_only:
        query = models.Host.valid_objects.all()
    else:
        query = models.Host.objects.all()

    if exclude_only_if_needed_labels:
        only_if_needed_labels = models.Label.valid_objects.filter(
            only_if_needed=True)
        if only_if_needed_labels.count() > 0:
            # Exclude via an anti-join on the hosts/labels m2m table; the
            # suffix keeps this join distinct from the atomic-group one below.
            only_if_needed_ids = ','.join(
                    str(label['id'])
                    for label in only_if_needed_labels.values('id'))
            query = models.Host.objects.add_join(
                query, 'afe_hosts_labels', join_key='host_id',
                join_condition=('afe_hosts_labels_exclude_OIN.label_id IN (%s)'
                                % only_if_needed_ids),
                suffix='_exclude_OIN', exclude=True)

    if exclude_atomic_group_hosts:
        atomic_group_labels = models.Label.valid_objects.filter(
                atomic_group__isnull=False)
        if atomic_group_labels.count() > 0:
            # Same anti-join technique for hosts in atomic-group labels.
            atomic_group_label_ids = ','.join(
                    str(atomic_group['id'])
                    for atomic_group in atomic_group_labels.values('id'))
            query = models.Host.objects.add_join(
                    query, 'afe_hosts_labels', join_key='host_id',
                    join_condition=(
                            'afe_hosts_labels_exclude_AG.label_id IN (%s)'
                            % atomic_group_label_ids),
                    suffix='_exclude_AG', exclude=True)
    try:
        assert 'extra_args' not in filter_data
        filter_data['extra_args'] = extra_host_filters(multiple_labels)
        return models.Host.query_objects(filter_data, initial_query=query)
    except models.Label.DoesNotExist as e:
        # A label named in multiple_labels does not exist: no host can match.
        return models.Host.objects.none()
showard43a3d262008-11-12 18:17:05 +0000242
243
class InconsistencyException(Exception):
    """Signals that objects in a list disagree on a field's value."""
showard8fd58242008-03-10 21:29:07 +0000246
247
def get_consistent_value(objects, field):
    """Return the value of |field| shared by every object in |objects|.

    @raises InconsistencyException: if two objects disagree on the field.
    @returns the shared value, or None for an empty list (vacuously
            consistent).
    """
    if not objects:
        return None

    expected = getattr(objects[0], field)
    for obj in objects:
        if getattr(obj, field) != expected:
            raise InconsistencyException(objects[0], obj)
    return expected
showard8fd58242008-03-10 21:29:07 +0000259
260
def afe_test_dict_to_test_object(test_dict):
    """Convert a test dict into an object with attribute access.

    String values that parse as integers are converted to ints; everything
    else is kept as-is.  Non-dict inputs are returned unchanged.
    """
    if not isinstance(test_dict, dict):
        return test_dict

    numerized = {}
    for key, value in test_dict.items():
        try:
            numerized[key] = int(value)
        except (ValueError, TypeError):
            numerized[key] = value

    return type('TestObject', (object,), numerized)
273
274
def _check_is_server_test(test_type):
    """Decide whether a test type denotes a server-side test.

    @param test_type The test type in enum integer or string, possibly None.

    @returns True only when the type resolves to CONTROL_TYPE.SERVER.
    """
    if test_type is None:
        return False
    if isinstance(test_type, basestring):
        try:
            test_type = control_data.CONTROL_TYPE.get_value(test_type)
        except AttributeError:
            # Unrecognized type name: treat as non-server.
            return False
    return (test_type == control_data.CONTROL_TYPE.SERVER)
290
291
def prepare_generate_control_file(tests, profilers, db_tests=True):
    """Resolve tests/profilers and metadata needed to generate a control file.

    @param tests: test identifiers; DB keys when db_tests is True, otherwise
            dicts convertible via afe_test_dict_to_test_object.
    @param profilers: profiler identifiers for models.Profiler.smart_get.
    @param db_tests: whether to look tests up in the database.

    @returns (cf_info, test_objects, profiler_objects), where cf_info holds
            is_server, synch_count and the combined dependency list.
    @raises model_logic.ValidationError: if tests mix server and client types.
    """
    if db_tests:
        test_objects = [models.Test.smart_get(test) for test in tests]
    else:
        test_objects = [afe_test_dict_to_test_object(test) for test in tests]

    profiler_objects = [models.Profiler.smart_get(profiler)
                        for profiler in profilers]
    # Ensure tests are all the same type.
    try:
        test_type = get_consistent_value(test_objects, 'test_type')
    except InconsistencyException as exc:  # was py2-only "except X, exc"
        test1, test2 = exc.args
        raise model_logic.ValidationError(
            # Closing paren was missing from this message.
            {'tests' : 'You cannot run both test_suites and server-side '
                       'tests together (tests %s and %s differ)' % (
            test1.name, test2.name)})

    is_server = _check_is_server_test(test_type)
    if test_objects:
        synch_count = max(test.sync_count for test in test_objects)
    else:
        synch_count = 1

    if db_tests:
        dependencies = set(label.name for label
                           in models.Label.objects.filter(test__in=test_objects))
    else:
        # Initializer makes the reduce safe for an empty test list (it used
        # to raise TypeError).
        dependencies = reduce(
                set.union, [set(test.dependencies) for test in test_objects],
                set())

    cf_info = dict(is_server=is_server, synch_count=synch_count,
                   dependencies=list(dependencies))
    return cf_info, test_objects, profiler_objects
showard989f25d2008-10-01 11:38:11 +0000326
327
def check_job_dependencies(host_objects, job_dependencies):
    """Verify that a set of machines satisfies a job's dependencies.

    @param host_objects: list of models.Host objects.
    @param job_dependencies: list of label names.
    @raises model_logic.ValidationError: if any host lacks a dependency.
    """
    # Narrow the host set down to those carrying every (non-provisioning)
    # dependency label.
    ok_hosts = models.Host.objects.filter(
            id__in=[host.id for host in host_objects])
    for dependency in job_dependencies:
        if not provision.is_for_special_action(dependency):
            ok_hosts = ok_hosts.filter(labels__name=dependency)
    failing_hosts = (set(host.hostname for host in host_objects) -
                     set(host.hostname for host in ok_hosts))
    if failing_hosts:
        raise model_logic.ValidationError(
            {'hosts' : 'Host(s) failed to meet job dependencies (' +
                       (', '.join(job_dependencies)) + '): ' +
                       (', '.join(failing_hosts))})
348
showard989f25d2008-10-01 11:38:11 +0000349
Alex Miller4a193692013-08-21 13:59:01 -0700350def check_job_metahost_dependencies(metahost_objects, job_dependencies):
351 """
352 Check that at least one machine within the metahost spec satisfies the job's
353 dependencies.
354
355 @param metahost_objects A list of label objects representing the metahosts.
356 @param job_dependencies A list of strings of the required label names.
357 @raises NoEligibleHostException If a metahost cannot run the job.
358 """
359 for metahost in metahost_objects:
360 hosts = models.Host.objects.filter(labels=metahost)
361 for label_name in job_dependencies:
Alex Milleraa772002014-04-10 17:51:21 -0700362 if not provision.is_for_special_action(label_name):
Alex Miller4a193692013-08-21 13:59:01 -0700363 hosts = hosts.filter(labels__name=label_name)
364 if not any(hosts):
365 raise error.NoEligibleHostException("No hosts within %s satisfy %s."
366 % (metahost.name, ', '.join(job_dependencies)))
367
showard2bab8f42008-11-12 18:15:22 +0000368
def _execution_key_for(host_queue_entry):
    """Key identifying one synchronous autoserv execution of an HQE."""
    job = host_queue_entry.job
    return (job.id, host_queue_entry.execution_subdir)
371
372
def check_abort_synchronous_jobs(host_queue_entries):
    """Reject aborts that cover only part of a synchronous autoserv execution.

    @param host_queue_entries: HQEs the user asked to abort.
    @raises model_logic.ValidationError: if fewer entries of an execution are
            included than its job's synch_count requires.
    """
    # Tally how many of the given entries belong to each execution.
    entries_per_execution = {}
    for queue_entry in host_queue_entries:
        key = _execution_key_for(queue_entry)
        entries_per_execution[key] = entries_per_execution.get(key, 0) + 1

    for queue_entry in host_queue_entries:
        if not queue_entry.execution_subdir:
            continue
        execution_count = entries_per_execution[_execution_key_for(queue_entry)]
        if execution_count < queue_entry.job.synch_count:
            raise model_logic.ValidationError(
                    {'' : 'You cannot abort part of a synchronous job execution '
                          '(%d/%s), %d included, %d expected'
                          % (queue_entry.job.id, queue_entry.execution_subdir,
                             execution_count, queue_entry.job.synch_count)})
showard8fbae652009-01-20 23:23:10 +0000391
392
def check_atomic_group_create_job(synch_count, host_objects, metahost_objects,
                                  dependencies, atomic_group):
    """
    Attempt to reject create_job requests with an atomic group that
    will be impossible to schedule.  The checks are not perfect but
    should catch the most obvious issues.

    @param synch_count - The job's minimum synch count.
    @param host_objects - A list of models.Host instances.
    @param metahost_objects - A list of models.Label instances.
    @param dependencies - A list of job dependency label names.
    @param atomic_group - The atomic group instance to check against; its
            label_set provides the candidate hosts and its name is used in
            error messages.

    @raises model_logic.ValidationError - When an issue is found.
    """
    # If specific host objects were supplied with an atomic group, verify
    # that there are enough to satisfy the synch_count.
    minimum_required = synch_count or 1
    if (host_objects and not metahost_objects and
        len(host_objects) < minimum_required):
        raise model_logic.ValidationError(
                {'hosts':
                 'only %d hosts provided for job with synch_count = %d' %
                 (len(host_objects), synch_count)})

    # Check that the atomic group has a hope of running this job
    # given any supplied metahosts and dependancies that may limit.

    # Get a set of hostnames in the atomic group.
    possible_hosts = set()
    for label in atomic_group.label_set.all():
        possible_hosts.update(h.hostname for h in label.host_set.all())

    # Filter out hosts that don't match all of the job dependency labels.
    for label in models.Label.objects.filter(name__in=dependencies):
        hosts_in_label = (h.hostname for h in label.host_set.all())
        possible_hosts.intersection_update(hosts_in_label)

    if not host_objects and not metahost_objects:
        # No hosts or metahosts are required to queue an atomic group Job.
        # However, if they are given, we respect them below.
        host_set = possible_hosts
    else:
        # Every explicitly named host must belong to the atomic group.
        host_set = set(host.hostname for host in host_objects)
        unusable_host_set = host_set.difference(possible_hosts)
        if unusable_host_set:
            raise model_logic.ValidationError(
                {'hosts': 'Hosts "%s" are not in Atomic Group "%s"' %
                 (', '.join(sorted(unusable_host_set)), atomic_group.name)})

    # Lookup hosts provided by each meta host and merge them into the
    # host_set for final counting.
    for meta_host in metahost_objects:
        meta_possible = possible_hosts.copy()
        hosts_in_meta_host = (h.hostname for h in meta_host.host_set.all())
        meta_possible.intersection_update(hosts_in_meta_host)

        # Count all hosts that this meta_host will provide.
        host_set.update(meta_possible)

    if len(host_set) < minimum_required:
        raise model_logic.ValidationError(
                {'atomic_group_name':
                 'Insufficient hosts in Atomic Group "%s" with the'
                 ' supplied dependencies and meta_hosts.' %
                 (atomic_group.name,)})
460
461
def check_modify_host(update_data):
    """Sanity check modify_host* requests.

    @param update_data: A dictionary with the changes to make to a host
            or hosts.
    @raises model_logic.ValidationError: if the request touches 'status'.
    """
    # Only the scheduler (monitor_db) is allowed to modify Host status.
    # Otherwise race conditions happen as a hosts state is changed out from
    # beneath tasks being run on a host.
    if 'status' not in update_data:
        return
    raise model_logic.ValidationError({
            'status': 'Host status can not be modified by the frontend.'})
475
476
def check_modify_host_locking(host, update_data):
    """Validate a requested lock/unlock transition for a host.

    @param host: models.Host object to be modified.
    @param update_data: A dictionary with the changes to make to the host.
    @raises model_logic.ValidationError: if the host is already in the
            requested state, or a lock is requested without a reason.
    """
    locked = update_data.get('locked', None)
    lock_reason = update_data.get('lock_reason', None)
    if locked is None:
        # No lock state change requested.
        return
    if locked and host.locked:
        raise model_logic.ValidationError({
                'locked': 'Host %s already locked by %s on %s.' %
                (host.hostname, host.locked_by, host.lock_time)})
    if not locked and not host.locked:
        raise model_logic.ValidationError({
                'locked': 'Host %s already unlocked.' % host.hostname})
    if locked and not lock_reason and not host.locked:
        raise model_logic.ValidationError({
                'locked': 'Please provide a reason for locking Host %s' %
                host.hostname})
showardce7c0922009-09-11 18:39:24 +0000499
500
def get_motd():
    """Return the message-of-the-day text.

    Reads motd.txt from two directories above this module.  A missing or
    unreadable file simply means there is no message.

    @returns the motd contents, or '' when the file cannot be read.
    """
    dirname = os.path.dirname(__file__)
    filename = os.path.join(dirname, "..", "..", "motd.txt")
    try:
        # Context manager guarantees the handle is closed.  Catching IOError
        # (instead of the old bare except) keeps this best-effort without
        # hiding programming errors or KeyboardInterrupt.
        with open(filename, "r") as motd_file:
            return motd_file.read()
    except IOError:
        return ''
showard29f7cd22009-04-29 21:16:24 +0000515
516
def _get_metahost_counts(metahost_objects):
    """Count how many times each metahost appears in the given list."""
    counts = {}
    for metahost in metahost_objects:
        counts[metahost] = counts.get(metahost, 0) + 1
    return counts
523
524
def get_job_info(job, preserve_metahosts=False, queue_entry_filter_data=None):
    """Summarize a job's host queue entries by kind.

    @param job: a models.Job instance.
    @param preserve_metahosts: when True, an entry that has both a host and a
            meta_host is still counted as a host entry.
    @param queue_entry_filter_data: optional filter args applied to the job's
            HostQueueEntries before summarizing.

    @returns a dict with keys dependencies, hosts, meta_hosts,
            meta_host_counts, one_time_hosts, atomic_group and hostless.
    """
    hosts = []
    one_time_hosts = []
    meta_hosts = []
    atomic_group = None
    hostless = False

    queue_entries = job.hostqueueentry_set.all()
    if queue_entry_filter_data:
        queue_entries = models.HostQueueEntry.query_objects(
            queue_entry_filter_data, initial_query=queue_entries)

    for queue_entry in queue_entries:
        if (queue_entry.host and (preserve_metahosts or
                                  not queue_entry.meta_host)):
            if queue_entry.deleted:
                continue
            # Invalid hosts were added as one-time hosts.
            if queue_entry.host.invalid:
                one_time_hosts.append(queue_entry.host)
            else:
                hosts.append(queue_entry.host)
        elif queue_entry.meta_host:
            meta_hosts.append(queue_entry.meta_host)
        else:
            hostless = True

        # All entries of a job must agree on the atomic group.
        if atomic_group is None:
            if queue_entry.atomic_group is not None:
                atomic_group = queue_entry.atomic_group
        else:
            # NOTE(review): 'id' below is the builtin, not job.id, so the
            # message would print the builtin function — confirm intended.
            assert atomic_group.name == queue_entry.atomic_group.name, (
                    'DB inconsistency.  HostQueueEntries with multiple atomic'
                    ' groups on job %s: %s != %s' % (
                        id, atomic_group.name, queue_entry.atomic_group.name))

    meta_host_counts = _get_metahost_counts(meta_hosts)

    info = dict(dependencies=[label.name for label
                              in job.dependency_labels.all()],
                hosts=hosts,
                meta_hosts=meta_hosts,
                meta_host_counts=meta_host_counts,
                one_time_hosts=one_time_hosts,
                atomic_group=atomic_group,
                hostless=hostless)
    return info
571
572
def check_for_duplicate_hosts(host_objects):
    """Reject host lists that name the same host (by id) more than once.

    @param host_objects: list of models.Host objects.
    @raises model_logic.ValidationError: listing the duplicated hostnames.
    """
    seen_ids = set()
    duplicate_hostnames = set()
    for host in host_objects:
        if host.id in seen_ids:
            duplicate_hostnames.add(host.hostname)
        else:
            seen_ids.add(host.id)

    if duplicate_hostnames:
        raise model_logic.ValidationError(
                {'hosts' : 'Duplicate hosts: %s'
                 % ', '.join(duplicate_hostnames)})
585
586
def create_new_job(owner, options, host_objects, metahost_objects,
                   atomic_group=None):
    """Validate, create and queue a new job.

    @param owner: username of the job owner.
    @param options: dict of job options; 'dependencies' and 'synch_count' are
            read here, and 'dependencies' is replaced in place with the
            matching models.Label instances before the job is created.
    @param host_objects: list of models.Host instances to run on.
    @param metahost_objects: list of models.Label instances used as metahosts.
    @param atomic_group: optional atomic group for the job.

    @returns the id of the created models.Job.
    @raises model_logic.ValidationError: when host, dependency or atomic
            group constraints are violated.
    """
    all_host_objects = host_objects + metahost_objects
    dependencies = options.get('dependencies', [])
    synch_count = options.get('synch_count')

    if atomic_group:
        check_atomic_group_create_job(
                synch_count, host_objects, metahost_objects,
                dependencies, atomic_group)
    else:
        # Without an atomic group, the explicit hosts alone must cover
        # synch_count, and none of them may belong to an atomic group.
        if synch_count is not None and synch_count > len(all_host_objects):
            raise model_logic.ValidationError(
                    {'hosts':
                     'only %d hosts provided for job with synch_count = %d' %
                     (len(all_host_objects), synch_count)})
        atomic_hosts = models.Host.objects.filter(
                id__in=[host.id for host in host_objects],
                labels__atomic_group=True)
        unusable_host_names = [host.hostname for host in atomic_hosts]
        if unusable_host_names:
            raise model_logic.ValidationError(
                    {'hosts':
                     'Host(s) "%s" are atomic group hosts but no '
                     'atomic group was specified for this job.' %
                     (', '.join(unusable_host_names),)})

    check_for_duplicate_hosts(host_objects)

    # Provisioning-style dependency labels are created on demand.
    for label_name in dependencies:
        if provision.is_for_special_action(label_name):
            # TODO: We could save a few queries
            # if we had a bulk ensure-label-exists function, which used
            # a bulk .get() call. The win is probably very small.
            _ensure_label_exists(label_name)

    # This only checks targeted hosts, not hosts eligible due to the metahost
    check_job_dependencies(host_objects, dependencies)
    check_job_metahost_dependencies(metahost_objects, dependencies)

    options['dependencies'] = list(
            models.Label.objects.filter(name__in=dependencies))

    # Metahosts/dependencies tied to an atomic group must be consistent with
    # the atomic group supplied for the job (if any).
    for label in metahost_objects + options['dependencies']:
        if label.atomic_group and not atomic_group:
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'Dependency %r requires an atomic group but no '
                     'atomic_group_name or meta_host in an atomic group was '
                     'specified for this job.' % label.name})
        elif (label.atomic_group and
              label.atomic_group.name != atomic_group.name):
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'meta_hosts or dependency %r requires atomic group '
                     '%r instead of the supplied atomic_group_name=%r.' %
                     (label.name, label.atomic_group.name, atomic_group.name)})

    job = models.Job.create(owner=owner, options=options,
                            hosts=all_host_objects)
    job.queue(all_host_objects, atomic_group=atomic_group,
              is_template=options.get('is_template', False))
    return job.id
showard0957a842009-05-11 19:25:08 +0000650
651
def _ensure_label_exists(name):
    """
    Ensure that a label called |name| exists in the Django models.

    This function is to be called from within afe rpcs only, as an
    alternative to server.cros.provision.ensure_label_exists(...). It works
    by Django model manipulation, rather than by making another create_label
    rpc call.

    @param name: the label to check for/create.
    @raises ValidationError: There was an error in the response that was
                             not because the label already existed.
    @returns True is a label was created, False otherwise.
    """
    # Make sure this function is not called on shards but only on master.
    assert not server_utils.is_shard()
    try:
        models.Label.objects.get(name=name)
    except models.Label.DoesNotExist:
        try:
            new_label = models.Label.objects.create(name=name)
            new_label.save()
            return True
        except django.db.utils.IntegrityError as e:
            # It is possible that another suite/test already
            # created the label between the check and save.
            if DUPLICATE_KEY_MSG in str(e):
                # Lost the race: the label now exists, which is all we need.
                return False
            else:
                raise
    # The label already existed; nothing was created.
    return False
683
684
def find_platform_and_atomic_group(host):
    """Determine the platform and atomic group names for a host object.

    Either element is None when the host carries no such label.

    @returns (platform name, atomic group name) for the given host.
    @raises ValueError: if the host has more than one platform label.
    """
    platform_names = [label.name for label in host.label_list
                      if label.platform]
    platform = platform_names[0] if platform_names else None
    if len(platform_names) > 1:
        raise ValueError('Host %s has more than one platform: %s' %
                         (host.hostname, ', '.join(platform_names)))
    # Take the first atomic-group label, if any.  Multiple atomic groups on
    # one host is an error, but monitor_db_cleanup deals with it rather than
    # tripping up the RPC interface.
    atomic_group_name = None
    for label in host.label_list:
        if label.atomic_group:
            atomic_group_name = label.atomic_group.name
            break
    return platform, atomic_group_name
showardc0ac3a72009-07-08 21:14:45 +0000710
711
712# support for get_host_queue_entries_and_special_tasks()
713
MK Ryu0c1a37d2015-04-30 12:00:55 -0700714def _common_entry_to_dict(entry, type, job_dict, exec_path, status, started_on):
showardc0ac3a72009-07-08 21:14:45 +0000715 return dict(type=type,
MK Ryu0c1a37d2015-04-30 12:00:55 -0700716 host=entry['host'],
showardc0ac3a72009-07-08 21:14:45 +0000717 job=job_dict,
MK Ryu0c1a37d2015-04-30 12:00:55 -0700718 execution_path=exec_path,
719 status=status,
720 started_on=started_on,
721 id=str(entry['id']) + type,
722 oid=entry['id'])
showardc0ac3a72009-07-08 21:14:45 +0000723
724
def _special_task_to_dict(task, queue_entries):
    """Transforms a special task dictionary to another form of dictionary.

    @param task Special task as a dictionary type
    @param queue_entries Host queue entries as a list of dictionaries.

    @return Transformed dictionary for a special task.
    """
    job_dict = None
    queue_entry_info = task['queue_entry']
    if queue_entry_info:
        # Prefer the job detail from the queue entries we already have.
        matching_jobs = [qentry['job'] for qentry in queue_entries
                         if qentry['id'] == queue_entry_info['id']]
        if matching_jobs:
            job_dict = matching_jobs[0]
        else:
            # Not among the fetched entries; fall back to the DB.
            job_dict = models.Job.objects.get(
                    id=queue_entry_info['job']).get_object_dict()

    exec_path = server_utils.get_special_task_exec_path(
            task['host']['hostname'], task['id'], task['task'],
            time_utils.time_string_to_datetime(task['time_requested']))
    status = server_utils.get_special_task_status(
            task['is_complete'], task['success'], task['is_active'])
    return _common_entry_to_dict(task, task['task'], job_dict,
                                 exec_path, status, task['time_started'])
showardc0ac3a72009-07-08 21:14:45 +0000752
753
def _queue_entry_to_dict(queue_entry):
    """Transforms a host queue entry dictionary for interleaved display.

    @param queue_entry: Host queue entry as a dictionary.

    @return Transformed dictionary for the host queue entry.
    """
    job_dict = queue_entry['job']
    job_tag = server_utils.get_job_tag(job_dict['id'], job_dict['owner'])
    exec_path = server_utils.get_hqe_exec_path(
            job_tag, queue_entry['execution_subdir'])
    return _common_entry_to_dict(queue_entry, 'Job', job_dict, exec_path,
                                 queue_entry['status'],
                                 queue_entry['started_on'])
761
762
def prepare_host_queue_entries_and_special_tasks(interleaved_entries,
                                                 queue_entries):
    """
    Prepare for serialization the interleaved entries of host queue entries
    and special tasks.
    Each element in the entries is a dictionary type.
    The special task dictionary has only a job id for a job and lacks
    the detail of the job while the host queue entry dictionary has.
    queue_entries is used to look up the job detail info.

    @param interleaved_entries Host queue entries and special tasks as a list
                               of dictionaries.
    @param queue_entries Host queue entries as a list of dictionaries.

    @return A post-processed list of dictionaries that is to be serialized.
    """
    # The presence of a 'task' key distinguishes a special task from a
    # host queue entry.
    dict_list = [
            _special_task_to_dict(entry, queue_entries) if 'task' in entry
            else _queue_entry_to_dict(entry)
            for entry in interleaved_entries]
    return prepare_for_serialization(dict_list)
showardc0ac3a72009-07-08 21:14:45 +0000789
790
def _compute_next_job_for_tasks(queue_entries, special_tasks):
    """
    For each task, try to figure out the next job that ran after that task.
    This is done using two pieces of information:
    * if the task has a queue entry, we can use that entry's job ID.
    * if the task has a time_started, we can try to compare that against the
      started_on field of queue_entries. this isn't guaranteed to work perfectly
      since queue_entries may also have null started_on values.
    * if the task has neither, or if use of time_started fails, just use the
      last computed job ID.

    Each task dict gets a 'next_job_id' key added as a side effect.
    Both lists are expected in descending ID order (see interleave_entries).

    @param queue_entries Host queue entries as a list of dictionaries.
    @param special_tasks Special tasks as a list of dictionaries.
    """
    next_job_id = None # most recently computed next job
    hqe_index = 0 # index for scanning by started_on times
    for task in special_tasks:
        if task['queue_entry']:
            # Direct association: the task belongs to this queue entry's job.
            next_job_id = task['queue_entry']['job']
        elif task['time_started'] is not None:
            # Walk the (newest-first) queue entries, remembering the last
            # job that started at or after this task's start time.
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry['started_on'] is None:
                    continue
                t1 = time_utils.time_string_to_datetime(
                        queue_entry['started_on'])
                t2 = time_utils.time_string_to_datetime(task['time_started'])
                if t1 < t2:
                    break
                next_job_id = queue_entry['job']['id']

        task['next_job_id'] = next_job_id

        # advance hqe_index to just after next_job_id
        if next_job_id is not None:
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry['job']['id'] < next_job_id:
                    break
                hqe_index += 1
829
830
def interleave_entries(queue_entries, special_tasks):
    """
    Merge queue entries and special tasks into one chronological list.

    Both lists should be ordered by descending ID.
    """
    _compute_next_job_for_tasks(queue_entries, special_tasks)

    interleaved = []

    # First, emit the special tasks that have run since the last job.
    task_index = 0
    while (task_index < len(special_tasks)
           and special_tasks[task_index]['next_job_id'] is None):
        interleaved.append(special_tasks[task_index])
        task_index += 1

    # Then each queue entry, followed by the tasks that ran between it
    # and the previous job.
    for queue_entry in queue_entries:
        interleaved.append(queue_entry)
        job_id = queue_entry['job']['id']
        while (task_index < len(special_tasks)
               and not special_tasks[task_index]['next_job_id'] < job_id):
            interleaved.append(special_tasks[task_index])
            task_index += 1

    return interleaved
jamesren4a41e012010-07-16 22:33:48 +0000856
857
def bucket_hosts_by_shard(host_objs, rpc_hostnames=False):
    """Figure out which hosts are on which shards.

    @param host_objs: A list of host objects.
    @param rpc_hostnames: If True, the rpc_hostnames of a shard are returned
        instead of the 'real' shard hostnames. This only matters for testing
        environments.

    @return: A map of shard hostname: list of hosts on the shard.
    """
    shard_host_map = collections.defaultdict(list)
    for host_obj in host_objs:
        shard = host_obj.shard
        # Shardless hosts are simply left out of the map.
        if not shard:
            continue
        if rpc_hostnames:
            shard_name = shard.rpc_hostname()
        else:
            shard_name = shard.hostname
        shard_host_map[shard_name].append(host_obj.hostname)
    return shard_host_map
875
876
def create_job_common(
        name,
        priority,
        control_type,
        control_file=None,
        hosts=(),
        meta_hosts=(),
        one_time_hosts=(),
        atomic_group_name=None,
        synch_count=None,
        is_template=False,
        timeout=None,
        timeout_mins=None,
        max_runtime_mins=None,
        run_verify=True,
        email_list='',
        dependencies=(),
        reboot_before=None,
        reboot_after=None,
        parse_failed_repair=None,
        hostless=False,
        keyvals=None,
        drone_set=None,
        parameterized_job=None,
        parent_job_id=None,
        test_retry=0,
        run_reset=True,
        require_ssp=None):
    #pylint: disable-msg=C0111
    """
    Common code between creating "standard" jobs and creating parameterized jobs

    @param name: Name of the job.
    @param priority: Priority of the job.
    @param control_type: Control file type; hostless jobs must use
            control_data.CONTROL_TYPE_NAMES.SERVER.
    @param control_file: Contents of the control file.
    @param hosts: Hostnames to schedule the job on.
    @param meta_hosts: Label names; may also name atomic groups (see below).
    @param one_time_hosts: Hostnames of hosts not tracked by the AFE; they
            are created on the fly. Incompatible with atomic groups.
    @param atomic_group_name: Name of an atomic group to schedule on. If
            omitted, it may be inferred from the labels in meta_hosts.
    @param synch_count: Number of machines required in sync; validated
            against the atomic group's max_number_of_machines.
    @param is_template: Whether the job is a template.
    @param timeout: Job timeout value (units per models.Job convention).
    @param timeout_mins: Job timeout in minutes.
    @param max_runtime_mins: Maximum job runtime in minutes.
    @param run_verify: Whether to run verify for the job.
    @param email_list: Addresses to notify about job status.
    @param dependencies: Label names the scheduled hosts must have.
    @param reboot_before: Reboot-before option for the job.
    @param reboot_after: Reboot-after option for the job.
    @param parse_failed_repair: Whether to parse failed repair results.
    @param hostless: If True, the job takes no hosts; mutually exclusive
            with hosts, meta_hosts, one_time_hosts and atomic_group_name.
    @param keyvals: Job keyvals dictionary.
    @param drone_set: Drone set for the job.
    @param parameterized_job: Associated parameterized job id, if any.
    @param parent_job_id: Id of the parent job, if any.
    @param test_retry: Number of times to retry tests.
    @param run_reset: Whether to run reset before the job.
    @param require_ssp: Whether server-side packaging is required.

    @returns The result of create_new_job() for the assembled options.

    @raises model_logic.ValidationError: if the host arguments are
            inconsistent, or a label/atomic group cannot be resolved.
    """
    # input validation
    host_args_passed = any((
            hosts, meta_hosts, one_time_hosts, atomic_group_name))
    if hostless:
        if host_args_passed:
            raise model_logic.ValidationError({
                    'hostless': 'Hostless jobs cannot include any hosts!'})
        if control_type != control_data.CONTROL_TYPE_NAMES.SERVER:
            raise model_logic.ValidationError({
                    'control_type': 'Hostless jobs cannot use client-side '
                                    'control files'})
    elif not host_args_passed:
        raise model_logic.ValidationError({
            'arguments' : "For host jobs, you must pass at least one of"
                          " 'hosts', 'meta_hosts', 'one_time_hosts',"
                          " 'atomic_group_name'."
            })

    atomic_groups_by_name = {
            group.name: group for group in models.AtomicGroup.objects.all()
    }
    label_objects = list(models.Label.objects.filter(name__in=meta_hosts))

    # Schedule on an atomic group automagically if one of the labels given
    # is an atomic group label and no explicit atomic_group_name was supplied.
    if not atomic_group_name:
        atomic_group_name = _get_atomic_group_name_from_labels(label_objects)

    # convert hostnames & meta hosts to host/label objects
    host_objects = models.Host.smart_get_bulk(hosts)
    _validate_host_job_sharding(host_objects)

    metahost_objects = []
    meta_host_labels_by_name = {label.name: label for label in label_objects}
    for label_name in meta_hosts:
        if label_name in meta_host_labels_by_name:
            metahost_objects.append(meta_host_labels_by_name[label_name])
        elif label_name in atomic_groups_by_name:
            # If given a metahost name that isn't a Label, check to
            # see if the user was specifying an Atomic Group instead.
            atomic_group = atomic_groups_by_name[label_name]
            if atomic_group_name and atomic_group_name != atomic_group.name:
                raise model_logic.ValidationError({
                        'meta_hosts': (
                                'Label "%s" not found. If assumed to be an '
                                'atomic group it would conflict with the '
                                'supplied atomic group "%s".' % (
                                        label_name, atomic_group_name))})
            atomic_group_name = atomic_group.name
        else:
            raise model_logic.ValidationError(
                {'meta_hosts' : 'Label "%s" not found' % label_name})

    # Create and sanity check an AtomicGroup object if requested.
    if atomic_group_name:
        if one_time_hosts:
            raise model_logic.ValidationError(
                {'one_time_hosts':
                 'One time hosts cannot be used with an Atomic Group.'})
        atomic_group = models.AtomicGroup.smart_get(atomic_group_name)
        if synch_count and synch_count > atomic_group.max_number_of_machines:
            raise model_logic.ValidationError(
                {'atomic_group_name' :
                 'You have requested a synch_count (%d) greater than the '
                 'maximum machines in the requested Atomic Group (%d).' %
                 (synch_count, atomic_group.max_number_of_machines)})
    else:
        atomic_group = None

    # One-time hosts are created on demand and appended to the host list.
    for host in one_time_hosts:
        this_host = models.Host.create_one_time_host(host)
        host_objects.append(this_host)

    options = dict(name=name,
                   priority=priority,
                   control_file=control_file,
                   control_type=control_type,
                   is_template=is_template,
                   timeout=timeout,
                   timeout_mins=timeout_mins,
                   max_runtime_mins=max_runtime_mins,
                   synch_count=synch_count,
                   run_verify=run_verify,
                   email_list=email_list,
                   dependencies=dependencies,
                   reboot_before=reboot_before,
                   reboot_after=reboot_after,
                   parse_failed_repair=parse_failed_repair,
                   keyvals=keyvals,
                   drone_set=drone_set,
                   parameterized_job=parameterized_job,
                   parent_job_id=parent_job_id,
                   test_retry=test_retry,
                   run_reset=run_reset,
                   require_ssp=require_ssp)

    return create_new_job(owner=models.User.current_user().login,
                          options=options,
                          host_objects=host_objects,
                          metahost_objects=metahost_objects,
                          atomic_group=atomic_group)
Simran Basib6ec8ae2014-04-23 12:05:08 -07001010
1011
Allen Lie6203192016-12-14 13:05:53 -08001012def _get_atomic_group_name_from_labels(label_objects):
1013 """Get atomic group name from label objects.
1014
1015 @returns: atomic group name string or None
1016 """
1017 for label in label_objects:
1018 if label and label.atomic_group:
1019 return label.atomic_group.name
1020
1021
def _validate_host_job_sharding(host_objects):
    """Check that the hosts obey job sharding rules."""
    # Shards may use any of their hosts; the master is restricted.
    if server_utils.is_shard():
        return
    if _allowed_hosts_for_master_job(host_objects):
        return
    shard_host_map = bucket_hosts_by_shard(host_objects)
    raise ValueError(
            'The following hosts are on shard(s), please create '
            'seperate jobs for hosts on each shard: %s ' %
            shard_host_map)
1031
1032
def _allowed_hosts_for_master_job(host_objects):
    """Check that the hosts are allowed for a job on master."""
    # Disallowed on master:
    #   - num_shards > 1: the job would span multiple shards;
    #   - num_shards == 1 but fewer hosts on the shard than requested:
    #     the job would span one shard and the master.
    shard_host_map = bucket_hosts_by_shard(host_objects)
    if not shard_host_map:
        return True
    if len(shard_host_map) > 1:
        return False
    hosts_on_shard = shard_host_map.values()[0]
    assert len(hosts_on_shard) <= len(host_objects)
    return len(hosts_on_shard) == len(host_objects)
Allen Lie6203192016-12-14 13:05:53 -08001050
1051
def encode_ascii(control_file):
    """Force a control file to only contain ascii characters.

    @param control_file: Control file to encode.

    @returns the control file in an ascii encoding.

    @raises error.ControlFileMalformed: if encoding fails.
    """
    try:
        encoded = control_file.encode('ascii')
    except UnicodeDecodeError as e:
        raise error.ControlFileMalformed(str(e))
    return encoded
1065
1066
def get_wmatrix_url():
    """Get wmatrix url from config file.

    @returns the wmatrix url or an empty string.
    """
    config = global_config.global_config
    return config.get_config_value('AUTOTEST_WEB', 'wmatrix_url', default='')
Jiaxi Luo57bc1952014-07-22 15:27:30 -07001075
1076
def inject_times_to_filter(start_time_key=None, end_time_key=None,
                           start_time_value=None, end_time_value=None,
                           **filter_data):
    """Inject the key value pairs of start and end time if provided.

    @param start_time_key: A string represents the filter key of start_time.
    @param end_time_key: A string represents the filter key of end_time.
    @param start_time_value: Start_time value.
    @param end_time_value: End_time value.

    @returns the injected filter_data.
    """
    time_pairs = ((start_time_key, start_time_value),
                  (end_time_key, end_time_value))
    for key, value in time_pairs:
        # Falsy values (None, empty string) are not injected at all.
        if value:
            filter_data[key] = value
    return filter_data
1094
1095
def inject_times_to_hqe_special_tasks_filters(filter_data_common,
                                              start_time, end_time):
    """Inject start and end time to hqe and special tasks filters.

    @param filter_data_common: Common filter for hqe and special tasks.
    @param start_time: Start time value to inject (may be None).
    @param end_time: End time value to inject (may be None).

    @returns a pair of hqe and special tasks filters.
    """
    # Copy before the first injection call consumes filter_data_common,
    # so each filter gets its own dictionary.
    special_tasks_filter_data = filter_data_common.copy()
    hqe_filter = inject_times_to_filter(
            'started_on__gte', 'started_on__lte', start_time, end_time,
            **filter_data_common)
    special_tasks_filter = inject_times_to_filter(
            'time_started__gte', 'time_started__lte', start_time, end_time,
            **special_tasks_filter_data)
    return hqe_filter, special_tasks_filter
1112
1113
def retrieve_shard(shard_hostname):
    """
    Retrieves the shard with the given hostname from the database.

    @param shard_hostname: Hostname of the shard to retrieve

    @raises models.Shard.DoesNotExist, if no shard with this hostname was found.

    @returns: Shard object
    """
    with autotest_stats.Timer('shard_heartbeat.retrieve_shard'):
        return models.Shard.smart_get(shard_hostname)
Jakob Juelich59cfe542014-09-02 16:37:46 -07001127
1128
def find_records_for_shard(shard, known_job_ids, known_host_ids):
    """Find records that should be sent to a shard.

    @param shard: Shard to find records for.
    @param known_job_ids: List of ids of jobs the shard already has.
    @param known_host_ids: List of ids of hosts the shard already has.

    @returns: Tuple of three lists for hosts, jobs, and suite job keyvals:
              (hosts, jobs, suite_job_keyvals).
    """
    heartbeat_timer = autotest_stats.Timer('shard_heartbeat')
    with heartbeat_timer.get_client('find_hosts'):
        hosts = models.Host.assign_to_shard(shard, known_host_ids)
    with heartbeat_timer.get_client('find_jobs'):
        jobs = models.Job.assign_to_shard(shard, known_job_ids)
    with heartbeat_timer.get_client('find_suite_job_keyvals'):
        # Keyvals of the parent (suite) jobs accompany the jobs themselves.
        parent_job_ids = [job.parent_job_id for job in jobs]
        suite_job_keyvals = models.JobKeyval.objects.filter(
                job_id__in=parent_job_ids)
    return hosts, jobs, suite_job_keyvals
Jakob Juelicha94efe62014-09-18 16:02:49 -07001149
1150
1151def _persist_records_with_type_sent_from_shard(
1152 shard, records, record_type, *args, **kwargs):
1153 """
1154 Handle records of a specified type that were sent to the shard master.
1155
1156 @param shard: The shard the records were sent from.
1157 @param records: The records sent in their serialized format.
1158 @param record_type: Type of the objects represented by records.
1159 @param args: Additional arguments that will be passed on to the sanity
1160 checks.
1161 @param kwargs: Additional arguments that will be passed on to the sanity
1162 checks.
1163
1164 @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.
1165
1166 @returns: List of primary keys of the processed records.
1167 """
1168 pks = []
1169 for serialized_record in records:
1170 pk = serialized_record['id']
1171 try:
1172 current_record = record_type.objects.get(pk=pk)
1173 except record_type.DoesNotExist:
1174 raise error.UnallowedRecordsSentToMaster(
1175 'Object with pk %s of type %s does not exist on master.' % (
1176 pk, record_type))
1177
1178 current_record.sanity_check_update_from_shard(
1179 shard, serialized_record, *args, **kwargs)
1180
1181 current_record.update_from_serialized(serialized_record)
1182 pks.append(pk)
1183 return pks
1184
1185
def persist_records_sent_from_shard(shard, jobs, hqes):
    """
    Sanity checking then saving serialized records sent to master from shard.

    During heartbeats shards upload jobs and hostqueuentries. This performs
    some sanity checks on these and then updates the existing records for those
    entries with the updated ones from the heartbeat.

    The sanity checks include:
    - Checking if the objects sent already exist on the master.
    - Checking if the objects sent were assigned to this shard.
    - hostqueueentries must be sent together with their jobs.

    @param shard: The shard the records were sent from.
    @param jobs: The jobs the shard sent.
    @param hqes: The hostqueuentries the shart sent.

    @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.
    """
    heartbeat_timer = autotest_stats.Timer('shard_heartbeat')
    with heartbeat_timer.get_client('persist_jobs'):
        job_ids_sent = _persist_records_with_type_sent_from_shard(
                shard, jobs, models.Job)
    with heartbeat_timer.get_client('persist_hqes'):
        # HQEs are validated against the job ids persisted just above.
        _persist_records_with_type_sent_from_shard(
                shard, hqes, models.HostQueueEntry, job_ids_sent=job_ids_sent)
Jakob Juelich50e91f72014-10-01 12:43:23 -07001213
1214
def forward_single_host_rpc_to_shard(func):
    """This decorator forwards rpc calls that modify a host to a shard.

    If a host is assigned to a shard, rpcs that change its attributes should be
    forwarded to the shard.

    This assumes the first argument of the function represents a host id.

    @param func: The function to decorate

    @returns: The function to replace func with.
    """
    def replacement(**kwargs):
        # Only keyword arguments are accepted: serviceHandler always
        # provides them, and the argument names are needed to forward
        # the rpc.

        # Capture the shard hostname before calling func(), because the
        # host record (identified by kwargs['id']) may be deleted inside
        # func().
        host = models.Host.smart_get(kwargs['id'])
        shard_hostname = None
        if host and host.shard:
            shard_hostname = host.shard.rpc_hostname()
        result = func(**kwargs)
        if shard_hostname and not server_utils.is_shard():
            run_rpc_on_multiple_hostnames(func.func_name,
                                          [shard_hostname],
                                          **kwargs)
        return result

    return replacement
1247
1248
def fanout_rpc(host_objs, rpc_name, include_hostnames=True, **kwargs):
    """Fanout the given rpc to shards of given hosts.

    @param host_objs: Host objects for the rpc.
    @param rpc_name: The name of the rpc.
    @param include_hostnames: If True, include the hostnames in the kwargs.
        Hostnames are not always necessary, this function is designed to
        send rpcs to the shard a host is on, the rpcs themselves could be
        related to labels, acls etc.
    @param kwargs: The kwargs for the rpc.

    @raises error.RPCException: if the rpc fails on any shard.
    """
    # Figure out which hosts are on which shards.
    shard_host_map = bucket_hosts_by_shard(
            host_objs, rpc_hostnames=True)

    # Execute the rpc against the appropriate shards.
    for shard, hostnames in shard_host_map.iteritems():
        if include_hostnames:
            kwargs['hosts'] = hostnames
        try:
            run_rpc_on_multiple_hostnames(rpc_name, [shard], **kwargs)
        except:
            # Wrap any failure in an RPCException naming the shard and the
            # root cause; the Python 2 three-arg raise preserves the
            # original traceback.
            ei = sys.exc_info()
            new_exc = error.RPCException('RPC %s failed on shard %s due to '
                    '%s: %s' % (rpc_name, shard, ei[0].__name__, ei[1]))
            raise new_exc.__class__, new_exc, ei[2]
1275
1276
def run_rpc_on_multiple_hostnames(rpc_call, shard_hostnames, **kwargs):
    """Runs an rpc to multiple AFEs

    This is i.e. used to propagate changes made to hosts after they are assigned
    to a shard.

    @param rpc_call: Name of the rpc endpoint to call.
    @param shard_hostnames: List of hostnames to run the rpcs on.
    @param **kwargs: Keyword arguments to pass in the rpcs.
    """
    # Make sure this function is not called on shards but only on master.
    assert not server_utils.is_shard()
    for shard_hostname in shard_hostnames:
        shard_afe = frontend_wrappers.RetryingAFE(
                server=shard_hostname, user=thread_local.get_user())
        shard_afe.run(rpc_call, **kwargs)
MK Ryu9c5fbbe2015-02-11 15:46:22 -08001293
1294
def get_label(name):
    """Gets a label object using a given name.

    @param name: Label name.

    @return: a label object matching the given name, or None when no such
            label exists.
    """
    try:
        return models.Label.smart_get(name)
    except models.Label.DoesNotExist:
        return None
1308
1309
xixuanba232a32016-08-25 17:01:59 -07001310# TODO: hide the following rpcs under is_moblab
def moblab_only(func):
    """Ensure moblab specific functions only run on Moblab devices.

    @param func: The RPC function to decorate.

    @returns: A wrapped function raising error.RPCException when called
            outside of a Moblab system.
    """
    @wraps(func)
    def verify(*args, **kwargs):
        if not server_utils.is_moblab():
            # Format the function name into the message; the original code
            # passed it as a second exception argument, so the '%s' was
            # never substituted.
            raise error.RPCException('RPC: %s can only run on Moblab Systems!'
                                     % func.__name__)
        return func(*args, **kwargs)
    return verify
1319
1320
def route_rpc_to_master(func):
    """Route RPC to master AFE.

    When a shard receives an RPC decorated by this, the RPC is just
    forwarded to the master.
    When the master gets the RPC, the RPC function is executed.

    @param func: An RPC function to decorate

    @returns: A function replacing the RPC func.
    """
    argspec = inspect.getargspec(func)
    if argspec.varargs is not None:
        raise Exception('RPC function must not have *args.')

    @wraps(func)
    def replacement(*args, **kwargs):
        """Forwards the call to the master when running on a shard.

        Positional arguments are first folded into keyword arguments,
        because frontend.RpcClient.run() can only pass keyword arguments
        to an RPC. (rpc_interface.create_job_page_handler(), for example,
        calls create_job() with both positional and keyword arguments.)
        """
        kwargs = _convert_to_kwargs_only(func, args, kwargs)
        if not server_utils.is_shard():
            return func(**kwargs)
        master_afe = frontend_wrappers.RetryingAFE(
                server=server_utils.get_global_afe_hostname(),
                user=thread_local.get_user())
        return master_afe.run(func.func_name, **kwargs)

    return replacement
Dan Shi5e8fa182016-04-15 11:04:36 -07001356
1357
Allen Li416c4052016-12-12 17:46:46 -08001358def _convert_to_kwargs_only(func, args, kwargs):
1359 """Convert a function call's arguments to a kwargs dict.
1360
1361 This is best illustrated with an example. Given:
1362
Allen Liab8d3792016-12-12 18:00:31 -08001363 def foo(a, b, **kwargs):
1364 pass
1365 _to_kwargs(foo, (1, 2), {'c': 3}) # corresponding to foo(1, 2, c=3)
Allen Li416c4052016-12-12 17:46:46 -08001366
1367 foo(**kwargs)
1368
1369 @param func: function whose signature to use
1370 @param args: positional arguments of call
1371 @param kwargs: keyword arguments of call
1372
1373 @returns: kwargs dict
1374 """
Allen Li416c4052016-12-12 17:46:46 -08001375 argspec = inspect.getargspec(func)
Allen Liab8d3792016-12-12 18:00:31 -08001376 # callargs looks like {'a': 1, 'b': 2, 'kwargs': {'c': 3}}
1377 callargs = inspect.getcallargs(func, *args, **kwargs)
1378 if argspec.keywords is None:
1379 kwargs = {}
1380 else:
1381 kwargs = callargs.pop(argspec.keywords)
1382 kwargs.update(callargs)
Allen Li416c4052016-12-12 17:46:46 -08001383 return kwargs
1384
1385
def get_sample_dut(board, pool):
    """Get a dut with the given board and pool.

    This method is used to help to locate a dut with the given board and pool.
    The dut then can be used to identify a devserver in the same subnet.

    @param board: Name of the board.
    @param pool: Name of the pool.

    @return: Name of a dut with the given board and pool.
    """
    # Only meaningful when local devservers are preferred and both
    # labels are known.
    if not (dev_server.PREFER_LOCAL_DEVSERVER and board and pool):
        return None

    sample_hosts = get_host_query(
            ('pool:%s' % pool, 'board:%s' % board), False, False, True, {})
    if not sample_hosts:
        return None

    first_host = list(sample_hosts)[0]
    return first_host.get_object_dict()['hostname']