blob: 8623daf7e46899c77c12bd4c5ca81d782ed1617c [file] [log] [blame]
Aviv Keshet18308922013-02-19 17:49:49 -08001#pylint: disable-msg=C0111
mblighe8819cd2008-02-15 16:48:40 +00002"""\
3Utility functions for rpc_interface.py. We keep them in a separate file so that
4only RPC interface functions go into that file.
5"""
6
7__author__ = 'showard@google.com (Steve Howard)'
8
MK Ryu84573e12015-02-18 15:54:09 -08009import datetime
MK Ryufbb002c2015-06-08 14:13:16 -070010from functools import wraps
MK Ryu84573e12015-02-18 15:54:09 -080011import inspect
12import os
13import sys
Fang Deng7051fe42015-10-20 14:57:28 -070014import django.db.utils
showard3d6ae112009-05-02 00:45:48 +000015import django.http
MK Ryu0a9c82e2015-09-17 17:54:01 -070016
17from autotest_lib.frontend import thread_local
Dan Shi07e09af2013-04-12 09:31:29 -070018from autotest_lib.frontend.afe import models, model_logic
Alex Miller4a193692013-08-21 13:59:01 -070019from autotest_lib.client.common_lib import control_data, error
Jiaxi Luo421608e2014-07-07 14:38:00 -070020from autotest_lib.client.common_lib import global_config, priorities
MK Ryu0c1a37d2015-04-30 12:00:55 -070021from autotest_lib.client.common_lib import time_utils
MK Ryu509516b2015-05-18 12:00:47 -070022from autotest_lib.client.common_lib.cros.graphite import autotest_stats
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080023from autotest_lib.server import utils as server_utils
MK Ryu9651ca52015-06-08 17:48:22 -070024from autotest_lib.server.cros import provision
25from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
mblighe8819cd2008-02-15 16:48:40 +000026
# Sentinel values the database layer stores for "no date/time"; values equal
# to these sentinels are converted to None during RPC serialization
# (see _prepare_data).
NULL_DATETIME = datetime.datetime.max
NULL_DATE = datetime.date.max
# Substring of the MySQL IntegrityError message raised on a duplicate-key
# insert; used by _ensure_label_exists to detect label-creation races.
DUPLICATE_KEY_MSG = 'Duplicate entry'
showarda62866b2008-07-28 21:27:41 +000030
def prepare_for_serialization(objects):
    """
    Prepare Python objects to be returned via RPC.
    @param objects: objects to be prepared.
    """
    looks_like_object_dicts = (isinstance(objects, list) and objects and
                               isinstance(objects[0], dict) and
                               'id' in objects[0])
    if looks_like_object_dicts:
        objects = gather_unique_dicts(objects)
    return _prepare_data(objects)
showardb8d34242008-04-25 18:11:16 +000040
41
def prepare_rows_as_nested_dicts(query, nested_dict_column_names):
    """
    Prepare a Django query to be returned via RPC as a sequence of nested
    dictionaries.

    @param query - A Django model query object with a select_related() method.
    @param nested_dict_column_names - A list of column/attribute names for the
            rows returned by query to expand into nested dictionaries using
            their get_object_dict() method when not None.

    @returns An list suitable to returned in an RPC.
    """
    results = []
    for row in query.select_related():
        row_as_dict = row.get_object_dict()
        # Expand each requested column into its own object dict.
        for column_name in nested_dict_column_names:
            if row_as_dict[column_name] is not None:
                nested = getattr(row, column_name)
                row_as_dict[column_name] = nested.get_object_dict()
        results.append(row_as_dict)
    return prepare_for_serialization(results)
62
63
def _prepare_data(data):
    """
    Recursively process data structures, performing necessary type
    conversions to values in data to allow for RPC serialization:
    -convert datetimes to strings
    -convert tuples and sets to lists

    @param data: arbitrarily nested structure of dicts, lists, tuples, sets
            and scalars.
    @returns a serializable copy; dates/datetimes equal to the NULL
            sentinels become None, other dates become strings.
    """
    if isinstance(data, dict):
        new_data = {}
        for key, value in data.iteritems():
            new_data[key] = _prepare_data(value)
        return new_data
    elif isinstance(data, (list, tuple, set)):
        return [_prepare_data(item) for item in data]
    elif isinstance(data, datetime.date):
        # Compare by value, not identity: datetimes loaded from the database
        # compare equal to the NULL sentinels but are never the same object,
        # so the previous 'is' check silently failed to convert them.
        if data == NULL_DATETIME or data == NULL_DATE:
            return None
        return str(data)
    else:
        return data
mblighe8819cd2008-02-15 16:48:40 +000085
86
def fetchall_as_list_of_dicts(cursor):
    """
    Converts each row in the cursor to a dictionary so that values can be read
    by using the column name.
    @param cursor: The database cursor to read from.
    @returns: A list of each row in the cursor as a dictionary.
    """
    column_names = [column[0] for column in cursor.description]
    return [dict(zip(column_names, row)) for row in cursor.fetchall()]
97
98
def raw_http_response(response_data, content_type=None):
    """Build an HttpResponse with an explicit Content-length header.

    @param response_data: the raw body of the response.
    @param content_type: optional MIME type for the response.
    @returns a django.http.HttpResponse wrapping response_data.
    """
    http_response = django.http.HttpResponse(response_data,
                                             mimetype=content_type)
    http_response['Content-length'] = str(len(http_response.content))
    return http_response
103
104
def gather_unique_dicts(dict_iterable):
    """\
    Pick out unique objects (by ID) from an iterable of object dicts.
    """
    seen_ids = set()
    unique_dicts = []
    for candidate in dict_iterable:
        candidate_id = candidate['id']
        if candidate_id in seen_ids:
            continue
        seen_ids.add(candidate_id)
        unique_dicts.append(candidate)
    return unique_dicts
showardb0dfb9f2008-06-06 18:08:02 +0000116
117
def extra_job_status_filters(not_yet_run=False, running=False, finished=False):
    """\
    Generate a SQL WHERE clause for job status filtering, and return it in
    a dict of keyword args to pass to query.extra().
    * not_yet_run: all HQEs are Queued
    * finished: all HQEs are complete
    * running: everything else
    """
    if not any((not_yet_run, running, finished)):
        return {}
    not_queued = ('(SELECT job_id FROM afe_host_queue_entries '
                  'WHERE status != "%s")'
                  % models.HostQueueEntry.Status.QUEUED)
    not_finished = ('(SELECT job_id FROM afe_host_queue_entries '
                    'WHERE not complete)')

    clauses = []
    if not_yet_run:
        clauses.append('id NOT IN ' + not_queued)
    if running:
        clauses.append('(id IN %s) AND (id IN %s)'
                       % (not_queued, not_finished))
    if finished:
        clauses.append('id NOT IN ' + not_finished)
    # OR the selected clauses together into a single WHERE expression.
    return {'where': [' OR '.join('(%s)' % clause for clause in clauses)]}
