blob: 7d250981e3bb514603eda8e096e434885144e546 [file] [log] [blame]
showard170873e2009-01-07 00:22:26 +00001import os, re, shutil, signal, subprocess, errno, time, heapq, traceback
showardb18134f2009-03-20 20:52:18 +00002import common, logging
Simran Basicced3092012-08-02 15:09:23 -07003from autotest_lib.client.common_lib import error, global_config, utils
showard170873e2009-01-07 00:22:26 +00004from autotest_lib.scheduler import email_manager, drone_utility, drones
showard324bf812009-01-20 23:23:38 +00005from autotest_lib.scheduler import scheduler_config
Alex Miller6cbd7582014-05-14 19:06:52 -07006from autotest_lib.site_utils.graphite import stats
showard170873e2009-01-07 00:22:26 +00007
showard170873e2009-01-07 00:22:26 +00008
# Name of the directory, under each drone's autotest installation directory,
# in which results are placed on that drone.
_DRONE_RESULTS_DIR_SUFFIX = 'results'

# Placeholder object that may appear in a command list; execute_command()
# replaces it with the absolute working directory.
WORKING_DIRECTORY = object()


# Pidfile names for the different kinds of executions (autoserv, crashinfo
# collection, parsing and archiving).
AUTOSERV_PID_FILE = '.autoserv_execute'
CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
PARSER_PID_FILE = '.parser_execute'
ARCHIVER_PID_FILE = '.archiver_execute'

ALL_PIDFILE_NAMES = (
    AUTOSERV_PID_FILE,
    CRASHINFO_PID_FILE,
    PARSER_PID_FILE,
    ARCHIVER_PID_FILE,
)
23
24
class DroneManagerError(Exception):
    """Raised when the drone manager cannot carry out a requested operation."""
27
28
class CustomEquals(object):
    """Mixin giving value semantics based on a subclass-provided key.

    Subclasses implement _id() to return a hashable key. An instance equals
    another object when that object is an instance of this one's type and the
    two keys compare equal; the hash is derived from the same key.
    """
    def _id(self):
        """Return the hashable identity key; subclasses must override."""
        raise NotImplementedError


    def __eq__(self, other):
        if isinstance(other, type(self)):
            return self._id() == other._id()
        # let Python try the reflected comparison / identity fallback
        return NotImplemented


    def __ne__(self, other):
        return not (self == other)


    def __hash__(self):
        return hash(self._id())
46
47
class Process(CustomEquals):
    """A process on a host; equality and hashing use (hostname, pid) only."""
    def __init__(self, hostname, pid, ppid=None):
        self.hostname, self.pid, self.ppid = hostname, pid, ppid


    def _id(self):
        return self.hostname, self.pid


    def __str__(self):
        return '%s/%s' % (self.hostname, self.pid)


    def __repr__(self):
        base = super(Process, self).__repr__()
        return '%s<%s>' % (base, self)
64
65
class PidfileId(CustomEquals):
    """Identifies a pidfile by its path; equality and hashing use the path."""
    def __init__(self, path):
        self.path = path


    def _id(self):
        return self.path


    def __str__(self):
        return '%s' % (self.path,)
77
78
class _PidfileInfo(object):
    """Bookkeeping kept by the drone manager for each registered pidfile."""
    # refresh() cycles since the pidfile was registered or last looked up
    age = 0
    # processes to count against the drone while running; None = not counted
    num_processes = None
82
83
class PidfileContents(object):
    """Parsed contents of a (possibly empty) pidfile.

    Attributes:
        process: Process whose pid is on the first line, or None if nothing
            has been read yet.
        exit_status: int from the second line once all three lines are
            present, otherwise None.
        num_tests_failed: int from the third line once all three lines are
            present, otherwise None.
    """
    process = None
    exit_status = None
    num_tests_failed = None

    def is_invalid(self):
        """Return False; parse failures are represented by InvalidPidfile."""
        return False


    def is_running(self):
        """Return True if a process was recorded and has not exited yet.

        An exit status of 0 means the process finished successfully, so the
        check is on ``exit_status is None`` rather than on its truthiness.
        """
        return self.process is not None and self.exit_status is None
95
96
class InvalidPidfile(object):
    """Stand-in for a pidfile whose contents could not be parsed.

    Carries the same attributes as PidfileContents (all None) so callers can
    treat both alike; ``error`` holds a description of the problem.
    """
    process = None
    exit_status = None
    num_tests_failed = None


    def __init__(self, error):
        self.error = error


    def __str__(self):
        return self.error


    def is_invalid(self):
        return True


    def is_running(self):
        return False
