import os
import re
import shlex
import tempfile
import zipfile

from manifest import ProjectNotFoundError
from utility import exec_command as utility_exec_command
| |
| |
class SecurityPatch(object):
    """Abstract representation of a Security Patch from a Security Bulletin.

    A patch is one member (`filename`) of the bulletin archive (`zip_file`).
    It is tied to a project of the manifest and can be checked against, and
    applied to, the git checkout of that project.

    The `args` object is read for the following attributes: `aosp_root`,
    `use_old_format`, `ignore_n_leading`, `display_all`, `use_a_number`,
    `apply_to_branch`, `skip_amend`, `cleanup_after` and `quiet`.

    """

    def __init__(self, zip_file, filename, android_ids, manifest, args):
        """Bind the patch to its project and read its Change-Id.

        Arguments:
            zip_file: The opened archive containing the patch.
            filename: The name of the patch inside `zip_file`.
            android_ids: The issues addressed by the patch.
            manifest: An object whose `projects` mapping translates project
                paths into paths relative to `args.aosp_root`.
            args: The options of the current run (see the class docstring).
        Raises:
            ProjectNotFoundError: If the project derived from `filename` is
                not part of `manifest.projects`.

        """
        self.args = args
        self.android_ids = android_ids
        # `_get_aosp_project_path` reads `self.args`, so it has to run after
        # the assignment above.
        self.aosp_project_path = self._get_aosp_project_path(filename)

        if self.aosp_project_path not in manifest.projects:
            raise ProjectNotFoundError(self.aosp_project_path)
        self.project_rel_path = manifest.projects[self.aosp_project_path]
        self.project_abs_path = os.path.join(
            self.args.aosp_root, self.project_rel_path
        )

        self.filename = filename
        self.zip_file = zip_file
        self.change_id = self.find_last_change_id()

    def __repr__(self):
        return self.filename

    def _get_aosp_project_path(self, filename):
        """Get project path as expected by the AndroidManifest class.

        The returned path matches or is close to the naming conventions for
        the `path` attribute in AOSP manifest XMLs. The AndroidManifest class
        knows how to map this path to actual file system paths.

        The path is built from the directory components of `filename`: the
        first two components are dropped in the old format, otherwise the
        first `args.ignore_n_leading` ones; the file name itself is always
        dropped.

        """
        dirs_to_skip = (
            2 if self.args.use_old_format else self.args.ignore_n_leading
        )
        return "/".join(filename.split("/")[dirs_to_skip:-1])

    def find_last_change_id(self):
        """Find the Change-Id mentioned last in the patch.

        Only the part of the file up to the first line starting with `---`
        is searched.

        Returns:
            The Change-Id identifying the patch, or `None` if none was found.

        """
        change_id = None
        with self.zip_file.open(self.filename) as patchfile:
            for raw_line in patchfile:
                # TODO: Currently we actually expect a patch file here, but
                # also get passed in zipped kernel patch files, which of
                # course can't be parsed directly. This causes the decoding
                # from bytes to string to fail, which we ignore for now.
                line = raw_line.decode("utf-8", errors="ignore")
                if line.startswith("Change-Id"):
                    fields = line.split()
                    # A trailer without a value must not abort the whole
                    # run; keep the last usable id instead.
                    if len(fields) > 1:
                        change_id = fields[1]
                if line.startswith("---"):
                    break
        return change_id

    def find_issues_for_patch(self):
        """Return the issues passed in as `android_ids` at construction."""
        return self.android_ids

    def get_description(self):
        """Describe the patch for log messages.

        Returns:
            With `args.display_all`, a string made of the issues, the
            project path and the base name of the patch file. Otherwise,
            with `args.use_a_number`, the issues as they were passed in (not
            necessarily a string). Otherwise a string made of the project
            path and the base name of the patch file.

        """
        patch_name = os.path.basename(self.filename)
        if self.args.display_all:
            return "{} {} {}".format(
                str(self.find_issues_for_patch()),
                self.aosp_project_path,
                patch_name,
            )
        if self.args.use_a_number:
            return self.find_issues_for_patch()
        return self.aosp_project_path + " " + patch_name

    def check_and_apply(self):
        """Report whether the patch is applied and apply it if requested.

        The patch is only applied when it is not found in the project
        history and `args.apply_to_branch` is set. Patches without a
        Change-Id cannot be verified and are skipped.

        """
        if not self.change_id:
            print(
                "[BE]: Unable to verify patch, could not determine its "
                "Change-Id: {}.".format(self.get_description())
            )
            return

        if self.is_applied():
            print("[BE]: Already applied: {}".format(self.get_description()))
            return

        print("[BE]: Not yet applied: {}".format(self.get_description()))
        if self.args.apply_to_branch:
            self.apply()

    def is_applied(self):
        """Check whether a commit mentioning the Change-Id exists.

        Returns:
            `True` if `git log --grep` prints at least one commit for the
            Change-Id in the project checkout, `False` otherwise.

        """
        assert self.change_id
        # `grep -q .` succeeds as soon as git printed any non-empty line.
        # The former `grep [a-Z]` was an unquoted, locale-dependent (and in
        # the C locale invalid) range. All values are quoted since the
        # Change-Id is read from the patch file.
        command = (
            "( cd {path} && git log --oneline --grep={changeid} | "
            "grep -q . )".format(
                path=shlex.quote(str(self.project_abs_path)),
                changeid=shlex.quote(str(self.change_id)),
            )
        )
        return self.exec_command(command) == 0

    def rollback(self):
        """Abort a failed `git am` and reset the project checkout."""
        path = shlex.quote(str(self.project_abs_path))
        print("[BE]: Rolling back... ")
        self.exec_command(
            "( cd {path} && git am --abort && git reset --hard )".format(
                path=path
            )
        )
        # The reset above is skipped when `git am --abort` fails, hence the
        # unconditional second one.
        self.exec_command("( cd {path} && git reset --hard )".format(path=path))
        self.exec_command("( cd {path} && repo sync .)".format(path=path))

    def amend(self):
        """Let the user amend the message of the commit just created."""
        print(
            "[BE]: This patch addresses {} ".format(
                self.find_issues_for_patch()
            )
        )
        try:
            input("Press enter to continue and change the commit message.")
        except (KeyboardInterrupt, EOFError):
            pass

        # Run through `os.system` rather than `exec_command`, as in the
        # original code; presumably so that the editor started by git stays
        # attached to the terminal.
        os.system(
            "( cd {path} && git commit --amend )".format(
                path=shlex.quote(str(self.project_abs_path))
            )
        )

    def apply(self):
        """Apply the patch with `git am` on the branch to apply to.

        The patch is extracted into a private temporary directory which is
        removed once `git am` returned. On failure the project checkout is
        rolled back; on success the commit message may be amended, unless
        `args.skip_amend` or `args.cleanup_after` is set.

        """
        print("[BE]: Applying {}.".format(self.filename.split("/")[-1]))
        print("[BE]: for Project {}.".format(self.aosp_project_path))
        with tempfile.TemporaryDirectory() as tmp_dir:
            # Use the path reported by `extract`: member names are
            # sanitised on extraction, so it may differ from a plain
            # concatenation of directory and member name.
            patch_path = self.zip_file.extract(self.filename, tmp_dir)
            applies = (
                self.exec_command(
                    "( cd {path} && repo start {tmpbranchname} . ;"
                    " git am -3 {filename} )".format(
                        path=shlex.quote(str(self.project_abs_path)),
                        tmpbranchname=shlex.quote(
                            str(self.args.apply_to_branch)
                        ),
                        filename=shlex.quote(patch_path),
                    )
                )
                == 0
            )

        if not applies:
            print("[BE]: FAIL: doesn't apply.")
            self.rollback()
            return

        print("[BE]: SUCCESS: applied.")
        if not self.args.skip_amend and not self.args.cleanup_after:
            self.amend()

    def exec_command(self, command):
        """Run `command` through the utility helper, honouring `args.quiet`.

        Returns:
            Whatever the helper returns; callers compare it to `0` to detect
            success.

        """
        return utility_exec_command(command, self.args.quiet)