mblighe8819cd2008-02-15 16:48:40 +0000142
143
def extra_job_type_filters(extra_args, suite=False,
                           sub=False, standalone=False):
    """\
    Generate a SQL WHERE clause for job status filtering, and return it in
    a dict of keyword args to pass to query.extra().

    param extra_args: a dict of existing extra_args.

    No more than one of the parameters should be passed as True:
    * suite: job which is parent of other jobs
    * sub: job with a parent job
    * standalone: job with no child or parent jobs
    """
    selected = [flag for flag in (suite, sub, standalone) if flag]
    assert len(selected) <= 1, ('Cannot specify more than one '
                                'filter to this function')

    where = extra_args.get('where', [])
    subquery_template = ('(SELECT %s FROM afe_jobs '
                         'WHERE parent_job_id IS NOT NULL)')

    if suite:
        # Jobs that appear as some other job's parent.
        where.append('id IN ' + subquery_template % 'DISTINCT parent_job_id')
    elif sub:
        # Jobs that themselves have a parent.
        where.append('id IN ' + subquery_template % 'id')
    elif standalone:
        # Jobs that are neither parents nor children.
        where.append('NOT EXISTS (SELECT 1 from afe_jobs AS sub_query '
                     'WHERE parent_job_id IS NOT NULL'
                     ' AND (sub_query.parent_job_id=afe_jobs.id'
                     ' OR sub_query.id=afe_jobs.id))')
    else:
        return extra_args

    extra_args['where'] = where
    return extra_args
182
183
184
def extra_host_filters(multiple_labels=()):
    """\
    Generate SQL WHERE clauses for matching hosts in an intersection of
    labels.
    """
    membership_clause = ('afe_hosts.id in (select host_id from '
                         'afe_hosts_labels where label_id=%s)')
    label_ids = [models.Label.smart_get(label).id
                 for label in multiple_labels]
    # One clause (and one bound label id) per requested label.
    return {'where': [membership_clause] * len(label_ids),
            'params': label_ids}
showard8e3aa5e2008-04-08 19:42:32 +0000197
198
def get_host_query(multiple_labels, exclude_only_if_needed_labels,
                   exclude_atomic_group_hosts, valid_only, filter_data):
    """Build a Django query for hosts matching the given filters.

    @param multiple_labels: label names that matching hosts must all have.
    @param exclude_only_if_needed_labels: if True, exclude hosts carrying any
            'only_if_needed' label.
    @param exclude_atomic_group_hosts: if True, exclude hosts carrying any
            label that belongs to an atomic group.
    @param valid_only: if True, restrict to valid (non-deleted) hosts.
    @param filter_data: extra filters for models.Host.query_objects; must not
            already contain 'extra_args'.

    @returns a Django QuerySet of matching hosts; empty if a referenced
            label does not exist.
    """
    if valid_only:
        query = models.Host.valid_objects.all()
    else:
        query = models.Host.objects.all()

    if exclude_only_if_needed_labels:
        only_if_needed_labels = models.Label.valid_objects.filter(
            only_if_needed=True)
        if only_if_needed_labels.count() > 0:
            only_if_needed_ids = ','.join(
                    str(label['id'])
                    for label in only_if_needed_labels.values('id'))
            query = models.Host.objects.add_join(
                query, 'afe_hosts_labels', join_key='host_id',
                join_condition=('afe_hosts_labels_exclude_OIN.label_id IN (%s)'
                                % only_if_needed_ids),
                suffix='_exclude_OIN', exclude=True)

    if exclude_atomic_group_hosts:
        atomic_group_labels = models.Label.valid_objects.filter(
                atomic_group__isnull=False)
        if atomic_group_labels.count() > 0:
            atomic_group_label_ids = ','.join(
                    str(atomic_group['id'])
                    for atomic_group in atomic_group_labels.values('id'))
            query = models.Host.objects.add_join(
                    query, 'afe_hosts_labels', join_key='host_id',
                    join_condition=(
                            'afe_hosts_labels_exclude_AG.label_id IN (%s)'
                            % atomic_group_label_ids),
                    suffix='_exclude_AG', exclude=True)
    try:
        assert 'extra_args' not in filter_data
        filter_data['extra_args'] = extra_host_filters(multiple_labels)
        return models.Host.query_objects(filter_data, initial_query=query)
    except models.Label.DoesNotExist:
        # A requested label does not exist, so no host can match.
        # (The exception instance was previously bound but never used.)
        return models.Host.objects.none()
showard43a3d262008-11-12 18:17:05 +0000238
239
class InconsistencyException(Exception):
    """Raised when a list of objects does not have a consistent value."""
showard8fd58242008-03-10 21:29:07 +0000242
243
def get_consistent_value(objects, field):
    """Return the value of |field| shared by every object in |objects|.

    @param objects: sequence of objects to inspect.
    @param field: attribute name to read from each object.
    @returns the common value, or None for an empty sequence.
    @raises InconsistencyException: if two objects disagree on the value.
    """
    if not objects:
        # well a list of nothing is consistent
        return None

    reference_value = getattr(objects[0], field)
    for candidate in objects:
        if getattr(candidate, field) != reference_value:
            raise InconsistencyException(objects[0], candidate)
    return reference_value
showard8fd58242008-03-10 21:29:07 +0000255
256
def afe_test_dict_to_test_object(test_dict):
    """Convert a test dictionary into an attribute-style test object.

    Values that parse as integers are converted to int; everything else is
    kept unchanged. Non-dict input is returned as-is.
    """
    if not isinstance(test_dict, dict):
        return test_dict

    coerced = {}
    for key, value in test_dict.items():
        try:
            coerced[key] = int(value)
        except (ValueError, TypeError):
            coerced[key] = value
    return type('TestObject', (object,), coerced)
269
270
def prepare_generate_control_file(tests, kernel, label, profilers,
                                  db_tests=True):
    """Resolve tests/profilers/label for control file generation.

    @param tests: test names/ids (db_tests=True) or test dicts
            (db_tests=False).
    @param kernel: unused here; kept for interface compatibility.
    @param label: label name/id to resolve, or a false value to skip.
    @param profilers: profiler names/ids to resolve.
    @param db_tests: if True, look tests up in the database; otherwise treat
            them as dictionaries and build ad-hoc test objects.

    @returns (control file info dict, test objects, profiler objects, label).
    @raises model_logic.ValidationError: if the tests mix client and server
            test types.
    """
    if db_tests:
        test_objects = [models.Test.smart_get(test) for test in tests]
    else:
        test_objects = [afe_test_dict_to_test_object(test) for test in tests]

    profiler_objects = [models.Profiler.smart_get(profiler)
                        for profiler in profilers]
    # ensure tests are all the same type
    try:
        test_type = get_consistent_value(test_objects, 'test_type')
    # 'as' form replaces the legacy comma syntax, matching the exception
    # style used elsewhere in this file.
    except InconsistencyException as exc:
        test1, test2 = exc.args
        raise model_logic.ValidationError(
                {'tests' : 'You cannot run both test_suites and server-side '
                 'tests together (tests %s and %s differ' % (
                test1.name, test2.name)})

    is_server = (test_type == control_data.CONTROL_TYPE.SERVER)
    if test_objects:
        synch_count = max(test.sync_count for test in test_objects)
    else:
        synch_count = 1
    if label:
        label = models.Label.smart_get(label)

    if db_tests:
        dependencies = set(label.name for label
                           in models.Label.objects.filter(
                                   test__in=test_objects))
    else:
        dependencies = reduce(
                set.union, [set(test.dependencies) for test in test_objects])

    cf_info = dict(is_server=is_server, synch_count=synch_count,
                   dependencies=list(dependencies))
    return cf_info, test_objects, profiler_objects, label
