blob: 02f6b975801a1dd5dee882be56a649a6d83d1b67 [file] [log] [blame]
Scott Zawalski20a9b582011-11-21 11:49:40 -08001#!/usr/bin/python
2#
Scott Zawalskicb2e2b72012-04-17 12:10:05 -04003# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
Scott Zawalski20a9b582011-11-21 11:49:40 -08004# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
J. Richard Barnetteea785362014-03-17 16:00:53 -07007"""Script to archive old Autotest results to Google Storage.
Scott Zawalski20a9b582011-11-21 11:49:40 -08008
J. Richard Barnetteea785362014-03-17 16:00:53 -07009Uses gsutil to archive files to the configured Google Storage bucket.
10Upon successful copy, the local results directory is deleted.
Scott Zawalski20a9b582011-11-21 11:49:40 -080011"""
12
Simran Basibd9ded02013-11-04 15:49:11 -080013import datetime
Simran Basi9523eaa2012-06-28 17:18:45 -070014import logging
Scott Zawalski20a9b582011-11-21 11:49:40 -080015import os
Scott Zawalski20a9b582011-11-21 11:49:40 -080016import shutil
Simran Basi9523eaa2012-06-28 17:18:45 -070017import signal
Simran Basi981a9272012-11-14 10:46:03 -080018import socket
Scott Zawalski20a9b582011-11-21 11:49:40 -080019import subprocess
20import sys
Simran Basi9523eaa2012-06-28 17:18:45 -070021import tempfile
22import time
Scott Zawalskicb2e2b72012-04-17 12:10:05 -040023
Simran Basi7d9a1492012-10-25 13:51:54 -070024from optparse import OptionParser
25
Simran Basi981a9272012-11-14 10:46:03 -080026import common
Simran Basidd129972014-09-11 14:34:49 -070027from autotest_lib.client.common_lib import utils
Simran Basi981a9272012-11-14 10:46:03 -080028
Alex Millerc900b342014-06-09 16:52:07 -070029try:
30 # Does not exist, nor is needed, on moblab.
31 import psutil
32except ImportError:
33 psutil = None
34
J. Richard Barnetteea785362014-03-17 16:00:53 -070035import job_directories
Simran Basi981a9272012-11-14 10:46:03 -080036from autotest_lib.client.common_lib import global_config
Jakob Juelichc17f3112014-09-11 14:32:30 -070037from autotest_lib.client.common_lib.cros.graphite import stats
Simran Basi981a9272012-11-14 10:46:03 -080038from autotest_lib.scheduler import email_manager
Fang Deng970b6a72013-04-09 11:59:16 -070039from chromite.lib import parallel
Scott Zawalski20a9b582011-11-21 11:49:40 -080040
Scott Zawalski20a9b582011-11-21 11:49:40 -080041
# Key under which this script reports stats; it includes the local host
# name.
STATS_KEY = 'gs_offloader.%s' % socket.gethostname()

# Timer used (via @timer.decorate) to time each directory offload.
timer = stats.Timer(STATS_KEY)

# Nice setting for process, the higher the number the lower the priority.
NICENESS = 10

# Maximum number of seconds to allow for offloading a single
# directory.
OFFLOAD_TIMEOUT_SECS = 60 * 60

# Sleep time per loop.
SLEEP_TIME_SECS = 5

# Minimum number of seconds between e-mail reports.
REPORT_INTERVAL_SECS = 60 * 60

# Location of Autotest results on disk.
RESULTS_DIR = '/usr/local/autotest/results'

# Hosts sub-directory that contains cleanup, verify and repair jobs.
HOSTS_SUB_DIR = 'hosts'

# Log files are written under LOG_LOCATION. The file name is
# LOG_FILENAME_FORMAT % (offloader type, timestamp), where the timestamp
# is rendered by time.strftime() using LOG_TIMESTAMP_FORMAT.
LOG_LOCATION = '/usr/local/autotest/logs/'
LOG_FILENAME_FORMAT = 'gs_offloader_%s_log_%s.txt'
LOG_TIMESTAMP_FORMAT = '%Y%m%d_%H%M%S'
# Record format handed to logging.basicConfig().
LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
69
# pylint: disable=E1120
# Recipient of offload-failure reports, read from the SCHEDULER section of
# the global config; empty string when not configured.
NOTIFY_ADDRESS = global_config.global_config.get_config_value(
        'SCHEDULER', 'notify_email', default='')

