blob: 066fcc323e4795745411db98dd5205a7ee59af6f [file] [log] [blame]
Scott Zawalski20a9b582011-11-21 11:49:40 -08001#!/usr/bin/python
2#
Scott Zawalskicb2e2b72012-04-17 12:10:05 -04003# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
Scott Zawalski20a9b582011-11-21 11:49:40 -08004# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
J. Richard Barnetteea785362014-03-17 16:00:53 -07007"""Script to archive old Autotest results to Google Storage.
Scott Zawalski20a9b582011-11-21 11:49:40 -08008
J. Richard Barnetteea785362014-03-17 16:00:53 -07009Uses gsutil to archive files to the configured Google Storage bucket.
10Upon successful copy, the local results directory is deleted.
Scott Zawalski20a9b582011-11-21 11:49:40 -080011"""
12
Michael Tang97d188c2016-06-25 11:18:42 -070013import base64
Simran Basibd9ded02013-11-04 15:49:11 -080014import datetime
Dan Shifaf50db2015-09-25 13:40:45 -070015import errno
Ningning Xia42111242016-06-15 14:35:58 -070016import glob
17import gzip
Simran Basi9523eaa2012-06-28 17:18:45 -070018import logging
Simran Basia2532282014-12-04 13:28:16 -080019import logging.handlers
Scott Zawalski20a9b582011-11-21 11:49:40 -080020import os
Dan Shiaffb9222015-04-15 17:05:47 -070021import re
Scott Zawalski20a9b582011-11-21 11:49:40 -080022import shutil
Simran Basi9523eaa2012-06-28 17:18:45 -070023import signal
Simran Basi981a9272012-11-14 10:46:03 -080024import socket
Scott Zawalski20a9b582011-11-21 11:49:40 -080025import subprocess
26import sys
Simran Basi9523eaa2012-06-28 17:18:45 -070027import tempfile
28import time
Scott Zawalskicb2e2b72012-04-17 12:10:05 -040029
Simran Basi7d9a1492012-10-25 13:51:54 -070030from optparse import OptionParser
31
Simran Basi981a9272012-11-14 10:46:03 -080032import common
Dan Shi1b4c7c32015-10-05 10:38:57 -070033from autotest_lib.client.common_lib import error
Simran Basidd129972014-09-11 14:34:49 -070034from autotest_lib.client.common_lib import utils
Dan Shi1b4c7c32015-10-05 10:38:57 -070035from autotest_lib.site_utils import job_directories
Ningning Xia2d981ee2016-07-06 17:59:54 -070036from autotest_lib.tko import models
Simran Basi981a9272012-11-14 10:46:03 -080037
Alex Millerc900b342014-06-09 16:52:07 -070038try:
39 # Does not exist, nor is needed, on moblab.
40 import psutil
41except ImportError:
42 psutil = None
43
J. Richard Barnetteea785362014-03-17 16:00:53 -070044import job_directories
Michael Tang97d188c2016-06-25 11:18:42 -070045import pubsub_utils
Simran Basi981a9272012-11-14 10:46:03 -080046from autotest_lib.client.common_lib import global_config
Gabe Black1e1c41b2015-02-04 23:55:15 -080047from autotest_lib.client.common_lib.cros.graphite import autotest_stats
Simran Basi981a9272012-11-14 10:46:03 -080048from autotest_lib.scheduler import email_manager
Fang Deng970b6a72013-04-09 11:59:16 -070049from chromite.lib import parallel
Scott Zawalski20a9b582011-11-21 11:49:40 -080050
Scott Zawalski20a9b582011-11-21 11:49:40 -080051
# Global switch read from the CROS config section; offloading is on
# unless the config explicitly disables it.
GS_OFFLOADING_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloading_enabled', type=bool, default=True)

# Per-drone stats key. Dots in the hostname are replaced with underscores,
# presumably because the stats backend treats '.' as a key separator.
STATS_KEY = 'gs_offloader.%s' % socket.gethostname().replace('.', '_')
METADATA_TYPE = 'result_dir_size'

# Timer used to decorate (and time) each directory offload.
timer = autotest_stats.Timer(STATS_KEY)
Jakob Juelichc17f3112014-09-11 14:32:30 -070059
# Nice value for the offloader process; a larger number means a lower
# scheduling priority.
NICENESS = 10

# Upper bound, in seconds, on the time spent offloading one directory.
OFFLOAD_TIMEOUT_SECS = 60 * 60

# Seconds to sleep between loop iterations.
SLEEP_TIME_SECS = 5

# Minimum number of seconds between two e-mail reports.
REPORT_INTERVAL_SECS = 60 * 60

# Where Autotest results live on disk.
RESULTS_DIR = '/usr/local/autotest/results'

# Sub-directory of RESULTS_DIR holding cleanup, verify and repair jobs.
HOSTS_SUB_DIR = 'hosts'

# Log file location and naming.
LOG_LOCATION = '/usr/local/autotest/logs/'
LOG_FILENAME_FORMAT = 'gs_offloader_%s_log_%s.txt'
LOG_TIMESTAMP_FORMAT = '%Y%m%d_%H%M%S'
LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'

# pylint: disable=E1120
NOTIFY_ADDRESS = global_config.global_config.get_config_value(
        'SCHEDULER', 'notify_email', default='')

