Scott Zawalski | 20a9b58 | 2011-11-21 11:49:40 -0800 | [diff] [blame] | 1 | #!/usr/bin/python |
| 2 | # |
| 3 | # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. |
| 4 | # Use of this source code is governed by a BSD-style license that can be |
| 5 | # found in the LICENSE file. |
| 6 | |
| 7 | """Updates all unlocked hosts in Autotest lab in parallel at a given rate. |
| 8 | |
| 9 | Used to update all hosts, or only those of a given platform, in the Autotest |
| 10 | lab to a given version. Allows a configurable number of updates to be started in |
| 11 | parallel. Updates can also be staggered to reduce load.""" |
| 12 | |
| 13 | import logging |
| 14 | import os |
| 15 | import subprocess |
| 16 | import sys |
| 17 | import threading |
| 18 | import time |
| 19 | import traceback |
| 20 | |
| 21 | from collections import deque |
| 22 | from optparse import OptionParser |
| 23 | |
| 24 | |
# Default number of hosts to update in parallel.
DEFAULT_CONCURRENCY = 10


# By default do not stagger any of the updates (minutes between thread starts).
DEFAULT_STAGGER = 0


# Default location of ChromeOS checkout. The ${USER} token is left for the
# shell to expand, since all commands built from this path run via shell=True.
DEFAULT_GCLIENT_ROOT = '/usr/local/google/home/${USER}/chromeos/chromeos'


# Default path for individual host logs. Each host will have its own file, e.g.
# <default_log_path>/<host>.log. NOTE: evaluated once at import time, so every
# run of the script gets a single timestamped directory.
DEFAULT_LOG_PATH = '/tmp/mass_update_logs/%s/' % time.strftime('%Y-%m-%d-%H-%M',
                                                               time.gmtime())


# Location of Autotest cli executable (atest) used for host lock/unlock/list.
AUTOTEST_LOCATION = '/home/chromeos-test/autotest/cli'


# Default time in seconds to sleep while waiting for threads to complete.
DEFAULT_SLEEP = 10


# Amount of time in seconds to wait (via the alarm wrapper around
# image_to_live.sh) before declaring an update as failed.
DEFAULT_TIMEOUT = 2400
| 54 | |
class MassUpdateStatus():
  """Tracks aggregate results for an entire mass-update run.

  Shared by all UpdateThread workers, which append failed hosts and bump the
  success counter as they finish each host.
  """

  def __init__(self):
    # BUGFIX: these were previously class attributes, so the mutable lists
    # would be shared across every instance of MassUpdateStatus. Per-run
    # state belongs on the instance.
    self.ssh_failures = []       # Hosts that could not be reached over SSH.
    self.update_failures = []    # Hosts reachable but whose update failed.
    self.successful_updates = 0  # Count of hosts updated successfully.
| 60 | |
| 61 | |
class UpdateThread(threading.Thread):
  """Responsible for ssh-test, locking, imaging, and unlocking a host.

  Uses the atest CLI for host control and the image_to_live script to actually
  update the host. Each thread will continue to process hosts until the queue
  is empty."""

  _SUCCESS = 0         # Update was successful.
  _SSH_FAILURE = 1     # Could not SSH to host or related SSH failure.
  _UPDATE_FAILURE = 2  # Update failed for any reason other than SSH.

  def __init__(self, options, hosts, status):
    """Initializes a worker.

    Args:
      options: parsed command-line options (gclient, url, log, ...).
      hosts: deque of host names shared by all worker threads.
      status: shared MassUpdateStatus instance collecting results.
    """
    self._options = options
    self._hosts = hosts
    self._status = status
    self._logger = logging.getLogger()
    threading.Thread.__init__(self)

  def run(self):
    while True:
      # BUGFIX: the queue is shared between workers, so checking
      # "while self._hosts" and then calling popleft() is racy -- another
      # thread could drain the queue in between and the popleft() would
      # raise an uncaught IndexError, killing this worker. deque.popleft()
      # itself is atomic, so use it as the single point of truth.
      try:
        host = self._hosts.popleft()
      except IndexError:
        return  # Queue drained; this worker is done.

      status = UpdateThread._UPDATE_FAILURE

      self._logger.info('Updating host %s' % host)
      try:
        try:
          if not CheckSSH(host=host, options=self._options):
            status = UpdateThread._SSH_FAILURE
          elif LockHost(host) and ImageHost(host=host, options=self._options):
            status = UpdateThread._SUCCESS
        finally:
          # Record the outcome and always attempt an unlock so a failed or
          # crashed update doesn't leave the host locked in Autotest.
          if status == UpdateThread._SUCCESS:
            self._logger.info(
                'Completed update for host %s successfully.' % host)
            self._status.successful_updates += 1
          elif status == UpdateThread._SSH_FAILURE:
            self._logger.info('Failed to SSH to host %s.' % host)
            self._status.ssh_failures.append(host)
          else:
            self._logger.info('Failed to update host %s.' % host)
            self._status.update_failures.append(host)

          UnlockHost(host)
      except Exception:
        # BUGFIX: was a bare "except:". Catch Exception so that interpreter
        # shutdown signals (SystemExit and friends) are not swallowed; the
        # worker logs the failure and moves on to the next host.
        traceback.print_exc()
        self._logger.warning(
            'Exception encountered during update. Skipping host %s.' % host)