# Subject line of failure e-mails; formatted with the local host name.
ERROR_EMAIL_SUBJECT_FORMAT = 'GS Offloader notifications from %s'
# Header of the failure e-mail body. report_offload_failures() appends
# one ERROR_EMAIL_DIRECTORY_FORMAT line per failed job after it.
ERROR_EMAIL_REPORT_FORMAT = '''\
gs_offloader is failing to offload results directories.

First failure Count Directory name
=================== ====== ==============================
'''
# --+----1----+---- ----+ ----+----1----+----2----+----3

# One report line per job: (failure time, failure count, directory).
ERROR_EMAIL_DIRECTORY_FORMAT = '%19s %5d %-1s\n'
# strftime() format for the failure-time column.
ERROR_EMAIL_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

# When True, get_cmd_list() builds a `gsutil rsync` command instead of
# `gsutil cp`.
USE_RSYNC_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_use_rsync', type=bool, default=False)
88
89
class TimeoutException(Exception):
    """Raised by timeout_handler() when an offload exceeds its time limit."""
93
94
def timeout_handler(_signum, _frame):
    """Convert delivery of SIGALRM into a TimeoutException.

    Installed with signal.signal() so that an offload which outlives its
    alarm is aborted by an exception the offload code can catch.

    @param _signum: Number of the delivered signal (14 for SIGALRM); unused.
    @param _frame: Stack frame executing when the signal arrived; unused.
    @raise TimeoutException: Always.

    """
    message = 'Process Timed Out'
    raise TimeoutException(message)
106
107
def get_cmd_list(dir_entry, gs_path):
    """Build the gsutil command line that uploads one directory.

    When rsync is enabled, the directory is mirrored into a sub-path of
    `gs_path` named after the directory's basename. Otherwise it is
    copied recursively into `gs_path` with the `project-private`
    canned ACL.

    @param dir_entry: Path of the directory to upload.
    @param gs_path: Google Storage location to upload into.

    @return: The command as an argument list, suitable for Popen.

    """
    if not USE_RSYNC_ENABLED:
        return ['gsutil', '-m', 'cp', '-eR', '-a', 'project-private',
                dir_entry, gs_path]
    destination = os.path.join(gs_path, os.path.basename(dir_entry))
    return ['gsutil', '-m', 'rsync', '-eR', dir_entry, destination]
Simran Basi9523eaa2012-06-28 17:18:45 -0700124
Jakob Juelich24f22c22014-09-26 11:46:11 -0700125
def get_directory_size_kibibytes_cmd_list(directory):
    """Returns command to get a directory's total size.

    Having this in its own function makes it easier to mock in unittests.

    @param directory: Path of the directory to measure.

    @returns: `du` argument list that reports the total size in KiB.
    """
    summarize_in_kib = '-sk'
    return ['du', summarize_in_kib, directory]