# E-mail report templates. The report body is the header below followed
# by one ERROR_EMAIL_DIRECTORY_FORMAT line per failing directory.
ERROR_EMAIL_HELPER_URL = 'http://go/cros-triage-gsoffloader'
ERROR_EMAIL_SUBJECT_FORMAT = 'GS Offloader notifications from %s'
ERROR_EMAIL_REPORT_FORMAT = '''\
gs_offloader is failing to offload results directories.

Check %s to triage the issue.

First failure Count Directory name
=================== ====== ==============================
''' % ERROR_EMAIL_HELPER_URL
# --+----1----+---- ----+ ----+----1----+----2----+----3

ERROR_EMAIL_DIRECTORY_FORMAT = '%19s %5d %-1s\n'
ERROR_EMAIL_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
Simran Basi9523eaa2012-06-28 17:18:45 -0700102
# When True, uploads use `gsutil rsync` instead of `gsutil cp`.
USE_RSYNC_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_use_rsync', type=bool, default=False)

# Characters not allowed in Google Storage object names, per
# https://cloud.google.com/storage/docs/bucket-naming#objectnames
INVALID_GS_CHARS = ['[', ']', '*', '?', '#']
# Additional forbidden code points, as inclusive (low, high) ranges.
INVALID_GS_CHAR_RANGE = [(0x00, 0x1F), (0x7F, 0x84), (0x86, 0xFF)]

# File-count threshold above which result folders get compressed.
MAX_FILE_COUNT = 500
# Folder names that are never compressed.
FOLDERS_NEVER_ZIP = ['debug', 'ssp_logs']
LIMIT_FILE_COUNT = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_limit_file_count', type=bool, default=False)

# Whether gsutil uploads run with multiprocessing (-m).
GS_OFFLOADER_MULTIPROCESSING = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_multiprocessing', type=bool, default=False)

# Glob for a directory name shaped like 2016.07.25_23.18.46 (presumably a
# date_time stamp): seven two-digit groups.
D = '[0-9][0-9]'
TIMESTAMP_PATTERN = '%s%s.%s.%s_%s.%s.%s' % ((D,) * 7)
CTS_RESULT_PATTERN = 'testResult.xml'
GTS_RESULT_PATTERN = 'xtsTestResult.xml'
# Google Storage bucket URIs for CTS/GTS results.
DEFAULT_CTS_RESULTS_GSURI = global_config.global_config.get_config_value(
        'CROS', 'cts_results_server', default='')
DEFAULT_CTS_APFE_GSURI = global_config.global_config.get_config_value(
        'CROS', 'cts_apfe_server', default='')
Dan Shi1b4c7c32015-10-05 10:38:57 -0700129
# Cloud Pub/Sub notification settings for newly offloaded test results.
# The keys are the plain option names (no trailing ':'), so they can
# actually match entries in the CROS section of the config.
_PUBSUB_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'cloud_notification_enabled', type=bool, default=False)
# `type` is a converter applied to the configured value, like the
# `type=bool` used by the other lookups in this file; the string 'string'
# is not callable, so str is used instead.
_PUBSUB_TOPIC = global_config.global_config.get_config_value(
        'CROS', 'cloud_notification_topic', type=str, default=None)

# The message data for new test result notification.
NEW_TEST_RESULT_MESSAGE = 'NEW_TEST_RESULT'
137
138
class TimeoutException(Exception):
    """Signals that an offload ran longer than allowed (see timeout_handler)."""
Simran Basi9523eaa2012-06-28 17:18:45 -0700142
143
def timeout_handler(_signum, _frame):
    """SIGALRM handler that aborts an offload which has run too long.

    @param _signum: Number of the caught signal (14 for SIGALRM). Unused.
    @param _frame: Stack frame that was executing. Unused.

    @raise TimeoutException: Always, so the try/except wrapped around the
                             Popen call observes the timeout.
    """
    message = 'Process Timed Out'
    raise TimeoutException(message)
Simran Basi9523eaa2012-06-28 17:18:45 -0700156
157
def get_cmd_list(multiprocessing, dir_entry, gs_path):
    """Build the gsutil command line that offloads one directory.

    @param multiprocessing: True to pass gsutil's -m option.
    @param dir_entry: Local directory entry/path to offload.
    @param gs_path: Location in Google Storage to offload the directory to.

    @return A list of arguments suitable for subprocess.Popen.
    """
    prefix = ['gsutil', '-m'] if multiprocessing else ['gsutil']
    if USE_RSYNC_ENABLED:
        # With rsync the destination is named after the source directory,
        # presumably because rsync copies contents rather than the folder.
        verb = 'rsync'
        destination = os.path.join(gs_path, os.path.basename(dir_entry))
    else:
        verb = 'cp'
        destination = gs_path
    return prefix + [verb, '-eR', dir_entry, destination]
Simran Basi9523eaa2012-06-28 17:18:45 -0700180
Jakob Juelich24f22c22014-09-26 11:46:11 -0700181
def get_directory_size_kibibytes_cmd_list(directory):
    """Return the `du` command that reports a directory's total size in KiB.

    Kept as its own function so unit tests can mock it easily.

    @param directory: Path to the directory to measure.

    @return Argument list for subprocess.Popen.
    """
    du_args = ['du', '-sk']
    du_args.append(directory)
    return du_args
