* move some scheduler config options into a separate module, scheduler_config
* add a little embedded HTTP server to the scheduler, defined in status_server.py, running in a separate thread. this displays loaded config values and allows reloading of those config values at runtime. in the future we can extend this to do much more.
* make global_config handle empty values as nonexistent values by default. otherwise, we would have to both pass a default= and check for value == '' separately. Now, we just pass default= and it's all taken care of.
Signed-off-by: Steve Howard <showard@google.com>
git-svn-id: http://test.kernel.org/svn/autotest/trunk@2608 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/client/common_lib/global_config.py b/client/common_lib/global_config.py
index 18c810e..309f552 100644
--- a/client/common_lib/global_config.py
+++ b/client/common_lib/global_config.py
@@ -38,18 +38,26 @@
self.config = None
- def get_config_value(self, section, key, type=str, default=None):
+ def _handle_no_value(self, section, key, default):
+ if default is None:
+ msg = ("Value '%s' not found in section '%s'" %
+ (key, section))
+ raise ConfigError(msg)
+ else:
+ return default
+
+
+ def get_config_value(self, section, key, type=str, default=None,
+ allow_blank=False):
self._ensure_config_parsed()
try:
val = self.config.get(section, key)
- except:
- if default is None:
- msg = ("Value '%s' not found in section '%s'" %
- (key, section))
- raise ConfigError(msg)
- else:
- return default
+ except ConfigParser.Error:
+ return self._handle_no_value(section, key, default)
+
+ if not val.strip() and not allow_blank:
+ return self._handle_no_value(section, key, default)
return self.convert_value(key, section, val, type, default)
diff --git a/scheduler/drone_utility.py b/scheduler/drone_utility.py
index 7e69ba0..2d305bc 100644
--- a/scheduler/drone_utility.py
+++ b/scheduler/drone_utility.py
@@ -5,7 +5,7 @@
import common
from autotest_lib.client.common_lib import utils, global_config, error
from autotest_lib.server import hosts, subcommand
-from autotest_lib.scheduler import email_manager
+from autotest_lib.scheduler import email_manager, scheduler_config
_TEMPORARY_DIRECTORY = 'drone_tmp'
_TRANSFER_FAILED_FILE = '.transfer_failed'
@@ -41,8 +41,6 @@
All paths going into and out of this class are absolute.
"""
_PS_ARGS = ['pid', 'pgid', 'ppid', 'comm', 'args']
- _MAX_TRANSFER_PROCESSES = global_config.global_config.get_config_value(
- 'SCHEDULER', 'max_transfer_processes', type=int)
_WARNING_DURATION = 60
def __init__(self):
@@ -243,7 +241,8 @@
start_time = time.time()
for method_call in calls:
results.append(method_call.execute_on(self))
- if len(self._subcommands) >= self._MAX_TRANSFER_PROCESSES:
+ max_processes = scheduler_config.config.max_transfer_processes
+ if len(self._subcommands) >= max_processes:
self.wait_for_async_commands()
self.wait_for_async_commands()
diff --git a/scheduler/email_manager.py b/scheduler/email_manager.py
index bc30f78..157fcf1 100644
--- a/scheduler/email_manager.py
+++ b/scheduler/email_manager.py
@@ -9,12 +9,10 @@
self._emails = []
self._from_address = global_config.global_config.get_config_value(
- CONFIG_SECTION, "notify_email_from")
- if not self._from_address:
- self._from_address = getpass.getuser()
+ CONFIG_SECTION, "notify_email_from", default=getpass.getuser())
self._notify_address = global_config.global_config.get_config_value(
- CONFIG_SECTION, "notify_email")
+ CONFIG_SECTION, "notify_email", default='')
def send_email(self, to_string, subject, body):
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index a47f520..ee62ed8 100644
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -15,11 +15,11 @@
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import drone_manager, drones, email_manager
+from autotest_lib.scheduler import status_server, scheduler_config
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
-CONFIG_SECTION = 'SCHEDULER'
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
@@ -66,16 +66,14 @@
RESULTS_DIR = args[0]
c = global_config.global_config
- notify_statuses_list = c.get_config_value(CONFIG_SECTION,
- "notify_email_statuses")
+ notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
+ "notify_email_statuses",
+ default='')
global _notify_email_statuses
_notify_email_statuses = [status for status in
re.split(r'[\s,;:]', notify_statuses_list.lower())
if status]
- tick_pause = c.get_config_value(CONFIG_SECTION, 'tick_pause_sec',
- type=int)
-
if options.test:
global _autoserv_path
_autoserv_path = 'autoserv_dummy'
@@ -98,6 +96,9 @@
sys.exit(1)
_base_url = 'http://%s/afe/' % server_name
+ server = status_server.StatusServer()
+ server.start()
+
init(options.logfile)
dispatcher = Dispatcher()
dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)
@@ -105,7 +106,7 @@
try:
while not _shutdown:
dispatcher.tick()
- time.sleep(tick_pause)
+ time.sleep(scheduler_config.config.tick_pause_sec)
except:
email_manager.manager.log_stacktrace(
"Uncaught exception; terminating monitor_db")
@@ -145,14 +146,11 @@
print "Setting signal handler"
signal.signal(signal.SIGINT, handle_sigint)
- drones = global_config.global_config.get_config_value(CONFIG_SECTION,
- 'drones')
- if drones:
- drone_list = [hostname.strip() for hostname in drones.split(',')]
- else:
- drone_list = ['localhost']
+ drones = global_config.global_config.get_config_value(
+ scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
+ drone_list = [hostname.strip() for hostname in drones.split(',')]
results_host = global_config.global_config.get_config_value(
- CONFIG_SECTION, 'results_host', default='localhost')
+ scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
_drone_manager.initialize(RESULTS_DIR, drone_list, results_host)
print "Connected! Running..."
@@ -379,19 +377,6 @@
class Dispatcher(object):
- max_running_processes = global_config.global_config.get_config_value(
- CONFIG_SECTION, 'max_running_jobs', type=int)
- max_processes_started_per_cycle = (
- global_config.global_config.get_config_value(
- CONFIG_SECTION, 'max_jobs_started_per_cycle', type=int))
- clean_interval = (
- global_config.global_config.get_config_value(
- CONFIG_SECTION, 'clean_interval_minutes', type=int))
- synch_job_start_timeout_minutes = (
- global_config.global_config.get_config_value(
- CONFIG_SECTION, 'synch_job_start_timeout_minutes',
- type=int))
-
def __init__(self):
self._agents = []
self._last_clean_time = time.time()
@@ -419,7 +404,10 @@
def _run_cleanup_maybe(self):
- if self._last_clean_time + self.clean_interval * 60 < time.time():
+ should_cleanup = (self._last_clean_time +
+ scheduler_config.config.clean_interval * 60 <
+ time.time())
+ if should_cleanup:
print 'Running cleanup'
self._abort_timed_out_jobs()
self._abort_jobs_past_synch_start_timeout()
@@ -635,7 +623,7 @@
config) and are holding a machine that's in everyone.
"""
timeout_delta = datetime.timedelta(
- minutes=self.synch_job_start_timeout_minutes)
+ minutes=scheduler_config.config.synch_job_start_timeout_minutes)
timeout_start = datetime.datetime.now() - timeout_delta
query = models.Job.objects.filter(
created_on__lt=timeout_start,
@@ -712,7 +700,7 @@
return False
# total process throttling
if (num_running_processes + agent.num_processes >
- self.max_running_processes):
+ scheduler_config.config.max_running_processes):
return False
# if a single agent exceeds the per-cycle throttling, still allow it to
# run when it's the first agent in the cycle
@@ -720,7 +708,7 @@
return True
# per-cycle throttling
if (num_started_this_cycle + agent.num_processes >
- self.max_processes_started_per_cycle):
+ scheduler_config.config.max_processes_started_per_cycle):
return False
return True
@@ -1387,8 +1375,6 @@
class FinalReparseTask(AgentTask):
- MAX_PARSE_PROCESSES = global_config.global_config.get_config_value(
- CONFIG_SECTION, 'max_parse_processes', type=int)
_num_running_parses = 0
def __init__(self, queue_entries):
@@ -1428,7 +1414,8 @@
@classmethod
def _can_run_new_parse(cls):
- return cls._num_running_parses < cls.MAX_PARSE_PROCESSES
+ return (cls._num_running_parses <
+ scheduler_config.config.max_parse_processes)
def _determine_final_status(self):
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index 2dff024..13ccfa7 100644
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -11,6 +11,7 @@
from autotest_lib.frontend import thread_local
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import monitor_db, drone_manager, email_manager
+from autotest_lib.scheduler import scheduler_config
_DEBUG = False
@@ -419,8 +420,9 @@
def setUp(self):
super(DispatcherThrottlingTest, self).setUp()
- self._dispatcher.max_running_processes = self._MAX_RUNNING
- self._dispatcher.max_processes_started_per_cycle = self._MAX_STARTED
+ scheduler_config.config.max_running_processes = self._MAX_RUNNING
+ scheduler_config.config.max_processes_started_per_cycle = (
+ self._MAX_STARTED)
def _setup_some_agents(self, num_agents):
@@ -563,7 +565,7 @@
def _test_synch_start_timeout_helper(self, expect_abort,
set_created_on=True, set_active=True,
set_acl=True):
- self._dispatcher.synch_job_start_timeout_minutes = 60
+ scheduler_config.config.synch_job_start_timeout_minutes = 60
job = self._create_job(hosts=[1, 2])
if set_active:
hqe = job.hostqueueentry_set.filter(host__id=1)[0]
diff --git a/scheduler/scheduler_config.py b/scheduler/scheduler_config.py
new file mode 100644
index 0000000..5698f87
--- /dev/null
+++ b/scheduler/scheduler_config.py
@@ -0,0 +1,32 @@
+import common
+from autotest_lib.client.common_lib import global_config
+
+CONFIG_SECTION = 'SCHEDULER'
+
+class SchedulerConfig(object):
+ """
+ Contains configuration that can be changed during scheduler execution.
+ """
+ FIELDS = {'max_running_processes' : 'max_running_jobs',
+ 'max_processes_started_per_cycle' : 'max_jobs_started_per_cycle',
+ 'clean_interval' : 'clean_interval_minutes',
+ 'synch_job_start_timeout_minutes' :
+ 'synch_job_start_timeout_minutes',
+ 'max_parse_processes' : 'max_parse_processes',
+ 'tick_pause_sec' : 'tick_pause_sec',
+ 'max_transfer_processes' : 'max_transfer_processes',
+ }
+
+ def __init__(self):
+ self.read_config()
+
+
+ def read_config(self):
+ config = global_config.global_config
+ config.parse_config_file()
+ for field, config_option in self.FIELDS.iteritems():
+ setattr(self, field, config.get_config_value(CONFIG_SECTION,
+ config_option,
+ type=int))
+
+config = SchedulerConfig()
diff --git a/scheduler/status_server.py b/scheduler/status_server.py
new file mode 100644
index 0000000..2444f8d
--- /dev/null
+++ b/scheduler/status_server.py
@@ -0,0 +1,84 @@
+import os, BaseHTTPServer, cgi, threading
+import common
+from autotest_lib.scheduler import scheduler_config
+
+_PORT = 13467
+
+_HEADER = """
+<html>
+<head><title>Scheduler status</title></head>
+<body>
+Actions:<br>
+<a href="?reparse_config=1">Reparse global config values</a><br>
+<br>
+"""
+
+_FOOTER = """
+</body>
+</html>
+"""
+
+class StatusServerRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+ def _send_headers(self):
+ self.send_response(200, 'OK')
+ self.send_header('Content-Type', 'text/html')
+ self.end_headers()
+
+
+ def _parse_arguments(self):
+ path_parts = self.path.split('?', 1)
+ if len(path_parts) == 1:
+ return {}
+
+ encoded_args = path_parts[1]
+ return cgi.parse_qs(encoded_args)
+
+
+ def _write_line(self, line=''):
+ self.wfile.write(line + '<br>\n')
+
+
+ def _write_field(self, field, value):
+ self._write_line('%s=%s' % (field, value))
+
+
+ def _write_all_fields(self):
+ self._write_line('Config values:')
+ for field in scheduler_config.SchedulerConfig.FIELDS:
+ self._write_field(field, getattr(scheduler_config.config, field))
+ self._write_line()
+
+
+ def _execute_actions(self, arguments):
+ if 'reparse_config' in arguments:
+ scheduler_config.config.read_config()
+ self._write_line('Updated config!')
+ self._write_line()
+
+
+ def do_GET(self):
+ self._send_headers()
+ self.wfile.write(_HEADER)
+
+ arguments = self._parse_arguments()
+ self._execute_actions(arguments)
+ self._write_all_fields()
+
+ self.wfile.write(_FOOTER)
+
+
+class StatusServer(object):
+ def __init__(self):
+ self._address = ('', _PORT)
+ self._httpd = BaseHTTPServer.HTTPServer(self._address,
+ StatusServerRequestHandler)
+
+
+ def _run(self):
+ print 'Status server running on', self._address
+ self._httpd.serve_forever()
+
+
+ def start(self):
+ self._thread = threading.Thread(target=self._run, name='status_server')
+ self._thread.start()