showard989f25d2008-10-01 11:38:11 +0000308
309
def check_job_dependencies(host_objects, job_dependencies):
    """
    Check that a set of machines satisfies a job's dependencies.
    host_objects: list of models.Host objects
    job_dependencies: list of names of labels
    """
    # check that hosts satisfy dependencies
    host_ids = [host.id for host in host_objects]
    ok_hosts = models.Host.objects.filter(id__in=host_ids)
    for dependency in job_dependencies:
        # Special-action labels (e.g. provisioning) are not real
        # host labels, so they are not checked here.
        if not provision.is_for_special_action(dependency):
            ok_hosts = ok_hosts.filter(labels__name=dependency)
    failing_hosts = (set(host.hostname for host in host_objects) -
                     set(host.hostname for host in ok_hosts))
    if failing_hosts:
        raise model_logic.ValidationError(
                {'hosts' : 'Host(s) failed to meet job dependencies (' +
                           (', '.join(job_dependencies)) + '): ' +
                           (', '.join(failing_hosts))})
330
showard989f25d2008-10-01 11:38:11 +0000331
def check_job_metahost_dependencies(metahost_objects, job_dependencies):
    """
    Check that at least one machine within the metahost spec satisfies the job's
    dependencies.

    @param metahost_objects A list of label objects representing the metahosts.
    @param job_dependencies A list of strings of the required label names.
    @raises NoEligibleHostException If a metahost cannot run the job.
    """
    for metahost in metahost_objects:
        candidates = models.Host.objects.filter(labels=metahost)
        for label_name in job_dependencies:
            # Special-action labels are not real host labels; skip them.
            if provision.is_for_special_action(label_name):
                continue
            candidates = candidates.filter(labels__name=label_name)
        if not any(candidates):
            raise error.NoEligibleHostException("No hosts within %s satisfy %s."
                    % (metahost.name, ', '.join(job_dependencies)))
349
showard2bab8f42008-11-12 18:15:22 +0000350
351def _execution_key_for(host_queue_entry):
352 return (host_queue_entry.job.id, host_queue_entry.execution_subdir)
353
354
def check_abort_synchronous_jobs(host_queue_entries):
    """Raise if aborting these entries would split a synchronous execution.

    @param host_queue_entries: HostQueueEntry objects requested for abort.
    @raises model_logic.ValidationError: if only part of a synchronous
            autoserv execution would be aborted.
    """
    # Count how many of the to-be-aborted entries belong to each execution.
    count_per_execution = {}
    for queue_entry in host_queue_entries:
        key = _execution_key_for(queue_entry)
        count_per_execution[key] = count_per_execution.get(key, 0) + 1

    for queue_entry in host_queue_entries:
        if not queue_entry.execution_subdir:
            continue
        execution_count = count_per_execution[_execution_key_for(queue_entry)]
        if execution_count < queue_entry.job.synch_count:
            raise model_logic.ValidationError(
                    {'' : 'You cannot abort part of a synchronous job '
                          'execution (%d/%s), %d included, %d expected'
                          % (queue_entry.job.id, queue_entry.execution_subdir,
                             execution_count, queue_entry.job.synch_count)})
showard8fbae652009-01-20 23:23:10 +0000373
374
def check_atomic_group_create_job(synch_count, host_objects, metahost_objects,
                                  dependencies, atomic_group):
    """
    Attempt to reject create_job requests with an atomic group that
    will be impossible to schedule. The checks are not perfect but
    should catch the most obvious issues.

    @param synch_count - The job's minimum synch count.
    @param host_objects - A list of models.Host instances.
    @param metahost_objects - A list of models.Label instances.
    @param dependencies - A list of job dependency label names.
    @param atomic_group - The models.AtomicGroup instance the job targets.

    @raises model_logic.ValidationError - When an issue is found.
    """
    # If specific host objects were supplied with an atomic group, verify
    # that there are enough to satisfy the synch_count.
    minimum_required = synch_count or 1
    if (host_objects and not metahost_objects and
        len(host_objects) < minimum_required):
        raise model_logic.ValidationError(
                {'hosts':
                 'only %d hosts provided for job with synch_count = %d' %
                 (len(host_objects), synch_count)})

    # Check that the atomic group has a hope of running this job
    # given any supplied metahosts and dependancies that may limit.

    # Get a set of hostnames in the atomic group.
    possible_hosts = set()
    for label in atomic_group.label_set.all():
        possible_hosts.update(h.hostname for h in label.host_set.all())

    # Filter out hosts that don't match all of the job dependency labels.
    for label in models.Label.objects.filter(name__in=dependencies):
        hosts_in_label = (h.hostname for h in label.host_set.all())
        possible_hosts.intersection_update(hosts_in_label)

    if not host_objects and not metahost_objects:
        # No hosts or metahosts are required to queue an atomic group Job.
        # However, if they are given, we respect them below.
        host_set = possible_hosts
    else:
        host_set = set(host.hostname for host in host_objects)
        unusable_host_set = host_set.difference(possible_hosts)
        if unusable_host_set:
            raise model_logic.ValidationError(
                {'hosts': 'Hosts "%s" are not in Atomic Group "%s"' %
                 (', '.join(sorted(unusable_host_set)), atomic_group.name)})

    # Lookup hosts provided by each meta host and merge them into the
    # host_set for final counting.
    for meta_host in metahost_objects:
        meta_possible = possible_hosts.copy()
        hosts_in_meta_host = (h.hostname for h in meta_host.host_set.all())
        meta_possible.intersection_update(hosts_in_meta_host)

        # Count all hosts that this meta_host will provide.
        host_set.update(meta_possible)

    if len(host_set) < minimum_required:
        raise model_logic.ValidationError(
                {'atomic_group_name':
                 'Insufficient hosts in Atomic Group "%s" with the'
                 ' supplied dependencies and meta_hosts.' %
                 (atomic_group.name,)})
442
443
def check_modify_host(update_data):
    """
    Sanity check modify_host* requests.

    @param update_data: A dictionary with the changes to make to a host
            or hosts.
    """
    # Only the scheduler (monitor_db) is allowed to modify Host status.
    # Otherwise race conditions happen as a hosts state is changed out from
    # beneath tasks being run on a host.
    if 'status' not in update_data:
        return
    raise model_logic.ValidationError({
            'status': 'Host status can not be modified by the frontend.'})
457
458
def check_modify_host_locking(host, update_data):
    """
    Checks when locking/unlocking has been requested if the host is already
    locked/unlocked.

    @param host: models.Host object to be modified
    @param update_data: A dictionary with the changes to make to the host.
    """
    locked = update_data.get('locked', None)
    lock_reason = update_data.get('lock_reason', None)
    if locked is None:
        # No lock change requested; nothing to validate.
        return
    if locked:
        if host.locked:
            raise model_logic.ValidationError({
                    'locked': 'Host already locked by %s on %s.' %
                              (host.locked_by, host.lock_time)})
        if not lock_reason:
            raise model_logic.ValidationError({
                    'locked': 'Please provide a reason for locking'})
    elif not host.locked:
        raise model_logic.ValidationError({
                'locked': 'Host already unlocked.'})
showardce7c0922009-09-11 18:39:24 +0000480
481
def get_motd():
    """Return the message-of-the-day text, or '' if it cannot be read.

    Reads motd.txt from two directories above this module. A missing or
    unreadable file is not an error; the MOTD is simply empty then.
    """
    dirname = os.path.dirname(__file__)
    filename = os.path.join(dirname, "..", "..", "motd.txt")
    try:
        # with-statement guarantees the file is closed even if read() fails.
        with open(filename, "r") as fp:
            return fp.read()
    except (IOError, OSError):
        # Narrowed from a bare 'except': only I/O problems should be
        # silently ignored, not arbitrary programming errors.
        return ''