187
188
def get_directory_size_kibibytes(directory):
    """Return the total size of `directory` and its contents in KiB.

    The result only feeds statistics, so a failing `du` is logged and
    reported as 0 rather than raised.

    @param directory: Path to the directory.

    @return Size of the directory in kibibytes, or 0 if `du` failed.
    """
    du = subprocess.Popen(get_directory_size_kibibytes_cmd_list(directory),
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE)
    out, err = du.communicate()

    if du.returncode:
        logging.warning('Getting size of %s failed. Stderr:', directory)
        logging.warning(err)
        return 0

    # `du -sk` prints "<size>\t<path>"; keep only the size field.
    size_field, _, _ = out.partition('\t')
    return int(size_field)
210
211
def get_sanitized_name(name):
    """Percent-encode every character of `name` that Google Storage rejects.

    Each invalid character (INVALID_GS_CHARS or a code point inside one of
    the INVALID_GS_CHAR_RANGE ranges) becomes '%' followed by its
    lowercase hex code, e.g. '[' -> '%5b'.

    @param name: Name to be processed.

    @return `name` with all invalid characters replaced.
    """
    class_body = ''.join(re.escape(ch) for ch in INVALID_GS_CHARS)
    for low, high in INVALID_GS_CHAR_RANGE:
        class_body += r'\x%02x-\x%02x' % (low, high)
    invalid_re = re.compile('[' + class_body + ']')

    def _encode(match):
        return '%%%02x' % ord(match.group(0))

    return invalid_re.sub(_encode, name)
Dan Shiaffb9222015-04-15 17:05:47 -0700225
226
def sanitize_dir(dir_entry):
    """Replace invalid characters in file and folder names under dir_entry.

    Every file or folder name below `dir_entry` that contains characters
    Google Storage does not accept is renamed to its get_sanitized_name()
    form. `dir_entry` itself (and the path leading to it) is never
    renamed. Nothing happens if `dir_entry` does not exist.

    @param dir_entry: Directory entry to be sanitized.
    """
    if not os.path.exists(dir_entry):
        return
    renames = []
    for root, dirs, files in os.walk(dir_entry):
        # Renames are applied afterwards in top-down walk order, so when an
        # entry is moved its parent folders already carry their sanitized
        # names. Only the part of `root` below `dir_entry` is sanitized:
        # the components above it are never renamed, so sanitizing them
        # would produce source paths that do not exist.
        relative_root = os.path.relpath(root, dir_entry)
        if relative_root == os.curdir:
            sanitized_root = root
        else:
            sanitized_root = os.path.join(
                    dir_entry, get_sanitized_name(relative_root))
        for name in dirs + files:
            sanitized_name = get_sanitized_name(name)
            if name != sanitized_name:
                orig_path = os.path.join(sanitized_root, name)
                rename_path = os.path.join(sanitized_root, sanitized_name)
                renames.append((orig_path, rename_path))
    for src, dest in renames:
        logging.warning('Invalid character found. Renaming %s to %s.',
                        src, dest)
        shutil.move(src, dest)
248
249
250def _get_zippable_folders(dir_entry):
251 folders_list = []
252 for folder in os.listdir(dir_entry):
253 folder_path = os.path.join(dir_entry, folder)
254 if (not os.path.isfile(folder_path) and
255 not folder in FOLDERS_NEVER_ZIP):
256 folders_list.append(folder_path)
257 return folders_list
Dan Shiaffb9222015-04-15 17:05:47 -0700258
259
def limit_file_count(dir_entry):
    """Limit the number of files in given directory.

    Counts the entries under `dir_entry` as reported by `find`. If the
    count is at least MAX_FILE_COUNT, each zippable folder (see
    _get_zippable_folders; FOLDERS_NEVER_ZIP names are excluded) is packed
    into a sibling `<folder>.tgz` archive and the folder is then deleted.

    For a regular test job the folders one level deeper are compressed
    (e.g. the contents of 123-debug/host1), so the top-level autoserv
    debug folder stays accessible. Special tasks are compressed at the
    top level.

    @param dir_entry: Directory entry to be checked.
    """
    count = utils.run('find "%s" | wc -l' % dir_entry,
                      ignore_status=True).stdout.strip()
    try:
        count = int(count)
    except (ValueError, TypeError):
        # The parenthesized tuple catches either exception type.
        logging.warning('Fail to get the file count in folder %s.',
                        dir_entry)
        return
    if count < MAX_FILE_COUNT:
        return

    is_special_task = re.match(job_directories.SPECIAL_TASK_PATTERN,
                               dir_entry)

    folders = _get_zippable_folders(dir_entry)
    if not is_special_task:
        # Dig one level deeper for test jobs (see docstring).
        folders = [subfolder
                   for folder in folders
                   for subfolder in _get_zippable_folders(folder)]

    for folder in folders:
        zip_name = '%s.tgz' % folder
        try:
            utils.run('tar -cz -C "%s" -f "%s" "%s"' %
                      (os.path.dirname(folder), zip_name,
                       os.path.basename(folder)))
        except error.CmdError as e:
            logging.error('Fail to compress folder %s. Error: %s',
                          folder, e)
            continue
        # Only remove the folder once its archive was written successfully.
        shutil.rmtree(folder)
