| """RCS interface module. |
| |
| Defines the class RCS, which represents a directory with rcs version |
| files and (possibly) corresponding work files. |
| |
| """ |
| |
| |
| import fnmatch |
| import os |
| import regsub |
| import string |
| import tempfile |
| |
| |
| class RCS: |
| |
| """RCS interface class (local filesystem version). |
| |
| An instance of this class represents a directory with rcs version |
| files and (possible) corresponding work files. |
| |
| Methods provide access to most rcs operations such as |
| checkin/checkout, access to the rcs metadata (revisions, logs, |
| branches etc.) as well as some filesystem operations such as |
| listing all rcs version files. |
| |
| XXX BUGS / PROBLEMS |
| |
| - The instance always represents the current directory so it's not |
| very useful to have more than one instance around simultaneously |
| |
| """ |
| |
| # Characters allowed in work file names |
| okchars = string.letters + string.digits + '-_=+.' |
| |
| def __init__(self): |
| """Constructor.""" |
| pass |
| |
| def __del__(self): |
| """Destructor.""" |
| pass |
| |
| # --- Informational methods about a single file/revision --- |
| |
| def log(self, name_rev, otherflags = ''): |
| """Return the full log text for NAME_REV as a string. |
| |
| Optional OTHERFLAGS are passed to rlog. |
| |
| """ |
| f = self._open(name_rev, 'rlog ' + otherflags) |
| data = f.read() |
| status = self._closepipe(f) |
| if status: |
| data = data + "%s: %s" % status |
| elif data[-1] == '\n': |
| data = data[:-1] |
| return data |
| |
| def head(self, name_rev): |
| """Return the head revision for NAME_REV""" |
| dict = self.info(name_rev) |
| return dict['head'] |
| |
| def info(self, name_rev): |
| """Return a dictionary of info (from rlog -h) for NAME_REV |
| |
| The dictionary's keys are the keywords that rlog prints |
| (e.g. 'head' and its values are the corresponding data |
| (e.g. '1.3'). |
| |
| XXX symbolic names and locks are not returned |
| |
| """ |
| f = self._open(name_rev, 'rlog -h') |
| dict = {} |
| while 1: |
| line = f.readline() |
| if not line: break |
| if line[0] == '\t': |
| # XXX could be a lock or symbolic name |
| # Anything else? |
| continue |
| i = string.find(line, ':') |
| if i > 0: |
| key, value = line[:i], string.strip(line[i+1:]) |
| dict[key] = value |
| status = self._closepipe(f) |
| if status: |
| raise IOError, status |
| return dict |
| |
| # --- Methods that change files --- |
| |
| def lock(self, name_rev): |
| """Set an rcs lock on NAME_REV.""" |
| name, rev = self.checkfile(name_rev) |
| cmd = "rcs -l%s %s" % (rev, name) |
| return self._system(cmd) |
| |
| def unlock(self, name_rev): |
| """Clear an rcs lock on NAME_REV.""" |
| name, rev = self.checkfile(name_rev) |
| cmd = "rcs -u%s %s" % (rev, name) |
| return self._system(cmd) |
| |
| def checkout(self, name_rev, withlock=0, otherflags=""): |
| """Check out NAME_REV to its work file. |
| |
| If optional WITHLOCK is set, check out locked, else unlocked. |
| |
| The optional OTHERFLAGS is passed to co without |
| interpretation. |
| |
| Any output from co goes to directly to stdout. |
| |
| """ |
| name, rev = self.checkfile(name_rev) |
| if withlock: lockflag = "-l" |
| else: lockflag = "-u" |
| cmd = 'co %s%s %s %s' % (lockflag, rev, otherflags, name) |
| return self._system(cmd) |
| |
| def checkin(self, name_rev, message=None, otherflags=""): |
| """Check in NAME_REV from its work file. |
| |
| The optional MESSAGE argument becomes the checkin message |
| (default "<none>" if None); or the file description if this is |
| a new file. |
| |
| The optional OTHERFLAGS argument is passed to ci without |
| interpretation. |
| |
| Any output from ci goes to directly to stdout. |
| |
| """ |
| name, rev = self._unmangle(name_rev) |
| new = not self.isvalid(name) |
| if not message: message = "<none>" |
| if message and message[-1] != '\n': |
| message = message + '\n' |
| lockflag = "-u" |
| textfile = None |
| try: |
| if new: |
| textfile = tempfile.mktemp() |
| f = open(textfile, 'w') |
| f.write(message) |
| f.close() |
| cmd = 'ci %s%s -t%s %s %s' % \ |
| (lockflag, rev, textfile, otherflags, name) |
| else: |
| message = regsub.gsub('\([\\"$`]\)', '\\\\\\1', message) |
| cmd = 'ci %s%s -m"%s" %s %s' % \ |
| (lockflag, rev, message, otherflags, name) |
| return self._system(cmd) |
| finally: |
| if textfile: self._remove(textfile) |
| |
| # --- Exported support methods --- |
| |
| def listfiles(self, pat = None): |
| """Return a list of all version files matching optional PATTERN.""" |
| files = os.listdir(os.curdir) |
| files = filter(self._isrcs, files) |
| if os.path.isdir('RCS'): |
| files2 = os.listdir('RCS') |
| files2 = filter(self._isrcs, files2) |
| files = files + files2 |
| files = map(self.realname, files) |
| return self._filter(files, pat) |
| |
| def isvalid(self, name): |
| """Test whether NAME has a version file associated.""" |
| namev = self.rcsname(name) |
| return (os.path.isfile(namev) or |
| os.path.isfile(os.path.join('RCS', namev))) |
| |
| def rcsname(self, name): |
| """Return the pathname of the version file for NAME. |
| |
| The argument can be a work file name or a version file name. |
| If the version file does not exist, the name of the version |
| file that would be created by "ci" is returned. |
| |
| """ |
| if self._isrcs(name): namev = name |
| else: namev = name + ',v' |
| if os.path.isfile(namev): return namev |
| namev = os.path.join('RCS', os.path.basename(namev)) |
| if os.path.isfile(namev): return namev |
| if os.path.isdir('RCS'): |
| return os.path.join('RCS', namev) |
| else: |
| return namev |
| |
| def realname(self, namev): |
| """Return the pathname of the work file for NAME. |
| |
| The argument can be a work file name or a version file name. |
| If the work file does not exist, the name of the work file |
| that would be created by "co" is returned. |
| |
| """ |
| if self._isrcs(namev): name = namev[:-2] |
| else: name = namev |
| if os.path.isfile(name): return name |
| name = os.path.basename(name) |
| return name |
| |
| def islocked(self, name_rev): |
| """Test whether FILE (which must have a version file) is locked. |
| |
| XXX This does not tell you which revision number is locked and |
| ignores any revision you may pass in (by virtue of using rlog |
| -L -R). |
| |
| """ |
| f = self._open(name_rev, 'rlog -L -R') |
| line = f.readline() |
| status = self._closepipe(f) |
| if status: |
| raise IOError, status |
| if not line: return None |
| if line[-1] == '\n': |
| line = line[:-1] |
| return self.realname(name_rev) == self.realname(line) |
| |
| def checkfile(self, name_rev): |
| """Normalize NAME_REV into a (NAME, REV) tuple. |
| |
| Raise an exception if there is no corresponding version file. |
| |
| """ |
| name, rev = self._unmangle(name_rev) |
| if not self.isvalid(name): |
| raise os.error, 'not an rcs file %s' % `name` |
| return name, rev |
| |
| # --- Internal methods --- |
| |
| def _open(self, name_rev, cmd = 'co -p', rflag = '-r'): |
| """INTERNAL: open a read pipe to NAME_REV using optional COMMAND. |
| |
| Optional FLAG is used to indicate the revision (default -r). |
| |
| Default COMMAND is "co -p". |
| |
| Return a file object connected by a pipe to the command's |
| output. |
| |
| """ |
| name, rev = self.checkfile(name_rev) |
| namev = self.rcsname(name) |
| if rev: |
| cmd = cmd + ' ' + rflag + rev |
| return os.popen("%s %s" % (cmd, `namev`)) |
| |
| def _unmangle(self, name_rev): |
| """INTERNAL: Normalize NAME_REV argument to (NAME, REV) tuple. |
| |
| Raise an exception if NAME contains invalid characters. |
| |
| A NAME_REV argument is either NAME string (implying REV='') or |
| a tuple of the form (NAME, REV). |
| |
| """ |
| if type(name_rev) == type(''): |
| name_rev = name, rev = name_rev, '' |
| else: |
| name, rev = name_rev |
| for c in rev: |
| if c not in self.okchars: |
| raise ValueError, "bad char in rev" |
| return name_rev |
| |
| def _closepipe(self, f): |
| """INTERNAL: Close PIPE and print its exit status if nonzero.""" |
| sts = f.close() |
| if not sts: return None |
| detail, reason = divmod(sts, 256) |
| if reason == 0: return 'exit', detail # Exit status |
| signal = reason&0x7F |
| if signal == 0x7F: |
| code = 'stopped' |
| signal = detail |
| else: |
| code = 'killed' |
| if reason&0x80: |
| code = code + '(coredump)' |
| return code, signal |
| |
| def _system(self, cmd): |
| """INTERNAL: run COMMAND in a subshell. |
| |
| Standard input for the command is taken fron /dev/null. |
| |
| Raise IOError when the exit status is not zero. |
| |
| Return whatever the calling method should return; normally |
| None. |
| |
| A derived class may override this method and redefine it to |
| capture stdout/stderr of the command and return it. |
| |
| """ |
| cmd = cmd + " </dev/null" |
| sts = os.system(cmd) |
| if sts: raise IOError, "command exit status %d" % sts |
| |
| def _filter(self, files, pat = None): |
| """INTERNAL: Return a sorted copy of the given list of FILES. |
| |
| If a second PATTERN argument is given, only files matching it |
| are kept. No check for valid filenames is made. |
| |
| """ |
| if pat: |
| def keep(name, pat = pat): |
| return fnmatch.fnmatch(name, pat) |
| files = filter(keep, files) |
| else: |
| files = files[:] |
| files.sort() |
| return files |
| |
| def _remove(self, fn): |
| """INTERNAL: remove FILE without complaints.""" |
| try: |
| os.unlink(fn) |
| except os.error: |
| pass |
| |
| def _isrcs(self, name): |
| """INTERNAL: Test whether NAME ends in ',v'.""" |
| return name[-2:] == ',v' |