showard29f7cd22009-04-29 21:16:24 +0000496
497
498def _get_metahost_counts(metahost_objects):
499 metahost_counts = {}
500 for metahost in metahost_objects:
501 metahost_counts.setdefault(metahost, 0)
502 metahost_counts[metahost] += 1
503 return metahost_counts
504
505
def get_job_info(job, preserve_metahosts=False, queue_entry_filter_data=None):
    """Gather host, metahost and atomic group information for a job.

    @param job: a models.Job instance.
    @param preserve_metahosts: if True, entries created from a metahost are
            reported under meta_hosts even after a host was assigned.
    @param queue_entry_filter_data: optional filter dict applied to the
            job's HostQueueEntries before gathering.

    @returns a dict with keys: dependencies, hosts, meta_hosts,
            meta_host_counts, one_time_hosts, atomic_group, hostless.
    """
    hosts = []
    one_time_hosts = []
    meta_hosts = []
    atomic_group = None
    hostless = False

    queue_entries = job.hostqueueentry_set.all()
    if queue_entry_filter_data:
        queue_entries = models.HostQueueEntry.query_objects(
            queue_entry_filter_data, initial_query=queue_entries)

    for queue_entry in queue_entries:
        if (queue_entry.host and (preserve_metahosts or
                                  not queue_entry.meta_host)):
            if queue_entry.deleted:
                continue
            if queue_entry.host.invalid:
                # Invalid hosts are one-time hosts added just for this job.
                one_time_hosts.append(queue_entry.host)
            else:
                hosts.append(queue_entry.host)
        elif queue_entry.meta_host:
            meta_hosts.append(queue_entry.meta_host)
        else:
            # Entry with neither a host nor a metahost: a hostless job.
            hostless = True

        if atomic_group is None:
            if queue_entry.atomic_group is not None:
                atomic_group = queue_entry.atomic_group
        else:
            # All entries of one job must agree on the atomic group.
            # NOTE(review): 'id' in the message below is the builtin, not
            # job.id — looks like a long-standing typo; confirm before fixing.
            assert atomic_group.name == queue_entry.atomic_group.name, (
                    'DB inconsistency. HostQueueEntries with multiple atomic'
                    ' groups on job %s: %s != %s' % (
                    id, atomic_group.name, queue_entry.atomic_group.name))

    meta_host_counts = _get_metahost_counts(meta_hosts)

    info = dict(dependencies=[label.name for label
                              in job.dependency_labels.all()],
                hosts=hosts,
                meta_hosts=meta_hosts,
                meta_host_counts=meta_host_counts,
                one_time_hosts=one_time_hosts,
                atomic_group=atomic_group,
                hostless=hostless)
    return info
552
553
def check_for_duplicate_hosts(host_objects):
    """Raise a ValidationError if any host appears more than once.

    @param host_objects: sequence of models.Host instances.
    """
    seen_ids = set()
    duplicate_hostnames = set()
    for host in host_objects:
        if host.id in seen_ids:
            duplicate_hostnames.add(host.hostname)
        seen_ids.add(host.id)

    if duplicate_hostnames:
        raise model_logic.ValidationError(
                {'hosts' : 'Duplicate hosts: %s'
                 % ', '.join(duplicate_hostnames)})
566
567
def create_new_job(owner, options, host_objects, metahost_objects,
                   atomic_group=None):
    """Validate, create and queue a new job.

    @param owner: login of the job owner.
    @param options: dict of job options ('dependencies', 'synch_count',
            'is_template', ...). NOTE: mutated — 'dependencies' is replaced
            with a list of models.Label objects before Job.create().
    @param host_objects: models.Host instances to run on.
    @param metahost_objects: models.Label instances used as metahosts.
    @param atomic_group: optional models.AtomicGroup for the job.

    @returns the id of the newly created models.Job.
    @raises model_logic.ValidationError: if the job cannot be scheduled as
            requested.
    """
    all_host_objects = host_objects + metahost_objects
    dependencies = options.get('dependencies', [])
    synch_count = options.get('synch_count')

    if atomic_group:
        check_atomic_group_create_job(
                synch_count, host_objects, metahost_objects,
                dependencies, atomic_group)
    else:
        if synch_count is not None and synch_count > len(all_host_objects):
            raise model_logic.ValidationError(
                    {'hosts':
                     'only %d hosts provided for job with synch_count = %d' %
                     (len(all_host_objects), synch_count)})
        # Hosts that belong to an atomic group may only be used by jobs
        # that specify that atomic group.
        atomic_hosts = models.Host.objects.filter(
                id__in=[host.id for host in host_objects],
                labels__atomic_group=True)
        unusable_host_names = [host.hostname for host in atomic_hosts]
        if unusable_host_names:
            raise model_logic.ValidationError(
                    {'hosts':
                     'Host(s) "%s" are atomic group hosts but no '
                     'atomic group was specified for this job.' %
                     (', '.join(unusable_host_names),)})

    check_for_duplicate_hosts(host_objects)

    for label_name in dependencies:
        if provision.is_for_special_action(label_name):
            # TODO: We could save a few queries
            # if we had a bulk ensure-label-exists function, which used
            # a bulk .get() call. The win is probably very small.
            _ensure_label_exists(label_name)

    # This only checks targeted hosts, not hosts eligible due to the metahost
    check_job_dependencies(host_objects, dependencies)
    check_job_metahost_dependencies(metahost_objects, dependencies)

    # Replace dependency names with Label objects (mutates 'options').
    options['dependencies'] = list(
            models.Label.objects.filter(name__in=dependencies))

    for label in metahost_objects + options['dependencies']:
        if label.atomic_group and not atomic_group:
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'Dependency %r requires an atomic group but no '
                     'atomic_group_name or meta_host in an atomic group was '
                     'specified for this job.' % label.name})
        elif (label.atomic_group and
              label.atomic_group.name != atomic_group.name):
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'meta_hosts or dependency %r requires atomic group '
                     '%r instead of the supplied atomic_group_name=%r.' %
                     (label.name, label.atomic_group.name, atomic_group.name)})

    job = models.Job.create(owner=owner, options=options,
                            hosts=all_host_objects)
    job.queue(all_host_objects, atomic_group=atomic_group,
              is_template=options.get('is_template', False))
    return job.id
showard0957a842009-05-11 19:25:08 +0000631
632
def _ensure_label_exists(name):
    """
    Ensure that a label called |name| exists in the Django models.

    This function is to be called from within afe rpcs only, as an
    alternative to server.cros.provision.ensure_label_exists(...). It works
    by Django model manipulation, rather than by making another create_label
    rpc call.

    @param name: the label to check for/create.
    @raises ValidationError: There was an error in the response that was
                             not because the label already existed.
    @returns True is a label was created, False otherwise.
    """
    # Make sure this function is not called on shards but only on master.
    assert not server_utils.is_shard()
    try:
        models.Label.objects.get(name=name)
    except models.Label.DoesNotExist:
        try:
            # objects.create() both builds and persists the row; the
            # redundant follow-up save() was removed.
            models.Label.objects.create(name=name)
            return True
        except django.db.utils.IntegrityError as e:
            # It is possible that another suite/test already
            # created the label between the check and save.
            if DUPLICATE_KEY_MSG in str(e):
                return False
            else:
                raise
    return False
