blob: 81412dad475f490d7027e516099af7e76a3f4e7f [file] [log] [blame]
#!/usr/bin/python
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import argparse, math, re, sys
import xml.etree.ElementTree as ET
from collections import defaultdict, namedtuple
import itertools
def createLookup(values, key):
"""Creates a lookup table for a collection of values based on keys.
Arguments:
values: a collection of arbitrary values. Must be iterable.
key: a function of one argument that returns the key for a value.
Returns:
A dict mapping keys (as generated by the key argument) to lists of
values. All values in the lists have the same key, and are in the order
they appeared in the collection.
"""
lookup = defaultdict(list)
for v in values:
lookup[key(v)].append(v)
return lookup
def _intify(value):
"""Returns a value converted to int if possible, else the original value."""
try:
return int(value)
except ValueError:
return value
class Size(namedtuple('Size', ['width', 'height'])):
"""A namedtuple with width and height fields."""
def __str__(self):
return '%dx%d' % (self.width, self.height)
class _VideoResultBase(object):
"""Helper methods for results. Not for use by applications.
Attributes:
codec: The name of the codec (string) or None
size: Size representing the video size or None
mime: The mime-type of the codec (string) or None
rates: The measured achievable frame rates
is_decoder: True iff codec is a decoder.
"""
def __init__(self, is_decoder):
self.codec = None
self.mime = None
self.size = None
self._rates_from_failure = []
self._rates_from_message = []
self.is_decoder = is_decoder
def _inited(self):
"""Returns true iff codec, mime and size was set."""
return None not in (self.codec, self.mime, self.size)
def __len__(self):
# don't report any result if codec name, mime type and size is unclear
if not self._inited():
return 0
return len(self.rates)
@property
def rates(self):
return self._rates_from_failure or self._rates_from_message
def _parseDict(self, value):
"""Parses a MediaFormat from its string representation sans brackets."""
return dict((k, _intify(v))
for k, v in re.findall(r'([^ =]+)=([^ [=]+(?:|\[[^\]]+\]))(?:, |$)', value))
def _cleanFormat(self, format):
"""Removes internal fields from a parsed MediaFormat."""
format.pop('what', None)
format.pop('image-data', None)
MESSAGE_PATTERN = r'(?P<key>\w+)=(?P<value>\{[^}]*\}|[^ ,{}]+)'
def _parsePartialResult(self, message_match):
"""Parses a partial test result conforming to the message pattern.
Returns:
A tuple of string key and int, string or dict value, where dict has
string keys mapping to int or string values.
"""
key, value = message_match.group('key', 'value')
if value.startswith('{'):
value = self._parseDict(value[1:-1])
if key.endswith('Format'):
self._cleanFormat(value)
else:
value = _intify(value)
return key, value
def _parseValuesFromBracket(self, line):
"""Returns the values enclosed in brackets without the brackets.
Parses a line matching the pattern "<tag>: [<values>]" and returns <values>.
Raises:
ValueError: if the line does not match the pattern.
"""
try:
return re.match(r'^[^:]+: *\[(?P<values>.*)\]\.$', line).group('values')
except AttributeError:
raise ValueError('line does not match "tag: [value]": %s' % line)
def _parseRawData(self, line):
"""Parses the raw data line for video performance tests.
Yields:
Dict objects corresponding to parsed results, mapping string keys to
int, string or dict values.
"""
try:
values = self._parseValuesFromBracket(line)
result = {}
for m in re.finditer(self.MESSAGE_PATTERN + r'(?P<sep>,? +|$)', values):
key, value = self._parsePartialResult(m)
result[key] = value
if m.group('sep') != ' ':
yield result
result = {}
except ValueError:
print >> sys.stderr, 'could not parse line %s' % repr(line)
def _tryParseMeasuredFrameRate(self, line):
"""Parses a line starting with 'Measured frame rate:'."""
if line.startswith('Measured frame rate: '):
try:
values = self._parseValuesFromBracket(line)
values = re.split(r' *, *', values)
self._rates_from_failure = list(map(float, values))
except ValueError:
print >> sys.stderr, 'could not parse line %s' % repr(line)
def parse(self, test):
"""Parses the ValueArray and FailedScene lines of a test result.
Arguments:
test: An ElementTree <Test> element.
"""
failure = test.find('FailedScene')
if failure is not None:
trace = failure.find('StackTrace')
if trace is not None:
for line in re.split(r'[\r\n]+', trace.text):
self._parseFailureLine(line)
details = test.find('Details')
if details is not None:
for array in details.iter('ValueArray'):
message = array.get('message')
self._parseMessage(message, array)
def _parseFailureLine(self, line):
raise NotImplementedError
def _parseMessage(self, message, array):
raise NotImplementedError
def getData(self):
"""Gets the parsed test result data.
Yields:
Result objects containing at least codec, size, mime and rates attributes."""
yield self
class VideoEncoderDecoderTestResult(_VideoResultBase):
"""Represents a result from a VideoEncoderDecoderTest performance case."""
def __init__(self, unused_m):
super(VideoEncoderDecoderTestResult, self).__init__(is_decoder=False)
# If a VideoEncoderDecoderTest succeeds, it provides the results in the
# message of a ValueArray. If fails, it provides the results in the failure
# using raw data. (For now it also includes some data in the ValueArrays even
# if it fails, which we ignore.)
def _parseFailureLine(self, line):
"""Handles parsing a line from the failure log."""
self._tryParseMeasuredFrameRate(line)
def _parseMessage(self, message, array):
"""Handles parsing a message from ValueArrays."""
if message.startswith('codec='):
result = dict(self._parsePartialResult(m)
for m in re.finditer(self.MESSAGE_PATTERN + '(?: |$)', message))
if 'EncInputFormat' in result:
self.codec = result['codec']
fmt = result['EncInputFormat']
self.size = Size(fmt['width'], fmt['height'])
self.mime = result['EncOutputFormat']['mime']
self._rates_from_message.append(1000000./result['min'])
class VideoDecoderPerfTestResult(_VideoResultBase):
"""Represents a result from a VideoDecoderPerfTest performance case."""
# If a VideoDecoderPerfTest succeeds, it provides the results in the message
# of a ValueArray. If fails, it provides the results in the failure only
# using raw data.
def __init__(self, unused_m):
super(VideoDecoderPerfTestResult, self).__init__(is_decoder=True)
def _parseFailureLine(self, line):
"""Handles parsing a line from the failure log."""
self._tryParseMeasuredFrameRate(line)
# if the test failed, we can only get the codec/size/mime from the raw data.
if line.startswith('Raw data: '):
for result in self._parseRawData(line):
fmt = result['DecOutputFormat']
self.size = Size(fmt['width'], fmt['height'])
self.codec = result['codec']
self.mime = result['mime']
def _parseMessage(self, message, array):
"""Handles parsing a message from ValueArrays."""
if message.startswith('codec='):
result = dict(self._parsePartialResult(m)
for m in re.finditer(self.MESSAGE_PATTERN + '(?: |$)', message))
if result.get('decodeto') == 'surface':
self.codec = result['codec']
fmt = result['DecOutputFormat']
self.size = Size(fmt['width'], fmt['height'])
self.mime = result['mime']
self._rates_from_message.append(1000000. / result['min'])
class Results(object):
"""Container that keeps all test results."""
def __init__(self):
self._results = [] # namedtuples
self._device = None
VIDEO_ENCODER_DECODER_TEST_REGEX = re.compile(
'test(.*)(\d{4})x(\d{4})(Goog|Other)$')
VIDEO_DECODER_PERF_TEST_REGEX = re.compile(
'test(VP[89]|H26[34]|MPEG4|HEVC)(\d+)x(\d+)(.*)$')
TestCaseSpec = namedtuple('TestCaseSpec', 'package path class_ regex result_class')
def _getTestCases(self):
return [
self.TestCaseSpec(package='CtsDeviceVideoPerf',
path='TestSuite/TestSuite/TestSuite/TestSuite/TestCase',
class_='VideoEncoderDecoderTest',
regex=self.VIDEO_ENCODER_DECODER_TEST_REGEX,
result_class=VideoEncoderDecoderTestResult),
self.TestCaseSpec(package='CtsMediaTestCases',
path='TestSuite/TestSuite/TestSuite/TestCase',
class_='VideoDecoderPerfTest',
regex=self.VIDEO_DECODER_PERF_TEST_REGEX,
result_class=VideoDecoderPerfTestResult)
]
def _verifyDeviceInfo(self, device):
assert self._device in (None, device), "expected %s device" % self._device
self._device = device
def importXml(self, xml):
self._verifyDeviceInfo(xml.find('DeviceInfo/BuildInfo').get('buildName'))
packages = createLookup(self._getTestCases(), lambda tc: tc.package)
for pkg in xml.iter('TestPackage'):
tests_in_package = packages.get(pkg.get('name'))
if not tests_in_package:
continue
paths = createLookup(tests_in_package, lambda tc: tc.path)
for path, tests_in_path in paths.items():
classes = createLookup(tests_in_path, lambda tc: tc.class_)
for tc in pkg.iterfind(path):
tests_in_class = classes.get(tc.get('name'))
if not tests_in_class:
continue
for test in tc.iter('Test'):
for tc in tests_in_class:
m = tc.regex.match(test.get('name'))
if m:
result = tc.result_class(m)
result.parse(test)
self._results.append(result)
def importFile(self, path):
print >> sys.stderr, 'Importing "%s"...' % path
try:
return self.importXml(ET.parse(path))
except ET.ParseError:
raise ValueError('not a valid XML file')
def getData(self):
for result in self._results:
for data in result.getData():
yield data
def dumpXml(self, results):
yield '<?xml version="1.0" encoding="utf-8" ?>'
yield '<!-- Copyright 2015 The Android Open Source Project'
yield ''
yield ' Licensed under the Apache License, Version 2.0 (the "License");'
yield ' you may not use this file except in compliance with the License.'
yield ' You may obtain a copy of the License at'
yield ''
yield ' http://www.apache.org/licenses/LICENSE-2.0'
yield ''
yield ' Unless required by applicable law or agreed to in writing, software'
yield ' distributed under the License is distributed on an "AS IS" BASIS,'
yield ' WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.'
yield ' See the License for the specific language governing permissions and'
yield ' limitations under the License.'
yield '-->'
yield ''
yield '<MediaCodecs>'
last_section = None
Comp = namedtuple('Comp', 'is_decoder google mime name')
by_comp = createLookup(results,
lambda e: Comp(is_decoder=e.is_decoder, google='.google.' in e.codec, mime=e.mime, name=e.codec))
for comp in sorted(by_comp):
section = 'Decoders' if comp.is_decoder else 'Encoders'
if section != last_section:
if last_section:
yield ' </%s>' % last_section
yield ' <%s>' % section
last_section = section
yield ' <MediaCodec name="%s" type="%s" update="true">' % (comp.name, comp.mime)
by_size = createLookup(by_comp[comp], lambda e: e.size)
for size in sorted(by_size):
values = list(itertools.chain(*(e.rates for e in by_size[size])))
min_, max_ = min(values), max(values)
med_ = int(math.sqrt(min_ * max_))
yield ' <Limit name="measured-frame-rate-%s" range="%d-%d" />' % (size, med_, med_)
yield ' </MediaCodec>'
if last_section:
yield ' </%s>' % last_section
yield '</MediaCodecs>'
class Main(object):
"""Executor of this utility."""
def __init__(self):
self._result = Results()
self._parser = argparse.ArgumentParser('get_achievable_framerates')
self._parser.add_argument('result_xml', nargs='+')
def _parseArgs(self):
self._args = self._parser.parse_args()
def _importXml(self, xml):
self._result.importFile(xml)
def _report(self):
for line in self._result.dumpXml(r for r in self._result.getData() if r):
print line
def run(self):
self._parseArgs()
try:
for xml in self._args.result_xml:
try:
self._importXml(xml)
except (ValueError, IOError, AssertionError) as e:
print >> sys.stderr, e
raise KeyboardInterrupt
self._report()
except KeyboardInterrupt:
print >> sys.stderr, 'Interrupted.'
if __name__ == '__main__':
Main().run()