Allen Li | cc15350 | 2017-09-15 16:08:25 -0700 | [diff] [blame^] | 1 | # Copyright 2017 The Chromium Authors. All rights reserved. |
| 2 | # Use of this source code is governed by a BSD-style license that can be |
| 3 | # found in the LICENSE file. |
| 4 | |
"""Job leasing.

Jobs are leased to processes to own and run. A process that owns a job
holds an fcntl lock on the corresponding job lease file. Once the lock
on the job is released, the owning process is considered dead and the
job lease is considered expired. Some other process (job_aborter) then
needs to make the necessary updates to reflect the job's failure.
"""
| 13 | |
| 14 | from __future__ import absolute_import |
| 15 | from __future__ import division |
| 16 | from __future__ import print_function |
| 17 | |
| 18 | import fcntl |
| 19 | import logging |
| 20 | import os |
| 21 | |
| 22 | from scandir import scandir |
| 23 | |
# Heartbeat timings in seconds: 10 minutes and 3 minutes respectively.
# NOTE(review): neither constant is referenced in the visible part of this
# module; presumably a heartbeat is expected every _HEARTBEAT_SECS and is
# considered missed after _HEARTBEAT_DEADLINE_SECS -- confirm with users.
_HEARTBEAT_DEADLINE_SECS = 10 * 60
_HEARTBEAT_SECS = 3 * 60

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
| 28 | |
| 29 | |
def get_expired_leases(jobdir):
    """Yield each JobLease in jobdir whose lease has expired.

    A lease is expired when JobLease.expired() reports true for it,
    i.e. its lease file is no longer fcntl locked.

    @param jobdir: job lease file directory
    """
    for candidate in _job_leases_iter(jobdir):
        # Leases that are still held are skipped.
        if not candidate.expired():
            continue
        yield candidate
| 40 | |
| 41 | |
def get_timed_out_leases(dbjob_model, jobdir):
    """Yield JobLeases for leased jobs that have timed out.

    Queries for jobs that still have an incomplete host queue entry and
    whose creation time plus timeout_mins is already in the past, then
    yields the JobLease of every such job that has a lease in jobdir.

    @param dbjob_model: Django model for Job
    @param jobdir: job lease file directory
    """
    # Raw SQL condition handed to QuerySet.extra(); evaluated by the database.
    deadline_passed = 'created_on + INTERVAL timeout_mins MINUTE < NOW()'
    incomplete_dbjobs = dbjob_model.objects.filter(
        hostqueueentry__complete=False)
    timed_out_dbjobs = incomplete_dbjobs.extra(where=[deadline_passed])
    timed_out_dbjobs = timed_out_dbjobs.distinct()
    for _dbjob, timed_out_lease in _filter_leased(jobdir, timed_out_dbjobs):
        yield timed_out_lease
| 56 | |
| 57 | |
def get_marked_aborting_leases(dbjob_model, jobdir):
    """Yield JobLeases for leased jobs that are marked for aborting.

    Queries for jobs with an aborted host queue entry and an incomplete
    host queue entry, then yields the JobLease of every such job that
    has a lease in jobdir.

    @param dbjob_model: Django model for Job
    @param jobdir: job lease file directory
    """
    # The two conditions are deliberately applied as separate filter()
    # calls.  NOTE(review): Django can treat chained filter() calls on a
    # multi-valued relation differently from one combined call, so do not
    # merge them without checking.
    aborting_dbjobs = dbjob_model.objects
    aborting_dbjobs = aborting_dbjobs.filter(hostqueueentry__aborted=True)
    aborting_dbjobs = aborting_dbjobs.filter(hostqueueentry__complete=False)
    aborting_dbjobs = aborting_dbjobs.distinct()
    for _dbjob, aborting_lease in _filter_leased(jobdir, aborting_dbjobs):
        yield aborting_lease
| 72 | |
| 73 | |
def make_lease_file(jobdir, job_id):
    """Create the lease file for a job and return its path.

    The file is named after the job ID and is left empty.

    Kept to document/pin public API. The actual creation happens in the
    job_shepherd (which is written in Go).

    @param jobdir: job lease file directory
    @param job_id: Job ID
    @returns: path of the lease file
    """
    lease_path = os.path.join(jobdir, str(job_id))
    # Opening for writing creates the file (truncating any existing one);
    # nothing is written to it.
    open(lease_path, 'w').close()
    return lease_path
| 87 | |
| 88 | |
class JobLease(object):
    """A job lease, backed by one lease file in the job directory."""

    def __init__(self, entry):
        """Wrap the directory entry of a lease file.

        @param entry: scandir.DirEntry instance
        """
        self._entry = entry

    @property
    def id(self):
        """Return the leased job's id, parsed from the lease file name."""
        lease_name = self._entry.name
        return int(lease_name)

    def expired(self):
        """Return True if the lease is expired.

        A lease is expired when its lease file is not fcntl locked.
        """
        if _fcntl_locked(self._entry.path):
            return False
        return True

    def cleanup(self):
        """Remove the lease file."""
        lease_path = self._entry.path
        os.unlink(lease_path)
| 111 | |
| 112 | |
def _filter_leased(jobdir, dbjobs):
    """Pair Job models with their leases, dropping jobs without one.

    Yields (Job model, JobLease) tuples for every Job in dbjobs whose id
    matches the id of a lease found in jobdir.

    @param jobdir: job lease file directory
    @param dbjobs: iterable of Django model Job instances
    """
    # Index every lease in jobdir by job id before walking dbjobs.
    leases_by_id = {}
    for lease in _job_leases_iter(jobdir):
        leases_by_id[lease.id] = lease
    for dbjob in dbjobs:
        if dbjob.id not in leases_by_id:
            continue
        yield dbjob, leases_by_id[dbjob.id]
| 125 | |
| 126 | |
def _job_leases_iter(jobdir):
    """Yield a JobLease for every directory entry in jobdir.

    Entries are wrapped as-is; no filtering is done here.

    @param jobdir: job lease file directory
    """
    dir_entries = scandir(jobdir)
    for dir_entry in dir_entries:
        yield JobLease(dir_entry)
| 134 | |
| 135 | |
def _fcntl_locked(path):
    """Return True if a file is fcntl locked.

    Probes the file by trying to take an exclusive lock on it without
    blocking.  Any IOError raised by that attempt is reported as
    "locked".  The file descriptor is always closed before returning.

    @param path: path to file
    """
    probe_fd = os.open(path, os.O_WRONLY)
    try:
        try:
            fcntl.lockf(probe_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # The non-blocking lock attempt failed.
            return True
        # The lock attempt succeeded, so nothing else held the lock.
        return False
    finally:
        os.close(probe_fd)