664
665
def find_platform_and_atomic_group(host):
    """
    Figure out the platform name and atomic group name for the given host
    object. If none, the return value for either will be None.

    @returns (platform name, atomic group name) for the given host.
    """
    platform_names = [label.name for label in host.label_list
                      if label.platform]
    if len(platform_names) > 1:
        raise ValueError('Host %s has more than one platform: %s' %
                         (host.hostname, ', '.join(platform_names)))
    platform = platform_names[0] if platform_names else None

    atomic_group_name = None
    for label in host.label_list:
        if label.atomic_group:
            atomic_group_name = label.atomic_group.name
            break
    # Don't check for multiple atomic groups on a host here. That is an
    # error but should not trip up the RPC interface. monitor_db_cleanup
    # deals with it. This just returns the first one found.
    return platform, atomic_group_name
showardc0ac3a72009-07-08 21:14:45 +0000691
692
693# support for get_host_queue_entries_and_special_tasks()
694
MK Ryu0c1a37d2015-04-30 12:00:55 -0700695def _common_entry_to_dict(entry, type, job_dict, exec_path, status, started_on):
showardc0ac3a72009-07-08 21:14:45 +0000696 return dict(type=type,
MK Ryu0c1a37d2015-04-30 12:00:55 -0700697 host=entry['host'],
showardc0ac3a72009-07-08 21:14:45 +0000698 job=job_dict,
MK Ryu0c1a37d2015-04-30 12:00:55 -0700699 execution_path=exec_path,
700 status=status,
701 started_on=started_on,
702 id=str(entry['id']) + type,
703 oid=entry['id'])
showardc0ac3a72009-07-08 21:14:45 +0000704
705
def _special_task_to_dict(task, queue_entries):
    """Transforms a special task dictionary to another form of dictionary.

    @param task: Special task as a dictionary type.
    @param queue_entries: Host queue entries as a list of dictionaries, used
            to look up job details without hitting the database.

    @return Transformed dictionary for a special task.
    """
    job_dict = None
    queue_entry_info = task['queue_entry']
    if queue_entry_info:
        # Prefer the job detail carried by the already-fetched queue entries.
        for qentry in queue_entries:
            if qentry['id'] == queue_entry_info['id']:
                job_dict = qentry['job']
                break
        # Fall back to the database when nothing usable was found.
        if job_dict is None:
            job_dict = models.Job.objects.get(
                    id=queue_entry_info['job']).get_object_dict()

    exec_path = server_utils.get_special_task_exec_path(
            task['host']['hostname'], task['id'], task['task'],
            time_utils.time_string_to_datetime(task['time_requested']))
    status = server_utils.get_special_task_status(
            task['is_complete'], task['success'], task['is_active'])
    return _common_entry_to_dict(task, task['task'], job_dict, exec_path,
                                 status, task['time_started'])
showardc0ac3a72009-07-08 21:14:45 +0000733
734
def _queue_entry_to_dict(queue_entry):
    """Turn a host queue entry dictionary into its serializable form.

    @param queue_entry: Host queue entry as a dictionary.

    @returns The transformed dictionary for the queue entry.
    """
    job_info = queue_entry['job']
    job_tag = server_utils.get_job_tag(job_info['id'], job_info['owner'])
    exec_path = server_utils.get_hqe_exec_path(
            job_tag, queue_entry['execution_subdir'])
    return _common_entry_to_dict(queue_entry, 'Job', job_info, exec_path,
                                 queue_entry['status'],
                                 queue_entry['started_on'])
742
743
def prepare_host_queue_entries_and_special_tasks(interleaved_entries,
                                                 queue_entries):
    """
    Prepare for serialization the interleaved entries of host queue entries
    and special tasks.
    Each element in the entries is a dictionary type.
    The special task dictionary has only a job id for a job and lacks
    the detail of the job while the host queue entry dictionary has.
    queue_entries is used to look up the job detail info.

    @param interleaved_entries Host queue entries and special tasks as a list
        of dictionaries.
    @param queue_entries Host queue entries as a list of dictionaries.

    @return A post-processed list of dictionaries that is to be serialized.
    """
    # An entry is a special task exactly when it carries a 'task' key;
    # otherwise it is a host queue entry.
    dict_list = [
            _special_task_to_dict(entry, queue_entries) if 'task' in entry
            else _queue_entry_to_dict(entry)
            for entry in interleaved_entries]
    return prepare_for_serialization(dict_list)
showardc0ac3a72009-07-08 21:14:45 +0000770
771
def _compute_next_job_for_tasks(queue_entries, special_tasks):
    """
    For each task, try to figure out the next job that ran after that task.
    This is done using two pieces of information:
    * if the task has a queue entry, we can use that entry's job ID.
    * if the task has a time_started, we can try to compare that against the
    started_on field of queue_entries. this isn't guaranteed to work perfectly
    since queue_entries may also have null started_on values.
    * if the task has neither, or if use of time_started fails, just use the
    last computed job ID.

    Each task dict gets a 'next_job_id' key added as a side effect.

    @param queue_entries Host queue entries as a list of dictionaries.
        NOTE(review): assumed to be ordered by descending job ID -- the
        hqe_index advancing below relies on that; confirm with callers.
    @param special_tasks Special tasks as a list of dictionaries.
    """
    next_job_id = None # most recently computed next job
    hqe_index = 0 # index for scanning by started_on times
    for task in special_tasks:
        if task['queue_entry']:
            # The task is tied to a queue entry, so that entry's job is
            # authoritative.
            next_job_id = task['queue_entry']['job']
        elif task['time_started'] is not None:
            # Otherwise remember the last (lowest-ID) queue entry whose
            # started_on is at or after the task's start time.
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry['started_on'] is None:
                    continue
                t1 = time_utils.time_string_to_datetime(
                        queue_entry['started_on'])
                t2 = time_utils.time_string_to_datetime(task['time_started'])
                if t1 < t2:
                    break
                next_job_id = queue_entry['job']['id']

        task['next_job_id'] = next_job_id

        # advance hqe_index to just after next_job_id
        if next_job_id is not None:
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry['job']['id'] < next_job_id:
                    break
                hqe_index += 1
810
811
def interleave_entries(queue_entries, special_tasks):
    """
    Merge queue entries and special tasks into one chronological list.

    Both lists should be ordered by descending ID. Relies on the
    'next_job_id' key that _compute_next_job_for_tasks() adds to each task.

    @param queue_entries: Host queue entries as a list of dictionaries.
    @param special_tasks: Special tasks as a list of dictionaries.

    @returns A single interleaved list of entry dictionaries.
    """
    _compute_next_job_for_tasks(queue_entries, special_tasks)

    merged = []
    task_index = 0
    task_count = len(special_tasks)

    # Tasks with no next job ran after the most recent job; they go first.
    while (task_index < task_count
           and special_tasks[task_index]['next_job_id'] is None):
        merged.append(special_tasks[task_index])
        task_index += 1

    # Interleave each queue entry with the tasks that ran between it and
    # the previous (higher-ID) job.
    for queue_entry in queue_entries:
        merged.append(queue_entry)
        current_job_id = queue_entry['job']['id']
        while (task_index < task_count
               and special_tasks[task_index]['next_job_id'] >= current_job_id):
            merged.append(special_tasks[task_index])
            task_index += 1

    return merged
