| """Stuff to parse Sun and NeXT audio files. |
| |
| An audio file consists of a header followed by the data. The structure |
| of the header is as follows. |
| |
| +---------------+ |
| | magic word | |
| +---------------+ |
| | header size | |
| +---------------+ |
| | data size | |
| +---------------+ |
| | encoding | |
| +---------------+ |
| | sample rate | |
| +---------------+ |
| | # of channels | |
| +---------------+ |
| | info | |
| | | |
| +---------------+ |
| |
| The magic word consists of the 4 characters '.snd'. Apart from the |
| info field, all header fields are 4 bytes in size. They are all |
| 32-bit unsigned integers encoded in big-endian byte order. |
| |
| The header size really gives the start of the data. |
| The data size is the physical size of the data. From the other |
| parameters the number of frames can be calculated. |
| The encoding gives the way in which audio samples are encoded. |
| Possible values are listed below. |
| The info field currently consists of an ASCII string giving a |
| human-readable description of the audio file. The info field is |
| padded with NUL bytes to the header size. |
| |
| Usage. |
| |
| Reading audio files: |
| f = sunau.open(file, 'r') |
| where file is either the name of a file or an open file pointer. |
| The open file pointer must have methods read(), seek(), and close(). |
| When the setpos() and rewind() methods are not used, the seek() |
| method is not necessary. |
| |
| This returns an instance of a class with the following public methods: |
| getnchannels() -- returns number of audio channels (1 for |
| mono, 2 for stereo) |
| getsampwidth() -- returns sample width in bytes |
| getframerate() -- returns sampling frequency |
| getnframes() -- returns number of audio frames |
| getcomptype() -- returns compression type ('NONE' or 'ULAW') |
| getcompname() -- returns human-readable version of |
| compression type ('not compressed' matches 'NONE') |
| getparams() -- returns a namedtuple consisting of all of the |
| above in the above order |
| getmarkers() -- returns None (for compatibility with the |
| aifc module) |
| getmark(id) -- raises an error since the mark does not |
| exist (for compatibility with the aifc module) |
| readframes(n) -- returns at most n frames of audio |
| rewind() -- rewind to the beginning of the audio stream |
| setpos(pos) -- seek to the specified position |
| tell() -- return the current position |
| close() -- close the instance (make it unusable) |
| The position returned by tell() and the position given to setpos() |
| are compatible and have nothing to do with the actual position in the |
| file. |
| The close() method is called automatically when the class instance |
| is destroyed. |
| |
| Writing audio files: |
| f = sunau.open(file, 'w') |
| where file is either the name of a file or an open file pointer. |
| The open file pointer must have methods write(), tell(), seek(), and |
| close(). |
| |
| This returns an instance of a class with the following public methods: |
| setnchannels(n) -- set the number of channels |
| setsampwidth(n) -- set the sample width |
| setframerate(n) -- set the frame rate |
| setnframes(n) -- set the number of frames |
| setcomptype(type, name) |
| -- set the compression type and the |
| human-readable compression type |
| setparams(tuple)-- set all parameters at once |
| tell() -- return current position in output file |
| writeframesraw(data) |
| -- write audio frames without pathing up the |
| file header |
| writeframes(data) |
| -- write audio frames and patch up the file header |
| close() -- patch up the file header and close the |
| output file |
| You should set the parameters before the first writeframesraw or |
| writeframes. The total number of frames does not need to be set, |
| but when it is set to the correct value, the header does not have to |
| be patched up. |
| It is best to first set all parameters, perhaps possibly the |
| compression type, and then write audio frames using writeframesraw. |
| When all frames have been written, either call writeframes(b'') or |
| close() to patch up the sizes in the header. |
| The close() method is called automatically when the class instance |
| is destroyed. |
| """ |
| |
| from collections import namedtuple |
| import warnings |
| |
| _sunau_params = namedtuple('_sunau_params', |
| 'nchannels sampwidth framerate nframes comptype compname') |
| |
| # from <multimedia/audio_filehdr.h> |
| AUDIO_FILE_MAGIC = 0x2e736e64 |
| AUDIO_FILE_ENCODING_MULAW_8 = 1 |
| AUDIO_FILE_ENCODING_LINEAR_8 = 2 |
| AUDIO_FILE_ENCODING_LINEAR_16 = 3 |
| AUDIO_FILE_ENCODING_LINEAR_24 = 4 |
| AUDIO_FILE_ENCODING_LINEAR_32 = 5 |
| AUDIO_FILE_ENCODING_FLOAT = 6 |
| AUDIO_FILE_ENCODING_DOUBLE = 7 |
| AUDIO_FILE_ENCODING_ADPCM_G721 = 23 |
| AUDIO_FILE_ENCODING_ADPCM_G722 = 24 |
| AUDIO_FILE_ENCODING_ADPCM_G723_3 = 25 |
| AUDIO_FILE_ENCODING_ADPCM_G723_5 = 26 |
| AUDIO_FILE_ENCODING_ALAW_8 = 27 |
| |
| # from <multimedia/audio_hdr.h> |
| AUDIO_UNKNOWN_SIZE = 0xFFFFFFFF # ((unsigned)(~0)) |
| |
| _simple_encodings = [AUDIO_FILE_ENCODING_MULAW_8, |
| AUDIO_FILE_ENCODING_LINEAR_8, |
| AUDIO_FILE_ENCODING_LINEAR_16, |
| AUDIO_FILE_ENCODING_LINEAR_24, |
| AUDIO_FILE_ENCODING_LINEAR_32, |
| AUDIO_FILE_ENCODING_ALAW_8] |
| |
| class Error(Exception): |
| pass |
| |
| def _read_u32(file): |
| x = 0 |
| for i in range(4): |
| byte = file.read(1) |
| if not byte: |
| raise EOFError |
| x = x*256 + ord(byte) |
| return x |
| |
| def _write_u32(file, x): |
| data = [] |
| for i in range(4): |
| d, m = divmod(x, 256) |
| data.insert(0, int(m)) |
| x = d |
| file.write(bytes(data)) |
| |
| class Au_read: |
| |
| def __init__(self, f): |
| if type(f) == type(''): |
| import builtins |
| f = builtins.open(f, 'rb') |
| self._opened = True |
| else: |
| self._opened = False |
| self.initfp(f) |
| |
| def __del__(self): |
| if self._file: |
| self.close() |
| |
| def __enter__(self): |
| return self |
| |
| def __exit__(self, *args): |
| self.close() |
| |
| def initfp(self, file): |
| self._file = file |
| self._soundpos = 0 |
| magic = int(_read_u32(file)) |
| if magic != AUDIO_FILE_MAGIC: |
| raise Error('bad magic number') |
| self._hdr_size = int(_read_u32(file)) |
| if self._hdr_size < 24: |
| raise Error('header size too small') |
| if self._hdr_size > 100: |
| raise Error('header size ridiculously large') |
| self._data_size = _read_u32(file) |
| if self._data_size != AUDIO_UNKNOWN_SIZE: |
| self._data_size = int(self._data_size) |
| self._encoding = int(_read_u32(file)) |
| if self._encoding not in _simple_encodings: |
| raise Error('encoding not (yet) supported') |
| if self._encoding in (AUDIO_FILE_ENCODING_MULAW_8, |
| AUDIO_FILE_ENCODING_ALAW_8): |
| self._sampwidth = 2 |
| self._framesize = 1 |
| elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_8: |
| self._framesize = self._sampwidth = 1 |
| elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_16: |
| self._framesize = self._sampwidth = 2 |
| elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_24: |
| self._framesize = self._sampwidth = 3 |
| elif self._encoding == AUDIO_FILE_ENCODING_LINEAR_32: |
| self._framesize = self._sampwidth = 4 |
| else: |
| raise Error('unknown encoding') |
| self._framerate = int(_read_u32(file)) |
| self._nchannels = int(_read_u32(file)) |
| self._framesize = self._framesize * self._nchannels |
| if self._hdr_size > 24: |
| self._info = file.read(self._hdr_size - 24) |
| self._info, _, _ = self._info.partition(b'\0') |
| else: |
| self._info = b'' |
| try: |
| self._data_pos = file.tell() |
| except (AttributeError, OSError): |
| self._data_pos = None |
| |
| def getfp(self): |
| return self._file |
| |
| def getnchannels(self): |
| return self._nchannels |
| |
| def getsampwidth(self): |
| return self._sampwidth |
| |
| def getframerate(self): |
| return self._framerate |
| |
| def getnframes(self): |
| if self._data_size == AUDIO_UNKNOWN_SIZE: |
| return AUDIO_UNKNOWN_SIZE |
| if self._encoding in _simple_encodings: |
| return self._data_size // self._framesize |
| return 0 # XXX--must do some arithmetic here |
| |
| def getcomptype(self): |
| if self._encoding == AUDIO_FILE_ENCODING_MULAW_8: |
| return 'ULAW' |
| elif self._encoding == AUDIO_FILE_ENCODING_ALAW_8: |
| return 'ALAW' |
| else: |
| return 'NONE' |
| |
| def getcompname(self): |
| if self._encoding == AUDIO_FILE_ENCODING_MULAW_8: |
| return 'CCITT G.711 u-law' |
| elif self._encoding == AUDIO_FILE_ENCODING_ALAW_8: |
| return 'CCITT G.711 A-law' |
| else: |
| return 'not compressed' |
| |
| def getparams(self): |
| return _sunau_params(self.getnchannels(), self.getsampwidth(), |
| self.getframerate(), self.getnframes(), |
| self.getcomptype(), self.getcompname()) |
| |
| def getmarkers(self): |
| return None |
| |
| def getmark(self, id): |
| raise Error('no marks') |
| |
| def readframes(self, nframes): |
| if self._encoding in _simple_encodings: |
| if nframes == AUDIO_UNKNOWN_SIZE: |
| data = self._file.read() |
| else: |
| data = self._file.read(nframes * self._framesize) |
| self._soundpos += len(data) // self._framesize |
| if self._encoding == AUDIO_FILE_ENCODING_MULAW_8: |
| import audioop |
| data = audioop.ulaw2lin(data, self._sampwidth) |
| return data |
| return None # XXX--not implemented yet |
| |
| def rewind(self): |
| if self._data_pos is None: |
| raise OSError('cannot seek') |
| self._file.seek(self._data_pos) |
| self._soundpos = 0 |
| |
| def tell(self): |
| return self._soundpos |
| |
| def setpos(self, pos): |
| if pos < 0 or pos > self.getnframes(): |
| raise Error('position not in range') |
| if self._data_pos is None: |
| raise OSError('cannot seek') |
| self._file.seek(self._data_pos + pos * self._framesize) |
| self._soundpos = pos |
| |
| def close(self): |
| file = self._file |
| if file: |
| self._file = None |
| if self._opened: |
| file.close() |
| |
| class Au_write: |
| |
| def __init__(self, f): |
| if type(f) == type(''): |
| import builtins |
| f = builtins.open(f, 'wb') |
| self._opened = True |
| else: |
| self._opened = False |
| self.initfp(f) |
| |
| def __del__(self): |
| if self._file: |
| self.close() |
| self._file = None |
| |
| def __enter__(self): |
| return self |
| |
| def __exit__(self, *args): |
| self.close() |
| |
| def initfp(self, file): |
| self._file = file |
| self._framerate = 0 |
| self._nchannels = 0 |
| self._sampwidth = 0 |
| self._framesize = 0 |
| self._nframes = AUDIO_UNKNOWN_SIZE |
| self._nframeswritten = 0 |
| self._datawritten = 0 |
| self._datalength = 0 |
| self._info = b'' |
| self._comptype = 'ULAW' # default is U-law |
| |
| def setnchannels(self, nchannels): |
| if self._nframeswritten: |
| raise Error('cannot change parameters after starting to write') |
| if nchannels not in (1, 2, 4): |
| raise Error('only 1, 2, or 4 channels supported') |
| self._nchannels = nchannels |
| |
| def getnchannels(self): |
| if not self._nchannels: |
| raise Error('number of channels not set') |
| return self._nchannels |
| |
| def setsampwidth(self, sampwidth): |
| if self._nframeswritten: |
| raise Error('cannot change parameters after starting to write') |
| if sampwidth not in (1, 2, 3, 4): |
| raise Error('bad sample width') |
| self._sampwidth = sampwidth |
| |
| def getsampwidth(self): |
| if not self._framerate: |
| raise Error('sample width not specified') |
| return self._sampwidth |
| |
| def setframerate(self, framerate): |
| if self._nframeswritten: |
| raise Error('cannot change parameters after starting to write') |
| self._framerate = framerate |
| |
| def getframerate(self): |
| if not self._framerate: |
| raise Error('frame rate not set') |
| return self._framerate |
| |
| def setnframes(self, nframes): |
| if self._nframeswritten: |
| raise Error('cannot change parameters after starting to write') |
| if nframes < 0: |
| raise Error('# of frames cannot be negative') |
| self._nframes = nframes |
| |
| def getnframes(self): |
| return self._nframeswritten |
| |
| def setcomptype(self, type, name): |
| if type in ('NONE', 'ULAW'): |
| self._comptype = type |
| else: |
| raise Error('unknown compression type') |
| |
| def getcomptype(self): |
| return self._comptype |
| |
| def getcompname(self): |
| if self._comptype == 'ULAW': |
| return 'CCITT G.711 u-law' |
| elif self._comptype == 'ALAW': |
| return 'CCITT G.711 A-law' |
| else: |
| return 'not compressed' |
| |
| def setparams(self, params): |
| nchannels, sampwidth, framerate, nframes, comptype, compname = params |
| self.setnchannels(nchannels) |
| self.setsampwidth(sampwidth) |
| self.setframerate(framerate) |
| self.setnframes(nframes) |
| self.setcomptype(comptype, compname) |
| |
| def getparams(self): |
| return _sunau_params(self.getnchannels(), self.getsampwidth(), |
| self.getframerate(), self.getnframes(), |
| self.getcomptype(), self.getcompname()) |
| |
| def tell(self): |
| return self._nframeswritten |
| |
| def writeframesraw(self, data): |
| if not isinstance(data, (bytes, bytearray)): |
| data = memoryview(data).cast('B') |
| self._ensure_header_written() |
| if self._comptype == 'ULAW': |
| import audioop |
| data = audioop.lin2ulaw(data, self._sampwidth) |
| nframes = len(data) // self._framesize |
| self._file.write(data) |
| self._nframeswritten = self._nframeswritten + nframes |
| self._datawritten = self._datawritten + len(data) |
| |
| def writeframes(self, data): |
| self.writeframesraw(data) |
| if self._nframeswritten != self._nframes or \ |
| self._datalength != self._datawritten: |
| self._patchheader() |
| |
| def close(self): |
| if self._file: |
| try: |
| self._ensure_header_written() |
| if self._nframeswritten != self._nframes or \ |
| self._datalength != self._datawritten: |
| self._patchheader() |
| self._file.flush() |
| finally: |
| file = self._file |
| self._file = None |
| if self._opened: |
| file.close() |
| |
| # |
| # private methods |
| # |
| |
| def _ensure_header_written(self): |
| if not self._nframeswritten: |
| if not self._nchannels: |
| raise Error('# of channels not specified') |
| if not self._sampwidth: |
| raise Error('sample width not specified') |
| if not self._framerate: |
| raise Error('frame rate not specified') |
| self._write_header() |
| |
| def _write_header(self): |
| if self._comptype == 'NONE': |
| if self._sampwidth == 1: |
| encoding = AUDIO_FILE_ENCODING_LINEAR_8 |
| self._framesize = 1 |
| elif self._sampwidth == 2: |
| encoding = AUDIO_FILE_ENCODING_LINEAR_16 |
| self._framesize = 2 |
| elif self._sampwidth == 3: |
| encoding = AUDIO_FILE_ENCODING_LINEAR_24 |
| self._framesize = 3 |
| elif self._sampwidth == 4: |
| encoding = AUDIO_FILE_ENCODING_LINEAR_32 |
| self._framesize = 4 |
| else: |
| raise Error('internal error') |
| elif self._comptype == 'ULAW': |
| encoding = AUDIO_FILE_ENCODING_MULAW_8 |
| self._framesize = 1 |
| else: |
| raise Error('internal error') |
| self._framesize = self._framesize * self._nchannels |
| _write_u32(self._file, AUDIO_FILE_MAGIC) |
| header_size = 25 + len(self._info) |
| header_size = (header_size + 7) & ~7 |
| _write_u32(self._file, header_size) |
| if self._nframes == AUDIO_UNKNOWN_SIZE: |
| length = AUDIO_UNKNOWN_SIZE |
| else: |
| length = self._nframes * self._framesize |
| try: |
| self._form_length_pos = self._file.tell() |
| except (AttributeError, OSError): |
| self._form_length_pos = None |
| _write_u32(self._file, length) |
| self._datalength = length |
| _write_u32(self._file, encoding) |
| _write_u32(self._file, self._framerate) |
| _write_u32(self._file, self._nchannels) |
| self._file.write(self._info) |
| self._file.write(b'\0'*(header_size - len(self._info) - 24)) |
| |
| def _patchheader(self): |
| if self._form_length_pos is None: |
| raise OSError('cannot seek') |
| self._file.seek(self._form_length_pos) |
| _write_u32(self._file, self._datawritten) |
| self._datalength = self._datawritten |
| self._file.seek(0, 2) |
| |
| def open(f, mode=None): |
| if mode is None: |
| if hasattr(f, 'mode'): |
| mode = f.mode |
| else: |
| mode = 'rb' |
| if mode in ('r', 'rb'): |
| return Au_read(f) |
| elif mode in ('w', 'wb'): |
| return Au_write(f) |
| else: |
| raise Error("mode must be 'r', 'rb', 'w', or 'wb'") |
| |
| def openfp(f, mode=None): |
| warnings.warn("sunau.openfp is deprecated since Python 3.7. " |
| "Use sunau.open instead.", DeprecationWarning, stacklevel=2) |
| return open(f, mode=mode) |