bpo-40495: compileall option to hardlink duplicate pyc files (GH-19901)

compileall can now use hardlinks to avoid storing duplicate files when
the .pyc files for different optimization levels have the same content.

Co-authored-by: Miro Hrončok <miro@hroncok.cz>
Co-authored-by: Victor Stinner <vstinner@python.org>
diff --git a/Lib/compileall.py b/Lib/compileall.py
index abe6cff..fe7f450 100644
--- a/Lib/compileall.py
+++ b/Lib/compileall.py
@@ -15,6 +15,7 @@
 import importlib.util
 import py_compile
 import struct
+import filecmp
 
 from functools import partial
 from pathlib import Path
@@ -47,7 +48,7 @@
 def compile_dir(dir, maxlevels=None, ddir=None, force=False,
                 rx=None, quiet=0, legacy=False, optimize=-1, workers=1,
                 invalidation_mode=None, *, stripdir=None,
-                prependdir=None, limit_sl_dest=None):
+                prependdir=None, limit_sl_dest=None, hardlink_dupes=False):
     """Byte-compile all modules in the given directory tree.
 
     Arguments (only dir is required):
@@ -70,6 +71,7 @@
                after stripdir
     limit_sl_dest: ignore symlinks if they are pointing outside of
                    the defined path
+    hardlink_dupes: hardlink duplicated pyc files
     """
     ProcessPoolExecutor = None
     if ddir is not None and (stripdir is not None or prependdir is not None):
@@ -104,7 +106,8 @@
                                            invalidation_mode=invalidation_mode,
                                            stripdir=stripdir,
                                            prependdir=prependdir,
-                                           limit_sl_dest=limit_sl_dest),
+                                           limit_sl_dest=limit_sl_dest,
+                                           hardlink_dupes=hardlink_dupes),
                                    files)
             success = min(results, default=True)
     else:
@@ -112,14 +115,15 @@
             if not compile_file(file, ddir, force, rx, quiet,
                                 legacy, optimize, invalidation_mode,
                                 stripdir=stripdir, prependdir=prependdir,
-                                limit_sl_dest=limit_sl_dest):
+                                limit_sl_dest=limit_sl_dest,
+                                hardlink_dupes=hardlink_dupes):
                 success = False
     return success
 
 def compile_file(fullname, ddir=None, force=False, rx=None, quiet=0,
                  legacy=False, optimize=-1,
                  invalidation_mode=None, *, stripdir=None, prependdir=None,
-                 limit_sl_dest=None):
+                 limit_sl_dest=None, hardlink_dupes=False):
     """Byte-compile one file.
 
     Arguments (only fullname is required):
@@ -140,6 +144,7 @@
                after stripdir
     limit_sl_dest: ignore symlinks if they are pointing outside of
                    the defined path.
+    hardlink_dupes: hardlink duplicated pyc files
     """
 
     if ddir is not None and (stripdir is not None or prependdir is not None):
@@ -176,6 +181,14 @@
     if isinstance(optimize, int):
         optimize = [optimize]
 
+    # Use set() to remove duplicates.
+    # Use sorted() to create pyc files in a deterministic order.
+    optimize = sorted(set(optimize))
+
+    if hardlink_dupes and len(optimize) < 2:
+        raise ValueError("Hardlinking of duplicated bytecode makes sense "
+                          "only for more than one optimization level")
+
     if rx is not None:
         mo = rx.search(fullname)
         if mo:
@@ -220,10 +233,16 @@
             if not quiet:
                 print('Compiling {!r}...'.format(fullname))
             try:
-                for opt_level, cfile in opt_cfiles.items():
+                for index, opt_level in enumerate(optimize):
+                    cfile = opt_cfiles[opt_level]
                     ok = py_compile.compile(fullname, cfile, dfile, True,
                                             optimize=opt_level,
                                             invalidation_mode=invalidation_mode)