117
118
class _DroneHeapWrapper(object):
    """Wrapper to compare drones based on used_capacity().

    These objects can be used to keep a heap of drones by capacity: the
    wrapper whose drone has the smallest used_capacity() sorts first.
    """
    def __init__(self, drone):
        self.drone = drone


    def _capacity(self):
        return self.drone.used_capacity()


    def __lt__(self, other):
        # heapq orders its elements with '<'; defining __lt__ explicitly keeps
        # the ordering working without relying on Python 2's __cmp__.
        if not isinstance(other, _DroneHeapWrapper):
            return NotImplemented
        return self._capacity() < other._capacity()


    def __cmp__(self, other):
        # Kept for Python 2 callers using cmp() or ==/!= on wrappers; avoids
        # the cmp() builtin, which no longer exists in Python 3.
        assert isinstance(other, _DroneHeapWrapper)
        mine, theirs = self._capacity(), other._capacity()
        return (mine > theirs) - (mine < theirs)
131
132
class BaseDroneManager(object):
    """
    Interface between the scheduler and its drones, whether that is a single
    localhost "drone" or several remote drones.

    Every path passed into or returned from this class is relative to the full
    results directory, except for the paths returned by absolute_path().
    """

    # Minimum number of seconds between two emails about the same drone
    # hitting its process limit (one day).
    NOTIFY_INTERVAL = 24 * 60 * 60
    _timer = stats.Timer('drone_manager')
