[analyzer] SATest: Add possibility to download source from git and zip
Differential Revision: https://reviews.llvm.org/D81564
diff --git a/clang/utils/analyzer/SATestBuild.py b/clang/utils/analyzer/SATestBuild.py
index 8b0b803..41cb5db 100755
--- a/clang/utils/analyzer/SATestBuild.py
+++ b/clang/utils/analyzer/SATestBuild.py
@@ -44,7 +44,7 @@
"""
import CmpRuns
import SATestUtils
-from ProjectMap import ProjectInfo, ProjectMap
+from ProjectMap import DownloadType, ProjectInfo, ProjectMap
import argparse
import glob
@@ -57,6 +57,7 @@
import sys
import threading
import time
+import zipfile
from queue import Queue
# mypy has problems finding InvalidFileException in the module
@@ -198,63 +199,6 @@
verbose=VERBOSE)
-def download_and_patch(directory: str, build_log_file: IO):
- """
- Download the project and apply the local patchfile if it exists.
- """
- cached_source = os.path.join(directory, CACHED_SOURCE_DIR_NAME)
-
- # If the we don't already have the cached source, run the project's
- # download script to download it.
- if not os.path.exists(cached_source):
- download(directory, build_log_file)
- if not os.path.exists(cached_source):
- stderr(f"Error: '{cached_source}' not found after download.\n")
- exit(1)
-
- patched_source = os.path.join(directory, PATCHED_SOURCE_DIR_NAME)
-
- # Remove potentially stale patched source.
- if os.path.exists(patched_source):
- shutil.rmtree(patched_source)
-
- # Copy the cached source and apply any patches to the copy.
- shutil.copytree(cached_source, patched_source, symlinks=True)
- apply_patch(directory, build_log_file)
-
-
-def download(directory: str, build_log_file: IO):
- """
- Run the script to download the project, if it exists.
- """
- script_path = os.path.join(directory, DOWNLOAD_SCRIPT)
- SATestUtils.run_script(script_path, build_log_file, directory,
- out=LOCAL.stdout, err=LOCAL.stderr,
- verbose=VERBOSE)
-
-
-def apply_patch(directory: str, build_log_file: IO):
- patchfile_path = os.path.join(directory, PATCHFILE_NAME)
- patched_source = os.path.join(directory, PATCHED_SOURCE_DIR_NAME)
-
- if not os.path.exists(patchfile_path):
- stdout(" No local patches.\n")
- return
-
- stdout(" Applying patch.\n")
- try:
- check_call(f"patch -p1 < '{patchfile_path}'",
- cwd=patched_source,
- stderr=build_log_file,
- stdout=build_log_file,
- shell=True)
-
- except CalledProcessError:
- stderr(f"Error: Patch failed. "
- f"See {build_log_file.name} for details.\n")
- sys.exit(1)
-
-
class TestInfo(NamedTuple):
"""
Information about a project and settings for its analysis.
@@ -430,7 +374,7 @@
# Build and analyze the project.
with open(build_log_path, "w+") as build_log_file:
if self.project.mode == 1:
- download_and_patch(directory, build_log_file)
+ self._download_and_patch(directory, build_log_file)
run_cleanup_script(directory, build_log_file)
self.scan_build(directory, output_dir, build_log_file)
else:
@@ -587,6 +531,100 @@
return out
+ def _download_and_patch(self, directory: str, build_log_file: IO):
+ """
+ Download the project and apply the local patchfile if it exists.
+ """
+ cached_source = os.path.join(directory, CACHED_SOURCE_DIR_NAME)
+
+ # If we don't already have the cached source, fetch it using the
+ # method configured for the project (git, zip or download script).
+ if not os.path.exists(cached_source):
+ self._download(directory, build_log_file)
+ if not os.path.exists(cached_source):
+ stderr(f"Error: '{cached_source}' not found after download.\n")
+ sys.exit(1)
+
+ patched_source = os.path.join(directory, PATCHED_SOURCE_DIR_NAME)
+
+ # Remove potentially stale patched source.
+ if os.path.exists(patched_source):
+ shutil.rmtree(patched_source)
+
+ # Copy the cached source and apply any patches to the copy.
+ shutil.copytree(cached_source, patched_source, symlinks=True)
+ self._apply_patch(directory, build_log_file)
+
+ def _download(self, directory: str, build_log_file: IO):
+ """
+ Fetch the project's sources with the method its source type selects.
+ """
+ if self.project.source == DownloadType.GIT:
+ self._download_from_git(directory, build_log_file)
+ elif self.project.source == DownloadType.ZIP:
+ self._unpack_zip(directory, build_log_file)
+ elif self.project.source == DownloadType.SCRIPT:
+ self._run_download_script(directory, build_log_file)
+ else:
+ raise ValueError(
+ f"Unknown source type '{self.project.source}' is found "
+ f"for the '{self.project.name}' project")
+
+ def _download_from_git(self, directory: str, build_log_file: IO) -> None:
+ cached_source = os.path.join(directory, CACHED_SOURCE_DIR_NAME)
+ check_call(f"git clone {self.project.origin} {cached_source}",
+ cwd=directory, stderr=build_log_file,
+ stdout=build_log_file, shell=True)  # NOTE(review): values are unquoted
+ check_call(f"git checkout --quiet {self.project.commit}",
+ cwd=cached_source, stderr=build_log_file,
+ stdout=build_log_file, shell=True)  # runs inside the fresh clone
+
+ def _unpack_zip(self, directory: str, build_log_file: IO):
+ """Unpack the only zip file in `directory` as the cached source."""
+ zip_files = glob.glob(os.path.join(directory, "*.zip"))
+ if not zip_files:
+ raise ValueError(
+ f"Couldn't find any zip files to unpack for the "
+ f"'{self.project.name}' project")
+
+ if len(zip_files) > 1:
+ raise ValueError(
+ f"Couldn't decide which of the zip files ({zip_files}) "
+ f"for the '{self.project.name}' project to unpack")
+
+ with zipfile.ZipFile(zip_files[0], "r") as zip_file:
+ zip_file.extractall(os.path.join(directory,
+ CACHED_SOURCE_DIR_NAME))
+
+ @staticmethod
+ def _run_download_script(directory: str, build_log_file: IO) -> None:
+ script_path = os.path.join(directory, DOWNLOAD_SCRIPT)
+ SATestUtils.run_script(script_path, build_log_file, directory,
+ out=LOCAL.stdout, err=LOCAL.stderr,
+ verbose=VERBOSE)  # script lives in, and is run for, `directory`
+
+ @staticmethod
+ def _apply_patch(directory: str, build_log_file: IO) -> None:
+ patchfile_path = os.path.join(directory, PATCHFILE_NAME)
+ patched_source = os.path.join(directory, PATCHED_SOURCE_DIR_NAME)
+
+ if not os.path.exists(patchfile_path):
+ stdout(" No local patches.\n")
+ return
+
+ stdout(" Applying patch.\n")
+ try:
+ check_call(f"patch -p1 < '{patchfile_path}'",
+ cwd=patched_source,  # patch is applied to the copy, not the cache
+ stderr=build_log_file,
+ stdout=build_log_file,
+ shell=True)  # the '<' input redirection needs a shell
+
+ except CalledProcessError:
+ stderr(f"Error: Patch failed. "
+ f"See {build_log_file.name} for details.\n")
+ sys.exit(1)
+
class TestProjectThread(threading.Thread):
def __init__(self, tasks_queue: TestQueue,