Dan Shi1b4c7c32015-10-05 10:38:57 -0700305
306
def correct_results_folder_permission(dir_entry):
    """Hand ownership of a results folder back to the current user.

    Tests run with server-side packaging leave their results owned by
    root. The owner and then the group are changed recursively, through
    non-interactive sudo, to this process's uid/gid so that the parsing
    job can access the folder. Failures are logged, not raised.

    @param dir_entry: Path to the results folder. Nothing is done if it is
                      empty or None.
    """
    if not dir_entry:
        return
    steps = (('chown', os.getuid), ('chgrp', os.getgid))
    try:
        for tool, get_id in steps:
            subprocess.check_call(
                    ['sudo', '-n', tool, '-R', str(get_id()), dir_entry])
    except subprocess.CalledProcessError as e:
        logging.error('Failed to modify permission for %s: %s',
                      dir_entry, e)
Dan Shie4a4f9f2015-07-20 09:00:25 -0700326
327
def upload_testresult_files(dir_entry, multiprocessing):
    """Upload CTS/GTS test results to their separate gs buckets.

    For every host folder in `dir_entry`, each timestamped CTS and GTS
    result directory is handed to _upload_files. That function uploads
    testResult.xml.gz / xtsTestResult.xml.gz to the cts_results bucket and
    the timestamp .zip to the cts_apfe bucket.

    @param dir_entry: Path to the results folder.
    @param multiprocessing: True to turn on -m option for gsutil.
    """
    suites = (('cheets_CTS.*', CTS_RESULT_PATTERN),
              ('cheets_GTS.*', GTS_RESULT_PATTERN))
    for host in glob.glob(os.path.join(dir_entry, '*')):
        for suite_glob, result_pattern in suites:
            result_glob = os.path.join(host, suite_glob, 'results', '*',
                                       TIMESTAMP_PATTERN)
            for path in glob.glob(result_glob):
                _upload_files(host, path, result_pattern, multiprocessing)
Ningning Xia205a1d42016-06-21 16:46:28 -0700345
Ningning Xia205a1d42016-06-21 16:46:28 -0700346
Ningning Xia21922c82016-07-29 11:03:15 -0700347def _is_release_build(build):
348 """Check if it's a release build."""
349 return re.match(r'(?!trybot-).*-release/.*', build)
350
351
def _upload_files(host, path, result_pattern, multiprocessing):
    """Upload one timestamped CTS/GTS result directory to the CTS buckets.

    Only results of release builds (see _is_release_build) are uploaded:
      * `<path>.zip`, if present, goes to
        DEFAULT_CTS_APFE_GSURI/build/parent_job_id/package/<job>_<timestamp>/
      * each file in `path` matching `result_pattern` is gzipped next to
        the original, uploaded to
        DEFAULT_CTS_RESULTS_GSURI/package/<job>_<timestamp>/,
        and the temporary .gz copy is removed again.
    Any failure is logged and swallowed.

    @param host: Host results folder; its job keyvals supply 'build' and
                 'parent_job_id'.
    @param path: Timestamped result directory. Its components are expected
                 to end in <job dir>/<host>/<package>/results/<x>/<timestamp>;
                 the job directory name is used as the job id.
    @param result_pattern: Glob for the result XML file(s) inside `path`.
    @param multiprocessing: True to turn on -m option for gsutil.
    """
    try:
        keyval = models.test.parse_job_keyval(host)
        build = keyval['build']

        if not _is_release_build(build):
            # Only upload results for release builds.
            return

        parent_job_id = str(keyval['parent_job_id'])

        folders = path.split(os.sep)
        job_id = folders[-6]
        package = folders[-4]
        timestamp = folders[-1]

        # Path: bucket/build/parent_job_id/cheets_CTS.*/job_id_timestamp/
        # or bucket/build/parent_job_id/cheets_GTS.*/job_id_timestamp/
        cts_apfe_gs_path = os.path.join(
                DEFAULT_CTS_APFE_GSURI, build, parent_job_id,
                package, job_id + '_' + timestamp) + '/'

        # Path: bucket/cheets_CTS.*/job_id_timestamp/
        # or bucket/cheets_GTS.*/job_id_timestamp/
        test_result_gs_path = os.path.join(
                DEFAULT_CTS_RESULTS_GSURI, package,
                job_id + '_' + timestamp) + '/'

        for zip_file in glob.glob('%s.zip' % path):
            utils.run(' '.join(get_cmd_list(
                    multiprocessing, zip_file, cts_apfe_gs_path)))
            logging.debug('Upload %s to %s ', zip_file, cts_apfe_gs_path)

        for test_result_file in glob.glob(os.path.join(path, result_pattern)):
            # gzip test_result_file (testResult.xml/xtsTestResult.xml).
            test_result_file_gz = '%s.gz' % test_result_file
            try:
                # Binary mode: this is a byte-for-byte compression copy.
                with open(test_result_file, 'rb') as f_in:
                    with gzip.open(test_result_file_gz, 'wb') as f_out:
                        shutil.copyfileobj(f_in, f_out)
                utils.run(' '.join(get_cmd_list(
                        multiprocessing, test_result_file_gz,
                        test_result_gs_path)))
                logging.debug('Zip and upload %s to %s',
                              test_result_file_gz, test_result_gs_path)
            finally:
                # Remove the temporary .gz copy even when compression or
                # the upload failed, so it is not left in the results
                # folder.
                if os.path.exists(test_result_file_gz):
                    os.remove(test_result_file_gz)
    except Exception as e:
        logging.error('ERROR uploading test results %s to GS: %s',
                      path, e)