| 109 | |
| 110 | |
def CheckSSH(host, options):
  """Uses the ssh_test script to ensure SSH access to a host is available.

  Returns true if an SSH connection to the host was successful."""
  cmd = '%s/src/scripts/ssh_test.sh --remote=%s' % (options.gclient, host)
  proc = subprocess.Popen(cmd,
                          shell=True,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE)
  return proc.wait() == 0
| 120 | |
| 121 | |
def ImageHost(host, options):
  """Uses the image_to_live script to update a host.

  The update runs under the alarm wrapper so it is killed after
  DEFAULT_TIMEOUT seconds. Stdout/stderr of the update are written to
  <options.log>/<host>.log and <options.log>/<host>.log.err respectively.

  Returns true if the imaging process was successful."""
  cmd = ('/usr/local/scripts/alarm %d %s/src/scripts/image_to_live.sh '
         '--update_url %s --remote %s' % (DEFAULT_TIMEOUT, options.gclient,
                                          options.url, host))

  # BUGFIX: use context managers so the log files are closed even if Popen
  # raises (previously an exception would leak both file handles).
  with open(os.path.join(options.log, host + '.log'), 'w') as log_file, \
       open(os.path.join(options.log, host + '.log.err'), 'w') as log_file_err:
    exit_code = subprocess.Popen(cmd,
                                 shell=True,
                                 stdout=log_file,
                                 stderr=log_file_err).wait()

  return exit_code == 0
| 141 | |
| 142 | |
def LockHost(host):
  """Locks a host using the atest CLI.

  Locking a host tells Autotest that the host shouldn't be scheduled for
  any other tasks. Returns true if the locking process was successful."""
  proc = subprocess.Popen('%s/atest host mod -l %s' % (AUTOTEST_LOCATION, host),
                          shell=True,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE)
  if proc.wait() != 0:
    logging.getLogger().info('Failed to lock host %s.' % host)
    return False
  return True
| 158 | |
| 159 | |
def UnlockHost(host):
  """Unlocks a host using the atest CLI.

  Unlocking a host tells Autotest that the host is okay to be scheduled
  for other tasks. Returns true if the unlocking process was successful."""
  proc = subprocess.Popen('%s/atest host mod -u %s' % (AUTOTEST_LOCATION, host),
                          shell=True,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE)
  if proc.wait() != 0:
    logging.getLogger().info('Failed to unlock host %s.' % host)
    return False
  return True
| 175 | |
| 176 | |
def GetHostQueue(options):
  """Returns a queue containing unlocked hosts retrieved from the atest CLI.

  If options.label has been specified only unlocked hosts with the specified
  label will be returned."""
  cmd_parts = ['%s/atest host list --unlocked -s Ready -a acl_cros_test'
               % AUTOTEST_LOCATION]

  if options.label:
    cmd_parts.append('-b ' + options.label)

  # atest host list will return a tabular data set. Use sed to remove the first
  # line which contains column labels we don't need. Then since the first
  # column contains the host name, use awk to extract it.
  cmd_parts.append("| sed '1d' | awk '{print $1}'")

  proc = subprocess.Popen(' '.join(cmd_parts),
                          shell=True,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE)
  stdout = proc.communicate()[0]

  hosts = deque()
  for line in stdout.split('\n'):
    name = line.strip()
    if name:
      hosts.append(name)
  return hosts
