blob: c043ae7c5130461c4e6557f3645a4f0de2a03a2a [file] [log] [blame]
Aviv Keshet18308922013-02-19 17:49:49 -08001#pylint: disable-msg=C0111
mblighe8819cd2008-02-15 16:48:40 +00002"""\
3Utility functions for rpc_interface.py. We keep them in a separate file so that
4only RPC interface functions go into that file.
5"""
6
7__author__ = 'showard@google.com (Steve Howard)'
8
MK Ryu84573e12015-02-18 15:54:09 -08009import datetime
10import inspect
11import os
12import sys
showard3d6ae112009-05-02 00:45:48 +000013import django.http
Dan Shi07e09af2013-04-12 09:31:29 -070014from autotest_lib.frontend.afe import models, model_logic
Alex Miller4a193692013-08-21 13:59:01 -070015from autotest_lib.client.common_lib import control_data, error
Jiaxi Luo421608e2014-07-07 14:38:00 -070016from autotest_lib.client.common_lib import global_config, priorities
MK Ryu0c1a37d2015-04-30 12:00:55 -070017from autotest_lib.client.common_lib import time_utils
MK Ryu509516b2015-05-18 12:00:47 -070018from autotest_lib.client.common_lib.cros.graphite import autotest_stats
Aviv Keshetc68807e2013-07-31 16:13:01 -070019from autotest_lib.server.cros import provision
Jakob Juelich50e91f72014-10-01 12:43:23 -070020from autotest_lib.server import frontend
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -080021from autotest_lib.server import utils as server_utils
mblighe8819cd2008-02-15 16:48:40 +000022
showarda62866b2008-07-28 21:27:41 +000023NULL_DATETIME = datetime.datetime.max
24NULL_DATE = datetime.date.max
25
def prepare_for_serialization(objects):
    """
    Prepare Python objects to be returned via RPC.
    @param objects: objects to be prepared.
    """
    # A non-empty list of dicts keyed by 'id' gets de-duplicated first.
    looks_like_object_dicts = (
            isinstance(objects, list) and objects and
            isinstance(objects[0], dict) and 'id' in objects[0])
    if looks_like_object_dicts:
        objects = gather_unique_dicts(objects)
    return _prepare_data(objects)
showardb8d34242008-04-25 18:11:16 +000035
36
def prepare_rows_as_nested_dicts(query, nested_dict_column_names):
    """
    Prepare a Django query to be returned via RPC as a sequence of nested
    dictionaries.

    @param query - A Django model query object with a select_related() method.
    @param nested_dict_column_names - A list of column/attribute names for the
            rows returned by query to expand into nested dictionaries using
            their get_object_dict() method when not None.

    @returns An list suitable to returned in an RPC.
    """
    rows = []
    for model_object in query.select_related():
        object_dict = model_object.get_object_dict()
        # Expand each requested related column into its own dict.
        for column_name in nested_dict_column_names:
            if object_dict[column_name] is not None:
                related = getattr(model_object, column_name)
                object_dict[column_name] = related.get_object_dict()
        rows.append(object_dict)
    return prepare_for_serialization(rows)
57
58
showardb8d34242008-04-25 18:11:16 +000059def _prepare_data(data):
jadmanski0afbb632008-06-06 21:10:57 +000060 """
61 Recursively process data structures, performing necessary type
62 conversions to values in data to allow for RPC serialization:
63 -convert datetimes to strings
showard2b9a88b2008-06-13 20:55:03 +000064 -convert tuples and sets to lists
jadmanski0afbb632008-06-06 21:10:57 +000065 """
66 if isinstance(data, dict):
67 new_data = {}
68 for key, value in data.iteritems():
69 new_data[key] = _prepare_data(value)
70 return new_data
showard2b9a88b2008-06-13 20:55:03 +000071 elif (isinstance(data, list) or isinstance(data, tuple) or
72 isinstance(data, set)):
jadmanski0afbb632008-06-06 21:10:57 +000073 return [_prepare_data(item) for item in data]
showard98659972008-07-17 17:00:07 +000074 elif isinstance(data, datetime.date):
showarda62866b2008-07-28 21:27:41 +000075 if data is NULL_DATETIME or data is NULL_DATE:
76 return None
jadmanski0afbb632008-06-06 21:10:57 +000077 return str(data)
78 else:
79 return data
mblighe8819cd2008-02-15 16:48:40 +000080
81
def fetchall_as_list_of_dicts(cursor):
    """
    Converts each row in the cursor to a dictionary so that values can be read
    by using the column name.
    @param cursor: The database cursor to read from.
    @returns: A list of each row in the cursor as a dictionary.
    """
    # cursor.description: sequence of 7-tuples whose first item is the name.
    column_names = [column[0] for column in cursor.description]
    return [dict(zip(column_names, row)) for row in cursor.fetchall()]
92
93
def raw_http_response(response_data, content_type=None):
    """Build a raw (non-RPC-serialized) Django HTTP response.

    @param response_data: body content for the response.
    @param content_type: optional MIME type, passed to HttpResponse.
    @returns a django.http.HttpResponse with Content-length set.
    """
    # NOTE(review): the 'mimetype' keyword was removed in Django 1.7+
    # (replaced by 'content_type') — confirm against the Django version
    # this deploys with.
    response = django.http.HttpResponse(response_data, mimetype=content_type)
    response['Content-length'] = str(len(response.content))
    return response
98
99
def gather_unique_dicts(dict_iterable):
    """Pick out unique objects (by ID) from an iterable of object dicts.

    Order is preserved; the first dict seen for each id wins.
    """
    seen_ids = set()
    unique_dicts = []
    for candidate in dict_iterable:
        candidate_id = candidate['id']
        if candidate_id in seen_ids:
            continue
        seen_ids.add(candidate_id)
        unique_dicts.append(candidate)
    return unique_dicts
showardb0dfb9f2008-06-06 18:08:02 +0000111
112
def extra_job_status_filters(not_yet_run=False, running=False, finished=False):
    """\
    Generate a SQL WHERE clause for job status filtering, and return it in
    a dict of keyword args to pass to query.extra(). No more than one of
    the parameters should be passed as True.
    * not_yet_run: all HQEs are Queued
    * finished: all HQEs are complete
    * running: everything else
    """
    flags = [bool(not_yet_run), bool(running), bool(finished)]
    assert sum(flags) <= 1, ('Cannot specify more than one '
                             'filter to this function')

    not_queued = ('(SELECT job_id FROM afe_host_queue_entries '
                  'WHERE status != "%s")'
                  % models.HostQueueEntry.Status.QUEUED)
    not_finished = ('(SELECT job_id FROM afe_host_queue_entries '
                    'WHERE not complete)')

    if not_yet_run:
        clause = 'id NOT IN ' + not_queued
    elif running:
        # Neither fully queued nor fully finished.
        clause = '(id IN %s) AND (id IN %s)' % (not_queued, not_finished)
    elif finished:
        clause = 'id NOT IN ' + not_finished
    else:
        return {}
    return {'where': [clause]}