Ningning Xia42111242016-06-15 14:35:58 -0700400
Michael Tang97d188c2016-06-25 11:18:42 -0700401def _create_test_result_notification(gs_path):
402 """Construct a test result notification.
403
404 @param gs_path: The test result Google Cloud Storage URI.
405
406 @returns The notification message.
407 """
408 data = base64.b64encode(NEW_TEST_RESULT_MESSAGE)
409 msg_payload = {'data': data}
410 msg_attributes = {}
411 msg_attributes['gcs_uri'] = gs_path
412 msg_payload['attributes'] = msg_attributes
413
414 return msg_payload
415
416
def get_offload_dir_func(gs_uri, multiprocessing, pubsub_topic=None):
    """Returns the offload directory function for the given gs_uri

    @param gs_uri: Google storage bucket uri to offload to.
    @param multiprocessing: True to turn on -m option for gsutil.
    @param pubsub_topic: The pubsub topic to publish notifications. If None,
                         pubsub is not enabled.

    @return offload_dir function to perform the offload.
    """
    @timer.decorate
    def offload_dir(dir_entry, dest_path):
        """Offload the specified directory entry to Google storage.

        The directory is sanitized. If configured, its CTS/GTS results are
        uploaded separately and its file count is limited. It is then
        copied with gsutil under a SIGALRM timeout of OFFLOAD_TIMEOUT_SECS.
        After a successful copy (and a successful notification, when a
        pubsub topic is set) the local directory is deleted. Otherwise the
        gsutil output is logged and the directory is left in place.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        """
        # Bind everything the `finally` clause reads before entering the
        # `try`. An exception raised early (e.g. by sanitize_dir) then
        # cannot surface as an UnboundLocalError that hides the real
        # failure. The flag is not named `error`, which would shadow the
        # module-level `error` import.
        offload_failed = False
        stdout_file = None
        stderr_file = None
        process = None
        try:
            counter = autotest_stats.Counter(STATS_KEY)
            counter.increment('jobs_offload_started')

            sanitize_dir(dir_entry)
            if DEFAULT_CTS_RESULTS_GSURI:
                upload_testresult_files(dir_entry, multiprocessing)

            if LIMIT_FILE_COUNT:
                limit_file_count(dir_entry)

            stdout_file = tempfile.TemporaryFile('w+')
            stderr_file = tempfile.TemporaryFile('w+')
            signal.alarm(OFFLOAD_TIMEOUT_SECS)
            gs_path = '%s%s' % (gs_uri, dest_path)
            process = subprocess.Popen(
                    get_cmd_list(multiprocessing, dir_entry, gs_path),
                    stdout=stdout_file, stderr=stderr_file)
            process.wait()
            signal.alarm(0)

            if process.returncode == 0:
                dir_size = get_directory_size_kibibytes(dir_entry)

                counter.increment('kibibytes_transferred_total', dir_size)
                metadata = {
                    '_type': METADATA_TYPE,
                    'size_KB': dir_size,
                    'result_dir': dir_entry,
                    'drone': socket.gethostname().replace('.', '_')
                }
                autotest_stats.Gauge(STATS_KEY, metadata=metadata).send(
                        'kibibytes_transferred', dir_size)
                counter.increment('jobs_offloaded')

                if pubsub_topic:
                    message = _create_test_result_notification(gs_path)
                    msg_ids = pubsub_utils.publish_notifications(
                            pubsub_topic, [message])
                    if not msg_ids:
                        offload_failed = True

                if not offload_failed:
                    shutil.rmtree(dir_entry)
            else:
                offload_failed = True
        except TimeoutException:
            # If we finished the call to Popen(), we may need to
            # terminate the child process. We don't bother calling
            # process.poll(); that inherently races because the child
            # can die any time it wants.
            if process:
                try:
                    process.terminate()
                except OSError:
                    # We don't expect any error other than "No such
                    # process".
                    pass
            logging.error('Offloading %s timed out after waiting %d '
                          'seconds.', dir_entry, OFFLOAD_TIMEOUT_SECS)
            offload_failed = True
        except OSError as e:
            # The wrong file permission can lead call
            # `shutil.rmtree(dir_entry)` to raise OSError with message
            # 'Permission denied'. Details can be found in
            # crbug.com/536151
            if e.errno == errno.EACCES:
                logging.warning('Try to correct file permission of %s.',
                                dir_entry)
                correct_results_folder_permission(dir_entry)
            else:
                # Record other OS errors instead of swallowing them.
                logging.error('Failed to offload %s: %s', dir_entry, e)
        finally:
            signal.alarm(0)
            if (offload_failed and stdout_file is not None and
                    stderr_file is not None):
                # Rewind the log files for stdout and stderr and log
                # their contents.
                stdout_file.seek(0)
                stderr_file.seek(0)
                stderr_content = stderr_file.read()
                logging.error('Error occurred when offloading %s:',
                              dir_entry)
                logging.error('Stdout:\n%s \nStderr:\n%s',
                              stdout_file.read(), stderr_content)
                # Some result files may have wrong file permission. Try
                # to correct such error so a later try can succeed.
                # TODO(dshi): The code is added to correct result files
                # with wrong file permission caused by bug 511778. After
                # this code is pushed to lab and run for a while to
                # clean up these files, following code and function
                # correct_results_folder_permission can be deleted.
                if 'CommandException: Error opening file' in stderr_content:
                    logging.warning('Try to correct file permission of %s.',
                                    dir_entry)
                    correct_results_folder_permission(dir_entry)
            for log_file in (stdout_file, stderr_file):
                if log_file is not None:
                    log_file.close()
    return offload_dir
