blob: 35a8c74f8c424dcba7fcbd827f32e8a12e55e86e [file] [log] [blame]
J. Richard Barnetteea785362014-03-17 16:00:53 -07001import abc
2import datetime
3import glob
J. Richard Barnetteea785362014-03-17 16:00:53 -07004import os
5import time
6
7import common
Dan Shidfea3682014-08-10 23:38:40 -07008from autotest_lib.client.common_lib import time_utils
J. Richard Barnetteacdb0132014-09-03 16:44:12 -07009from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
J. Richard Barnetteea785362014-03-17 16:00:53 -070010
J. Richard Barnetteea785362014-03-17 16:00:53 -070011
# Module-level AFE RPC client, created once at import time and used by
# the job-directory classes in this file for their `_AFE.run(...)`
# database queries.
# NOTE(review): presumably retries failed RPCs, judging by the class
# name -- confirm against frontend_wrappers.
_AFE = frontend_wrappers.RetryingAFE()
J. Richard Barnetteea785362014-03-17 16:00:53 -070013
def _is_job_expired(age_limit, timestamp):
    """Check whether a job timestamp is older than an age limit.

    @param age_limit: Minimum age, measured in days.  If the value is
                      not positive, the job is always expired.
    @param timestamp: Timestamp of the job whose age we are checking.
                      The format must match time_utils.TIME_FMT.

    @returns True iff the job is old enough to be expired.
    """
    # A non-positive limit expires every job.  Return before looking at
    # `timestamp`, so it is never parsed in that case.
    if age_limit <= 0:
        return True
    job_time = time_utils.time_string_to_datetime(timestamp)
    expiration = job_time + datetime.timedelta(days=age_limit)
    # `datetime.now()` is a naive local time.
    # NOTE(review): assumes time_string_to_datetime() also returns a
    # naive local datetime -- confirm against time_utils.
    return datetime.datetime.now() >= expiration
29
30
class _JobDirectory(object):
    """State associated with a job to be offloaded.

    The full life-cycle of a job (including failure events that
    normally don't occur) looks like this:
     1. The job's results directory is discovered by
        `get_job_directories()`, and a job instance is created for it.
     2. Calls to `enqueue_offload()` have no effect until the job is
        both complete in the database and expired according to the
        `age_limit` parameter.
     3. Eventually, the job is both finished and expired.  The next
        call to `enqueue_offload()` requests the first attempt to
        offload the directory to GS.  Offload is attempted, but fails
        to complete (e.g. because of a GS problem).
     4. After the first failed offload `is_offloaded()` is false,
        but `is_reportable()` is also false, so the failure is not
        reported.
     5. Another call to `enqueue_offload()` again requests an offload
        of the directory, and the offload again fails.
     6. After a second failure, `is_offloaded()` is false and
        `is_reportable()` is true, so the failure generates an e-mail
        notification.
     7. Finally, an offload requested by `enqueue_offload()` succeeds,
        and the directory no longer exists.  Now `is_offloaded()` is
        true, so the job instance is deleted, and future failures will
        not mention this directory any more.

    Only steps 1. and 7. are guaranteed to occur.  The others depend
    on the timing of calls to `enqueue_offload()`, and on the
    reliability of the actual offload process.

    """

    # Python 2 spelling of an abstract base class.
    __metaclass__ = abc.ABCMeta

    GLOB_PATTERN = None     # must be redefined in subclass

    def __init__(self, resultsdir):
        """Initialize the offload state for one results directory.

        @param resultsdir Path of the job's results directory.  The
                          part of its base name before the first '-'
                          is used as the job id.
        """
        self._dirname = resultsdir
        self._id = os.path.basename(resultsdir).split('-')[0]
        # Number of offload requests enqueued so far for this job.
        self._offload_count = 0
        # `time.time()` value when the first request was enqueued.
        self._first_offload_start = 0

    @classmethod
    def get_job_directories(cls):
        """Return a list of directories of jobs that need offloading.

        The candidates are the directories matching `cls.GLOB_PATTERN`;
        the pattern is passed to `glob.glob()` as-is, and matches that
        are not directories are skipped.
        """
        return [d for d in glob.glob(cls.GLOB_PATTERN) if os.path.isdir(d)]

    @abc.abstractmethod
    def get_timestamp_if_finished(self):
        """Return this job's timestamp from the database.

        If the database has not marked the job as finished, return
        `None`.  Otherwise, return a timestamp for the job.  The
        timestamp is to be used to determine expiration in
        `_is_job_expired()`.

        @return Return `None` if the job is still running; otherwise
                return a string with a timestamp in the appropriate
                format.
        """
        raise NotImplementedError("_JobDirectory.get_timestamp_if_finished")

    def enqueue_offload(self, queue, age_limit):
        """Enqueue the job for offload, if it's eligible.

        The job is eligible for offloading if the database has marked
        it finished, and the job is older than the `age_limit`
        parameter.  Eligibility is only checked until the first
        request has been enqueued; every later call enqueues another
        request unconditionally.

        If the job is eligible, offload processing is requested by
        passing the `queue` parameter's `put()` method a two-element
        list: the job's results directory, and the parent directory
        of that results directory.

        @param queue     If the job should be offloaded, put the offload
                         parameters into this queue for processing.
        @param age_limit Minimum age for a job to be offloaded.  A value
                         of 0 means that the job will be offloaded as
                         soon as it is finished.

        """
        if not self._offload_count:
            timestamp = self.get_timestamp_if_finished()
            # Not finished yet, or finished but not old enough.
            if not timestamp or not _is_job_expired(age_limit, timestamp):
                return
            self._first_offload_start = time.time()
        self._offload_count += 1
        queue.put([self._dirname, os.path.dirname(self._dirname)])

    def is_offloaded(self):
        """Return whether this job has been successfully offloaded.

        The job counts as offloaded once its results directory no
        longer exists.
        """
        return not os.path.exists(self._dirname)

    def is_reportable(self):
        """Return whether this job has a reportable failure.

        True once more than one offload request has been enqueued.
        """
        return self._offload_count > 1

    def get_failure_time(self):
        """Return the start time of the first offload attempt.

        The value is the `time.time()` reading taken when the first
        offload request was enqueued, or 0 if none has been enqueued.
        """
        return self._first_offload_start

    def get_failure_count(self):
        """Return the number of offload requests enqueued for this job."""
        return self._offload_count

    def get_job_directory(self):
        """Return the name of this job's results directory."""
        return self._dirname