| 199 | |
| 200 | |
def ParseOptions():
  """Parses command-line flags into an optparse options object.

  Exits with a usage error (via parser.error) if the required --url flag
  is missing.

  Returns:
    The populated optparse options object.
  """
  usage = 'usage: %prog --url=<update url> [options]'
  parser = OptionParser(usage)
  parser.add_option('-b', '--label', dest='label',
                    help='Only update hosts with the specified label.')
  # BUGFIX: parse numeric flags as ints. Previously command-line values
  # arrived as strings, so comparisons such as "options.stagger > 0" in
  # main() compared str to int and only worked by Python 2 accident.
  parser.add_option('-c', '--concurrent', dest='concurrent', type='int',
                    default=DEFAULT_CONCURRENCY,
                    help=('Number of hosts to be updated concurrently. '
                          'Defaults to %d hosts.') % DEFAULT_CONCURRENCY)
  parser.add_option('-g', '--gclient', dest='gclient',
                    default=DEFAULT_GCLIENT_ROOT,
                    help=('Location of ChromeOS checkout. defaults to %s'
                          % DEFAULT_GCLIENT_ROOT))
  parser.add_option('-l', '--log', dest='log',
                    default=DEFAULT_LOG_PATH,
                    help=('Where to put individual host log files. '
                          'Defaults to %s' % DEFAULT_LOG_PATH))
  parser.add_option('-s', '--stagger', dest='stagger', type='int',
                    default=DEFAULT_STAGGER,
                    help=('Attempt to stagger updates. Waits the given amount '
                          'of time in minutes before starting each updater. '
                          'Updates will still overlap if the value is set as a '
                          'multiple of the update period.'))
  parser.add_option('-u', '--url', dest='url',
                    help='Update URL. Points to build for updating hosts.')

  options = parser.parse_args()[0]

  if options.url is None:
    parser.error('An update URL must be provided.')

  return options
| 233 | |
| 234 | |
def InitializeLogging():
  """Configure the global logger for time/date stamping console output.

  Returns a logger object for convenience."""
  handler = logging.StreamHandler()
  handler.setLevel(logging.INFO)
  handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))

  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  root_logger.addHandler(handler)
  return root_logger
| 247 | |
| 248 | |
def main():
  """Entry point: fetches the host list and runs the mass update.

  Gathers unlocked hosts from Autotest, spawns options.concurrent worker
  threads that each lock/image/unlock hosts from the shared queue, then waits
  for all workers to finish and prints a summary of results.
  """
  options = ParseOptions()
  hosts = GetHostQueue(options)
  logger = InitializeLogging()
  status = MassUpdateStatus()

  # Create log folder if it doesn't exist.
  if not os.path.exists(options.log):
    os.makedirs(options.log)

  logger.info('Starting update using URL %s' % options.url)
  logger.info('Individual host logs can be found under %s' % options.log)

  try:
    # Spawn processing threads which will handle lock, update, and unlock.
    for i in range(int(options.concurrent)):
      UpdateThread(hosts=hosts, options=options, status=status).start()

      # Stagger threads if the option has been enabled: sleep between thread
      # launches so the updates don't all start at once.
      if options.stagger > 0:
        time.sleep(int(options.stagger) * 60)

    # Wait for all hosts to be processed and threads to complete. NOTE: Not
    # using hosts.join() here because it does not behave properly with CTRL-C
    # and KeyboardInterrupt.
    while len(threading.enumerate()) > 1:
      time.sleep(DEFAULT_SLEEP)
  # NOTE(review): bare except is presumably deliberate so that CTRL-C
  # (KeyboardInterrupt, which is not an Exception subclass) also reaches the
  # abort warning below -- confirm before narrowing.
  except:
    traceback.print_exc()
    logger.warning(
        'Update process aborted. Some machines may be left locked or updating.')
    sys.exit(1)
  finally:
    # The summary runs even on abort so partial results are still reported.
    logger.info(
        ('Mass updating complete. %d hosts updated successfully, %d failed.' %
         (status.successful_updates, len(status.ssh_failures) +
          len(status.update_failures))))

    logger.info(('-' * 25) + '[ SUMMARY ]' + ('-' * 25))

    for host in status.ssh_failures:
      logger.info('Failed to SSH to host %s.' % host)

    for host in status.update_failures:
      logger.info('Failed to update host %s.' % host)

    if len(status.ssh_failures) == 0 and len(status.update_failures) == 0:
      logger.info('All hosts updated successfully.')

    logger.info('-' * 61)
| 299 | |
| 300 | |
# Standard script entry guard so importing this module has no side effects.
if __name__ == '__main__':
  main()