blob: 041efafed30c2c571ca8a9cf02681a7125734271 [file] [log] [blame]
import os
import re
import zipfile
from manifest import ProjectNotFoundError
from utility import exec_command as utility_exec_command
class SecurityPatch(object):
"""Abstract representation of a Security Patch from a Security Bulletin."""
def __init__(self, zip_file, filename, android_ids, manifest, args):
self.args = args
self.android_ids = android_ids
self.aosp_project_path = self._get_aosp_project_path(filename)
if self.aosp_project_path in manifest.projects:
self.project_rel_path = manifest.projects[self.aosp_project_path]
self.project_abs_path = os.path.join(
self.args.aosp_root, self.project_rel_path
)
else:
raise ProjectNotFoundError(self.aosp_project_path)
self.filename = filename
self.zip_file = zip_file
self.change_id = self.find_last_change_id()
def __repr__(self):
return self.filename
def _get_aosp_project_path(self, filename):
"""Get project path as expected by the AndroidManifest class.
The returned path matches or is close to the naming conventions for the
`path` attribute in AOSP manifest XMLs. The AndroidManifest class knows
how to map this path to actual file system paths.
"""
dirs_to_skip = (
2 if self.args.use_old_format else self.args.ignore_n_leading
)
return "/".join(filename.split("/")[dirs_to_skip:-1])
def find_last_change_id(self):
"""Find the Change-Id mentioned last in the patch.
Returns:
The Change-Id identifying the patch, or `None` if none was found.
"""
change_id = None
with self.zip_file.open(self.filename) as patchfile:
for raw_line in patchfile:
# TODO: Currently we actually expect a patch file here, but also
# get passed in zipped kernel patch files, which of course can't
# be parsed directly. This causes the decoding from bytes to
# string to fail, which we ignore for now.
line = raw_line.decode("utf-8", errors="ignore")
if line.startswith("Change-Id"):
change_id = line.split()[1]
if line.startswith("---"):
break
return change_id
def find_issues_for_patch(self):
return self.android_ids
def get_description(self):
if self.args.display_all:
return "{} {} {}".format(
str(self.find_issues_for_patch()),
self.aosp_project_path,
os.path.basename(self.filename),
)
elif self.args.use_a_number:
return self.find_issues_for_patch()
else:
return (
self.aosp_project_path + " " + os.path.basename(self.filename)
)
def check_and_apply(self):
if not self.change_id:
print(
"[BE]: Unable to verify patch, could not determine its "
"Change-Id: {}.".format(self.get_description())
)
return
if not self.is_applied():
print("[BE]: Not yet applied: {}".format(self.get_description()))
if self.args.apply_to_branch:
self.apply()
else:
print("[BE]: Already applied: {}".format(self.get_description()))
def is_applied(self):
assert self.change_id
found = (
self.exec_command(
"( cd {path} && git log --oneline --grep={changeid} | "
" grep [a-Z] >/dev/null )".format(
path=self.project_abs_path, changeid=self.change_id
)
)
== 0
)
return found
def rollback(self):
print("[BE]: Rolling back... ".format(self.filename))
self.exec_command(
"( cd {path} && git am --abort && git reset --hard )".format(
path=self.project_abs_path
)
)
self.exec_command(
"( cd {path} && git reset --hard )".format(
path=self.project_abs_path
)
)
self.exec_command(
"( cd {path} && repo sync .)".format(path=self.project_abs_path)
)
def amend(self):
print(
"[BE]: This patch addresses {} ".format(
self.find_issues_for_patch()
)
)
try:
input("Press enter to continue and change the commit message.")
except (KeyboardInterrupt, EOFError):
pass
os.system(
"( cd {path} && git commit --amend )".format(
path=self.project_abs_path,
tmpbranchname=self.args.apply_to_branch,
)
)
def apply(self):
self.zip_file.extract(self.filename, "/tmp/")
print("[BE]: Applying {}.".format(self.filename.split("/")[-1]))
print("[BE]: for Project {}.".format(self.aosp_project_path))
applies = (
self.exec_command(
"( cd {path} && repo start {tmpbranchname} . ;"
" git am -3 {filename} )".format(
path=self.project_abs_path,
tmpbranchname=self.args.apply_to_branch,
filename="/tmp/" + self.filename,
)
)
== 0
)
if not applies:
print("[BE]: FAIL: doesn't apply.".format(self.filename))
self.rollback()
else:
print("[BE]: SUCCESS: applied.".format(self.filename))
if not self.args.skip_amend and not self.args.cleanup_after:
self.amend()
def exec_command(self, command):
return utility_exec_command(command, self.args.quiet)
class PlatformPatch(SecurityPatch):
pass
class KernelPatch(SecurityPatch):
def __init__(
self,
zip_file,
filename,
android_id,
manifest,
args,
subpatch,
kernel,
extra,
):
super().__init__(zip_file, filename, [android_id], manifest, args)
self.subpatch = subpatch
self.kernel = kernel
self.extra = extra
def _get_aosp_project_path(self, filename):
return "kernel/msm"
def get_description(self):
return "(snippet) " + str(super().get_description())
class SecurityBulletin:
"""A collection of patches for the Android platform and kernel.
The Android Security Bulletins regroup security patches needed for an
Android operating system in order to comply with a so-called *Security Patch
Level*.
"""
# Handle special kernel snippet names before the machine-readable metadata
# introduced in August 2017. Before that:
# - ASB of 2017-05 introduces e.g.:
# - ANDROID-32402303
# - ANDROID-36000515_1
# - ASB of 2017-06 introduces e.g.:
# - ANDROID-35472278_dsx
# - ASB of 2017-07 introduces e.g.:
# - ANDROID-34620535_kernel3.10
KERNEL_SNIPPETS_FILENAME_PATTERN = re.compile(
r"code_snippets/ANDROID-(?P<android_id>[0-9]+)"
r"(_(?P<subpatch>[0-9]+)|_(kernel)?(?P<kernel>[0-9]\.[0-9]+)|"
r"(?P<extra>.*))?$"
)
#:
_ISSUE_MAPPING_FILENAME = re.compile(
r"patches/issue-mapping/ANDROID-(?P<android_id>\d+)"
)
def __init__(self, args, manifest):
self._args = args
#: The archive containing the bulletin patches and metadata.
self._archive = zipfile.ZipFile(self._args.zipfile)
#: The mapping of patch files to Android bug ids.
self._issue_mapping = self._get_issue_mapping()
self._manifest = manifest
def _get_issue_mapping(self):
"""Get the mapping of patch files to Android bug ids.
Returns:
The mapping of patch files to a list of Android bug id.
"""
mapping = {}
for filename in self._archive.namelist():
match = self._ISSUE_MAPPING_FILENAME.match(filename)
if not match:
continue
android_id = "A-" + match.group("android_id")
with self._archive.open(filename) as issue_file:
for raw_line in issue_file:
line = raw_line.decode("utf-8").rstrip()
if line not in mapping:
mapping[line] = []
mapping[line].append(android_id)
return mapping
def _get_patches(self, android_version, platform, kernel):
"""Find the patches grouped in this bulletin.
Arguments:
android_version: The version of the Android operating system to
find patches for.
platform (optional): Whether to find platform security patches.
kernel (optional): Whether to find kernel snippets.
Returns:
The list of patches found.
"""
patches = []
for item in self._archive.namelist():
snippet_match = (
self.KERNEL_SNIPPETS_FILENAME_PATTERN.search(item)
if kernel
else False
)
if snippet_match:
try:
patches.append(
KernelPatch(
self._archive,
item,
"A-" + snippet_match.group("android_id"),
self._manifest,
self._args,
snippet_match.group("subpatch"),
snippet_match.group("kernel"),
snippet_match.group("extra"),
)
)
except ProjectNotFoundError as e:
print("[BE]: WARNING: {}".format(e))
elif (
platform
and "android-{}".format(android_version) in item
and ".patch" in item
):
issues = self._issue_mapping.get(item, None)
# TODO: `issues` will always be None, also before the latest
# refactoring.
try:
patches.append(
PlatformPatch(
self._archive, item, issues, self._manifest, self._args
)
)
except ProjectNotFoundError as e:
print("[BE]: WARNING: {}".format(e))
return patches
def apply(self):
"""Apply all not yet applied patches in this bulletin.
This high-level function groups all functionality of this
SecurityBulletin class together: It finds all patches included in the
bulletin archive, checks if they are applied and applies them if
necessary.
"""
patches = self._get_patches(
self._args.version,
platform=not self._args.skip_patches,
kernel=not self._args.skip_snippets,
)
for patch in sorted(patches, key=lambda p: p.filename):
patch.check_and_apply()