Simran Basi9523eaa2012-06-28 17:18:45 -0700534
Scott Zawalski20a9b582011-11-21 11:49:40 -0800535
def delete_files(dir_entry, dest_path):
    """Remove `dir_entry` from the local filesystem without uploading it.

    Takes the same arguments as offload_dir so it can stand in for it on
    systems that only want results deleted rather than offloaded.

    @param dir_entry: Directory entry to delete.
    @param dest_path: Ignored; accepted only for signature compatibility.
    """
    del dest_path  # Unused.
    shutil.rmtree(dir_entry)
Simran Basibd9ded02013-11-04 15:49:11 -0800547
548
def report_offload_failures(joblist):
    """E-mail a summary of the jobs in `joblist` that failed to offload.

    Each job contributes one line (first failure time, failure count,
    directory). The lines are sorted and appended to the report header,
    and the message is sent to NOTIFY_ADDRESS.

    @param joblist List of jobs to be reported in the message.
    """
    lines = []
    for job in joblist:
        first_failure = datetime.datetime.fromtimestamp(
                job.get_failure_time())
        lines.append(ERROR_EMAIL_DIRECTORY_FORMAT % (
                first_failure.strftime(ERROR_EMAIL_TIME_FORMAT),
                job.get_failure_count(),
                job.get_job_directory()))
    subject = ERROR_EMAIL_SUBJECT_FORMAT % socket.gethostname()
    body = ERROR_EMAIL_REPORT_FORMAT + ''.join(sorted(lines))
    email_manager.manager.send_email(NOTIFY_ADDRESS, subject, body)
Simran Basi9523eaa2012-06-28 17:18:45 -0700568
Scott Zawalski20a9b582011-11-21 11:49:40 -0800569
def wait_for_gs_write_access(gs_uri):
    """Block until a probe file can be uploaded to, and removed from, gs_uri.

    Retries every 120 seconds for as long as either gsutil call fails.

    @param gs_uri: The Google Storage URI we are trying to offload to.
    """
    # TODO (sbasi) Try to use the gsutil command to check write access.
    with tempfile.NamedTemporaryFile() as probe:
        upload_cmd = get_cmd_list(False, probe.name, gs_uri)
        remote_copy = os.path.join(gs_uri, os.path.basename(probe.name))
        while True:
            try:
                subprocess.check_call(upload_cmd)
                subprocess.check_call(['gsutil', 'rm', remote_copy])
                return
            except subprocess.CalledProcessError:
                logging.debug('Unable to offload to %s, sleeping.', gs_uri)
                time.sleep(120)
