Combine outbound CLs to avoid merge conflicts.
Combines 1182952, 1183006, 1189876, 1189886 to avoid merge conflicts.
Also adds Merkle tree validation and signature checking. Signature check
code is WIP.
Test: Successfully communicates with log. Everything but signature
validation is functional.
Change-Id: I7cbc81fc9003e41583261a234e1e9d99b1f18fbe
diff --git a/avbtool b/avbtool
index 6b7bb05..5b20c88 100755
--- a/avbtool
+++ b/avbtool
@@ -393,6 +393,32 @@
self.exponent = 65537
+# TODO(danielaustin): Should this be moved into the RSAPublicKey class?
+def rsa_key_read_pem_bytes(key_path):
+ """Reads the bytes out of the passed in PEM file.
+
+ Arguments:
+ key_path: A string containing the path to the PEM file.
+
+ Returns:
+ A bytearray containing the bytes in the PEM file.
+
+ Raises:
+ AvbError: If openssl cannot decode the PEM file.
+ """
+ # Use openssl to decode the PEM file.
+ args = ['openssl', 'rsa', '-in', key_path, '-pubout', '-outform', 'DER']
+ p = subprocess.Popen(args,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ (pout, perr) = p.communicate()
+ retcode = p.wait()
+ if retcode != 0:
+ raise AvbError('Error decoding: {}'.format(perr))
+ return bytearray(pout)
+
+
def encode_rsa_key(key_path):
"""Encodes a public RSA key in |AvbRSAPublicKeyHeader| format.
@@ -1964,7 +1990,7 @@
data: If not None, must be a bytearray of size == 18.
Raises:
- AvbError if invalid structure for AvbIcpHeader.
+ AvbError: If invalid structure for AvbIcpHeader.
"""
assert struct.calcsize(self.FORMAT_STRING) == self.SIZE
@@ -1999,7 +2025,7 @@
A bytearray() with the encoded header.
Raises:
- AvbError if invalid structure for AvbIcpHeader.
+ AvbError: If invalid structure for AvbIcpHeader.
"""
if not self.is_valid():
raise AvbError('Invalid structure for AvbIcpHeader')
@@ -2042,6 +2068,165 @@
return False
return True
+def check_signature(log_root, log_root_sig,
+ transparency_log_pub_key):
+ """Validates the signature provided by the transparency log.
+
+ Arguments:
+ log_root_hash: The root hash for the transparency log's Merkle tree.
+ log_root_sig: The signature of the root hash for the transparency log.
+ transparency_log_pub_key: The trusted public key of the transparency log.
+ algorithm: The algorithm ID given by the AvbIcpHeader to determine hash
+ and signature size.
+ Returns:
+ True if the signature check passes, otherwise False.
+ """
+
+ # TODO(danielaustin): Figure out why validation is not working.
+ logsig_tmp = tempfile.NamedTemporaryFile()
+ logsig_tmp.write(log_root_sig)
+ logsig_tmp.flush()
+ logroot_tmp = tempfile.NamedTemporaryFile()
+ logroot_tmp.write(log_root)
+ logroot_tmp.flush()
+
+ p = subprocess.Popen(['openssl', 'dgst', '-sha256', '-verify',
+ transparency_log_pub_key,
+ '-signature', logsig_tmp.name, logroot_tmp.name],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
+ (_, openssl_err) = p.communicate()
+ retcode = p.wait()
+ if not retcode:
+ return True
+ sys.stderr.write('openssl status {}'.format(openssl_err))
+ return False
+
+class AvbIcpSignedRootBlob(object):
+ """A class for the components required to validate the incusion proof.
+
+ This class contains the signed tree root components required to verify
+ an inclusion proof given a list of hashes.
+
+ Attributes:
+ leaf_hash: The hash of the leaf corresponding with this log entry.
+ tree_size: The size of the Merkle tree.
+ root_hash: The calculated root hash of the Merkle tree.
+ log_root_sig: The signed root hash. Used to verify the ICP.
+ """
+ #TODO(danielaustin): Match hash and signature size to algorithm value.
+ SIZE = 645
+ FORMAT_STRING = ('!32s' # The leaf hash corresponding to this vbmeta.
+ 'Q' # The Merkle tree size
+ '61s' # The log_root structure that is signed
+ '32s' # The Merkle tree root hash.
+ '512s') # The log_root signed with the transparency log key.
+
+ def __init__(self, data=None):
+ """Initializes a new signed_root_blob structure.
+
+ Arguments:
+ data: If not None, must be a bytearray of size |SIZE|.
+
+ Raises:
+ AvbError: If data does not represent a well-formed AvbIcpSignedRootBlob.
+ """
+ assert struct.calcsize(self.FORMAT_STRING) == self.SIZE
+
+ if data:
+ (self.leaf_hash, self.tree_size, self.log_root,
+ self.root_hash, self.log_root_sig) = struct.unpack(
+ self.FORMAT_STRING, data)
+ else:
+ self.leaf_hash = bytearray()
+ self.tree_size = 0
+ self.log_root = bytearray()
+ self.root_hash = bytearray()
+ self.log_root_sig = ''
+
+ if not self.is_valid():
+ raise AvbError('Invalid structure for AvbIcpSignedBlob')
+
+ def translate_afi_response(self, afi_response):
+ """Translates an AddFirmwareImageResponse message to AvbIcpSignedRootBlob.
+
+ Arguments:
+ afi_response: An AddFirmwareImageResponse proto message.
+
+ Raises:
+ AvbError: If unsupported hash size is detected.
+ """
+ # Do the hash calculation
+ self.leaf_hash = rfc6962_hash_leaf(afi_response.vbmeta_leaf)
+ self.log_root = afi_response.vbmeta_proof.sth.log_root
+ self.log_root_sig = str(afi_response.vbmeta_proof.sth.log_root_signature)
+ # Partial format string to extract the tree_size and root_hash from
+ # the log_root. THis structure is defined:
+ # https://github.com/google/trillian/blob/master/trillian.proto#L255
+
+ # TODO(danielaustin): Make this into a class.
+ partial_log_format_string = ('!H' # Version
+ 'Q' # tree_size
+ 'B' # hash_size, verify this is 32 for now
+ '32s') # The root_hash
+
+ (log_root_version, self.tree_size, root_hash_size,
+ self.root_hash) = struct.unpack(partial_log_format_string,
+ self.log_root[0:43])
+ if log_root_version != 1:
+ raise AvbError('Unsupported log root version: {}'.format(
+ log_root_version))
+ if not len(self.root_hash) == root_hash_size:
+ raise AvbError('Unsupported hash size.')
+
+ def encode(self):
+ """Serializes the AvbSignedRootBlob structure (584) to a bytearray.
+
+ Returns:
+ A bytearray with the AvbSignedRootBlob.
+
+ Raises:
+ AvbError: If data does not represent a well-formed AvbIcpSignedRootBlob.
+ """
+ if not self.is_valid():
+ raise AvbError('Invalid structure for AvbIcpSignedRootBlob')
+
+ return struct.pack(self.FORMAT_STRING,
+ str(self.leaf_hash),
+ self.tree_size,
+ str(self.log_root),
+ str(self.root_hash),
+ str(self.log_root_sig))
+ def is_valid(self):
+ """Ensures that values in the AvbIcpSignedRootBlob are sane.
+
+ Returns:
+ True if the values in the AvbIcpSignedRootBlob are sane, False otherwise.
+ """
+ #TODO(danielaustin): match these up with algorithm instead of defaults.
+ # All structures being of size 0 is valid
+ if not self.leaf_hash and self.tree_size == 0 and \
+ not self.root_hash and not self.log_root_sig:
+ return True
+ if not len(self.leaf_hash) == 32:
+ sys.stderr.write("AvbIcpSignedRootBlob: Bad leaf_hash size {}".format(
+ len(self.leaf_hash)))
+ return False
+ if self.tree_size < 0:
+ sys.stderr.write("AvbIcpSignedRootBlob: Bad tree_size value {}".format(
+ self.tree_size))
+ return False
+ if not len(self.root_hash) == 32:
+ sys.stderr.write("AvbIcpSignedRootBlob: Bad root_hash size {}".format(
+ len(self.root_hash)))
+ return False
+ if not len(self.log_root_sig) == 512:
+ sys.stderr.write("AvbIcpSignedRootBlob: Bad log_root_sig size {}".format(
+ len(self.log_root_sig)))
+ return False
+ return True
class AvbIcpEntry(object):
"""A class for the transparency log inclusion proof entries.
@@ -2098,10 +2283,22 @@
raise AvbError('ICP entry size is not valid {}/{}.'
.format(len(data), self.get_expected_size()))
# Deserialize ICP entry components from the data blob.
- data_format_string = "{}s{}s{}s".format(
- self.log_url_size, self.signed_root_blob_size, self.proof_size)
- (self.log_url, self.signed_root_blob, self.proofs) = struct.unpack(
- data_format_string, data[self.SIZE:self.get_expected_size()])
+ expected_format_string = '{}s{}s{}s'.format(
+ self.log_url_size,
+ AvbIcpSignedRootBlob.SIZE,
+ self.proof_size)
+
+ (self.log_url, signed_root_blob_bytes, proof_bytes) = struct.unpack(
+ expected_format_string, data[self.SIZE:self.get_expected_size()])
+ self.signed_root_blob = AvbIcpSignedRootBlob(signed_root_blob_bytes)
+ self.proofs = []
+ if self.proof_hash_count > 0:
+ proof_idx = 0
+ hash_size = self.proof_size // self.proof_hash_count
+ for _ in range(self.proof_hash_count):
+ proof = proof_bytes[proof_idx:(proof_idx+hash_size)]
+ self.proofs.append(proof)
+ proof_idx += hash_size
else:
self.log_url_size = 0
self.leaf_index = 0
@@ -2109,9 +2306,9 @@
self.proof_hash_count = 0
self.proof_size = 0
self.next_entry = 0
- self.log_url = ""
- self.signed_root_blob = bytearray()
- self.proofs = bytearray()
+ self.log_url = ''
+ self.signed_root_blob = AvbIcpSignedRootBlob()
+ self.proofs = []
if not self.is_valid():
raise AvbError('Invalid structure for AvbIcpEntry')
@@ -2128,22 +2325,42 @@
"""Sets signed_root_blob and signed_root_blob_size.
Arguments:
- signed_root_blob: bytearray containing the SignedLogRoot for the
- transparency log.
+ signed_root_blob: An AvbIcpSignedRootBlob containing the SignedLogRoot
+ for the transparency log.
"""
self.signed_root_blob = signed_root_blob
- self.signed_root_blob_size = len(signed_root_blob)
+ self.signed_root_blob_size = signed_root_blob.SIZE
- def set_proofs(self, proof_hash_count, proofs):
+ def set_proofs(self, proofs):
"""Sets the proof_hash_count, proofs, and proof_size.
Arguments:
- proof_hash_count: Number of hashes comprising the inclusion proof.
proofs: A bytearray of concatenated hashes comprising the inclusion proof.
"""
- self.proof_hash_count = proof_hash_count
+ self.proof_hash_count = 0
self.proofs = proofs
- self.proof_size = len(proofs)
+ proof_size = 0
+ for proof in proofs:
+ proof_size += len(proof)
+ self.proof_hash_count += 1
+ self.proof_size = proof_size
+
+ def verify_icp(self, transparency_log_pub_key):
+ """Verifies the contained inclusion proof given the public log key.
+
+ Arguments:
+ transparency_log_pub_key: The trusted public key for the log.
+
+ Returns:
+ True if the calculated signature matches AvbIcpEntry's. False otherwise.
+ """
+ calc_root = root_from_icp(self.leaf_index, self.signed_root_blob.tree_size,
+ self.proofs, self.signed_root_blob.leaf_hash)
+ if (calc_root == self.signed_root_blob.root_hash) and check_signature(
+ self.signed_root_blob.log_root, self.signed_root_blob.log_root_sig,
+ transparency_log_pub_key):
+ return True
+ return False
def save(self, output):
"""Serializes the transparency header (22) and data to disk.
@@ -2152,7 +2369,7 @@
output: The object to write the header to.
Raises:
- AvbError if invalid entry structure.
+ AvbError: If invalid entry structure.
"""
output.write(self.encode())
@@ -2163,19 +2380,42 @@
A bytearray() with the encoded header.
Raises:
- AvbError if invalid entry structure.
+ AvbError: If invalid entry structure.
"""
+ proof_bytes = bytearray()
if not self.is_valid():
raise AvbError('Invalid AvbIcpEntry structure')
- expected_format_string = "{}{}s{}s{}s".format(self.FORMAT_STRING,
- self.log_url_size,
- self.signed_root_blob_size,
- self.proof_size)
+ expected_format_string = '{}{}s{}s{}s'.format(
+ self.FORMAT_STRING, self.log_url_size,
+ self.signed_root_blob.SIZE,
+ self.proof_size)
+
+ for proof in self.proofs:
+ proof_bytes.extend(proof)
+
return struct.pack(expected_format_string,
self.log_url_size, self.leaf_index,
self.signed_root_blob_size, self.proof_hash_count,
self.proof_size, self.next_entry, self.log_url,
- str(self.signed_root_blob), str(self.proofs))
+ self.signed_root_blob.encode(),
+ str(proof_bytes))
+
+ # TODO(danielaustin): Add unit test.
+ def translate_response(self, transparency_log, afi_response):
+ """Takes an AddFirmwareInfoResponse object and translates to an AvbIcpEntry.
+
+ Arguments:
+ transparency_log: String representing the transparency log URL.
+ afi_response: The AddFirmwareResponse object to translate.
+ """
+ self.set_log_url(transparency_log)
+ self.leaf_index = afi_response.vbmeta_proof.proof.leaf_index
+ self.signed_root_blob = AvbIcpSignedRootBlob()
+ self.signed_root_blob.translate_afi_response(afi_response)
+ self.signed_root_blob_size = self.signed_root_blob.SIZE
+ # Calculate the number of hashes.
+ proof_hashes = afi_response.vbmeta_proof.proof.hashes
+ self.set_proofs(proof_hashes)
def get_expected_size(self):
"""Gets the expected size of the full entry out of the header.
@@ -2203,11 +2443,16 @@
'{}\n'.format(self.leaf_index))
return False
- if ((self.signed_root_blob and self.signed_root_blob_size
- != len(self.signed_root_blob))
- or (not self.signed_root_blob and self.signed_root_blob_size != 0)):
+ if not self.signed_root_blob or not self.signed_root_blob.is_valid():
+ sys.stderr.write('ICP entry: invalid AvbIcpSignedRootBlob\n')
+ return False
+
+ if (self.signed_root_blob_size != 0) and (
+ not self.signed_root_blob_size == self.signed_root_blob.SIZE):
sys.stderr.write('ICP entry: invalid signed root blob size: '
- '{}\n'.format(self.signed_root_blob_size))
+ '{}, should be {}\n'.format(
+ self.signed_root_blob_size,
+ self.signed_root_blob.SIZE))
return False
if self.proof_hash_count < 0:
@@ -2215,12 +2460,19 @@
self.proof_hash_count))
return False
- if ((self.proofs and self.proof_size != len(self.proofs))
- or (not self.proofs and self.proof_size != 0)):
- sys.stderr.write('ICP entry: invalid transparency log proof size: ')
- sys.stderr.write('{}\n'.format(self.proof_size))
+ proof_size = 0
+ if self.proofs:
+ for proof in self.proofs:
+ proof_size += len(proof)
+ if self.proof_size != proof_size:
+ sys.stderr.write('ICP entry: invalid transparency log proof size: ')
+ sys.stderr.write('{}, calculated {}\n'.format(self.proof_size,
+ proof_size))
+ return False
+ elif self.proof_size != 0:
+ sys.stderr.write('ICP entry: invalid transparency log proof size '
+ '(should be 0): {}'.format(self.proof_size))
return False
-
if self.next_entry != 0 and self.next_entry != 1:
sys.stderr.write('ICP entry: invalid next entry value: {}\n'.format(
self.next_entry))
@@ -2311,18 +2563,18 @@
output: The object to write the blob to.
Raises:
- AvbError if invalid blob structure.
+ AvbError: If invalid blob structure.
"""
output.write(self.encode())
def encode(self):
- """Serialize the AvbIcpBlob to a bytearray()
+ """Serialize the AvbIcpBlob to a bytearray().
Returns:
A bytearray() with the encoded header.
Raises:
- AvbError if invalid blob structure.
+ AvbError: If invalid blob structure.
"""
# The header and entries are guaranteed to be valid when encode is called.
# Check the entire structure as a whole.
@@ -2353,6 +2605,111 @@
return True
+# AFTL Merkle Tree Functionality
+# TODO(danielaustin): Encapsulate this behavior in a class.
+def rfc6962_hash_leaf(leaf):
+ """RFC6962 hashing function for hashing leaves of a Merkle tree.
+
+ Arguments:
+ leaf: A bytearray containing the Merkle tree leaf to be hashed.
+
+ Returns:
+ A bytearray containing the RFC6962 SHA256 hash of the leaf.
+ """
+ hasher = hashlib.sha256()
+ # RFC6962 states a '0' byte should be prepended to the data.
+ # This is done in conjunction with the '1' byte for non-leaf
+ # nodes for 2nd preimage attack resistance.
+ hasher.update(b'\x00')
+ hasher.update(leaf)
+ return hasher.digest()
+
+
+def rfc6962_hash_children(l, r):
+ """Calculates the inner Merkle tree node hash of child nodes l and r.
+
+ Arguments:
+ l: A bytearray containing the left child node to be hashed.
+ r: A bytearray containing the right child node to be hashed.
+
+ Returns:
+ A bytearray containing the RFC6962 SHA256 hash of 1|l|r.
+ """
+ hasher = hashlib.sha256()
+ # RFC6962 states a '1' byte should be prepended to the concatenated data.
+ # This is done in conjunction with the '0' byte for leaf
+ # nodes for 2nd preimage attack resistance.
+ hasher.update(b'\x01')
+ hasher.update(l)
+ hasher.update(r)
+ return hasher.digest()
+
+
+def chain_border_right(seed, proof):
+ """Computes a subtree hash along the left-side tree border.
+
+ Arguments:
+ seed: A bytearray containing the starting hash.
+ proof: A list of bytearrays representing the hashes in the inclusion proof.
+
+ Returns:
+ A bytearray containing the left-side subtree hash.
+ """
+ for h in proof:
+ seed = rfc6962_hash_children(h, seed)
+ return seed
+
+
+def chain_inner(seed, proof, leaf_index):
+ """Computes a subtree hash on or below the tree's right border.
+
+ Arguments:
+ seed: A bytearray containing the starting hash.
+ proof: A list of bytearrays representing the hashes in the inclusion proof.
+ leaf_index: The current leaf index.
+
+ Returns:
+ A bytearray containing the subtree hash.
+ """
+ for i, h in enumerate(proof):
+ if leaf_index >> i & 1 == 0:
+ seed = rfc6962_hash_children(seed, h)
+ else:
+ seed = rfc6962_hash_children(h, seed)
+ return seed
+
+
+def root_from_icp(leaf_index, tree_size, proof, leaf_hash):
+ """Calculates the expected Merkle tree root hash.
+
+ Arguments:
+ leaf_index: The current leaf index.
+ tree_size: The number of nodes in the Merkle tree.
+ proof: A list of bytearrays containing the inclusion proof.
+ leaf_hash: A bytearray containing the initial leaf hash.
+
+ Returns:
+ A bytearray containing the calculated Merkle tree root hash.
+
+ Raises:
+ AvbError: If invalid parameters are passed in.
+ """
+ if leaf_index < 0:
+ raise AvbError('Invalid leaf_index value: {}'.format(leaf_index))
+ if tree_size < 0:
+ raise AvbError('Invalid tree_size value: {}'.format(tree_size))
+ if leaf_index >= tree_size:
+ err_str = 'leaf_index cannot be equal or larger than tree_size: {}, {}'
+ raise AvbError(err_str.format(leaf_index, tree_size))
+
+ # Calculate the point to split the proof into two parts.
+ # The split is where the paths to leaves diverge.
+ inner = (leaf_index ^ (tree_size - 1)).bit_length()
+ result = chain_inner(leaf_hash, proof[:inner], leaf_index)
+ result = chain_border_right(result, proof[inner:])
+ return result
+
+
class AvbVBMetaHeader(object):
"""A class for parsing and writing AVB vbmeta images.
@@ -3138,6 +3495,73 @@
return self._get_cmdline_descriptors_for_hashtree_descriptor(ht)
+ # TODO(danielaustin): Add unit tests.
+ def request_inclusion_proof(self, transparency_log, vbmeta_blob,
+ version_inc, manufacturer_key_path):
+ """Packages and sends a request to the specified transparency log.
+
+ Arguments:
+ transparency_log: String containing the URL of a transparency log server.
+ vbmeta_blob: A bytearray with the vbmeta blob.
+ version_inc: Subcomponent of the build fingerprint.
+ manufacturer_key_path: Path to key used to sign messages sent to the
+ transparency log servers.
+
+ Returns:
+ An AvbIcpEntry with the inclusion proof for the log entry.
+
+ Raises:
+ AvbError: If grpc or the proto modules cannot be loaded, if there is an
+ error communicating with the log or if the manufacturer_key_path
+ cannot be decoded.
+ """
+ # Import grpc and proto.api_pb2_grpc now to avoid global dependencies.
+ try:
+ import grpc
+ import proto.api_pb2_grpc
+ except ImportError as e:
+ err_str = 'grpc can be installed with python pip install grpcio.\n'
+ raise AvbError('Failed to import module: ({}).\n{}'.format(e, err_str))
+
+ # Set up the gRPC channel with the transparency log.
+ sys.stdout.write('Preparing to request inclusion proof from {}. This could '
+ 'take ~30 seconds for the process to complete.\n'.format(
+ transparency_log))
+ channel = grpc.insecure_channel(transparency_log)
+ stub = proto.api_pb2_grpc.AFTLogStub(channel)
+
+ # Calculate the hash of the vbmeta image.
+ hasher = hashlib.sha256()
+ hasher.update(vbmeta_blob)
+ vbmeta_hash = hasher.digest()
+ # Extract the key data from the PEM file.
+ manufacturer_key_data = rsa_key_read_pem_bytes(manufacturer_key_path)
+ # Calculate the hash of the manufacturer key data.
+ hasher = hashlib.sha256()
+ hasher.update(manufacturer_key_data)
+ m_key_hash = hasher.digest()
+ # Create an AddFirmwareInfoRequest protobuf for transmission to the
+ # transparency log.
+ fw_info = proto.aftl_pb2.FirmwareInfo(vbmeta_hash=vbmeta_hash,
+ version_incremental=version_inc,
+ manufacturer_key_hash=m_key_hash)
+ # TODO(danielaustin): Sign the message with the manufacturer key.
+ sfw_info = proto.aftl_pb2.SignedFirmwareInfo(info=fw_info)
+ request = proto.api_pb2.AddFirmwareInfoRequest(vbmeta=bytes(
+ str(vbmeta_blob)), fw_info=sfw_info)
+ # Attempt to transmit to the transparency log.
+ try:
+ # TODO(danielaustin): Set a reasonable timeout deadline here.
+ sys.stdout.write('ICP is about to be requested from transparency log '
+ 'with domain {}.\n'.format(transparency_log))
+ response = stub.AddFirmwareInfo(request)
+ except grpc.RpcError as e:
+ raise AvbError('Error: grpc failure ({})'.format(e))
+ # Return an AvbIcpEntry representing this response.
+ icp_entry = AvbIcpEntry()
+ icp_entry.translate_response(transparency_log, response)
+ return icp_entry
+
def make_vbmeta_image(self, output, chain_partitions, algorithm_name,
key_path, public_key_metadata_path, rollback_index,
flags, props, props_from_file, kernel_cmdlines,
@@ -3213,6 +3637,148 @@
padding_needed = padded_size - len(vbmeta_blob)
output.write('\0' * padding_needed)
+
+ def make_icp_from_vbmeta(self, vbmeta_image_path, output, algorithm,
+ signing_helper, signing_helper_with_files,
+ version_incremental, transparency_log_servers,
+ transparency_log_pub_keys, manufacturer_key,
+ padding_size):
+ """Generates a vbmeta image with inclusion proof given a vbmeta image.
+
+ This blob (struct AvbIcpBlob) contains the information required to
+ validate an inclusion proof for a specific vbmeta image. It consists
+ of a header (struct AvbIcpHeader) and zero or more entry structures
+ (struct AvbIcpEntry) that contain the vbmeta leaf hash, tree size,
+ root hash, inclusion proof hashes, and the signature for the root hash.
+
+ The vbmeta image, its hash, the version_incremental part of the build
+ fingerprint, and the hash of the manufacturer key are sent to the
+ transparency log, with the message signed by the manufacturer key.
+ An inclusion proof is calculated and returned. This inclusion proof is
+ then packaged in a AvbIcpBlob structure. The existing vbmeta data is
+ copied to a new file, appended with the AvbIcpBlob data, and written to
+ output. Validation of the inclusion proof does not require
+ communication with the transparency log.
+
+ Arguments:
+ vbmeta_image_path: Path to a vbmeta image file.
+ output: File to write the results to.
+ algorithm: The algorithm ID for signing and hashing (see ALGORITHMS). This
+ will be used for hash and signature size calculation and padding.
+ signing_helper: Program which signs a hash and returns a signature.
+ signing_helper_with_files: Same as signing_helper but uses files instead.
+ version_incremental: A string representing the subcomponent of the
+ build fingerprint used to identify the vbmeta in the transparency log.
+ transparency_log_servers: List of strings containing URLs of transparency
+ log servers where inclusion proofs are requested from.
+ transparency_log_pub_keys: List of paths to PEM files containing trusted
+ public keys that correspond with the transparency_logs. There must be
+ the same number of keys as log servers and they must be in the same
+ order, that is, transparency_log_pub_keys[n] corresponds to
+ transparency_log_servers[n].
+ manufacturer_key: Path to PEM file containting the key file used to sign
+ messages sent to the transparency log servers.
+ padding_size: If not 0, pads output so size is a multiple of the number.
+
+ Returns:
+ True if the inclusion proofs could be fetched from the transparency log
+ servers and could be successfully validated, False otherwise.
+
+ Raises:
+ AvbError: If any parameters are invalid, communication with the log
+ fails or the structures are malformed.
+ """
+ # TODO(danielaustin): Determine the best way to handle chained vbmeta
+ # structures. Currently, we only put the main one in the transparency
+ # log.
+
+ # Validates command line parameters.
+ if not vbmeta_image_path:
+ raise AvbError('No vbmeta image path found.')
+ if not transparency_log_servers:
+ raise AvbError('No transparency log servers given.')
+ if not transparency_log_pub_keys:
+ raise AvbError('No transparency log public keys given.')
+ if len(transparency_log_servers) != len(transparency_log_pub_keys):
+ raise AvbError('Transparency log count and public key count mismatch: '
+ '{} servers and {} public keys'.format(
+ len(transparency_log_servers),
+ len(transparency_log_pub_keys)))
+ if not manufacturer_key:
+ raise AvbError('No manufacturer key path given.')
+
+ # TODO(danielaustin): add support for signing_helper and
+ # signing_helper_with_files
+ if signing_helper is not None or signing_helper_with_files is not None:
+ raise AvbError('signing_helper support not yet implemented for ICP.')
+
+ try:
+ algorithm_id = ALGORITHMS[algorithm].algorithm_type
+ except KeyError:
+ raise AvbError('Unknown algorithm with name {}'.format(algorithm))
+
+ # Retrieves vbmeta structure from given partition image.
+ image = ImageHandler(vbmeta_image_path)
+ (footer, header, _, _) = self._parse_image(image)
+ offset = 0
+ if footer:
+ offset = footer.vbmeta_offset
+ image.seek(offset)
+ vbmeta_blob = image.read(header.SIZE +
+ header.authentication_data_block_size +
+ header.auxiliary_data_block_size)
+
+ # Fetches inclusion proofs for vbmeta structure from all transparency logs.
+ icp_entries = []
+ for i, transparency_log in enumerate(transparency_log_servers):
+ try:
+ icp_entry = self.request_inclusion_proof(transparency_log, vbmeta_blob,
+ version_incremental,
+ manufacturer_key)
+ if not icp_entry.verify_icp(transparency_log_pub_keys[i]):
+ sys.stderr.write('The ICP from {} could not be verified\n'.format(
+ transparency_log))
+ icp_entries.append(icp_entry)
+ except AvbError as e:
+ sys.stderr.write('AvbError: {}'.format(e))
+ # The inclusion proof request failed.
+ # Continue and see if another will succeed.
+ continue
+ if not icp_entries:
+ sys.stderr.write('No inclusion proofs could be validated from any log.\n')
+ return False
+
+ # Prepares the inclusion proof blob to be appended to the vbmeta image.
+ icp_blob = AvbIcpBlob()
+ icp_blob.set_algorithm(algorithm_id)
+ for icp_entry in icp_entries:
+ icp_blob.add_icp_entry(icp_entry)
+ if not icp_blob.is_valid():
+ sys.stderr.write('Resulting AvbIcpBlob structure is malformed\n.')
+ return False
+
+ # Write the original vbmeta blob, followed by the AvbIcpBlob.
+ if footer: # Checks if it is a chained partition.
+ # TODO(danielaustin): Add support for chained partitions like system.img
+ # using similar functionality as implemented in append_vbmeta_image().
+ sys.stderr.write('Image has a footer and ICP for this format is not '
+ 'implemented.')
+ return False
+
+ # Writes vbmeta image with inclusion proof into a new vbmeta image.
+ output.seek(0)
+ output.write(vbmeta_blob)
+ encoded_icp_blob = icp_blob.encode()
+ output.write(encoded_icp_blob)
+
+ if padding_size > 0:
+ blob_size = len(vbmeta_blob) + len(encoded_icp_blob)
+ padded_size = round_to_multiple(blob_size, padding_size)
+ padding_needed = padded_size - blob_size
+ output.write('\0' * padding_needed)
+
+ return True
+
def _generate_vbmeta_blob(self, algorithm_name, key_path,
public_key_metadata_path, descriptors,
chain_partitions,
@@ -4510,6 +5076,43 @@
self._add_common_args(sub_parser)
sub_parser.set_defaults(func=self.make_vbmeta_image)
+ sub_parser = subparsers.add_parser('make_icp_from_vbmeta',
+ help='Makes an ICP enhanced vbmeta image'
+ ' from an existing vbmeta image.')
+ sub_parser.add_argument('--output',
+ help='Output file name.',
+ type=argparse.FileType('wb'),
+ default=sys.stdout)
+ sub_parser.add_argument('--vbmeta_image_path',
+ help='Path to a generate vbmeta image file.')
+ sub_parser.add_argument('--version_incremental', help='Current build ID.')
+ sub_parser.add_argument('--manufacturer_key',
+ help='Path to the PEM file containing the '
+ 'manufacturer key for use with the log.')
+ sub_parser.add_argument('--transparency_log_servers',
+ help='List of transparency log servers in '
+ 'host:port format. This must not be None and must '
+ 'be the same size as transparency_log_pub_keys. '
+ 'Also, transparency_log_servers[n] must correspond '
+ 'to transparency_log_pub_keys[n] for all values n.',
+ nargs='*')
+ sub_parser.add_argument('--transparency_log_pub_keys',
+ help='Paths to PEM files containing transparency '
+ 'log server key(s). This must not be None and must '
+ 'be the same size as transparency_log_servers. '
+ 'Also, transparency_log_pub_keys[n] must '
+ 'correspond to transparency_log_servers[n] for all '
+ 'values n.', nargs='*')
+ sub_parser.add_argument('--padding_size',
+ metavar='NUMBER',
+ help='If non-zero, pads output with NUL bytes so '
+ 'its size is a multiple of NUMBER '
+ '(default: 0)',
+ type=parse_number,
+ default=0)
+ self._add_common_args(sub_parser)
+ sub_parser.set_defaults(func=self.make_icp_from_vbmeta)
+
sub_parser = subparsers.add_parser('add_hash_footer',
help='Add hashes and footer to image.')
sub_parser.add_argument('--image',
@@ -4903,6 +5506,19 @@
args.print_required_libavb_version,
args.padding_size)
+ def make_icp_from_vbmeta(self, args):
+ """Implements the 'make_icp_from_vbmeta' sub-command."""
+ args = self._fixup_common_args(args)
+ self.avb.make_icp_from_vbmeta(args.vbmeta_image_path,
+ args.output, args.algorithm,
+ args.signing_helper,
+ args.signing_helper_with_files,
+ args.version_incremental,
+ args.transparency_log_servers,
+ args.transparency_log_pub_keys,
+ args.manufacturer_key,
+ args.padding_size)
+
def append_vbmeta_image(self, args):
"""Implements the 'append_vbmeta_image' sub-command."""
self.avb.append_vbmeta_image(args.image.name, args.vbmeta_image.name,