Merge "add get_achievable_rates.py under cts-media tools" into marshmallow-cts-dev am: 4e0bf59401
am: af9e742727
* commit 'af9e742727e6a0e3d48d39054a5cc9bba976d7fe':
add get_achievable_rates.py under cts-media tools
diff --git a/tools/cts-media/get_achievable_rates.py b/tools/cts-media/get_achievable_rates.py
new file mode 100755
index 0000000..81412da
--- /dev/null
+++ b/tools/cts-media/get_achievable_rates.py
@@ -0,0 +1,396 @@
+#!/usr/bin/python
+# Copyright (C) 2015 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import argparse, math, re, sys
+import xml.etree.ElementTree as ET
+from collections import defaultdict, namedtuple
+import itertools
+
+
+def createLookup(values, key):
+    """Groups a collection of values into lists indexed by a computed key.
+
+    Arguments:
+      values: any iterable of arbitrary values.
+      key: a one-argument callable returning the (hashable) key of a value.
+
+    Returns:
+      A dict (defaultdict of lists) mapping every generated key to the list
+      of values that produced it, in the order they were encountered.
+    """
+    grouped = defaultdict(list)
+    for item in values:
+        bucket = grouped[key(item)]
+        bucket.append(item)
+    return grouped
+
+
+def _intify(value):
+    """Returns a value converted to int if possible, else the original value.
+
+    Values that int() rejects, whether with ValueError (e.g. a non-numeric
+    string) or with TypeError (e.g. None or a dict), are returned unchanged.
+    """
+    try:
+        return int(value)
+    except (ValueError, TypeError):
+        return value
+
+
+class Size(namedtuple('Size', ['width', 'height'])):
+    """Width/height pair that prints as "<width>x<height>"."""
+
+    def __str__(self):
+        # The instance is itself a (width, height) tuple.
+        return '%dx%d' % tuple(self)
+
+
+class _VideoResultBase(object):
+    """Helper methods for results. Not for use by applications.
+
+    Subclasses supply _parseFailureLine() and _parseMessage(); parse() calls
+    them for every stack-trace line and every ValueArray message.
+
+    Attributes:
+      codec: The name of the codec (string) or None
+      size: Size representing the video size or None
+      mime: The mime-type of the codec (string) or None
+      rates: The measured achievable frame rates
+      is_decoder: True iff codec is a decoder.
+    """
+
+    # One "key=value" item; the value is either a brace-enclosed block or a
+    # run of characters free of spaces, commas and braces.
+    MESSAGE_PATTERN = r'(?P<key>\w+)=(?P<value>\{[^}]*\}|[^ ,{}]+)'
+
+    def __init__(self, is_decoder):
+        self.codec = None
+        self.mime = None
+        self.size = None
+        self._rates_from_failure = []
+        self._rates_from_message = []
+        self.is_decoder = is_decoder
+
+    def _inited(self):
+        """Returns true iff codec, mime and size were all set."""
+        return None not in (self.codec, self.mime, self.size)
+
+    def __len__(self):
+        """Returns the number of rates, or 0 while the result is incomplete."""
+        # don't report any result if codec name, mime type or size is unclear
+        if not self._inited():
+            return 0
+        return len(self.rates)
+
+    @property
+    def rates(self):
+        """Rates parsed from the failure log if any, else those from messages."""
+        return self._rates_from_failure or self._rates_from_message
+
+    @staticmethod
+    def _reportUnparsedLine(line):
+        """Writes a 'could not parse line' diagnostic for line to stderr."""
+        # sys.stderr.write() behaves the same on Python 2 and Python 3,
+        # unlike the Python-2-only "print >>" statement.
+        sys.stderr.write('could not parse line %s\n' % repr(line))
+
+    def _parseDict(self, value):
+        """Parses a MediaFormat from its string representation sans brackets."""
+        pairs = re.findall(
+            r'([^ =]+)=([^ [=]+(?:|\[[^\]]+\]))(?:, |$)', value)
+        return dict((k, _intify(v)) for k, v in pairs)
+
+    def _cleanFormat(self, format):
+        """Removes internal fields from a parsed MediaFormat (in place)."""
+        format.pop('what', None)
+        format.pop('image-data', None)
+
+    def _parsePartialResult(self, message_match):
+        """Parses a partial test result conforming to the message pattern.
+
+        Arguments:
+          message_match: a match object with 'key' and 'value' groups, as
+            produced by MESSAGE_PATTERN.
+
+        Returns:
+          A tuple of string key and int, string or dict value, where dict has
+          string keys mapping to int or string values.
+        """
+        key, value = message_match.group('key', 'value')
+        if value.startswith('{'):
+            # Strip the enclosing braces before parsing the contents.
+            value = self._parseDict(value[1:-1])
+            if key.endswith('Format'):
+                self._cleanFormat(value)
+        else:
+            value = _intify(value)
+        return key, value
+
+    def _parseValuesFromBracket(self, line):
+        """Returns the values enclosed in brackets without the brackets.
+
+        Parses a line matching the pattern "<tag>: [<values>]." (the trailing
+        period is required) and returns <values>.
+
+        Raises:
+          ValueError: if the line does not match the pattern.
+        """
+        m = re.match(r'^[^:]+: *\[(?P<values>.*)\]\.$', line)
+        if m is None:
+            raise ValueError('line does not match "tag: [value]": %s' % line)
+        return m.group('values')
+
+    def _parseRawData(self, line):
+        """Parses the raw data line for video performance tests.
+
+        A line that does not have the bracketed form is reported on stderr
+        and yields nothing.
+
+        Yields:
+          Dict objects corresponding to parsed results, mapping string keys to
+          int, string or dict values.
+        """
+        try:
+            values = self._parseValuesFromBracket(line)
+        except ValueError:
+            self._reportUnparsedLine(line)
+            return
+        result = {}
+        for m in re.finditer(self.MESSAGE_PATTERN + r'(?P<sep>,? +|$)', values):
+            key, value = self._parsePartialResult(m)
+            result[key] = value
+            # A lone space keeps accumulating into the current result; any
+            # other separator (or end of input) completes it.
+            if m.group('sep') != ' ':
+                yield result
+                result = {}
+
+    def _tryParseMeasuredFrameRate(self, line):
+        """Parses a line starting with 'Measured frame rate:'.
+
+        On success the failure rates are replaced by the parsed floats; other
+        lines are ignored and malformed ones are reported on stderr.
+        """
+        if not line.startswith('Measured frame rate: '):
+            return
+        try:
+            values = self._parseValuesFromBracket(line)
+            self._rates_from_failure = [
+                float(v) for v in re.split(r' *, *', values)]
+        except ValueError:
+            self._reportUnparsedLine(line)
+
+    def parse(self, test):
+        """Parses the ValueArray and FailedScene lines of a test result.
+
+        Arguments:
+          test: An ElementTree <Test> element.
+        """
+        failure = test.find('FailedScene')
+        if failure is not None:
+            trace = failure.find('StackTrace')
+            # An empty <StackTrace/> element has text None; nothing to split.
+            if trace is not None and trace.text is not None:
+                for line in re.split(r'[\r\n]+', trace.text):
+                    self._parseFailureLine(line)
+        details = test.find('Details')
+        if details is not None:
+            for array in details.iter('ValueArray'):
+                message = array.get('message')
+                # A ValueArray without a message attribute has nothing to
+                # parse; skip it rather than pass None to the subclass.
+                if message is not None:
+                    self._parseMessage(message, array)
+
+    def _parseFailureLine(self, line):
+        """Handles one line of the failure stack trace. Subclass hook."""
+        raise NotImplementedError
+
+    def _parseMessage(self, message, array):
+        """Handles the message of one ValueArray element. Subclass hook."""
+        raise NotImplementedError
+
+    def getData(self):
+        """Gets the parsed test result data.
+
+        Yields:
+          Result objects containing at least codec, size, mime and rates
+          attributes (here: this object itself).
+        """
+        yield self
+
+
+class VideoEncoderDecoderTestResult(_VideoResultBase):
+    """Holds the result of one VideoEncoderDecoderTest performance case."""
+
+    # A passing VideoEncoderDecoderTest reports its results in the message of
+    # a ValueArray. A failing one reports them in the failure log as raw
+    # data. (For now a failing test also leaves some data in the ValueArrays;
+    # the rates property prefers the values taken from the failure log.)
+
+    def __init__(self, unused_m):
+        super(VideoEncoderDecoderTestResult, self).__init__(is_decoder=False)
+
+    def _parseFailureLine(self, line):
+        """Extracts measured frame rates from one line of the failure log."""
+        self._tryParseMeasuredFrameRate(line)
+
+    def _parseMessage(self, message, array):
+        """Records codec, size, mime and rate from a ValueArray message."""
+        if not message.startswith('codec='):
+            return
+        item_regex = self.MESSAGE_PATTERN + '(?: |$)'
+        parsed = {}
+        for item in re.finditer(item_regex, message):
+            name, value = self._parsePartialResult(item)
+            parsed[name] = value
+        # Only messages that carry the encoder input format are recorded.
+        if 'EncInputFormat' not in parsed:
+            return
+        self.codec = parsed['codec']
+        input_format = parsed['EncInputFormat']
+        self.size = Size(input_format['width'], input_format['height'])
+        self.mime = parsed['EncOutputFormat']['mime']
+        self._rates_from_message.append(1000000. / parsed['min'])
+
+
+class VideoDecoderPerfTestResult(_VideoResultBase):
+    """Holds the result of one VideoDecoderPerfTest performance case."""
+
+    # A passing VideoDecoderPerfTest reports its results in the message of a
+    # ValueArray. A failing one reports them only in the failure log, as raw
+    # data.
+
+    def __init__(self, unused_m):
+        super(VideoDecoderPerfTestResult, self).__init__(is_decoder=True)
+
+    def _parseFailureLine(self, line):
+        """Extracts rates and codec details from one line of the failure log."""
+        self._tryParseMeasuredFrameRate(line)
+        # After a failure the codec/size/mime are only found in the raw data.
+        if not line.startswith('Raw data: '):
+            return
+        for raw in self._parseRawData(line):
+            output_format = raw['DecOutputFormat']
+            self.size = Size(output_format['width'], output_format['height'])
+            self.codec = raw['codec']
+            self.mime = raw['mime']
+
+    def _parseMessage(self, message, array):
+        """Records codec, size, mime and rate from a ValueArray message."""
+        if not message.startswith('codec='):
+            return
+        item_regex = self.MESSAGE_PATTERN + '(?: |$)'
+        parsed = {}
+        for item in re.finditer(item_regex, message):
+            name, value = self._parsePartialResult(item)
+            parsed[name] = value
+        # Only results decoded to a surface are recorded.
+        if parsed.get('decodeto') != 'surface':
+            return
+        self.codec = parsed['codec']
+        output_format = parsed['DecOutputFormat']
+        self.size = Size(output_format['width'], output_format['height'])
+        self.mime = parsed['mime']
+        self._rates_from_message.append(1000000. / parsed['min'])
+
+
+class Results(object):
+    """Container that keeps all test results."""
+
+    def __init__(self):
+        self._results = []  # parsed result objects, in import order
+        self._device = None  # buildName of the first imported file, if any
+
+    # Raw strings keep "\d" a regex escape instead of an invalid string
+    # escape sequence.
+    VIDEO_ENCODER_DECODER_TEST_REGEX = re.compile(
+        r'test(.*)(\d{4})x(\d{4})(Goog|Other)$')
+
+    VIDEO_DECODER_PERF_TEST_REGEX = re.compile(
+        r'test(VP[89]|H26[34]|MPEG4|HEVC)(\d+)x(\d+)(.*)$')
+
+    TestCaseSpec = namedtuple(
+        'TestCaseSpec', 'package path class_ regex result_class')
+
+    # Fixed preamble emitted by dumpXml() before the <MediaCodecs> element.
+    _XML_HEADER = (
+        '<?xml version="1.0" encoding="utf-8" ?>',
+        '<!-- Copyright 2015 The Android Open Source Project',
+        '',
+        ' Licensed under the Apache License, Version 2.0 (the "License");',
+        ' you may not use this file except in compliance with the License.',
+        ' You may obtain a copy of the License at',
+        '',
+        ' http://www.apache.org/licenses/LICENSE-2.0',
+        '',
+        ' Unless required by applicable law or agreed to in writing, software',
+        ' distributed under the License is distributed on an "AS IS" BASIS,',
+        ' WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.',
+        ' See the License for the specific language governing permissions and',
+        ' limitations under the License.',
+        '-->',
+        '',
+    )
+
+    def _getTestCases(self):
+        """Returns the TestCaseSpecs describing which tests are imported."""
+        return [
+            self.TestCaseSpec(
+                package='CtsDeviceVideoPerf',
+                path='TestSuite/TestSuite/TestSuite/TestSuite/TestCase',
+                class_='VideoEncoderDecoderTest',
+                regex=self.VIDEO_ENCODER_DECODER_TEST_REGEX,
+                result_class=VideoEncoderDecoderTestResult),
+            self.TestCaseSpec(
+                package='CtsMediaTestCases',
+                path='TestSuite/TestSuite/TestSuite/TestCase',
+                class_='VideoDecoderPerfTest',
+                regex=self.VIDEO_DECODER_PERF_TEST_REGEX,
+                result_class=VideoDecoderPerfTestResult),
+        ]
+
+    def _verifyDeviceInfo(self, device):
+        """Remembers the device and rejects results from a different one.
+
+        Raises:
+          AssertionError: if a different device was already recorded.
+        """
+        # Raised explicitly (rather than via an assert statement) so the
+        # check still runs when Python is started with -O.
+        if self._device not in (None, device):
+            raise AssertionError("expected %s device" % self._device)
+        self._device = device
+
+    def importXml(self, xml):
+        """Imports all recognized test results from a parsed result file.
+
+        Arguments:
+          xml: an ElementTree of a result XML file.
+
+        Raises:
+          ValueError: if the DeviceInfo/BuildInfo element is missing.
+          AssertionError: if the file is from a different device than the
+            files imported earlier.
+        """
+        build_info = xml.find('DeviceInfo/BuildInfo')
+        if build_info is None:
+            raise ValueError('missing DeviceInfo/BuildInfo element')
+        self._verifyDeviceInfo(build_info.get('buildName'))
+
+        packages = createLookup(self._getTestCases(), lambda spec: spec.package)
+
+        for pkg in xml.iter('TestPackage'):
+            specs_in_package = packages.get(pkg.get('name'))
+            if not specs_in_package:
+                continue
+            paths = createLookup(specs_in_package, lambda spec: spec.path)
+            for path, specs_in_path in paths.items():
+                classes = createLookup(specs_in_path, lambda spec: spec.class_)
+                for test_case in pkg.iterfind(path):
+                    specs_in_class = classes.get(test_case.get('name'))
+                    if not specs_in_class:
+                        continue
+                    for test in test_case.iter('Test'):
+                        self._importTest(test, specs_in_class)
+
+    def _importTest(self, test, specs):
+        """Parses one <Test> element with every spec whose regex matches it."""
+        name = test.get('name')
+        if name is None:
+            return
+        for spec in specs:
+            m = spec.regex.match(name)
+            if m:
+                result = spec.result_class(m)
+                result.parse(test)
+                self._results.append(result)
+
+    def importFile(self, path):
+        """Parses the XML file at path and imports its results.
+
+        Raises:
+          ValueError: if the file is not well-formed XML, or as importXml().
+        """
+        sys.stderr.write('Importing "%s"...\n' % path)
+        try:
+            tree = ET.parse(path)
+        except ET.ParseError:
+            raise ValueError('not a valid XML file')
+        return self.importXml(tree)
+
+    def getData(self):
+        """Yields the data of every imported result, in import order."""
+        for result in self._results:
+            for data in result.getData():
+                yield data
+
+    def dumpXml(self, results):
+        """Generates the lines of a MediaCodecs XML for the given results.
+
+        Arguments:
+          results: an iterable of result objects with is_decoder, codec, mime,
+            size and rates attributes. Every size of a codec needs at least
+            one rate, otherwise min() raises ValueError.
+
+        Yields:
+          The lines of the XML document (strings without line terminators).
+        """
+        for line in self._XML_HEADER:
+            yield line
+        yield '<MediaCodecs>'
+        last_section = None
+        Comp = namedtuple('Comp', 'is_decoder google mime name')
+        by_comp = createLookup(
+            results,
+            lambda e: Comp(is_decoder=e.is_decoder,
+                           google='.google.' in e.codec,
+                           mime=e.mime, name=e.codec))
+        # Sorting the keys lists encoders (is_decoder False) before decoders.
+        for comp in sorted(by_comp):
+            section = 'Decoders' if comp.is_decoder else 'Encoders'
+            if section != last_section:
+                if last_section:
+                    yield ' </%s>' % last_section
+                yield ' <%s>' % section
+                last_section = section
+            yield ' <MediaCodec name="%s" type="%s" update="true">' % (
+                comp.name, comp.mime)
+            by_size = createLookup(by_comp[comp], lambda e: e.size)
+            for size in sorted(by_size):
+                values = list(itertools.chain.from_iterable(
+                    e.rates for e in by_size[size]))
+                min_, max_ = min(values), max(values)
+                # Geometric mean of the extremes, truncated to an int.
+                med_ = int(math.sqrt(min_ * max_))
+                yield ' <Limit name="measured-frame-rate-%s" range="%d-%d" />' % (
+                    size, med_, med_)
+            yield ' </MediaCodec>'
+        if last_section:
+            yield ' </%s>' % last_section
+        yield '</MediaCodecs>'
+
+
+class Main(object):
+    """Executor of this utility."""
+
+    def __init__(self):
+        self._result = Results()
+
+        self._parser = argparse.ArgumentParser('get_achievable_framerates')
+        self._parser.add_argument('result_xml', nargs='+')
+
+    def _parseArgs(self):
+        """Parses the command line into self._args."""
+        self._args = self._parser.parse_args()
+
+    def _importXml(self, xml):
+        """Imports the result XML file at the given path."""
+        self._result.importFile(xml)
+
+    def _report(self):
+        """Writes the generated XML to stdout, one line per generated line."""
+        # Results that evaluate false are left out of the report.
+        usable = (r for r in self._result.getData() if r)
+        for line in self._result.dumpXml(usable):
+            # stream writes work on Python 2 and 3, unlike "print line".
+            sys.stdout.write('%s\n' % (line,))
+
+    def run(self):
+        """Imports every result file named on the command line and reports.
+
+        If any file fails to import, the error and 'Interrupted.' are written
+        to stderr and no report is produced.
+        """
+        self._parseArgs()
+        try:
+            for xml in self._args.result_xml:
+                try:
+                    self._importXml(xml)
+                except (ValueError, IOError, AssertionError) as e:
+                    sys.stderr.write('%s\n' % (e,))
+                    # Abort through the same path as a user interrupt.
+                    raise KeyboardInterrupt
+            self._report()
+        except KeyboardInterrupt:
+            sys.stderr.write('Interrupted.\n')
+
+# Script entry point: run the tool only when this file is executed directly,
+# not when it is imported as a module.
+if __name__ == '__main__':
+ Main().run()
+