mblighe8819cd2008-02-15 16:48:40 +0000142
143
def extra_job_type_filters(extra_args, suite=False,
                           sub=False, standalone=False):
    """\
    Generate a SQL WHERE clause for job status filtering, and return it in
    a dict of keyword args to pass to query.extra().

    param extra_args: a dict of existing extra_args.

    No more than one of the parameters should be passed as True:
    * suite: job which is parent of other jobs
    * sub: job with a parent job
    * standalone: job with no child or parent jobs
    """
    flags = [bool(suite), bool(sub), bool(standalone)]
    assert sum(flags) <= 1, ('Cannot specify more than one '
                             'filter to this function')
    if not (suite or sub or standalone):
        return extra_args

    where = extra_args.get('where', [])
    filter_common = ('(SELECT %s FROM afe_jobs '
                     'WHERE parent_job_id IS NOT NULL)')

    if suite:
        where.append('id IN ' + filter_common % 'DISTINCT parent_job_id')
    elif sub:
        where.append('id IN ' + filter_common % 'id')
    else:  # standalone
        where.append('NOT EXISTS (SELECT 1 from afe_jobs AS sub_query '
                     'WHERE parent_job_id IS NOT NULL'
                     ' AND (sub_query.parent_job_id=afe_jobs.id'
                     ' OR sub_query.id=afe_jobs.id))')

    extra_args['where'] = where
    return extra_args
182
183
184
def extra_host_filters(multiple_labels=()):
    """\
    Generate SQL WHERE clauses for matching hosts in an intersection of
    labels.
    """
    where_str = ('afe_hosts.id in (select host_id from afe_hosts_labels '
                 'where label_id=%s)')
    # One copy of the clause per label; params line up positionally.
    return {
        'where': [where_str for _ in multiple_labels],
        'params': [models.Label.smart_get(label).id
                   for label in multiple_labels],
    }
showard8e3aa5e2008-04-08 19:42:32 +0000197
198
def get_host_query(multiple_labels, exclude_only_if_needed_labels,
                   exclude_atomic_group_hosts, valid_only, filter_data):
    """Build a filtered Host queryset for the get_hosts-style RPCs.

    @param multiple_labels: label names the hosts must all have.
    @param exclude_only_if_needed_labels: if True, exclude hosts carrying any
            label marked only_if_needed.
    @param exclude_atomic_group_hosts: if True, exclude hosts carrying any
            atomic-group label.
    @param valid_only: if True, start from valid_objects (non-deleted hosts).
    @param filter_data: extra filter dict passed to Host.query_objects; must
            not already contain 'extra_args'.
    @returns a Django queryset of matching hosts (empty on unknown label).
    """
    if valid_only:
        query = models.Host.valid_objects.all()
    else:
        query = models.Host.objects.all()

    if exclude_only_if_needed_labels:
        only_if_needed_labels = models.Label.valid_objects.filter(
            only_if_needed=True)
        if only_if_needed_labels.count() > 0:
            only_if_needed_ids = ','.join(
                    str(label['id'])
                    for label in only_if_needed_labels.values('id'))
            # Anti-join: drop hosts that have any only_if_needed label.
            query = models.Host.objects.add_join(
                query, 'afe_hosts_labels', join_key='host_id',
                join_condition=('afe_hosts_labels_exclude_OIN.label_id IN (%s)'
                                % only_if_needed_ids),
                suffix='_exclude_OIN', exclude=True)

    if exclude_atomic_group_hosts:
        atomic_group_labels = models.Label.valid_objects.filter(
                atomic_group__isnull=False)
        if atomic_group_labels.count() > 0:
            atomic_group_label_ids = ','.join(
                    str(atomic_group['id'])
                    for atomic_group in atomic_group_labels.values('id'))
            # Anti-join: drop hosts that belong to any atomic group label.
            query = models.Host.objects.add_join(
                    query, 'afe_hosts_labels', join_key='host_id',
                    join_condition=(
                            'afe_hosts_labels_exclude_AG.label_id IN (%s)'
                            % atomic_group_label_ids),
                    suffix='_exclude_AG', exclude=True)
    try:
        assert 'extra_args' not in filter_data
        filter_data['extra_args'] = extra_host_filters(multiple_labels)
        return models.Host.query_objects(filter_data, initial_query=query)
    except models.Label.DoesNotExist as e:
        # An unknown label name simply matches no hosts.
        return models.Host.objects.none()
showard43a3d262008-11-12 18:17:05 +0000238
239
class InconsistencyException(Exception):
    """Raised when a list of objects does not have a consistent value."""
showard8fd58242008-03-10 21:29:07 +0000242
243
def get_consistent_value(objects, field):
    """Return the value of `field` shared by all objects.

    @param objects: sequence of objects to inspect.
    @param field: attribute name to read from each object.
    @returns the common value, or None for an empty sequence.
    @raises InconsistencyException: if two objects disagree; args are the
            first object and the first disagreeing object.
    """
    if not objects:
        # well a list of nothing is consistent
        return None

    expected = getattr(objects[0], field)
    # objects[0] trivially matches itself, so start from the second item.
    for candidate in objects[1:]:
        if getattr(candidate, field) != expected:
            raise InconsistencyException(objects[0], candidate)
    return expected
showard8fd58242008-03-10 21:29:07 +0000255
256
def prepare_generate_control_file(tests, kernel, label, profilers):
    """Resolve the model objects needed to generate a control file.

    @param tests: test names/ids, resolved via models.Test.smart_get.
    @param kernel: kernel specification (unused here; kept for interface
            compatibility with callers).
    @param label: optional label name/id to resolve via Label.smart_get.
    @param profilers: profiler names/ids to resolve.
    @returns (cf_info dict, test objects, profiler objects, label object).
    @raises model_logic.ValidationError: if tests mix server- and
            client-side types.
    """
    test_objects = [models.Test.smart_get(test) for test in tests]
    profiler_objects = [models.Profiler.smart_get(profiler)
                        for profiler in profilers]
    # ensure tests are all the same type
    try:
        test_type = get_consistent_value(test_objects, 'test_type')
    # "except X as e" matches the style used elsewhere in this file and is
    # also valid on Python 3 (the old "except X, e" form is not).
    except InconsistencyException as exc:
        test1, test2 = exc.args
        raise model_logic.ValidationError(
            {'tests' : 'You cannot run both server- and client-side '
             'tests together (tests %s and %s differ' % (
            test1.name, test2.name)})

    is_server = (test_type == control_data.CONTROL_TYPE.SERVER)
    # synch_count is the max over the chosen tests, defaulting to 1.
    if test_objects:
        synch_count = max(test.sync_count for test in test_objects)
    else:
        synch_count = 1
    if label:
        label = models.Label.smart_get(label)

    dependencies = set(label.name for label
                       in models.Label.objects.filter(test__in=test_objects))

    cf_info = dict(is_server=is_server, synch_count=synch_count,
                   dependencies=list(dependencies))
    return cf_info, test_objects, profiler_objects, label