Simran Basiac0edb22015-04-23 16:15:51 -0700590
591
J. Richard Barnetteea785362014-03-17 16:00:53 -0700592class Offloader(object):
J. Richard Barnette2c41e1e2015-12-08 16:21:10 -0800593 """State of the offload process.
J. Richard Barnetteea785362014-03-17 16:00:53 -0700594
J. Richard Barnette2c41e1e2015-12-08 16:21:10 -0800595 Contains the following member fields:
596 * _offload_func: Function to call for each attempt to offload
597 a job directory.
598 * _jobdir_classes: List of classes of job directory to be
599 offloaded.
600 * _processes: Maximum number of outstanding offload processes
601 to allow during an offload cycle.
602 * _age_limit: Minimum age in days at which a job may be
603 offloaded.
604 * _open_jobs: a dictionary mapping directory paths to Job
605 objects.
606 * _next_report_time: Earliest time that we should send e-mail
607 if there are failures to be reported.
J. Richard Barnetteea785362014-03-17 16:00:53 -0700608 """
J. Richard Barnetteea785362014-03-17 16:00:53 -0700609
J. Richard Barnette2c41e1e2015-12-08 16:21:10 -0800610 def __init__(self, options):
Michael Tang97d188c2016-06-25 11:18:42 -0700611 self._pubsub_topic = None
J. Richard Barnette2c41e1e2015-12-08 16:21:10 -0800612 if options.delete_only:
613 self._offload_func = delete_files
614 else:
615 self.gs_uri = utils.get_offload_gsuri()
616 logging.debug('Offloading to: %s', self.gs_uri)
Michael Tang0df2eb42016-05-13 19:06:54 -0700617 multiprocessing = False
618 if options.multiprocessing:
619 multiprocessing = True
620 elif options.multiprocessing is None:
621 multiprocessing = GS_OFFLOADER_MULTIPROCESSING
622 logging.info(
623 'Offloader multiprocessing is set to:%r', multiprocessing)
Michael Tang97d188c2016-06-25 11:18:42 -0700624 if options.pubsub_topic_for_job_upload:
625 self._pubsub_topic = options.pubsub_topic_for_job_upload
626 elif _PUBSUB_ENABLED:
627 self._pubsub_topic = _PUBSUB_TOPIC
J. Richard Barnette2c41e1e2015-12-08 16:21:10 -0800628 self._offload_func = get_offload_dir_func(
Michael Tang97d188c2016-06-25 11:18:42 -0700629 self.gs_uri, multiprocessing, self._pubsub_topic)
J. Richard Barnette2c41e1e2015-12-08 16:21:10 -0800630 classlist = []
631 if options.process_hosts_only or options.process_all:
632 classlist.append(job_directories.SpecialJobDirectory)
633 if not options.process_hosts_only:
634 classlist.append(job_directories.RegularJobDirectory)
635 self._jobdir_classes = classlist
636 assert self._jobdir_classes
637 self._processes = options.parallelism
638 self._age_limit = options.days_old
639 self._open_jobs = {}
640 self._next_report_time = time.time()
Michael Tang97d188c2016-06-25 11:18:42 -0700641 self._pusub_topic = None
J. Richard Barnetteea785362014-03-17 16:00:53 -0700642
J. Richard Barnetteea785362014-03-17 16:00:53 -0700643
J. Richard Barnette2c41e1e2015-12-08 16:21:10 -0800644 def _add_new_jobs(self):
645 """Find new job directories that need offloading.
J. Richard Barnetteea785362014-03-17 16:00:53 -0700646
J. Richard Barnette2c41e1e2015-12-08 16:21:10 -0800647 Go through the file system looking for valid job directories
648 that are currently not in `self._open_jobs`, and add them in.
J. Richard Barnetteea785362014-03-17 16:00:53 -0700649
J. Richard Barnette2c41e1e2015-12-08 16:21:10 -0800650 """
651 new_job_count = 0
652 for cls in self._jobdir_classes:
653 for resultsdir in cls.get_job_directories():
654 if resultsdir in self._open_jobs:
655 continue
656 self._open_jobs[resultsdir] = cls(resultsdir)
657 new_job_count += 1
658 logging.debug('Start of offload cycle - found %d new jobs',
659 new_job_count)
J. Richard Barnetteea785362014-03-17 16:00:53 -0700660
J. Richard Barnetteea785362014-03-17 16:00:53 -0700661
J. Richard Barnette2c41e1e2015-12-08 16:21:10 -0800662 def _remove_offloaded_jobs(self):
663 """Removed offloaded jobs from `self._open_jobs`."""
664 removed_job_count = 0
665 for jobkey, job in self._open_jobs.items():
666 if job.is_offloaded():
667 del self._open_jobs[jobkey]
668 removed_job_count += 1
669 logging.debug('End of offload cycle - cleared %d new jobs, '
670 'carrying %d open jobs',
671 removed_job_count, len(self._open_jobs))
J. Richard Barnetteea785362014-03-17 16:00:53 -0700672
J. Richard Barnetteea785362014-03-17 16:00:53 -0700673
J. Richard Barnette2c41e1e2015-12-08 16:21:10 -0800674 def _have_reportable_errors(self):
675 """Return whether any jobs need reporting via e-mail.
J. Richard Barnetteea785362014-03-17 16:00:53 -0700676
J. Richard Barnette2c41e1e2015-12-08 16:21:10 -0800677 @return True if there are reportable jobs in `self._open_jobs`,
678 or False otherwise.
679 """
680 for job in self._open_jobs.values():
681 if job.is_reportable():
682 return True
683 return False
J. Richard Barnetteea785362014-03-17 16:00:53 -0700684
J. Richard Barnetteea785362014-03-17 16:00:53 -0700685
J. Richard Barnette2c41e1e2015-12-08 16:21:10 -0800686 def _update_offload_results(self):
687 """Check and report status after attempting offload.
688
689 This function processes all jobs in `self._open_jobs`, assuming
690 an attempt has just been made to offload all of them.
691
692 Any jobs that have been successfully offloaded are removed.
693
694 If any jobs have reportable errors, and we haven't generated
695 an e-mail report in the last `REPORT_INTERVAL_SECS` seconds,
696 send new e-mail describing the failures.
697
698 """
699 self._remove_offloaded_jobs()
700 if self._have_reportable_errors():
701 # N.B. We include all jobs that have failed at least once,
702 # which may include jobs that aren't otherwise reportable.
703 failed_jobs = [j for j in self._open_jobs.values()
704 if j.get_failure_time()]
705 logging.debug('Currently there are %d jobs with offload '
706 'failures', len(failed_jobs))
707 if time.time() >= self._next_report_time:
708 logging.debug('Reporting failures by e-mail')
709 report_offload_failures(failed_jobs)
710 self._next_report_time = (
711 time.time() + REPORT_INTERVAL_SECS)
712
713
714 def offload_once(self):
715 """Perform one offload cycle.
716
717 Find all job directories for new jobs that we haven't seen
718 before. Then, attempt to offload the directories for any
719 jobs that have finished running. Offload of multiple jobs
720 is done in parallel, up to `self._processes` at a time.
721
722 After we've tried uploading all directories, go through the list
723 checking the status of all uploaded directories. If necessary,
724 report failures via e-mail.
725
726 """
727 self._add_new_jobs()
728 with parallel.BackgroundTaskRunner(
729 self._offload_func, processes=self._processes) as queue:
730 for job in self._open_jobs.values():
731 job.enqueue_offload(queue, self._age_limit)
732 self._update_offload_results()
Scott Zawalski20a9b582011-11-21 11:49:40 -0800733
734
def parse_options():
    """Parse the args passed into gs_offloader.

    Prints the help text and exits with status 1 when both `--all` and
    `--hosts` are given, since they select conflicting directory sets.

    @return The optparse options object holding the parsed flags.
    """
    defaults = 'Defaults:\n Destination: %s\n Results Path: %s' % (
            utils.DEFAULT_OFFLOAD_GSURI, RESULTS_DIR)
    usage = 'usage: %prog [options]\n' + defaults
    parser = OptionParser(usage)
    parser.add_option('-a', '--all', dest='process_all',
                      action='store_true',
                      help='Offload all files in the results directory.')
    parser.add_option('-s', '--hosts', dest='process_hosts_only',
                      action='store_true',
                      help='Offload only the special tasks result files '
                      'located in the results/hosts subdirectory')
    parser.add_option('-p', '--parallelism', dest='parallelism',
                      type='int', default=1,
                      help='Number of parallel workers to use.')
    parser.add_option('-o', '--delete_only', dest='delete_only',
                      action='store_true',
                      help='GS Offloader will only delete the '
                      'directories and will not offload them to google '
                      'storage. NOTE: If global_config variable '
                      'CROS.gs_offloading_enabled is False, --delete_only '
                      'is automatically True.',
                      default=not GS_OFFLOADING_ENABLED)
    parser.add_option('-d', '--days_old', dest='days_old',
                      help='Minimum job age in days before a result can be '
                      'offloaded.', type='int', default=0)
    parser.add_option('-t', '--pubsub_topic_for_job_upload',
                      dest='pubsub_topic_for_job_upload',
                      help='The pubsub topic to send notifications for '
                      'new job upload',
                      action='store', type='string', default=None)
    parser.add_option('-l', '--log_size', dest='log_size',
                      help='Limit the offloader logs to a specified '
                      'number of Mega Bytes.', type='int', default=0)
    # No explicit default: the value stays None when -m is absent, which
    # tells the Offloader to fall back to the global config setting.
    parser.add_option('-m', dest='multiprocessing', action='store_true',
                      help='Turn on -m option for gsutil. If not set, the '
                      'global config setting gs_offloader_multiprocessing '
                      'under CROS section is applied.')
    options = parser.parse_args()[0]
    if options.process_all and options.process_hosts_only:
        parser.print_help()
        print ('Cannot process all files and only the hosts '
               'subdirectory. Please remove an argument.')
        sys.exit(1)
    return options