jamesren4a41e012010-07-16 22:33:48 +0000837
838
def bucket_hosts_by_shard(host_objs, rpc_hostnames=False):
    """Figure out which hosts are on which shards.

    @param host_objs: A list of host objects.
    @param rpc_hostnames: If True, the rpc_hostnames of a shard are returned
        instead of the 'real' shard hostnames. This only matters for testing
        environments.

    @return: A map of shard hostname: list of hosts on the shard.
    """
    shard_host_map = {}
    # Hosts without a shard (i.e. living on the master) are left out.
    for host_obj in host_objs:
        shard = host_obj.shard
        if not shard:
            continue
        key = shard.rpc_hostname() if rpc_hostnames else shard.hostname
        shard_host_map.setdefault(key, []).append(host_obj.hostname)
    return shard_host_map
856
857
def get_create_job_common_args(local_args):
    """
    Returns a dict containing only the args that apply for create_job_common

    Returns a subset of local_args, which contains only the arguments that can
    be passed in to create_job_common().
    """
    # This code is only here to not kill suites scheduling tests when priority
    # becomes an int instead of a string.
    if isinstance(local_args['priority'], str):
        local_args['priority'] = priorities.Priority.DEFAULT
    # </migration hack>
    accepted_names = set(inspect.getargspec(create_job_common).args)
    return {name: value for name, value in local_args.iteritems()
            if name in accepted_names}
872
873
def create_job_common(name, priority, control_type, control_file=None,
                      hosts=(), meta_hosts=(), one_time_hosts=(),
                      atomic_group_name=None, synch_count=None,
                      is_template=False, timeout=None, timeout_mins=None,
                      max_runtime_mins=None, run_verify=True, email_list='',
                      dependencies=(), reboot_before=None, reboot_after=None,
                      parse_failed_repair=None, hostless=False, keyvals=None,
                      drone_set=None, parameterized_job=None,
                      parent_job_id=None, test_retry=0, run_reset=True,
                      require_ssp=None):
    #pylint: disable-msg=C0111
    """
    Common code between creating "standard" jobs and creating parameterized jobs

    Validates the host/metahost/atomic-group arguments, resolves them to
    model objects, assembles the job options dict and delegates to
    create_new_job().

    @param name: Name of the job.
    @param priority: Priority of the job.
    @param control_type: Type of the control file (server/client).
    @param hosts: Hostnames/ids to run on.
    @param meta_hosts: Label (or atomic group) names to schedule on.
    @param one_time_hosts: Hostnames to create as one-time hosts.
    @param atomic_group_name: Explicit atomic group to schedule on.
    @param hostless: If True, the job runs without any hosts and must use a
            server-side control file.

    @returns the result of create_new_job().

    @raises model_logic.ValidationError: on invalid argument combinations.
    @raises ValueError: when requested hosts span shards and the master.
    """
    user = models.User.current_user()
    owner = user.login

    # input validation
    if not (hosts or meta_hosts or one_time_hosts or atomic_group_name
            or hostless):
        raise model_logic.ValidationError({
            'arguments' : "You must pass at least one of 'hosts', "
                          "'meta_hosts', 'one_time_hosts', "
                          "'atomic_group_name', or 'hostless'"
            })

    # A hostless job is mutually exclusive with any host specification and
    # can only use a server-side control file.
    if hostless:
        if hosts or meta_hosts or one_time_hosts or atomic_group_name:
            raise model_logic.ValidationError({
                    'hostless': 'Hostless jobs cannot include any hosts!'})
        server_type = control_data.CONTROL_TYPE_NAMES.SERVER
        if control_type != server_type:
            raise model_logic.ValidationError({
                    'control_type': 'Hostless jobs cannot use client-side '
                                    'control files'})

    atomic_groups_by_name = dict((ag.name, ag)
                                 for ag in models.AtomicGroup.objects.all())
    label_objects = list(models.Label.objects.filter(name__in=meta_hosts))

    # Schedule on an atomic group automagically if one of the labels given
    # is an atomic group label and no explicit atomic_group_name was supplied.
    if not atomic_group_name:
        for label in label_objects:
            if label and label.atomic_group:
                atomic_group_name = label.atomic_group.name
                break
    # convert hostnames & meta hosts to host/label objects
    host_objects = models.Host.smart_get_bulk(hosts)
    if not server_utils.is_shard():
        shard_host_map = bucket_hosts_by_shard(host_objects)
        num_shards = len(shard_host_map)
        if (num_shards > 1 or (num_shards == 1 and
                len(shard_host_map.values()[0]) != len(host_objects))):
            # We disallow the following jobs on master:
            #   num_shards > 1: this is a job spanning across multiple shards.
            #   num_shards == 1 but number of hosts on shard is less
            #   than total number of hosts: this is a job that spans across
            #   one shard and the master.
            raise ValueError(
                    'The following hosts are on shard(s), please create '
                    'seperate jobs for hosts on each shard: %s ' %
                    shard_host_map)
    metahost_objects = []
    meta_host_labels_by_name = {label.name: label for label in label_objects}
    # Resolve each metahost name to a Label, falling back to atomic groups.
    for label_name in meta_hosts or []:
        if label_name in meta_host_labels_by_name:
            metahost_objects.append(meta_host_labels_by_name[label_name])
        elif label_name in atomic_groups_by_name:
            # If given a metahost name that isn't a Label, check to
            # see if the user was specifying an Atomic Group instead.
            atomic_group = atomic_groups_by_name[label_name]
            if atomic_group_name and atomic_group_name != atomic_group.name:
                raise model_logic.ValidationError({
                        'meta_hosts': (
                                'Label "%s" not found. If assumed to be an '
                                'atomic group it would conflict with the '
                                'supplied atomic group "%s".' % (
                                        label_name, atomic_group_name))})
            atomic_group_name = atomic_group.name
        else:
            raise model_logic.ValidationError(
                {'meta_hosts' : 'Label "%s" not found' % label_name})

    # Create and sanity check an AtomicGroup object if requested.
    if atomic_group_name:
        if one_time_hosts:
            raise model_logic.ValidationError(
                    {'one_time_hosts':
                     'One time hosts cannot be used with an Atomic Group.'})
        atomic_group = models.AtomicGroup.smart_get(atomic_group_name)
        if synch_count and synch_count > atomic_group.max_number_of_machines:
            raise model_logic.ValidationError(
                    {'atomic_group_name' :
                     'You have requested a synch_count (%d) greater than the '
                     'maximum machines in the requested Atomic Group (%d).' %
                     (synch_count, atomic_group.max_number_of_machines)})
    else:
        atomic_group = None

    # One-time hosts are created on the fly and scheduled like normal hosts.
    for host in one_time_hosts or []:
        this_host = models.Host.create_one_time_host(host)
        host_objects.append(this_host)

    options = dict(name=name,
                   priority=priority,
                   control_file=control_file,
                   control_type=control_type,
                   is_template=is_template,
                   timeout=timeout,
                   timeout_mins=timeout_mins,
                   max_runtime_mins=max_runtime_mins,
                   synch_count=synch_count,
                   run_verify=run_verify,
                   email_list=email_list,
                   dependencies=dependencies,
                   reboot_before=reboot_before,
                   reboot_after=reboot_after,
                   parse_failed_repair=parse_failed_repair,
                   keyvals=keyvals,
                   drone_set=drone_set,
                   parameterized_job=parameterized_job,
                   parent_job_id=parent_job_id,
                   test_retry=test_retry,
                   run_reset=run_reset,
                   require_ssp=require_ssp)
    return create_new_job(owner=owner,
                          options=options,
                          host_objects=host_objects,
                          metahost_objects=metahost_objects,
                          atomic_group=atomic_group)
