| # |
| # DEPRECATED: implementation for ffi.verify() |
| # |
| import sys, os, binascii, shutil, io |
| from . import __version_verifier_modules__ |
| from . import ffiplatform |
| from .error import VerificationError |
| |
| if sys.version_info >= (3, 3): |
| import importlib.machinery |
| def _extension_suffixes(): |
| return importlib.machinery.EXTENSION_SUFFIXES[:] |
| else: |
| import imp |
| def _extension_suffixes(): |
| return [suffix for suffix, _, type in imp.get_suffixes() |
| if type == imp.C_EXTENSION] |
| |
| |
| if sys.version_info >= (3,): |
| NativeIO = io.StringIO |
| else: |
| class NativeIO(io.BytesIO): |
| def write(self, s): |
| if isinstance(s, unicode): |
| s = s.encode('ascii') |
| super(NativeIO, self).write(s) |
| |
| |
| class Verifier(object): |
| |
| def __init__(self, ffi, preamble, tmpdir=None, modulename=None, |
| ext_package=None, tag='', force_generic_engine=False, |
| source_extension='.c', flags=None, relative_to=None, **kwds): |
| if ffi._parser._uses_new_feature: |
| raise VerificationError( |
| "feature not supported with ffi.verify(), but only " |
| "with ffi.set_source(): %s" % (ffi._parser._uses_new_feature,)) |
| self.ffi = ffi |
| self.preamble = preamble |
| if not modulename: |
| flattened_kwds = ffiplatform.flatten(kwds) |
| vengine_class = _locate_engine_class(ffi, force_generic_engine) |
| self._vengine = vengine_class(self) |
| self._vengine.patch_extension_kwds(kwds) |
| self.flags = flags |
| self.kwds = self.make_relative_to(kwds, relative_to) |
| # |
| if modulename: |
| if tag: |
| raise TypeError("can't specify both 'modulename' and 'tag'") |
| else: |
| key = '\x00'.join([sys.version[:3], __version_verifier_modules__, |
| preamble, flattened_kwds] + |
| ffi._cdefsources) |
| if sys.version_info >= (3,): |
| key = key.encode('utf-8') |
| k1 = hex(binascii.crc32(key[0::2]) & 0xffffffff) |
| k1 = k1.lstrip('0x').rstrip('L') |
| k2 = hex(binascii.crc32(key[1::2]) & 0xffffffff) |
| k2 = k2.lstrip('0').rstrip('L') |
| modulename = '_cffi_%s_%s%s%s' % (tag, self._vengine._class_key, |
| k1, k2) |
| suffix = _get_so_suffixes()[0] |
| self.tmpdir = tmpdir or _caller_dir_pycache() |
| self.sourcefilename = os.path.join(self.tmpdir, modulename + source_extension) |
| self.modulefilename = os.path.join(self.tmpdir, modulename + suffix) |
| self.ext_package = ext_package |
| self._has_source = False |
| self._has_module = False |
| |
| def write_source(self, file=None): |
| """Write the C source code. It is produced in 'self.sourcefilename', |
| which can be tweaked beforehand.""" |
| with self.ffi._lock: |
| if self._has_source and file is None: |
| raise VerificationError( |
| "source code already written") |
| self._write_source(file) |
| |
| def compile_module(self): |
| """Write the C source code (if not done already) and compile it. |
| This produces a dynamic link library in 'self.modulefilename'.""" |
| with self.ffi._lock: |
| if self._has_module: |
| raise VerificationError("module already compiled") |
| if not self._has_source: |
| self._write_source() |
| self._compile_module() |
| |
| def load_library(self): |
| """Get a C module from this Verifier instance. |
| Returns an instance of a FFILibrary class that behaves like the |
| objects returned by ffi.dlopen(), but that delegates all |
| operations to the C module. If necessary, the C code is written |
| and compiled first. |
| """ |
| with self.ffi._lock: |
| if not self._has_module: |
| self._locate_module() |
| if not self._has_module: |
| if not self._has_source: |
| self._write_source() |
| self._compile_module() |
| return self._load_library() |
| |
| def get_module_name(self): |
| basename = os.path.basename(self.modulefilename) |
| # kill both the .so extension and the other .'s, as introduced |
| # by Python 3: 'basename.cpython-33m.so' |
| basename = basename.split('.', 1)[0] |
| # and the _d added in Python 2 debug builds --- but try to be |
| # conservative and not kill a legitimate _d |
| if basename.endswith('_d') and hasattr(sys, 'gettotalrefcount'): |
| basename = basename[:-2] |
| return basename |
| |
| def get_extension(self): |
| ffiplatform._hack_at_distutils() # backward compatibility hack |
| if not self._has_source: |
| with self.ffi._lock: |
| if not self._has_source: |
| self._write_source() |
| sourcename = ffiplatform.maybe_relative_path(self.sourcefilename) |
| modname = self.get_module_name() |
| return ffiplatform.get_extension(sourcename, modname, **self.kwds) |
| |
| def generates_python_module(self): |
| return self._vengine._gen_python_module |
| |
| def make_relative_to(self, kwds, relative_to): |
| if relative_to and os.path.dirname(relative_to): |
| dirname = os.path.dirname(relative_to) |
| kwds = kwds.copy() |
| for key in ffiplatform.LIST_OF_FILE_NAMES: |
| if key in kwds: |
| lst = kwds[key] |
| if not isinstance(lst, (list, tuple)): |
| raise TypeError("keyword '%s' should be a list or tuple" |
| % (key,)) |
| lst = [os.path.join(dirname, fn) for fn in lst] |
| kwds[key] = lst |
| return kwds |
| |
| # ---------- |
| |
| def _locate_module(self): |
| if not os.path.isfile(self.modulefilename): |
| if self.ext_package: |
| try: |
| pkg = __import__(self.ext_package, None, None, ['__doc__']) |
| except ImportError: |
| return # cannot import the package itself, give up |
| # (e.g. it might be called differently before installation) |
| path = pkg.__path__ |
| else: |
| path = None |
| filename = self._vengine.find_module(self.get_module_name(), path, |
| _get_so_suffixes()) |
| if filename is None: |
| return |
| self.modulefilename = filename |
| self._vengine.collect_types() |
| self._has_module = True |
| |
| def _write_source_to(self, file): |
| self._vengine._f = file |
| try: |
| self._vengine.write_source_to_f() |
| finally: |
| del self._vengine._f |
| |
| def _write_source(self, file=None): |
| if file is not None: |
| self._write_source_to(file) |
| else: |
| # Write our source file to an in memory file. |
| f = NativeIO() |
| self._write_source_to(f) |
| source_data = f.getvalue() |
| |
| # Determine if this matches the current file |
| if os.path.exists(self.sourcefilename): |
| with open(self.sourcefilename, "r") as fp: |
| needs_written = not (fp.read() == source_data) |
| else: |
| needs_written = True |
| |
| # Actually write the file out if it doesn't match |
| if needs_written: |
| _ensure_dir(self.sourcefilename) |
| with open(self.sourcefilename, "w") as fp: |
| fp.write(source_data) |
| |
| # Set this flag |
| self._has_source = True |
| |
| def _compile_module(self): |
| # compile this C source |
| tmpdir = os.path.dirname(self.sourcefilename) |
| outputfilename = ffiplatform.compile(tmpdir, self.get_extension()) |
| try: |
| same = ffiplatform.samefile(outputfilename, self.modulefilename) |
| except OSError: |
| same = False |
| if not same: |
| _ensure_dir(self.modulefilename) |
| shutil.move(outputfilename, self.modulefilename) |
| self._has_module = True |
| |
| def _load_library(self): |
| assert self._has_module |
| if self.flags is not None: |
| return self._vengine.load_library(self.flags) |
| else: |
| return self._vengine.load_library() |
| |
| # ____________________________________________________________ |
| |
| _FORCE_GENERIC_ENGINE = False # for tests |
| |
| def _locate_engine_class(ffi, force_generic_engine): |
| if _FORCE_GENERIC_ENGINE: |
| force_generic_engine = True |
| if not force_generic_engine: |
| if '__pypy__' in sys.builtin_module_names: |
| force_generic_engine = True |
| else: |
| try: |
| import _cffi_backend |
| except ImportError: |
| _cffi_backend = '?' |
| if ffi._backend is not _cffi_backend: |
| force_generic_engine = True |
| if force_generic_engine: |
| from . import vengine_gen |
| return vengine_gen.VGenericEngine |
| else: |
| from . import vengine_cpy |
| return vengine_cpy.VCPythonEngine |
| |
| # ____________________________________________________________ |
| |
| _TMPDIR = None |
| |
| def _caller_dir_pycache(): |
| if _TMPDIR: |
| return _TMPDIR |
| result = os.environ.get('CFFI_TMPDIR') |
| if result: |
| return result |
| filename = sys._getframe(2).f_code.co_filename |
| return os.path.abspath(os.path.join(os.path.dirname(filename), |
| '__pycache__')) |
| |
| def set_tmpdir(dirname): |
| """Set the temporary directory to use instead of __pycache__.""" |
| global _TMPDIR |
| _TMPDIR = dirname |
| |
| def cleanup_tmpdir(tmpdir=None, keep_so=False): |
| """Clean up the temporary directory by removing all files in it |
| called `_cffi_*.{c,so}` as well as the `build` subdirectory.""" |
| tmpdir = tmpdir or _caller_dir_pycache() |
| try: |
| filelist = os.listdir(tmpdir) |
| except OSError: |
| return |
| if keep_so: |
| suffix = '.c' # only remove .c files |
| else: |
| suffix = _get_so_suffixes()[0].lower() |
| for fn in filelist: |
| if fn.lower().startswith('_cffi_') and ( |
| fn.lower().endswith(suffix) or fn.lower().endswith('.c')): |
| try: |
| os.unlink(os.path.join(tmpdir, fn)) |
| except OSError: |
| pass |
| clean_dir = [os.path.join(tmpdir, 'build')] |
| for dir in clean_dir: |
| try: |
| for fn in os.listdir(dir): |
| fn = os.path.join(dir, fn) |
| if os.path.isdir(fn): |
| clean_dir.append(fn) |
| else: |
| os.unlink(fn) |
| except OSError: |
| pass |
| |
| def _get_so_suffixes(): |
| suffixes = _extension_suffixes() |
| if not suffixes: |
| # bah, no C_EXTENSION available. Occurs on pypy without cpyext |
| if sys.platform == 'win32': |
| suffixes = [".pyd"] |
| else: |
| suffixes = [".so"] |
| |
| return suffixes |
| |
| def _ensure_dir(filename): |
| dirname = os.path.dirname(filename) |
| if dirname and not os.path.isdir(dirname): |
| os.makedirs(dirname) |