| import fcntl |
| import IOCTL |
| from IOCTL import * |
| import sys |
| import struct |
| import select |
| import posix |
| import time |
| |
# Path of the serial line the VCR is attached to; VCR.__init__ passes it
# to initline().
DEVICE = '/dev/ttyd2'
| |
class UnixFile:
    """Minimal unbuffered file object built directly on posix descriptors.

    It offers just the methods the rest of this module uses: open, read,
    write, flush, fileno and close.  Nothing is buffered, so flush() is a
    no-op and fileno() makes the object usable with select.select().
    """

    def open(self, name, mode):
        """Open *name* with the integer posix.open() flags *mode*.

        Returns self so that ``UnixFile().open(...)`` can be used as an
        expression.
        """
        self.fd = posix.open(name, mode)
        return self

    def read(self, len):
        """Return at most *len* bytes obtained from a single posix.read()."""
        return posix.read(self.fd, len)

    def write(self, data):
        """Write all of *data* to the descriptor.

        posix.write() may accept fewer bytes than it was given, so keep
        writing the unwritten tail until nothing is left.  Raises OSError
        if a write makes no progress, rather than looping forever.
        """
        remaining = data
        while remaining:
            written = posix.write(self.fd, remaining)
            if written <= 0:
                raise OSError('write made no progress')
            remaining = remaining[written:]

    def flush(self):
        """Do nothing: writes go straight to the descriptor."""
        pass

    def fileno(self):
        """Return the underlying integer file descriptor."""
        return self.fd

    def close(self):
        """Close the underlying file descriptor."""
        posix.close(self.fd)
| |
def packttyargs(*args):
    """Pack tty settings into a single byte string.

    Accepts either six positional arguments
    ``(iflag, oflag, cflag, lflag, line, chars)`` or one tuple/list holding
    those six items.  The first five are packed with struct format
    'hhhhb'; every element of *chars* (a sequence of one-byte strings) is
    appended after them in order.

    Raises TypeError when called with no arguments, or when a sequence is
    given together with extra arguments.  A wrong number of items surfaces
    as the ValueError raised by the unpacking assignment.
    """
    if not args:
        raise TypeError('Incorrect argtype for packttyargs')
    if isinstance(args[0], (tuple, list)):
        # Sequence form: the six fields arrive bundled in one argument.
        if len(args) != 1:
            raise TypeError('Only 1 argument expected')
        fields = args[0]
    else:
        fields = args
    iflag, oflag, cflag, lflag, line, chars = fields
    header = struct.pack('hhhhb', iflag, oflag, cflag, lflag, line)
    return header + b''.join(chars)
| |
def nullttyargs():
    """Return a packed settings block with all flags zero.

    The control-character part consists of IOCTL.NCCS NUL characters.
    """
    blank_chars = IOCTL.NCCS * ['\0']
    return packttyargs(0, 0, 0, 0, 0, blank_chars)
| |
def unpackttyargs(str):
    """Split a packed settings block back into its six fields.

    The last IOCTL.NCCS items of *str* become the list of control
    characters; everything before them is decoded with struct format
    'hhhhb'.  Returns ``(iflag, oflag, cflag, lflag, line, chars)``.
    """
    split_at = -IOCTL.NCCS
    header, tail = str[:split_at], str[split_at:]
    iflag, oflag, cflag, lflag, line = struct.unpack('hhhhb', header)
    return (iflag, oflag, cflag, lflag, line, list(tail))