Fang Deng9a0c6c32013-09-04 15:34:55 -0700147
148
showard170873e2009-01-07 00:22:26 +0000149 def __init__(self):
showardd1195652009-12-08 22:21:02 +0000150 # absolute path of base results dir
showard170873e2009-01-07 00:22:26 +0000151 self._results_dir = None
showardd1195652009-12-08 22:21:02 +0000152 # holds Process objects
showard170873e2009-01-07 00:22:26 +0000153 self._process_set = set()
Alex Millere76e2252013-08-15 09:24:27 -0700154 # holds the list of all processes running on all drones
155 self._all_processes = {}
showardd1195652009-12-08 22:21:02 +0000156 # maps PidfileId to PidfileContents
showard170873e2009-01-07 00:22:26 +0000157 self._pidfiles = {}
showardd1195652009-12-08 22:21:02 +0000158 # same as _pidfiles
showard170873e2009-01-07 00:22:26 +0000159 self._pidfiles_second_read = {}
showardd1195652009-12-08 22:21:02 +0000160 # maps PidfileId to _PidfileInfo
161 self._registered_pidfile_info = {}
162 # used to generate unique temporary paths
showard170873e2009-01-07 00:22:26 +0000163 self._temporary_path_counter = 0
showardd1195652009-12-08 22:21:02 +0000164 # maps hostname to Drone object
showard170873e2009-01-07 00:22:26 +0000165 self._drones = {}
166 self._results_drone = None
showardd1195652009-12-08 22:21:02 +0000167 # maps results dir to dict mapping file path to contents
showard170873e2009-01-07 00:22:26 +0000168 self._attached_files = {}
showard418785b2009-11-23 20:19:59 +0000169 # heapq of _DroneHeapWrappers
showard170873e2009-01-07 00:22:26 +0000170 self._drone_queue = []
Fang Deng9a0c6c32013-09-04 15:34:55 -0700171 # map drone hostname to time stamp of email that
172 # has been sent about the drone hitting process limit.
173 self._notify_record = {}
showard170873e2009-01-07 00:22:26 +0000174
175
176 def initialize(self, base_results_dir, drone_hostnames,
177 results_repository_hostname):
178 self._results_dir = base_results_dir
showard170873e2009-01-07 00:22:26 +0000179
180 for hostname in drone_hostnames:
Eric Li861b2d52011-02-04 14:50:35 -0800181 self._add_drone(hostname)
showard170873e2009-01-07 00:22:26 +0000182
183 if not self._drones:
184 # all drones failed to initialize
185 raise DroneManagerError('No valid drones found')
186
showard324bf812009-01-20 23:23:38 +0000187 self.refresh_drone_configs()
showardc5afc462009-01-13 00:09:39 +0000188
showard4460ee82009-07-07 20:54:29 +0000189 logging.info('Using results repository on %s',
showardb18134f2009-03-20 20:52:18 +0000190 results_repository_hostname)
showard170873e2009-01-07 00:22:26 +0000191 self._results_drone = drones.get_drone(results_repository_hostname)
showardac5b0002009-10-19 18:34:00 +0000192 results_installation_dir = global_config.global_config.get_config_value(
193 scheduler_config.CONFIG_SECTION,
194 'results_host_installation_directory', default=None)
195 if results_installation_dir:
196 self._results_drone.set_autotest_install_dir(
197 results_installation_dir)
showard170873e2009-01-07 00:22:26 +0000198 # don't initialize() the results drone - we don't want to clear out any
showardd1195652009-12-08 22:21:02 +0000199 # directories and we don't need to kill any processes
showard170873e2009-01-07 00:22:26 +0000200
201
202 def reinitialize_drones(self):
203 self._call_all_drones('initialize', self._results_dir)
204
205
206 def shutdown(self):
showard324bf812009-01-20 23:23:38 +0000207 for drone in self.get_drones():
showard170873e2009-01-07 00:22:26 +0000208 drone.shutdown()
209
210
showard8d3dbca2009-09-25 20:29:38 +0000211 def _get_max_pidfile_refreshes(self):
212 """
213 Normally refresh() is called on every monitor_db.Dispatcher.tick().
214
215 @returns: The number of refresh() calls before we forget a pidfile.
216 """
217 pidfile_timeout = global_config.global_config.get_config_value(
218 scheduler_config.CONFIG_SECTION, 'max_pidfile_refreshes',
219 type=int, default=2000)
220 return pidfile_timeout
221
222
showard170873e2009-01-07 00:22:26 +0000223 def _add_drone(self, hostname):
showardb18134f2009-03-20 20:52:18 +0000224 logging.info('Adding drone %s' % hostname)
showard170873e2009-01-07 00:22:26 +0000225 drone = drones.get_drone(hostname)
Eric Li861b2d52011-02-04 14:50:35 -0800226 if drone:
227 self._drones[drone.hostname] = drone
228 drone.call('initialize', self.absolute_path(''))
showard170873e2009-01-07 00:22:26 +0000229
230
231 def _remove_drone(self, hostname):
232 self._drones.pop(hostname, None)
233
234
showard324bf812009-01-20 23:23:38 +0000235 def refresh_drone_configs(self):
showardc5afc462009-01-13 00:09:39 +0000236 """
showard324bf812009-01-20 23:23:38 +0000237 Reread global config options for all drones.
showardc5afc462009-01-13 00:09:39 +0000238 """
showard324bf812009-01-20 23:23:38 +0000239 config = global_config.global_config
240 section = scheduler_config.CONFIG_SECTION
241 config.parse_config_file()
showardc5afc462009-01-13 00:09:39 +0000242 for hostname, drone in self._drones.iteritems():
showard324bf812009-01-20 23:23:38 +0000243 disabled = config.get_config_value(
showard9bb960b2009-11-19 01:02:11 +0000244 section, '%s_disabled' % hostname, default='')
showardc5afc462009-01-13 00:09:39 +0000245 drone.enabled = not bool(disabled)
246
showard324bf812009-01-20 23:23:38 +0000247 drone.max_processes = config.get_config_value(
showard9bb960b2009-11-19 01:02:11 +0000248 section, '%s_max_processes' % hostname, type=int,
249 default=scheduler_config.config.max_processes_per_drone)
250
251 allowed_users = config.get_config_value(
252 section, '%s_users' % hostname, default=None)
253 if allowed_users is not None:
showard1b7142d2010-01-15 00:21:37 +0000254 allowed_users = set(allowed_users.split())
255 drone.allowed_users = allowed_users
showardc5afc462009-01-13 00:09:39 +0000256
showard418785b2009-11-23 20:19:59 +0000257 self._reorder_drone_queue() # max_processes may have changed
Fang Deng9a0c6c32013-09-04 15:34:55 -0700258 # Clear notification record about reaching max_processes limit.
259 self._notify_record = {}
showard418785b2009-11-23 20:19:59 +0000260
showardc5afc462009-01-13 00:09:39 +0000261
showard324bf812009-01-20 23:23:38 +0000262 def get_drones(self):
263 return self._drones.itervalues()
showardc5afc462009-01-13 00:09:39 +0000264
265
showard170873e2009-01-07 00:22:26 +0000266 def _get_drone_for_process(self, process):
showard170873e2009-01-07 00:22:26 +0000267 return self._drones[process.hostname]
268
269
270 def _get_drone_for_pidfile_id(self, pidfile_id):
271 pidfile_contents = self.get_pidfile_contents(pidfile_id)
272 assert pidfile_contents.process is not None
273 return self._get_drone_for_process(pidfile_contents.process)
274
275
276 def _drop_old_pidfiles(self):
showardd3496242009-12-10 21:41:43 +0000277 # use items() since the dict is modified in unregister_pidfile()
278 for pidfile_id, info in self._registered_pidfile_info.items():
showardd1195652009-12-08 22:21:02 +0000279 if info.age > self._get_max_pidfile_refreshes():
showardf85a0b72009-10-07 20:48:45 +0000280 logging.warning('dropping leaked pidfile %s', pidfile_id)
281 self.unregister_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000282 else:
showardd1195652009-12-08 22:21:02 +0000283 info.age += 1
showard170873e2009-01-07 00:22:26 +0000284
285
286 def _reset(self):
showard170873e2009-01-07 00:22:26 +0000287 self._process_set = set()
Alex Millere76e2252013-08-15 09:24:27 -0700288 self._all_processes = {}
showard170873e2009-01-07 00:22:26 +0000289 self._pidfiles = {}
290 self._pidfiles_second_read = {}
291 self._drone_queue = []
292
293
294 def _call_all_drones(self, method, *args, **kwargs):
295 all_results = {}
showard324bf812009-01-20 23:23:38 +0000296 for drone in self.get_drones():
showard170873e2009-01-07 00:22:26 +0000297 all_results[drone] = drone.call(method, *args, **kwargs)
298 return all_results
299
300
301 def _parse_pidfile(self, drone, raw_contents):
302 contents = PidfileContents()
303 if not raw_contents:
304 return contents
305 lines = raw_contents.splitlines()
306 if len(lines) > 3:
307 return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %
308 (len(lines), lines))
309 try:
310 pid = int(lines[0])
311 contents.process = Process(drone.hostname, pid)
312 # if len(lines) == 2, assume we caught Autoserv between writing
313 # exit_status and num_failed_tests, so just ignore it and wait for
314 # the next cycle
315 if len(lines) == 3:
316 contents.exit_status = int(lines[1])
317 contents.num_tests_failed = int(lines[2])
318 except ValueError, exc:
319 return InvalidPidfile('Corrupt pid file: ' + str(exc.args))
320
321 return contents
322
323
324 def _process_pidfiles(self, drone, pidfiles, store_in_dict):
325 for pidfile_path, contents in pidfiles.iteritems():
326 pidfile_id = PidfileId(pidfile_path)
327 contents = self._parse_pidfile(drone, contents)
328 store_in_dict[pidfile_id] = contents
329
330
showard0205a3e2009-01-16 03:03:50 +0000331 def _add_process(self, drone, process_info):
332 process = Process(drone.hostname, int(process_info['pid']),
333 int(process_info['ppid']))
334 self._process_set.add(process)
showard0205a3e2009-01-16 03:03:50 +0000335
336
337 def _add_autoserv_process(self, drone, process_info):
338 assert process_info['comm'] == 'autoserv'
339 # only root autoserv processes have pgid == pid
340 if process_info['pgid'] != process_info['pid']:
341 return
showardd1195652009-12-08 22:21:02 +0000342 self._add_process(drone, process_info)
showard0205a3e2009-01-16 03:03:50 +0000343
344
showard324bf812009-01-20 23:23:38 +0000345 def _enqueue_drone(self, drone):
showard418785b2009-11-23 20:19:59 +0000346 heapq.heappush(self._drone_queue, _DroneHeapWrapper(drone))
347
348
349 def _reorder_drone_queue(self):
350 heapq.heapify(self._drone_queue)
showard324bf812009-01-20 23:23:38 +0000351
352
showardd1195652009-12-08 22:21:02 +0000353 def _compute_active_processes(self, drone):
354 drone.active_processes = 0
355 for pidfile_id, contents in self._pidfiles.iteritems():
356 is_running = contents.exit_status is None
357 on_this_drone = (contents.process
358 and contents.process.hostname == drone.hostname)
359 if is_running and on_this_drone:
360 info = self._registered_pidfile_info[pidfile_id]
361 if info.num_processes is not None:
362 drone.active_processes += info.num_processes
363
364
Fang Deng9a0c6c32013-09-04 15:34:55 -0700365 def _check_drone_process_limit(self, drone):
366 """
367 Notify if the number of processes on |drone| is approaching limit.
368
369 @param drone: A Drone object.
370 """
Alex Millerda713d92013-12-06 10:02:43 -0800371 try:
372 percent = float(drone.active_processes) / drone.max_processes
373 except ZeroDivisionError:
374 percent = 100
Fang Deng9a0c6c32013-09-04 15:34:55 -0700375 max_percent = scheduler_config.config.max_processes_warning_threshold
376 if percent >= max_percent:
377 message = ('Drone %s is hitting %s of process limit.' %
378 (drone.hostname, format(percent, '.2%')))
379 logging.warning(message)
380 last_notified = self._notify_record.get(drone.hostname, 0)
381 now = time.time()
382 if last_notified + BaseDroneManager.NOTIFY_INTERVAL < now:
383 body = ('Active processes/Process limit: %d/%d (%s)' %
384 (drone.active_processes, drone.max_processes,
385 format(percent, '.2%')))
386 email_manager.manager.enqueue_notify_email(message, body)
387 self._notify_record[drone.hostname] = now
388
389
showard170873e2009-01-07 00:22:26 +0000390 def refresh(self):
391 """
392 Called at the beginning of a scheduler cycle to refresh all process
393 information.
394 """
395 self._reset()
showardbf9695d2009-07-06 20:22:24 +0000396 self._drop_old_pidfiles()
showardd1195652009-12-08 22:21:02 +0000397 pidfile_paths = [pidfile_id.path
398 for pidfile_id in self._registered_pidfile_info]
Alex Miller6cbd7582014-05-14 19:06:52 -0700399 with self._timer.get_client('refresh'):
400 all_results = self._call_all_drones('refresh', pidfile_paths)
showard170873e2009-01-07 00:22:26 +0000401
402 for drone, results_list in all_results.iteritems():
403 results = results_list[0]
showard0205a3e2009-01-16 03:03:50 +0000404
Alex Miller6cbd7582014-05-14 19:06:52 -0700405 with self._timer.get_client('%s.results' % drone):
406 for process_info in results['all_processes']:
407 if process_info['comm'] == 'autoserv':
408 self._add_autoserv_process(drone, process_info)
409 drone_pid = drone.hostname, int(process_info['pid'])
410 self._all_processes[drone_pid] = process_info
showard170873e2009-01-07 00:22:26 +0000411
Alex Miller6cbd7582014-05-14 19:06:52 -0700412 for process_info in results['parse_processes']:
413 self._add_process(drone, process_info)
414
415 with self._timer.get_client('%s.pidfiles' % drone):
416 self._process_pidfiles(drone, results['pidfiles'],
417 self._pidfiles)
418 with self._timer.get_client('%s.pidfiles_second' % drone):
419 self._process_pidfiles(drone, results['pidfiles_second_read'],
420 self._pidfiles_second_read)
showard170873e2009-01-07 00:22:26 +0000421
showardd1195652009-12-08 22:21:02 +0000422 self._compute_active_processes(drone)
423 if drone.enabled:
424 self._enqueue_drone(drone)
Fang Deng9a0c6c32013-09-04 15:34:55 -0700425 self._check_drone_process_limit(drone)
showard170873e2009-01-07 00:22:26 +0000426
427
428 def execute_actions(self):
429 """
430 Called at the end of a scheduler cycle to execute all queued actions
431 on drones.
432 """
433 for drone in self._drones.values():
434 drone.execute_queued_calls()
435
436 try:
mbligh1ef218d2009-08-03 16:57:56 +0000437 self._results_drone.execute_queued_calls()
showard170873e2009-01-07 00:22:26 +0000438 except error.AutoservError:
439 warning = ('Results repository failed to execute calls:\n' +
440 traceback.format_exc())
showard170873e2009-01-07 00:22:26 +0000441 email_manager.manager.enqueue_notify_email(
442 'Results repository error', warning)
443 self._results_drone.clear_call_queue()
444
445
446 def get_orphaned_autoserv_processes(self):
447 """
showardd3dc1992009-04-22 21:01:40 +0000448 Returns a set of Process objects for orphaned processes only.
showard170873e2009-01-07 00:22:26 +0000449 """
showardd3dc1992009-04-22 21:01:40 +0000450 return set(process for process in self._process_set
451 if process.ppid == 1)
showard170873e2009-01-07 00:22:26 +0000452
453
showard170873e2009-01-07 00:22:26 +0000454 def kill_process(self, process):
455 """
456 Kill the given process.
457 """
showardd3dc1992009-04-22 21:01:40 +0000458 logging.info('killing %s', process)
showard170873e2009-01-07 00:22:26 +0000459 drone = self._get_drone_for_process(process)
460 drone.queue_call('kill_process', process)
461
462
463 def _ensure_directory_exists(self, path):
464 if not os.path.exists(path):
465 os.makedirs(path)
466
467
showard324bf812009-01-20 23:23:38 +0000468 def total_running_processes(self):
469 return sum(drone.active_processes for drone in self.get_drones())
470
471
jamesren76fcf192010-04-21 20:39:50 +0000472 def max_runnable_processes(self, username, drone_hostnames_allowed):
showard324bf812009-01-20 23:23:38 +0000473 """
474 Return the maximum number of processes that can be run (in a single
475 execution) given the current load on drones.
showard9bb960b2009-11-19 01:02:11 +0000476 @param username: login of user to run a process. may be None.
jamesren76fcf192010-04-21 20:39:50 +0000477 @param drone_hostnames_allowed: list of drones that can be used. May be
478 None
showard324bf812009-01-20 23:23:38 +0000479 """
showard1b7142d2010-01-15 00:21:37 +0000480 usable_drone_wrappers = [wrapper for wrapper in self._drone_queue
jamesren76fcf192010-04-21 20:39:50 +0000481 if wrapper.drone.usable_by(username) and
482 (drone_hostnames_allowed is None or
483 wrapper.drone.hostname in
484 drone_hostnames_allowed)]
showard1b7142d2010-01-15 00:21:37 +0000485 if not usable_drone_wrappers:
486 # all drones disabled or inaccessible
showardde700d32009-02-25 00:12:42 +0000487 return 0
jamesren37b50452010-03-25 20:38:56 +0000488 runnable_processes = [
489 wrapper.drone.max_processes - wrapper.drone.active_processes
490 for wrapper in usable_drone_wrappers]
491 return max([0] + runnable_processes)
showard324bf812009-01-20 23:23:38 +0000492
493
showarde39ebe92009-06-18 23:14:48 +0000494 def _least_loaded_drone(self, drones):
495 drone_to_use = drones[0]
496 for drone in drones[1:]:
497 if drone.used_capacity() < drone_to_use.used_capacity():
498 drone_to_use = drone
499 return drone_to_use
500
501
jamesren76fcf192010-04-21 20:39:50 +0000502 def _choose_drone_for_execution(self, num_processes, username,
503 drone_hostnames_allowed):
showard324bf812009-01-20 23:23:38 +0000504 # cycle through drones is order of increasing used capacity until
505 # we find one that can handle these processes
506 checked_drones = []
jamesren37b50452010-03-25 20:38:56 +0000507 usable_drones = []
showard324bf812009-01-20 23:23:38 +0000508 drone_to_use = None
509 while self._drone_queue:
showard418785b2009-11-23 20:19:59 +0000510 drone = heapq.heappop(self._drone_queue).drone
showard324bf812009-01-20 23:23:38 +0000511 checked_drones.append(drone)
Eric Lie0493a42010-11-15 13:05:43 -0800512 logging.info('Checking drone %s', drone.hostname)
showard9bb960b2009-11-19 01:02:11 +0000513 if not drone.usable_by(username):
514 continue
jamesren76fcf192010-04-21 20:39:50 +0000515
516 drone_allowed = (drone_hostnames_allowed is None
517 or drone.hostname in drone_hostnames_allowed)
518 if not drone_allowed:
Eric Lie0493a42010-11-15 13:05:43 -0800519 logging.debug('Drone %s not allowed: ', drone.hostname)
jamesren76fcf192010-04-21 20:39:50 +0000520 continue
521
jamesren37b50452010-03-25 20:38:56 +0000522 usable_drones.append(drone)
jamesren76fcf192010-04-21 20:39:50 +0000523
showard324bf812009-01-20 23:23:38 +0000524 if drone.active_processes + num_processes <= drone.max_processes:
525 drone_to_use = drone
526 break
Eric Lie0493a42010-11-15 13:05:43 -0800527 logging.info('Drone %s has %d active + %s requested > %s max',
528 drone.hostname, drone.active_processes, num_processes,
529 drone.max_processes)
showard324bf812009-01-20 23:23:38 +0000530
jamesren76fcf192010-04-21 20:39:50 +0000531 if not drone_to_use and usable_drones:
showard324bf812009-01-20 23:23:38 +0000532 drone_summary = ','.join('%s %s/%s' % (drone.hostname,
533 drone.active_processes,
534 drone.max_processes)
jamesren37b50452010-03-25 20:38:56 +0000535 for drone in usable_drones)
536 logging.error('No drone has capacity to handle %d processes (%s) '
537 'for user %s', num_processes, drone_summary, username)
538 drone_to_use = self._least_loaded_drone(usable_drones)
showarde39ebe92009-06-18 23:14:48 +0000539
showard324bf812009-01-20 23:23:38 +0000540 # refill _drone_queue
541 for drone in checked_drones:
542 self._enqueue_drone(drone)
543
showard170873e2009-01-07 00:22:26 +0000544 return drone_to_use
545
546
showarded2afea2009-07-07 20:54:07 +0000547 def _substitute_working_directory_into_command(self, command,
548 working_directory):
549 for i, item in enumerate(command):
550 if item is WORKING_DIRECTORY:
551 command[i] = working_directory
552
553
showardd3dc1992009-04-22 21:01:40 +0000554 def execute_command(self, command, working_directory, pidfile_name,
showard418785b2009-11-23 20:19:59 +0000555 num_processes, log_file=None, paired_with_pidfile=None,
jamesren76fcf192010-04-21 20:39:50 +0000556 username=None, drone_hostnames_allowed=None):
showard170873e2009-01-07 00:22:26 +0000557 """
558 Execute the given command, taken as an argv list.
559
showarded2afea2009-07-07 20:54:07 +0000560 @param command: command to execute as a list. if any item is
561 WORKING_DIRECTORY, the absolute path to the working directory
562 will be substituted for it.
563 @param working_directory: directory in which the pidfile will be written
564 @param pidfile_name: name of the pidfile this process will write
showardd1195652009-12-08 22:21:02 +0000565 @param num_processes: number of processes to account for from this
566 execution
showarded2afea2009-07-07 20:54:07 +0000567 @param log_file (optional): path (in the results repository) to hold
568 command output.
569 @param paired_with_pidfile (optional): a PidfileId for an
570 already-executed process; the new process will execute on the
571 same drone as the previous process.
showard9bb960b2009-11-19 01:02:11 +0000572 @param username (optional): login of the user responsible for this
573 process.
jamesren76fcf192010-04-21 20:39:50 +0000574 @param drone_hostnames_allowed (optional): hostnames of the drones that
575 this command is allowed to
576 execute on
showard170873e2009-01-07 00:22:26 +0000577 """
showarddb502762009-09-09 15:31:20 +0000578 abs_working_directory = self.absolute_path(working_directory)
showard170873e2009-01-07 00:22:26 +0000579 if not log_file:
580 log_file = self.get_temporary_path('execute')
581 log_file = self.absolute_path(log_file)
showard170873e2009-01-07 00:22:26 +0000582
showarded2afea2009-07-07 20:54:07 +0000583 self._substitute_working_directory_into_command(command,
showarddb502762009-09-09 15:31:20 +0000584 abs_working_directory)
showarded2afea2009-07-07 20:54:07 +0000585
showard170873e2009-01-07 00:22:26 +0000586 if paired_with_pidfile:
587 drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
588 else:
jamesren76fcf192010-04-21 20:39:50 +0000589 drone = self._choose_drone_for_execution(num_processes, username,
590 drone_hostnames_allowed)
591
592 if not drone:
593 raise DroneManagerError('command failed; no drones available: %s'
594 % command)
595
showardb18134f2009-03-20 20:52:18 +0000596 logging.info("command = %s" % command)
597 logging.info('log file = %s:%s' % (drone.hostname, log_file))
showarddb502762009-09-09 15:31:20 +0000598 self._write_attached_files(working_directory, drone)
599 drone.queue_call('execute_command', command, abs_working_directory,
showard170873e2009-01-07 00:22:26 +0000600 log_file, pidfile_name)
showard418785b2009-11-23 20:19:59 +0000601 drone.active_processes += num_processes
602 self._reorder_drone_queue()
showard170873e2009-01-07 00:22:26 +0000603
showard42d44982009-10-12 20:34:03 +0000604 pidfile_path = os.path.join(abs_working_directory, pidfile_name)
showard170873e2009-01-07 00:22:26 +0000605 pidfile_id = PidfileId(pidfile_path)
606 self.register_pidfile(pidfile_id)
showardd1195652009-12-08 22:21:02 +0000607 self._registered_pidfile_info[pidfile_id].num_processes = num_processes
showard170873e2009-01-07 00:22:26 +0000608 return pidfile_id
609
610
showardd3dc1992009-04-22 21:01:40 +0000611 def get_pidfile_id_from(self, execution_tag, pidfile_name):
612 path = os.path.join(self.absolute_path(execution_tag), pidfile_name)
showard170873e2009-01-07 00:22:26 +0000613 return PidfileId(path)
614
615
616 def register_pidfile(self, pidfile_id):
617 """
618 Indicate that the DroneManager should look for the given pidfile when
619 refreshing.
620 """
showardd1195652009-12-08 22:21:02 +0000621 if pidfile_id not in self._registered_pidfile_info:
showard37399782009-08-20 23:32:20 +0000622 logging.info('monitoring pidfile %s', pidfile_id)
showardd1195652009-12-08 22:21:02 +0000623 self._registered_pidfile_info[pidfile_id] = _PidfileInfo()
showardc6fb6042010-01-25 21:48:20 +0000624 self._reset_pidfile_age(pidfile_id)
625
626
627 def _reset_pidfile_age(self, pidfile_id):
628 if pidfile_id in self._registered_pidfile_info:
629 self._registered_pidfile_info[pidfile_id].age = 0
showard170873e2009-01-07 00:22:26 +0000630
631
showardf85a0b72009-10-07 20:48:45 +0000632 def unregister_pidfile(self, pidfile_id):
showardd1195652009-12-08 22:21:02 +0000633 if pidfile_id in self._registered_pidfile_info:
showardf85a0b72009-10-07 20:48:45 +0000634 logging.info('forgetting pidfile %s', pidfile_id)
showardd1195652009-12-08 22:21:02 +0000635 del self._registered_pidfile_info[pidfile_id]
636
637
638 def declare_process_count(self, pidfile_id, num_processes):
639 self._registered_pidfile_info[pidfile_id].num_processes = num_processes
showardf85a0b72009-10-07 20:48:45 +0000640
641
showard170873e2009-01-07 00:22:26 +0000642 def get_pidfile_contents(self, pidfile_id, use_second_read=False):
643 """
644 Retrieve a PidfileContents object for the given pidfile_id. If
645 use_second_read is True, use results that were read after the processes
646 were checked, instead of before.
647 """
showardc6fb6042010-01-25 21:48:20 +0000648 self._reset_pidfile_age(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000649 if use_second_read:
650 pidfile_map = self._pidfiles_second_read
651 else:
652 pidfile_map = self._pidfiles
653 return pidfile_map.get(pidfile_id, PidfileContents())
654
655
656 def is_process_running(self, process):
657 """
658 Check if the given process is in the running process list.
659 """
Alex Millere76e2252013-08-15 09:24:27 -0700660 if process in self._process_set:
661 return True
662
Alex Miller06a5f752013-08-15 11:16:40 -0700663 drone_pid = process.hostname, process.pid
Alex Millere76e2252013-08-15 09:24:27 -0700664 if drone_pid in self._all_processes:
665 logging.error('Process %s found, but not an autoserv process. '
666 'Is %s', process, self._all_processes[drone_pid])
667 return True
668
669 return False
showard170873e2009-01-07 00:22:26 +0000670
671
672 def get_temporary_path(self, base_name):
673 """
674 Get a new temporary path guaranteed to be unique across all drones
675 for this scheduler execution.
676 """
677 self._temporary_path_counter += 1
678 return os.path.join(drone_utility._TEMPORARY_DIRECTORY,
679 '%s.%s' % (base_name, self._temporary_path_counter))
680
681
showard42d44982009-10-12 20:34:03 +0000682 def absolute_path(self, path, on_results_repository=False):
683 if on_results_repository:
684 base_dir = self._results_dir
685 else:
showardc75fded2009-10-14 16:20:02 +0000686 base_dir = os.path.join(drones.AUTOTEST_INSTALL_DIR,
687 _DRONE_RESULTS_DIR_SUFFIX)
showard42d44982009-10-12 20:34:03 +0000688 return os.path.join(base_dir, path)
showard170873e2009-01-07 00:22:26 +0000689
690
showard678df4f2009-02-04 21:36:39 +0000691 def _copy_results_helper(self, process, source_path, destination_path,
692 to_results_repository=False):
Simran Basi882f15b2013-10-29 14:59:34 -0700693 logging.debug('_copy_results_helper. process: %s, source_path: %s, '
694 'destination_path: %s, to_results_repository: %s',
695 process, source_path, destination_path,
696 to_results_repository)
showard678df4f2009-02-04 21:36:39 +0000697 full_source = self.absolute_path(source_path)
showard42d44982009-10-12 20:34:03 +0000698 full_destination = self.absolute_path(
699 destination_path, on_results_repository=to_results_repository)
showard678df4f2009-02-04 21:36:39 +0000700 source_drone = self._get_drone_for_process(process)
701 if to_results_repository:
702 source_drone.send_file_to(self._results_drone, full_source,
703 full_destination, can_fail=True)
704 else:
705 source_drone.queue_call('copy_file_or_directory', full_source,
706 full_destination)
707
708
showard170873e2009-01-07 00:22:26 +0000709 def copy_to_results_repository(self, process, source_path,
710 destination_path=None):
711 """
712 Copy results from the given process at source_path to destination_path
713 in the results repository.
714 """
715 if destination_path is None:
716 destination_path = source_path
showard678df4f2009-02-04 21:36:39 +0000717 self._copy_results_helper(process, source_path, destination_path,
718 to_results_repository=True)
719
720
721 def copy_results_on_drone(self, process, source_path, destination_path):
722 """
723 Copy a results directory from one place to another on the drone.
724 """
725 self._copy_results_helper(process, source_path, destination_path)
showard170873e2009-01-07 00:22:26 +0000726
727
showarddb502762009-09-09 15:31:20 +0000728 def _write_attached_files(self, results_dir, drone):
729 attached_files = self._attached_files.pop(results_dir, {})
showard73ec0442009-02-07 02:05:20 +0000730 for file_path, contents in attached_files.iteritems():
showard170873e2009-01-07 00:22:26 +0000731 drone.queue_call('write_to_file', self.absolute_path(file_path),
732 contents)
733
734
showarddb502762009-09-09 15:31:20 +0000735 def attach_file_to_execution(self, results_dir, file_contents,
showard170873e2009-01-07 00:22:26 +0000736 file_path=None):
737 """
showarddb502762009-09-09 15:31:20 +0000738 When the process for the results directory is executed, the given file
739 contents will be placed in a file on the drone. Returns the path at
740 which the file will be placed.
showard170873e2009-01-07 00:22:26 +0000741 """
742 if not file_path:
743 file_path = self.get_temporary_path('attach')
showarddb502762009-09-09 15:31:20 +0000744 files_for_execution = self._attached_files.setdefault(results_dir, {})
showard73ec0442009-02-07 02:05:20 +0000745 assert file_path not in files_for_execution
746 files_for_execution[file_path] = file_contents
showard170873e2009-01-07 00:22:26 +0000747 return file_path
748
749
showard35162b02009-03-03 02:17:30 +0000750 def write_lines_to_file(self, file_path, lines, paired_with_process=None):
showard170873e2009-01-07 00:22:26 +0000751 """
752 Write the given lines (as a list of strings) to a file. If
showard35162b02009-03-03 02:17:30 +0000753 paired_with_process is given, the file will be written on the drone
754 running the given Process. Otherwise, the file will be written to the
showard170873e2009-01-07 00:22:26 +0000755 results repository.
756 """
showard170873e2009-01-07 00:22:26 +0000757 file_contents = '\n'.join(lines) + '\n'
showard35162b02009-03-03 02:17:30 +0000758 if paired_with_process:
759 drone = self._get_drone_for_process(paired_with_process)
showard42d44982009-10-12 20:34:03 +0000760 on_results_repository = False
showard170873e2009-01-07 00:22:26 +0000761 else:
762 drone = self._results_drone
showard42d44982009-10-12 20:34:03 +0000763 on_results_repository = True
764 full_path = self.absolute_path(
765 file_path, on_results_repository=on_results_repository)
showard170873e2009-01-07 00:22:26 +0000766 drone.queue_call('write_to_file', full_path, file_contents)
jamesrenc44ae992010-02-19 00:12:54 +0000767
768
# Site-specific drone manager class looked up via utils.import_site_class in
# autotest_lib.scheduler.site_drone_manager; BaseDroneManager is passed as the
# fallback (presumably used when no site class exists -- verify in utils).
SiteDroneManager = utils.import_site_class(
        __file__,
        'autotest_lib.scheduler.site_drone_manager',
        'SiteDroneManager',
        BaseDroneManager)
772
773
class DroneManager(SiteDroneManager):
    """Concrete drone manager class; instance() creates one of these."""
776
777
# Shared drone manager, created lazily by instance().
_the_instance = None


def instance():
    """Return the shared drone manager, creating a DroneManager if needed."""
    if _the_instance is not None:
        return _the_instance
    _set_instance(DroneManager())
    return _the_instance


def _set_instance(instance): # usable for testing
    """Replace the shared drone manager returned by instance()."""
    global _the_instance
    _the_instance = instance