Simran Basib6ec8ae2014-04-23 12:05:08 -07001005
1006
def encode_ascii(control_file):
    """Force a control file to only contain ascii characters.

    @param control_file: Control file to encode.

    @returns the control file in an ascii encoding.

    @raises error.ControlFileMalformed: if encoding fails.
    """
    try:
        return control_file.encode('ascii')
    except UnicodeError as e:
        # UnicodeError covers both UnicodeDecodeError (a byte string with
        # non-ascii bytes being implicitly decoded first) and
        # UnicodeEncodeError (a unicode string containing characters outside
        # the ascii range). Catching only UnicodeDecodeError, as before, let
        # UnicodeEncodeError escape without being converted to
        # ControlFileMalformed.
        raise error.ControlFileMalformed(str(e))
Jiaxi Luo421608e2014-07-07 14:38:00 -07001019 raise error.ControlFileMalformed(str(e))
1020
1021
def get_wmatrix_url():
    """Get wmatrix url from config file.

    @returns the wmatrix url or an empty string.
    """
    config = global_config.global_config
    return config.get_config_value('AUTOTEST_WEB', 'wmatrix_url', default='')
Jiaxi Luo57bc1952014-07-22 15:27:30 -07001030
1031
def inject_times_to_filter(start_time_key=None, end_time_key=None,
                           start_time_value=None, end_time_value=None,
                           **filter_data):
    """Inject the key value pairs of start and end time if provided.

    @param start_time_key: A string represents the filter key of start_time.
    @param end_time_key: A string represents the filter key of end_time.
    @param start_time_value: Start_time value.
    @param end_time_value: End_time value.

    @returns the injected filter_data.
    """
    # Only truthy values are injected; a missing/None bound leaves the
    # filter untouched.
    for key, value in ((start_time_key, start_time_value),
                       (end_time_key, end_time_value)):
        if value:
            filter_data[key] = value
    return filter_data
1049
1050
def inject_times_to_hqe_special_tasks_filters(filter_data_common,
                                              start_time, end_time):
    """Inject start and end time to hqe and special tasks filters.

    @param filter_data_common: Common filter for hqe and special tasks.
    @param start_time: Start time value to inject into both filters.
    @param end_time: End time value to inject into both filters.

    @returns a pair of hqe and special tasks filters.
    """
    # Take a copy first: HQEs and special tasks use different time field
    # names, so each filter gets its own dict with its own keys injected.
    special_tasks_filter = filter_data_common.copy()
    hqe_filter = inject_times_to_filter(
            'started_on__gte', 'started_on__lte', start_time, end_time,
            **filter_data_common)
    tasks_filter = inject_times_to_filter(
            'time_started__gte', 'time_started__lte', start_time, end_time,
            **special_tasks_filter)
    return (hqe_filter, tasks_filter)
1067
1068
def retrieve_shard(shard_hostname):
    """
    Retrieves the shard with the given hostname from the database.

    @param shard_hostname: Hostname of the shard to retrieve

    @raises models.Shard.DoesNotExist, if no shard with this hostname was found.

    @returns: Shard object
    """
    # Time the lookup so shard heartbeat latency shows up in stats.
    with autotest_stats.Timer('shard_heartbeat.retrieve_shard'):
        return models.Shard.smart_get(shard_hostname)
Jakob Juelich59cfe542014-09-02 16:37:46 -07001082
1083
def find_records_for_shard(shard, known_job_ids, known_host_ids):
    """Find records that should be sent to a shard.

    @param shard: Shard to find records for.
    @param known_job_ids: List of ids of jobs the shard already has.
    @param known_host_ids: List of ids of hosts the shard already has.

    @returns: Tuple of three lists for hosts, jobs, and suite job keyvals:
              (hosts, jobs, suite_job_keyvals).
    """
    heartbeat_timer = autotest_stats.Timer('shard_heartbeat')
    with heartbeat_timer.get_client('find_hosts'):
        hosts = models.Host.assign_to_shard(shard, known_host_ids)
    with heartbeat_timer.get_client('find_jobs'):
        jobs = models.Job.assign_to_shard(shard, known_job_ids)
    with heartbeat_timer.get_client('find_suite_job_keyvals'):
        # Ship the keyvals of the jobs' parent (suite) jobs along, so the
        # shard has the suites' context for the jobs it receives.
        parent_ids = [job.parent_job_id for job in jobs]
        suite_job_keyvals = models.JobKeyval.objects.filter(
                job_id__in=parent_ids)
    return hosts, jobs, suite_job_keyvals
Jakob Juelicha94efe62014-09-18 16:02:49 -07001104
1105
def _persist_records_with_type_sent_from_shard(
        shard, records, record_type, *args, **kwargs):
    """
    Handle records of a specified type that were sent to the shard master.

    @param shard: The shard the records were sent from.
    @param records: The records sent in their serialized format.
    @param record_type: Type of the objects represented by records.
    @param args: Additional arguments that will be passed on to the sanity
                 checks.
    @param kwargs: Additional arguments that will be passed on to the sanity
                   checks.

    @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.

    @returns: List of primary keys of the processed records.
    """
    persisted_pks = []
    for serialized in records:
        pk = serialized['id']
        try:
            record = record_type.objects.get(pk=pk)
        except record_type.DoesNotExist:
            raise error.UnallowedRecordsSentToMaster(
                    'Object with pk %s of type %s does not exist on master.' % (
                        pk, record_type))

        # Ensure this shard is actually allowed to update the record before
        # overwriting the master's copy.
        record.sanity_check_update_from_shard(
                shard, serialized, *args, **kwargs)
        record.update_from_serialized(serialized)
        persisted_pks.append(pk)
    return persisted_pks
1139
1140
def persist_records_sent_from_shard(shard, jobs, hqes):
    """
    Sanity checking then saving serialized records sent to master from shard.

    During heartbeats shards upload jobs and hostqueuentries. This performs
    some sanity checks on these and then updates the existing records for those
    entries with the updated ones from the heartbeat.

    The sanity checks include:
    - Checking if the objects sent already exist on the master.
    - Checking if the objects sent were assigned to this shard.
    - hostqueueentries must be sent together with their jobs.

    @param shard: The shard the records were sent from.
    @param jobs: The jobs the shard sent.
    @param hqes: The hostqueuentries the shart sent.

    @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.
    """
    heartbeat_timer = autotest_stats.Timer('shard_heartbeat')
    with heartbeat_timer.get_client('persist_jobs'):
        job_ids_sent = _persist_records_with_type_sent_from_shard(
                shard, jobs, models.Job)

    with heartbeat_timer.get_client('persist_hqes'):
        # HQEs are only accepted when their job arrived in the same
        # heartbeat, hence the job_ids_sent constraint.
        _persist_records_with_type_sent_from_shard(
                shard, hqes, models.HostQueueEntry, job_ids_sent=job_ids_sent)