| |
def initline(name):
    """Open the serial line *name* and configure it for talking to the VCR.

    The current settings are fetched with the TCGETA ioctl, adjusted and
    written back with TCSETA.  Returns a pair ``(ifp, ofp)``; both
    elements are the same UnixFile object, used for reading and writing.

    If fetching or applying the settings fails, the freshly opened
    descriptor is closed before the exception propagates, so a failed
    call does not leak a file descriptor.
    """
    # Mode 2 is passed straight to posix.open (read/write access,
    # O_RDWR on common Unix systems).
    fp = UnixFile().open(name, 2)
    try:
        fd = fp.fileno()
        rv = fcntl.ioctl(fd, IOCTL.TCGETA, nullttyargs())
        iflag, oflag, cflag, lflag, line, chars = unpackttyargs(rv)
        # Switch off the listed input-processing flags and output
        # post-processing, so the data stream is left alone.
        iflag = iflag & ~(INPCK|ISTRIP|INLCR|IUCLC|IXON|IXOFF)
        oflag = oflag & ~OPOST
        # The control flags are replaced outright, not merged.
        cflag = B9600|CS8|CREAD|CLOCAL
        lflag = lflag & ~(ISIG|ICANON|ECHO|TOSTOP)
        chars[VMIN] = chr(1)
        chars[VTIME] = chr(0)
        arg = packttyargs(iflag, oflag, cflag, lflag, line, chars)
        fcntl.ioctl(fd, IOCTL.TCSETA, arg)
    except:
        # Cleanup only: release the descriptor, then re-raise unchanged.
        fp.close()
        raise
    return fp, fp
| |
| # |
| # |
class error(Exception):
    """Exception raised by this module for protocol and argument errors.

    Historically this was the string exception 'VCR.error'.  String
    exceptions can no longer be raised or caught from Python 2.6 on, so it
    is now a class with the same name; ``except error`` clauses keep
    working unchanged.
    """


# Commands/replies:
# Single-character replies read back from the VCR.
COMPLETION = '\x01'
ACK ='\x0a'
NAK ='\x0b'

# Base code for decimal digits: digit d is sent/received as NUMBER_N + d.
NUMBER_N = 0x30
ENTER = '\x40'

# Prefix characters; the two-character commands below start with one of them.
EXP_7= '\xde'
EXP_8= '\xdf'

CL ='\x56'
CTRL_ENABLE = EXP_8 + '\xc6'
SEARCH_DATA = EXP_8 + '\x93'
ADDR_SENSE = '\x60'

PLAY ='\x3a'
STOP ='\x3f'
EJECT='\x2a'
FF ='\xab'
REW ='\xac'
STILL='\x4f'
STEP_FWD ='\x2b' # Was: '\xad'
FM_SELECT=EXP_8 + '\xc8'
FM_STILL=EXP_8 + '\xcd'
PREVIEW=EXP_7 + '\x9d'
REVIEW=EXP_7 + '\x9e'
DM_OFF=EXP_8 + '\xc9'
DM_SET=EXP_8 + '\xc4'
FWD_SHUTTLE='\xb5'
REV_SHUTTLE='\xb6'
EM_SELECT=EXP_8 + '\xc0'
N_FRAME_REC=EXP_8 + '\x92'
SEARCH_PREROLL=EXP_8 + '\x90'
EDIT_PB_STANDBY=EXP_8 + '\x96'
EDIT_PLAY=EXP_8 + '\x98'
AUTO_EDIT=EXP_7 + '\x9c'

IN_ENTRY=EXP_7 + '\x90'
IN_ENTRY_RESET=EXP_7 + '\x91'
IN_ENTRY_SET=EXP_7 + '\x98'
IN_ENTRY_INC=EXP_7 + '\x94'
IN_ENTRY_DEC=EXP_7 + '\x95'
IN_ENTRY_SENSE=EXP_7 + '\x9a'

OUT_ENTRY=EXP_7 + '\x92'
OUT_ENTRY_RESET=EXP_7 + '\x93'
OUT_ENTRY_SET=EXP_7 + '\x99'
OUT_ENTRY_INC=EXP_7 + '\x96'
# NOTE(review): same code as IN_ENTRY_SET ('\x98'); the neighbouring codes
# suggest '\x97' may have been intended -- confirm against the VCR manual.
OUT_ENTRY_DEC=EXP_7 + '\x98'
OUT_ENTRY_SENSE=EXP_7 + '\x9b'

MUTE_AUDIO = '\x24'
MUTE_AUDIO_OFF = '\x25'
MUTE_VIDEO = '\x26'
MUTE_VIDEO_OFF = '\x27'
MUTE_AV = EXP_7 + '\xc6'
MUTE_AV_OFF = EXP_7 + '\xc7'

