[autotest] Limit gs_offloader notifications to once per hour.

This change reworks gs_offloader e-mail notifications so that if
multiple directories fail in a single loop through current results,
all the failures will be summarized in a single e-mail message.
Moreover, once notification of a problem has been sent out, no new
notifications will be sent out for at least one hour.

Additionally, this restricts the definition of "offload failure" so
that a directory that fails to offload only once will never
trigger notification at all.

Finally, this change includes new unit tests covering the new
functionality, as well as some pre-existing functionality.
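
The new policy can be sketched roughly as follows (illustrative
only; `jobs`, `failure_count`, and `send_report` are hypothetical
placeholder names, while REPORT_INTERVAL_SECS is the new constant
added to gs_offloader.py):

    import time

    REPORT_INTERVAL_SECS = 60 * 60  # at most one e-mail report per hour

    def maybe_report(jobs, next_report_time, send_report):
        # Only directories that have failed to offload more than once
        # are reportable; all of them go into one summary e-mail.
        reportable = [j for j in jobs if j.failure_count > 1]
        if reportable and time.time() >= next_report_time:
            send_report(reportable)
            next_report_time = time.time() + REPORT_INTERVAL_SECS
        return next_report_time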

BUG=chromium:232058
TEST=run the shiny, new unit tests
DEPLOY=gs_offloader

Change-Id: I27454de08ab75d822db10874a659627adc395aa8
Reviewed-on: https://chromium-review.googlesource.com/190440
Reviewed-by: Richard Barnette <jrbarnette@chromium.org>
Commit-Queue: Richard Barnette <jrbarnette@chromium.org>
Tested-by: Richard Barnette <jrbarnette@chromium.org>
diff --git a/global_config.ini b/global_config.ini
index 8f0fe66..7416700 100644
--- a/global_config.ini
+++ b/global_config.ini
@@ -156,6 +156,7 @@
 dns_zone: cros.corp.google.com
 source_tree: /usr/local/google/chromeos
 image_storage_server: gs://chromeos-image-archive/
+results_storage_server: gs://chromeos-autotest-results/
 # dev_server_hosts is the list of all servers running a devserver instance
 # (regardless of CrashServer/ImageServer/etc.) that should be considered for
 # monitoring/deploy actions.  You should very likely keep this list in sync with
diff --git a/site_utils/gs_offloader.py b/site_utils/gs_offloader.py
index 83a41d8..438239b 100755
--- a/site_utils/gs_offloader.py
+++ b/site_utils/gs_offloader.py
@@ -4,18 +4,15 @@
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
-__help__ = """Script to archive old Autotest results to Google Storage.
+"""Script to archive old Autotest results to Google Storage.
 
-Uses gsutil to archive files to the configured Google Storage bucket. Upon
-successful copy, the local results directory is deleted.
+Uses gsutil to archive files to the configured Google Storage bucket.
+Upon successful copy, the local results directory is deleted.
 """
 
-__author__ = 'dalecurtis@google.com (Dale Curtis)'
-
 import datetime
 import logging
 import os
-import re
 import shutil
 import signal
 import socket
@@ -28,27 +25,29 @@
 
 import common
 
-import is_job_complete
+import job_directories
 from autotest_lib.client.common_lib import global_config
 from autotest_lib.scheduler import email_manager
 from chromite.lib import parallel
 
 # Google Storage bucket URI to store results in.
-GS_URI = 'gs://chromeos-autotest-results/'
-
-# Set this to True to enable rsync otherwise results are offloaded to GS.
-USE_RSYNC = False
-RSYNC_HOST_PATH = 'chromeos-sam1:/usr/local/autotest/results/'
+GS_URI = global_config.global_config.get_config_value(
+        'CROS', 'results_storage_server')
+GS_URI_PATTERN = GS_URI + '%s'
 
 # Nice setting for process, the higher the number the lower the priority.
 NICENESS = 10
 
-# Setting timeout to 3 hours.
-TIMEOUT = 3 * 60 * 60
+# Maximum number of seconds to allow for offloading a single
+# directory.
+OFFLOAD_TIMEOUT_SECS = 3 * 60 * 60
 
 # Sleep time per loop.
 SLEEP_TIME_SECS = 5
 
+# Minimum number of seconds between e-mail reports.
+REPORT_INTERVAL_SECS = 60 * 60
+
 # Location of Autotest results on disk.
 RESULTS_DIR = '/usr/local/autotest/results'
 
@@ -60,15 +59,21 @@
 LOG_TIMESTAMP_FORMAT = '%Y%m%d_%H%M%S'
 LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
 
-CLEAN_CMD = 'find %s -iname chrome_20[0-9][0-9]\* -exec rm {} \;'
-
 # pylint: disable=E1120
 NOTIFY_ADDRESS = global_config.global_config.get_config_value(
     'SCHEDULER', 'notify_email', default='')
 
 ERROR_EMAIL_SUBJECT_FORMAT = 'GS Offloader notifications from %s'
-ERROR_EMAIL_MSG_FORMAT = 'Error occured when offloading %s:\n%s'
+ERROR_EMAIL_REPORT_FORMAT = '''\
+gs_offloader is failing to offload results directories.
 
+First failure       Count   Directory name
+=================== ======  ==============================
+'''
+# --+----1----+----  ----+  ----+----1----+----2----+----3
+
+ERROR_EMAIL_DIRECTORY_FORMAT = '%19s  %5d  %-1s\n'
+ERROR_EMAIL_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
 
 class TimeoutException(Exception):
   """Exception raised by the timeout_handler."""
@@ -76,160 +81,46 @@
 
 
 def timeout_handler(_signum, _frame):
-  """
-  Called by the SIGALRM if the offloading process has timed out.
+  """Handler for SIGALRM when the offloading process times out.
 
   @param _signum: Signal number of the signal that was just caught.
                   14 for SIGALRM.
   @param _frame: Current stack frame.
   @raise TimeoutException: Automatically raises so that the time out is caught
                            by the try/except surrounding the Popen call.
+
   """
   raise TimeoutException('Process Timed Out')
 
 
 def get_cmd_list(dir_entry, relative_path):
-  """
-  Generate the cmd_list for the specified directory entry.
+  """Return the command to offload a specified directory.
 
   @param dir_entry: Directory entry/path that which we need a cmd_list to
                     offload.
-  @param relative_path: Location in google storage or rsync that we want to
-                        store this directory.
+  @param relative_path: Location in google storage where we will
+                        offload the directory.
 
   @return: A command list to be executed by Popen.
+
   """
-  if USE_RSYNC:
-    dest_path = os.path.join(RSYNC_HOST_PATH, relative_path)
-    logging.debug('Using rsync for offloading %s to %s.', dir_entry,
-                  dest_path)
-    return ['rsync', '-a', dir_entry, dest_path]
-  else:
-    dest_path = os.path.join(GS_URI, relative_path)
-    logging.debug('Using google storage for offloading %s to %s.',
-                  dir_entry, dest_path)
-    return ['gsutil', '-m', 'cp', '-eR', '-a', 'project-private', dir_entry,
-            dest_path]
+  logging.debug('Using google storage for offloading %s to %s.',
+                dir_entry, relative_path)
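+  # gsutil flags:  -m uses parallel transfers, 'cp -R' copies the
+  # directory recursively, -e skips symlinks, and '-a project-private'
+  # sets the canned ACL on the uploaded objects.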
+  return ['gsutil', '-m', 'cp', '-eR', '-a', 'project-private',
+          dir_entry, GS_URI_PATTERN % relative_path]
 
 
-def check_age(days_old, timestamp):
-  """Check to make sure a timestamp is older than the number of days specified.
-
-  @param days_old: Number of days that the job needs to be older than to be
-                   processed.
-  @param timestamp: Timestamp of the job whose age we are checking. Must be in
-                    '%Y-%m-%d %H:%M:%S' format.
-
-  @returns True if the job is old enough to process, False if its not.
-  """
-  if days_old <= 0:
-    return True
-  job_time = datetime.datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
-  if job_time > (datetime.datetime.now() - datetime.timedelta(days=days_old)):
-    return False
-  return True
-
-
-def offload_hosts_sub_dir(queue, days_old):
-  """
-  Loop over the hosts/ sub directory and offload all the Cleanup, Verify and
-  Repair Jobs.
-
-  This will delete the job folders inside each host directory.
-
-  @param queue The work queue to place uploading tasks onto.
-  @param days_old: Only process a special task if its older than the number of
-                   days specified.
-  """
-  logging.debug('Offloading Cleanup, Verify and Repair jobs from'
-                'results/hosts/')
-  # Store these results in gs://chromeos-autotest-results/hosts
-  for host_entry in os.listdir(HOSTS_SUB_DIR):
-    # Inside a host directory.
-    # Store these results in gs://chromeos-autotest-results/hosts/{host_name}
-    host_path = os.path.join(HOSTS_SUB_DIR, host_entry)
-    if not os.path.isdir(host_path):
-      continue
-    for job_entry in os.listdir(host_path):
-      # Offload all the verify, clean and repair jobs for this host.
-      dir_path = os.path.join(host_path, job_entry)
-      if not os.path.isdir(dir_path):
-        continue
-      job_id = os.path.basename(dir_path).split('-')[0]
-
-      try:
-        special_task = is_job_complete.get_special_task(job_id)
-        if special_task['is_complete']:
-          if not check_age(days_old, special_task['time_started']):
-            continue
-          logging.debug('Processing %s', dir_path)
-          queue.put([dir_path, dir_path])
-        else:
-          logging.debug('Special Task %s is not yet complete; skipping.',
-                        dir_path)
-      except is_job_complete.DatabaseAnomaly as e:
-        email_subject = ERROR_EMAIL_SUBJECT_FORMAT % socket.gethostname()
-        email_msg = ERROR_EMAIL_MSG_FORMAT % (dir_path, str(e))
-        email_manager.manager.send_email(NOTIFY_ADDRESS, email_subject,
-                                         email_msg)
-
-
-def offload_job_results(queue, process_all, days_old):
-  """
-  Loop over all of the job directories and offload them.
-
-  This will delete the job result folders within the results/ directory.
-
-  @param queue The work queue to place uploading tasks onto.
-  @param process_all True if we should process both job and hosts folders.
-                     False if we should process only job folders.
-  @param days_old: Only process a job if its older than the number of days
-                   specified.
-  """
-  # Only pick up directories of the form <job #>-<job user>.
-  job_matcher = re.compile('^\d+-\w+')
-
-  # Iterate over all directories in results_dir.
-  for dir_entry in os.listdir('.'):
-    logging.debug('Processing %s', dir_entry)
-    if dir_entry == HOSTS_SUB_DIR and process_all:
-      offload_hosts_sub_dir(queue)
-      continue
-    if not job_matcher.match(dir_entry):
-      logging.debug('Skipping dir %s', dir_entry)
-      continue
-    # Directory names are in the format of <job #>-<job user>. We want just
-    # the job # to see if it has completed.
-    job_id = os.path.basename(dir_entry).split('-')[0]
-    job = is_job_complete.is_job_complete(job_id)
-    if not job:
-      logging.debug('Job %s is not yet complete; skipping.', dir_entry)
-      continue
-    if (job_matcher.match(dir_entry) and os.path.isdir(dir_entry)):
-      # The way we collect results currently is naive and results in a lot
-      # of extra data collection. Clear these for now until we can be more
-      # exact about what logs we care about. crosbug.com/26784.
-      # logging.debug('Cleaning %s of extra data.', dir_entry)
-      # os.system(CLEAN_CMD % dir_entry)
-      # TODO(scottz): Monitor offloading and make sure chrome logs are
-      # no longer an issue.
-      if not check_age(days_old, job[0]['created_on']):
-        continue
-      queue.put([dir_entry])
-
-
-def offload_dir(dir_entry, dest_path=''):
-  """
-  Offload the specified directory entry to the Google storage or the RSYNC host,
-  but timeout if it takes too long.
+def offload_dir(dir_entry, dest_path):
+  """Offload the specified directory entry to Google storage.
 
   @param dir_entry: Directory entry to offload.
-  @param dest_path: Location in google storage or rsync that we want to store
-                    this directory.
+  @param dest_path: Location in google storage where we will offload
+                    the directory.
+
   """
   try:
     error = False
-    signal.alarm(TIMEOUT)
+    signal.alarm(OFFLOAD_TIMEOUT_SECS)
     stdout_file = tempfile.TemporaryFile('w+')
     stderr_file = tempfile.TemporaryFile('w+')
     process = subprocess.Popen(get_cmd_list(dir_entry, dest_path),
@@ -243,7 +134,7 @@
   except TimeoutException:
     process.terminate()
     logging.error('Offloading %s timed out after waiting %d seconds.',
-                  dir_entry, TIMEOUT)
+                  dir_entry, OFFLOAD_TIMEOUT_SECS)
     error = True
   finally:
     signal.alarm(0)
@@ -251,30 +142,14 @@
       # Rewind the log files for stdout and stderr and log their contents.
       stdout_file.seek(0)
       stderr_file.seek(0)
-      stderr = stderr_file.read()
-
-      # The second to last line of stderr has the main error message we're
-      # interested in.
-      try:
-        error_msg = stderr.split('\n')[-2]
-      except IndexError:
-        # In case stderr does not meet our expected format, send out the whole
-        # message.
-        error_msg = stderr
-
-      email_subject = ERROR_EMAIL_SUBJECT_FORMAT % socket.gethostname()
-      email_msg = ERROR_EMAIL_MSG_FORMAT % (dir_entry, error_msg)
-      email_manager.manager.send_email(NOTIFY_ADDRESS, email_subject,
-                                       email_msg)
-      logging.error(email_msg)
-      logging.error('Stdout:\n%s \nStderr:\n%s', stdout_file.read(),
-                    stderr)
-
+      logging.error('Error occurred when offloading %s:', dir_entry)
+      logging.error('Stdout:\n%s \nStderr:\n%s',
+                    stdout_file.read(), stderr_file.read())
     stdout_file.close()
     stderr_file.close()
 
 
-def delete_files(dir_entry, dest_path=''):
+def delete_files(dir_entry, dest_path):
   """Simply deletes the dir_entry from the filesystem.
 
   Uses same arguments as offload_dir so that it can be used in replace of it on
@@ -286,56 +161,139 @@
   shutil.rmtree(dir_entry)
 
 
-def offload_files(results_dir, process_all, process_hosts_only, processes,
-                  delete_only, days_old):
+def report_offload_failures(joblist):
+  """Generate e-mail notification for failed offloads.
+
+  The e-mail report will include data from all jobs in `joblist`.
+
+  @param joblist List of jobs to be reported in the message.
+
   """
-  Offload files to Google Storage or the RSYNC_HOST_PATH host if USE_RSYNC is
-  True.
+  def _format_job(job):
+    d = datetime.datetime.fromtimestamp(job.get_failure_time())
+    data = (d.strftime(ERROR_EMAIL_TIME_FORMAT),
+            job.get_failure_count(),
+            job.get_job_directory())
+    return ERROR_EMAIL_DIRECTORY_FORMAT % data
+  joblines = [_format_job(job) for job in joblist]
+  joblines.sort()
+  email_subject = ERROR_EMAIL_SUBJECT_FORMAT % socket.gethostname()
+  email_message = ERROR_EMAIL_REPORT_FORMAT + ''.join(joblines)
+  email_manager.manager.send_email(NOTIFY_ADDRESS, email_subject,
+                                   email_message)
 
-  To ensure that the offloading times out properly we utilize a SIGALRM by
-  assigning a simple function, timeout_handler, to be called if the SIGALRM is
-  raised. timeout_handler will raise an exception that we can catch so that we
-  know the timeout has occured and can react accordingly.
 
-  @param results_dir: The Autotest results dir to look for dirs to offload.
-  @param process_all: Indicates whether or not we want to process all the
-                      files in results or just the larger test job files.
-  @param process_hosts_only: Indicates whether we only want to process files
-                             in the hosts subdirectory.
-  @param processes:  The number of uploading processes to kick off.
-  @param delete_only: If True, don't offload to google storage, just delete the
-                      files.
-  @param days_old: Only process a result if its older than the number of days
-                   specified.
+class Offloader(object):
+  """State of the offload process.
+
+  Contains the following member fields:
+    * _offload_func:  Function to call for each attempt to offload
+      a job directory.
+    * _jobdir_classes:  List of classes of job directory to be
+      offloaded.
+    * _processes:  Maximum number of outstanding offload processes
+      to allow during an offload cycle.
+    * _age_limit:  Minimum age in days at which a job may be
+      offloaded.
+    * _open_jobs: a dictionary mapping directory paths to Job
+      objects.
+    * _next_report_time:  Earliest time that we should send e-mail
+      if there are failures to be reported.
+
   """
-  # Nice our process (carried to subprocesses) so we don't kill the system.
-  os.nice(NICENESS)
-  logging.debug('Set process to nice value: %d', NICENESS)
-  # os.listdir returns relative paths, so change to where we need to be to avoid
-  # an os.path.join on each loop.
-  os.chdir(results_dir)
-  logging.debug('Looking for Autotest results in %s', results_dir)
-  signal.signal(signal.SIGALRM, timeout_handler)
-  if delete_only:
-    offloading_func = delete_files
-  else:
-    offloading_func = offload_dir
 
-  while True:
+  def __init__(self, options):
+    if options.delete_only:
+      self._offload_func = delete_files
+    else:
+      self._offload_func = offload_dir
+    classlist = []
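+    # Which job directories to process:  --hosts selects only the
+    # special (per-host) jobs, --all selects both kinds, and the
+    # default is regular jobs only.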
+    if options.process_hosts_only or options.process_all:
+      classlist.append(job_directories.SpecialJobDirectory)
+    if not options.process_hosts_only:
+      classlist.append(job_directories.RegularJobDirectory)
+    self._jobdir_classes = classlist
+    assert self._jobdir_classes
+    self._processes = options.parallelism
+    self._age_limit = options.days_old
+    self._open_jobs = {}
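+    # Initialize to the current time, so that the first report of
+    # failures (if any) can be sent without waiting a full
+    # REPORT_INTERVAL_SECS.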
+    self._next_report_time = time.time()
+
+  def _add_new_jobs(self):
+    """Find new job directories that need offloading.
+
+    Go through the file system looking for valid job directories
+    that are currently not in `self._open_jobs`, and add them in.
+
+    """
+    for cls in self._jobdir_classes:
+      for resultsdir in cls.get_job_directories():
+        if resultsdir in self._open_jobs:
+          continue
+        self._open_jobs[resultsdir] = cls(resultsdir)
+
+  def _remove_offloaded_jobs(self):
+    """Removed offloaded jobs from `self._open_jobs`."""
+    for jobkey, job in self._open_jobs.items():
+      if job.is_offloaded():
+        del self._open_jobs[jobkey]
+
+  def _have_reportable_errors(self):
+    """Return whether any jobs need reporting via e-mail.
+
+    @returns True if there are reportable jobs in `self._open_jobs`,
+             or False otherwise.
+    """
+    for job in self._open_jobs.values():
+      if job.is_reportable():
+        return True
+    return False
+
+  def _update_offload_results(self):
+    """Check and report status after attempting offload.
+
+    This function processes all jobs in `self._open_jobs`, assuming
+    an attempt has just been made to offload all of them.
+
+    Any jobs that have been successfully offloaded are removed.
+
+    If any jobs have reportable errors, and we haven't generated
+    an e-mail report in the last `REPORT_INTERVAL_SECS` seconds,
+    send new e-mail describing the failures.
+
+    """
+    self._remove_offloaded_jobs()
+    if (self._have_reportable_errors() and
+        time.time() >= self._next_report_time):
+      # N.B. We include all jobs that have failed at least once,
+      # which may include jobs that aren't otherwise reportable.
+      report_offload_failures([j for j in self._open_jobs.values()
+                                 if j.get_failure_time()])
+      self._next_report_time = time.time() + REPORT_INTERVAL_SECS
+
+  def offload_once(self):
+    """Perform one offload cycle.
+
+    Find all job directories for new jobs that we haven't seen
+    before.  Then, attempt to offload the directories for any
+    jobs that have finished running.  Offload of multiple jobs
+    is done in parallel, up to `self._processes` at a time.
+
+    After we've tried uploading all directories, go through the list
+    checking the status of all uploaded directories.  If necessary,
+    report failures via e-mail.
+
+    """
+    self._add_new_jobs()
     with parallel.BackgroundTaskRunner(
-        offloading_func, processes=processes) as queue:
-      if process_hosts_only:
-        # Only offload the hosts/ sub directory.
-        offload_hosts_sub_dir(queue, days_old)
-      else:
-        offload_job_results(queue, process_all, days_old)
-    time.sleep(SLEEP_TIME_SECS)
+        self._offload_func, processes=self._processes) as queue:
+      for job in self._open_jobs.values():
+        job.enqueue_offload(queue, self._age_limit)
+    self._update_offload_results()
 
 
 def parse_options():
-  """
-  Parse the args passed into gs_offloader.
-  """
+  """Parse the args passed into gs_offloader."""
   defaults = 'Defaults:\n  Destination: %s\n  Results Path: %s' % (GS_URI,
                                                                    RESULTS_DIR)
   usage = 'usage: %prog [options]\n' + defaults
@@ -381,8 +339,23 @@
           LOG_FILENAME_FORMAT % (offloader_type, log_timestamp))
   logging.basicConfig(filename=log_filename, level=logging.DEBUG,
                       format=LOGGING_FORMAT)
-  offload_files(RESULTS_DIR, options.process_all, options.process_hosts_only,
-                options.parallelism, options.delete_only, options.days_old)
+
+  # Nice our process (carried to subprocesses) so we don't overload
+  # the system.
+  logging.debug('Set process to nice value: %d', NICENESS)
+  os.nice(NICENESS)
+
+  # os.listdir returns relative paths, so change to where we need to be to avoid
+  # an os.path.join on each loop.
+  logging.debug('Offloading Autotest results in %s', RESULTS_DIR)
+  os.chdir(RESULTS_DIR)
+
+  signal.signal(signal.SIGALRM, timeout_handler)
+
+  offloader = Offloader(options)
+  while True:
+    offloader.offload_once()
+    time.sleep(SLEEP_TIME_SECS)
 
 
 if __name__ == '__main__':
diff --git a/site_utils/gs_offloader_unittest.py b/site_utils/gs_offloader_unittest.py
new file mode 100644
index 0000000..0135acc
--- /dev/null
+++ b/site_utils/gs_offloader_unittest.py
@@ -0,0 +1,789 @@
+# Copyright 2013 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import Queue
+import datetime
+import logging
+import os
+import shutil
+import sys
+import tempfile
+import time
+import unittest
+
+import mox
+
+import common
+import gs_offloader
+import job_directories
+
+from autotest_lib.scheduler import email_manager
+
+# Test value to use for `days_old`, if nothing else is required.
+_TEST_EXPIRATION_AGE = 7
+
+# When constructing sample time values for testing expiration,
+# allow this many seconds between the expiration time and the
+# current time.
+_MARGIN_SECS = 10.0
+
+
+def _get_options(argv):
+    """Helper function to exercise command line parsing.
+
+    @param argv Value of sys.argv to be parsed.
+
+    """
+    sys.argv = ['bogus.py'] + argv
+    return gs_offloader.parse_options()
+
+
+class ParseOptionsTests(unittest.TestCase):
+    """Tests for `gs_offloader.parse_options()`."""
+
+    def test_process_no_options(self):
+        """Test option parsing when there are no arguments."""
+        options = _get_options([])
+        self.assertFalse(options.process_all)
+        self.assertFalse(options.process_hosts_only)
+        self.assertEqual(options.parallelism, 1)
+        self.assertFalse(options.delete_only)
+        self.assertEqual(options.days_old, 0)
+
+    def test_process_all_option(self):
+        """Test option parsing for the --all option."""
+        options = _get_options(['--all'])
+        self.assertTrue(options.process_all)
+        self.assertFalse(options.process_hosts_only)
+        self.assertEqual(options.parallelism, 1)
+        self.assertFalse(options.delete_only)
+        self.assertEqual(options.days_old, 0)
+
+    def test_process_hosts_option(self):
+        """Test option parsing for the --hosts option."""
+        options = _get_options(['--hosts'])
+        self.assertFalse(options.process_all)
+        self.assertTrue(options.process_hosts_only)
+        self.assertEqual(options.parallelism, 1)
+        self.assertFalse(options.delete_only)
+        self.assertEqual(options.days_old, 0)
+
+    def test_parallelism_option(self):
+        """Test option parsing for the --parallelism option."""
+        options = _get_options(['--parallelism', '2'])
+        self.assertFalse(options.process_all)
+        self.assertFalse(options.process_hosts_only)
+        self.assertEqual(options.parallelism, 2)
+        self.assertFalse(options.delete_only)
+        self.assertEqual(options.days_old, 0)
+
+    def test_delete_only_option(self):
+        """Test option parsing for the --delete_only option."""
+        options = _get_options(['--delete_only'])
+        self.assertFalse(options.process_all)
+        self.assertFalse(options.process_hosts_only)
+        self.assertEqual(options.parallelism, 1)
+        self.assertTrue(options.delete_only)
+        self.assertEqual(options.days_old, 0)
+
+    def test_days_old_option(self):
+        """Test option parsing for the --days_old option."""
+        options = _get_options(['--days_old', '7'])
+        self.assertFalse(options.process_all)
+        self.assertFalse(options.process_hosts_only)
+        self.assertEqual(options.parallelism, 1)
+        self.assertFalse(options.delete_only)
+        self.assertEqual(options.days_old, 7)
+
+
+def _make_timestamp(age_limit, is_expired):
+    """Create a timestamp for use by `job_directories._is_job_expired()`.
+
+    The timestamp will meet the syntactic requirements for
+    timestamps used as input to `_is_job_expired()`.  If
+    `is_expired` is true, the timestamp will be older than
+    `age_limit` days before the current time; otherwise, the
+    date will be younger.
+
+    @param age_limit    The number of days before expiration of the
+                        target timestamp.
+    @param is_expired   Whether the timestamp should be expired
+                        relative to `age_limit`.
+
+    """
+    seconds = -_MARGIN_SECS
+    if is_expired:
+        seconds = -seconds
+    delta = datetime.timedelta(days=age_limit, seconds=seconds)
+    reference_time = datetime.datetime.now() - delta
+    return reference_time.strftime(job_directories.JOB_TIME_FORMAT)
+
+
+class JobExpirationTests(unittest.TestCase):
+    """Tests to exercise `job_directories._is_job_expired()`."""
+
+    def test_expired(self):
+        """Test detection of an expired job."""
+        timestamp = _make_timestamp(_TEST_EXPIRATION_AGE, True)
+        self.assertTrue(
+            job_directories._is_job_expired(
+                _TEST_EXPIRATION_AGE, timestamp))
+
+
+    def test_alive(self):
+        """Test detection of a job that's not expired."""
+        # N.B.  This test may fail if it takes more than
+        # about _MARGIN_SECS seconds to run.
+        timestamp = _make_timestamp(_TEST_EXPIRATION_AGE, False)
+        self.assertFalse(
+            job_directories._is_job_expired(
+                _TEST_EXPIRATION_AGE, timestamp))
+
+
+class _MockJobDirectory(job_directories._JobDirectory):
+    """Subclass of `_JobDirectory` used as a helper for tests."""
+
+    GLOB_PATTERN = '[0-9]*-*'
+
+    def __init__(self, resultsdir):
+        """Create new job in initial state."""
+        super(_MockJobDirectory, self).__init__(resultsdir)
+        self._destname = 'fubar'
+        self._timestamp = None
+        if (os.path.isdir(os.path.dirname(self._dirname)) and
+                not os.path.isdir(self._dirname)):
+            os.mkdir(self._dirname)
+
+    def get_timestamp_if_finished(self):
+        return self._timestamp
+
+    def set_finished(self, days_old):
+        """Make this job appear to be finished.
+
+        After calling this function, calls to `enqueue_offload()`
+        will find this job as finished, but not expired and ready
+        for offload.  Note that when `days_old` is 0,
+        `enqueue_offload()` will treat a finished job as eligible
+        for offload.
+
+        @param days_old The value of the `days_old` parameter that
+                        will be passed to `enqueue_offload()` for
+                        testing.
+
+        """
+        self._timestamp = _make_timestamp(days_old, False)
+
+    def set_expired(self, days_old):
+        """Make this job eligible to be offloaded.
+
+        After calling this function, calls to `enqueue_offload()`
+        will enqueue this job for offload.
+
+        @param days_old The value of the `days_old` parameter that
+                        will be passed to `enqueue_offload()` for
+                        testing.
+
+        """
+        self._timestamp = _make_timestamp(days_old, True)
+
+    def set_incomplete(self):
+        """Make this job appear to have failed offload just once."""
+        self._offload_count += 1
+        if not os.path.isdir(self._dirname):
+            os.mkdir(self._dirname)
+
+    def set_reportable(self):
+        """Make this job be reportable."""
+        self._offload_count += 1
+        self.set_incomplete()
+
+    def set_complete(self):
+        """Make this job be completed."""
+        self._offload_count += 1
+        if os.path.isdir(self._dirname):
+            os.rmdir(self._dirname)
+
+
+# Below is a partial sample of e-mail notification text.  This text is
+# deliberately hard-coded and then parsed to create the test data;
+# the idea is to make sure the actual text format will be reviewed
+# by a human being.
+#
+# first offload      count  directory
+# --+----1----+----  ----+  ----+----1----+----2----+----3
+_SAMPLE_DIRECTORIES_REPORT = '''\
+=================== ======  ==============================
+2014-03-14 15:09:26      1  118-fubar
+2014-03-14 15:19:23      2  117-fubar
+2014-03-14 15:29:20      6  116-fubar
+2014-03-14 15:39:17     24  115-fubar
+2014-03-14 15:49:14    120  114-fubar
+2014-03-14 15:59:11    720  113-fubar
+2014-03-14 16:09:08   5040  112-fubar
+2014-03-14 16:19:05  40320  111-fubar
+'''
+
+
+class EmailTemplateTests(mox.MoxTestBase):
+    """Test the formatting of e-mail notifications."""
+
+    def setUp(self):
+        super(EmailTemplateTests, self).setUp()
+        self.mox.StubOutWithMock(email_manager.manager,
+                                 'send_email')
+        self._joblist = []
+        for line in _SAMPLE_DIRECTORIES_REPORT.split('\n')[1 : -1]:
+            date_, time_, count, dir_ = line.split()
+            job = _MockJobDirectory(dir_)
+            job._offload_count = int(count)
+            timestruct = time.strptime(
+                    "%s %s" % (date_, time_),
+                    gs_offloader.ERROR_EMAIL_TIME_FORMAT)
+            job._first_offload_start = time.mktime(timestruct)
+            # enter the jobs in reverse order, to make sure we
+            # test that the output will be sorted.
+            self._joblist.insert(0, job)
+
+    def test_email_template(self):
+        """Trigger an e-mail report and check its contents."""
+        # The last line of the report is a separator that we
+        # repeat in the first line of our expected result data.
+        # So, we remove that separator from the end of
+        # the e-mail report message.
+        #
+        # The last element in the list returned by split('\n')
+        # will be an empty string, so to remove the separator,
+        # we remove the next-to-last entry in the list.
+        report_lines = gs_offloader.ERROR_EMAIL_REPORT_FORMAT.split('\n')
+        expected_message = ('\n'.join(report_lines[: -2] +
+                                      report_lines[-1 :]) +
+                            _SAMPLE_DIRECTORIES_REPORT)
+        email_manager.manager.send_email(
+            mox.IgnoreArg(), mox.IgnoreArg(), expected_message)
+        self.mox.ReplayAll()
+        gs_offloader.report_offload_failures(self._joblist)
+
+
+class _TempResultsDirTestBase(mox.MoxTestBase):
+    """Base class for tests using a temporary results directory."""
+
+    def setUp(self):
+        super(_TempResultsDirTestBase, self).setUp()
+        self._resultsroot = tempfile.mkdtemp(dir='.')
+
+    def tearDown(self):
+        shutil.rmtree(self._resultsroot)
+        super(_TempResultsDirTestBase, self).tearDown()
+
+    def make_job(self, jobdir):
+        """Create a job with results in `self._resultsroot`.
+
+        @param jobdir Name of the subdirectory to be created in
+                      `self._resultsroot`.
+
+        """
+        d = os.path.join(self._resultsroot, jobdir)
+        return _MockJobDirectory(d)
+
+
+class JobDirectoryOffloadTests(_TempResultsDirTestBase):
+    """Tests for `_JobDirectory.enqueue_offload()`.
+
+    When testing with a `days_old` parameter of 0, we use
+    `set_finished()` instead of `set_expired()`.  This causes the
+    job's timestamp to be set in the future.  This is done so as
+    to test that when `days_old` is 0, the job is always treated
+    as eligible for offload, regardless of the timestamp's value.
+
+    Testing covers the following assertions:
+     A. Each time `enqueue_offload()` is called, a message that
+        includes the job's directory name will be logged using
+        `logging.debug()`, regardless of whether the job was
+        enqueued.  Nothing else is allowed to be logged.
+     B. If the job is not eligible to be offloaded,
+        `_first_offload_start` and `_offload_count` are 0.
+     C. If the job is not eligible for offload, nothing is
+        enqueued in `queue`.
+     D. When the job is offloaded, `_offload_count` increments
+        each time.
+     E. When the job is offloaded, the appropriate parameters are
+        enqueued exactly once.
+     F. The first time a job is offloaded, `_first_offload_start` is
+        set to the current time.
+     G. `_first_offload_start` only changes the first time that the
+        job is offloaded.
+
+    The test cases below are designed to exercise all of the
+    meaningful state transitions at least once.
+
+    """
+
+    def setUp(self):
+        super(JobDirectoryOffloadTests, self).setUp()
+        self._job = self.make_job('1-aaa')
+        self._queue = Queue.Queue()
+        self.mox.StubOutWithMock(logging, 'debug')
+
+    def _offload_once(self, days_old):
+        """Make one call to the `enqueue_offload()` method.
+
+        This method tests assertion A regarding message
+        logging.
+
+        """
+        logging.debug(mox.IgnoreArg(), self._job._dirname)
+        self.mox.ReplayAll()
+        self._job.enqueue_offload(self._queue, days_old)
+        self.mox.VerifyAll()
+        self.mox.ResetAll()
+
+    def _offload_unexpired_job(self, days_old):
+        """Make calls to `enqueue_offload()` for an unexpired job.
+
+        This method tests assertions B and C that calling
+        `enqueue_offload()` has no effect.
+
+        """
+        self.assertEqual(self._job._offload_count, 0)
+        self.assertEqual(self._job._first_offload_start, 0)
+        self._offload_once(days_old)
+        self._offload_once(days_old)
+        self.assertTrue(self._queue.empty())
+        self.assertEqual(self._job._offload_count, 0)
+        self.assertEqual(self._job._first_offload_start, 0)
+
+    def _offload_expired_once(self, days_old, count):
+        """Make one call to `enqueue_offload()` for an expired job.
+
+        This method tests assertions D and E regarding side-effects
+        expected when a job is offloaded.
+
+        """
+        self._offload_once(days_old)
+        self.assertEqual(self._job._offload_count, count)
+        self.assertFalse(self._queue.empty())
+        v = self._queue.get_nowait()
+        self.assertTrue(self._queue.empty())
+        self.assertEqual(v, [self._job._dirname, self._job._destname])
+
+    def _offload_expired_job(self, days_old):
+        """Make calls to `enqueue_offload()` for a just-expired job.
+
+        This method directly tests assertions F and G regarding
+        side-effects on `_first_offload_start`.
+
+        """
+        t0 = time.time()
+        self._offload_expired_once(days_old, 1)
+        t1 = self._job._first_offload_start
+        self.assertLessEqual(t1, time.time())
+        self.assertGreaterEqual(t1, t0)
+        self._offload_expired_once(days_old, 2)
+        self.assertEqual(self._job._first_offload_start, t1)
+        self._offload_expired_once(days_old, 3)
+        self.assertEqual(self._job._first_offload_start, t1)
+
+    def test_case_1_no_expiration(self):
+        """Test a series of `enqueue_offload()` calls with `days_old` of 0.
+
+        This tests that offload works as expected if calls are
+        made both before and after the job becomes expired.
+
+        """
+        self._offload_unexpired_job(0)
+        self._job.set_finished(0)
+        self._offload_expired_job(0)
+
+    def test_case_2_no_expiration(self):
+        """Test a series of `enqueue_offload()` calls with `days_old` of 0.
+
+        This tests that offload works as expected if calls are made
+        only after the job becomes expired.
+
+        """
+        self._job.set_finished(0)
+        self._offload_expired_job(0)
+
+    def test_case_1_with_expiration(self):
+        """Test a series of `enqueue_offload()` calls with `days_old` non-zero.
+
+        This tests that offload works as expected if calls are made
+        before the job finishes, before the job expires, and after
+        the job expires.
+
+        """
+        self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
+        self._job.set_finished(_TEST_EXPIRATION_AGE)
+        self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
+        self._job.set_expired(_TEST_EXPIRATION_AGE)
+        self._offload_expired_job(_TEST_EXPIRATION_AGE)
+
+    def test_case_2_with_expiration(self):
+        """Test a series of `enqueue_offload()` calls with `days_old` non-zero.
+
+        This tests that offload works as expected if calls are made
+        between finishing and expiration, and after the job expires.
+
+        """
+        self._job.set_finished(_TEST_EXPIRATION_AGE)
+        self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
+        self._job.set_expired(_TEST_EXPIRATION_AGE)
+        self._offload_expired_job(_TEST_EXPIRATION_AGE)
+
+    def test_case_3_with_expiration(self):
+        """Test a series of `enqueue_offload()` calls with `days_old` non-zero.
+
+        This tests that offload works as expected if calls are made
+        only before finishing and after expiration.
+
+        """
+        self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
+        self._job.set_expired(_TEST_EXPIRATION_AGE)
+        self._offload_expired_job(_TEST_EXPIRATION_AGE)
+
+    def test_case_4_with_expiration(self):
+        """Test a series of `enqueue_offload()` calls with `days_old` non-zero.
+
+        This tests that offload works as expected if calls are made
+        only after expiration.
+
+        """
+        self._job.set_expired(_TEST_EXPIRATION_AGE)
+        self._offload_expired_job(_TEST_EXPIRATION_AGE)
+
+
+class GetJobDirectoriesTests(_TempResultsDirTestBase):
+    """Tests for `_JobDirectory.get_job_directories()`."""
+
+    _REGULAR_JOBLIST = [
+        '111-fubar', '112-fubar', '113-fubar', '114-snafu']
+    _HOSTS = ['host1', 'host2', 'host3']
+    _SPECIAL_JOBLIST = [
+        'hosts/host1/333-reset', 'hosts/host1/334-reset',
+        'hosts/host2/444-reset', 'hosts/host3/555-reset']
+
+    def setUp(self):
+        super(GetJobDirectoriesTests, self).setUp()
+        self._make_job_directories(self._REGULAR_JOBLIST)
+        os.mkdir(os.path.join(self._resultsroot, 'not-a-job'))
+        just_a_file_path = os.path.join(self._resultsroot, 'not-a-dir')
+        open(just_a_file_path, 'w').close()
+        hostsdir = os.path.join(self._resultsroot, 'hosts')
+        os.mkdir(hostsdir)
+        for host in self._HOSTS:
+            os.mkdir(os.path.join(hostsdir, host))
+        self._make_job_directories(self._SPECIAL_JOBLIST)
+
+    def _make_job_directories(self, dirlist):
+        for d in dirlist:
+            os.mkdir(os.path.join(self._resultsroot, d))
+
+    def _run_get_directories(self, cls, expected_list):
+        """Test `get_job_directories()` for the given class.
+
+        Calls the method, and asserts that the returned list of
+        directories matches the expected return value.
+
+        @param cls Class of job directory under test.
+        @param expected_list Expected return value from the call.
+        """
+        cwd = os.getcwd()
+        os.chdir(self._resultsroot)
+        dirlist = cls.get_job_directories()
+        self.assertEqual(set(dirlist), set(expected_list))
+        os.chdir(cwd)
+
+    def test_get_regular_jobs(self):
+        """Test `RegularJobDirectory.get_job_directories()`."""
+        self._run_get_directories(job_directories.RegularJobDirectory,
+                                  self._REGULAR_JOBLIST)
+
+    def test_get_special_jobs(self):
+        """Test `SpecialJobDirectory.get_job_directories()`."""
+        self._run_get_directories(job_directories.SpecialJobDirectory,
+                                  self._SPECIAL_JOBLIST)
+
+
+class AddJobsTests(_TempResultsDirTestBase):
+    """Tests for `Offloader._add_new_jobs()`."""
+
+    _JOBLIST = ['111-fubar', '112-fubar', '113-fubar', '114-snafu']
+    _MOREJOBS = ['115-fubar', '116-fubar', '117-fubar', '118-snafu']
+
+    def setUp(self):
+        super(AddJobsTests, self).setUp()
+        self._cwd = os.getcwd()
+        self._offloader = gs_offloader.Offloader(_get_options([]))
+        self._offloader._jobdir_classes = [_MockJobDirectory]
+        self._make_job_directories(self._JOBLIST)
+        os.mkdir(os.path.join(self._resultsroot, 'not-a-job'))
+        just_a_file_path = os.path.join(self._resultsroot, 'not-a-dir')
+        open(just_a_file_path, 'w').close()
+
+    def _make_job_directories(self, dirlist):
+        for d in dirlist:
+            os.mkdir(os.path.join(self._resultsroot, d))
+
+    def _run_add_new_jobs(self):
+        os.chdir(self._resultsroot)
+        self._offloader._add_new_jobs()
+        os.chdir(self._cwd)
+
+    def _check_open_jobs(self, expected_key_set):
+        """Basic test assertions for `_add_new_jobs()`.
+
+        Asserts the following:
+          * The keys in the offloader's `_open_jobs` dictionary
+            match the expected set of keys.
+          * For every job in `_open_jobs`, the job has the expected
+            directory name.
+
+        """
+        self.assertEqual(expected_key_set,
+                         set(self._offloader._open_jobs.keys()))
+        for jobkey, job in self._offloader._open_jobs.items():
+            self.assertEqual(jobkey, job._dirname)
+
+    def test_add_jobs_empty(self):
+        """Test adding jobs to an empty dictionary.
+
+        Calls the offloader's `_add_new_jobs()`, then performs
+        the assertions of `self._check_open_jobs()`.
+
+        """
+        self._run_add_new_jobs()
+        self._check_open_jobs(set(self._JOBLIST))
+
+    def test_add_jobs_non_empty(self):
+        """Test adding jobs to a non-empty dictionary.
+
+        Calls the offloader's `_add_new_jobs()` twice; once from
+        initial conditions, and then again after adding more
+        directories.  After the second call, performs the assertions
+        of `self._check_open_jobs()`.  Additionally, asserts that
+        keys added by the first call still map to their original
+        job object after the second call.
+
+        """
+        self._run_add_new_jobs()
+        jobs_copy = self._offloader._open_jobs.copy()
+        self._make_job_directories(self._MOREJOBS)
+        self._run_add_new_jobs()
+        self._check_open_jobs(set(self._JOBLIST) | set(self._MOREJOBS))
+        for key in jobs_copy.keys():
+            self.assertIs(jobs_copy[key],
+                          self._offloader._open_jobs[key])
+
+
+class JobStateTests(_TempResultsDirTestBase):
+    """Tests for job state predicates.
+
+    This tests for the expected results from the
+    `is_offloaded()` and `is_reportable()` predicate
+    methods.
+
+    """
+
+    def test_unfinished_job(self):
+        """Test that an unfinished job reports the correct state.
+
+        A job is "unfinished" if it isn't marked complete in the
+        database.  A job in this state is neither "complete" nor
+        "reportable".
+
+        """
+        job = self.make_job('1-fubar')
+        self.assertFalse(job.is_offloaded())
+        self.assertFalse(job.is_reportable())
+
+    def test_incomplete_job(self):
+        """Test that an incomplete job reports the correct state.
+
+        A job is "incomplete" if exactly one attempt has been made
+        to offload the job, but its results directory still exists.
+        A job in this state is neither "complete" nor "reportable".
+
+        """
+        job = self.make_job('1-fubar')
+        job.set_incomplete()
+        self.assertFalse(job.is_offloaded())
+        self.assertFalse(job.is_reportable())
+
+    def test_reportable_job(self):
+        """Test that a reportable job reports the correct state.
+
+        A job is "reportable" if more than one attempt has been made
+        to offload the job, and its results directory still exists.
+        A job in this state is "reportable", but not "complete".
+
+        """
+        job = self.make_job('1-fubar')
+        job.set_reportable()
+        self.assertFalse(job.is_offloaded())
+        self.assertTrue(job.is_reportable())
+
+    def test_completed_job(self):
+        """Test that a completed job reports the correct state.
+
+        A job is "completed" if at least one attempt has been made
+        to offload the job, and its results directory no longer exists.
+        A job in this state is "complete", and not "reportable".
+
+        """
+        job = self.make_job('1-fubar')
+        job.set_complete()
+        self.assertTrue(job.is_offloaded())
+        self.assertFalse(job.is_reportable())
+
+
+class ReportingTests(_TempResultsDirTestBase):
+    """Tests for `Offloader._update_offload_results()`."""
+
+    JOBDIRS = ['1-fubar', '2-fubar']
+
+    def setUp(self):
+        super(ReportingTests, self).setUp()
+        self._offloader = gs_offloader.Offloader(_get_options([]))
+        self.mox.StubOutWithMock(email_manager.manager,
+                                 'send_email')
+
+    def _add_job(self, jobdir):
+        """Add a job to the dictionary of unfinished jobs."""
+        j = self.make_job(jobdir)
+        self._offloader._open_jobs[j._dirname] = j
+        return j
+
+    def _run_update_no_report(self, new_open_jobs):
+        """Call `_update_offload_results()` expecting no report.
+
+        Initial conditions are set up by the caller.  This calls
+        `_update_offload_results()` once, and then checks these
+        assertions:
+          * The offloader's `_next_report_time` field is unchanged.
+          * The offloader's new `_open_jobs` field contains only
+            the entries in `new_open_jobs`.
+          * The email_manager's `send_email` stub wasn't called.
+
+        @param new_open_jobs A dictionary representing the expected
+                             new value of the offloader's
+                             `_open_jobs` field.
+        """
+        self.mox.ReplayAll()
+        next_report_time = self._offloader._next_report_time
+        self._offloader._update_offload_results()
+        self.assertEqual(next_report_time,
+                         self._offloader._next_report_time)
+        self.assertEqual(self._offloader._open_jobs, new_open_jobs)
+        self.mox.VerifyAll()
+        self.mox.ResetAll()
+
+    def _run_update_with_report(self, new_open_jobs):
+        """Call `_update_offload_results()` expecting an e-mail report.
+
+        Initial conditions are set up by the caller.  This calls
+        `_update_offload_results()` once, and then checks these
+        assertions:
+          * The offloader's `_next_report_time` field is updated
+            to an appropriate new time.
+          * The offloader's new `_open_jobs` field contains only
+            the entries in `new_open_jobs`.
+          * The email_manager's `send_email` stub was called.
+
+        @param new_open_jobs A dictionary representing the expected
+                             new value of the offloader's
+                             `_open_jobs` field.
+        """
+        email_manager.manager.send_email(
+            mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg())
+        self.mox.ReplayAll()
+        t0 = time.time() + gs_offloader.REPORT_INTERVAL_SECS
+        self._offloader._update_offload_results()
+        t1 = time.time() + gs_offloader.REPORT_INTERVAL_SECS
+        next_report_time = self._offloader._next_report_time
+        self.assertGreaterEqual(next_report_time, t0)
+        self.assertLessEqual(next_report_time, t1)
+        self.assertEqual(self._offloader._open_jobs, new_open_jobs)
+        self.mox.VerifyAll()
+        self.mox.ResetAll()
+
+    def test_no_jobs(self):
+        """Test `_update_offload_results()` with no open jobs.
+
+        Initial conditions are an empty `_open_jobs` list and
+        `_next_report_time` in the past.  Expected result is no
+        e-mail report, and an empty `_open_jobs` list.
+
+        """
+        self._run_update_no_report({})
+
+    def test_all_completed(self):
+        """Test `_update_offload_results()` with only complete jobs.
+
+        Initial conditions are an `_open_jobs` list consisting of
+        only completed jobs and `_next_report_time` in the past.
+        Expected result is no e-mail report, and an empty
+        `_open_jobs` list.
+
+        """
+        for d in self.JOBDIRS:
+            self._add_job(d).set_complete()
+        self._run_update_no_report({})
+
+    def test_none_finished(self):
+        """Test `_update_offload_results()` with only unfinished jobs.
+
+        Initial conditions are an `_open_jobs` list consisting of
+        only unfinished jobs and `_next_report_time` in the past.
+        Expected result is no e-mail report, and no change to the
+        `_open_jobs` list.
+
+        """
+        for d in self.JOBDIRS:
+            self._add_job(d)
+        self._run_update_no_report(self._offloader._open_jobs.copy())
+
+    def test_none_reportable(self):
+        """Test `_update_offload_results()` with only incomplete jobs.
+
+        Initial conditions are an `_open_jobs` list consisting of
+        only incomplete jobs and `_next_report_time` in the past.
+        Expected result is no e-mail report, and no change to the
+        `_open_jobs` list.
+
+        """
+        for d in self.JOBDIRS:
+            self._add_job(d).set_incomplete()
+        self._run_update_no_report(self._offloader._open_jobs.copy())
+
+    def test_report_not_ready(self):
+        """Test `_update_offload_results()` e-mail throttling.
+
+        Initial conditions are an `_open_jobs` list consisting of
+        only reportable jobs but with `_next_report_time` in
+        the future.  Expected result is no e-mail report, and no
+        change to the `_open_jobs` list.
+
+        """
+        # N.B.  This test may fail if it takes more than
+        # about _MARGIN_SECS seconds to run.
+        for d in self.JOBDIRS:
+            self._add_job(d).set_reportable()
+        self._offloader._next_report_time += _MARGIN_SECS
+        self._run_update_no_report(self._offloader._open_jobs.copy())
+
+    def test_reportable(self):
+        """Test `_update_offload_results()` with reportable jobs.
+
+        Initial conditions are an `_open_jobs` list consisting of
+        only reportable jobs and with `_next_report_time` in
+        the past.  Expected result is an e-mail report, and no
+        change to the `_open_jobs` list.
+
+        """
+        for d in self.JOBDIRS:
+            self._add_job(d).set_reportable()
+        self._run_update_with_report(self._offloader._open_jobs.copy())
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/site_utils/is_job_complete.py b/site_utils/is_job_complete.py
deleted file mode 100755
index f1b2d5a..0000000
--- a/site_utils/is_job_complete.py
+++ /dev/null
@@ -1,50 +0,0 @@
-#!/usr/bin/python
-
-import common
-import logging
-import sys
-from autotest_lib.server import frontend
-
-_AFE = frontend.AFE(debug=False)
-
-
-class DatabaseAnomaly(Exception):
-    """Raised when we observe a database anomaly."""
-
-
-def is_job_complete(job_id):
-    """
-    Check if a job is no longer active.
-
-    @param job_id: afe job id like 123 from 123-scottza
-    @return: An empty list if the job isn't complete.
-             A list containing the job details, if it is.
-    """
-    return _AFE.run('get_jobs', finished=True, id=job_id)
-
-
-def get_special_task(job_id):
-    """
-    Retrieve a special task (Cleanup, Verify, Repair) job from the database.
-
-    @param job_id: job id in string format like '123' from '123-cleanup'
-
-    @return A dictionary representation of the special task.
-    """
-    # Make sure the job_id is a number.
-    if not job_id.isdigit():
-        logging.error('Job_id: %s is not a number returning False.', job_id)
-        return False
-
-    task = _AFE.run('get_special_tasks', id=job_id)
-    if not task:
-        raise DatabaseAnomaly('Special Task %s not found in database.' % job_id)
-    return task[0]
-
-
-if __name__ == '__main__':
-  if len(sys.argv) != 2:
-      print ('Set return status to 0 if job is complete or 1 if it is not.\n'
-             'Usage: is_job_complete.py <job_id>')
-  else:
-      sys.exit(not is_job_complete(sys.argv[1]))
diff --git a/site_utils/job_directories.py b/site_utils/job_directories.py
new file mode 100755
index 0000000..cd0dd75
--- /dev/null
+++ b/site_utils/job_directories.py
@@ -0,0 +1,172 @@
+import abc
+import datetime
+import glob
+import logging
+import os
+import time
+
+import common
+from autotest_lib.server import frontend
+
+_AFE = frontend.AFE(debug=False)
+
+JOB_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
+
+def _is_job_expired(age_limit, timestamp):
+  """Check whether a job timestamp is older than an age limit.
+
+  @param age_limit: Minimum age, measured in days.  If the value is
+                    not positive, the job is always expired.
+  @param timestamp: Timestamp of the job whose age we are checking.
+                    The format must match JOB_TIME_FORMAT.
+
+  @returns True iff the job is old enough to be expired.
+  """
+  if age_limit <= 0:
+    return True
+  job_time = datetime.datetime.strptime(timestamp, JOB_TIME_FORMAT)
+  expiration = job_time + datetime.timedelta(days=age_limit)
+  return datetime.datetime.now() >= expiration
+
+
+class _JobDirectory(object):
+  """State associated with a job to be offloaded.
+
+  The full life-cycle of a job (including failure events that
+  normally don't occur) looks like this:
+   1. The job's results directory is discovered by
+      `get_job_directories()`, and a job instance is created for it.
+   2. Calls to `offload()` have no effect so long as the job
+      isn't complete in the database and the job isn't expired
+      according to the `age_limit` parameter.
+   3. Eventually, the job is both finished and expired.  The next
+      call to `offload()` makes the first attempt to offload the
+      directory to GS.  Offload is attempted, but fails to complete
+      (e.g. because of a GS problem).
+   4. After the first failed offload `is_offloaded()` is false,
+      but `is_reportable()` is also false, so the failure is not
+      reported.
+   5. Another call to `offload()` again tries to offload the
+      directory, and again fails.
+   6. After a second failure, `is_offloaded()` is false and
+      `is_reportable()` is true, so the failure generates an e-mail
+      notification.
+   7. Finally, a call to `offload()` succeeds, and the directory no
+      longer exists.  Now `is_offloaded()` is true, so the job
+      instance is deleted, and future failures will not mention this
+      directory any more.
+
+  Only steps 1. and 7. are guaranteed to occur.  The others depend
+  on the timing of calls to `offload()`, and on the reliability of
+  the actual offload process.
+
+  """
+
+  __metaclass__ = abc.ABCMeta
+
+  GLOB_PATTERN = None   # must be redefined in subclass
+
+  def __init__(self, resultsdir):
+    self._dirname = resultsdir
+    self._destname = ''
+    self._id = os.path.basename(resultsdir).split('-')[0]
+    self._offload_count = 0
+    self._first_offload_start = 0
+
+  @classmethod
+  def get_job_directories(cls):
+    """Return a list of directories of jobs that need offloading."""
+    return [d for d in glob.glob(cls.GLOB_PATTERN) if os.path.isdir(d)]
+
+  @abc.abstractmethod
+  def get_timestamp_if_finished(self):
+    """Return this job's timestamp from the database.
+
+    If the database has not marked the job as finished, return
+    `None`.  Otherwise, return a timestamp for the job.  The
+    timestamp is to be used to determine expiration in
+    `_is_job_expired()`.
+
+    @return Return `None` if the job is still running; otherwise
+            return a string with a timestamp in the appropriate
+            format.
+    """
+    raise NotImplementedError("_JobDirectory.get_timestamp_if_finished")
+
+  def enqueue_offload(self, queue, age_limit):
+    """Enqueue the job for offload, if it's eligible.
+
+    The job is eligible for offloading if the database has marked
+    it finished, and the job is older than the `age_limit`
+    parameter.
+
+    If the job is eligible, offload processing is requested by
+    passing the `queue` parameter's `put()` method a sequence with
+    the job's `_dirname` and `_destname` attributes.
+
+    @param queue     If the job should be offloaded, put the offload
+                     parameters into this queue for processing.
+    @param age_limit Minimum age for a job to be offloaded.  A value
+                     of 0 means that the job will be offloaded as
+                     soon as it is finished.
+
+    """
+    if not self._offload_count:
+      timestamp = self.get_timestamp_if_finished()
+      if not timestamp:
+        logging.debug('Skipping %s - not finished.', self._dirname)
+        return
+      if not _is_job_expired(age_limit, timestamp):
+        logging.debug('Skipping %s - not old enough.', self._dirname)
+        return
+      self._first_offload_start = time.time()
+    logging.debug('Processing %s', self._dirname)
+    self._offload_count += 1
+    queue.put([self._dirname, self._destname])
+
+  def is_offloaded(self):
+    """Return whether this job has been successfully offloaded."""
+    return not os.path.exists(self._dirname)
+
+  def is_reportable(self):
+    """Return whether this job has a reportable failure."""
+    return self._offload_count > 1
+
+  def get_failure_time(self):
+    """Return the time of the first offload failure."""
+    return self._first_offload_start
+
+  def get_failure_count(self):
+    """Return the number of times this job has failed to offload."""
+    return self._offload_count
+
+  def get_job_directory(self):
+    """Return the name of this job's results directory."""
+    return self._dirname
+
+
+class RegularJobDirectory(_JobDirectory):
+  """Subclass of _JobDirectory for regular test jobs."""
+
+  GLOB_PATTERN = '[0-9]*-*'
+
+  def __init__(self, resultsdir):
+    super(RegularJobDirectory, self).__init__(resultsdir)
+
+  def get_timestamp_if_finished(self):
+    entry = _AFE.run('get_jobs', id=self._id, finished=True)
+    return entry[0]['created_on'] if entry else None
+
+
+class SpecialJobDirectory(_JobDirectory):
+  """Subclass of _JobDirectory for special (per-host) jobs."""
+
+  GLOB_PATTERN = 'hosts/*/[0-9]*-*'
+
+  def __init__(self, resultsdir):
+    super(SpecialJobDirectory, self).__init__(resultsdir)
+    self._destname = os.path.dirname(resultsdir)
+
+  def get_timestamp_if_finished(self):
+    entry = _AFE.run('get_special_tasks', id=self._id, is_complete=True)
+    return entry[0]['time_started'] if entry else None