bpo-40495: compileall option to hardlink duplicate pyc files (GH-19901)

compileall is now able to use hardlinks to avoid duplicate files when
.pyc files for different optimization levels have the same content.

Co-authored-by: Miro Hrončok <miro@hroncok.cz>
Co-authored-by: Victor Stinner <vstinner@python.org>
diff --git a/Lib/test/test_compileall.py b/Lib/test/test_compileall.py
index 7267894..b4061b7 100644
--- a/Lib/test/test_compileall.py
+++ b/Lib/test/test_compileall.py
@@ -1,16 +1,19 @@
-import sys
 import compileall
+import contextlib
+import filecmp
 import importlib.util
-import test.test_importlib.util
+import io
+import itertools
 import os
 import pathlib
 import py_compile
 import shutil
 import struct
+import sys
 import tempfile
+import test.test_importlib.util
 import time
 import unittest
-import io
 
 from unittest import mock, skipUnless
 try:
@@ -26,6 +29,24 @@
 from .test_py_compile import SourceDateEpochTestMeta
 
 
+def get_pyc(script, opt):
+    if not opt:
+        # Replace None and 0 with ''
+        opt = ''
+    return importlib.util.cache_from_source(script, optimization=opt)
+
+
+def get_pycs(script):
+    return [get_pyc(script, opt) for opt in (0, 1, 2)]
+
+
+def is_hardlink(filename1, filename2):
+    """Return True if both files have the same inode number (hardlink)."""
+    inode1 = os.stat(filename1).st_ino
+    inode2 = os.stat(filename2).st_ino
+    return inode1 == inode2
+
+
 class CompileallTestsBase:
 
     def setUp(self):
@@ -825,6 +846,32 @@
         self.assertTrue(os.path.isfile(allowed_bc))
         self.assertFalse(os.path.isfile(prohibited_bc))
 
