J. Richard Barnette | ea78536 | 2014-03-17 16:00:53 -0700 | [diff] [blame] | 1 | import abc |
| 2 | import datetime |
| 3 | import glob |
J. Richard Barnette | ea78536 | 2014-03-17 16:00:53 -0700 | [diff] [blame] | 4 | import os |
| 5 | import time |
| 6 | |
| 7 | import common |
Dan Shi | dfea368 | 2014-08-10 23:38:40 -0700 | [diff] [blame] | 8 | from autotest_lib.client.common_lib import time_utils |
J. Richard Barnette | acdb013 | 2014-09-03 16:44:12 -0700 | [diff] [blame] | 9 | from autotest_lib.server.cros.dynamic_suite import frontend_wrappers |
J. Richard Barnette | ea78536 | 2014-03-17 16:00:53 -0700 | [diff] [blame] | 10 | |
J. Richard Barnette | ea78536 | 2014-03-17 16:00:53 -0700 | [diff] [blame] | 11 | |
# Shared AFE RPC client, instantiated once when this module is imported.
# The job directory classes in this file call `_AFE.run()` on it to ask
# whether a job or special task has finished.
# NOTE(review): presumably RetryingAFE retries failed RPCs -- confirm
# against frontend_wrappers.
_AFE = frontend_wrappers.RetryingAFE()
J. Richard Barnette | ea78536 | 2014-03-17 16:00:53 -0700 | [diff] [blame] | 13 | |
def _is_job_expired(age_limit, timestamp):
    """Decide whether a job has aged past its retention window.

    @param age_limit: Number of days a job must age before it counts
                      as expired.  Zero or a negative value expires
                      every job immediately; `timestamp` is not
                      examined in that case.
    @param timestamp: Time string for the job, in a format accepted
                      by `time_utils.time_string_to_datetime()`
                      (documented as time_utils.TIME_FMT).

    @returns True if the current time is at or beyond `timestamp`
             plus `age_limit` days; False otherwise.
    """
    # A non-positive limit short-circuits before the timestamp is parsed.
    if age_limit <= 0:
        return True
    stamp = time_utils.time_string_to_datetime(timestamp)
    cutoff = stamp + datetime.timedelta(days=age_limit)
    return cutoff <= datetime.datetime.now()