def get_directory_size_kibibytes(directory):
    """Calculate the total size of a directory with all its contents.

    The result is used for statistics only, so no failure may crash the
    caller. Each of these is logged and reported as 0:
      * the command cannot be started;
      * the command exits non-zero;
      * the output cannot be parsed.

    @param directory: Path to the directory

    @returns: Size of the directory in kibibytes, or 0 on any failure.
    """
    cmd = get_directory_size_kibibytes_cmd_list(directory)
    try:
        process = subprocess.Popen(cmd,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
        stdout_data, stderr_data = process.communicate()
    except OSError as e:
        # E.g. `du` is not installed.
        logging.warning('Getting size of %s failed: %s', directory, e)
        return 0

    if process.returncode != 0:
        # This function is used for statistics only, if it fails, nothing else
        # should crash.
        logging.warning('Getting size of %s failed. Stderr:', directory)
        logging.warning(stderr_data)
        return 0

    # `du -sk` prints "<size>\t<path>". split() with no separator splits on
    # any whitespace and works for both str (Python 2) and bytes (Python 3)
    # output; int() accepts either.
    try:
        return int(stdout_data.split(None, 1)[0])
    except (IndexError, ValueError):
        logging.warning('Could not parse size of %s from: %r',
                        directory, stdout_data)
        return 0
152
153
def get_offload_dir_func(gs_uri):
    """Returns the offload directory function for the given gs_uri

    @param gs_uri: Google storage bucket uri to offload to.

    @returns offload_dir function to perform the offload.
    """
    @timer.decorate
    def offload_dir(dir_entry, dest_path):
        """Offload the specified directory entry to Google storage.

        On success, transfer statistics are reported and the local
        directory is deleted. If the upload command exits non-zero or
        times out, its stdout/stderr are logged and the directory is
        left in place.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will offload
                          the directory.

        """
        # Bind everything the `finally` clause uses before entering the
        # `try`. Otherwise an early failure (stats setup, temp-file
        # creation) is masked by an UnboundLocalError from the cleanup code.
        error = False
        process = None
        stdout_file = None
        stderr_file = None
        try:
            counter = stats.Counter(STATS_KEY)
            counter.increment('jobs_offload_started')

            stdout_file = tempfile.TemporaryFile('w+')
            stderr_file = tempfile.TemporaryFile('w+')
            signal.alarm(OFFLOAD_TIMEOUT_SECS)
            gs_path = '%s%s' % (gs_uri, dest_path)
            process = subprocess.Popen(get_cmd_list(dir_entry, gs_path),
                                       stdout=stdout_file, stderr=stderr_file)
            process.wait()
            signal.alarm(0)

            if process.returncode == 0:
                kibibytes_transferred = get_directory_size_kibibytes(dir_entry)

                counter.increment('kibibytes_transferred_total',
                                  kibibytes_transferred)
                stats.Gauge(STATS_KEY).send(
                        'kibibytes_transferred', kibibytes_transferred)
                counter.increment('jobs_offloaded')
                shutil.rmtree(dir_entry)
            else:
                error = True
        except TimeoutException:
            # If we finished the call to Popen(), we may need to terminate
            # the child process. We don't bother calling process.poll();
            # that inherently races because the child can die any time it
            # wants.
            if process:
                try:
                    process.terminate()
                except OSError:
                    # We don't expect any error other than "No such
                    # process".
                    pass
            logging.error('Offloading %s timed out after waiting %d seconds.',
                          dir_entry, OFFLOAD_TIMEOUT_SECS)
            error = True
        finally:
            # Always cancel any pending alarm, even on unexpected errors.
            signal.alarm(0)
            if error:
                # `error` is only set after both log files exist. Rewind
                # them and log the command's output.
                stdout_file.seek(0)
                stderr_file.seek(0)
                logging.error('Error occurred when offloading %s:', dir_entry)
                logging.error('Stdout:\n%s \nStderr:\n%s',
                              stdout_file.read(), stderr_file.read())
            for log_file in (stdout_file, stderr_file):
                if log_file is not None:
                    log_file.close()
    return offload_dir
Simran Basi9523eaa2012-06-28 17:18:45 -0700222
Scott Zawalski20a9b582011-11-21 11:49:40 -0800223
def delete_files(dir_entry, dest_path):
    """Remove `dir_entry` from the local filesystem without uploading it.

    Takes the same arguments as the function returned by
    get_offload_dir_func(), so it can replace that function on systems
    that only want results deleted, not offloaded.

    @param dir_entry: Directory entry to delete.
    @param dest_path: Ignored; accepted only for signature compatibility.
    """
    del dest_path  # Unused.
    shutil.rmtree(dir_entry)
234
235
def report_offload_failures(joblist):
    """E-mail a single report listing every job in `joblist`.

    Each job contributes one line with its failure time, failure
    count, and job directory. The lines are sorted and appended to the
    report header before the message is sent to NOTIFY_ADDRESS.

    @param joblist List of jobs to be reported in the message.

    """
    joblines = []
    for job in joblist:
        failed_at = datetime.datetime.fromtimestamp(job.get_failure_time())
        row = (failed_at.strftime(ERROR_EMAIL_TIME_FORMAT),
               job.get_failure_count(),
               job.get_job_directory())
        joblines.append(ERROR_EMAIL_DIRECTORY_FORMAT % row)
    subject = ERROR_EMAIL_SUBJECT_FORMAT % socket.gethostname()
    body = ERROR_EMAIL_REPORT_FORMAT + ''.join(sorted(joblines))
    email_manager.manager.send_email(NOTIFY_ADDRESS, subject, body)
Simran Basi9523eaa2012-06-28 17:18:45 -0700256
Scott Zawalski20a9b582011-11-21 11:49:40 -0800257
class Offloader(object):
    """State of the offload process.

    Contains the following member fields:
      * _offload_func:  Function to call for each attempt to offload
        a job directory.
      * _jobdir_classes:  List of classes of job directory to be
        offloaded.
      * _processes:  Maximum number of outstanding offload processes
        to allow during an offload cycle.
      * _age_limit:  Minimum age in days at which a job may be
        offloaded.
      * _open_jobs:  a dictionary mapping directory paths to Job
        objects.
      * _next_report_time:  Earliest time that we should send e-mail
        if there are failures to be reported.

    """

    def __init__(self, options):
        """Configure the offloader from parsed command-line options.

        @param options: Options object as returned by parse_options().
        """
        if options.delete_only:
            self._offload_func = delete_files
        else:
            gs_uri = utils.get_offload_gsuri()
            logging.debug('Offloading to: %s', gs_uri)
            self._offload_func = get_offload_dir_func(gs_uri)
        classlist = []
        if options.process_hosts_only or options.process_all:
            classlist.append(job_directories.SpecialJobDirectory)
        if not options.process_hosts_only:
            classlist.append(job_directories.RegularJobDirectory)
        self._jobdir_classes = classlist
        # Internal invariant: the two `if`s above always select at least
        # one class.
        assert self._jobdir_classes
        self._processes = options.parallelism
        self._age_limit = options.days_old
        self._open_jobs = {}
        self._next_report_time = time.time()

    def _add_new_jobs(self):
        """Find new job directories that need offloading.

        Go through the file system looking for valid job directories
        that are currently not in `self._open_jobs`, and add them in.

        """
        new_job_count = 0
        for cls in self._jobdir_classes:
            for resultsdir in cls.get_job_directories():
                if resultsdir in self._open_jobs:
                    continue
                self._open_jobs[resultsdir] = cls(resultsdir)
                new_job_count += 1
        logging.debug("Start of offload cycle - found %d new jobs",
                      new_job_count)

    def _remove_offloaded_jobs(self):
        """Remove offloaded jobs from `self._open_jobs`."""
        # Collect the keys first: deleting from a dict while iterating
        # over it raises RuntimeError on Python 3.
        offloaded_keys = [jobkey for jobkey, job in self._open_jobs.items()
                          if job.is_offloaded()]
        for jobkey in offloaded_keys:
            del self._open_jobs[jobkey]
        logging.debug("End of offload cycle - cleared %d offloaded jobs, "
                      "carrying %d open jobs",
                      len(offloaded_keys), len(self._open_jobs))

    def _have_reportable_errors(self):
        """Return whether any jobs need reporting via e-mail.

        @returns True if there are reportable jobs in `self._open_jobs`,
                 or False otherwise.
        """
        return any(job.is_reportable() for job in self._open_jobs.values())

    def _update_offload_results(self):
        """Check and report status after attempting offload.

        This function processes all jobs in `self._open_jobs`, assuming
        an attempt has just been made to offload all of them.

        Any jobs that have been successfully offloaded are removed.

        If any jobs have reportable errors, and we haven't generated
        an e-mail report in the last `REPORT_INTERVAL_SECS` seconds,
        send new e-mail describing the failures.

        """
        self._remove_offloaded_jobs()
        if self._have_reportable_errors():
            # N.B. We include all jobs that have failed at least once,
            # which may include jobs that aren't otherwise reportable.
            failed_jobs = [j for j in self._open_jobs.values()
                           if j.get_failure_time()]
            logging.debug("Currently there are %d jobs with offload failures",
                          len(failed_jobs))
            if time.time() >= self._next_report_time:
                logging.debug("Reporting failures by e-mail")
                report_offload_failures(failed_jobs)
                self._next_report_time = time.time() + REPORT_INTERVAL_SECS

    def offload_once(self):
        """Perform one offload cycle.

        Find all job directories for new jobs that we haven't seen
        before.  Then, attempt to offload the directories for any
        jobs that have finished running.  Offload of multiple jobs
        is done in parallel, up to `self._processes` at a time.

        After we've tried uploading all directories, go through the list
        checking the status of all uploaded directories.  If necessary,
        report failures via e-mail.

        """
        self._add_new_jobs()
        with parallel.BackgroundTaskRunner(
                self._offload_func, processes=self._processes) as queue:
            for job in self._open_jobs.values():
                job.enqueue_offload(queue, self._age_limit)
        self._update_offload_results()
Scott Zawalski20a9b582011-11-21 11:49:40 -0800380
381
def parse_options():
    """Parse the args passed into gs_offloader.

    Prints the help text and exits with status 1 when both --all and
    --hosts are given, since they request conflicting sets of
    directories.

    @returns: The options object produced by OptionParser.
    """
    defaults = 'Defaults:\n Destination: %s\n Results Path: %s' % (
            utils.DEFAULT_OFFLOAD_GSURI, RESULTS_DIR)
    usage = 'usage: %prog [options]\n' + defaults
    parser = OptionParser(usage)
    parser.add_option('-a', '--all', dest='process_all', action='store_true',
                      help='Offload all files in the results directory.')
    # The two literals below are implicitly concatenated; keep the trailing
    # space on the first so the words do not run together.
    parser.add_option('-s', '--hosts', dest='process_hosts_only',
                      action='store_true',
                      help='Offload only the special tasks result files '
                           'located in the results/hosts subdirectory')
    parser.add_option('-p', '--parallelism', dest='parallelism', type='int',
                      default=1, help='Number of parallel workers to use.')
    parser.add_option('-o', '--delete_only', dest='delete_only',
                      action='store_true',
                      help='GS Offloader will only delete the directories '
                           'and will not offload them to google storage.',
                      default=False)
    parser.add_option('-d', '--days_old', dest='days_old',
                      help='Minimum job age in days before a result can be '
                           'offloaded.', type='int', default=0)
    options = parser.parse_args()[0]
    if options.process_all and options.process_hosts_only:
        parser.print_help()
        print ('Cannot process all files and only the hosts subdirectory. '
               'Please remove an argument.')
        sys.exit(1)
    return options
Scott Zawalskicb2e2b72012-04-17 12:10:05 -0400411
Simran Basi9523eaa2012-06-28 17:18:45 -0700412
def main():
    """Main method of gs_offloader.

    Steps:
      * Log to a timestamped file under LOG_LOCATION whose name records
        which kind of directories this instance handles.
      * Lower the process priority.
      * Change into RESULTS_DIR.
      * Install the SIGALRM timeout handler.
      * Run an offload cycle every SLEEP_TIME_SECS seconds, forever.
    """
    options = parse_options()

    if options.process_all:
        offloader_type = 'all'
    elif options.process_hosts_only:
        offloader_type = 'hosts'
    else:
        offloader_type = 'jobs'

    log_name = LOG_FILENAME_FORMAT % (offloader_type,
                                      time.strftime(LOG_TIMESTAMP_FORMAT))
    logging.basicConfig(filename=os.path.join(LOG_LOCATION, log_name),
                        level=logging.DEBUG, format=LOGGING_FORMAT)

    # The nice value is inherited by subprocesses, so the uploads we spawn
    # also run at reduced priority.
    logging.debug('Set process to nice value: %d', NICENESS)
    os.nice(NICENESS)
    if psutil:
        this_process = psutil.Process()
        logging.debug('Set process to ionice IDLE')
        this_process.ionice(psutil.IOPRIO_CLASS_IDLE)

    # Work from inside the results directory so that relative job
    # directory paths resolve against it.
    logging.debug('Offloading Autotest results in %s', RESULTS_DIR)
    os.chdir(RESULTS_DIR)

    signal.signal(signal.SIGALRM, timeout_handler)

    offloader = Offloader(options)
    while True:
        offloader.offload_once()
        time.sleep(SLEEP_TIME_SECS)
Scott Zawalskicb2e2b72012-04-17 12:10:05 -0400450
451
if __name__ == '__main__':
    # Start the offload loop only when run as a script, not on import.
    main()