showard989f25d2008-10-01 11:38:11 +0000285
286
def check_job_dependencies(host_objects, job_dependencies):
    """
    Check that a set of machines satisfies a job's dependencies.

    @param host_objects: list of models.Host objects.
    @param job_dependencies: list of names of labels.
    @raises model_logic.ValidationError: if any host lacks a required
            (non-special-action) dependency label.
    """
    # check that hosts satisfy dependencies
    host_ids = [host.id for host in host_objects]
    ok_hosts = models.Host.objects.filter(id__in=host_ids)
    # (was "for index, dependency in enumerate(...)" — index was unused)
    for dependency in job_dependencies:
        # Labels handled by provisioning need not already be on the host.
        if not provision.is_for_special_action(dependency):
            ok_hosts = ok_hosts.filter(labels__name=dependency)
    failing_hosts = (set(host.hostname for host in host_objects) -
                     set(host.hostname for host in ok_hosts))
    if failing_hosts:
        raise model_logic.ValidationError(
            {'hosts' : 'Host(s) failed to meet job dependencies (' +
                       (', '.join(job_dependencies)) + '): ' +
                       (', '.join(failing_hosts))})
307
showard989f25d2008-10-01 11:38:11 +0000308
def check_job_metahost_dependencies(metahost_objects, job_dependencies):
    """
    Check that at least one machine within the metahost spec satisfies the
    job's dependencies.

    @param metahost_objects A list of label objects representing the
            metahosts.
    @param job_dependencies A list of strings of the required label names.
    @raises NoEligibleHostException If a metahost cannot run the job.
    """
    # Provision-style labels are applied later, so they do not constrain
    # eligibility here.
    required_labels = [name for name in job_dependencies
                       if not provision.is_for_special_action(name)]
    for metahost in metahost_objects:
        candidates = models.Host.objects.filter(labels=metahost)
        for label_name in required_labels:
            candidates = candidates.filter(labels__name=label_name)
        if not any(candidates):
            raise error.NoEligibleHostException("No hosts within %s satisfy %s."
                    % (metahost.name, ', '.join(job_dependencies)))
326
showard2bab8f42008-11-12 18:15:22 +0000327
328def _execution_key_for(host_queue_entry):
329 return (host_queue_entry.job.id, host_queue_entry.execution_subdir)
330
331
def check_abort_synchronous_jobs(host_queue_entries):
    """Reject aborts that would cancel only part of a synchronous execution.

    @param host_queue_entries: HQEs the user asked to abort.
    @raises model_logic.ValidationError: if fewer entries of an execution
            are included than its job's synch_count requires.
    """
    # ensure user isn't aborting part of a synchronous autoserv execution
    count_per_execution = {}
    for queue_entry in host_queue_entries:
        key = _execution_key_for(queue_entry)
        count_per_execution[key] = count_per_execution.get(key, 0) + 1

    for queue_entry in host_queue_entries:
        # Entries without an execution subdir haven't started running.
        if not queue_entry.execution_subdir:
            continue
        execution_count = count_per_execution[_execution_key_for(queue_entry)]
        if execution_count < queue_entry.job.synch_count:
            raise model_logic.ValidationError(
                {'' : 'You cannot abort part of a synchronous job execution '
                      '(%d/%s), %d included, %d expected'
                      % (queue_entry.job.id, queue_entry.execution_subdir,
                         execution_count, queue_entry.job.synch_count)})
showard8fbae652009-01-20 23:23:10 +0000350
351
def check_atomic_group_create_job(synch_count, host_objects, metahost_objects,
                                  dependencies, atomic_group):
    """
    Attempt to reject create_job requests with an atomic group that
    will be impossible to schedule.  The checks are not perfect but
    should catch the most obvious issues.

    @param synch_count - The job's minimum synch count.
    @param host_objects - A list of models.Host instances.
    @param metahost_objects - A list of models.Label instances.
    @param dependencies - A list of job dependency label names.
    @param atomic_group - The atomic group object the job was requested
            with; its label_set defines the candidate hosts.

    @raises model_logic.ValidationError - When an issue is found.
    """
    # If specific host objects were supplied with an atomic group, verify
    # that there are enough to satisfy the synch_count.
    minimum_required = synch_count or 1
    if (host_objects and not metahost_objects and
        len(host_objects) < minimum_required):
        raise model_logic.ValidationError(
                {'hosts':
                 'only %d hosts provided for job with synch_count = %d' %
                 (len(host_objects), synch_count)})

    # Check that the atomic group has a hope of running this job
    # given any supplied metahosts and dependancies that may limit.

    # Get a set of hostnames in the atomic group.
    possible_hosts = set()
    for label in atomic_group.label_set.all():
        possible_hosts.update(h.hostname for h in label.host_set.all())

    # Filter out hosts that don't match all of the job dependency labels.
    for label in models.Label.objects.filter(name__in=dependencies):
        hosts_in_label = (h.hostname for h in label.host_set.all())
        possible_hosts.intersection_update(hosts_in_label)

    if not host_objects and not metahost_objects:
        # No hosts or metahosts are required to queue an atomic group Job.
        # However, if they are given, we respect them below.
        host_set = possible_hosts
    else:
        host_set = set(host.hostname for host in host_objects)
        unusable_host_set = host_set.difference(possible_hosts)
        if unusable_host_set:
            raise model_logic.ValidationError(
                {'hosts': 'Hosts "%s" are not in Atomic Group "%s"' %
                 (', '.join(sorted(unusable_host_set)), atomic_group.name)})

    # Lookup hosts provided by each meta host and merge them into the
    # host_set for final counting.
    for meta_host in metahost_objects:
        meta_possible = possible_hosts.copy()
        hosts_in_meta_host = (h.hostname for h in meta_host.host_set.all())
        meta_possible.intersection_update(hosts_in_meta_host)

        # Count all hosts that this meta_host will provide.
        host_set.update(meta_possible)

    if len(host_set) < minimum_required:
        raise model_logic.ValidationError(
                {'atomic_group_name':
                 'Insufficient hosts in Atomic Group "%s" with the'
                 ' supplied dependencies and meta_hosts.' %
                 (atomic_group.name,)})
419
420
def check_modify_host(update_data):
    """
    Sanity check modify_host* requests.

    @param update_data: A dictionary with the changes to make to a host
            or hosts.
    """
    # Only the scheduler (monitor_db) is allowed to modify Host status.
    # Otherwise race conditions happen as a hosts state is changed out from
    # beneath tasks being run on a host.
    if 'status' not in update_data:
        return
    raise model_logic.ValidationError({
            'status': 'Host status can not be modified by the frontend.'})
434
435
def check_modify_host_locking(host, update_data):
    """
    Checks when locking/unlocking has been requested if the host is already
    locked/unlocked.

    @param host: models.Host object to be modified
    @param update_data: A dictionary with the changes to make to the host.
    """
    locked = update_data.get('locked', None)
    lock_reason = update_data.get('lock_reason', None)
    if locked is None:
        # No locking change requested; nothing to validate.
        return
    if locked:
        if host.locked:
            raise model_logic.ValidationError({
                    'locked': 'Host already locked by %s on %s.' %
                    (host.locked_by, host.lock_time)})
        # Locking an unlocked host requires a reason.
        if not lock_reason:
            raise model_logic.ValidationError({
                    'locked': 'Please provide a reason for locking'})
    elif not host.locked:
        raise model_logic.ValidationError({
                'locked': 'Host already unlocked.'})
showardce7c0922009-09-11 18:39:24 +0000457
458
def get_motd():
    """Return the message-of-the-day text, or '' if unavailable.

    Reads motd.txt located two directories above this module.  A missing
    or unreadable file is expected and yields the empty string.
    """
    dirname = os.path.dirname(__file__)
    filename = os.path.join(dirname, "..", "..", "motd.txt")
    try:
        # 'with' guarantees the handle is closed even if read() fails.
        with open(filename, "r") as fp:
            return fp.read()
    except IOError:
        # Narrowed from a bare "except:" — only swallow I/O errors (missing
        # file, permissions), not unrelated bugs.
        return ''
