blob: ae331e631902de50fb760888a2f3f94511ae3bc9 [file] [log] [blame]
Prashanth B340fd1e2014-06-22 12:44:10 -07001import heapq
2import os
Prashanth B340fd1e2014-06-22 12:44:10 -07003import logging
Michael Liangda8c60a2014-06-03 13:24:51 -07004
5import common
Dan Shi5e2efb72017-02-07 11:40:23 -08006from autotest_lib.client.common_lib import error
7from autotest_lib.client.common_lib import global_config
8from autotest_lib.client.common_lib import utils
9from autotest_lib.scheduler import drones
10from autotest_lib.scheduler import drone_utility
MK Ryu7911ad52015-12-18 11:40:04 -080011from autotest_lib.scheduler import drone_task_queue
showard324bf812009-01-20 23:23:38 +000012from autotest_lib.scheduler import scheduler_config
Prashanth B340fd1e2014-06-22 12:44:10 -070013from autotest_lib.scheduler import thread_lib
showard170873e2009-01-07 00:22:26 +000014
Dan Shi5e2efb72017-02-07 11:40:23 -080015try:
16 from chromite.lib import metrics
17except ImportError:
18 metrics = utils.metrics_mock
19
showard170873e2009-01-07 00:22:26 +000020
showardc75fded2009-10-14 16:20:02 +000021# results on drones will be placed under the drone_installation_directory in a
22# directory with this name
23_DRONE_RESULTS_DIR_SUFFIX = 'results'
24
showarded2afea2009-07-07 20:54:07 +000025WORKING_DIRECTORY = object() # see execute_command()
26
showard8d3dbca2009-09-25 20:29:38 +000027
jamesrenc44ae992010-02-19 00:12:54 +000028AUTOSERV_PID_FILE = '.autoserv_execute'
29CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
30PARSER_PID_FILE = '.parser_execute'
31ARCHIVER_PID_FILE = '.archiver_execute'
32
33ALL_PIDFILE_NAMES = (AUTOSERV_PID_FILE, CRASHINFO_PID_FILE, PARSER_PID_FILE,
34 ARCHIVER_PID_FILE)
35
MK Ryu7911ad52015-12-18 11:40:04 -080036_THREADED_DRONE_MANAGER = global_config.global_config.get_config_value(
37 scheduler_config.CONFIG_SECTION, 'threaded_drone_manager',
38 type=bool, default=True)
39
jamesrenc44ae992010-02-19 00:12:54 +000040
showard170873e2009-01-07 00:22:26 +000041class DroneManagerError(Exception):
42 pass
43
44
45class CustomEquals(object):
46 def _id(self):
47 raise NotImplementedError
48
49
50 def __eq__(self, other):
51 if not isinstance(other, type(self)):
52 return NotImplemented
53 return self._id() == other._id()
54
55
56 def __ne__(self, other):
57 return not self == other
58
59
60 def __hash__(self):
61 return hash(self._id())
62
63
64class Process(CustomEquals):
65 def __init__(self, hostname, pid, ppid=None):
66 self.hostname = hostname
67 self.pid = pid
68 self.ppid = ppid
69
70 def _id(self):
71 return (self.hostname, self.pid)
72
73
74 def __str__(self):
75 return '%s/%s' % (self.hostname, self.pid)
76
77
78 def __repr__(self):
79 return super(Process, self).__repr__() + '<%s>' % self
80
81
82class PidfileId(CustomEquals):
83 def __init__(self, path):
84 self.path = path
85
86
87 def _id(self):
88 return self.path
89
90
91 def __str__(self):
92 return str(self.path)
93
94
showardd1195652009-12-08 22:21:02 +000095class _PidfileInfo(object):
96 age = 0
97 num_processes = None
98
99
showard170873e2009-01-07 00:22:26 +0000100class PidfileContents(object):
101 process = None
102 exit_status = None
103 num_tests_failed = None
104
105 def is_invalid(self):
106 return False
107
108
showardd1195652009-12-08 22:21:02 +0000109 def is_running(self):
110 return self.process and not self.exit_status
111
112
showard170873e2009-01-07 00:22:26 +0000113class InvalidPidfile(object):
Simran Basi899f9fe2013-02-27 11:58:49 -0800114 process = None
115 exit_status = None
116 num_tests_failed = None
Simran Basi4d7bca22013-02-27 10:57:04 -0800117
118
Simran Basi899f9fe2013-02-27 11:58:49 -0800119 def __init__(self, error):
showard170873e2009-01-07 00:22:26 +0000120 self.error = error
121
122
123 def is_invalid(self):
124 return True
125
126
showardd1195652009-12-08 22:21:02 +0000127 def is_running(self):
128 return False
129
130
showard170873e2009-01-07 00:22:26 +0000131 def __str__(self):
132 return self.error
133
134
showard418785b2009-11-23 20:19:59 +0000135class _DroneHeapWrapper(object):
136 """Wrapper to compare drones based on used_capacity().
137
138 These objects can be used to keep a heap of drones by capacity.
139 """
140 def __init__(self, drone):
141 self.drone = drone
142
143
144 def __cmp__(self, other):
145 assert isinstance(other, _DroneHeapWrapper)
146 return cmp(self.drone.used_capacity(), other.drone.used_capacity())
147
148
Simran Basicced3092012-08-02 15:09:23 -0700149class BaseDroneManager(object):
showard170873e2009-01-07 00:22:26 +0000150 """
151 This class acts as an interface from the scheduler to drones, whether it be
152 only a single "drone" for localhost or multiple remote drones.
153
154 All paths going into and out of this class are relative to the full results
155 directory, except for those returns by absolute_path().
156 """
Fang Deng9a0c6c32013-09-04 15:34:55 -0700157
158
159 # Minimum time to wait before next email
160 # about a drone hitting process limit is sent.
161 NOTIFY_INTERVAL = 60 * 60 * 24 # one day
Prashanth B340fd1e2014-06-22 12:44:10 -0700162 _STATS_KEY = 'drone_manager'
Fang Deng9a0c6c32013-09-04 15:34:55 -0700163
Aviv Keshet99e6adb2016-07-14 16:35:32 -0700164 _ACTIVE_PROCESS_GAUGE = metrics.Gauge(
165 'chromeos/autotest/drone/active_processes')
166
Fang Deng9a0c6c32013-09-04 15:34:55 -0700167
showard170873e2009-01-07 00:22:26 +0000168 def __init__(self):
showardd1195652009-12-08 22:21:02 +0000169 # absolute path of base results dir
showard170873e2009-01-07 00:22:26 +0000170 self._results_dir = None
showardd1195652009-12-08 22:21:02 +0000171 # holds Process objects
showard170873e2009-01-07 00:22:26 +0000172 self._process_set = set()
Alex Millere76e2252013-08-15 09:24:27 -0700173 # holds the list of all processes running on all drones
174 self._all_processes = {}
showardd1195652009-12-08 22:21:02 +0000175 # maps PidfileId to PidfileContents
showard170873e2009-01-07 00:22:26 +0000176 self._pidfiles = {}
showardd1195652009-12-08 22:21:02 +0000177 # same as _pidfiles
showard170873e2009-01-07 00:22:26 +0000178 self._pidfiles_second_read = {}
showardd1195652009-12-08 22:21:02 +0000179 # maps PidfileId to _PidfileInfo
180 self._registered_pidfile_info = {}
181 # used to generate unique temporary paths
showard170873e2009-01-07 00:22:26 +0000182 self._temporary_path_counter = 0
showardd1195652009-12-08 22:21:02 +0000183 # maps hostname to Drone object
showard170873e2009-01-07 00:22:26 +0000184 self._drones = {}
185 self._results_drone = None
showardd1195652009-12-08 22:21:02 +0000186 # maps results dir to dict mapping file path to contents
showard170873e2009-01-07 00:22:26 +0000187 self._attached_files = {}
showard418785b2009-11-23 20:19:59 +0000188 # heapq of _DroneHeapWrappers
showard170873e2009-01-07 00:22:26 +0000189 self._drone_queue = []
Prashanth B340fd1e2014-06-22 12:44:10 -0700190 # A threaded task queue used to refresh drones asynchronously.
MK Ryu7911ad52015-12-18 11:40:04 -0800191 if _THREADED_DRONE_MANAGER:
192 self._refresh_task_queue = thread_lib.ThreadedTaskQueue(
193 name='%s.refresh_queue' % self._STATS_KEY)
194 else:
195 self._refresh_task_queue = drone_task_queue.DroneTaskQueue()
showard170873e2009-01-07 00:22:26 +0000196
197
198 def initialize(self, base_results_dir, drone_hostnames,
199 results_repository_hostname):
200 self._results_dir = base_results_dir
showard170873e2009-01-07 00:22:26 +0000201
202 for hostname in drone_hostnames:
Eric Li861b2d52011-02-04 14:50:35 -0800203 self._add_drone(hostname)
showard170873e2009-01-07 00:22:26 +0000204
205 if not self._drones:
206 # all drones failed to initialize
207 raise DroneManagerError('No valid drones found')
208
showard324bf812009-01-20 23:23:38 +0000209 self.refresh_drone_configs()
showardc5afc462009-01-13 00:09:39 +0000210
showard4460ee82009-07-07 20:54:29 +0000211 logging.info('Using results repository on %s',
showardb18134f2009-03-20 20:52:18 +0000212 results_repository_hostname)
showard170873e2009-01-07 00:22:26 +0000213 self._results_drone = drones.get_drone(results_repository_hostname)
showardac5b0002009-10-19 18:34:00 +0000214 results_installation_dir = global_config.global_config.get_config_value(
215 scheduler_config.CONFIG_SECTION,
216 'results_host_installation_directory', default=None)
217 if results_installation_dir:
218 self._results_drone.set_autotest_install_dir(
219 results_installation_dir)
showard170873e2009-01-07 00:22:26 +0000220 # don't initialize() the results drone - we don't want to clear out any
showardd1195652009-12-08 22:21:02 +0000221 # directories and we don't need to kill any processes
showard170873e2009-01-07 00:22:26 +0000222
223
224 def reinitialize_drones(self):
Prathmesh Prabhu36238622016-11-22 18:41:06 -0800225 for drone in self.get_drones():
226 with metrics.SecondsTimer(
227 'chromeos/autotest/drone_manager/'
228 'reinitialize_drones_duration',
229 fields={'drone': drone.hostname}):
230 drone.call('initialize', self._results_dir)
showard170873e2009-01-07 00:22:26 +0000231
232
233 def shutdown(self):
showard324bf812009-01-20 23:23:38 +0000234 for drone in self.get_drones():
showard170873e2009-01-07 00:22:26 +0000235 drone.shutdown()
236
237
showard8d3dbca2009-09-25 20:29:38 +0000238 def _get_max_pidfile_refreshes(self):
239 """
240 Normally refresh() is called on every monitor_db.Dispatcher.tick().
241
242 @returns: The number of refresh() calls before we forget a pidfile.
243 """
244 pidfile_timeout = global_config.global_config.get_config_value(
245 scheduler_config.CONFIG_SECTION, 'max_pidfile_refreshes',
246 type=int, default=2000)
247 return pidfile_timeout
248
249
showard170873e2009-01-07 00:22:26 +0000250 def _add_drone(self, hostname):
Dan Shiec1d47d2015-02-13 11:38:13 -0800251 logging.info('Adding drone %s', hostname)
showard170873e2009-01-07 00:22:26 +0000252 drone = drones.get_drone(hostname)
Eric Li861b2d52011-02-04 14:50:35 -0800253 if drone:
254 self._drones[drone.hostname] = drone
255 drone.call('initialize', self.absolute_path(''))
showard170873e2009-01-07 00:22:26 +0000256
257
258 def _remove_drone(self, hostname):
259 self._drones.pop(hostname, None)
260
261
showard324bf812009-01-20 23:23:38 +0000262 def refresh_drone_configs(self):
showardc5afc462009-01-13 00:09:39 +0000263 """
showard324bf812009-01-20 23:23:38 +0000264 Reread global config options for all drones.
showardc5afc462009-01-13 00:09:39 +0000265 """
Dan Shib9144a42014-12-01 16:09:32 -0800266 # Import server_manager_utils is delayed rather than at the beginning of
267 # this module. The reason is that test_that imports drone_manager when
268 # importing autoserv_utils. The import is done before test_that setup
269 # django (test_that only setup django in setup_local_afe, since it's
270 # not needed when test_that runs the test in a lab duts through :lab:
271 # option. Therefore, if server_manager_utils is imported at the
272 # beginning of this module, test_that will fail since django is not
273 # setup yet.
274 from autotest_lib.site_utils import server_manager_utils
showard324bf812009-01-20 23:23:38 +0000275 config = global_config.global_config
276 section = scheduler_config.CONFIG_SECTION
277 config.parse_config_file()
showardc5afc462009-01-13 00:09:39 +0000278 for hostname, drone in self._drones.iteritems():
Dan Shib9144a42014-12-01 16:09:32 -0800279 if server_manager_utils.use_server_db():
280 server = server_manager_utils.get_servers(hostname=hostname)[0]
281 attributes = dict([(a.attribute, a.value)
282 for a in server.attributes.all()])
283 drone.enabled = (
284 int(attributes.get('disabled', 0)) == 0)
285 drone.max_processes = int(
286 attributes.get(
287 'max_processes',
288 scheduler_config.config.max_processes_per_drone))
289 allowed_users = attributes.get('users', None)
290 else:
291 disabled = config.get_config_value(
292 section, '%s_disabled' % hostname, default='')
293 drone.enabled = not bool(disabled)
Dan Shib9144a42014-12-01 16:09:32 -0800294 drone.max_processes = config.get_config_value(
295 section, '%s_max_processes' % hostname, type=int,
296 default=scheduler_config.config.max_processes_per_drone)
showard9bb960b2009-11-19 01:02:11 +0000297
Dan Shib9144a42014-12-01 16:09:32 -0800298 allowed_users = config.get_config_value(
299 section, '%s_users' % hostname, default=None)
300 if allowed_users:
301 drone.allowed_users = set(allowed_users.split())
302 else:
303 drone.allowed_users = None
304 logging.info('Drone %s.max_processes: %s', hostname,
305 drone.max_processes)
306 logging.info('Drone %s.enabled: %s', hostname, drone.enabled)
307 logging.info('Drone %s.allowed_users: %s', hostname,
308 drone.allowed_users)
Dan Shi37bee222015-04-13 15:46:47 -0700309 logging.info('Drone %s.support_ssp: %s', hostname,
310 drone.support_ssp)
showardc5afc462009-01-13 00:09:39 +0000311
showard418785b2009-11-23 20:19:59 +0000312 self._reorder_drone_queue() # max_processes may have changed
Fang Deng9a0c6c32013-09-04 15:34:55 -0700313 # Clear notification record about reaching max_processes limit.
314 self._notify_record = {}
showard418785b2009-11-23 20:19:59 +0000315
showardc5afc462009-01-13 00:09:39 +0000316
showard324bf812009-01-20 23:23:38 +0000317 def get_drones(self):
318 return self._drones.itervalues()
showardc5afc462009-01-13 00:09:39 +0000319
320
Dan Shic458f662015-04-29 12:12:38 -0700321 def cleanup_orphaned_containers(self):
322 """Queue cleanup_orphaned_containers call at each drone.
323 """
Dan Shi55d58992015-05-05 09:10:02 -0700324 for drone in self._drones.values():
325 logging.info('Queue cleanup_orphaned_containers at %s',
326 drone.hostname)
Dan Shic458f662015-04-29 12:12:38 -0700327 drone.queue_call('cleanup_orphaned_containers')
Dan Shic458f662015-04-29 12:12:38 -0700328
329
showard170873e2009-01-07 00:22:26 +0000330 def _get_drone_for_process(self, process):
showard170873e2009-01-07 00:22:26 +0000331 return self._drones[process.hostname]
332
333
334 def _get_drone_for_pidfile_id(self, pidfile_id):
335 pidfile_contents = self.get_pidfile_contents(pidfile_id)
336 assert pidfile_contents.process is not None
337 return self._get_drone_for_process(pidfile_contents.process)
338
339
340 def _drop_old_pidfiles(self):
showardd3496242009-12-10 21:41:43 +0000341 # use items() since the dict is modified in unregister_pidfile()
342 for pidfile_id, info in self._registered_pidfile_info.items():
showardd1195652009-12-08 22:21:02 +0000343 if info.age > self._get_max_pidfile_refreshes():
showardf85a0b72009-10-07 20:48:45 +0000344 logging.warning('dropping leaked pidfile %s', pidfile_id)
345 self.unregister_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000346 else:
showardd1195652009-12-08 22:21:02 +0000347 info.age += 1
showard170873e2009-01-07 00:22:26 +0000348
349
350 def _reset(self):
showard170873e2009-01-07 00:22:26 +0000351 self._process_set = set()
Alex Millere76e2252013-08-15 09:24:27 -0700352 self._all_processes = {}
showard170873e2009-01-07 00:22:26 +0000353 self._pidfiles = {}
354 self._pidfiles_second_read = {}
355 self._drone_queue = []
356
357
showard170873e2009-01-07 00:22:26 +0000358 def _parse_pidfile(self, drone, raw_contents):
Prashanth B340fd1e2014-06-22 12:44:10 -0700359 """Parse raw pidfile contents.
360
361 @param drone: The drone on which this pidfile was found.
362 @param raw_contents: The raw contents of a pidfile, eg:
363 "pid\nexit_staus\nnum_tests_failed\n".
364 """
showard170873e2009-01-07 00:22:26 +0000365 contents = PidfileContents()
366 if not raw_contents:
367 return contents
368 lines = raw_contents.splitlines()
369 if len(lines) > 3:
370 return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
371 (len(lines), lines))
372 try:
373 pid = int(lines[0])
374 contents.process = Process(drone.hostname, pid)
375 # if len(lines) == 2, assume we caught Autoserv between writing
376 # exit_status and num_failed_tests, so just ignore it and wait for
377 # the next cycle
378 if len(lines) == 3:
379 contents.exit_status = int(lines[1])
380 contents.num_tests_failed = int(lines[2])
381 except ValueError, exc:
382 return InvalidPidfile('Corrupt pid file: ' + str(exc.args))
383
384 return contents
385
386
387 def _process_pidfiles(self, drone, pidfiles, store_in_dict):
388 for pidfile_path, contents in pidfiles.iteritems():
389 pidfile_id = PidfileId(pidfile_path)
390 contents = self._parse_pidfile(drone, contents)
391 store_in_dict[pidfile_id] = contents
392
393
showard0205a3e2009-01-16 03:03:50 +0000394 def _add_process(self, drone, process_info):
395 process = Process(drone.hostname, int(process_info['pid']),
396 int(process_info['ppid']))
397 self._process_set.add(process)
showard0205a3e2009-01-16 03:03:50 +0000398
399
400 def _add_autoserv_process(self, drone, process_info):
401 assert process_info['comm'] == 'autoserv'
402 # only root autoserv processes have pgid == pid
403 if process_info['pgid'] != process_info['pid']:
404 return
showardd1195652009-12-08 22:21:02 +0000405 self._add_process(drone, process_info)
showard0205a3e2009-01-16 03:03:50 +0000406
407
showard324bf812009-01-20 23:23:38 +0000408 def _enqueue_drone(self, drone):
showard418785b2009-11-23 20:19:59 +0000409 heapq.heappush(self._drone_queue, _DroneHeapWrapper(drone))
410
411
412 def _reorder_drone_queue(self):
413 heapq.heapify(self._drone_queue)
showard324bf812009-01-20 23:23:38 +0000414
415
showardd1195652009-12-08 22:21:02 +0000416 def _compute_active_processes(self, drone):
417 drone.active_processes = 0
418 for pidfile_id, contents in self._pidfiles.iteritems():
419 is_running = contents.exit_status is None
420 on_this_drone = (contents.process
421 and contents.process.hostname == drone.hostname)
422 if is_running and on_this_drone:
423 info = self._registered_pidfile_info[pidfile_id]
424 if info.num_processes is not None:
425 drone.active_processes += info.num_processes
Aviv Keshet99e6adb2016-07-14 16:35:32 -0700426 self._ACTIVE_PROCESS_GAUGE.set(
427 drone.active_processes,
428 fields={'drone_hostname': drone.hostname})
showardd1195652009-12-08 22:21:02 +0000429
430
Fang Deng9a0c6c32013-09-04 15:34:55 -0700431 def _check_drone_process_limit(self, drone):
432 """
433 Notify if the number of processes on |drone| is approaching limit.
434
435 @param drone: A Drone object.
436 """
Alex Millerda713d92013-12-06 10:02:43 -0800437 try:
438 percent = float(drone.active_processes) / drone.max_processes
439 except ZeroDivisionError:
440 percent = 100
Prathmesh Prabhu21e09712016-12-20 11:50:26 -0800441 metrics.Float('chromeos/autotest/drone/active_process_percentage'
Aviv Keshetc29b4c72016-12-14 22:27:35 -0800442 ).set(percent, fields={'drone_hostname': drone.hostname})
Fang Deng9a0c6c32013-09-04 15:34:55 -0700443
Prashanth B340fd1e2014-06-22 12:44:10 -0700444 def trigger_refresh(self):
445 """Triggers a drone manager refresh.
446
447 @raises DroneManagerError: If a drone has un-executed calls.
448 Since they will get clobbered when we queue refresh calls.
showard170873e2009-01-07 00:22:26 +0000449 """
450 self._reset()
showardbf9695d2009-07-06 20:22:24 +0000451 self._drop_old_pidfiles()
showardd1195652009-12-08 22:21:02 +0000452 pidfile_paths = [pidfile_id.path
453 for pidfile_id in self._registered_pidfile_info]
Prashanth B340fd1e2014-06-22 12:44:10 -0700454 drones = list(self.get_drones())
455 for drone in drones:
456 calls = drone.get_calls()
457 if calls:
458 raise DroneManagerError('Drone %s has un-executed calls: %s '
459 'which might get corrupted through '
460 'this invocation' %
461 (drone, [str(call) for call in calls]))
462 drone.queue_call('refresh', pidfile_paths)
463 logging.info("Invoking drone refresh.")
Aviv Keshet14cac442016-11-20 21:44:11 -0800464 with metrics.SecondsTimer(
465 'chromeos/autotest/drone_manager/trigger_refresh_duration'):
Prashanth B340fd1e2014-06-22 12:44:10 -0700466 self._refresh_task_queue.execute(drones, wait=False)
showard170873e2009-01-07 00:22:26 +0000467
Prashanth B340fd1e2014-06-22 12:44:10 -0700468
469 def sync_refresh(self):
470 """Complete the drone refresh started by trigger_refresh.
471
472 Waits for all drone threads then refreshes internal datastructures
473 with drone process information.
474 """
475
476 # This gives us a dictionary like what follows:
477 # {drone: [{'pidfiles': (raw contents of pidfile paths),
478 # 'autoserv_processes': (autoserv process info from ps),
479 # 'all_processes': (all process info from ps),
480 # 'parse_processes': (parse process infor from ps),
481 # 'pidfile_second_read': (pidfile contents, again),}]
482 # drone2: ...}
483 # The values of each drone are only a list because this adheres to the
484 # drone utility interface (each call is executed and its results are
485 # places in a list, but since we never couple the refresh calls with
486 # any other call, this list will always contain a single dict).
Aviv Keshet14cac442016-11-20 21:44:11 -0800487 with metrics.SecondsTimer(
488 'chromeos/autotest/drone_manager/sync_refresh_duration'):
Prashanth B340fd1e2014-06-22 12:44:10 -0700489 all_results = self._refresh_task_queue.get_results()
490 logging.info("Drones refreshed.")
491
492 # The loop below goes through and parses pidfile contents. Pidfiles
493 # are used to track autoserv execution, and will always contain < 3
494 # lines of the following: pid, exit code, number of tests. Each pidfile
495 # is identified by a PidfileId object, which contains a unique pidfile
496 # path (unique because it contains the job id) making it hashable.
497 # All pidfiles are stored in the drone managers _pidfiles dict as:
498 # {pidfile_id: pidfile_contents(Process(drone, pid),
499 # exit_code, num_tests_failed)}
500 # In handle agents, each agent knows its pidfile_id, and uses this
501 # to retrieve the refreshed contents of its pidfile via the
502 # PidfileRunMonitor (through its tick) before making decisions. If
503 # the agent notices that its process has exited, it unregisters the
504 # pidfile from the drone_managers._registered_pidfile_info dict
505 # through its epilog.
showard170873e2009-01-07 00:22:26 +0000506 for drone, results_list in all_results.iteritems():
507 results = results_list[0]
Alex Miller82d7a9f2014-05-16 14:43:32 -0700508 drone_hostname = drone.hostname.replace('.', '_')
showard0205a3e2009-01-16 03:03:50 +0000509
Aviv Keshet14cac442016-11-20 21:44:11 -0800510 for process_info in results['all_processes']:
511 if process_info['comm'] == 'autoserv':
512 self._add_autoserv_process(drone, process_info)
513 drone_pid = drone.hostname, int(process_info['pid'])
514 self._all_processes[drone_pid] = process_info
showard170873e2009-01-07 00:22:26 +0000515
Aviv Keshet14cac442016-11-20 21:44:11 -0800516 for process_info in results['parse_processes']:
517 self._add_process(drone, process_info)
Alex Miller6cbd7582014-05-14 19:06:52 -0700518
Aviv Keshet14cac442016-11-20 21:44:11 -0800519 self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)
520 self._process_pidfiles(drone, results['pidfiles_second_read'],
521 self._pidfiles_second_read)
showard170873e2009-01-07 00:22:26 +0000522
showardd1195652009-12-08 22:21:02 +0000523 self._compute_active_processes(drone)
524 if drone.enabled:
525 self._enqueue_drone(drone)
Fang Deng9a0c6c32013-09-04 15:34:55 -0700526 self._check_drone_process_limit(drone)
showard170873e2009-01-07 00:22:26 +0000527
528
Prashanth B340fd1e2014-06-22 12:44:10 -0700529 def refresh(self):
530 """Refresh all drones."""
Aviv Keshet14cac442016-11-20 21:44:11 -0800531 with metrics.SecondsTimer(
532 'chromeos/autotest/drone_manager/refresh_duration'):
Prashanth B340fd1e2014-06-22 12:44:10 -0700533 self.trigger_refresh()
534 self.sync_refresh()
535
536
Prathmesh Prabhu36238622016-11-22 18:41:06 -0800537 @metrics.SecondsTimerDecorator(
538 'chromeos/autotest/drone_manager/execute_actions_duration')
showard170873e2009-01-07 00:22:26 +0000539 def execute_actions(self):
540 """
541 Called at the end of a scheduler cycle to execute all queued actions
542 on drones.
543 """
Prashanth B340fd1e2014-06-22 12:44:10 -0700544 # Invoke calls queued on all drones since the last call to execute
545 # and wait for them to return.
MK Ryu7911ad52015-12-18 11:40:04 -0800546 if _THREADED_DRONE_MANAGER:
547 thread_lib.ThreadedTaskQueue(
548 name='%s.execute_queue' % self._STATS_KEY).execute(
549 self._drones.values())
550 else:
551 drone_task_queue.DroneTaskQueue().execute(self._drones.values())
showard170873e2009-01-07 00:22:26 +0000552
553 try:
mbligh1ef218d2009-08-03 16:57:56 +0000554 self._results_drone.execute_queued_calls()
showard170873e2009-01-07 00:22:26 +0000555 except error.AutoservError:
Aviv Keshetc29b4c72016-12-14 22:27:35 -0800556 m = 'chromeos/autotest/errors/results_repository_failed'
557 metrics.Counter(m).increment(
558 fields={'drone_hostname': self._results_drone.hostname})
showard170873e2009-01-07 00:22:26 +0000559 self._results_drone.clear_call_queue()
560
561
562 def get_orphaned_autoserv_processes(self):
563 """
showardd3dc1992009-04-22 21:01:40 +0000564 Returns a set of Process objects for orphaned processes only.
showard170873e2009-01-07 00:22:26 +0000565 """
showardd3dc1992009-04-22 21:01:40 +0000566 return set(process for process in self._process_set
567 if process.ppid == 1)
showard170873e2009-01-07 00:22:26 +0000568
569
showard170873e2009-01-07 00:22:26 +0000570 def kill_process(self, process):
571 """
572 Kill the given process.
573 """
showardd3dc1992009-04-22 21:01:40 +0000574 logging.info('killing %s', process)
showard170873e2009-01-07 00:22:26 +0000575 drone = self._get_drone_for_process(process)
576 drone.queue_call('kill_process', process)
577
578
579 def _ensure_directory_exists(self, path):
580 if not os.path.exists(path):
581 os.makedirs(path)
582
583
showard324bf812009-01-20 23:23:38 +0000584 def total_running_processes(self):
585 return sum(drone.active_processes for drone in self.get_drones())
586
587
jamesren76fcf192010-04-21 20:39:50 +0000588 def max_runnable_processes(self, username, drone_hostnames_allowed):
showard324bf812009-01-20 23:23:38 +0000589 """
590 Return the maximum number of processes that can be run (in a single
591 execution) given the current load on drones.
showard9bb960b2009-11-19 01:02:11 +0000592 @param username: login of user to run a process. may be None.
jamesren76fcf192010-04-21 20:39:50 +0000593 @param drone_hostnames_allowed: list of drones that can be used. May be
594 None
showard324bf812009-01-20 23:23:38 +0000595 """
showard1b7142d2010-01-15 00:21:37 +0000596 usable_drone_wrappers = [wrapper for wrapper in self._drone_queue
jamesren76fcf192010-04-21 20:39:50 +0000597 if wrapper.drone.usable_by(username) and
598 (drone_hostnames_allowed is None or
599 wrapper.drone.hostname in
600 drone_hostnames_allowed)]
showard1b7142d2010-01-15 00:21:37 +0000601 if not usable_drone_wrappers:
602 # all drones disabled or inaccessible
showardde700d32009-02-25 00:12:42 +0000603 return 0
jamesren37b50452010-03-25 20:38:56 +0000604 runnable_processes = [
605 wrapper.drone.max_processes - wrapper.drone.active_processes
606 for wrapper in usable_drone_wrappers]
607 return max([0] + runnable_processes)
showard324bf812009-01-20 23:23:38 +0000608
609
showarde39ebe92009-06-18 23:14:48 +0000610 def _least_loaded_drone(self, drones):
611 drone_to_use = drones[0]
612 for drone in drones[1:]:
613 if drone.used_capacity() < drone_to_use.used_capacity():
614 drone_to_use = drone
615 return drone_to_use
616
617
jamesren76fcf192010-04-21 20:39:50 +0000618 def _choose_drone_for_execution(self, num_processes, username,
Dan Shiec1d47d2015-02-13 11:38:13 -0800619 drone_hostnames_allowed,
620 require_ssp=False):
621 """Choose a drone to execute command.
622
623 @param num_processes: Number of processes needed for execution.
624 @param username: Name of the user to execute the command.
625 @param drone_hostnames_allowed: A list of names of drone allowed.
626 @param require_ssp: Require server-side packaging to execute the,
627 command, default to False.
628
629 @return: A drone object to be used for execution.
630 """
showard324bf812009-01-20 23:23:38 +0000631 # cycle through drones is order of increasing used capacity until
632 # we find one that can handle these processes
633 checked_drones = []
jamesren37b50452010-03-25 20:38:56 +0000634 usable_drones = []
Dan Shiec1d47d2015-02-13 11:38:13 -0800635 # Drones do not support server-side packaging, used as backup if no
636 # drone is found to run command requires server-side packaging.
637 no_ssp_drones = []
showard324bf812009-01-20 23:23:38 +0000638 drone_to_use = None
639 while self._drone_queue:
showard418785b2009-11-23 20:19:59 +0000640 drone = heapq.heappop(self._drone_queue).drone
showard324bf812009-01-20 23:23:38 +0000641 checked_drones.append(drone)
Eric Lie0493a42010-11-15 13:05:43 -0800642 logging.info('Checking drone %s', drone.hostname)
showard9bb960b2009-11-19 01:02:11 +0000643 if not drone.usable_by(username):
644 continue
jamesren76fcf192010-04-21 20:39:50 +0000645
646 drone_allowed = (drone_hostnames_allowed is None
647 or drone.hostname in drone_hostnames_allowed)
648 if not drone_allowed:
Eric Lie0493a42010-11-15 13:05:43 -0800649 logging.debug('Drone %s not allowed: ', drone.hostname)
jamesren76fcf192010-04-21 20:39:50 +0000650 continue
Dan Shiec1d47d2015-02-13 11:38:13 -0800651 if require_ssp and not drone.support_ssp:
652 logging.debug('Drone %s does not support server-side '
653 'packaging.', drone.hostname)
654 no_ssp_drones.append(drone)
655 continue
jamesren76fcf192010-04-21 20:39:50 +0000656
jamesren37b50452010-03-25 20:38:56 +0000657 usable_drones.append(drone)
jamesren76fcf192010-04-21 20:39:50 +0000658
showard324bf812009-01-20 23:23:38 +0000659 if drone.active_processes + num_processes <= drone.max_processes:
660 drone_to_use = drone
661 break
Eric Lie0493a42010-11-15 13:05:43 -0800662 logging.info('Drone %s has %d active + %s requested > %s max',
663 drone.hostname, drone.active_processes, num_processes,
664 drone.max_processes)
showard324bf812009-01-20 23:23:38 +0000665
jamesren76fcf192010-04-21 20:39:50 +0000666 if not drone_to_use and usable_drones:
Dan Shiec1d47d2015-02-13 11:38:13 -0800667 # Drones are all over loaded, pick the one with least load.
showard324bf812009-01-20 23:23:38 +0000668 drone_summary = ','.join('%s %s/%s' % (drone.hostname,
669 drone.active_processes,
670 drone.max_processes)
jamesren37b50452010-03-25 20:38:56 +0000671 for drone in usable_drones)
672 logging.error('No drone has capacity to handle %d processes (%s) '
673 'for user %s', num_processes, drone_summary, username)
674 drone_to_use = self._least_loaded_drone(usable_drones)
Dan Shiec1d47d2015-02-13 11:38:13 -0800675 elif not drone_to_use and require_ssp and no_ssp_drones:
676 # No drone supports server-side packaging, choose the least loaded.
677 drone_to_use = self._least_loaded_drone(no_ssp_drones)
showarde39ebe92009-06-18 23:14:48 +0000678
showard324bf812009-01-20 23:23:38 +0000679 # refill _drone_queue
680 for drone in checked_drones:
681 self._enqueue_drone(drone)
682
showard170873e2009-01-07 00:22:26 +0000683 return drone_to_use
684
685
showarded2afea2009-07-07 20:54:07 +0000686 def _substitute_working_directory_into_command(self, command,
687 working_directory):
688 for i, item in enumerate(command):
689 if item is WORKING_DIRECTORY:
690 command[i] = working_directory
691
692
showardd3dc1992009-04-22 21:01:40 +0000693 def execute_command(self, command, working_directory, pidfile_name,
showard418785b2009-11-23 20:19:59 +0000694 num_processes, log_file=None, paired_with_pidfile=None,
jamesren76fcf192010-04-21 20:39:50 +0000695 username=None, drone_hostnames_allowed=None):
showard170873e2009-01-07 00:22:26 +0000696 """
697 Execute the given command, taken as an argv list.
698
showarded2afea2009-07-07 20:54:07 +0000699 @param command: command to execute as a list. if any item is
700 WORKING_DIRECTORY, the absolute path to the working directory
701 will be substituted for it.
702 @param working_directory: directory in which the pidfile will be written
703 @param pidfile_name: name of the pidfile this process will write
showardd1195652009-12-08 22:21:02 +0000704 @param num_processes: number of processes to account for from this
705 execution
showarded2afea2009-07-07 20:54:07 +0000706 @param log_file (optional): path (in the results repository) to hold
707 command output.
708 @param paired_with_pidfile (optional): a PidfileId for an
709 already-executed process; the new process will execute on the
710 same drone as the previous process.
showard9bb960b2009-11-19 01:02:11 +0000711 @param username (optional): login of the user responsible for this
712 process.
jamesren76fcf192010-04-21 20:39:50 +0000713 @param drone_hostnames_allowed (optional): hostnames of the drones that
714 this command is allowed to
715 execute on
showard170873e2009-01-07 00:22:26 +0000716 """
showarddb502762009-09-09 15:31:20 +0000717 abs_working_directory = self.absolute_path(working_directory)
showard170873e2009-01-07 00:22:26 +0000718 if not log_file:
719 log_file = self.get_temporary_path('execute')
720 log_file = self.absolute_path(log_file)
showard170873e2009-01-07 00:22:26 +0000721
showarded2afea2009-07-07 20:54:07 +0000722 self._substitute_working_directory_into_command(command,
showarddb502762009-09-09 15:31:20 +0000723 abs_working_directory)
showarded2afea2009-07-07 20:54:07 +0000724
showard170873e2009-01-07 00:22:26 +0000725 if paired_with_pidfile:
726 drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
727 else:
Dan Shiec1d47d2015-02-13 11:38:13 -0800728 require_ssp = '--require-ssp' in command
729 drone = self._choose_drone_for_execution(
730 num_processes, username, drone_hostnames_allowed,
731 require_ssp=require_ssp)
732 # Enable --warn-no-ssp option for autoserv to log a warning and run
733 # the command without using server-side packaging.
734 if require_ssp and not drone.support_ssp:
735 command.append('--warn-no-ssp')
jamesren76fcf192010-04-21 20:39:50 +0000736
737 if not drone:
738 raise DroneManagerError('command failed; no drones available: %s'
739 % command)
740
Dan Shiec1d47d2015-02-13 11:38:13 -0800741 logging.info("command = %s", command)
742 logging.info('log file = %s:%s', drone.hostname, log_file)
showarddb502762009-09-09 15:31:20 +0000743 self._write_attached_files(working_directory, drone)
744 drone.queue_call('execute_command', command, abs_working_directory,
showard170873e2009-01-07 00:22:26 +0000745 log_file, pidfile_name)
showard418785b2009-11-23 20:19:59 +0000746 drone.active_processes += num_processes
747 self._reorder_drone_queue()
showard170873e2009-01-07 00:22:26 +0000748
showard42d44982009-10-12 20:34:03 +0000749 pidfile_path = os.path.join(abs_working_directory, pidfile_name)
showard170873e2009-01-07 00:22:26 +0000750 pidfile_id = PidfileId(pidfile_path)
751 self.register_pidfile(pidfile_id)
showardd1195652009-12-08 22:21:02 +0000752 self._registered_pidfile_info[pidfile_id].num_processes = num_processes
showard170873e2009-01-07 00:22:26 +0000753 return pidfile_id
754
755
showardd3dc1992009-04-22 21:01:40 +0000756 def get_pidfile_id_from(self, execution_tag, pidfile_name):
757 path = os.path.join(self.absolute_path(execution_tag), pidfile_name)
showard170873e2009-01-07 00:22:26 +0000758 return PidfileId(path)
759
760
761 def register_pidfile(self, pidfile_id):
762 """
763 Indicate that the DroneManager should look for the given pidfile when
764 refreshing.
765 """
showardd1195652009-12-08 22:21:02 +0000766 if pidfile_id not in self._registered_pidfile_info:
showard37399782009-08-20 23:32:20 +0000767 logging.info('monitoring pidfile %s', pidfile_id)
showardd1195652009-12-08 22:21:02 +0000768 self._registered_pidfile_info[pidfile_id] = _PidfileInfo()
showardc6fb6042010-01-25 21:48:20 +0000769 self._reset_pidfile_age(pidfile_id)
770
771
772 def _reset_pidfile_age(self, pidfile_id):
773 if pidfile_id in self._registered_pidfile_info:
774 self._registered_pidfile_info[pidfile_id].age = 0
showard170873e2009-01-07 00:22:26 +0000775
776
showardf85a0b72009-10-07 20:48:45 +0000777 def unregister_pidfile(self, pidfile_id):
showardd1195652009-12-08 22:21:02 +0000778 if pidfile_id in self._registered_pidfile_info:
showardf85a0b72009-10-07 20:48:45 +0000779 logging.info('forgetting pidfile %s', pidfile_id)
showardd1195652009-12-08 22:21:02 +0000780 del self._registered_pidfile_info[pidfile_id]
781
782
783 def declare_process_count(self, pidfile_id, num_processes):
784 self._registered_pidfile_info[pidfile_id].num_processes = num_processes
showardf85a0b72009-10-07 20:48:45 +0000785
786
showard170873e2009-01-07 00:22:26 +0000787 def get_pidfile_contents(self, pidfile_id, use_second_read=False):
788 """
789 Retrieve a PidfileContents object for the given pidfile_id. If
790 use_second_read is True, use results that were read after the processes
791 were checked, instead of before.
792 """
showardc6fb6042010-01-25 21:48:20 +0000793 self._reset_pidfile_age(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000794 if use_second_read:
795 pidfile_map = self._pidfiles_second_read
796 else:
797 pidfile_map = self._pidfiles
798 return pidfile_map.get(pidfile_id, PidfileContents())
799
800
801 def is_process_running(self, process):
802 """
803 Check if the given process is in the running process list.
804 """
Alex Millere76e2252013-08-15 09:24:27 -0700805 if process in self._process_set:
806 return True
807
Alex Miller06a5f752013-08-15 11:16:40 -0700808 drone_pid = process.hostname, process.pid
Alex Millere76e2252013-08-15 09:24:27 -0700809 if drone_pid in self._all_processes:
810 logging.error('Process %s found, but not an autoserv process. '
811 'Is %s', process, self._all_processes[drone_pid])
812 return True
813
814 return False
showard170873e2009-01-07 00:22:26 +0000815
816
817 def get_temporary_path(self, base_name):
818 """
819 Get a new temporary path guaranteed to be unique across all drones
820 for this scheduler execution.
821 """
822 self._temporary_path_counter += 1
823 return os.path.join(drone_utility._TEMPORARY_DIRECTORY,
824 '%s.%s' % (base_name, self._temporary_path_counter))
825
826
showard42d44982009-10-12 20:34:03 +0000827 def absolute_path(self, path, on_results_repository=False):
828 if on_results_repository:
829 base_dir = self._results_dir
830 else:
showardc75fded2009-10-14 16:20:02 +0000831 base_dir = os.path.join(drones.AUTOTEST_INSTALL_DIR,
832 _DRONE_RESULTS_DIR_SUFFIX)
showard42d44982009-10-12 20:34:03 +0000833 return os.path.join(base_dir, path)
showard170873e2009-01-07 00:22:26 +0000834
835
showard678df4f2009-02-04 21:36:39 +0000836 def _copy_results_helper(self, process, source_path, destination_path,
837 to_results_repository=False):
Simran Basi882f15b2013-10-29 14:59:34 -0700838 logging.debug('_copy_results_helper. process: %s, source_path: %s, '
839 'destination_path: %s, to_results_repository: %s',
840 process, source_path, destination_path,
841 to_results_repository)
showard678df4f2009-02-04 21:36:39 +0000842 full_source = self.absolute_path(source_path)
showard42d44982009-10-12 20:34:03 +0000843 full_destination = self.absolute_path(
844 destination_path, on_results_repository=to_results_repository)
showard678df4f2009-02-04 21:36:39 +0000845 source_drone = self._get_drone_for_process(process)
846 if to_results_repository:
847 source_drone.send_file_to(self._results_drone, full_source,
848 full_destination, can_fail=True)
849 else:
850 source_drone.queue_call('copy_file_or_directory', full_source,
851 full_destination)
852
853
showard170873e2009-01-07 00:22:26 +0000854 def copy_to_results_repository(self, process, source_path,
855 destination_path=None):
856 """
857 Copy results from the given process at source_path to destination_path
858 in the results repository.
859 """
860 if destination_path is None:
861 destination_path = source_path
showard678df4f2009-02-04 21:36:39 +0000862 self._copy_results_helper(process, source_path, destination_path,
863 to_results_repository=True)
864
865
866 def copy_results_on_drone(self, process, source_path, destination_path):
867 """
868 Copy a results directory from one place to another on the drone.
869 """
870 self._copy_results_helper(process, source_path, destination_path)
showard170873e2009-01-07 00:22:26 +0000871
872
showarddb502762009-09-09 15:31:20 +0000873 def _write_attached_files(self, results_dir, drone):
874 attached_files = self._attached_files.pop(results_dir, {})
showard73ec0442009-02-07 02:05:20 +0000875 for file_path, contents in attached_files.iteritems():
showard170873e2009-01-07 00:22:26 +0000876 drone.queue_call('write_to_file', self.absolute_path(file_path),
877 contents)
878
879
showarddb502762009-09-09 15:31:20 +0000880 def attach_file_to_execution(self, results_dir, file_contents,
showard170873e2009-01-07 00:22:26 +0000881 file_path=None):
882 """
showarddb502762009-09-09 15:31:20 +0000883 When the process for the results directory is executed, the given file
884 contents will be placed in a file on the drone. Returns the path at
885 which the file will be placed.
showard170873e2009-01-07 00:22:26 +0000886 """
887 if not file_path:
888 file_path = self.get_temporary_path('attach')
showarddb502762009-09-09 15:31:20 +0000889 files_for_execution = self._attached_files.setdefault(results_dir, {})
showard73ec0442009-02-07 02:05:20 +0000890 assert file_path not in files_for_execution
891 files_for_execution[file_path] = file_contents
showard170873e2009-01-07 00:22:26 +0000892 return file_path
893
894
showard35162b02009-03-03 02:17:30 +0000895 def write_lines_to_file(self, file_path, lines, paired_with_process=None):
showard170873e2009-01-07 00:22:26 +0000896 """
897 Write the given lines (as a list of strings) to a file. If
showard35162b02009-03-03 02:17:30 +0000898 paired_with_process is given, the file will be written on the drone
899 running the given Process. Otherwise, the file will be written to the
showard170873e2009-01-07 00:22:26 +0000900 results repository.
901 """
showard170873e2009-01-07 00:22:26 +0000902 file_contents = '\n'.join(lines) + '\n'
showard35162b02009-03-03 02:17:30 +0000903 if paired_with_process:
904 drone = self._get_drone_for_process(paired_with_process)
showard42d44982009-10-12 20:34:03 +0000905 on_results_repository = False
showard170873e2009-01-07 00:22:26 +0000906 else:
907 drone = self._results_drone
showard42d44982009-10-12 20:34:03 +0000908 on_results_repository = True
909 full_path = self.absolute_path(
910 file_path, on_results_repository=on_results_repository)
showard170873e2009-01-07 00:22:26 +0000911 drone.queue_call('write_to_file', full_path, file_contents)
jamesrenc44ae992010-02-19 00:12:54 +0000912
913
Simran Basicced3092012-08-02 15:09:23 -0700914SiteDroneManager = utils.import_site_class(
915 __file__, 'autotest_lib.scheduler.site_drone_manager',
916 'SiteDroneManager', BaseDroneManager)
917
918
919class DroneManager(SiteDroneManager):
920 pass
921
922
jamesrenc44ae992010-02-19 00:12:54 +0000923_the_instance = None
924
925def instance():
926 if _the_instance is None:
927 _set_instance(DroneManager())
928 return _the_instance
929
930
931def _set_instance(instance): # usable for testing
932 global _the_instance
933 _the_instance = instance