| |
| |
class PlatformPatch(SecurityPatch):
    """A security patch for the platform; behaves exactly like its base."""
| |
| |
class KernelPatch(SecurityPatch):
    """A kernel snippet of a bulletin, always tied to one project.

    In addition to the attributes of the base class, the `subpatch`,
    `kernel` and `extra` values handed in by the caller are kept as
    attributes of the same name.

    """

    def __init__(
        self, zip_file, filename, android_id, manifest, args,
        subpatch, kernel, extra,
    ):
        """Initialise the base class with `android_id` as its only issue."""
        issues = [android_id]
        super().__init__(zip_file, filename, issues, manifest, args)

        self.subpatch, self.kernel, self.extra = subpatch, kernel, extra

    def _get_aosp_project_path(self, filename):
        """Return the fixed project path, whatever `filename` is."""
        return "kernel/msm"

    def get_description(self):
        """Return the description of the base class, marked as a snippet."""
        base_description = super().get_description()
        return "(snippet) {}".format(str(base_description))
| |
| |
class SecurityBulletin:
    """A collection of patches for the Android platform and kernel.

    The Android Security Bulletins regroup security patches needed for an
    Android operating system in order to comply with a so-called *Security
    Patch Level*.

    The bulletin keeps its archive open for its whole lifetime, because the
    patches it creates read from it. Call `close` when done, or use the
    bulletin as a context manager.

    """

    # Handle special kernel snippet names before the machine-readable metadata
    # introduced in August 2017. Before that:
    # - ASB of 2017-05 introduces e.g.:
    #   - ANDROID-32402303
    #   - ANDROID-36000515_1
    # - ASB of 2017-06 introduces e.g.:
    #   - ANDROID-35472278_dsx
    # - ASB of 2017-07 introduces e.g.:
    #   - ANDROID-34620535_kernel3.10
    KERNEL_SNIPPETS_FILENAME_PATTERN = re.compile(
        r"code_snippets/ANDROID-(?P<android_id>[0-9]+)"
        r"(_(?P<subpatch>[0-9]+)|_(kernel)?(?P<kernel>[0-9]\.[0-9]+)|"
        r"(?P<extra>.*))?$"
    )
    #: Matches the archive members listing the patch files of one Android
    #: bug id.
    _ISSUE_MAPPING_FILENAME = re.compile(
        r"patches/issue-mapping/ANDROID-(?P<android_id>\d+)"
    )

    def __init__(self, args, manifest):
        """Open the bulletin archive and read its issue mapping.

        Arguments:
            args: The options of the current run; `zipfile` is handed to
                `zipfile.ZipFile`, `version`, `skip_patches` and
                `skip_snippets` are read by `apply`.
            manifest: Handed on unchanged to every patch that is created.

        """
        self._args = args
        #: The archive containing the bulletin patches and metadata.
        self._archive = zipfile.ZipFile(self._args.zipfile)
        #: The mapping of patch files to Android bug ids.
        self._issue_mapping = self._get_issue_mapping()
        self._manifest = manifest

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the bulletin archive.

        Patches created from this bulletin read from the same archive and
        must not be used afterwards.

        """
        self._archive.close()

    def _get_issue_mapping(self):
        """Get the mapping of patch files to Android bug ids.

        Every line of an issue mapping file is taken as the name of a patch
        file; blank lines are skipped.

        Returns:
            The mapping of patch files to a list of Android bug id.

        """
        mapping = {}

        for filename in self._archive.namelist():
            match = self._ISSUE_MAPPING_FILENAME.match(filename)
            if not match:
                continue
            android_id = "A-" + match.group("android_id")

            with self._archive.open(filename) as issue_file:
                for raw_line in issue_file:
                    line = raw_line.decode("utf-8").rstrip()
                    if not line:
                        # A blank line names no patch file.
                        continue
                    mapping.setdefault(line, []).append(android_id)

        return mapping

    def _get_patches(self, android_version, platform=True, kernel=True):
        """Find the patches grouped in this bulletin.

        Patches whose project is unknown to the manifest are reported with
        a warning and left out.

        Arguments:
            android_version: The version of the Android operating system to
                find patches for.
            platform (optional): Whether to find platform security patches.
            kernel (optional): Whether to find kernel snippets.
        Returns:
            The list of patches found.

        """
        patches = []
        version_marker = "android-{}".format(android_version)

        for item in self._archive.namelist():
            snippet_match = (
                self.KERNEL_SNIPPETS_FILENAME_PATTERN.search(item)
                if kernel
                else None
            )
            try:
                if snippet_match:
                    patches.append(
                        KernelPatch(
                            self._archive,
                            item,
                            "A-" + snippet_match.group("android_id"),
                            self._manifest,
                            self._args,
                            snippet_match.group("subpatch"),
                            snippet_match.group("kernel"),
                            snippet_match.group("extra"),
                        )
                    )
                elif platform and version_marker in item and ".patch" in item:
                    issues = self._issue_mapping.get(item, None)
                    # TODO: `issues` will always be None, also before the
                    # latest refactoring.
                    patches.append(
                        PlatformPatch(
                            self._archive,
                            item,
                            issues,
                            self._manifest,
                            self._args,
                        )
                    )
            except ProjectNotFoundError as e:
                print("[BE]: WARNING: {}".format(e))

        return patches

    def apply(self):
        """Apply all not yet applied patches in this bulletin.

        This high-level function groups all functionality of this
        SecurityBulletin class together: It finds all patches included in the
        bulletin archive, checks if they are applied and applies them if
        necessary. The patches are processed in the order of their file
        names.

        """
        patches = self._get_patches(
            self._args.version,
            platform=not self._args.skip_patches,
            kernel=not self._args.skip_snippets,
        )

        for patch in sorted(patches, key=lambda p: p.filename):
            patch.check_and_apply()