showard29f7cd22009-04-29 21:16:24 +0000473
474
475def _get_metahost_counts(metahost_objects):
476 metahost_counts = {}
477 for metahost in metahost_objects:
478 metahost_counts.setdefault(metahost, 0)
479 metahost_counts[metahost] += 1
480 return metahost_counts
481
482
def get_job_info(job, preserve_metahosts=False, queue_entry_filter_data=None):
    """Collect host/metahost/atomic-group details for a job's queue entries.

    @param job: a models.Job instance.
    @param preserve_metahosts: if True, an entry with both a host and a
            meta_host is reported under its host instead of the metahost.
    @param queue_entry_filter_data: optional filter dict applied to the
            job's queue entries via HostQueueEntry.query_objects.
    @returns dict with keys: dependencies, hosts, meta_hosts,
            meta_host_counts, one_time_hosts, atomic_group, hostless.
    """
    hosts = []
    one_time_hosts = []
    meta_hosts = []
    atomic_group = None
    hostless = False

    queue_entries = job.hostqueueentry_set.all()
    if queue_entry_filter_data:
        queue_entries = models.HostQueueEntry.query_objects(
            queue_entry_filter_data, initial_query=queue_entries)

    for queue_entry in queue_entries:
        if (queue_entry.host and (preserve_metahosts or
                                  not queue_entry.meta_host)):
            if queue_entry.deleted:
                continue
            # Invalid hosts were added as one-time hosts.
            if queue_entry.host.invalid:
                one_time_hosts.append(queue_entry.host)
            else:
                hosts.append(queue_entry.host)
        elif queue_entry.meta_host:
            meta_hosts.append(queue_entry.meta_host)
        else:
            hostless = True

        if atomic_group is None:
            if queue_entry.atomic_group is not None:
                atomic_group = queue_entry.atomic_group
        else:
            # All entries of one job must share a single atomic group.
            # NOTE(review): 'id' in this message is the builtin, not the
            # job's id — probably meant job.id; confirm before changing.
            assert atomic_group.name == queue_entry.atomic_group.name, (
                'DB inconsistency. HostQueueEntries with multiple atomic'
                ' groups on job %s: %s != %s' % (
                    id, atomic_group.name, queue_entry.atomic_group.name))

    meta_host_counts = _get_metahost_counts(meta_hosts)

    info = dict(dependencies=[label.name for label
                              in job.dependency_labels.all()],
                hosts=hosts,
                meta_hosts=meta_hosts,
                meta_host_counts=meta_host_counts,
                one_time_hosts=one_time_hosts,
                atomic_group=atomic_group,
                hostless=hostless)
    return info
529
530
def check_for_duplicate_hosts(host_objects):
    """Raise ValidationError if any host id appears more than once.

    @param host_objects: list of models.Host objects.
    @raises model_logic.ValidationError: naming the duplicated hostnames.
    """
    seen_ids = set()
    duplicate_hostnames = set()
    for host in host_objects:
        if host.id in seen_ids:
            duplicate_hostnames.add(host.hostname)
        else:
            seen_ids.add(host.id)

    if duplicate_hostnames:
        raise model_logic.ValidationError(
                {'hosts' : 'Duplicate hosts: %s'
                 % ', '.join(duplicate_hostnames)})
543
544
def create_new_job(owner, options, host_objects, metahost_objects,
                   atomic_group=None):
    """Validate and create a job with its queue entries.

    @param owner: login of the job owner.
    @param options: dict of job options ('dependencies', 'synch_count',
            'is_template', ...); 'dependencies' is rewritten in place to a
            list of Label objects.
    @param host_objects: list of models.Host the job targets directly.
    @param metahost_objects: list of models.Label metahosts.
    @param atomic_group: optional atomic group the job runs in.
    @returns the new job's id.
    @raises model_logic.ValidationError: on any dependency/host problem.
    """
    all_host_objects = host_objects + metahost_objects
    # (removed an unused "metahost_counts = _get_metahost_counts(...)" local)
    dependencies = options.get('dependencies', [])
    synch_count = options.get('synch_count')

    if atomic_group:
        check_atomic_group_create_job(
                synch_count, host_objects, metahost_objects,
                dependencies, atomic_group)
    else:
        if synch_count is not None and synch_count > len(all_host_objects):
            raise model_logic.ValidationError(
                    {'hosts':
                     'only %d hosts provided for job with synch_count = %d' %
                     (len(all_host_objects), synch_count)})
        # Hosts that live in an atomic group cannot be used without one.
        atomic_hosts = models.Host.objects.filter(
                id__in=[host.id for host in host_objects],
                labels__atomic_group=True)
        unusable_host_names = [host.hostname for host in atomic_hosts]
        if unusable_host_names:
            raise model_logic.ValidationError(
                    {'hosts':
                     'Host(s) "%s" are atomic group hosts but no '
                     'atomic group was specified for this job.' %
                     (', '.join(unusable_host_names),)})

    check_for_duplicate_hosts(host_objects)

    for label_name in dependencies:
        if provision.is_for_special_action(label_name):
            # TODO: We could save a few queries
            # if we had a bulk ensure-label-exists function, which used
            # a bulk .get() call. The win is probably very small.
            _ensure_label_exists(label_name)

    # This only checks targeted hosts, not hosts eligible due to the metahost
    check_job_dependencies(host_objects, dependencies)
    check_job_metahost_dependencies(metahost_objects, dependencies)

    options['dependencies'] = list(
            models.Label.objects.filter(name__in=dependencies))

    # Dependencies/metahosts tied to an atomic group must agree with the
    # atomic group the job was requested with.
    for label in metahost_objects + options['dependencies']:
        if label.atomic_group and not atomic_group:
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'Dependency %r requires an atomic group but no '
                     'atomic_group_name or meta_host in an atomic group was '
                     'specified for this job.' % label.name})
        elif (label.atomic_group and
              label.atomic_group.name != atomic_group.name):
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'meta_hosts or dependency %r requires atomic group '
                     '%r instead of the supplied atomic_group_name=%r.' %
                     (label.name, label.atomic_group.name, atomic_group.name)})

    job = models.Job.create(owner=owner, options=options,
                            hosts=all_host_objects)
    job.queue(all_host_objects, atomic_group=atomic_group,
              is_template=options.get('is_template', False))
    return job.id
showard0957a842009-05-11 19:25:08 +0000609
610
def _ensure_label_exists(name):
    """
    Ensure that a label called |name| exists in the Django models.

    This function is to be called from within afe rpcs only, as an
    alternative to server.cros.provision.ensure_label_exists(...). It works
    by Django model manipulation, rather than by making another create_label
    rpc call.

    @param name: the label to check for/create.
    @raises ValidationError: There was an error in the response that was
                             not because the label already existed.
    @returns True if a label was created, False otherwise.
    """
    try:
        models.Label.objects.get(name=name)
    except models.Label.DoesNotExist:
        new_label = models.Label.objects.create(name=name)
        # NOTE(review): objects.create() already saves the new row, so this
        # save() looks redundant but harmless — confirm before removing.
        new_label.save()
        return True
    return False
