blob: b0ab9a09984259e334423a42466d483e0e23ef68 [file] [log] [blame]
# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4
"""Job leasing.

Jobs are leased to processes to own and run.  A process owning a job
grabs an fcntl lock on the corresponding job lease file.  If the lock on
the job is released, the owning process is considered dead and the job
lease is considered expired.  Some other process (job_aborter) will need
to make the necessary updates to reflect the job's failure.
"""
13
14from __future__ import absolute_import
15from __future__ import division
16from __future__ import print_function
17
18import fcntl
19import logging
20import os
21
22from scandir import scandir
23
# Heartbeat timing, in seconds.
# NOTE(review): neither constant is referenced by the code visible in this
# module; presumably they are consumed elsewhere -- confirm before removing.
_HEARTBEAT_DEADLINE_SECS = 10 * 60  # 600 seconds, i.e. ten minutes
_HEARTBEAT_SECS = 3 * 60  # 180 seconds, i.e. three minutes

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
28
29
def get_expired_leases(jobdir):
    """Yield every JobLease in jobdir whose lease has expired.

    A lease counts as expired when JobLease.expired() returns True, i.e.
    when its lease file is no longer fcntl locked.

    @param jobdir: job lease file directory
    """
    for candidate in _job_leases_iter(jobdir):
        if not candidate.expired():
            # Still locked by its owner; not our concern.
            continue
        yield candidate
40
41
def get_timed_out_leases(dbjob_model, jobdir):
    """Yield the JobLeases of leased jobs that have timed out.

    A job is selected when it has an incomplete host queue entry and its
    creation time plus timeout_mins minutes lies in the past (evaluated
    by the database).  Only jobs that also have a lease file in jobdir
    are reported.

    @param dbjob_model: Django model for Job
    @param jobdir: job lease file directory
    """
    incomplete_dbjobs = dbjob_model.objects.filter(
        hostqueueentry__complete=False)
    # The timeout comparison is done in raw SQL on the database side.
    past_deadline_dbjobs = incomplete_dbjobs.extra(
        where=['created_on + INTERVAL timeout_mins MINUTE < NOW()'])
    # A job may match through several host queue entries; report it once.
    timed_out_dbjobs = past_deadline_dbjobs.distinct()
    for _dbjob, job_lease in _filter_leased(jobdir, timed_out_dbjobs):
        yield job_lease
56
57
def get_marked_aborting_leases(dbjob_model, jobdir):
    """Yield the JobLeases of leased jobs that are marked for aborting.

    A job is selected when it has a host queue entry flagged as aborted
    and a host queue entry that is not complete.  Only jobs that also
    have a lease file in jobdir are reported.

    @param dbjob_model: Django model for Job
    @param jobdir: job lease file directory
    """
    aborted_dbjobs = dbjob_model.objects.filter(hostqueueentry__aborted=True)
    unfinished_dbjobs = aborted_dbjobs.filter(hostqueueentry__complete=False)
    # A job may match through several host queue entries; report it once.
    aborting_dbjobs = unfinished_dbjobs.distinct()
    for _dbjob, job_lease in _filter_leased(jobdir, aborting_dbjobs):
        yield job_lease
72
73
def make_lease_file(jobdir, job_id):
    """Create the (empty) lease file for a job and return its path.

    Kept to document/pin public API.  The actual creation happens in the
    job_shepherd (which is written in Go).

    @param jobdir: job lease file directory
    @param job_id: Job ID
    @returns: path of the lease file, i.e. jobdir joined with str(job_id)
    """
    lease_path = os.path.join(jobdir, str(job_id))
    # Opening for writing creates the file (or truncates an existing one);
    # nothing is written to it.
    open(lease_path, 'w').close()
    return lease_path
87
88
class JobLease(object):
    """A job lease, backed by one lease file in the job directory.

    The lease file's name is the job ID and its fcntl lock state tells
    whether the lease is still held.
    """

    def __init__(self, entry):
        """Wrap a directory entry describing a lease file.

        @param entry: scandir.DirEntry instance
        """
        self._entry = entry

    @property
    def id(self):
        """Return the ID of the leased job, parsed from the file name."""
        lease_name = self._entry.name
        return int(lease_name)

    def expired(self):
        """Return True if the lease file is no longer fcntl locked."""
        still_locked = _fcntl_locked(self._entry.path)
        return not still_locked

    def cleanup(self):
        """Delete the lease file from disk."""
        lease_path = self._entry.path
        os.unlink(lease_path)
111
112
def _filter_leased(jobdir, dbjobs):
    """Yield (Job model, JobLease) pairs for the jobs that have a lease.

    All leases in jobdir are indexed by job ID first; each Job model whose
    ID appears in that index is then paired with its lease.  Job models
    without a lease file are skipped.

    @param jobdir: job lease file directory
    @param dbjobs: iterable of Django model Job instances
    """
    leases_by_id = {}
    for lease in _job_leases_iter(jobdir):
        leases_by_id[lease.id] = lease
    for dbjob in dbjobs:
        if dbjob.id not in leases_by_id:
            continue
        yield dbjob, leases_by_id[dbjob.id]
125
126
def _job_leases_iter(jobdir):
    """Yield a JobLease for each directory entry found in jobdir.

    Every entry returned by scandir is wrapped; no filtering is done.

    @param jobdir: job lease file directory
    """
    for dir_entry in scandir(jobdir):
        job_lease = JobLease(dir_entry)
        yield job_lease
134
135
def _fcntl_locked(path):
    """Return True if the file at path is fcntl locked.

    The check tries to take a non-blocking exclusive lock on the file.
    Failing to get it (IOError) means the file is locked.  If the probe
    lock is obtained, it is dropped again when the descriptor is closed.

    @param path: path to file
    """
    probe_fd = os.open(path, os.O_WRONLY)
    try:
        try:
            fcntl.lockf(probe_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            return True
        return False
    finally:
        # Always release the descriptor (and with it any probe lock).
        os.close(probe_fd)