+    def test_hardlink_bad_args(self):
+        # Bad argument combination: hardlink deduplication makes sense
+        # only with more than one optimization level
+        self.assertRunNotOK(self.directory, "-o 1", "--hardlink-dupes")
+
+    def test_hardlink(self):
+        # 'a = 0' code produces the same bytecode for the 3 optimization
+        # levels. All three .pyc files must have the same inode (hardlinks).
+        #
+        # If deduplication is disabled, all pyc files must have different
+        # inodes.
+        for dedup in (True, False):
+            with tempfile.TemporaryDirectory() as path:
+                with self.subTest(dedup=dedup):
+                    script = script_helper.make_script(path, "script", "a = 0")
+                    pycs = get_pycs(script)
+
+                    args = ["-q", "-o 0", "-o 1", "-o 2"]
+                    if dedup:
+                        args.append("--hardlink-dupes")
+                    self.assertRunOK(path, *args)
+
+                    self.assertEqual(is_hardlink(pycs[0], pycs[1]), dedup)
+                    self.assertEqual(is_hardlink(pycs[1], pycs[2]), dedup)
+                    self.assertEqual(is_hardlink(pycs[0], pycs[2]), dedup)
+
 
 class CommandLineTestsWithSourceEpoch(CommandLineTestsBase,
                                        unittest.TestCase,
@@ -841,5 +888,176 @@
 
 
 
+class HardlinkDedupTestsBase:
+    # Test hardlink_dupes parameter of compileall.compile_dir()
+
+    def setUp(self):
+        self.path = None
+
+    @contextlib.contextmanager
+    def temporary_directory(self):
+        with tempfile.TemporaryDirectory() as path:
+            self.path = path
+            yield path
+            self.path = None
+
+    def make_script(self, code, name="script"):
+        return script_helper.make_script(self.path, name, code)
+
+    def compile_dir(self, *, dedup=True, optimize=(0, 1, 2), force=False):
+        compileall.compile_dir(self.path, quiet=True, optimize=optimize,
+                               hardlink_dupes=dedup, force=force)
+
+    def test_bad_args(self):
+        # Bad argument combination: hardlink deduplication makes sense
+        # only with more than one optimization level
+        with self.temporary_directory():
+            self.make_script("pass")
+            with self.assertRaises(ValueError):
+                compileall.compile_dir(self.path, quiet=True, optimize=0,
+                                       hardlink_dupes=True)
+            with self.assertRaises(ValueError):
+                # same optimization level specified twice:
+                # compile_dir() removes duplicates
+                compileall.compile_dir(self.path, quiet=True, optimize=[0, 0],
+                                       hardlink_dupes=True)
+
+    def create_code(self, docstring=False, assertion=False):
+        lines = []
+        if docstring:
+            lines.append("'module docstring'")
+        lines.append('x = 1')
+        if assertion:
+            lines.append("assert x == 1")
+        return '\n'.join(lines)
+
+    def iter_codes(self):
+        for docstring in (False, True):
+            for assertion in (False, True):
+                code = self.create_code(docstring=docstring, assertion=assertion)
+                yield (code, docstring, assertion)
+
+    def test_disabled(self):
+        # Deduplication disabled, no hardlinks
+        for code, docstring, assertion in self.iter_codes():
+            with self.subTest(docstring=docstring, assertion=assertion):
+                with self.temporary_directory():
+                    script = self.make_script(code)
+                    pycs = get_pycs(script)
+                    self.compile_dir(dedup=False)
+                    self.assertFalse(is_hardlink(pycs[0], pycs[1]))
+                    self.assertFalse(is_hardlink(pycs[0], pycs[2]))
+                    self.assertFalse(is_hardlink(pycs[1], pycs[2]))
+
+    def check_hardlinks(self, script, docstring=False, assertion=False):
+        pycs = get_pycs(script)
+        self.assertEqual(is_hardlink(pycs[0], pycs[1]),
+                         not assertion)
+        self.assertEqual(is_hardlink(pycs[0], pycs[2]),
+                         not assertion and not docstring)
+        self.assertEqual(is_hardlink(pycs[1], pycs[2]),
+                         not docstring)
+
+    def test_hardlink(self):
+        # Test deduplication on all combinations
+        for code, docstring, assertion in self.iter_codes():
+            with self.subTest(docstring=docstring, assertion=assertion):
+                with self.temporary_directory():
+                    script = self.make_script(code)
+                    self.compile_dir()
+                    self.check_hardlinks(script, docstring, assertion)
+
+    def test_only_two_levels(self):
+        # Don't build the 3 optimization levels, but only 2
+        for opts in ((0, 1), (1, 2), (0, 2)):
+            with self.subTest(opts=opts):
+                with self.temporary_directory():
+                    # code with no docstring and no assertion:
+                    # same bytecode for all optimization levels
+                    script = self.make_script(self.create_code())
+                    self.compile_dir(optimize=opts)
+                    pyc1 = get_pyc(script, opts[0])
+                    pyc2 = get_pyc(script, opts[1])
+                    self.assertTrue(is_hardlink(pyc1, pyc2))
+
+    def test_duplicated_levels(self):
+        # compile_dir() must not fail if optimize contains duplicated
+        # optimization levels and/or if optimization levels are not sorted.
+        with self.temporary_directory():
+            # code with no docstring and no assertion:
+            # same bytecode for all optimization levels
+            script = self.make_script(self.create_code())
+            self.compile_dir(optimize=[1, 0, 1, 0])
+            pyc1 = get_pyc(script, 0)
+            pyc2 = get_pyc(script, 1)
+            self.assertTrue(is_hardlink(pyc1, pyc2))
+
+    def test_recompilation(self):
+        # Test compile_dir() when pyc files already exist and the script
+        # content changed
+        with self.temporary_directory():
+            script = self.make_script("a = 0")
+            self.compile_dir()
+            # All three levels have the same inode
+            self.check_hardlinks(script)
+
+            pycs = get_pycs(script)
+            inode = os.stat(pycs[0]).st_ino
+
+            # Change of the module content
+            script = self.make_script("print(0)")
+
+            # Recompilation without -o 1
+            self.compile_dir(optimize=[0, 2], force=True)
+
+            # opt-1.pyc should have the same inode as before and others should not
+            self.assertEqual(inode, os.stat(pycs[1]).st_ino)
+            self.assertTrue(is_hardlink(pycs[0], pycs[2]))
+            self.assertNotEqual(inode, os.stat(pycs[2]).st_ino)
+            # opt-1.pyc and opt-2.pyc have different content
+            self.assertFalse(filecmp.cmp(pycs[1], pycs[2], shallow=True))
+
+    def test_import(self):
+        # Test that import updates a single pyc file when pyc files already
+        # exist and the script content changed
+        with self.temporary_directory():
+            script = self.make_script(self.create_code(), name="module")
+            self.compile_dir()
+            # All three levels have the same inode
+            self.check_hardlinks(script)
+
+            pycs = get_pycs(script)
+            inode = os.stat(pycs[0]).st_ino
+
+            # Change of the module content
+            script = self.make_script("print(0)", name="module")
+
+            # Import the module in Python with -O (optimization level 1)
+            script_helper.assert_python_ok(
+                "-O", "-c", "import module", __isolated=False, PYTHONPATH=self.path
+            )
+
+            # Only opt-1.pyc is changed
+            self.assertEqual(inode, os.stat(pycs[0]).st_ino)
+            self.assertEqual(inode, os.stat(pycs[2]).st_ino)
+            self.assertFalse(is_hardlink(pycs[1], pycs[2]))
+            # opt-1.pyc and opt-2.pyc have different content
+            self.assertFalse(filecmp.cmp(pycs[1], pycs[2], shallow=True))
+
+
+class HardlinkDedupTestsWithSourceEpoch(HardlinkDedupTestsBase,
+                                        unittest.TestCase,
+                                        metaclass=SourceDateEpochTestMeta,
+                                        source_date_epoch=True):
+    pass
+
+
+class HardlinkDedupTestsNoSourceEpoch(HardlinkDedupTestsBase,
+                                      unittest.TestCase,
+                                      metaclass=SourceDateEpochTestMeta,
+                                      source_date_epoch=False):
+    pass
+
+
 if __name__ == "__main__":
     unittest.main()