632
633
def find_platform_and_atomic_group(host):
    """
    Figure out the platform name and atomic group name for the given host
    object. If none, the return value for either will be None.

    @returns (platform name, atomic group name) for the given host.
    """
    platform_names = [label.name for label in host.label_list
                      if label.platform]
    if len(platform_names) > 1:
        raise ValueError('Host %s has more than one platform: %s' %
                         (host.hostname, ', '.join(platform_names)))
    platform = platform_names[0] if platform_names else None

    atomic_group_name = None
    for label in host.label_list:
        if label.atomic_group:
            atomic_group_name = label.atomic_group.name
            break
    # Don't check for multiple atomic groups on a host here. That is an
    # error but should not trip up the RPC interface. monitor_db_cleanup
    # deals with it. This just returns the first one found.
    return platform, atomic_group_name
showardc0ac3a72009-07-08 21:14:45 +0000659
660
661# support for get_host_queue_entries_and_special_tasks()
662
MK Ryu0c1a37d2015-04-30 12:00:55 -0700663def _common_entry_to_dict(entry, type, job_dict, exec_path, status, started_on):
showardc0ac3a72009-07-08 21:14:45 +0000664 return dict(type=type,
MK Ryu0c1a37d2015-04-30 12:00:55 -0700665 host=entry['host'],
showardc0ac3a72009-07-08 21:14:45 +0000666 job=job_dict,
MK Ryu0c1a37d2015-04-30 12:00:55 -0700667 execution_path=exec_path,
668 status=status,
669 started_on=started_on,
670 id=str(entry['id']) + type,
671 oid=entry['id'])
showardc0ac3a72009-07-08 21:14:45 +0000672
673
def _special_task_to_dict(task, queue_entries):
    """Transforms a special task dictionary to another form of dictionary.

    @param task Special task as a dictionary type
    @param queue_entries Host queue entries as a list of dictionaries.

    @return Transformed dictionary for a special task.
    """
    job_dict = None
    if task['queue_entry']:
        # Scan queue_entries to get the job detail info.
        for qentry in queue_entries:
            if task['queue_entry']['id'] == qentry['id']:
                job_dict = qentry['job']
                break
        # If not found, get it from DB.
        if job_dict is None:
            job = models.Job.objects.get(id=task['queue_entry']['job'])
            job_dict = job.get_object_dict()

    # Execution path and display status are derived server-side from the
    # task's host, id, type and timestamps.
    exec_path = server_utils.get_special_task_exec_path(
            task['host']['hostname'], task['id'], task['task'],
            time_utils.time_string_to_datetime(task['time_requested']))
    status = server_utils.get_special_task_status(
            task['is_complete'], task['success'], task['is_active'])
    return _common_entry_to_dict(task, task['task'], job_dict,
            exec_path, status, task['time_started'])
showardc0ac3a72009-07-08 21:14:45 +0000701
702
def _queue_entry_to_dict(queue_entry):
    """Transforms a host queue entry dictionary to its serializable form.

    @param queue_entry: Host queue entry as a dictionary.

    @return Transformed dictionary for the host queue entry.
    """
    job_dict = queue_entry['job']
    job_tag = server_utils.get_job_tag(job_dict['id'], job_dict['owner'])
    exec_path = server_utils.get_hqe_exec_path(
            job_tag, queue_entry['execution_subdir'])
    return _common_entry_to_dict(queue_entry, 'Job', job_dict, exec_path,
                                 queue_entry['status'],
                                 queue_entry['started_on'])
710
711
def prepare_host_queue_entries_and_special_tasks(interleaved_entries,
                                                 queue_entries):
    """
    Prepare for serialization the interleaved entries of host queue entries
    and special tasks.
    Each element in the entries is a dictionary type.
    The special task dictionary has only a job id for a job and lacks
    the detail of the job while the host queue entry dictionary has.
    queue_entries is used to look up the job detail info.

    @param interleaved_entries Host queue entries and special tasks as a list
                               of dictionaries.
    @param queue_entries Host queue entries as a list of dictionaries.

    @return A post-processed list of dictionaries that is to be serialized.
    """
    # Special tasks are distinguished from host queue entries by the
    # presence of the 'task' key.
    converted = [_special_task_to_dict(entry, queue_entries)
                 if 'task' in entry
                 else _queue_entry_to_dict(entry)
                 for entry in interleaved_entries]
    return prepare_for_serialization(converted)
showardc0ac3a72009-07-08 21:14:45 +0000738
739
def _compute_next_job_for_tasks(queue_entries, special_tasks):
    """
    For each task, try to figure out the next job that ran after that task.
    This is done using two pieces of information:
    * if the task has a queue entry, we can use that entry's job ID.
    * if the task has a time_started, we can try to compare that against the
    started_on field of queue_entries. this isn't guaranteed to work perfectly
    since queue_entries may also have null started_on values.
    * if the task has neither, or if use of time_started fails, just use the
    last computed job ID.

    Each task dictionary is annotated in place with a 'next_job_id' key.
    Both lists are expected to be ordered by descending ID (see
    interleave_entries, which documents and relies on that ordering).

    @param queue_entries Host queue entries as a list of dictionaries.
    @param special_tasks Special tasks as a list of dictionaries.
    """
    next_job_id = None # most recently computed next job
    hqe_index = 0 # index for scanning by started_on times
    for task in special_tasks:
        if task['queue_entry']:
            # The task is tied to a queue entry, so that entry's job id is
            # the exact answer.
            next_job_id = task['queue_entry']['job']
        elif task['time_started'] is not None:
            # Scan entries (newest first); keep updating next_job_id while
            # the entry started at or after the task, and stop at the first
            # entry that started before it.
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry['started_on'] is None:
                    continue
                t1 = time_utils.time_string_to_datetime(
                        queue_entry['started_on'])
                t2 = time_utils.time_string_to_datetime(task['time_started'])
                if t1 < t2:
                    break
                next_job_id = queue_entry['job']['id']

        task['next_job_id'] = next_job_id

        # advance hqe_index to just after next_job_id
        if next_job_id is not None:
            for queue_entry in queue_entries[hqe_index:]:
                if queue_entry['job']['id'] < next_job_id:
                    break
                hqe_index += 1
778
779
def interleave_entries(queue_entries, special_tasks):
    """Interleave host queue entries and special tasks into a single list.

    Both lists should be ordered by descending ID.

    @param queue_entries: Host queue entries as a list of dictionaries.
    @param special_tasks: Special tasks as a list of dictionaries.

    @return One list where each job is followed by the special tasks that
            ran between it and the previous job.
    """
    _compute_next_job_for_tasks(queue_entries, special_tasks)

    interleaved = []
    task_count = len(special_tasks)
    task_index = 0

    # Tasks with no next job ran after the most recent job; they go first.
    while (task_index < task_count and
           special_tasks[task_index]['next_job_id'] is None):
        interleaved.append(special_tasks[task_index])
        task_index += 1

    # Now interleave queue entries with the remaining special tasks: after
    # each job, emit every task whose next job is at least this job.
    for queue_entry in queue_entries:
        interleaved.append(queue_entry)
        job_id = queue_entry['job']['id']
        while (task_index < task_count and
               not special_tasks[task_index]['next_job_id'] < job_id):
            interleaved.append(special_tasks[task_index])
            task_index += 1

    return interleaved
