blob: 23fcff2569308a1fd8d92e66350684aa6e22e0d8 [file] [log] [blame]
Prashanth B340fd1e2014-06-22 12:44:10 -07001import collections
2import heapq
3import os
4import Queue
5import time
6import threading
7import traceback
8import logging
Michael Liangda8c60a2014-06-03 13:24:51 -07009
10import common
Simran Basicced3092012-08-02 15:09:23 -070011from autotest_lib.client.common_lib import error, global_config, utils
Michael Liangda8c60a2014-06-03 13:24:51 -070012from autotest_lib.client.common_lib.cros.graphite import stats
showard170873e2009-01-07 00:22:26 +000013from autotest_lib.scheduler import email_manager, drone_utility, drones
showard324bf812009-01-20 23:23:38 +000014from autotest_lib.scheduler import scheduler_config
Prashanth B340fd1e2014-06-22 12:44:10 -070015from autotest_lib.scheduler import thread_lib
showard170873e2009-01-07 00:22:26 +000016
showard170873e2009-01-07 00:22:26 +000017
showardc75fded2009-10-14 16:20:02 +000018# results on drones will be placed under the drone_installation_directory in a
19# directory with this name
20_DRONE_RESULTS_DIR_SUFFIX = 'results'
21
showarded2afea2009-07-07 20:54:07 +000022WORKING_DIRECTORY = object() # see execute_command()
23
showard8d3dbca2009-09-25 20:29:38 +000024
jamesrenc44ae992010-02-19 00:12:54 +000025AUTOSERV_PID_FILE = '.autoserv_execute'
26CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
27PARSER_PID_FILE = '.parser_execute'
28ARCHIVER_PID_FILE = '.archiver_execute'
29
30ALL_PIDFILE_NAMES = (AUTOSERV_PID_FILE, CRASHINFO_PID_FILE, PARSER_PID_FILE,
31 ARCHIVER_PID_FILE)
32
33
showard170873e2009-01-07 00:22:26 +000034class DroneManagerError(Exception):
35 pass
36
37
38class CustomEquals(object):
39 def _id(self):
40 raise NotImplementedError
41
42
43 def __eq__(self, other):
44 if not isinstance(other, type(self)):
45 return NotImplemented
46 return self._id() == other._id()
47
48
49 def __ne__(self, other):
50 return not self == other
51
52
53 def __hash__(self):
54 return hash(self._id())
55
56
57class Process(CustomEquals):
58 def __init__(self, hostname, pid, ppid=None):
59 self.hostname = hostname
60 self.pid = pid
61 self.ppid = ppid
62
63 def _id(self):
64 return (self.hostname, self.pid)
65
66
67 def __str__(self):
68 return '%s/%s' % (self.hostname, self.pid)
69
70
71 def __repr__(self):
72 return super(Process, self).__repr__() + '<%s>' % self
73
74
75class PidfileId(CustomEquals):
76 def __init__(self, path):
77 self.path = path
78
79
80 def _id(self):
81 return self.path
82
83
84 def __str__(self):
85 return str(self.path)
86
87
showardd1195652009-12-08 22:21:02 +000088class _PidfileInfo(object):
89 age = 0
90 num_processes = None
91
92
showard170873e2009-01-07 00:22:26 +000093class PidfileContents(object):
94 process = None
95 exit_status = None
96 num_tests_failed = None
97
98 def is_invalid(self):
99 return False
100
101
showardd1195652009-12-08 22:21:02 +0000102 def is_running(self):
103 return self.process and not self.exit_status
104
105
showard170873e2009-01-07 00:22:26 +0000106class InvalidPidfile(object):
Simran Basi899f9fe2013-02-27 11:58:49 -0800107 process = None
108 exit_status = None
109 num_tests_failed = None
Simran Basi4d7bca22013-02-27 10:57:04 -0800110
111
Simran Basi899f9fe2013-02-27 11:58:49 -0800112 def __init__(self, error):
showard170873e2009-01-07 00:22:26 +0000113 self.error = error
114
115
116 def is_invalid(self):
117 return True
118
119
showardd1195652009-12-08 22:21:02 +0000120 def is_running(self):
121 return False
122
123
showard170873e2009-01-07 00:22:26 +0000124 def __str__(self):
125 return self.error
126
127
showard418785b2009-11-23 20:19:59 +0000128class _DroneHeapWrapper(object):
129 """Wrapper to compare drones based on used_capacity().
130
131 These objects can be used to keep a heap of drones by capacity.
132 """
133 def __init__(self, drone):
134 self.drone = drone
135
136
137 def __cmp__(self, other):
138 assert isinstance(other, _DroneHeapWrapper)
139 return cmp(self.drone.used_capacity(), other.drone.used_capacity())
140
141
Simran Basicced3092012-08-02 15:09:23 -0700142class BaseDroneManager(object):
showard170873e2009-01-07 00:22:26 +0000143 """
144 This class acts as an interface from the scheduler to drones, whether it be
145 only a single "drone" for localhost or multiple remote drones.
146
147 All paths going into and out of this class are relative to the full results
148 directory, except for those returns by absolute_path().
149 """
Fang Deng9a0c6c32013-09-04 15:34:55 -0700150
151
152 # Minimum time to wait before next email
153 # about a drone hitting process limit is sent.
154 NOTIFY_INTERVAL = 60 * 60 * 24 # one day
Prashanth B340fd1e2014-06-22 12:44:10 -0700155 _STATS_KEY = 'drone_manager'
156 _timer = stats.Timer(_STATS_KEY)
Fang Deng9a0c6c32013-09-04 15:34:55 -0700157
158
showard170873e2009-01-07 00:22:26 +0000159 def __init__(self):
showardd1195652009-12-08 22:21:02 +0000160 # absolute path of base results dir
showard170873e2009-01-07 00:22:26 +0000161 self._results_dir = None
showardd1195652009-12-08 22:21:02 +0000162 # holds Process objects
showard170873e2009-01-07 00:22:26 +0000163 self._process_set = set()
Alex Millere76e2252013-08-15 09:24:27 -0700164 # holds the list of all processes running on all drones
165 self._all_processes = {}
showardd1195652009-12-08 22:21:02 +0000166 # maps PidfileId to PidfileContents
showard170873e2009-01-07 00:22:26 +0000167 self._pidfiles = {}
showardd1195652009-12-08 22:21:02 +0000168 # same as _pidfiles
showard170873e2009-01-07 00:22:26 +0000169 self._pidfiles_second_read = {}
showardd1195652009-12-08 22:21:02 +0000170 # maps PidfileId to _PidfileInfo
171 self._registered_pidfile_info = {}
172 # used to generate unique temporary paths
showard170873e2009-01-07 00:22:26 +0000173 self._temporary_path_counter = 0
showardd1195652009-12-08 22:21:02 +0000174 # maps hostname to Drone object
showard170873e2009-01-07 00:22:26 +0000175 self._drones = {}
176 self._results_drone = None
showardd1195652009-12-08 22:21:02 +0000177 # maps results dir to dict mapping file path to contents
showard170873e2009-01-07 00:22:26 +0000178 self._attached_files = {}
showard418785b2009-11-23 20:19:59 +0000179 # heapq of _DroneHeapWrappers
showard170873e2009-01-07 00:22:26 +0000180 self._drone_queue = []
Fang Deng9a0c6c32013-09-04 15:34:55 -0700181 # map drone hostname to time stamp of email that
182 # has been sent about the drone hitting process limit.
183 self._notify_record = {}
Prashanth B340fd1e2014-06-22 12:44:10 -0700184 # A threaded task queue used to refresh drones asynchronously.
185 self._refresh_task_queue = thread_lib.ThreadedTaskQueue(
186 name='%s.refresh_queue' % self._STATS_KEY)
showard170873e2009-01-07 00:22:26 +0000187
188
189 def initialize(self, base_results_dir, drone_hostnames,
190 results_repository_hostname):
191 self._results_dir = base_results_dir
showard170873e2009-01-07 00:22:26 +0000192
193 for hostname in drone_hostnames:
Eric Li861b2d52011-02-04 14:50:35 -0800194 self._add_drone(hostname)
showard170873e2009-01-07 00:22:26 +0000195
196 if not self._drones:
197 # all drones failed to initialize
198 raise DroneManagerError('No valid drones found')
199
showard324bf812009-01-20 23:23:38 +0000200 self.refresh_drone_configs()
showardc5afc462009-01-13 00:09:39 +0000201
showard4460ee82009-07-07 20:54:29 +0000202 logging.info('Using results repository on %s',
showardb18134f2009-03-20 20:52:18 +0000203 results_repository_hostname)
showard170873e2009-01-07 00:22:26 +0000204 self._results_drone = drones.get_drone(results_repository_hostname)
showardac5b0002009-10-19 18:34:00 +0000205 results_installation_dir = global_config.global_config.get_config_value(
206 scheduler_config.CONFIG_SECTION,
207 'results_host_installation_directory', default=None)
208 if results_installation_dir:
209 self._results_drone.set_autotest_install_dir(
210 results_installation_dir)
showard170873e2009-01-07 00:22:26 +0000211 # don't initialize() the results drone - we don't want to clear out any
showardd1195652009-12-08 22:21:02 +0000212 # directories and we don't need to kill any processes
showard170873e2009-01-07 00:22:26 +0000213
214
215 def reinitialize_drones(self):
216 self._call_all_drones('initialize', self._results_dir)
217
218
219 def shutdown(self):
showard324bf812009-01-20 23:23:38 +0000220 for drone in self.get_drones():
showard170873e2009-01-07 00:22:26 +0000221 drone.shutdown()
222
223
showard8d3dbca2009-09-25 20:29:38 +0000224 def _get_max_pidfile_refreshes(self):
225 """
226 Normally refresh() is called on every monitor_db.Dispatcher.tick().
227
228 @returns: The number of refresh() calls before we forget a pidfile.
229 """
230 pidfile_timeout = global_config.global_config.get_config_value(
231 scheduler_config.CONFIG_SECTION, 'max_pidfile_refreshes',
232 type=int, default=2000)
233 return pidfile_timeout
234
235
showard170873e2009-01-07 00:22:26 +0000236 def _add_drone(self, hostname):
showardb18134f2009-03-20 20:52:18 +0000237 logging.info('Adding drone %s' % hostname)
showard170873e2009-01-07 00:22:26 +0000238 drone = drones.get_drone(hostname)
Eric Li861b2d52011-02-04 14:50:35 -0800239 if drone:
240 self._drones[drone.hostname] = drone
241 drone.call('initialize', self.absolute_path(''))
showard170873e2009-01-07 00:22:26 +0000242
243
244 def _remove_drone(self, hostname):
245 self._drones.pop(hostname, None)
246
247
showard324bf812009-01-20 23:23:38 +0000248 def refresh_drone_configs(self):
showardc5afc462009-01-13 00:09:39 +0000249 """
showard324bf812009-01-20 23:23:38 +0000250 Reread global config options for all drones.
showardc5afc462009-01-13 00:09:39 +0000251 """
showard324bf812009-01-20 23:23:38 +0000252 config = global_config.global_config
253 section = scheduler_config.CONFIG_SECTION
254 config.parse_config_file()
showardc5afc462009-01-13 00:09:39 +0000255 for hostname, drone in self._drones.iteritems():
showard324bf812009-01-20 23:23:38 +0000256 disabled = config.get_config_value(
showard9bb960b2009-11-19 01:02:11 +0000257 section, '%s_disabled' % hostname, default='')
showardc5afc462009-01-13 00:09:39 +0000258 drone.enabled = not bool(disabled)
259
showard324bf812009-01-20 23:23:38 +0000260 drone.max_processes = config.get_config_value(
showard9bb960b2009-11-19 01:02:11 +0000261 section, '%s_max_processes' % hostname, type=int,
262 default=scheduler_config.config.max_processes_per_drone)
263
264 allowed_users = config.get_config_value(
265 section, '%s_users' % hostname, default=None)
266 if allowed_users is not None:
showard1b7142d2010-01-15 00:21:37 +0000267 allowed_users = set(allowed_users.split())
268 drone.allowed_users = allowed_users
showardc5afc462009-01-13 00:09:39 +0000269
showard418785b2009-11-23 20:19:59 +0000270 self._reorder_drone_queue() # max_processes may have changed
Fang Deng9a0c6c32013-09-04 15:34:55 -0700271 # Clear notification record about reaching max_processes limit.
272 self._notify_record = {}
showard418785b2009-11-23 20:19:59 +0000273
showardc5afc462009-01-13 00:09:39 +0000274
showard324bf812009-01-20 23:23:38 +0000275 def get_drones(self):
276 return self._drones.itervalues()
showardc5afc462009-01-13 00:09:39 +0000277
278
showard170873e2009-01-07 00:22:26 +0000279 def _get_drone_for_process(self, process):
showard170873e2009-01-07 00:22:26 +0000280 return self._drones[process.hostname]
281
282
283 def _get_drone_for_pidfile_id(self, pidfile_id):
284 pidfile_contents = self.get_pidfile_contents(pidfile_id)
285 assert pidfile_contents.process is not None
286 return self._get_drone_for_process(pidfile_contents.process)
287
288
289 def _drop_old_pidfiles(self):
showardd3496242009-12-10 21:41:43 +0000290 # use items() since the dict is modified in unregister_pidfile()
291 for pidfile_id, info in self._registered_pidfile_info.items():
showardd1195652009-12-08 22:21:02 +0000292 if info.age > self._get_max_pidfile_refreshes():
showardf85a0b72009-10-07 20:48:45 +0000293 logging.warning('dropping leaked pidfile %s', pidfile_id)
294 self.unregister_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000295 else:
showardd1195652009-12-08 22:21:02 +0000296 info.age += 1
showard170873e2009-01-07 00:22:26 +0000297
298
299 def _reset(self):
showard170873e2009-01-07 00:22:26 +0000300 self._process_set = set()
Alex Millere76e2252013-08-15 09:24:27 -0700301 self._all_processes = {}
showard170873e2009-01-07 00:22:26 +0000302 self._pidfiles = {}
303 self._pidfiles_second_read = {}
304 self._drone_queue = []
305
306
307 def _call_all_drones(self, method, *args, **kwargs):
308 all_results = {}
showard324bf812009-01-20 23:23:38 +0000309 for drone in self.get_drones():
Prashanth Bf78a6fb2014-06-10 16:09:40 -0700310 with self._timer.get_client(
311 '%s.%s' % (drone.hostname.replace('.', '_'), method)):
312 all_results[drone] = drone.call(method, *args, **kwargs)
showard170873e2009-01-07 00:22:26 +0000313 return all_results
314
315
316 def _parse_pidfile(self, drone, raw_contents):
Prashanth B340fd1e2014-06-22 12:44:10 -0700317 """Parse raw pidfile contents.
318
319 @param drone: The drone on which this pidfile was found.
320 @param raw_contents: The raw contents of a pidfile, eg:
321 "pid\nexit_staus\nnum_tests_failed\n".
322 """
showard170873e2009-01-07 00:22:26 +0000323 contents = PidfileContents()
324 if not raw_contents:
325 return contents
326 lines = raw_contents.splitlines()
327 if len(lines) > 3:
328 return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
329 (len(lines), lines))
330 try:
331 pid = int(lines[0])
332 contents.process = Process(drone.hostname, pid)
333 # if len(lines) == 2, assume we caught Autoserv between writing
334 # exit_status and num_failed_tests, so just ignore it and wait for
335 # the next cycle
336 if len(lines) == 3:
337 contents.exit_status = int(lines[1])
338 contents.num_tests_failed = int(lines[2])
339 except ValueError, exc:
340 return InvalidPidfile('Corrupt pid file: ' + str(exc.args))
341
342 return contents
343
344
345 def _process_pidfiles(self, drone, pidfiles, store_in_dict):
346 for pidfile_path, contents in pidfiles.iteritems():
347 pidfile_id = PidfileId(pidfile_path)
348 contents = self._parse_pidfile(drone, contents)
349 store_in_dict[pidfile_id] = contents
350
351
showard0205a3e2009-01-16 03:03:50 +0000352 def _add_process(self, drone, process_info):
353 process = Process(drone.hostname, int(process_info['pid']),
354 int(process_info['ppid']))
355 self._process_set.add(process)
showard0205a3e2009-01-16 03:03:50 +0000356
357
358 def _add_autoserv_process(self, drone, process_info):
359 assert process_info['comm'] == 'autoserv'
360 # only root autoserv processes have pgid == pid
361 if process_info['pgid'] != process_info['pid']:
362 return
showardd1195652009-12-08 22:21:02 +0000363 self._add_process(drone, process_info)
showard0205a3e2009-01-16 03:03:50 +0000364
365
showard324bf812009-01-20 23:23:38 +0000366 def _enqueue_drone(self, drone):
showard418785b2009-11-23 20:19:59 +0000367 heapq.heappush(self._drone_queue, _DroneHeapWrapper(drone))
368
369
370 def _reorder_drone_queue(self):
371 heapq.heapify(self._drone_queue)
showard324bf812009-01-20 23:23:38 +0000372
373
showardd1195652009-12-08 22:21:02 +0000374 def _compute_active_processes(self, drone):
375 drone.active_processes = 0
376 for pidfile_id, contents in self._pidfiles.iteritems():
377 is_running = contents.exit_status is None
378 on_this_drone = (contents.process
379 and contents.process.hostname == drone.hostname)
380 if is_running and on_this_drone:
381 info = self._registered_pidfile_info[pidfile_id]
382 if info.num_processes is not None:
383 drone.active_processes += info.num_processes
384
385
Fang Deng9a0c6c32013-09-04 15:34:55 -0700386 def _check_drone_process_limit(self, drone):
387 """
388 Notify if the number of processes on |drone| is approaching limit.
389
390 @param drone: A Drone object.
391 """
Alex Millerda713d92013-12-06 10:02:43 -0800392 try:
393 percent = float(drone.active_processes) / drone.max_processes
394 except ZeroDivisionError:
395 percent = 100
Fang Deng9a0c6c32013-09-04 15:34:55 -0700396 max_percent = scheduler_config.config.max_processes_warning_threshold
397 if percent >= max_percent:
398 message = ('Drone %s is hitting %s of process limit.' %
399 (drone.hostname, format(percent, '.2%')))
400 logging.warning(message)
401 last_notified = self._notify_record.get(drone.hostname, 0)
402 now = time.time()
403 if last_notified + BaseDroneManager.NOTIFY_INTERVAL < now:
404 body = ('Active processes/Process limit: %d/%d (%s)' %
405 (drone.active_processes, drone.max_processes,
406 format(percent, '.2%')))
407 email_manager.manager.enqueue_notify_email(message, body)
408 self._notify_record[drone.hostname] = now
409
410
Prashanth B340fd1e2014-06-22 12:44:10 -0700411 def trigger_refresh(self):
412 """Triggers a drone manager refresh.
413
414 @raises DroneManagerError: If a drone has un-executed calls.
415 Since they will get clobbered when we queue refresh calls.
showard170873e2009-01-07 00:22:26 +0000416 """
417 self._reset()
showardbf9695d2009-07-06 20:22:24 +0000418 self._drop_old_pidfiles()
showardd1195652009-12-08 22:21:02 +0000419 pidfile_paths = [pidfile_id.path
420 for pidfile_id in self._registered_pidfile_info]
Prashanth B340fd1e2014-06-22 12:44:10 -0700421 drones = list(self.get_drones())
422 for drone in drones:
423 calls = drone.get_calls()
424 if calls:
425 raise DroneManagerError('Drone %s has un-executed calls: %s '
426 'which might get corrupted through '
427 'this invocation' %
428 (drone, [str(call) for call in calls]))
429 drone.queue_call('refresh', pidfile_paths)
430 logging.info("Invoking drone refresh.")
431 with self._timer.get_client('trigger_refresh'):
432 self._refresh_task_queue.execute(drones, wait=False)
showard170873e2009-01-07 00:22:26 +0000433
Prashanth B340fd1e2014-06-22 12:44:10 -0700434
435 def sync_refresh(self):
436 """Complete the drone refresh started by trigger_refresh.
437
438 Waits for all drone threads then refreshes internal datastructures
439 with drone process information.
440 """
441
442 # This gives us a dictionary like what follows:
443 # {drone: [{'pidfiles': (raw contents of pidfile paths),
444 # 'autoserv_processes': (autoserv process info from ps),
445 # 'all_processes': (all process info from ps),
446 # 'parse_processes': (parse process infor from ps),
447 # 'pidfile_second_read': (pidfile contents, again),}]
448 # drone2: ...}
449 # The values of each drone are only a list because this adheres to the
450 # drone utility interface (each call is executed and its results are
451 # places in a list, but since we never couple the refresh calls with
452 # any other call, this list will always contain a single dict).
453 with self._timer.get_client('sync_refresh'):
454 all_results = self._refresh_task_queue.get_results()
455 logging.info("Drones refreshed.")
456
457 # The loop below goes through and parses pidfile contents. Pidfiles
458 # are used to track autoserv execution, and will always contain < 3
459 # lines of the following: pid, exit code, number of tests. Each pidfile
460 # is identified by a PidfileId object, which contains a unique pidfile
461 # path (unique because it contains the job id) making it hashable.
462 # All pidfiles are stored in the drone managers _pidfiles dict as:
463 # {pidfile_id: pidfile_contents(Process(drone, pid),
464 # exit_code, num_tests_failed)}
465 # In handle agents, each agent knows its pidfile_id, and uses this
466 # to retrieve the refreshed contents of its pidfile via the
467 # PidfileRunMonitor (through its tick) before making decisions. If
468 # the agent notices that its process has exited, it unregisters the
469 # pidfile from the drone_managers._registered_pidfile_info dict
470 # through its epilog.
showard170873e2009-01-07 00:22:26 +0000471 for drone, results_list in all_results.iteritems():
472 results = results_list[0]
Alex Miller82d7a9f2014-05-16 14:43:32 -0700473 drone_hostname = drone.hostname.replace('.', '_')
showard0205a3e2009-01-16 03:03:50 +0000474
Alex Miller82d7a9f2014-05-16 14:43:32 -0700475 with self._timer.get_client('%s.results' % drone_hostname):
Alex Miller6cbd7582014-05-14 19:06:52 -0700476 for process_info in results['all_processes']:
477 if process_info['comm'] == 'autoserv':
478 self._add_autoserv_process(drone, process_info)
479 drone_pid = drone.hostname, int(process_info['pid'])
480 self._all_processes[drone_pid] = process_info
showard170873e2009-01-07 00:22:26 +0000481
Alex Miller6cbd7582014-05-14 19:06:52 -0700482 for process_info in results['parse_processes']:
483 self._add_process(drone, process_info)
484
Alex Miller82d7a9f2014-05-16 14:43:32 -0700485 with self._timer.get_client('%s.pidfiles' % drone_hostname):
Alex Miller6cbd7582014-05-14 19:06:52 -0700486 self._process_pidfiles(drone, results['pidfiles'],
487 self._pidfiles)
Alex Miller82d7a9f2014-05-16 14:43:32 -0700488 with self._timer.get_client('%s.pidfiles_second' % drone_hostname):
Alex Miller6cbd7582014-05-14 19:06:52 -0700489 self._process_pidfiles(drone, results['pidfiles_second_read'],
490 self._pidfiles_second_read)
showard170873e2009-01-07 00:22:26 +0000491
showardd1195652009-12-08 22:21:02 +0000492 self._compute_active_processes(drone)
493 if drone.enabled:
494 self._enqueue_drone(drone)
Fang Deng9a0c6c32013-09-04 15:34:55 -0700495 self._check_drone_process_limit(drone)
showard170873e2009-01-07 00:22:26 +0000496
497
Prashanth B340fd1e2014-06-22 12:44:10 -0700498 def refresh(self):
499 """Refresh all drones."""
500 with self._timer.get_client('refresh'):
501 self.trigger_refresh()
502 self.sync_refresh()
503
504
showard170873e2009-01-07 00:22:26 +0000505 def execute_actions(self):
506 """
507 Called at the end of a scheduler cycle to execute all queued actions
508 on drones.
509 """
Prashanth B340fd1e2014-06-22 12:44:10 -0700510 # Invoke calls queued on all drones since the last call to execute
511 # and wait for them to return.
512 thread_lib.ThreadedTaskQueue(
513 name='%s.execute_queue' % self._STATS_KEY).execute(
514 self._drones.values())
showard170873e2009-01-07 00:22:26 +0000515
516 try:
mbligh1ef218d2009-08-03 16:57:56 +0000517 self._results_drone.execute_queued_calls()
showard170873e2009-01-07 00:22:26 +0000518 except error.AutoservError:
519 warning = ('Results repository failed to execute calls:\n' +
520 traceback.format_exc())
showard170873e2009-01-07 00:22:26 +0000521 email_manager.manager.enqueue_notify_email(
522 'Results repository error', warning)
523 self._results_drone.clear_call_queue()
524
525
526 def get_orphaned_autoserv_processes(self):
527 """
showardd3dc1992009-04-22 21:01:40 +0000528 Returns a set of Process objects for orphaned processes only.
showard170873e2009-01-07 00:22:26 +0000529 """
showardd3dc1992009-04-22 21:01:40 +0000530 return set(process for process in self._process_set
531 if process.ppid == 1)
showard170873e2009-01-07 00:22:26 +0000532
533
showard170873e2009-01-07 00:22:26 +0000534 def kill_process(self, process):
535 """
536 Kill the given process.
537 """
showardd3dc1992009-04-22 21:01:40 +0000538 logging.info('killing %s', process)
showard170873e2009-01-07 00:22:26 +0000539 drone = self._get_drone_for_process(process)
540 drone.queue_call('kill_process', process)
541
542
543 def _ensure_directory_exists(self, path):
544 if not os.path.exists(path):
545 os.makedirs(path)
546
547
showard324bf812009-01-20 23:23:38 +0000548 def total_running_processes(self):
549 return sum(drone.active_processes for drone in self.get_drones())
550
551
jamesren76fcf192010-04-21 20:39:50 +0000552 def max_runnable_processes(self, username, drone_hostnames_allowed):
showard324bf812009-01-20 23:23:38 +0000553 """
554 Return the maximum number of processes that can be run (in a single
555 execution) given the current load on drones.
showard9bb960b2009-11-19 01:02:11 +0000556 @param username: login of user to run a process. may be None.
jamesren76fcf192010-04-21 20:39:50 +0000557 @param drone_hostnames_allowed: list of drones that can be used. May be
558 None
showard324bf812009-01-20 23:23:38 +0000559 """
showard1b7142d2010-01-15 00:21:37 +0000560 usable_drone_wrappers = [wrapper for wrapper in self._drone_queue
jamesren76fcf192010-04-21 20:39:50 +0000561 if wrapper.drone.usable_by(username) and
562 (drone_hostnames_allowed is None or
563 wrapper.drone.hostname in
564 drone_hostnames_allowed)]
showard1b7142d2010-01-15 00:21:37 +0000565 if not usable_drone_wrappers:
566 # all drones disabled or inaccessible
showardde700d32009-02-25 00:12:42 +0000567 return 0
jamesren37b50452010-03-25 20:38:56 +0000568 runnable_processes = [
569 wrapper.drone.max_processes - wrapper.drone.active_processes
570 for wrapper in usable_drone_wrappers]
571 return max([0] + runnable_processes)
showard324bf812009-01-20 23:23:38 +0000572
573
showarde39ebe92009-06-18 23:14:48 +0000574 def _least_loaded_drone(self, drones):
575 drone_to_use = drones[0]
576 for drone in drones[1:]:
577 if drone.used_capacity() < drone_to_use.used_capacity():
578 drone_to_use = drone
579 return drone_to_use
580
581
jamesren76fcf192010-04-21 20:39:50 +0000582 def _choose_drone_for_execution(self, num_processes, username,
583 drone_hostnames_allowed):
showard324bf812009-01-20 23:23:38 +0000584 # cycle through drones is order of increasing used capacity until
585 # we find one that can handle these processes
586 checked_drones = []
jamesren37b50452010-03-25 20:38:56 +0000587 usable_drones = []
showard324bf812009-01-20 23:23:38 +0000588 drone_to_use = None
589 while self._drone_queue:
showard418785b2009-11-23 20:19:59 +0000590 drone = heapq.heappop(self._drone_queue).drone
showard324bf812009-01-20 23:23:38 +0000591 checked_drones.append(drone)
Eric Lie0493a42010-11-15 13:05:43 -0800592 logging.info('Checking drone %s', drone.hostname)
showard9bb960b2009-11-19 01:02:11 +0000593 if not drone.usable_by(username):
594 continue
jamesren76fcf192010-04-21 20:39:50 +0000595
596 drone_allowed = (drone_hostnames_allowed is None
597 or drone.hostname in drone_hostnames_allowed)
598 if not drone_allowed:
Eric Lie0493a42010-11-15 13:05:43 -0800599 logging.debug('Drone %s not allowed: ', drone.hostname)
jamesren76fcf192010-04-21 20:39:50 +0000600 continue
601
jamesren37b50452010-03-25 20:38:56 +0000602 usable_drones.append(drone)
jamesren76fcf192010-04-21 20:39:50 +0000603
showard324bf812009-01-20 23:23:38 +0000604 if drone.active_processes + num_processes <= drone.max_processes:
605 drone_to_use = drone
606 break
Eric Lie0493a42010-11-15 13:05:43 -0800607 logging.info('Drone %s has %d active + %s requested > %s max',
608 drone.hostname, drone.active_processes, num_processes,
609 drone.max_processes)
showard324bf812009-01-20 23:23:38 +0000610
jamesren76fcf192010-04-21 20:39:50 +0000611 if not drone_to_use and usable_drones:
showard324bf812009-01-20 23:23:38 +0000612 drone_summary = ','.join('%s %s/%s' % (drone.hostname,
613 drone.active_processes,
614 drone.max_processes)
jamesren37b50452010-03-25 20:38:56 +0000615 for drone in usable_drones)
616 logging.error('No drone has capacity to handle %d processes (%s) '
617 'for user %s', num_processes, drone_summary, username)
618 drone_to_use = self._least_loaded_drone(usable_drones)
showarde39ebe92009-06-18 23:14:48 +0000619
showard324bf812009-01-20 23:23:38 +0000620 # refill _drone_queue
621 for drone in checked_drones:
622 self._enqueue_drone(drone)
623
showard170873e2009-01-07 00:22:26 +0000624 return drone_to_use
625
626
showarded2afea2009-07-07 20:54:07 +0000627 def _substitute_working_directory_into_command(self, command,
628 working_directory):
629 for i, item in enumerate(command):
630 if item is WORKING_DIRECTORY:
631 command[i] = working_directory
632
633
showardd3dc1992009-04-22 21:01:40 +0000634 def execute_command(self, command, working_directory, pidfile_name,
showard418785b2009-11-23 20:19:59 +0000635 num_processes, log_file=None, paired_with_pidfile=None,
jamesren76fcf192010-04-21 20:39:50 +0000636 username=None, drone_hostnames_allowed=None):
showard170873e2009-01-07 00:22:26 +0000637 """
638 Execute the given command, taken as an argv list.
639
showarded2afea2009-07-07 20:54:07 +0000640 @param command: command to execute as a list. if any item is
641 WORKING_DIRECTORY, the absolute path to the working directory
642 will be substituted for it.
643 @param working_directory: directory in which the pidfile will be written
644 @param pidfile_name: name of the pidfile this process will write
showardd1195652009-12-08 22:21:02 +0000645 @param num_processes: number of processes to account for from this
646 execution
showarded2afea2009-07-07 20:54:07 +0000647 @param log_file (optional): path (in the results repository) to hold
648 command output.
649 @param paired_with_pidfile (optional): a PidfileId for an
650 already-executed process; the new process will execute on the
651 same drone as the previous process.
showard9bb960b2009-11-19 01:02:11 +0000652 @param username (optional): login of the user responsible for this
653 process.
jamesren76fcf192010-04-21 20:39:50 +0000654 @param drone_hostnames_allowed (optional): hostnames of the drones that
655 this command is allowed to
656 execute on
showard170873e2009-01-07 00:22:26 +0000657 """
showarddb502762009-09-09 15:31:20 +0000658 abs_working_directory = self.absolute_path(working_directory)
showard170873e2009-01-07 00:22:26 +0000659 if not log_file:
660 log_file = self.get_temporary_path('execute')
661 log_file = self.absolute_path(log_file)
showard170873e2009-01-07 00:22:26 +0000662
showarded2afea2009-07-07 20:54:07 +0000663 self._substitute_working_directory_into_command(command,
showarddb502762009-09-09 15:31:20 +0000664 abs_working_directory)
showarded2afea2009-07-07 20:54:07 +0000665
showard170873e2009-01-07 00:22:26 +0000666 if paired_with_pidfile:
667 drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
668 else:
jamesren76fcf192010-04-21 20:39:50 +0000669 drone = self._choose_drone_for_execution(num_processes, username,
670 drone_hostnames_allowed)
671
672 if not drone:
673 raise DroneManagerError('command failed; no drones available: %s'
674 % command)
675
showardb18134f2009-03-20 20:52:18 +0000676 logging.info("command = %s" % command)
677 logging.info('log file = %s:%s' % (drone.hostname, log_file))
showarddb502762009-09-09 15:31:20 +0000678 self._write_attached_files(working_directory, drone)
679 drone.queue_call('execute_command', command, abs_working_directory,
showard170873e2009-01-07 00:22:26 +0000680 log_file, pidfile_name)
showard418785b2009-11-23 20:19:59 +0000681 drone.active_processes += num_processes
682 self._reorder_drone_queue()
showard170873e2009-01-07 00:22:26 +0000683
showard42d44982009-10-12 20:34:03 +0000684 pidfile_path = os.path.join(abs_working_directory, pidfile_name)
showard170873e2009-01-07 00:22:26 +0000685 pidfile_id = PidfileId(pidfile_path)
686 self.register_pidfile(pidfile_id)
showardd1195652009-12-08 22:21:02 +0000687 self._registered_pidfile_info[pidfile_id].num_processes = num_processes
showard170873e2009-01-07 00:22:26 +0000688 return pidfile_id
689
690
showardd3dc1992009-04-22 21:01:40 +0000691 def get_pidfile_id_from(self, execution_tag, pidfile_name):
692 path = os.path.join(self.absolute_path(execution_tag), pidfile_name)
showard170873e2009-01-07 00:22:26 +0000693 return PidfileId(path)
694
695
696 def register_pidfile(self, pidfile_id):
697 """
698 Indicate that the DroneManager should look for the given pidfile when
699 refreshing.
700 """
showardd1195652009-12-08 22:21:02 +0000701 if pidfile_id not in self._registered_pidfile_info:
showard37399782009-08-20 23:32:20 +0000702 logging.info('monitoring pidfile %s', pidfile_id)
showardd1195652009-12-08 22:21:02 +0000703 self._registered_pidfile_info[pidfile_id] = _PidfileInfo()
showardc6fb6042010-01-25 21:48:20 +0000704 self._reset_pidfile_age(pidfile_id)
705
706
707 def _reset_pidfile_age(self, pidfile_id):
708 if pidfile_id in self._registered_pidfile_info:
709 self._registered_pidfile_info[pidfile_id].age = 0
showard170873e2009-01-07 00:22:26 +0000710
711
showardf85a0b72009-10-07 20:48:45 +0000712 def unregister_pidfile(self, pidfile_id):
showardd1195652009-12-08 22:21:02 +0000713 if pidfile_id in self._registered_pidfile_info:
showardf85a0b72009-10-07 20:48:45 +0000714 logging.info('forgetting pidfile %s', pidfile_id)
showardd1195652009-12-08 22:21:02 +0000715 del self._registered_pidfile_info[pidfile_id]
716
717
718 def declare_process_count(self, pidfile_id, num_processes):
719 self._registered_pidfile_info[pidfile_id].num_processes = num_processes
showardf85a0b72009-10-07 20:48:45 +0000720
721
showard170873e2009-01-07 00:22:26 +0000722 def get_pidfile_contents(self, pidfile_id, use_second_read=False):
723 """
724 Retrieve a PidfileContents object for the given pidfile_id. If
725 use_second_read is True, use results that were read after the processes
726 were checked, instead of before.
727 """
showardc6fb6042010-01-25 21:48:20 +0000728 self._reset_pidfile_age(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000729 if use_second_read:
730 pidfile_map = self._pidfiles_second_read
731 else:
732 pidfile_map = self._pidfiles
733 return pidfile_map.get(pidfile_id, PidfileContents())
734
735
736 def is_process_running(self, process):
737 """
738 Check if the given process is in the running process list.
739 """
Alex Millere76e2252013-08-15 09:24:27 -0700740 if process in self._process_set:
741 return True
742
Alex Miller06a5f752013-08-15 11:16:40 -0700743 drone_pid = process.hostname, process.pid
Alex Millere76e2252013-08-15 09:24:27 -0700744 if drone_pid in self._all_processes:
745 logging.error('Process %s found, but not an autoserv process. '
746 'Is %s', process, self._all_processes[drone_pid])
747 return True
748
749 return False
showard170873e2009-01-07 00:22:26 +0000750
751
752 def get_temporary_path(self, base_name):
753 """
754 Get a new temporary path guaranteed to be unique across all drones
755 for this scheduler execution.
756 """
757 self._temporary_path_counter += 1
758 return os.path.join(drone_utility._TEMPORARY_DIRECTORY,
759 '%s.%s' % (base_name, self._temporary_path_counter))
760
761
showard42d44982009-10-12 20:34:03 +0000762 def absolute_path(self, path, on_results_repository=False):
763 if on_results_repository:
764 base_dir = self._results_dir
765 else:
showardc75fded2009-10-14 16:20:02 +0000766 base_dir = os.path.join(drones.AUTOTEST_INSTALL_DIR,
767 _DRONE_RESULTS_DIR_SUFFIX)
showard42d44982009-10-12 20:34:03 +0000768 return os.path.join(base_dir, path)
showard170873e2009-01-07 00:22:26 +0000769
770
showard678df4f2009-02-04 21:36:39 +0000771 def _copy_results_helper(self, process, source_path, destination_path,
772 to_results_repository=False):
Simran Basi882f15b2013-10-29 14:59:34 -0700773 logging.debug('_copy_results_helper. process: %s, source_path: %s, '
774 'destination_path: %s, to_results_repository: %s',
775 process, source_path, destination_path,
776 to_results_repository)
showard678df4f2009-02-04 21:36:39 +0000777 full_source = self.absolute_path(source_path)
showard42d44982009-10-12 20:34:03 +0000778 full_destination = self.absolute_path(
779 destination_path, on_results_repository=to_results_repository)
showard678df4f2009-02-04 21:36:39 +0000780 source_drone = self._get_drone_for_process(process)
781 if to_results_repository:
782 source_drone.send_file_to(self._results_drone, full_source,
783 full_destination, can_fail=True)
784 else:
785 source_drone.queue_call('copy_file_or_directory', full_source,
786 full_destination)
787
788
showard170873e2009-01-07 00:22:26 +0000789 def copy_to_results_repository(self, process, source_path,
790 destination_path=None):
791 """
792 Copy results from the given process at source_path to destination_path
793 in the results repository.
794 """
795 if destination_path is None:
796 destination_path = source_path
showard678df4f2009-02-04 21:36:39 +0000797 self._copy_results_helper(process, source_path, destination_path,
798 to_results_repository=True)
799
800
801 def copy_results_on_drone(self, process, source_path, destination_path):
802 """
803 Copy a results directory from one place to another on the drone.
804 """
805 self._copy_results_helper(process, source_path, destination_path)
showard170873e2009-01-07 00:22:26 +0000806
807
showarddb502762009-09-09 15:31:20 +0000808 def _write_attached_files(self, results_dir, drone):
809 attached_files = self._attached_files.pop(results_dir, {})
showard73ec0442009-02-07 02:05:20 +0000810 for file_path, contents in attached_files.iteritems():
showard170873e2009-01-07 00:22:26 +0000811 drone.queue_call('write_to_file', self.absolute_path(file_path),
812 contents)
813
814
showarddb502762009-09-09 15:31:20 +0000815 def attach_file_to_execution(self, results_dir, file_contents,
showard170873e2009-01-07 00:22:26 +0000816 file_path=None):
817 """
showarddb502762009-09-09 15:31:20 +0000818 When the process for the results directory is executed, the given file
819 contents will be placed in a file on the drone. Returns the path at
820 which the file will be placed.
showard170873e2009-01-07 00:22:26 +0000821 """
822 if not file_path:
823 file_path = self.get_temporary_path('attach')
showarddb502762009-09-09 15:31:20 +0000824 files_for_execution = self._attached_files.setdefault(results_dir, {})
showard73ec0442009-02-07 02:05:20 +0000825 assert file_path not in files_for_execution
826 files_for_execution[file_path] = file_contents
showard170873e2009-01-07 00:22:26 +0000827 return file_path
828
829
showard35162b02009-03-03 02:17:30 +0000830 def write_lines_to_file(self, file_path, lines, paired_with_process=None):
showard170873e2009-01-07 00:22:26 +0000831 """
832 Write the given lines (as a list of strings) to a file. If
showard35162b02009-03-03 02:17:30 +0000833 paired_with_process is given, the file will be written on the drone
834 running the given Process. Otherwise, the file will be written to the
showard170873e2009-01-07 00:22:26 +0000835 results repository.
836 """
showard170873e2009-01-07 00:22:26 +0000837 file_contents = '\n'.join(lines) + '\n'
showard35162b02009-03-03 02:17:30 +0000838 if paired_with_process:
839 drone = self._get_drone_for_process(paired_with_process)
showard42d44982009-10-12 20:34:03 +0000840 on_results_repository = False
showard170873e2009-01-07 00:22:26 +0000841 else:
842 drone = self._results_drone
showard42d44982009-10-12 20:34:03 +0000843 on_results_repository = True
844 full_path = self.absolute_path(
845 file_path, on_results_repository=on_results_repository)
showard170873e2009-01-07 00:22:26 +0000846 drone.queue_call('write_to_file', full_path, file_contents)
jamesrenc44ae992010-02-19 00:12:54 +0000847
848
Simran Basicced3092012-08-02 15:09:23 -0700849SiteDroneManager = utils.import_site_class(
850 __file__, 'autotest_lib.scheduler.site_drone_manager',
851 'SiteDroneManager', BaseDroneManager)
852
853
854class DroneManager(SiteDroneManager):
855 pass
856
857
jamesrenc44ae992010-02-19 00:12:54 +0000858_the_instance = None
859
860def instance():
861 if _the_instance is None:
862 _set_instance(DroneManager())
863 return _the_instance
864
865
866def _set_instance(instance): # usable for testing
867 global _the_instance
868 _the_instance = instance