bpo-40495: compileall option to hardlink duplicate pyc files (GH-19901)
compileall is now able to use hardlinks to prevent duplicates in a
case when .pyc files for different optimization levels have the same content.
Co-authored-by: Miro Hrončok <miro@hroncok.cz>
Co-authored-by: Victor Stinner <vstinner@python.org>
diff --git a/Lib/compileall.py b/Lib/compileall.py
index abe6cff..fe7f450 100644
--- a/Lib/compileall.py
+++ b/Lib/compileall.py
@@ -15,6 +15,7 @@
import importlib.util
import py_compile
import struct
+import filecmp
from functools import partial
from pathlib import Path
@@ -47,7 +48,7 @@
def compile_dir(dir, maxlevels=None, ddir=None, force=False,
rx=None, quiet=0, legacy=False, optimize=-1, workers=1,
invalidation_mode=None, *, stripdir=None,
- prependdir=None, limit_sl_dest=None):
+ prependdir=None, limit_sl_dest=None, hardlink_dupes=False):
"""Byte-compile all modules in the given directory tree.
Arguments (only dir is required):
@@ -70,6 +71,7 @@
after stripdir
limit_sl_dest: ignore symlinks if they are pointing outside of
the defined path
+ hardlink_dupes: hardlink duplicated pyc files
"""
ProcessPoolExecutor = None
if ddir is not None and (stripdir is not None or prependdir is not None):
@@ -104,7 +106,8 @@
invalidation_mode=invalidation_mode,
stripdir=stripdir,
prependdir=prependdir,
- limit_sl_dest=limit_sl_dest),
+ limit_sl_dest=limit_sl_dest,
+ hardlink_dupes=hardlink_dupes),
files)
success = min(results, default=True)
else:
@@ -112,14 +115,15 @@
if not compile_file(file, ddir, force, rx, quiet,
legacy, optimize, invalidation_mode,
stripdir=stripdir, prependdir=prependdir,
- limit_sl_dest=limit_sl_dest):
+ limit_sl_dest=limit_sl_dest,
+ hardlink_dupes=hardlink_dupes):
success = False
return success
def compile_file(fullname, ddir=None, force=False, rx=None, quiet=0,
legacy=False, optimize=-1,
invalidation_mode=None, *, stripdir=None, prependdir=None,
- limit_sl_dest=None):
+ limit_sl_dest=None, hardlink_dupes=False):
"""Byte-compile one file.
Arguments (only fullname is required):
@@ -140,6 +144,7 @@
after stripdir
limit_sl_dest: ignore symlinks if they are pointing outside of
the defined path.
+ hardlink_dupes: hardlink duplicated pyc files
"""
if ddir is not None and (stripdir is not None or prependdir is not None):
@@ -176,6 +181,14 @@
if isinstance(optimize, int):
optimize = [optimize]
+ # Use set() to remove duplicates.
+ # Use sorted() to create pyc files in a deterministic order.
+ optimize = sorted(set(optimize))
+
+ if hardlink_dupes and len(optimize) < 2:
+ raise ValueError("Hardlinking of duplicated bytecode makes sense "
+ "only for more than one optimization level")
+
if rx is not None:
mo = rx.search(fullname)
if mo:
@@ -220,10 +233,16 @@
if not quiet:
print('Compiling {!r}...'.format(fullname))
try:
- for opt_level, cfile in opt_cfiles.items():
+ for index, opt_level in enumerate(optimize):
+ cfile = opt_cfiles[opt_level]
ok = py_compile.compile(fullname, cfile, dfile, True,
optimize=opt_level,
invalidation_mode=invalidation_mode)
+ if index > 0 and hardlink_dupes:
+ previous_cfile = opt_cfiles[optimize[index - 1]]
+ if filecmp.cmp(cfile, previous_cfile, shallow=False):
+ os.unlink(cfile)
+ os.link(previous_cfile, cfile)
except py_compile.PyCompileError as err:
success = False
if quiet >= 2:
@@ -352,6 +371,9 @@
'Python interpreter itself (specified by -O).'))
parser.add_argument('-e', metavar='DIR', dest='limit_sl_dest',
help='Ignore symlinks pointing outsite of the DIR')
+ parser.add_argument('--hardlink-dupes', action='store_true',
+ dest='hardlink_dupes',
+ help='Hardlink duplicated pyc files')
args = parser.parse_args()
compile_dests = args.compile_dest
@@ -371,6 +393,10 @@
if args.opt_levels is None:
args.opt_levels = [-1]
+ if len(args.opt_levels) == 1 and args.hardlink_dupes:
+ parser.error(("Hardlinking of duplicated bytecode makes sense "
+ "only for more than one optimization level."))
+
if args.ddir is not None and (
args.stripdir is not None or args.prependdir is not None
):
@@ -404,7 +430,8 @@
stripdir=args.stripdir,
prependdir=args.prependdir,
optimize=args.opt_levels,
- limit_sl_dest=args.limit_sl_dest):
+ limit_sl_dest=args.limit_sl_dest,
+ hardlink_dupes=args.hardlink_dupes):
success = False
else:
if not compile_dir(dest, maxlevels, args.ddir,
@@ -414,7 +441,8 @@
stripdir=args.stripdir,
prependdir=args.prependdir,
optimize=args.opt_levels,
- limit_sl_dest=args.limit_sl_dest):
+ limit_sl_dest=args.limit_sl_dest,
+ hardlink_dupes=args.hardlink_dupes):
success = False
return success
else:
diff --git a/Lib/test/test_compileall.py b/Lib/test/test_compileall.py
index 7267894..b4061b7 100644
--- a/Lib/test/test_compileall.py
+++ b/Lib/test/test_compileall.py
@@ -1,16 +1,19 @@
-import sys
import compileall
+import contextlib
+import filecmp
import importlib.util
-import test.test_importlib.util
+import io
+import itertools
import os
import pathlib
import py_compile
import shutil
import struct
+import sys
import tempfile
+import test.test_importlib.util
import time
import unittest
-import io
from unittest import mock, skipUnless
try:
@@ -26,6 +29,24 @@
from .test_py_compile import SourceDateEpochTestMeta
+def get_pyc(script, opt):
+ if not opt:
+ # Replace None and 0 with ''
+ opt = ''
+ return importlib.util.cache_from_source(script, optimization=opt)
+
+
+def get_pycs(script):
+ return [get_pyc(script, opt) for opt in (0, 1, 2)]
+
+
+def is_hardlink(filename1, filename2):
+ """Returns True if two files have the same inode (hardlink)"""
+ inode1 = os.stat(filename1).st_ino
+ inode2 = os.stat(filename2).st_ino
+ return inode1 == inode2
+
+
class CompileallTestsBase:
def setUp(self):
@@ -825,6 +846,32 @@
self.assertTrue(os.path.isfile(allowed_bc))
self.assertFalse(os.path.isfile(prohibited_bc))
+ def test_hardlink_bad_args(self):
+ # Bad arguments combination, hardlink deduplication make sense
+ # only for more than one optimization level
+ self.assertRunNotOK(self.directory, "-o 1", "--hardlink-dupes")
+
+ def test_hardlink(self):
+ # 'a = 0' code produces the same bytecode for the 3 optimization
+ # levels. All three .pyc files must have the same inode (hardlinks).
+ #
+ # If deduplication is disabled, all pyc files must have different
+ # inodes.
+ for dedup in (True, False):
+ with tempfile.TemporaryDirectory() as path:
+ with self.subTest(dedup=dedup):
+ script = script_helper.make_script(path, "script", "a = 0")
+ pycs = get_pycs(script)
+
+ args = ["-q", "-o 0", "-o 1", "-o 2"]
+ if dedup:
+ args.append("--hardlink-dupes")
+ self.assertRunOK(path, *args)
+
+ self.assertEqual(is_hardlink(pycs[0], pycs[1]), dedup)
+ self.assertEqual(is_hardlink(pycs[1], pycs[2]), dedup)
+ self.assertEqual(is_hardlink(pycs[0], pycs[2]), dedup)
+
class CommandLineTestsWithSourceEpoch(CommandLineTestsBase,
unittest.TestCase,
@@ -841,5 +888,176 @@
+class HardlinkDedupTestsBase:
+ # Test hardlink_dupes parameter of compileall.compile_dir()
+
+ def setUp(self):
+ self.path = None
+
+ @contextlib.contextmanager
+ def temporary_directory(self):
+ with tempfile.TemporaryDirectory() as path:
+ self.path = path
+ yield path
+ self.path = None
+
+ def make_script(self, code, name="script"):
+ return script_helper.make_script(self.path, name, code)
+
+ def compile_dir(self, *, dedup=True, optimize=(0, 1, 2), force=False):
+ compileall.compile_dir(self.path, quiet=True, optimize=optimize,
+ hardlink_dupes=dedup, force=force)
+
+ def test_bad_args(self):
+ # Bad arguments combination, hardlink deduplication make sense
+ # only for more than one optimization level
+ with self.temporary_directory():
+ self.make_script("pass")
+ with self.assertRaises(ValueError):
+ compileall.compile_dir(self.path, quiet=True, optimize=0,
+ hardlink_dupes=True)
+ with self.assertRaises(ValueError):
+ # same optimization level specified twice:
+ # compile_dir() removes duplicates
+ compileall.compile_dir(self.path, quiet=True, optimize=[0, 0],
+ hardlink_dupes=True)
+
+ def create_code(self, docstring=False, assertion=False):
+ lines = []
+ if docstring:
+ lines.append("'module docstring'")
+ lines.append('x = 1')
+ if assertion:
+ lines.append("assert x == 1")
+ return '\n'.join(lines)
+
+ def iter_codes(self):
+ for docstring in (False, True):
+ for assertion in (False, True):
+ code = self.create_code(docstring=docstring, assertion=assertion)
+ yield (code, docstring, assertion)
+
+ def test_disabled(self):
+ # Deduplication disabled, no hardlinks
+ for code, docstring, assertion in self.iter_codes():
+ with self.subTest(docstring=docstring, assertion=assertion):
+ with self.temporary_directory():
+ script = self.make_script(code)
+ pycs = get_pycs(script)
+ self.compile_dir(dedup=False)
+ self.assertFalse(is_hardlink(pycs[0], pycs[1]))
+ self.assertFalse(is_hardlink(pycs[0], pycs[2]))
+ self.assertFalse(is_hardlink(pycs[1], pycs[2]))
+
+ def check_hardlinks(self, script, docstring=False, assertion=False):
+ pycs = get_pycs(script)
+ self.assertEqual(is_hardlink(pycs[0], pycs[1]),
+ not assertion)
+ self.assertEqual(is_hardlink(pycs[0], pycs[2]),
+ not assertion and not docstring)
+ self.assertEqual(is_hardlink(pycs[1], pycs[2]),
+ not docstring)
+
+ def test_hardlink(self):
+ # Test deduplication on all combinations
+ for code, docstring, assertion in self.iter_codes():
+ with self.subTest(docstring=docstring, assertion=assertion):
+ with self.temporary_directory():
+ script = self.make_script(code)
+ self.compile_dir()
+ self.check_hardlinks(script, docstring, assertion)
+
+ def test_only_two_levels(self):
+ # Don't build the 3 optimization levels, but only 2
+ for opts in ((0, 1), (1, 2), (0, 2)):
+ with self.subTest(opts=opts):
+ with self.temporary_directory():
+ # code with no dostring and no assertion:
+ # same bytecode for all optimization levels
+ script = self.make_script(self.create_code())
+ self.compile_dir(optimize=opts)
+ pyc1 = get_pyc(script, opts[0])
+ pyc2 = get_pyc(script, opts[1])
+ self.assertTrue(is_hardlink(pyc1, pyc2))
+
+ def test_duplicated_levels(self):
+ # compile_dir() must not fail if optimize contains duplicated
+ # optimization levels and/or if optimization levels are not sorted.
+ with self.temporary_directory():
+ # code with no dostring and no assertion:
+ # same bytecode for all optimization levels
+ script = self.make_script(self.create_code())
+ self.compile_dir(optimize=[1, 0, 1, 0])
+ pyc1 = get_pyc(script, 0)
+ pyc2 = get_pyc(script, 1)
+ self.assertTrue(is_hardlink(pyc1, pyc2))
+
+ def test_recompilation(self):
+ # Test compile_dir() when pyc files already exists and the script
+ # content changed
+ with self.temporary_directory():
+ script = self.make_script("a = 0")
+ self.compile_dir()
+ # All three levels have the same inode
+ self.check_hardlinks(script)
+
+ pycs = get_pycs(script)
+ inode = os.stat(pycs[0]).st_ino
+
+ # Change of the module content
+ script = self.make_script("print(0)")
+
+ # Recompilation without -o 1
+ self.compile_dir(optimize=[0, 2], force=True)
+
+ # opt-1.pyc should have the same inode as before and others should not
+ self.assertEqual(inode, os.stat(pycs[1]).st_ino)
+ self.assertTrue(is_hardlink(pycs[0], pycs[2]))
+ self.assertNotEqual(inode, os.stat(pycs[2]).st_ino)
+ # opt-1.pyc and opt-2.pyc have different content
+ self.assertFalse(filecmp.cmp(pycs[1], pycs[2], shallow=True))
+
+ def test_import(self):
+ # Test that import updates a single pyc file when pyc files already
+ # exists and the script content changed
+ with self.temporary_directory():
+ script = self.make_script(self.create_code(), name="module")
+ self.compile_dir()
+ # All three levels have the same inode
+ self.check_hardlinks(script)
+
+ pycs = get_pycs(script)
+ inode = os.stat(pycs[0]).st_ino
+
+ # Change of the module content
+ script = self.make_script("print(0)", name="module")
+
+ # Import the module in Python with -O (optimization level 1)
+ script_helper.assert_python_ok(
+ "-O", "-c", "import module", __isolated=False, PYTHONPATH=self.path
+ )
+
+ # Only opt-1.pyc is changed
+ self.assertEqual(inode, os.stat(pycs[0]).st_ino)
+ self.assertEqual(inode, os.stat(pycs[2]).st_ino)
+ self.assertFalse(is_hardlink(pycs[1], pycs[2]))
+ # opt-1.pyc and opt-2.pyc have different content
+ self.assertFalse(filecmp.cmp(pycs[1], pycs[2], shallow=True))
+
+
+class HardlinkDedupTestsWithSourceEpoch(HardlinkDedupTestsBase,
+ unittest.TestCase,
+ metaclass=SourceDateEpochTestMeta,
+ source_date_epoch=True):
+ pass
+
+
+class HardlinkDedupTestsNoSourceEpoch(HardlinkDedupTestsBase,
+ unittest.TestCase,
+ metaclass=SourceDateEpochTestMeta,
+ source_date_epoch=False):
+ pass
+
+
if __name__ == "__main__":
unittest.main()