jamesren4a41e012010-07-16 22:33:48 +0000805
806
Prashanth Balasubramanian6edaaf92014-11-24 16:36:25 -0800807def bucket_hosts_by_shard(host_objs, rpc_hostnames=False):
808 """Figure out which hosts are on which shards.
809
810 @param host_objs: A list of host objects.
811 @param rpc_hostnames: If True, the rpc_hostnames of a shard are returned
812 instead of the 'real' shard hostnames. This only matters for testing
813 environments.
814
815 @return: A map of shard hostname: list of hosts on the shard.
816 """
817 shard_host_map = {}
818 for host in host_objs:
819 if host.shard:
820 shard_name = (host.shard.rpc_hostname() if rpc_hostnames
821 else host.shard.hostname)
822 shard_host_map.setdefault(shard_name, []).append(host.hostname)
823 return shard_host_map
824
825
jamesren4a41e012010-07-16 22:33:48 +0000826def get_create_job_common_args(local_args):
827 """
828 Returns a dict containing only the args that apply for create_job_common
829
830 Returns a subset of local_args, which contains only the arguments that can
831 be passed in to create_job_common().
832 """
Alex Miller7d658cf2013-09-04 16:00:35 -0700833 # This code is only here to not kill suites scheduling tests when priority
834 # becomes an int instead of a string.
835 if isinstance(local_args['priority'], str):
836 local_args['priority'] = priorities.Priority.DEFAULT
837 # </migration hack>
jamesren4a41e012010-07-16 22:33:48 +0000838 arg_names, _, _, _ = inspect.getargspec(create_job_common)
839 return dict(item for item in local_args.iteritems() if item[0] in arg_names)
840
841
def create_job_common(name, priority, control_type, control_file=None,
                      hosts=(), meta_hosts=(), one_time_hosts=(),
                      atomic_group_name=None, synch_count=None,
                      is_template=False, timeout=None, timeout_mins=None,
                      max_runtime_mins=None, run_verify=True, email_list='',
                      dependencies=(), reboot_before=None, reboot_after=None,
                      parse_failed_repair=None, hostless=False, keyvals=None,
                      drone_set=None, parameterized_job=None,
                      parent_job_id=None, test_retry=0, run_reset=True,
                      require_ssp=None):
    #pylint: disable-msg=C0111
    """
    Common code between creating "standard" jobs and creating parameterized jobs

    @param name: Name of the new job.
    @param priority: Priority of the new job.
    @param control_type: Control type of the job; hostless jobs must use the
            server-side control type.
    @param control_file: Contents of the control file.
    @param hosts: Hostnames to run the job on.
    @param meta_hosts: Label names to schedule on; an entry may also name an
            atomic group instead of a label.
    @param one_time_hosts: Hostnames for which one-time host entries are
            created.
    @param atomic_group_name: Name of an atomic group to schedule on. If not
            given, it may be inferred from an atomic-group label found in
            meta_hosts.
    @param synch_count: Number of machines the job requires; validated
            against the atomic group's maximum when one is used.
    @param hostless: If True, the job runs with no hosts; mutually exclusive
            with hosts, meta_hosts, one_time_hosts and atomic_group_name.

    The remaining keyword arguments (is_template, timeout, timeout_mins,
    max_runtime_mins, run_verify, email_list, dependencies, reboot_before,
    reboot_after, parse_failed_repair, keyvals, drone_set,
    parameterized_job, parent_job_id, test_retry, run_reset, require_ssp)
    are stored directly in the new job's options.

    @returns the result of create_new_job().

    @raises model_logic.ValidationError if the host/label arguments are
            missing or inconsistent.
    @raises ValueError if any requested host lives on a shard (when running
            on the master).
    """
    # The job is owned by the currently authenticated user.
    user = models.User.current_user()
    owner = user.login

    # input validation
    if not (hosts or meta_hosts or one_time_hosts or atomic_group_name
            or hostless):
        raise model_logic.ValidationError({
            'arguments' : "You must pass at least one of 'hosts', "
                          "'meta_hosts', 'one_time_hosts', "
                          "'atomic_group_name', or 'hostless'"
            })

    if hostless:
        if hosts or meta_hosts or one_time_hosts or atomic_group_name:
            raise model_logic.ValidationError({
                    'hostless': 'Hostless jobs cannot include any hosts!'})
        server_type = control_data.CONTROL_TYPE_NAMES.SERVER
        if control_type != server_type:
            raise model_logic.ValidationError({
                    'control_type': 'Hostless jobs cannot use client-side '
                                    'control files'})

    atomic_groups_by_name = dict((ag.name, ag)
                                 for ag in models.AtomicGroup.objects.all())
    label_objects = list(models.Label.objects.filter(name__in=meta_hosts))

    # Schedule on an atomic group automagically if one of the labels given
    # is an atomic group label and no explicit atomic_group_name was supplied.
    if not atomic_group_name:
        for label in label_objects:
            if label and label.atomic_group:
                atomic_group_name = label.atomic_group.name
                break
    # convert hostnames & meta hosts to host/label objects
    host_objects = models.Host.smart_get_bulk(hosts)
    # Shard placement is only validated on the master; shards skip this.
    if not server_utils.is_shard():
        shard_host_map = bucket_hosts_by_shard(host_objects)
        num_shards = len(shard_host_map)
        if (num_shards > 1 or (num_shards == 1 and
                len(shard_host_map.values()[0]) != len(host_objects))):
            # We disallow the following jobs on master:
            #   num_shards > 1: this is a job spanning across multiple shards.
            #   num_shards == 1 but number of hosts on shard is less
            #   than total number of hosts: this is a job that spans across
            #   one shard and the master.
            raise ValueError(
                    'The following hosts are on shard(s), please create '
                    'seperate jobs for hosts on each shard: %s ' %
                    shard_host_map)
    metahost_objects = []
    meta_host_labels_by_name = {label.name: label for label in label_objects}
    for label_name in meta_hosts or []:
        if label_name in meta_host_labels_by_name:
            metahost_objects.append(meta_host_labels_by_name[label_name])
        elif label_name in atomic_groups_by_name:
            # If given a metahost name that isn't a Label, check to
            # see if the user was specifying an Atomic Group instead.
            atomic_group = atomic_groups_by_name[label_name]
            if atomic_group_name and atomic_group_name != atomic_group.name:
                raise model_logic.ValidationError({
                        'meta_hosts': (
                                'Label "%s" not found. If assumed to be an '
                                'atomic group it would conflict with the '
                                'supplied atomic group "%s".' % (
                                        label_name, atomic_group_name))})
            atomic_group_name = atomic_group.name
        else:
            raise model_logic.ValidationError(
                {'meta_hosts' : 'Label "%s" not found' % label_name})

    # Create and sanity check an AtomicGroup object if requested.
    if atomic_group_name:
        if one_time_hosts:
            raise model_logic.ValidationError(
                    {'one_time_hosts':
                     'One time hosts cannot be used with an Atomic Group.'})
        atomic_group = models.AtomicGroup.smart_get(atomic_group_name)
        if synch_count and synch_count > atomic_group.max_number_of_machines:
            raise model_logic.ValidationError(
                    {'atomic_group_name' :
                     'You have requested a synch_count (%d) greater than the '
                     'maximum machines in the requested Atomic Group (%d).' %
                     (synch_count, atomic_group.max_number_of_machines)})
    else:
        atomic_group = None

    # One-time hosts get their own Host rows and run alongside normal hosts.
    for host in one_time_hosts or []:
        this_host = models.Host.create_one_time_host(host)
        host_objects.append(this_host)

    options = dict(name=name,
                   priority=priority,
                   control_file=control_file,
                   control_type=control_type,
                   is_template=is_template,
                   timeout=timeout,
                   timeout_mins=timeout_mins,
                   max_runtime_mins=max_runtime_mins,
                   synch_count=synch_count,
                   run_verify=run_verify,
                   email_list=email_list,
                   dependencies=dependencies,
                   reboot_before=reboot_before,
                   reboot_after=reboot_after,
                   parse_failed_repair=parse_failed_repair,
                   keyvals=keyvals,
                   drone_set=drone_set,
                   parameterized_job=parameterized_job,
                   parent_job_id=parent_job_id,
                   test_retry=test_retry,
                   run_reset=run_reset,
                   require_ssp=require_ssp)
    return create_new_job(owner=owner,
                          options=options,
                          host_objects=host_objects,
                          metahost_objects=metahost_objects,
                          atomic_group=atomic_group)