Jakob Juelich50e91f72014-10-01 12:43:23 -07001168
1169
def forward_single_host_rpc_to_shard(func):
    """This decorator forwards rpc calls that modify a host to a shard.

    If a host is assigned to a shard, rpcs that change his attributes should be
    forwarded to the shard.

    This assumes the first argument of the function represents a host id.

    @param func: The function to decorate

    @returns: The function to replace func with.
    """
    # functools.wraps preserves func's name/docstring on the wrapper (the
    # file's other rpc decorator, route_rpc_to_master, already does this);
    # without it the wrapper shows up as 'replacement' in logs/introspection.
    @wraps(func)
    def replacement(**kwargs):
        # Only keyword arguments can be accepted here, as we need the argument
        # names to send the rpc. serviceHandler always provides arguments with
        # their keywords, so this is not a problem.
        host = models.Host.smart_get(kwargs['id'])
        # Forward only from the master, and only for hosts living on a shard.
        if host.shard and not server_utils.is_shard():
            run_rpc_on_multiple_hostnames(func.func_name,
                                          [host.shard.rpc_hostname()],
                                          **kwargs)
        return func(**kwargs)

    return replacement
1194
1195
def forward_multi_host_rpc_to_shards(func):
    """This decorator forwards rpc calls that modify multiple hosts.

    If a host is assigned to a shard, rpcs that change his attributes should be
    forwarded to the shard. Some calls however, take a list of hosts and a
    single id to modify, eg: label_add_hosts. This wrapper will sift through
    the list of hosts, find each of their shards, and forward the rpc for
    those hosts to that shard before calling the local version of the given rpc.

    This assumes:
    1. The rpc call uses `smart_get` to retrieve host objects, not the
       stock django `get` call. This is true for most, if not all rpcs in
       the rpc_interface.
    2. The kwargs to the function contain either a list of host ids or
       hostnames, keyed under 'hosts'. This is true for all the rpc
       functions that use 'smart_get'.

    @param func: The function to decorate

    @returns: The function to replace func with.
    """
    # functools.wraps keeps func's identity on the wrapper (consistent with
    # route_rpc_to_master) so rpc dispatch/logging report the real function
    # name instead of 'replacement'.
    @wraps(func)
    def replacement(**kwargs):
        # Fan the rpc out to the shards owning any of the given hosts, then
        # run it locally as well.
        fanout_rpc(
                models.Host.smart_get_bulk(kwargs['hosts']),
                func.func_name, **kwargs)
        return func(**kwargs)

    return replacement
1224
1225
def fanout_rpc(host_objs, rpc_name, include_hostnames=True, **kwargs):
    """Fanout the given rpc to shards of given hosts.

    @param host_objs: Host objects for the rpc.
    @param rpc_name: The name of the rpc.
    @param include_hostnames: If True, include the hostnames in the kwargs.
        Hostnames are not always necessary, this function is designed to
        send rpcs to the shard a host is on, the rpcs themselves could be
        related to labels, acls etc.
    @param kwargs: The kwargs for the rpc.

    @raises error.RPCException: when the rpc fails on a shard; the original
        traceback is preserved via the three-argument raise.
    """
    # Figure out which hosts are on which shards.
    shard_host_map = bucket_hosts_by_shard(
            host_objs, rpc_hostnames=True)

    # Execute the rpc against the appropriate shards.
    for shard, hostnames in shard_host_map.iteritems():
        if include_hostnames:
            # Restrict the rpc to the hosts that live on this shard.
            kwargs['hosts'] = hostnames
        try:
            run_rpc_on_multiple_hostnames(rpc_name, [shard], **kwargs)
        except:
            # NOTE(review): bare except deliberately catches everything so
            # the failing shard is named in the re-raised RPCException while
            # the original traceback is kept.
            ei = sys.exc_info()
            new_exc = error.RPCException('RPC %s failed on shard %s due to '
                    '%s: %s' % (rpc_name, shard, ei[0].__name__, ei[1]))
            raise new_exc.__class__, new_exc, ei[2]
1252
1253
def run_rpc_on_multiple_hostnames(rpc_call, shard_hostnames, **kwargs):
    """Runs an rpc to multiple AFEs

    This is i.e. used to propagate changes made to hosts after they are assigned
    to a shard.

    @param rpc_call: Name of the rpc endpoint to call.
    @param shard_hostnames: List of hostnames to run the rpcs on.
    @param **kwargs: Keyword arguments to pass in the rpcs.
    """
    # Make sure this function is not called on shards but only on master.
    assert not server_utils.is_shard()
    for hostname in shard_hostnames:
        shard_afe = frontend_wrappers.RetryingAFE(
                server=hostname, user=thread_local.get_user())
        shard_afe.run(rpc_call, **kwargs)
MK Ryu9c5fbbe2015-02-11 15:46:22 -08001270
1271
def get_label(name):
    """Gets a label object using a given name.

    @param name: Label name.
    @return: a label object matching the given name, or None when no such
            label exists.
    """
    # Unlike smart_get itself, this helper swallows the DoesNotExist and
    # reports "no label" as None.
    try:
        return models.Label.smart_get(name)
    except models.Label.DoesNotExist:
        return None
1285
1286
def get_global_afe_hostname():
    """Read the hostname of the global AFE from the global configuration."""
    config = global_config.global_config
    return config.get_config_value('SERVER', 'global_afe_hostname')
MK Ryu9c5fbbe2015-02-11 15:46:22 -08001291
1292
def route_rpc_to_master(func):
    """Route RPC to master AFE.

    When a shard receives an RPC decorated by this, the RPC is just
    forwarded to the master.
    When the master gets the RPC, the RPC function is executed.

    @param func: An RPC function to decorate

    @returns: A function replacing the RPC func.
    """
    @wraps(func)
    def replacement(*args, **kwargs):
        """
        We need a special care when decorating an RPC that can be called
        directly using positional arguments. One example is
        rpc_interface.create_job().
        rpc_interface.create_job_page_handler() calls the function using
        positional and keyword arguments.
        Since frontend.RpcClient.run() takes only keyword arguments for
        an RPC, positional arguments of the RPC function need to be
        transformed to key-value pair (dictionary type).

        inspect.getcallargs() is a useful utility to achieve the goal,
        however, we need an additional effort when an RPC function has
        **kwargs argument.
        Let's say we have a following form of RPC function.

        def rpcfunc(a, b, **kwargs)

        When we call the function like "rpcfunc(1, 2, id=3, name='mk')",
        inspect.getcallargs() returns a dictionary like below.

        {'a':1, 'b':2, 'kwargs': {'id':3, 'name':'mk'}}

        This is an incorrect form of arguments to pass to the rpc function.
        Instead, the dictionary should be like this.

        {'a':1, 'b':2, 'id':3, 'name':'mk'}
        """
        argspec = inspect.getargspec(func)
        if argspec.varargs is not None:
            raise Exception('RPC function must not have *args.')
        funcargs = inspect.getcallargs(func, *args, **kwargs)
        kwargs = dict()
        for k, v in funcargs.iteritems():
            # argspec.keywords is the *name* of the **kwargs parameter (or
            # None). The previous check 'k in argspec.keywords' was a
            # substring test on that name, so a parameter called e.g. 'k'
            # or 'kw' was wrongly treated as the **kwargs dict. Equality is
            # the correct test; it is also None-safe without a guard.
            if k == argspec.keywords:
                # Flatten the collected **kwargs into the top-level dict.
                kwargs.update(v)
            else:
                kwargs[k] = v

        if server_utils.is_shard():
            # On a shard: forward the call to the global (master) AFE with
            # all arguments normalized to keywords.
            afe = frontend_wrappers.RetryingAFE(
                    server=get_global_afe_hostname(),
                    user=thread_local.get_user())
            return afe.run(func.func_name, **kwargs)
        # On the master: execute the RPC locally.
        return func(**kwargs)
    return replacement