#pylint: disable-msg=C0111
"""\
Utility functions for rpc_interface.py. We keep them in a separate file so that
only RPC interface functions go into that file.
"""

__author__ = 'showard@google.com (Steve Howard)'

import datetime
from functools import wraps
import inspect
import os
import sys
import django.http
from autotest_lib.frontend.afe import models, model_logic
from autotest_lib.client.common_lib import control_data, error
from autotest_lib.client.common_lib import global_config, priorities
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.server import utils as server_utils
from autotest_lib.server.cros import provision
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers

NULL_DATETIME = datetime.datetime.max
NULL_DATE = datetime.date.max


def prepare_for_serialization(objects):
    """
    Prepare Python objects to be returned via RPC.
    @param objects: objects to be prepared.
    """
    if (isinstance(objects, list) and len(objects) and
            isinstance(objects[0], dict) and 'id' in objects[0]):
        objects = gather_unique_dicts(objects)
    return _prepare_data(objects)
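
# Example (hypothetical usage sketch): an RPC such as get_hosts() can pass
# its result dicts through this helper before returning them:
#
#     host_dicts = [h.get_object_dict() for h in models.Host.objects.all()]
#     return prepare_for_serialization(host_dicts)
#
# Duplicate dicts (same 'id') are collapsed, and datetime values become
# strings suitable for RPC transport.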


def prepare_rows_as_nested_dicts(query, nested_dict_column_names):
    """
    Prepare a Django query to be returned via RPC as a sequence of nested
    dictionaries.

    @param query - A Django model query object with a select_related() method.
    @param nested_dict_column_names - A list of column/attribute names for the
            rows returned by query to expand into nested dictionaries using
            their get_object_dict() method when not None.

    @returns A list suitable to be returned in an RPC.
    """
    all_dicts = []
    for row in query.select_related():
        row_dict = row.get_object_dict()
        for column in nested_dict_column_names:
            if row_dict[column] is not None:
                row_dict[column] = getattr(row, column).get_object_dict()
        all_dicts.append(row_dict)
    return prepare_for_serialization(all_dicts)


def _prepare_data(data):
    """
    Recursively process data structures, performing necessary type
    conversions to values in data to allow for RPC serialization:
    - convert datetimes to strings
    - convert tuples and sets to lists
    """
    if isinstance(data, dict):
        new_data = {}
        for key, value in data.iteritems():
            new_data[key] = _prepare_data(value)
        return new_data
    elif isinstance(data, (list, tuple, set)):
        return [_prepare_data(item) for item in data]
    elif isinstance(data, datetime.date):
        if data is NULL_DATETIME or data is NULL_DATE:
            return None
        return str(data)
    else:
        return data


def fetchall_as_list_of_dicts(cursor):
    """
    Converts each row in the cursor to a dictionary so that values can be read
    by using the column name.
    @param cursor: The database cursor to read from.
    @returns: A list of each row in the cursor as a dictionary.
    """
    desc = cursor.description
    return [dict(zip([col[0] for col in desc], row))
            for row in cursor.fetchall()]
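
# Example (hypothetical sketch, assuming a Django DB connection is available):
#
#     from django.db import connection
#     cursor = connection.cursor()
#     cursor.execute('SELECT id, hostname FROM afe_hosts LIMIT 2')
#     rows = fetchall_as_list_of_dicts(cursor)
#     # rows might look like:
#     # [{'id': 1, 'hostname': 'host1'}, {'id': 2, 'hostname': 'host2'}]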


def raw_http_response(response_data, content_type=None):
    """Wrap response_data in a raw HTTP response with the given MIME type."""
    response = django.http.HttpResponse(response_data, mimetype=content_type)
    response['Content-length'] = str(len(response.content))
    return response


def gather_unique_dicts(dict_iterable):
    """\
    Pick out unique objects (by ID) from an iterable of object dicts.
    """
    id_set = set()
    result = []
    for obj in dict_iterable:
        if obj['id'] not in id_set:
            id_set.add(obj['id'])
            result.append(obj)
    return result
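
# Example (hypothetical sketch): duplicates by 'id' collapse to the first
# occurrence, preserving order:
#
#     gather_unique_dicts([{'id': 1, 'v': 'a'}, {'id': 2}, {'id': 1}])
#     # -> [{'id': 1, 'v': 'a'}, {'id': 2}]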


def extra_job_status_filters(not_yet_run=False, running=False, finished=False):
    """\
    Generate a SQL WHERE clause for job status filtering, and return it in
    a dict of keyword args to pass to query.extra(). No more than one of
    the parameters should be passed as True.
    * not_yet_run: all HQEs are Queued
    * finished: all HQEs are complete
    * running: everything else
    """
    assert not ((not_yet_run and running) or
                (not_yet_run and finished) or
                (running and finished)), ('Cannot specify more than one '
                                          'filter to this function')

    not_queued = ('(SELECT job_id FROM afe_host_queue_entries '
                  'WHERE status != "%s")'
                  % models.HostQueueEntry.Status.QUEUED)
    not_finished = ('(SELECT job_id FROM afe_host_queue_entries '
                    'WHERE not complete)')

    if not_yet_run:
        where = ['id NOT IN ' + not_queued]
    elif running:
        where = ['(id IN %s) AND (id IN %s)' % (not_queued, not_finished)]
    elif finished:
        where = ['id NOT IN ' + not_finished]
    else:
        return {}
    return {'where': where}
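
# Example (hypothetical sketch): the returned dict is intended to be splatted
# into a Django queryset's extra() call:
#
#     query = models.Job.objects.all()
#     query = query.extra(**extra_job_status_filters(not_yet_run=True))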


def extra_job_type_filters(extra_args, suite=False,
                           sub=False, standalone=False):
    """\
    Generate a SQL WHERE clause for job type filtering, and return it in
    a dict of keyword args to pass to query.extra().

    @param extra_args: a dict of existing extra_args.

    No more than one of the parameters should be passed as True:
    * suite: job which is parent of other jobs
    * sub: job with a parent job
    * standalone: job with no child or parent jobs
    """
    assert not ((suite and sub) or
                (suite and standalone) or
                (sub and standalone)), ('Cannot specify more than one '
                                        'filter to this function')

    where = extra_args.get('where', [])
    parent_job_id = 'DISTINCT parent_job_id'
    child_job_id = 'id'
    filter_common = ('(SELECT %s FROM afe_jobs '
                     'WHERE parent_job_id IS NOT NULL)')

    if suite:
        where.append('id IN ' + filter_common % parent_job_id)
    elif sub:
        where.append('id IN ' + filter_common % child_job_id)
    elif standalone:
        where.append('NOT EXISTS (SELECT 1 from afe_jobs AS sub_query '
                     'WHERE parent_job_id IS NOT NULL'
                     ' AND (sub_query.parent_job_id=afe_jobs.id'
                     ' OR sub_query.id=afe_jobs.id))')
    else:
        return extra_args

    extra_args['where'] = where
    return extra_args


def extra_host_filters(multiple_labels=()):
    """\
    Generate SQL WHERE clauses for matching hosts in an intersection of
    labels.
    """
    extra_args = {}
    where_str = ('afe_hosts.id in (select host_id from afe_hosts_labels '
                 'where label_id=%s)')
    extra_args['where'] = [where_str] * len(multiple_labels)
    extra_args['params'] = [models.Label.smart_get(label).id
                            for label in multiple_labels]
    return extra_args
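
# Example (hypothetical sketch; the label names are illustrative only):
#
#     filters = extra_host_filters(('board:lumpy', 'pool:bvt'))
#     hosts = models.Host.objects.extra(**filters)
#
# This keeps only hosts carrying every listed label.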


def get_host_query(multiple_labels, exclude_only_if_needed_labels,
                   exclude_atomic_group_hosts, valid_only, filter_data):
    if valid_only:
        query = models.Host.valid_objects.all()
    else:
        query = models.Host.objects.all()

    if exclude_only_if_needed_labels:
        only_if_needed_labels = models.Label.valid_objects.filter(
                only_if_needed=True)
        if only_if_needed_labels.count() > 0:
            only_if_needed_ids = ','.join(
                    str(label['id'])
                    for label in only_if_needed_labels.values('id'))
            query = models.Host.objects.add_join(
                    query, 'afe_hosts_labels', join_key='host_id',
                    join_condition=('afe_hosts_labels_exclude_OIN.label_id '
                                    'IN (%s)' % only_if_needed_ids),
                    suffix='_exclude_OIN', exclude=True)

    if exclude_atomic_group_hosts:
        atomic_group_labels = models.Label.valid_objects.filter(
                atomic_group__isnull=False)
        if atomic_group_labels.count() > 0:
            atomic_group_label_ids = ','.join(
                    str(atomic_group['id'])
                    for atomic_group in atomic_group_labels.values('id'))
            query = models.Host.objects.add_join(
                    query, 'afe_hosts_labels', join_key='host_id',
                    join_condition=(
                            'afe_hosts_labels_exclude_AG.label_id IN (%s)'
                            % atomic_group_label_ids),
                    suffix='_exclude_AG', exclude=True)

    try:
        assert 'extra_args' not in filter_data
        filter_data['extra_args'] = extra_host_filters(multiple_labels)
        return models.Host.query_objects(filter_data, initial_query=query)
    except models.Label.DoesNotExist:
        return models.Host.objects.none()


class InconsistencyException(Exception):
    'Raised when a list of objects does not have a consistent value'


def get_consistent_value(objects, field):
    if not objects:
        # well, a list of nothing is consistent
        return None

    value = getattr(objects[0], field)
    for obj in objects:
        this_value = getattr(obj, field)
        if this_value != value:
            raise InconsistencyException(objects[0], obj)
    return value
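
# Example (hypothetical sketch): used below to ensure all tests in a job
# share one test_type; a mixed list raises InconsistencyException with the
# two differing objects as its args:
#
#     test_type = get_consistent_value(test_objects, 'test_type')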


def prepare_generate_control_file(tests, kernel, label, profilers):
    test_objects = [models.Test.smart_get(test) for test in tests]
    profiler_objects = [models.Profiler.smart_get(profiler)
                        for profiler in profilers]
    # ensure tests are all the same type
    try:
        test_type = get_consistent_value(test_objects, 'test_type')
    except InconsistencyException, exc:
        test1, test2 = exc.args
        raise model_logic.ValidationError(
                {'tests' : 'You cannot run both server- and client-side '
                           'tests together (tests %s and %s differ)' % (
                           test1.name, test2.name)})

    is_server = (test_type == control_data.CONTROL_TYPE.SERVER)
    if test_objects:
        synch_count = max(test.sync_count for test in test_objects)
    else:
        synch_count = 1
    if label:
        label = models.Label.smart_get(label)

    dependencies = set(label.name for label
                       in models.Label.objects.filter(test__in=test_objects))

    cf_info = dict(is_server=is_server, synch_count=synch_count,
                   dependencies=list(dependencies))
    return cf_info, test_objects, profiler_objects, label


def check_job_dependencies(host_objects, job_dependencies):
    """
    Check that a set of machines satisfies a job's dependencies.
    host_objects: list of models.Host objects
    job_dependencies: list of names of labels
    """
    # check that hosts satisfy dependencies
    host_ids = [host.id for host in host_objects]
    hosts_in_job = models.Host.objects.filter(id__in=host_ids)
    ok_hosts = hosts_in_job
    for dependency in job_dependencies:
        if not provision.is_for_special_action(dependency):
            ok_hosts = ok_hosts.filter(labels__name=dependency)
    failing_hosts = (set(host.hostname for host in host_objects) -
                     set(host.hostname for host in ok_hosts))
    if failing_hosts:
        raise model_logic.ValidationError(
                {'hosts' : 'Host(s) failed to meet job dependencies (' +
                           (', '.join(job_dependencies)) + '): ' +
                           (', '.join(failing_hosts))})


def check_job_metahost_dependencies(metahost_objects, job_dependencies):
    """
    Check that at least one machine within the metahost spec satisfies the
    job's dependencies.

    @param metahost_objects A list of label objects representing the metahosts.
    @param job_dependencies A list of strings of the required label names.
    @raises NoEligibleHostException If a metahost cannot run the job.
    """
    for metahost in metahost_objects:
        hosts = models.Host.objects.filter(labels=metahost)
        for label_name in job_dependencies:
            if not provision.is_for_special_action(label_name):
                hosts = hosts.filter(labels__name=label_name)
        if not any(hosts):
            raise error.NoEligibleHostException(
                    'No hosts within %s satisfy %s.'
                    % (metahost.name, ', '.join(job_dependencies)))


def _execution_key_for(host_queue_entry):
    return (host_queue_entry.job.id, host_queue_entry.execution_subdir)


def check_abort_synchronous_jobs(host_queue_entries):
    # ensure user isn't aborting part of a synchronous autoserv execution
    count_per_execution = {}
    for queue_entry in host_queue_entries:
        key = _execution_key_for(queue_entry)
        count_per_execution.setdefault(key, 0)
        count_per_execution[key] += 1

    for queue_entry in host_queue_entries:
        if not queue_entry.execution_subdir:
            continue
        execution_count = count_per_execution[_execution_key_for(queue_entry)]
        if execution_count < queue_entry.job.synch_count:
            raise model_logic.ValidationError(
                    {'' : 'You cannot abort part of a synchronous job '
                          'execution (%d/%s), %d included, %d expected'
                          % (queue_entry.job.id, queue_entry.execution_subdir,
                             execution_count, queue_entry.job.synch_count)})


def check_atomic_group_create_job(synch_count, host_objects, metahost_objects,
                                  dependencies, atomic_group):
    """
    Attempt to reject create_job requests with an atomic group that
    will be impossible to schedule. The checks are not perfect but
    should catch the most obvious issues.

    @param synch_count - The job's minimum synch count.
    @param host_objects - A list of models.Host instances.
    @param metahost_objects - A list of models.Label instances.
    @param dependencies - A list of job dependency label names.
    @param atomic_group - The models.AtomicGroup instance requested for
            this job.

    @raises model_logic.ValidationError - When an issue is found.
    """
    # If specific host objects were supplied with an atomic group, verify
    # that there are enough to satisfy the synch_count.
    minimum_required = synch_count or 1
    if (host_objects and not metahost_objects and
            len(host_objects) < minimum_required):
        raise model_logic.ValidationError(
                {'hosts':
                 'only %d hosts provided for job with synch_count = %d' %
                 (len(host_objects), synch_count)})

    # Check that the atomic group has a hope of running this job
    # given any supplied metahosts and dependencies that may limit it.

    # Get a set of hostnames in the atomic group.
    possible_hosts = set()
    for label in atomic_group.label_set.all():
        possible_hosts.update(h.hostname for h in label.host_set.all())

    # Filter out hosts that don't match all of the job dependency labels.
    for label in models.Label.objects.filter(name__in=dependencies):
        hosts_in_label = (h.hostname for h in label.host_set.all())
        possible_hosts.intersection_update(hosts_in_label)

    if not host_objects and not metahost_objects:
        # No hosts or metahosts are required to queue an atomic group Job.
        # However, if they are given, we respect them below.
        host_set = possible_hosts
    else:
        host_set = set(host.hostname for host in host_objects)
        unusable_host_set = host_set.difference(possible_hosts)
        if unusable_host_set:
            raise model_logic.ValidationError(
                    {'hosts': 'Hosts "%s" are not in Atomic Group "%s"' %
                     (', '.join(sorted(unusable_host_set)),
                      atomic_group.name)})

    # Look up hosts provided by each meta host and merge them into the
    # host_set for final counting.
    for meta_host in metahost_objects:
        meta_possible = possible_hosts.copy()
        hosts_in_meta_host = (h.hostname for h in meta_host.host_set.all())
        meta_possible.intersection_update(hosts_in_meta_host)

        # Count all hosts that this meta_host will provide.
        host_set.update(meta_possible)

    if len(host_set) < minimum_required:
        raise model_logic.ValidationError(
                {'atomic_group_name':
                 'Insufficient hosts in Atomic Group "%s" with the'
                 ' supplied dependencies and meta_hosts.' %
                 (atomic_group.name,)})


def check_modify_host(update_data):
    """
    Sanity check modify_host* requests.

    @param update_data: A dictionary with the changes to make to a host
            or hosts.
    """
    # Only the scheduler (monitor_db) is allowed to modify Host status.
    # Otherwise race conditions happen as a host's state is changed out from
    # beneath tasks being run on that host.
    if 'status' in update_data:
        raise model_logic.ValidationError({
                'status': 'Host status can not be modified by the frontend.'})


def check_modify_host_locking(host, update_data):
    """
    Checks when locking/unlocking has been requested if the host is already
    locked/unlocked.

    @param host: models.Host object to be modified
    @param update_data: A dictionary with the changes to make to the host.
    """
    locked = update_data.get('locked', None)
    lock_reason = update_data.get('lock_reason', None)
    if locked is not None:
        if locked and host.locked:
            raise model_logic.ValidationError({
                    'locked': 'Host already locked by %s on %s.' %
                              (host.locked_by, host.lock_time)})
        if not locked and not host.locked:
            raise model_logic.ValidationError({
                    'locked': 'Host already unlocked.'})
        if locked and not lock_reason and not host.locked:
            raise model_logic.ValidationError({
                    'locked': 'Please provide a reason for locking.'})


def get_motd():
    """Return the contents of the message-of-the-day file, or ''."""
    dirname = os.path.dirname(__file__)
    filename = os.path.join(dirname, "..", "..", "motd.txt")
    text = ''
    try:
        fp = open(filename, "r")
        try:
            text = fp.read()
        finally:
            fp.close()
    except:
        pass

    return text


def _get_metahost_counts(metahost_objects):
    metahost_counts = {}
    for metahost in metahost_objects:
        metahost_counts.setdefault(metahost, 0)
        metahost_counts[metahost] += 1
    return metahost_counts


def get_job_info(job, preserve_metahosts=False, queue_entry_filter_data=None):
    hosts = []
    one_time_hosts = []
    meta_hosts = []
    atomic_group = None
    hostless = False

    queue_entries = job.hostqueueentry_set.all()
    if queue_entry_filter_data:
        queue_entries = models.HostQueueEntry.query_objects(
                queue_entry_filter_data, initial_query=queue_entries)

    for queue_entry in queue_entries:
        if (queue_entry.host and (preserve_metahosts or
                                  not queue_entry.meta_host)):
            if queue_entry.deleted:
                continue
            if queue_entry.host.invalid:
                one_time_hosts.append(queue_entry.host)
            else:
                hosts.append(queue_entry.host)
        elif queue_entry.meta_host:
            meta_hosts.append(queue_entry.meta_host)
        else:
            hostless = True

        if atomic_group is None:
            if queue_entry.atomic_group is not None:
                atomic_group = queue_entry.atomic_group
        else:
            assert atomic_group.name == queue_entry.atomic_group.name, (
                    'DB inconsistency. HostQueueEntries with multiple atomic'
                    ' groups on job %s: %s != %s' % (
                        job.id, atomic_group.name,
                        queue_entry.atomic_group.name))

    meta_host_counts = _get_metahost_counts(meta_hosts)

    info = dict(dependencies=[label.name for label
                              in job.dependency_labels.all()],
                hosts=hosts,
                meta_hosts=meta_hosts,
                meta_host_counts=meta_host_counts,
                one_time_hosts=one_time_hosts,
                atomic_group=atomic_group,
                hostless=hostless)
    return info


def check_for_duplicate_hosts(host_objects):
    host_ids = set()
    duplicate_hostnames = set()
    for host in host_objects:
        if host.id in host_ids:
            duplicate_hostnames.add(host.hostname)
        host_ids.add(host.id)

    if duplicate_hostnames:
        raise model_logic.ValidationError(
                {'hosts' : 'Duplicate hosts: %s'
                 % ', '.join(duplicate_hostnames)})


def create_new_job(owner, options, host_objects, metahost_objects,
                   atomic_group=None):
    all_host_objects = host_objects + metahost_objects
    metahost_counts = _get_metahost_counts(metahost_objects)
    dependencies = options.get('dependencies', [])
    synch_count = options.get('synch_count')

    if atomic_group:
        check_atomic_group_create_job(
                synch_count, host_objects, metahost_objects,
                dependencies, atomic_group)
    else:
        if synch_count is not None and synch_count > len(all_host_objects):
            raise model_logic.ValidationError(
                    {'hosts':
                     'only %d hosts provided for job with synch_count = %d' %
                     (len(all_host_objects), synch_count)})
        atomic_hosts = models.Host.objects.filter(
                id__in=[host.id for host in host_objects],
                labels__atomic_group=True)
        unusable_host_names = [host.hostname for host in atomic_hosts]
        if unusable_host_names:
            raise model_logic.ValidationError(
                    {'hosts':
                     'Host(s) "%s" are atomic group hosts but no '
                     'atomic group was specified for this job.' %
                     (', '.join(unusable_host_names),)})

    check_for_duplicate_hosts(host_objects)

    for label_name in dependencies:
        if provision.is_for_special_action(label_name):
            # TODO: We could save a few queries
            # if we had a bulk ensure-label-exists function, which used
            # a bulk .get() call. The win is probably very small.
            _ensure_label_exists(label_name)

    # This only checks targeted hosts, not hosts eligible due to the metahost.
    check_job_dependencies(host_objects, dependencies)
    check_job_metahost_dependencies(metahost_objects, dependencies)

    options['dependencies'] = list(
            models.Label.objects.filter(name__in=dependencies))

    for label in metahost_objects + options['dependencies']:
        if label.atomic_group and not atomic_group:
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'Dependency %r requires an atomic group but no '
                     'atomic_group_name or meta_host in an atomic group was '
                     'specified for this job.' % label.name})
        elif (label.atomic_group and
              label.atomic_group.name != atomic_group.name):
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'meta_hosts or dependency %r requires atomic group '
                     '%r instead of the supplied atomic_group_name=%r.' %
                     (label.name, label.atomic_group.name,
                      atomic_group.name)})

    job = models.Job.create(owner=owner, options=options,
                            hosts=all_host_objects)
    job.queue(all_host_objects, atomic_group=atomic_group,
              is_template=options.get('is_template', False))
    return job.id


def _ensure_label_exists(name):
    """
    Ensure that a label called |name| exists in the Django models.

    This function is to be called from within afe rpcs only, as an
    alternative to server.cros.provision.ensure_label_exists(...). It works
    by Django model manipulation, rather than by making another create_label
    rpc call.

    @param name: the label to check for/create.
    @raises ValidationError: There was an error in the response that was
            not because the label already existed.
    @returns True if a label was created, False otherwise.
    """
    try:
        models.Label.objects.get(name=name)
    except models.Label.DoesNotExist:
        new_label = models.Label.objects.create(name=name)
        new_label.save()
        return True
    return False


def find_platform_and_atomic_group(host):
    """
    Figure out the platform name and atomic group name for the given host
    object. If none, the return value for either will be None.

    @returns (platform name, atomic group name) for the given host.
    """
    platforms = [label.name for label in host.label_list if label.platform]
    if not platforms:
        platform = None
    else:
        platform = platforms[0]
    if len(platforms) > 1:
        raise ValueError('Host %s has more than one platform: %s' %
                         (host.hostname, ', '.join(platforms)))
    for label in host.label_list:
        if label.atomic_group:
            atomic_group_name = label.atomic_group.name
            break
    else:
        atomic_group_name = None
    # Don't check for multiple atomic groups on a host here. That is an
    # error but should not trip up the RPC interface. monitor_db_cleanup
    # deals with it. This just returns the first one found.
    return platform, atomic_group_name


# support for get_host_queue_entries_and_special_tasks()

def _common_entry_to_dict(entry, type, job_dict, exec_path, status, started_on):
    return dict(type=type,
                host=entry['host'],
                job=job_dict,
                execution_path=exec_path,
                status=status,
                started_on=started_on,
                id=str(entry['id']) + type,
                oid=entry['id'])


def _special_task_to_dict(task, queue_entries):
    """Transforms a special task dictionary to another form of dictionary.

    @param task           Special task as a dictionary type
    @param queue_entries  Host queue entries as a list of dictionaries.

    @return Transformed dictionary for a special task.
    """
    job_dict = None
    if task['queue_entry']:
        # Scan queue_entries to get the job detail info.
        for qentry in queue_entries:
            if task['queue_entry']['id'] == qentry['id']:
                job_dict = qentry['job']
                break
        # If not found, get it from the DB.
        if job_dict is None:
            job = models.Job.objects.get(id=task['queue_entry']['job'])
            job_dict = job.get_object_dict()

    exec_path = server_utils.get_special_task_exec_path(
            task['host']['hostname'], task['id'], task['task'],
            time_utils.time_string_to_datetime(task['time_requested']))
    status = server_utils.get_special_task_status(
            task['is_complete'], task['success'], task['is_active'])
    return _common_entry_to_dict(task, task['task'], job_dict,
                                 exec_path, status, task['time_started'])


def _queue_entry_to_dict(queue_entry):
    job_dict = queue_entry['job']
    tag = server_utils.get_job_tag(job_dict['id'], job_dict['owner'])
    exec_path = server_utils.get_hqe_exec_path(
            tag, queue_entry['execution_subdir'])
    return _common_entry_to_dict(queue_entry, 'Job', job_dict, exec_path,
                                 queue_entry['status'],
                                 queue_entry['started_on'])


def prepare_host_queue_entries_and_special_tasks(interleaved_entries,
                                                 queue_entries):
    """
    Prepare for serialization the interleaved entries of host queue entries
    and special tasks.

    Each element in the entries is a dictionary type. A special task
    dictionary carries only the id of its job and lacks the job's details,
    while a host queue entry dictionary carries them; queue_entries is used
    to look up the job detail info for special tasks.

    @param interleaved_entries  Host queue entries and special tasks as a list
                                of dictionaries.
    @param queue_entries        Host queue entries as a list of dictionaries.

    @return A post-processed list of dictionaries that is to be serialized.
    """
    dict_list = []
    for e in interleaved_entries:
        # Distinguish the two mixed entry types by the existence of the
        # key 'task': if an entry has the key, it is a special task;
        # otherwise it is a host queue entry.
        if 'task' in e:
            dict_list.append(_special_task_to_dict(e, queue_entries))
        else:
            dict_list.append(_queue_entry_to_dict(e))
    return prepare_for_serialization(dict_list)


def _compute_next_job_for_tasks(queue_entries, special_tasks):
    """
    For each task, try to figure out the next job that ran after that task.
    This is done using the following information:
    * if the task has a queue entry, we can use that entry's job ID.
    * if the task has a time_started, we can try to compare that against the
      started_on field of queue_entries. This isn't guaranteed to work
      perfectly since queue_entries may also have null started_on values.
    * if the task has neither, or if use of time_started fails, just use the
      last computed job ID.

    @param queue_entries  Host queue entries as a list of dictionaries.
    @param special_tasks  Special tasks as a list of dictionaries.
    """
    next_job_id = None  # most recently computed next job
    hqe_index = 0  # index for scanning by started_on times
    for task in special_tasks:
        if task['queue_entry']:
            next_job_id = task['queue_entry']['job']
        elif task['time_started'] is not None:
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry['started_on'] is None:
                    continue
                t1 = time_utils.time_string_to_datetime(
                        queue_entry['started_on'])
                t2 = time_utils.time_string_to_datetime(task['time_started'])
                if t1 < t2:
                    break
                next_job_id = queue_entry['job']['id']

        task['next_job_id'] = next_job_id

        # advance hqe_index to just after next_job_id
        if next_job_id is not None:
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry['job']['id'] < next_job_id:
                    break
                hqe_index += 1


def interleave_entries(queue_entries, special_tasks):
    """
    Both lists should be ordered by descending ID.
    """
    _compute_next_job_for_tasks(queue_entries, special_tasks)

    # start with all special tasks that've run since the last job
    interleaved_entries = []
    for task in special_tasks:
        if task['next_job_id'] is not None:
            break
        interleaved_entries.append(task)

    # now interleave queue entries with the remaining special tasks
    special_task_index = len(interleaved_entries)
    for queue_entry in queue_entries:
        interleaved_entries.append(queue_entry)
        # add all tasks that ran between this job and the previous one
        for task in special_tasks[special_task_index:]:
            if task['next_job_id'] < queue_entry['job']['id']:
                break
            interleaved_entries.append(task)
            special_task_index += 1

    return interleaved_entries
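
# Illustration (hypothetical data): with queue entries for jobs [5, 3] and
# special tasks whose next_job_id values are [None, 5, 3] (both lists in
# descending ID order), the result is ordered
# [task(None), job 5, task(5), job 3, task(3)], i.e. each job is followed
# by the special tasks that ran after it.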


def bucket_hosts_by_shard(host_objs, rpc_hostnames=False):
    """Figure out which hosts are on which shards.

    @param host_objs: A list of host objects.
    @param rpc_hostnames: If True, the rpc_hostnames of a shard are returned
        instead of the 'real' shard hostnames. This only matters for testing
        environments.

    @return: A map of shard hostname: list of hosts on the shard.
    """
    shard_host_map = {}
    for host in host_objs:
        if host.shard:
            shard_name = (host.shard.rpc_hostname() if rpc_hostnames
                          else host.shard.hostname)
            shard_host_map.setdefault(shard_name, []).append(host.hostname)
    return shard_host_map
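
# Example (hypothetical sketch): given two hosts assigned to a shard
# 'shard1.example.com' and one unsharded host, only sharded hosts appear:
#
#     bucket_hosts_by_shard(host_objs)
#     # -> {'shard1.example.com': ['host1', 'host2']}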


def get_create_job_common_args(local_args):
    """
    Returns a dict containing only the args that apply for create_job_common.

    Returns a subset of local_args, which contains only the arguments that
    can be passed in to create_job_common().
    """
    # This code is only here to not kill suite scheduling tests when priority
    # becomes an int instead of a string.
    if isinstance(local_args['priority'], str):
        local_args['priority'] = priorities.Priority.DEFAULT
    # </migration hack>
    arg_names, _, _, _ = inspect.getargspec(create_job_common)
    return dict(item for item in local_args.iteritems()
                if item[0] in arg_names)


def create_job_common(name, priority, control_type, control_file=None,
                      hosts=(), meta_hosts=(), one_time_hosts=(),
                      atomic_group_name=None, synch_count=None,
                      is_template=False, timeout=None, timeout_mins=None,
                      max_runtime_mins=None, run_verify=True, email_list='',
                      dependencies=(), reboot_before=None, reboot_after=None,
                      parse_failed_repair=None, hostless=False, keyvals=None,
                      drone_set=None, parameterized_job=None,
                      parent_job_id=None, test_retry=0, run_reset=True,
                      require_ssp=None):
    #pylint: disable-msg=C0111
    """
    Common code between creating "standard" jobs and creating parameterized
    jobs.
    """
    user = models.User.current_user()
    owner = user.login

    # input validation
    if not (hosts or meta_hosts or one_time_hosts or atomic_group_name
            or hostless):
        raise model_logic.ValidationError({
                'arguments' : "You must pass at least one of 'hosts', "
                              "'meta_hosts', 'one_time_hosts', "
                              "'atomic_group_name', or 'hostless'"
                })

    if hostless:
        if hosts or meta_hosts or one_time_hosts or atomic_group_name:
            raise model_logic.ValidationError({
                    'hostless': 'Hostless jobs cannot include any hosts!'})
        server_type = control_data.CONTROL_TYPE_NAMES.SERVER
        if control_type != server_type:
            raise model_logic.ValidationError({
                    'control_type': 'Hostless jobs cannot use client-side '
                                    'control files'})

    atomic_groups_by_name = dict((ag.name, ag)
                                 for ag in models.AtomicGroup.objects.all())
    label_objects = list(models.Label.objects.filter(name__in=meta_hosts))

    # Schedule on an atomic group automagically if one of the labels given
    # is an atomic group label and no explicit atomic_group_name was supplied.
    if not atomic_group_name:
        for label in label_objects:
            if label and label.atomic_group:
                atomic_group_name = label.atomic_group.name
                break

    # convert hostnames & meta hosts to host/label objects
    host_objects = models.Host.smart_get_bulk(hosts)
    if not server_utils.is_shard():
        shard_host_map = bucket_hosts_by_shard(host_objects)
        num_shards = len(shard_host_map)
        if (num_shards > 1 or (num_shards == 1 and
                len(shard_host_map.values()[0]) != len(host_objects))):
            # We disallow the following jobs on the master:
            #   num_shards > 1: this is a job spanning across multiple shards.
            #   num_shards == 1 but the number of hosts on the shard is less
            #   than the total number of hosts: this is a job that spans
            #   across one shard and the master.
            raise ValueError(
                    'The following hosts are on shard(s), please create '
                    'separate jobs for hosts on each shard: %s ' %
                    shard_host_map)
    metahost_objects = []
    meta_host_labels_by_name = {label.name: label for label in label_objects}
    for label_name in meta_hosts or []:
        if label_name in meta_host_labels_by_name:
            metahost_objects.append(meta_host_labels_by_name[label_name])
        elif label_name in atomic_groups_by_name:
            # If given a metahost name that isn't a Label, check to
            # see if the user was specifying an Atomic Group instead.
            atomic_group = atomic_groups_by_name[label_name]
            if atomic_group_name and atomic_group_name != atomic_group.name:
                raise model_logic.ValidationError({
                        'meta_hosts': (
                                'Label "%s" not found. If assumed to be an '
                                'atomic group it would conflict with the '
                                'supplied atomic group "%s".' % (
                                        label_name, atomic_group_name))})
            atomic_group_name = atomic_group.name
        else:
            raise model_logic.ValidationError(
                    {'meta_hosts' : 'Label "%s" not found' % label_name})

    # Create and sanity check an AtomicGroup object if requested.
    if atomic_group_name:
        if one_time_hosts:
            raise model_logic.ValidationError(
                    {'one_time_hosts':
                     'One time hosts cannot be used with an Atomic Group.'})
        atomic_group = models.AtomicGroup.smart_get(atomic_group_name)
        if synch_count and synch_count > atomic_group.max_number_of_machines:
            raise model_logic.ValidationError(
                    {'atomic_group_name' :
                     'You have requested a synch_count (%d) greater than the '
                     'maximum machines in the requested Atomic Group (%d).' %
                     (synch_count, atomic_group.max_number_of_machines)})
    else:
        atomic_group = None

    for host in one_time_hosts or []:
        this_host = models.Host.create_one_time_host(host)
        host_objects.append(this_host)

    options = dict(name=name,
                   priority=priority,
                   control_file=control_file,
                   control_type=control_type,
                   is_template=is_template,
                   timeout=timeout,
                   timeout_mins=timeout_mins,
                   max_runtime_mins=max_runtime_mins,
                   synch_count=synch_count,
                   run_verify=run_verify,
                   email_list=email_list,
                   dependencies=dependencies,
                   reboot_before=reboot_before,
                   reboot_after=reboot_after,
                   parse_failed_repair=parse_failed_repair,
                   keyvals=keyvals,
                   drone_set=drone_set,
                   parameterized_job=parameterized_job,
                   parent_job_id=parent_job_id,
                   test_retry=test_retry,
                   run_reset=run_reset,
                   require_ssp=require_ssp)
    return create_new_job(owner=owner,
                          options=options,
                          host_objects=host_objects,
                          metahost_objects=metahost_objects,
                          atomic_group=atomic_group)


def encode_ascii(control_file):
    """Force a control file to only contain ascii characters.

    @param control_file: Control file to encode.

    @returns the control file in an ascii encoding.

    @raises error.ControlFileMalformed: if encoding fails.
    """
    try:
        return control_file.encode('ascii')
    except UnicodeDecodeError as e:
        raise error.ControlFileMalformed(str(e))


def get_wmatrix_url():
    """Get wmatrix url from config file.

    @returns the wmatrix url or an empty string.
    """
    return global_config.global_config.get_config_value('AUTOTEST_WEB',
                                                        'wmatrix_url',
                                                        default='')


def inject_times_to_filter(start_time_key=None, end_time_key=None,
                           start_time_value=None, end_time_value=None,
                           **filter_data):
    """Inject the key value pairs of start and end time if provided.

    @param start_time_key: A string representing the filter key of start_time.
    @param end_time_key: A string representing the filter key of end_time.
    @param start_time_value: Start_time value.
    @param end_time_value: End_time value.

    @returns the injected filter_data.
    """
    if start_time_value:
        filter_data[start_time_key] = start_time_value
    if end_time_value:
        filter_data[end_time_key] = end_time_value
    return filter_data
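
# Example (hypothetical sketch):
#
#     inject_times_to_filter('started_on__gte', 'started_on__lte',
#                            '2015-06-01 00:00:00', '2015-06-02 00:00:00',
#                            job__name__startswith='dummy')
#     # -> {'job__name__startswith': 'dummy',
#     #     'started_on__gte': '2015-06-01 00:00:00',
#     #     'started_on__lte': '2015-06-02 00:00:00'}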


def inject_times_to_hqe_special_tasks_filters(filter_data_common,
                                              start_time, end_time):
    """Inject start and end time to hqe and special tasks filters.

    @param filter_data_common: Common filter for hqe and special tasks.
    @param start_time: Start time value to inject.
    @param end_time: End time value to inject.

    @returns a pair of hqe and special tasks filters.
    """
    filter_data_special_tasks = filter_data_common.copy()
    return (inject_times_to_filter('started_on__gte', 'started_on__lte',
                                   start_time, end_time,
                                   **filter_data_common),
            inject_times_to_filter('time_started__gte', 'time_started__lte',
                                   start_time, end_time,
                                   **filter_data_special_tasks))


def retrieve_shard(shard_hostname):
    """
    Retrieves the shard with the given hostname from the database.

    @param shard_hostname: Hostname of the shard to retrieve.

    @raises models.Shard.DoesNotExist, if no shard with this hostname was found.

    @returns: Shard object
    """
    timer = autotest_stats.Timer('shard_heartbeat.retrieve_shard')
    with timer:
        return models.Shard.smart_get(shard_hostname)


def find_records_for_shard(shard, known_job_ids, known_host_ids):
    """Find records that should be sent to a shard.

    @param shard: Shard to find records for.
    @param known_job_ids: List of ids of jobs the shard already has.
    @param known_host_ids: List of ids of hosts the shard already has.

    @returns: Tuple of three lists for hosts, jobs, and suite job keyvals:
              (hosts, jobs, suite_job_keyvals).
    """
    timer = autotest_stats.Timer('shard_heartbeat')
    with timer.get_client('find_hosts'):
        hosts = models.Host.assign_to_shard(shard, known_host_ids)
    with timer.get_client('find_jobs'):
        jobs = models.Job.assign_to_shard(shard, known_job_ids)
    with timer.get_client('find_suite_job_keyvals'):
        parent_job_ids = [job.parent_job_id for job in jobs]
        suite_job_keyvals = models.JobKeyval.objects.filter(
                job_id__in=parent_job_ids)
    return hosts, jobs, suite_job_keyvals


def _persist_records_with_type_sent_from_shard(
        shard, records, record_type, *args, **kwargs):
    """
    Handle records of a specified type that were sent to the shard master.

    @param shard: The shard the records were sent from.
    @param records: The records sent in their serialized format.
    @param record_type: Type of the objects represented by records.
    @param args: Additional arguments that will be passed on to the sanity
                 checks.
    @param kwargs: Additional arguments that will be passed on to the sanity
                   checks.

    @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.

    @returns: List of primary keys of the processed records.
    """
    pks = []
    for serialized_record in records:
        pk = serialized_record['id']
        try:
            current_record = record_type.objects.get(pk=pk)
        except record_type.DoesNotExist:
            raise error.UnallowedRecordsSentToMaster(
                    'Object with pk %s of type %s does not exist on master.' %
                    (pk, record_type))

        current_record.sanity_check_update_from_shard(
                shard, serialized_record, *args, **kwargs)

        current_record.update_from_serialized(serialized_record)
        pks.append(pk)
    return pks


def persist_records_sent_from_shard(shard, jobs, hqes):
    """
    Sanity checking then saving serialized records sent to master from shard.

    During heartbeats shards upload jobs and hostqueueentries. This performs
    some sanity checks on these and then updates the existing records for those
    entries with the updated ones from the heartbeat.

    The sanity checks include:
    - Checking if the objects sent already exist on the master.
    - Checking if the objects sent were assigned to this shard.
    - hostqueueentries must be sent together with their jobs.

    @param shard: The shard the records were sent from.
    @param jobs: The jobs the shard sent.
    @param hqes: The hostqueueentries the shard sent.

    @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.
    """
    timer = autotest_stats.Timer('shard_heartbeat')
    with timer.get_client('persist_jobs'):
        job_ids_sent = _persist_records_with_type_sent_from_shard(
                shard, jobs, models.Job)

    with timer.get_client('persist_hqes'):
        _persist_records_with_type_sent_from_shard(
                shard, hqes, models.HostQueueEntry, job_ids_sent=job_ids_sent)


def forward_single_host_rpc_to_shard(func):
    """This decorator forwards rpc calls that modify a host to a shard.

    If a host is assigned to a shard, rpcs that change its attributes should
    be forwarded to the shard.

    This assumes the first argument of the function represents a host id.

    @param func: The function to decorate

    @returns: The function to replace func with.
    """
    def replacement(**kwargs):
        # Only keyword arguments can be accepted here, as we need the argument
        # names to send the rpc. serviceHandler always provides arguments with
        # their keywords, so this is not a problem.
        host = models.Host.smart_get(kwargs['id'])
        if host.shard and not server_utils.is_shard():
            run_rpc_on_multiple_hostnames(func.func_name,
                                          [host.shard.rpc_hostname()],
                                          **kwargs)
        return func(**kwargs)

    return replacement


def fanout_rpc(host_objs, rpc_name, include_hostnames=True, **kwargs):
    """Fan out the given rpc to all shards.

    @param host_objs: Host objects for the rpc.
    @param rpc_name: The name of the rpc.
    @param include_hostnames: If True, include the hostnames in the kwargs.
        Hostnames are not always necessary; this function is designed to
        send rpcs to the shard a host is on, but the rpcs themselves could
        be related to labels, acls, etc.
    @param kwargs: The kwargs for the rpc.
    """
    # Fanout should only happen from the master to the shards.
    if server_utils.is_shard():
        return

    # Figure out which hosts are on which shards.
    shard_host_map = bucket_hosts_by_shard(
            host_objs, rpc_hostnames=True)

    # Execute the rpc against the appropriate shards.
    for shard, hostnames in shard_host_map.iteritems():
        if include_hostnames:
            kwargs['hosts'] = hostnames
        try:
            run_rpc_on_multiple_hostnames(rpc_name, [shard], **kwargs)
        except:
            ei = sys.exc_info()
            new_exc = error.RPCException('RPC %s failed on shard %s due to '
                                         '%s: %s' % (rpc_name, shard,
                                                     ei[0].__name__, ei[1]))
            raise new_exc.__class__, new_exc, ei[2]


def forward_multi_host_rpc_to_shards(func):
    """This decorator forwards rpc calls that modify multiple hosts.

    If a host is assigned to a shard, rpcs that change its attributes should
    be forwarded to the shard. Some calls, however, take a list of hosts and
    a single id to modify, e.g. label_add_hosts. This wrapper will sift
    through the list of hosts, find each of their shards, and forward the rpc
    for those hosts to that shard before calling the local version of the
    given rpc.

    This assumes:
        1. The rpc call uses `smart_get` to retrieve host objects, not the
           stock django `get` call. This is true for most, if not all rpcs in
           the rpc_interface.
        2. The kwargs to the function contain either a list of host ids or
           hostnames, keyed under 'hosts'. This is true for all the rpc
           functions that use 'smart_get'.

    @param func: The function to decorate

    @returns: The function to replace func with.
    """
    def replacement(**kwargs):
        fanout_rpc(
                models.Host.smart_get_bulk(kwargs['hosts']),
                func.func_name, **kwargs)
        return func(**kwargs)

    return replacement


def run_rpc_on_multiple_hostnames(rpc_call, shard_hostnames, **kwargs):
    """Runs an rpc on multiple AFEs.

    This is used, for example, to propagate changes made to hosts after they
    are assigned to a shard.

    @param rpc_call: Name of the rpc endpoint to call.
    @param shard_hostnames: List of hostnames to run the rpcs on.
    @param **kwargs: Keyword arguments to pass in the rpcs.
    """
    for shard_hostname in shard_hostnames:
        afe = frontend_wrappers.RetryingAFE(server=shard_hostname)
        afe.run(rpc_call, **kwargs)


def get_label(name):
    """Gets a label object using a given name.

    @param name: Label name.
    @return: a label object matching the given name, or None if no such
            label exists.
    """
    try:
        label = models.Label.smart_get(name)
    except models.Label.DoesNotExist:
        return None
    return label


def get_global_afe_hostname():
    """Read the hostname of the global AFE from the global configuration."""
    return global_config.global_config.get_config_value(
            'SHARD', 'global_afe_hostname')
1261
1262
MK Ryufbb002c2015-06-08 14:13:16 -07001263def route_rpc_to_master(func):
J. Richard Barnettefdfcd662015-04-13 17:20:29 -07001264 """Route RPC to master AFE.
MK Ryu2d107562015-02-24 17:45:02 -08001265
MK Ryufbb002c2015-06-08 14:13:16 -07001266 @param func: The function to decorate
J. Richard Barnettefdfcd662015-04-13 17:20:29 -07001267
MK Ryufbb002c2015-06-08 14:13:16 -07001268 @returns: The function to replace func with.
MK Ryu2d107562015-02-24 17:45:02 -08001269 """
MK Ryufbb002c2015-06-08 14:13:16 -07001270 @wraps(func)
1271 def replacement(**kwargs):
1272 if server_utils.is_shard():
MK Ryu9651ca52015-06-08 17:48:22 -07001273 afe = frontend_wrappers.RetryingAFE(
1274 server=get_global_afe_hostname())
1275 return afe.run(func.func_name, **kwargs)
MK Ryufbb002c2015-06-08 14:13:16 -07001276 return func(**kwargs)
1277 return replacement
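
# Example (hypothetical sketch; 'my_rpc' is an illustrative name, not an
# actual endpoint): decorating an rpc so shards delegate it to the master:
#
#     @route_rpc_to_master
#     def my_rpc(**kwargs):
#         ...
#
# On a shard, the call is re-issued against the global AFE returned by
# get_global_afe_hostname(); on the master, the wrapped function runs locally.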