Simran Basib6ec8ae2014-04-23 12:05:08 -0700973
974
def encode_ascii(control_file):
    """Force a control file to only contain ascii characters.

    @param control_file: Control file to encode.

    @returns the control file in an ascii encoding.

    @raises error.ControlFileMalformed: if encoding fails.
    """
    try:
        return control_file.encode('ascii')
    except UnicodeError as e:
        # UnicodeError is the common base of UnicodeDecodeError (a byte
        # string that cannot be decoded as ascii) and UnicodeEncodeError
        # (a unicode string containing non-ascii characters). The previous
        # code caught only UnicodeDecodeError, letting the encode-side
        # failure escape as an unhandled exception instead of
        # ControlFileMalformed.
        raise error.ControlFileMalformed(str(e))
988
989
def get_wmatrix_url():
    """Get wmatrix url from config file.

    @returns the wmatrix url or an empty string.
    """
    config = global_config.global_config
    return config.get_config_value('AUTOTEST_WEB', 'wmatrix_url', default='')
Jiaxi Luo57bc1952014-07-22 15:27:30 -0700998
999
def inject_times_to_filter(start_time_key=None, end_time_key=None,
                           start_time_value=None, end_time_value=None,
                           **filter_data):
    """Inject the key value pairs of start and end time if provided.

    @param start_time_key: A string represents the filter key of start_time.
    @param end_time_key: A string represents the filter key of end_time.
    @param start_time_value: Start_time value.
    @param end_time_value: End_time value.

    @returns the injected filter_data.
    """
    # Only truthy values are injected; a missing (None/empty) time leaves
    # the filter untouched, mirroring optional RPC arguments.
    for key, value in ((start_time_key, start_time_value),
                       (end_time_key, end_time_value)):
        if value:
            filter_data[key] = value
    return filter_data
1017
1018
def inject_times_to_hqe_special_tasks_filters(filter_data_common,
                                              start_time, end_time):
    """Inject start and end time to hqe and special tasks filters.

    @param filter_data_common: Common filter for hqe and special tasks.
    @param start_time: Start time to filter on; may be None.
    @param end_time: End time to filter on; may be None.

    @returns a pair of hqe and special tasks filters.
    """
    # HQEs and special tasks store their start times in differently named
    # columns, so each filter gets its own key names.
    hqe_filter = inject_times_to_filter(
            'started_on__gte', 'started_on__lte', start_time, end_time,
            **filter_data_common)
    special_tasks_filter = inject_times_to_filter(
            'time_started__gte', 'time_started__lte', start_time, end_time,
            **filter_data_common.copy())
    return hqe_filter, special_tasks_filter
1035
1036
def retrieve_shard(shard_hostname):
    """
    Retrieves the shard with the given hostname from the database.

    @param shard_hostname: Hostname of the shard to retrieve

    @raises models.Shard.DoesNotExist, if no shard with this hostname was found.

    @returns: Shard object
    """
    # Timed so shard heartbeat latency can be monitored in statsd.
    with autotest_stats.Timer('shard_heartbeat.retrieve_shard'):
        return models.Shard.smart_get(shard_hostname)
Jakob Juelich59cfe542014-09-02 16:37:46 -07001050
1051
Jakob Juelich1b525742014-09-30 13:08:07 -07001052def find_records_for_shard(shard, known_job_ids, known_host_ids):
Jakob Juelich59cfe542014-09-02 16:37:46 -07001053 """Find records that should be sent to a shard.
1054
Jakob Juelicha94efe62014-09-18 16:02:49 -07001055 @param shard: Shard to find records for.
Jakob Juelich1b525742014-09-30 13:08:07 -07001056 @param known_job_ids: List of ids of jobs the shard already has.
1057 @param known_host_ids: List of ids of hosts the shard already has.
Jakob Juelicha94efe62014-09-18 16:02:49 -07001058
Fang Dengf3705992014-12-16 17:32:18 -08001059 @returns: Tuple of three lists for hosts, jobs, and suite job keyvals:
1060 (hosts, jobs, suite_job_keyvals).
Jakob Juelich59cfe542014-09-02 16:37:46 -07001061 """
MK Ryu509516b2015-05-18 12:00:47 -07001062 timer = autotest_stats.Timer('shard_heartbeat')
1063 with timer.get_client('find_hosts'):
1064 hosts = models.Host.assign_to_shard(shard, known_host_ids)
1065 with timer.get_client('find_jobs'):
1066 jobs = models.Job.assign_to_shard(shard, known_job_ids)
1067 with timer.get_client('find_suite_job_keyvals'):
1068 parent_job_ids = [job.parent_job_id for job in jobs]
1069 suite_job_keyvals = models.JobKeyval.objects.filter(
1070 job_id__in=parent_job_ids)
Fang Dengf3705992014-12-16 17:32:18 -08001071 return hosts, jobs, suite_job_keyvals
Jakob Juelicha94efe62014-09-18 16:02:49 -07001072
1073
def _persist_records_with_type_sent_from_shard(
        shard, records, record_type, *args, **kwargs):
    """
    Handle records of a specified type that were sent to the shard master.

    @param shard: The shard the records were sent from.
    @param records: The records sent in their serialized format.
    @param record_type: Type of the objects represented by records.
    @param args: Additional arguments that will be passed on to the sanity
                 checks.
    @param kwargs: Additional arguments that will be passed on to the sanity
                   checks.

    @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.

    @returns: List of primary keys of the processed records.
    """
    persisted_pks = []
    for serialized_record in records:
        pk = serialized_record['id']
        try:
            current_record = record_type.objects.get(pk=pk)
        except record_type.DoesNotExist:
            raise error.UnallowedRecordsSentToMaster(
                'Object with pk %s of type %s does not exist on master.' % (
                    pk, record_type))

        # Validate the incoming update before applying it to the master's
        # copy of the record.
        current_record.sanity_check_update_from_shard(
                shard, serialized_record, *args, **kwargs)
        current_record.update_from_serialized(serialized_record)

        persisted_pks.append(pk)
    return persisted_pks