Scott Zawalskicb2e2b72012-04-17 12:10:05 -0400781
Simran Basi9523eaa2012-06-28 17:18:45 -0700782
def main():
    """Main method of gs_offloader.

    Parses the command line and attaches a rotating log file under
    `LOG_LOCATION` to the root logger.  It then lowers this process's
    CPU priority (and its I/O priority when psutil is available), changes
    into `RESULTS_DIR` and installs `timeout_handler` for SIGALRM.
    Finally it runs offload cycles forever, sleeping `SLEEP_TIME_SECS`
    between them.
    """
    options = parse_options()

    if options.process_all:
        offloader_type = 'all'
    elif options.process_hosts_only:
        offloader_type = 'hosts'
    else:
        offloader_type = 'jobs'

    log_timestamp = time.strftime(LOG_TIMESTAMP_FORMAT)
    if options.log_size > 0:
        # Presumably so a size-limited, rotating log keeps one stable
        # file name across runs instead of a per-run timestamped name.
        log_timestamp = ''
    log_basename = LOG_FILENAME_FORMAT % (offloader_type, log_timestamp)
    log_filename = os.path.join(LOG_LOCATION, log_basename)
    log_formatter = logging.Formatter(LOGGING_FORMAT)
    # Add a RotatingFileHandler to the root logger. If options.log_size
    # is 0, the file size will not be limited. Keeps one backup just in
    # case. --log_size is documented in megabytes, so convert to bytes.
    handler = logging.handlers.RotatingFileHandler(
            log_filename, maxBytes=1024 * 1024 * options.log_size,
            backupCount=1)
    handler.setFormatter(log_formatter)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)

    # Nice our process (carried to subprocesses) so we don't overload
    # the system.
    logging.debug('Set process to nice value: %d', NICENESS)
    os.nice(NICENESS)
    if psutil:
        proc = psutil.Process()
        logging.debug('Set process to ionice IDLE')
        proc.ionice(psutil.IOPRIO_CLASS_IDLE)

    # os.listdir returns relative paths, so change to where we need to
    # be to avoid an os.path.join on each loop.
    logging.debug('Offloading Autotest results in %s', RESULTS_DIR)
    os.chdir(RESULTS_DIR)

    signal.signal(signal.SIGALRM, timeout_handler)

    offloader = Offloader(options)
    if not options.delete_only:
        wait_for_gs_write_access(offloader.gs_uri)
    while True:
        offloader.offload_once()
        time.sleep(SLEEP_TIME_SECS)
Scott Zawalskicb2e2b72012-04-17 12:10:05 -0400832
833
if __name__ == '__main__':
    # Start the never-ending offload loop only when run as a script,
    # not when this module is imported.
    main()