141
142
class RegularJobDirectory(_JobDirectory):
    """Subclass of _JobDirectory for regular test jobs."""

    GLOB_PATTERN = '[0-9]*-*'

    def get_timestamp_if_finished(self):
        """Get the timestamp to use for finished jobs.

        @returns `None` if the `get_jobs` query for this job id with
                 `finished=True` returns nothing.  Otherwise the latest
                 hqe finished_on time; if no hqe has a finished_on
                 time, the job's created_on time.
        """
        entry = _AFE.run('get_jobs', id=self._id, finished=True)
        if not entry:
            return None
        hqes = _AFE.run('get_host_queue_entries', finished_on__isnull=False,
                        job_id=self._id)
        if not hqes:
            return entry[0]['created_on']
        finished_times = [hqe['finished_on'] for hqe in hqes]
        # Most jobs have exactly one HQE; return its time without
        # parsing it.
        if len(finished_times) == 1:
            return finished_times[0]
        # Some jobs have several HQEs, so check them all.  Each
        # timestamp is parsed once, and on a tie the earliest entry in
        # the list wins.
        return max(finished_times, key=time_utils.time_string_to_datetime)
J. Richard Barnetteea785362014-03-17 16:00:53 -0700168
169
class SpecialJobDirectory(_JobDirectory):
    """Subclass of _JobDirectory for special (per-host) jobs."""

    GLOB_PATTERN = 'hosts/*/[0-9]*-*'

    # No __init__ override: construction is inherited unchanged from
    # _JobDirectory.

    def get_timestamp_if_finished(self):
        """Get the timestamp to use for finished special tasks.

        @returns the `time_finished` value of the first entry returned
                 by the `get_special_tasks` query for this id with
                 `is_complete=True`, or `None` if the query returns
                 nothing.
        """
        entry = _AFE.run('get_special_tasks', id=self._id, is_complete=True)
        return entry[0]['time_finished'] if entry else None