| # Copyright 2021 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Common Utils.""" |
| |
| # pylint: disable=g-importing-member |
| from dataclasses import dataclass |
| from pathlib import Path |
| from pathlib import PurePath |
| import sys |
| from typing import List |
| from typing import Set |
| |
| # pylint: disable=g-import-not-at-top |
| try: |
| from git import Blob |
| from git import Commit |
| from git import Tree |
| except ModuleNotFoundError: |
| print( |
| 'ERROR: Please install GitPython by `pip3 install GitPython`.', |
| file=sys.stderr) |
| exit(1) |
| |
| THIS_DIR = Path(__file__).resolve().parent |
| LIBCORE_DIR = THIS_DIR.parent.parent.resolve() |
| |
| UPSTREAM_CLASS_PATHS = [ |
| 'jdk/src/share/classes/', |
| 'src/java.base/share/classes/', |
| 'src/java.base/linux/classes/', |
| 'src/java.base/unix/classes/', |
| 'src/java.sql/share/classes/', |
| 'src/java.logging/share/classes/', |
| 'src/java.prefs/share/classes/', |
| 'src/java.net/share/classes/', |
| ] |
| |
| UPSTREAM_TEST_PATHS = [ |
| 'jdk/test/', |
| 'test/jdk/', |
| ] |
| |
| UPSTREAM_SEARCH_PATHS = UPSTREAM_CLASS_PATHS + UPSTREAM_TEST_PATHS |
| |
| OJLUNI_JAVA_BASE_PATH = 'ojluni/src/main/java/' |
| OJLUNI_TEST_PATH = 'ojluni/src/' |
| |
| |
| @dataclass |
| class ExpectedUpstreamEntry: |
| """A map entry in the EXPECTED_UPSTREAM file.""" |
| dst_path: str # destination path |
| git_ref: str # a git reference to an upstream commit |
| src_path: str # source path in the commit pointed by the git_ref |
| comment_lines: str = '' # The comment lines above the entry line |
| |
| |
| class ExpectedUpstreamFile: |
| """A file object representing the EXPECTED_UPSTREAM file.""" |
| |
| def __init__(self, file_path: str = LIBCORE_DIR / 'EXPECTED_UPSTREAM'): |
| self.path = Path(file_path) |
| |
| def read_all_entries(self) -> List[ExpectedUpstreamEntry]: |
| """Read all entries from the file.""" |
| result: List[ExpectedUpstreamEntry] = [] |
| with self.path.open() as file: |
| comment_lines = '' # Store the comment lines in the next entry |
| for line in file: |
| stripped = line.strip() |
| # Ignore empty lines and comments starting with '#' |
| if not stripped or stripped.startswith('#'): |
| comment_lines += line |
| continue |
| |
| entry = self.parse_line(stripped, comment_lines) |
| result.append(entry) |
| comment_lines = '' |
| |
| return result |
| |
| def write_all_entries(self, entries: List[ExpectedUpstreamEntry]) -> None: |
| """Write all entries into the file.""" |
| with self.path.open('w') as file: |
| for e in entries: |
| file.write(e.comment_lines) |
| file.write(','.join([e.dst_path, e.git_ref, e.src_path])) |
| file.write('\n') |
| |
| def write_new_entry(self, entry: ExpectedUpstreamEntry, |
| entries: List[ExpectedUpstreamEntry] = None) -> None: |
| if entries is None: |
| entries = self.read_all_entries() |
| |
| entries.append(entry) |
| self.sort_and_write_all_entries(entries) |
| |
| def sort_and_write_all_entries(self, |
| entries: List[ExpectedUpstreamEntry]) -> None: |
| header = entries[0].comment_lines |
| entries[0].comment_lines = '' |
| entries.sort(key=lambda e: e.dst_path) |
| # Keep the header above the first entry |
| entries[0].comment_lines = header + entries[0].comment_lines |
| self.write_all_entries(entries) |
| |
| @staticmethod |
| def parse_line(line: str, comment_lines: str) -> ExpectedUpstreamEntry: |
| items = line.split(',') |
| size = len(items) |
| if size != 3: |
| raise ValueError( |
| f"The size must be 3, but is {size}. The line is '{line}'") |
| |
| return ExpectedUpstreamEntry(items[0], items[1], items[2], comment_lines) |
| |
| |
| class OjluniFinder: |
| """Finder for java classes or ojluni/ paths.""" |
| |
| def __init__(self, existing_paths: List[str]): |
| self.existing_paths = existing_paths |
| |
| @staticmethod |
| def translate_from_class_name_to_ojluni_path(class_or_path: str) -> str: |
| """Returns a ojluni path from a class name.""" |
| # if it contains '/', then it's a path |
| if '/' in class_or_path: |
| return class_or_path |
| |
| base_path = OJLUNI_TEST_PATH if class_or_path.startswith( |
| 'test.') else OJLUNI_JAVA_BASE_PATH |
| |
| relative_path = class_or_path.replace('.', '/') |
| return f'{base_path}{relative_path}.java' |
| |
| def match_path_prefix(self, input_path: str) -> Set[str]: |
| """Returns a set of existing file paths matching the given partial path.""" |
| path_matches = list( |
| filter(lambda path: path.startswith(input_path), self.existing_paths)) |
| result_set: Set[str] = set() |
| # if it's found, just return the result |
| if input_path in path_matches: |
| result_set.add(input_path) |
| else: |
| input_ojluni_path = PurePath(input_path) |
| # the input ends with '/', the autocompletion result contain the children |
| # instead of the matching the prefix in its parent directory |
| input_path_parent_or_self = input_ojluni_path |
| if not input_path.endswith('/'): |
| input_path_parent_or_self = input_path_parent_or_self.parent |
| n_parts = len(input_path_parent_or_self.parts) |
| for match in path_matches: |
| path = PurePath(match) |
| # path.parts[n_parts] should not exceed the index and should be |
| # a valid child path because input_path_parent_or_self must be a |
| # valid directory |
| child = list(path.parts)[n_parts] |
| result = (input_path_parent_or_self / child).as_posix() |
| # if result is not exact, the result represents a directory. |
| if result != match: |
| result += '/' |
| result_set.add(result) |
| |
| return result_set |
| |
| def match_classname_prefix(self, input_class_name: str) -> List[str]: |
| """Returns a list of package / class names given the partial class name.""" |
| # If '/' exists, it's probably a path, not a partial class name |
| if '/' in input_class_name: |
| return [] |
| |
| result_list = [] |
| partial_relative_path = input_class_name.replace('.', '/') |
| for base_path in [OJLUNI_JAVA_BASE_PATH, OJLUNI_TEST_PATH]: |
| partial_ojluni_path = base_path + partial_relative_path |
| result_paths = self.match_path_prefix(partial_ojluni_path) |
| # pylint: disable=cell-var-from-loop |
| result_list.extend( |
| map(lambda path: convert_path_to_java_class_name(path, base_path), |
| list(result_paths))) |
| |
| return result_list |
| |
| |
| class OpenjdkFinder: |
| """Finder for java classes or paths in a upstream OpenJDK commit.""" |
| |
| def __init__(self, commit: Commit): |
| self.commit = commit |
| |
| @staticmethod |
| def translate_src_path_to_ojluni_path(src_path: str) -> str: |
| """Returns None if src_path isn't in a known source directory.""" |
| relative_path = None |
| for base_path in UPSTREAM_TEST_PATHS: |
| if src_path.startswith(base_path): |
| length = len(base_path) |
| relative_path = src_path[length:] |
| break |
| |
| if relative_path: |
| return f'{OJLUNI_TEST_PATH}test/{relative_path}' |
| |
| for base_path in UPSTREAM_CLASS_PATHS: |
| if src_path.startswith(base_path): |
| length = len(base_path) |
| relative_path = src_path[length:] |
| break |
| |
| if relative_path: |
| return f'{OJLUNI_JAVA_BASE_PATH}{relative_path}' |
| |
| return None |
| |
| def find_src_path_from_classname(self, class_or_path: str) -> str: |
| """Finds a valid source path given a valid class name or path.""" |
| # if it contains '/', then it's a path |
| if '/' in class_or_path: |
| if self.has_file(class_or_path): |
| return class_or_path |
| else: |
| return None |
| |
| relative_path = class_or_path.replace('.', '/') |
| src_path = None |
| for base_path in UPSTREAM_SEARCH_PATHS: |
| full_path = f'{base_path}{relative_path}.java' |
| if self.has_file(full_path): |
| src_path = full_path |
| break |
| |
| return src_path |
| |
| def get_search_paths(self) -> List[str]: |
| return UPSTREAM_SEARCH_PATHS |
| |
| def find_src_path_from_ojluni_path(self, ojluni_path: str) -> str: |
| """Returns a source path that guessed from the ojluni_path.""" |
| base_paths = None |
| relative_path = None |
| |
| TEST_PATH = OJLUNI_TEST_PATH + 'test/' |
| if ojluni_path.startswith(OJLUNI_JAVA_BASE_PATH): |
| base_paths = UPSTREAM_CLASS_PATHS |
| length = len(OJLUNI_JAVA_BASE_PATH) |
| relative_path = ojluni_path[length:] |
| elif ojluni_path.startswith(TEST_PATH): |
| base_paths = UPSTREAM_TEST_PATHS |
| length = len(TEST_PATH) |
| relative_path = ojluni_path[length:] |
| else: |
| return None |
| |
| for base_path in base_paths: |
| full_path = base_path + relative_path |
| if self.has_file(full_path): |
| return full_path |
| |
| return None |
| |
| def match_path_prefix(self, input_path: str) -> List[str]: |
| """Returns a list of source paths matching the given partial string.""" |
| result_list = [] |
| |
| search_tree = self.commit.tree |
| path_obj = PurePath(input_path) |
| is_exact = self.has_file(path_obj.as_posix()) |
| is_directory_path = input_path.endswith('/') |
| exact_obj = search_tree[path_obj.as_posix()] if is_exact else None |
| search_word = '' |
| if is_exact and isinstance(exact_obj, Blob): |
| # an exact file path |
| result_list.append(input_path) |
| return result_list |
| elif is_directory_path: |
| # an exact directory path and can't be a prefix directory name. |
| if is_exact: |
| search_tree = exact_obj |
| else: |
| # Such path doesn't exist, and thus returns empty list |
| return result_list |
| elif len(path_obj.parts) >= 2 and not is_directory_path: |
| parent_path = path_obj.parent.as_posix() |
| if self.has_file(parent_path): |
| search_tree = search_tree[parent_path] |
| search_word = path_obj.name |
| else: |
| # Return empty list because no such path is found |
| return result_list |
| else: |
| search_word = input_path |
| |
| for tree in search_tree.trees: |
| tree_path = PurePath(tree.path) |
| if tree_path.name.startswith(search_word): |
| # Append '/' to indicate directory type. If the result has this item |
| # only, shell should auto-fill the input, and thus |
| # next tabbing in shell should fall into the above condition |
| # `is_exact and input_path.endswith('/')` and will search in the child |
| # tree. |
| result_path = tree.path + '/' |
| result_list.append(result_path) |
| |
| for blob in search_tree.blobs: |
| blob_path = PurePath(blob.path) |
| if blob_path.name.startswith(search_word): |
| result_list.append(blob.path) |
| |
| return result_list |
| |
| def match_classname_prefix(self, input_class_name: str) -> List[str]: |
| """Return a list of package / class names from given commit and input.""" |
| # If '/' exists, it's probably a path, not a class name. |
| if '/' in input_class_name: |
| return [] |
| |
| result_list = [] |
| for base_path in UPSTREAM_SEARCH_PATHS: |
| base_len = len(base_path) |
| path = base_path + input_class_name.replace('.', '/') |
| path_results = self.match_path_prefix(path) |
| for p in path_results: |
| relative_path = p[base_len:] |
| if relative_path.endswith('.java'): |
| relative_path = relative_path[0:-5] |
| result_list.append(relative_path.replace('/', '.')) |
| |
| return result_list |
| |
| def has_file(self, path: str) -> bool: |
| """Returns True if the directory / file exists in the tree.""" |
| return has_file_in_tree(path, self.commit.tree) |
| |
| |
| def convert_path_to_java_class_name(path: str, base_path: str) -> str: |
| base_len = len(base_path) |
| result = path[base_len:] |
| if result.endswith('.java'): |
| result = result[0:-5] |
| result = result.replace('/', '.') |
| return result |
| |
| |
| def has_file_in_tree(path: str, tree: Tree) -> bool: |
| """Returns True if the directory / file exists in the tree.""" |
| try: |
| # pylint: disable=pointless-statement |
| tree[path] |
| return True |
| except KeyError: |
| return False |