+                    if index > 0 and hardlink_dupes:
+                        previous_cfile = opt_cfiles[optimize[index - 1]]
+                        if filecmp.cmp(cfile, previous_cfile, shallow=False):
+                            os.unlink(cfile)
+                            os.link(previous_cfile, cfile)
             except py_compile.PyCompileError as err:
                 success = False
                 if quiet >= 2:
@@ -352,6 +371,9 @@
                               'Python interpreter itself (specified by -O).'))
     parser.add_argument('-e', metavar='DIR', dest='limit_sl_dest',
                         help='Ignore symlinks pointing outsite of the DIR')
+    parser.add_argument('--hardlink-dupes', action='store_true',
+                        dest='hardlink_dupes',
+                        help='Hardlink duplicated pyc files')
 
     args = parser.parse_args()
     compile_dests = args.compile_dest
@@ -371,6 +393,10 @@
     if args.opt_levels is None:
         args.opt_levels = [-1]
 
+    if len(args.opt_levels) == 1 and args.hardlink_dupes:
+        parser.error(("Hardlinking of duplicated bytecode makes sense "
+                      "only for more than one optimization level."))
+
     if args.ddir is not None and (
         args.stripdir is not None or args.prependdir is not None
     ):
@@ -404,7 +430,8 @@
                                         stripdir=args.stripdir,
                                         prependdir=args.prependdir,
                                         optimize=args.opt_levels,
-                                        limit_sl_dest=args.limit_sl_dest):
+                                        limit_sl_dest=args.limit_sl_dest,
+                                        hardlink_dupes=args.hardlink_dupes):
                         success = False
                 else:
                     if not compile_dir(dest, maxlevels, args.ddir,
@@ -414,7 +441,8 @@
                                        stripdir=args.stripdir,
                                        prependdir=args.prependdir,
                                        optimize=args.opt_levels,
-                                       limit_sl_dest=args.limit_sl_dest):
+                                       limit_sl_dest=args.limit_sl_dest,
+                                       hardlink_dupes=args.hardlink_dupes):
                         success = False
             return success
         else:
diff --git a/Lib/test/test_compileall.py b/Lib/test/test_compileall.py
index 7267894..b4061b7 100644
--- a/Lib/test/test_compileall.py
+++ b/Lib/test/test_compileall.py
@@ -1,16 +1,19 @@
-import sys
 import compileall
+import contextlib
+import filecmp
 import importlib.util
-import test.test_importlib.util
+import io
+import itertools
 import os
 import pathlib
 import py_compile
 import shutil
 import struct
+import sys
 import tempfile
+import test.test_importlib.util
 import time
 import unittest
-import io
 
 from unittest import mock, skipUnless
 try:
@@ -26,6 +29,24 @@
 from .test_py_compile import SourceDateEpochTestMeta
 
 
+def get_pyc(script, opt):
+    if not opt:
+        # Replace None and 0 with ''
+        opt = ''
+    return importlib.util.cache_from_source(script, optimization=opt)
+
+
+def get_pycs(script):
+    return [get_pyc(script, opt) for opt in (0, 1, 2)]
+
+
+def is_hardlink(filename1, filename2):
+    """Returns True if two files have the same inode (hardlink)"""
+    inode1 = os.stat(filename1).st_ino
+    inode2 = os.stat(filename2).st_ino
+    return inode1 == inode2
+
+
 class CompileallTestsBase:
 
     def setUp(self):
@@ -825,6 +846,32 @@
         self.assertTrue(os.path.isfile(allowed_bc))
         self.assertFalse(os.path.isfile(prohibited_bc))
 
