Fixing up interactions with subprocess.Popen to be py3 compatible.
Going forward with Python 3 all interactions with subprocess are with
byte-like objects unless an encoding was specified when calling Popen()
and return values are of type bytes().
Going forward I will standardize on using bytes() instead of bytearray()
except for situations where the mutable property of bytesarray() is
actually used.
This change is still compatible with Python 2.
Bug: 151336743
Test: atest --host libavb_host_unittest
Test: atest --host aftltool_test
Test: ./aftltool_integration_test.py
Change-Id: I8d6c5459ce56ade8fee1e258a5309a261188ff21
diff --git a/avbtool b/avbtool
index 0e89f33..0900f53 100755
--- a/avbtool
+++ b/avbtool
@@ -345,7 +345,7 @@
num_bits: The key size.
"""
- MODULUS_PREFIX = 'modulus='
+ MODULUS_PREFIX = b'modulus='
def __init__(self, key_path):
"""Loads and parses an RSA key from either a private or public key file.
@@ -476,18 +476,18 @@
algorithm_name: The algorithm name as per the ALGORITHMS dict.
signature_num_bytes: Number of bytes used to store the signature.
key_path: Path to the private key file. Must be PEM format.
- raw_data_to_sign: Data to sign (bytearray or str expected).
+ raw_data_to_sign: Data to sign as bytes or bytearray.
Returns:
- A bytearray containing the signature.
+ The signature as bytes.
Raises:
- Exception: If an error occurs.
+ AvbError: If an error occurred during signing.
"""
p = None
if signing_helper_with_files is not None:
signing_file = tempfile.NamedTemporaryFile()
- signing_file.write(str(raw_data_to_sign))
+ signing_file.write(raw_data_to_sign)
signing_file.flush()
p = subprocess.Popen([
signing_helper_with_files, algorithm_name, key_path, signing_file.name])
@@ -495,7 +495,7 @@
if retcode != 0:
raise AvbError('Error signing')
signing_file.seek(0)
- signature = bytearray(signing_file.read())
+ signature = signing_file.read()
else:
if signing_helper is not None:
p = subprocess.Popen(
@@ -509,11 +509,11 @@
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
- (pout, perr) = p.communicate(str(raw_data_to_sign))
+ (pout, perr) = p.communicate(raw_data_to_sign)
retcode = p.wait()
if retcode != 0:
raise AvbError('Error signing: {}'.format(perr))
- signature = bytearray(pout)
+ signature = pout
if len(signature) != signature_num_bytes:
raise AvbError('Error signing: Invalid length of signature')
return signature
@@ -524,7 +524,8 @@
Arguments:
vbmeta_header: A AvbVBMetaHeader.
- vbmeta_blob: The whole vbmeta blob, including the header.
+ vbmeta_blob: The whole vbmeta blob, including the header as bytes or
+ bytearray.
Returns:
True if the signature is valid and corresponds to the embedded
@@ -597,34 +598,35 @@
'parameter=NULL\n'
'\n'
'[rsapubkey]\n'
- 'n=INTEGER:%s\n'
- 'e=INTEGER:%s\n' % (hex(modulus).rstrip('L'),
- hex(exponent).rstrip('L')))
- asn1_tmpfile = tempfile.NamedTemporaryFile()
- asn1_tmpfile.write(asn1_str)
- asn1_tmpfile.flush()
- der_tmpfile = tempfile.NamedTemporaryFile()
- p = subprocess.Popen(
- ['openssl', 'asn1parse', '-genconf', asn1_tmpfile.name, '-out',
- der_tmpfile.name, '-noout'])
- retcode = p.wait()
- if retcode != 0:
- raise AvbError('Error generating DER file')
+ 'n=INTEGER:{}\n'
+ 'e=INTEGER:{}\n').format(hex(modulus).rstrip('L'),
+ hex(exponent).rstrip('L'))
- p = subprocess.Popen(
- ['openssl', 'rsautl', '-verify', '-pubin', '-inkey', der_tmpfile.name,
- '-keyform', 'DER', '-raw'],
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- (pout, perr) = p.communicate(str(sig_blob))
- retcode = p.wait()
- if retcode != 0:
- raise AvbError('Error verifying data: {}'.format(perr))
- recovered_data = bytearray(pout)
- if recovered_data != padding_and_digest:
- sys.stderr.write('Signature not correct\n')
- return False
+ with tempfile.NamedTemporaryFile() as asn1_tmpfile:
+ asn1_tmpfile.write(asn1_str.encode('ascii'))
+ asn1_tmpfile.flush()
+
+ with tempfile.NamedTemporaryFile() as der_tmpfile:
+ p = subprocess.Popen(
+ ['openssl', 'asn1parse', '-genconf', asn1_tmpfile.name, '-out',
+ der_tmpfile.name, '-noout'])
+ retcode = p.wait()
+ if retcode != 0:
+ raise AvbError('Error generating DER file')
+
+ p = subprocess.Popen(
+ ['openssl', 'rsautl', '-verify', '-pubin', '-inkey', der_tmpfile.name,
+ '-keyform', 'DER', '-raw'],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ (pout, perr) = p.communicate(sig_blob)
+ retcode = p.wait()
+ if retcode != 0:
+ raise AvbError('Error verifying data: {}'.format(perr))
+ if pout != padding_and_digest:
+ sys.stderr.write('Signature not correct\n')
+ return False
return True
@@ -3851,7 +3853,6 @@
Raises:
ValueError: If output from the 'fec' tool is invalid.
-
"""
p = subprocess.Popen(
['fec', '--print-fec-size', str(image_size), '--roots', str(num_roots)],