| 29 | |
| 30 | |
class _JobDirectory(object):
    """Bookkeeping for one job results directory awaiting offload.

    A typical life-cycle, including failure events that normally do
    not happen, runs as follows:
      1. `get_job_directories()` discovers the results directory and
         the caller creates an instance for it.
      2. `enqueue_offload()` does nothing while the database has not
         marked the job finished, or while the job is younger than
         the `age_limit` passed in.
      3. Once the job is finished and expired, the next
         `enqueue_offload()` call queues the directory for its first
         offload attempt.  Suppose that attempt fails, so the
         directory is still on disk.
      4. After that single attempt `is_offloaded()` is false and
         `is_reportable()` is false as well, so nothing is reported.
      5. A later `enqueue_offload()` call queues the directory again,
         and that attempt fails too.
      6. With two attempts recorded, `is_offloaded()` is still false
         but `is_reportable()` is now true, so callers may report
         the failure.
      7. Eventually an offload succeeds and the directory vanishes;
         `is_offloaded()` becomes true and the caller can drop the
         instance.

    Only steps 1 and 7 are certain to happen.  The rest depend on when
    `enqueue_offload()` is called and on how reliable the offload
    process is.

    """

    __metaclass__ = abc.ABCMeta

    # Glob expression that locates results directories; every concrete
    # subclass must override it.
    GLOB_PATTERN = None

    def __init__(self, resultsdir):
        """Record the results directory and reset offload counters.

        @param resultsdir Path of the job's results directory.  The
                          part of its base name before the first '-'
                          is kept as the job id.
        """
        leaf = os.path.basename(resultsdir)
        self._dirname = resultsdir
        self._id = leaf.partition('-')[0]
        # Number of times the job was queued for offload, and the
        # `time.time()` value recorded at the first queueing.
        self._offload_count = 0
        self._first_offload_start = 0

    @classmethod
    def get_job_directories(cls):
        """List the directories matching this class's `GLOB_PATTERN`.

        @return A list of every glob match that is a directory.
        """
        found = []
        for candidate in glob.glob(cls.GLOB_PATTERN):
            if os.path.isdir(candidate):
                found.append(candidate)
        return found

    @abc.abstractmethod
    def get_timestamp_if_finished(self):
        """Look up the timestamp of this job if it has finished.

        Subclasses consult the database.  The value returned is fed
        to `_is_job_expired()` to decide on expiration.

        @return `None` while the job is not marked finished;
                otherwise a timestamp string in the format that
                `_is_job_expired()` expects.
        """
        raise NotImplementedError("_JobDirectory.get_timestamp_if_finished")

    def enqueue_offload(self, queue, age_limit):
        """Queue this job for offload when it qualifies.

        Before the first queueing, the job qualifies only if the
        database reports it finished and it is older than
        `age_limit`.  Once it has been queued, every later call
        queues it again without repeating those checks.

        Queueing means calling `queue.put()` with a two-element list:
        the results directory, then that directory's parent.

        @param queue     Object whose `put()` method receives the
                         offload parameters.
        @param age_limit Minimum age for a job to be offloaded.  A
                         value of 0 offloads the job as soon as it is
                         finished.

        """
        if self._offload_count == 0:
            finished = self.get_timestamp_if_finished()
            if not finished or not _is_job_expired(age_limit, finished):
                return
            # Remember when the first attempt was requested.
            self._first_offload_start = time.time()
        self._offload_count += 1
        parent = os.path.dirname(self._dirname)
        queue.put([self._dirname, parent])

    def is_offloaded(self):
        """Return True once the results directory no longer exists."""
        still_present = os.path.exists(self._dirname)
        return not still_present

    def is_reportable(self):
        """Return True once the job has been queued at least twice."""
        return self._offload_count >= 2

    def get_failure_time(self):
        """Return the time recorded at the first offload request.

        The value is a `time.time()` reading, or 0 if the job has
        never been queued.
        """
        return self._first_offload_start

    def get_failure_count(self):
        """Return how many times this job has been queued for offload."""
        return self._offload_count

    def get_job_directory(self):
        """Return the path of this job's results directory."""
        return self._dirname
| 141 | |
| 142 | |
class RegularJobDirectory(_JobDirectory):
    """Job directory flavor used for regular test jobs."""

    GLOB_PATTERN = '[0-9]*-*'

    def get_timestamp_if_finished(self):
        """Pick the timestamp that represents when this job finished.

        @returns `None` if the `get_jobs` query for this id with
                 `finished=True` comes back empty.  Otherwise the
                 most recent `finished_on` value among the job's host
                 queue entries, or the job's `created_on` time when
                 no entry has a `finished_on` value.
        """
        jobs = _AFE.run('get_jobs', id=self._id, finished=True)
        if not jobs:
            return None
        entries = _AFE.run('get_host_queue_entries',
                           finished_on__isnull=False, job_id=self._id)
        if not entries:
            return jobs[0]['created_on']
        newest = entries[0]['finished_on']
        # A job usually has a single HQE, but may have several; scan
        # the rest for a later finish time.
        for entry in entries[1:]:
            candidate = entry['finished_on']
            if (time_utils.time_string_to_datetime(candidate) >
                    time_utils.time_string_to_datetime(newest)):
                newest = candidate
        return newest
J. Richard Barnette | ea78536 | 2014-03-17 16:00:53 -0700 | [diff] [blame] | 168 | |
| 169 | |
class SpecialJobDirectory(_JobDirectory):
    """Job directory flavor used for special (per-host) jobs."""

    GLOB_PATTERN = 'hosts/*/[0-9]*-*'

    def __init__(self, resultsdir):
        """Set up state for the special task stored in `resultsdir`."""
        super(SpecialJobDirectory, self).__init__(resultsdir)

    def get_timestamp_if_finished(self):
        """Return the task's `time_finished` value, if it is complete.

        @return `None` if the `get_special_tasks` query for this id
                with `is_complete=True` comes back empty; otherwise
                the `time_finished` field of the first task returned.
        """
        tasks = _AFE.run('get_special_tasks', id=self._id, is_complete=True)
        if not tasks:
            return None
        return tasks[0]['time_finished']