+    def test_hardlink_bad_args(self):
+        # Bad argument combination: hardlink deduplication makes sense
+        # only for more than one optimization level
+        self.assertRunNotOK(self.directory, "-o 1", "--hardlink-dupes")
+
+    def test_hardlink(self):
+        # 'a = 0' code produces the same bytecode for the 3 optimization
+        # levels. All three .pyc files must have the same inode (hardlinks).
+        #
+        # If deduplication is disabled, all pyc files must have different
+        # inodes.
+        for dedup in (True, False):
+            with tempfile.TemporaryDirectory() as path:
+                with self.subTest(dedup=dedup):
+                    script = script_helper.make_script(path, "script", "a = 0")
+                    pycs = get_pycs(script)
+
+                    args = ["-q", "-o 0", "-o 1", "-o 2"]
+                    if dedup:
+                        args.append("--hardlink-dupes")
+                    self.assertRunOK(path, *args)
+
+                    self.assertEqual(is_hardlink(pycs[0], pycs[1]), dedup)
+                    self.assertEqual(is_hardlink(pycs[1], pycs[2]), dedup)
+                    self.assertEqual(is_hardlink(pycs[0], pycs[2]), dedup)
+
 
 class CommandLineTestsWithSourceEpoch(CommandLineTestsBase,
                                        unittest.TestCase,
@@ -841,5 +888,176 @@
 
 
 
+class HardlinkDedupTestsBase:
+    # Test hardlink_dupes parameter of compileall.compile_dir()
+
+    def setUp(self):
+        self.path = None
+
+    @contextlib.contextmanager
+    def temporary_directory(self):
+        with tempfile.TemporaryDirectory() as path:
+            self.path = path
+            yield path
+            self.path = None
+
+    def make_script(self, code, name="script"):
+        return script_helper.make_script(self.path, name, code)
+
+    def compile_dir(self, *, dedup=True, optimize=(0, 1, 2), force=False):
+        compileall.compile_dir(self.path, quiet=True, optimize=optimize,
+                               hardlink_dupes=dedup, force=force)
+
+    def test_bad_args(self):
+        # Bad argument combination: hardlink deduplication makes sense
+        # only for more than one optimization level
+        with self.temporary_directory():
+            self.make_script("pass")
+            with self.assertRaises(ValueError):
+                compileall.compile_dir(self.path, quiet=True, optimize=0,
+                                       hardlink_dupes=True)
+            with self.assertRaises(ValueError):
+                # same optimization level specified twice:
+                # compile_dir() removes duplicates
+                compileall.compile_dir(self.path, quiet=True, optimize=[0, 0],
+                                       hardlink_dupes=True)
+
+    def create_code(self, docstring=False, assertion=False):
+        lines = []
+        if docstring:
+            lines.append("'module docstring'")
+        lines.append('x = 1')
+        if assertion:
+            lines.append("assert x == 1")
+        return '\n'.join(lines)
+
+    def iter_codes(self):
+        for docstring in (False, True):
+            for assertion in (False, True):
+                code = self.create_code(docstring=docstring, assertion=assertion)
+                yield (code, docstring, assertion)
+
+    def test_disabled(self):
+        # Deduplication disabled, no hardlinks
+        for code, docstring, assertion in self.iter_codes():
+            with self.subTest(docstring=docstring, assertion=assertion):
+                with self.temporary_directory():
+                    script = self.make_script(code)
+                    pycs = get_pycs(script)
+                    self.compile_dir(dedup=False)
+                    self.assertFalse(is_hardlink(pycs[0], pycs[1]))
+                    self.assertFalse(is_hardlink(pycs[0], pycs[2]))
+                    self.assertFalse(is_hardlink(pycs[1], pycs[2]))
+
+    def check_hardlinks(self, script, docstring=False, assertion=False):
+        pycs = get_pycs(script)
+        self.assertEqual(is_hardlink(pycs[0], pycs[1]),
+                         not assertion)
+        self.assertEqual(is_hardlink(pycs[0], pycs[2]),
+                         not assertion and not docstring)
+        self.assertEqual(is_hardlink(pycs[1], pycs[2]),
+                         not docstring)
+
+    def test_hardlink(self):
+        # Test deduplication on all combinations
+        for code, docstring, assertion in self.iter_codes():
+            with self.subTest(docstring=docstring, assertion=assertion):
+                with self.temporary_directory():
+                    script = self.make_script(code)
+                    self.compile_dir()
+                    self.check_hardlinks(script, docstring, assertion)
+
+    def test_only_two_levels(self):
+        # Don't build the 3 optimization levels, but only 2
+        for opts in ((0, 1), (1, 2), (0, 2)):
+            with self.subTest(opts=opts):
+                with self.temporary_directory():
+                    # code with no docstring and no assertion:
+                    # same bytecode for all optimization levels
+                    script = self.make_script(self.create_code())
+                    self.compile_dir(optimize=opts)
+                    pyc1 = get_pyc(script, opts[0])
+                    pyc2 = get_pyc(script, opts[1])
+                    self.assertTrue(is_hardlink(pyc1, pyc2))
+
+    def test_duplicated_levels(self):
+        # compile_dir() must not fail if optimize contains duplicated
+        # optimization levels and/or if optimization levels are not sorted.
+        with self.temporary_directory():
+            # code with no docstring and no assertion:
+            # same bytecode for all optimization levels
+            script = self.make_script(self.create_code())
+            self.compile_dir(optimize=[1, 0, 1, 0])
+            pyc1 = get_pyc(script, 0)
+            pyc2 = get_pyc(script, 1)
+            self.assertTrue(is_hardlink(pyc1, pyc2))
+
+    def test_recompilation(self):
+        # Test compile_dir() when pyc files already exist and the script
+        # content changed
+        with self.temporary_directory():
+            script = self.make_script("a = 0")
+            self.compile_dir()
+            # All three levels have the same inode
+            self.check_hardlinks(script)
+
+            pycs = get_pycs(script)
+            inode = os.stat(pycs[0]).st_ino
+
+            # Change of the module content
+            script = self.make_script("print(0)")
+
+            # Recompilation without -o 1
+            self.compile_dir(optimize=[0, 2], force=True)
+
+            # opt-1.pyc should have the same inode as before and others should not
+            self.assertEqual(inode, os.stat(pycs[1]).st_ino)
+            self.assertTrue(is_hardlink(pycs[0], pycs[2]))
+            self.assertNotEqual(inode, os.stat(pycs[2]).st_ino)
+            # opt-1.pyc and opt-2.pyc have different content
+            self.assertFalse(filecmp.cmp(pycs[1], pycs[2], shallow=True))
+
+    def test_import(self):
+        # Test that import updates a single pyc file when pyc files already
+        # exist and the script content changed
+        with self.temporary_directory():
+            script = self.make_script(self.create_code(), name="module")
+            self.compile_dir()
+            # All three levels have the same inode
+            self.check_hardlinks(script)
+
+            pycs = get_pycs(script)
+            inode = os.stat(pycs[0]).st_ino
+
+            # Change of the module content
+            script = self.make_script("print(0)", name="module")
+
+            # Import the module in Python with -O (optimization level 1)
+            script_helper.assert_python_ok(
+                "-O", "-c", "import module", __isolated=False, PYTHONPATH=self.path
+            )
+
+            # Only opt-1.pyc is changed
+            self.assertEqual(inode, os.stat(pycs[0]).st_ino)
+            self.assertEqual(inode, os.stat(pycs[2]).st_ino)
+            self.assertFalse(is_hardlink(pycs[1], pycs[2]))
+            # opt-1.pyc and opt-2.pyc have different content
+            self.assertFalse(filecmp.cmp(pycs[1], pycs[2], shallow=True))
+
+
+class HardlinkDedupTestsWithSourceEpoch(HardlinkDedupTestsBase,
+                                        unittest.TestCase,
+                                        metaclass=SourceDateEpochTestMeta,
+                                        source_date_epoch=True):
+    pass
+
+
+class HardlinkDedupTestsNoSourceEpoch(HardlinkDedupTestsBase,
+                                      unittest.TestCase,
+                                      metaclass=SourceDateEpochTestMeta,
+                                      source_date_epoch=False):
+    pass
+
+
 if __name__ == "__main__":
     unittest.main()