# Set to a true value to print every character sent and received.
DEBUG=0


class VCR:
    """Controller for a VCR attached to the serial line named by DEVICE.

    Commands are sent one character at a time; the VCR answers each
    character with ACK or NAK.  Most command methods return 1 on success
    and 0 when the VCR answers NAK, and raise ``error`` on unexpected
    replies or invalid arguments.

    Long-running commands either block until the VCR reports completion
    (the default) or, in async mode, return the string 'VCR BUSY'
    immediately; completion is then detected by poll() or testready().
    """

    def __init__(self):
        """Open and configure DEVICE; start in synchronous mode."""
        self.ifp, self.ofp = initline(DEVICE)
        self.busy_cmd = None
        # The attribute is historically called 'async', a reserved word
        # since Python 3.7, so it is accessed through setattr/getattr.
        setattr(self, 'async', 0)
        self.cb = None
        self.cb_arg = None

    def _is_async(self):
        """Return the current value of the 'async' mode attribute."""
        return getattr(self, 'async')

    def _check(self):
        """Raise error if a long-running command is still outstanding."""
        if self.busy_cmd:
            raise error('Another command active: ' + self.busy_cmd)

    def _endlongcmd(self, name):
        """Finish issuing a long-running command called *name*.

        In synchronous mode wait for completion and return 1.  In async
        mode record *name* as the busy command and return 'VCR BUSY'.
        """
        if not self._is_async():
            self.waitready()
            return 1
        self.busy_cmd = name
        return 'VCR BUSY'

    def _longcmd(self, cmd, name):
        """Send *cmd* and hand over to _endlongcmd(); return 0 on NAK."""
        if not self.simplecmd(cmd):
            return 0
        return self._endlongcmd(name)

    def fileno(self):
        """Return the input descriptor, e.g. for use with select()."""
        return self.ifp.fileno()

    def setasync(self, async_mode):
        """Switch async mode on (true value) or off (false value).

        The parameter was originally named ``async``; it is renamed
        because that is a reserved word in Python 3.7+.  Positional calls
        are unaffected.
        """
        setattr(self, 'async', async_mode)

    def setcallback(self, cb, arg):
        """Enable async mode and register *cb*.

        poll() calls ``cb(*arg)`` when the outstanding command completes,
        so *arg* must be a sequence of positional arguments.
        """
        self.setasync(1)
        self.cb = cb
        self.cb_arg = arg

    def poll(self):
        """Check for completion of the outstanding command (async mode only).

        Invokes the registered callback, if any, once the command is done.
        Raises error when not in async mode.
        """
        if not self._is_async():
            raise error('Can only call poll() in async mode')
        if not self.busy_cmd:
            return
        if self.testready():
            if self.cb:
                self.cb(*self.cb_arg)

    def _cmd(self, cmd):
        """Write *cmd* to the VCR without waiting for any reply."""
        if DEBUG:
            print('>>> ' + repr(cmd))
        self.ofp.write(cmd)
        self.ofp.flush()

    def _waitdata(self, count, tout):
        """Read *count* characters, waiting at most *tout* seconds for each.

        *tout* of None waits indefinitely.  Returns the characters read,
        NAK as soon as a NAK character arrives, or None if a wait timed
        out (anything read so far is then discarded).
        """
        rep = ''
        while count > 0:
            # A timeout of None makes select() block until data arrives.
            ready, d1, d2 = select.select([self.ifp], [], [], tout)
            if not ready:
                return None
            data = self.ifp.read(1)
            if DEBUG:
                print('<<< ' + repr(data))
            if data == NAK:
                return NAK
            rep = rep + data
            count = count - 1
        return rep

    def _reply(self, count):
        """Read *count* reply characters, allowing 10 seconds for each.

        Raises error if the VCR stays silent.
        """
        data = self._waitdata(count, 10)
        if data is None:
            raise error('Lost contact with VCR')
        return data

    def _getnumber(self, count):
        """Read *count* digit characters and return them as an integer."""
        data = self._reply(count)
        number = 0
        for c in data:
            digit = ord(c) - NUMBER_N
            if digit < 0 or digit > 9:
                raise error('Non-digit in number' + repr(c))
            number = number*10 + digit
        return number

    def _gettimecode(self):
        """Read four two-digit numbers and return them as (h, m, s, f)."""
        h = self._getnumber(2)
        m = self._getnumber(2)
        s = self._getnumber(2)
        f = self._getnumber(2)
        return (h, m, s, f)

    def _puttimecode(self, h, m, s, f):
        """Send h, m, s and f as four two-digit numbers."""
        self._number(h, 2)
        self._number(m, 2)
        self._number(s, 2)
        self._number(f, 2)

    def _iflush(self):
        """Discard all pending input.

        _waitdata() returns early on a NAK character (or after a full
        buffer), so keep draining until it reports that nothing is left.
        """
        while self._waitdata(10000, 0) is not None:
            pass

    def simplecmd(self, cmd):
        """Send *cmd* character by character, expecting ACK after each.

        Returns 1 when every character is acknowledged and 0 as soon as
        the VCR answers NAK.  Raises error on any other reply.
        """
        self._iflush()
        for ch in cmd:
            self._cmd(ch)
            rep = self._reply(1)
            if rep == NAK:
                return 0
            elif rep != ACK:
                raise error('Unexpected reply:' + repr(rep))
        return 1

    def replycmd(self, cmd):
        """Send a command whose last character is answered with data.

        All characters but the last are sent with simplecmd(); the last
        one is written without waiting for an ACK so the caller can read
        the reply.  Returns 0 if the VCR answered NAK, 1 otherwise.
        """
        if not self.simplecmd(cmd[:-1]):
            return 0
        self._cmd(cmd[-1])
        return 1

    def _number(self, number, digits):
        """Send *number* as exactly *digits* decimal digits.

        Raises error if *number* is negative, does not fit in *digits*
        digits, or a digit is not acknowledged.
        """
        if number < 0:
            raise error('Unexpected negative number:' + repr(number))
        maxnum = pow(10, digits)
        # 10**digits itself already needs digits+1 digits.
        if number >= maxnum:
            raise error('Number too big')
        while maxnum > 1:
            number = number % maxnum
            maxnum = maxnum // 10
            digit = number // maxnum
            ok = self.simplecmd(chr(NUMBER_N + digit))
            if not ok:
                raise error('Error while transmitting number')

    def initvcr(self, *optarg):
        """Establish contact with the VCR and enable remote control.

        Sends CL repeatedly until the VCR answers with something other
        than silence, an echoed CL or NAK.  An optional first argument is
        a timeout in seconds; without it (or with a false value) the loop
        retries forever.  Raises error on timeout or unexpected reply.
        """
        timeout = None
        if optarg:
            timeout = optarg[0]
        starttime = time.time()
        self.busy_cmd = None
        self._iflush()
        while 1:
            self._cmd(CL)
            rep = self._waitdata(1, 2)
            if rep not in (None, CL, NAK):
                break
            if timeout and time.time() - starttime > timeout:
                raise error('No reply from VCR')
        if rep != ACK:
            raise error('Unexpected reply:' + repr(rep))
        self.simplecmd(CTRL_ENABLE)

    def _completed(self, rep):
        """Validate completion reply *rep* and clear the busy state."""
        if rep not in (COMPLETION, ACK):
            self._iflush()
            raise error('Unexpected waitready reply:' + repr(rep))
        self.busy_cmd = None

    def waitready(self):
        """Block until the outstanding command reports completion."""
        rep = self._waitdata(1, None)
        if rep is None:
            raise error('Unexpected None reply from waitdata')
        self._completed(rep)

    def testready(self):
        """Return 1 if the outstanding command has completed, else 0.

        Never blocks.  Raises error on an unexpected reply character.
        """
        rep = self._waitdata(1, 0)
        if rep is None:
            return 0
        self._completed(rep)
        return 1

    def play(self): return self.simplecmd(PLAY)
    def stop(self): return self.simplecmd(STOP)
    def ff(self): return self.simplecmd(FF)
    def rew(self): return self.simplecmd(REW)
    def eject(self): return self.simplecmd(EJECT)
    def still(self): return self.simplecmd(STILL)
    def step(self): return self.simplecmd(STEP_FWD)

    def goto(self, tc):
        """Search to timecode *tc*, an (h, m, s, f) tuple (long command)."""
        h, m, s, f = tc
        if not self.simplecmd(SEARCH_DATA):
            return 0
        self._puttimecode(h, m, s, f)
        if not self.simplecmd(ENTER):
            return 0
        return self._endlongcmd('goto')

    # XXXX TC_SENSE doesn't seem to work
    # NOTE(review): TC_SENSE is not defined among the constants visible in
    # this file, so calling this method raises NameError as it stands.
    def faulty_where(self):
        self._check()
        self._cmd(TC_SENSE)
        return self._gettimecode()

    def where(self):
        """Return the current position as an (h, m, s, f) tuple."""
        return self.addr2tc(self.sense())

    def sense(self):
        """Return the current position as a 5-digit frame address."""
        self._check()
        self._cmd(ADDR_SENSE)
        return self._getnumber(5)

    def addr2tc(self, num):
        """Convert frame address *num* to (h, m, s, f) at 25 frames/second."""
        f = num % 25
        num = num // 25
        s = num % 60
        num = num // 60
        m = num % 60
        h = num // 60
        return (h, m, s, f)

    def tc2addr(self, tc):
        """Convert an (h, m, s, f) tuple to a frame address (25 frames/s)."""
        h, m, s, f = tc
        return ((h*60 + m)*60 + s)*25 + f

    def fmmode(self, mode):
        """Select the FM mode: 'off', 'buffer' or 'dnr'."""
        self._check()
        if mode == 'off':
            arg = 0
        elif mode == 'buffer':
            arg = 1
        elif mode == 'dnr':
            arg = 2
        else:
            raise error('fmmode arg should be off, buffer or dnr')
        if not self.simplecmd(FM_SELECT):
            return 0
        self._number(arg, 1)
        if not self.simplecmd(ENTER):
            return 0
        return 1

    def mute(self, mode, value):
        """Mute or unmute 'audio', 'video' or 'av'.

        *value* is used as an index: 0 sends the corresponding *_OFF
        command, 1 sends the mute command.
        """
        self._check()
        if mode == 'audio':
            cmds = (MUTE_AUDIO_OFF, MUTE_AUDIO)
        elif mode == 'video':
            cmds = (MUTE_VIDEO_OFF, MUTE_VIDEO)
        elif mode == 'av':
            cmds = (MUTE_AV_OFF, MUTE_AV)
        else:
            raise error('mute type should be audio, video or av')
        return self.simplecmd(cmds[value])

    def editmode(self, mode):
        """Select the edit mode: 'off', 'format', 'asmbl' or 'insert-video'.

        Each mode is sent as three single digits after EM_SELECT.
        """
        self._check()
        if mode == 'off':
            a0 = a1 = a2 = 0
        elif mode == 'format':
            a0, a1, a2 = 4, 7, 4
        elif mode == 'asmbl':
            a0, a1, a2 = 1, 7, 4
        elif mode == 'insert-video':
            a0, a1, a2 = 2, 4, 0
        else:
            raise error('editmode should be off,format,asmbl or insert-video')
        if not self.simplecmd(EM_SELECT):
            return 0
        self._number(a0, 1)
        self._number(a1, 1)
        self._number(a2, 1)
        if not self.simplecmd(ENTER):
            return 0
        return 1

    def autoedit(self):
        """Start an automatic edit (long command).

        The command is sent before waiting for completion, matching the
        other long commands; previously AUTO_EDIT was never transmitted.
        """
        self._check()
        return self._longcmd(AUTO_EDIT, 'autoedit')

    def nframerec(self, num):
        """Send N_FRAME_REC with *num* as a 4-digit number (long command)."""
        if not self.simplecmd(N_FRAME_REC):
            return 0
        self._number(num, 4)
        if not self.simplecmd(ENTER):
            return 0
        return self._endlongcmd('nframerec')

    def fmstill(self):
        return self._longcmd(FM_STILL, 'fmstill')

    def preview(self):
        return self._longcmd(PREVIEW, 'preview')

    def review(self):
        return self._longcmd(REVIEW, 'review')

    def search_preroll(self):
        return self._longcmd(SEARCH_PREROLL, 'search_preroll')

    def edit_pb_standby(self):
        return self._longcmd(EDIT_PB_STANDBY, 'edit_pb_standby')

    def edit_play(self):
        return self._longcmd(EDIT_PLAY, 'edit_play')

    def dmcontrol(self, mode):
        """Set the DM mode.

        *mode* is 'off', 'multi freeze', 'zoom freeze', 'digital slow' or
        'freeze'; anything else raises error.
        """
        self._check()
        if mode == 'off':
            return self.simplecmd(DM_OFF)
        if mode == 'multi freeze':
            num = 1000
        elif mode == 'zoom freeze':
            num = 2000
        elif mode == 'digital slow':
            num = 3000
        elif mode == 'freeze':
            num = 4011
        else:
            raise error('unknown dmcontrol argument: ' + repr(mode))
        if not self.simplecmd(DM_SET):
            return 0
        self._number(num, 4)
        if not self.simplecmd(ENTER):
            return 0
        return 1

    def fwdshuttle(self, num):
        """Send FWD_SHUTTLE followed by the single digit *num*."""
        if not self.simplecmd(FWD_SHUTTLE):
            return 0
        self._number(num, 1)
        return 1

    def revshuttle(self, num):
        """Send REV_SHUTTLE followed by the single digit *num*."""
        if not self.simplecmd(REV_SHUTTLE):
            return 0
        self._number(num, 1)
        return 1

    def getentry(self, which):
        """Return the 'in' or 'out' entry point as an (h, m, s, f) tuple.

        Raises error for any other *which*, or when the VCR refuses the
        sense command.
        """
        self._check()
        if which == 'in':
            cmd = IN_ENTRY_SENSE
        elif which == 'out':
            cmd = OUT_ENTRY_SENSE
        else:
            raise error('getentry arg should be in or out')
        if not self.replycmd(cmd):
            raise error('VCR refused entry sense command')
        return self._gettimecode()

    def inentry(self, arg):
        """Change the in entry point; see ioentry() for *arg*."""
        return self.ioentry(arg, (IN_ENTRY, IN_ENTRY_RESET,
                  IN_ENTRY_SET, IN_ENTRY_INC, IN_ENTRY_DEC))

    def outentry(self, arg):
        """Change the out entry point; see ioentry() for *arg*."""
        return self.ioentry(arg, (OUT_ENTRY, OUT_ENTRY_RESET,
                  OUT_ENTRY_SET, OUT_ENTRY_INC, OUT_ENTRY_DEC))

    def ioentry(self, arg, cmds):
        """Change an entry point.

        *cmds* is the 5-tuple (load, clear, set, inc, dec) of commands.
        *arg* is an (h, m, s, f) tuple to set an explicit timecode, or one
        of 'reset', 'load', '+' and '-'.
        """
        Load, Clear, Set, Inc, Dec = cmds
        self._check()
        if isinstance(arg, tuple):
            h, m, s, f = arg
            if not self.simplecmd(Set):
                return 0
            self._puttimecode(h, m, s, f)
            if not self.simplecmd(ENTER):
                return 0
            return 1
        elif arg == 'reset':
            cmd = Clear
        elif arg == 'load':
            cmd = Load
        elif arg == '+':
            cmd = Inc
        elif arg == '-':
            cmd = Dec
        else:
            raise error('Arg should be +,-,reset,load or (h,m,s,f)')
        return self.simplecmd(cmd)

    def cancel(self):
        """Send CL and forget any outstanding long command."""
        self.simplecmd(CL)
        self.busy_cmd = None