1107
1108
def persist_records_sent_from_shard(shard, jobs, hqes):
    """
    Sanity checking then saving serialized records sent to master from shard.

    During heartbeats shards upload jobs and host queue entries. This
    performs some sanity checks on these and then updates the existing
    records for those entries with the updated ones from the heartbeat.

    The sanity checks include:
    - Checking if the objects sent already exist on the master.
    - Checking if the objects sent were assigned to this shard.
    - Host queue entries must be sent together with their jobs.

    @param shard: The shard the records were sent from.
    @param jobs: The jobs the shard sent.
    @param hqes: The host queue entries the shard sent.

    @raises error.UnallowedRecordsSentToMaster if any of the sanity checks
            fail.
    """
    heartbeat_timer = autotest_stats.Timer('shard_heartbeat')
    with heartbeat_timer.get_client('persist_jobs'):
        job_ids_sent = _persist_records_with_type_sent_from_shard(
                shard, jobs, models.Job)

    # HQEs reference jobs, so they are persisted second and validated
    # against the job ids that were just accepted.
    with heartbeat_timer.get_client('persist_hqes'):
        _persist_records_with_type_sent_from_shard(
                shard, hqes, models.HostQueueEntry,
                job_ids_sent=job_ids_sent)
Jakob Juelich50e91f72014-10-01 12:43:23 -07001136
1137
Jakob Juelich50e91f72014-10-01 12:43:23 -07001138def forward_single_host_rpc_to_shard(func):
1139 """This decorator forwards rpc calls that modify a host to a shard.
1140
1141 If a host is assigned to a shard, rpcs that change his attributes should be
1142 forwarded to the shard.
1143
1144 This assumes the first argument of the function represents a host id.
1145
1146 @param func: The function to decorate
1147
1148 @returns: The function to replace func with.
1149 """
1150 def replacement(**kwargs):
1151 # Only keyword arguments can be accepted here, as we need the argument
1152 # names to send the rpc. serviceHandler always provides arguments with
1153 # their keywords, so this is not a problem.
1154 host = models.Host.smart_get(kwargs['id'])
Prashanth Balasubramanian8c98ac12014-12-23 11:26:44 -08001155 if host.shard and not server_utils.is_shard():
MK Ryu26f0c932015-05-28 18:14:33 -07001156 run_rpc_on_multiple_hostnames(func.func_name,
1157 [host.shard.rpc_hostname()],
Jakob Juelich50e91f72014-10-01 12:43:23 -07001158 **kwargs)
1159 return func(**kwargs)
1160
1161 return replacement
1162
1163
MK Ryu9c5fbbe2015-02-11 15:46:22 -08001164def fanout_rpc(host_objs, rpc_name, include_hostnames=True, **kwargs):
1165 """Fanout the given rpc to all shards.
1166
1167 @param host_objs: Host objects for the rpc.
1168 @param rpc_name: The name of the rpc.
1169 @param include_hostnames: If True, include the hostnames in the kwargs.
1170 Hostnames are not always necessary, this functions is designed to
1171 send rpcs to the shard a host is on, the rpcs themselves could be
1172 related to labels, acls etc.
1173 @param kwargs: The kwargs for the rpc.
1174 """
1175 # Fanout should only happen from the master to the shards.
1176 if server_utils.is_shard():
1177 return
1178
1179 # Figure out which hosts are on which shards.
1180 shard_host_map = bucket_hosts_by_shard(
1181 host_objs, rpc_hostnames=True)
1182
1183 # Execute the rpc against the appropriate shards.
1184 for shard, hostnames in shard_host_map.iteritems():
1185 if include_hostnames:
1186 kwargs['hosts'] = hostnames
MK Ryu84573e12015-02-18 15:54:09 -08001187 try:
1188 run_rpc_on_multiple_hostnames(rpc_name, [shard], **kwargs)
1189 except:
1190 ei = sys.exc_info()
1191 new_exc = error.RPCException('RPC %s failed on shard %s due to '
1192 '%s: %s' % (rpc_name, shard, ei[0].__name__, ei[1]))
1193 raise new_exc.__class__, new_exc, ei[2]
MK Ryu9c5fbbe2015-02-11 15:46:22 -08001194
1195
Prashanth Balasubramanian5949b4a2014-11-23 12:58:30 -08001196def forward_multi_host_rpc_to_shards(func):
1197 """This decorator forwards rpc calls that modify multiple hosts.
1198
1199 If a host is assigned to a shard, rpcs that change his attributes should be
1200 forwarded to the shard. Some calls however, take a list of hosts and a
1201 single id to modify, eg: label_add_hosts. This wrapper will sift through
1202 the list of hosts, find each of their shards, and forward the rpc for
1203 those hosts to that shard before calling the local version of the given rpc.
1204
1205 This assumes:
1206 1. The rpc call uses `smart_get` to retrieve host objects, not the
1207 stock django `get` call. This is true for most, if not all rpcs in
1208 the rpc_interface.
1209 2. The kwargs to the function contain either a list of host ids or
1210 hostnames, keyed under 'hosts'. This is true for all the rpc
1211 functions that use 'smart_get'.
1212
1213 @param func: The function to decorate
1214
1215 @returns: The function to replace func with.
1216 """
1217 def replacement(**kwargs):
MK Ryu9c5fbbe2015-02-11 15:46:22 -08001218 fanout_rpc(
1219 models.Host.smart_get_bulk(kwargs['hosts']),
1220 func.func_name, **kwargs)
Prashanth Balasubramanian5949b4a2014-11-23 12:58:30 -08001221 return func(**kwargs)
1222
1223 return replacement
1224
1225
Jakob Juelich50e91f72014-10-01 12:43:23 -07001226def run_rpc_on_multiple_hostnames(rpc_call, shard_hostnames, **kwargs):
1227 """Runs an rpc to multiple AFEs
1228
1229 This is i.e. used to propagate changes made to hosts after they are assigned
1230 to a shard.
1231
1232 @param rpc_call: Name of the rpc endpoint to call.
1233 @param shard_hostnames: List of hostnames to run the rpcs on.
1234 @param **kwargs: Keyword arguments to pass in the rpcs.
1235 """
1236 for shard_hostname in shard_hostnames:
1237 afe = frontend.AFE(server=shard_hostname)
1238 afe.run(rpc_call, **kwargs)
MK Ryu9c5fbbe2015-02-11 15:46:22 -08001239
1240
def get_label(name):
    """Gets a label object using a given name.

    @param name: Label name.
    @return: a label object matching the given name, or None if no such
            label exists.
    """
    try:
        return models.Label.smart_get(name)
    except models.Label.DoesNotExist:
        return None
1254
1255
def get_global_afe_hostname():
    """Read the hostname of the global AFE from the global configuration."""
    config = global_config.global_config
    return config.get_config_value('SHARD', 'global_afe_hostname')
1260
1261
def route_rpc_to_master(rpc_name, **kwargs):
    """Route RPC to master AFE.

    @param rpc_name: The name of the rpc.
    @param **kwargs: The kwargs for the rpc.

    @returns: the result of running the rpc on the master AFE.
    """
    afe = frontend.AFE(server=get_global_afe_hostname())
    return afe.run(rpc_name, **kwargs)