| #!/usr/bin/env python |
| # -*- coding: utf-8 -*- |
| # |
| # Copyright 2017 Google Inc. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """ |
| This module tests the Vehicle HAL using adb socket. |
| |
| Protocol Buffer: |
| This module relies on VehicleHalProto_pb2.py being in sync with the protobuf in the VHAL. |
| If the VehicleHalProto.proto file has changed, re-generate the python version using |
| a command of the form: |
| protoc -I=<proto_dir> --python_out=<out_dir> <proto_dir>/VehicleHalProto.proto |
| For example: |
| protoDir=~/android/master/hardware/interfaces/automotive/vehicle/2.0/default/impl/vhal_v2_0/proto |
| outDir=~/android/master/packages/services/Car/tools/emulator |
| protoc -I=$protoDir --python_out=$outDir $protoDir/VehicleHalProto.proto |
| """ |
| |
| from __future__ import print_function |
| |
| # Suppress .pyc files |
| import sys |
| sys.dont_write_bytecode = True |
| |
| import VehicleHalProto_pb2 |
| import vhal_consts_2_0 |
| import vhal_emulator |
| import logging |
| |
class VhalTest:
    """Test driver that exercises a Vehicle HAL through a vhal_emulator.Vhal handle.

    Each test* method sends requests through self._vhal, reads the replies with
    self._vhal.rxMsg() and reports every mismatch through self._log; failures
    are logged rather than raised.
    """

    # Global vars
    _badProps = [0, 0x3FFFFFFF]     # List of bad properties to try for negative tests
    _configs = 0                    # List of configs from DUT
    _log = 0                        # Logger module
    _vhal = 0                       # Handle to VHAL object that communicates over socket to DUT
    # TODO: b/38203109 - Fix OBD2 values, implement handling for complex properties
    _skipProps = [
        vhal_consts_2_0.VEHICLEPROPERTY_OBD2_LIVE_FRAME,
        vhal_consts_2_0.VEHICLEPROPERTY_OBD2_FREEZE_FRAME,
        vhal_consts_2_0.VEHICLEPROPERTY_OBD2_FREEZE_FRAME_INFO,
        vhal_consts_2_0.VEHICLEPROPERTY_OBD2_FREEZE_FRAME_CLEAR,
        vhal_consts_2_0.VEHICLEPROPERTY_VEHICLE_MAP_SERVICE,
        vhal_consts_2_0.VEHICLEPROPERTY_WHEEL_TICK,     # Need to support complex properties
        0x21E00666      # FakeDataControllingProperty - an internal test property
    ]

    def _getMidpoint(self, minVal, maxVal):
        """Return the value halfway between minVal and maxVal.

        Integer limits produce an integer (floor division) on both Python 2 and
        Python 3, so the result stays usable for integer properties; float
        limits use true division.
        """
        span = maxVal - minVal
        if isinstance(span, float):
            return minVal + span / 2
        return minVal + span // 2

    def _midpointOrDefault(self, cfg, idx, minField, maxField, default):
        """Return the midpoint of the named limits of cfg.area_configs[idx].

        Falls back to `default` when the area config or its limit fields cannot
        be read (no entry at idx, missing attribute, non-numeric limit).
        """
        try:
            areaCfg = cfg.area_configs[idx]
            return self._getMidpoint(getattr(areaCfg, minField), getattr(areaCfg, maxField))
        except (IndexError, AttributeError, TypeError):
            # min/max values aren't available. Use the hard-coded value
            return default

    # Generates a test value based on the config
    def _generateTestValue(self, cfg, idx, origValue):
        """Build a value to write to the property described by cfg.

        cfg:        property config; its value_type selects the kind of value
        idx:        index into cfg.area_configs for the area under test
        origValue:  value currently read from the property
        Returns the test value, or None (after logging) for unhandled types.
        """
        valType = cfg.value_type
        if valType in self._types.TYPE_STRING:
            return "test string"
        if valType in self._types.TYPE_BYTES:
            # Generate array of integers counting from 0
            return list(range(len(origValue)))
        if valType == vhal_consts_2_0.VEHICLEPROPERTYTYPE_BOOLEAN:
            # Flip the lowest bit of the current value
            return origValue ^ 1
        if valType in self._types.TYPE_INT32:
            return self._midpointOrDefault(cfg, idx, "min_int32_value", "max_int32_value", 123)
        if valType in self._types.TYPE_INT64:
            # Fallback is a large value that does not fit in 32 bits
            return self._midpointOrDefault(cfg, idx, "min_int64_value", "max_int64_value",
                                           1 << 50)
        if valType in self._types.TYPE_FLOAT:
            testValue = self._midpointOrDefault(cfg, idx, "min_float_value", "max_float_value",
                                                123.456)
            # Round float to 5 decimal places so it compares equal to the value read back
            return float("%.5f" % testValue)
        self._log.error("generateTestValue: valType=0x%X is not handled", valType)
        return None

    # Helper function to extract values array from rxMsg
    def _getValueFromMsg(self, rxMsg):
        """Return the single property value carried by rxMsg.

        Returns None (after logging) when rxMsg does not hold exactly one
        value, when the value type is not handled, or when the expected value
        array is empty.
        """
        # Check to see only one property value is returned
        if len(rxMsg.value) != 1:
            self._log.error("getValueFromMsg: Received invalid value")
            return None

        propValue = rxMsg.value[0]
        valType = propValue.value_type
        try:
            if valType in self._types.TYPE_STRING:
                return propValue.string_value
            if valType in self._types.TYPE_BYTES:
                return propValue.bytes_value
            if (valType == vhal_consts_2_0.VEHICLEPROPERTYTYPE_BOOLEAN or
                    valType in self._types.TYPE_INT32):
                # Booleans are read from the int32 array as well
                return propValue.int32_values[0]
            if valType in self._types.TYPE_INT64:
                return propValue.int64_values[0]
            if valType in self._types.TYPE_FLOAT:
                # Round float to 5 decimal places
                return float("%.5f" % propValue.float_values[0])
        except IndexError:
            self._log.error("getValueFromMsg: Received malformed message: %s", str(rxMsg))
            return None

        self._log.error("getValueFromMsg: valType=0x%X is not handled", valType)
        return None

    def _validateVmsMessage(self, rxMsg):
        """Return True if rxMsg holds one MIXED value whose message type is in the VMS range.

        The message type is checked at the same index that
        _getVmsMessageTypeFromMsg() reads it from.
        """
        if len(rxMsg.value) != 1:
            return False
        propValue = rxMsg.value[0]
        if propValue.value_type not in self._types.TYPE_MIXED:
            return False
        typeIdx = vhal_consts_2_0.VMSBASEMESSAGEINTEGERVALUESINDEX_MESSAGE_TYPE
        if len(propValue.int32_values) <= typeIdx:
            return False
        return (vhal_consts_2_0.VMSMESSAGETYPE_SUBSCRIBE
                <= propValue.int32_values[typeIdx]
                <= vhal_consts_2_0.VMSMESSAGETYPE_LAST_VMS_MESSAGE_TYPE)

    def _getVmsMessageTypeFromMsg(self, rxMsg):
        """Return the VMS message type stored in rxMsg, or None (after logging) if invalid."""
        if not self._validateVmsMessage(rxMsg):
            self._log.error("getVmsMessageTypeFromMsg: Received invalid message")
            return None
        return rxMsg.value[0].int32_values[
            vhal_consts_2_0.VMSBASEMESSAGEINTEGERVALUESINDEX_MESSAGE_TYPE]

    # Helper function to receive a message and validate the type and status
    #   retVal = 1 if no errors
    #   retVal = 0 if errors detected
    def _rxMsgAndValidate(self, expectedType, expectedStatus):
        """Receive one message and compare its msg_type and status to the expected ones.

        Returns (rxMsg, retVal); every mismatch is logged as an error.
        """
        retVal = 1
        rxMsg = self._vhal.rxMsg()
        if rxMsg.msg_type != expectedType:
            self._log.error("rxMsg Type expected: 0x%X, received: 0x%X", expectedType, rxMsg.msg_type)
            retVal = 0
        if rxMsg.status != expectedStatus:
            self._log.error("rxMsg Status expected: 0x%X, received: 0x%X", expectedStatus, rxMsg.status)
            retVal = 0
        return rxMsg, retVal

    # Calls getConfig() on each individual property ID and verifies it matches with the config
    #   received in getConfigAll()
    def testGetConfig(self):
        """Check that getConfig() for every known property matches the getConfigAll() entry."""
        self._log.info("Starting testGetConfig...")
        for cfg in self._configs:
            self._log.debug("  Getting config for propId=0x%X", cfg.prop)
            self._vhal.getConfig(cfg.prop)
            rxMsg, retVal = self._rxMsgAndValidate(VehicleHalProto_pb2.GET_CONFIG_RESP,
                                                   VehicleHalProto_pb2.RESULT_OK)
            if not retVal:
                continue
            # An OK response without any config is reported as a mismatch instead of
            # failing on the [0] lookup.
            if len(rxMsg.config) == 0 or rxMsg.config[0] != cfg:
                self._log.error("testGetConfig failed.  prop=0x%X, expected:\n%s\nreceived:\n%s",
                                cfg.prop, str(cfg), str(rxMsg.config))
        self._log.info("  Finished testGetConfig!")

    # Calls getConfig() on invalid property ID and verifies it generates an error
    def testGetBadConfig(self):
        """Check that getConfig() on each bad property ID is answered with an error and no config."""
        self._log.info("Starting testGetBadConfig...")
        for prop in self._badProps:
            self._log.debug("  Testing bad propId=0x%X", prop)
            self._vhal.getConfig(prop)
            rxMsg, retVal = self._rxMsgAndValidate(VehicleHalProto_pb2.GET_CONFIG_RESP,
                                                   VehicleHalProto_pb2.ERROR_INVALID_PROPERTY)
            if retVal:
                # Any config in the reply is unexpected
                for cfg in rxMsg.config:
                    self._log.error("testGetBadConfig  prop=0x%X, expected:None, received:\n%s",
                                    cfg.prop, str(rxMsg.config))
        self._log.info("  Finished testGetBadConfig!")

    def testGetPropertyAll(self):
        """Request all properties and check the type and status of the reply."""
        self._log.info("Starting testGetPropertyAll...")
        self._vhal.getPropertyAll()
        _, retVal = self._rxMsgAndValidate(VehicleHalProto_pb2.GET_PROPERTY_ALL_RESP,
                                           VehicleHalProto_pb2.RESULT_OK)
        if retVal == 0:
            self._log.error("testGetPropertyAll: Failed to receive proper rxMsg")

        # TODO: Finish writing this test.  What should we be testing, anyway?

        self._log.info("  Finished testGetPropertyAll!")

    def _iterAreas(self, areas):
        """Yield (idx, area) for each set bit of the areas mask, lowest bit first.

        A mask of 0 still yields a single (0, 0) pair, so properties without
        any area bit are tested once with area 0.
        """
        idx = 0
        while True:
            # Isolate the lowest set bit, then remove it from the mask
            area = areas & -areas
            areas ^= area
            yield idx, area
            idx += 1
            if areas == 0:
                return

    def _testGetSetArea(self, cfg, idx, area):
        """Run the get / set / verify / restore cycle for one area of one property.

        Type and status mismatches of the individual replies are logged by
        _rxMsgAndValidate().
        """
        self._log.debug("  Testing propId=0x%X, area=0x%X", cfg.prop, area)

        # Get the current value
        self._vhal.getProperty(cfg.prop, area)
        rxMsg, _ = self._rxMsgAndValidate(VehicleHalProto_pb2.GET_PROPERTY_RESP,
                                          VehicleHalProto_pb2.RESULT_OK)

        # Save the original value
        origValue = self._getValueFromMsg(rxMsg)
        if origValue is None:
            self._log.error("testGetSet:  Could not get value for prop=0x%X, area=0x%X",
                            cfg.prop, area)
            return

        # Generate the test value
        testValue = self._generateTestValue(cfg, idx, origValue)
        if testValue is None:
            self._log.error("testGetSet:  Cannot generate test value for prop=0x%X, area=0x%X",
                            cfg.prop, area)
            return

        # Send the new value
        self._vhal.setProperty(cfg.prop, area, testValue)
        self._rxMsgAndValidate(VehicleHalProto_pb2.SET_PROPERTY_RESP,
                               VehicleHalProto_pb2.RESULT_OK)

        # Get the new value and verify it
        self._vhal.getProperty(cfg.prop, area)
        rxMsg, _ = self._rxMsgAndValidate(VehicleHalProto_pb2.GET_PROPERTY_RESP,
                                          VehicleHalProto_pb2.RESULT_OK)
        newValue = self._getValueFromMsg(rxMsg)
        if newValue != testValue:
            self._log.error("testGetSet: set failed for propId=0x%X, area=0x%X", cfg.prop, area)
            print("testValue= ", testValue, "newValue= ", newValue)

        # Reset the value to what it was before.  This is done even when the
        # verification failed, so a bad write does not leak into later tests.
        self._vhal.setProperty(cfg.prop, area, origValue)
        self._rxMsgAndValidate(VehicleHalProto_pb2.SET_PROPERTY_RESP,
                               VehicleHalProto_pb2.RESULT_OK)

    def testGetSet(self):
        """Write a test value to every area of every non-skipped property and read it back."""
        self._log.info("Starting testGetSet()...")
        for cfg in self._configs:
            if cfg.prop in self._skipProps:
                # Skip properties that cannot be handled properly by this test.
                self._log.warning("  Skipping propId=0x%X", cfg.prop)
                continue

            for idx, area in self._iterAreas(cfg.supported_areas):
                self._testGetSetArea(cfg, idx, area)
        self._log.info("  Finished testGetSet()!")

    def testGetBadProperty(self):
        """Check that getProperty() on each bad property ID is answered with an error and no value."""
        self._log.info("Starting testGetBadProperty()...")
        for prop in self._badProps:
            self._log.debug("  Testing bad propId=0x%X", prop)
            self._vhal.getProperty(prop, 0)
            rxMsg, retVal = self._rxMsgAndValidate(VehicleHalProto_pb2.GET_PROPERTY_RESP,
                                                   VehicleHalProto_pb2.ERROR_INVALID_PROPERTY)
            if retVal:
                # Any value in the reply is unexpected; report once per value
                for _ in rxMsg.value:
                    self._log.error("testGetBadProperty  prop=0x%X, expected:None, received:\n%s",
                                    prop, str(rxMsg))
        self._log.info("  Finished testGetBadProperty()!")

    def testSetBadProperty(self):
        """Check that setProperty() on each bad property ID raises ValueError."""
        self._log.info("Starting testSetBadProperty()...")
        area = 1
        value = 100
        for prop in self._badProps:
            self._log.debug("  Testing bad propId=0x%X", prop)
            # Use a different area and value for every bad property
            area = area + 1
            value = value + 1
            try:
                self._vhal.setProperty(prop, area, value)
            except ValueError:
                # Received expected error
                continue
            self._log.error("testSetBadProperty failed.  prop=0x%X, area=0x%X, value=%d",
                            prop, area, value)
        self._log.info("  Finished testSetBadProperty()!")

    def _logVmsAvailableLayers(self, rxMsg):
        """Walk the layer entries of an availability response and log each one.

        Logs an error if the int32 array is shorter or longer than its own
        layer count and per-layer counts imply.
        """
        values = rxMsg.value[0].int32_values
        index = vhal_consts_2_0.VMSAVAILABILITYSTATEINTEGERVALUESINDEX_LAYERS_START
        # NOTE(review): this constant is used as the number of ints logged per layer and as
        # the offset of the per-layer count that follows them - confirm against the VMS spec.
        numPublishersIndex = vhal_consts_2_0.VMSMESSAGEWITHLAYERINTEGERVALUESINDEX_LAYER_VERSION
        try:
            layers = values[
                vhal_consts_2_0.VMSAVAILABILITYSTATEINTEGERVALUESINDEX_NUMBER_OF_ASSOCIATED_LAYERS]
            self._log.info("testVms: %d available layers", layers)
            for _ in range(layers):
                countPos = index + numPublishersIndex
                # Raises IndexError when the array ends before this entry is complete
                trailingCount = values[countPos]
                self._log.info("testVms: Available layer: %s", values[index:countPos])
                index = countPos + 1 + trailingCount
        except IndexError:
            self._log.error("testVms: Malformed AvailabilityResponse: index: %d %s", index, str(rxMsg))
            return

        if len(values) != index:
            self._log.error("testVms: Malformed AvailabilityResponse: index: %d %s", index, str(rxMsg))

    def testGetVmsAvailability(self):
        """Send a VMS availability request and check the async and on-demand responses."""
        self._log.info("Starting testVms()...")

        # Request the availability from the VmsCore.
        value = {'int32_values' : [vhal_consts_2_0.VMSMESSAGETYPE_AVAILABILITY_REQUEST] }
        self._vhal.setProperty(
            vhal_consts_2_0.VEHICLEPROPERTY_VEHICLE_MAP_SERVICE, 0, value)

        self._rxMsgAndValidate(VehicleHalProto_pb2.SET_PROPERTY_RESP,
                               VehicleHalProto_pb2.RESULT_OK)

        # The Vms Core should immediately respond
        rxMsg, _ = self._rxMsgAndValidate(VehicleHalProto_pb2.SET_PROPERTY_ASYNC,
                                          VehicleHalProto_pb2.RESULT_OK)

        if self._getVmsMessageTypeFromMsg(rxMsg) != vhal_consts_2_0.VMSMESSAGETYPE_AVAILABILITY_RESPONSE:
            self._log.error("testVms: VmsCore did not respond with AvailabilityResponse: %s", str(rxMsg))

        # Test that we can get the property on command
        self._vhal.getProperty(
            vhal_consts_2_0.VEHICLEPROPERTY_VEHICLE_MAP_SERVICE, 0)
        rxMsg, _ = self._rxMsgAndValidate(VehicleHalProto_pb2.GET_PROPERTY_RESP,
                                          VehicleHalProto_pb2.RESULT_OK)

        if self._getVmsMessageTypeFromMsg(rxMsg) != vhal_consts_2_0.VMSMESSAGETYPE_AVAILABILITY_RESPONSE:
            self._log.error("testVms: VmsCore did not respond with AvailabilityResponse: %s", str(rxMsg))
        else:
            # Parse Availability Response
            self._logVmsAvailableLayers(rxMsg)

    def runTests(self):
        """Run every test in sequence."""
        self.testGetConfig()
        self.testGetBadConfig()
        self.testGetPropertyAll()
        self.testGetSet()
        self.testGetBadProperty()
        self.testSetBadProperty()
        self.testGetVmsAvailability()
        # Add new tests here to be run

    # Valid logLevels:
    #   CRITICAL    50
    #   ERROR       40
    #   WARNING     30
    #   INFO        20
    #   DEBUG       10
    #   NOTSET      0
    def __init__(self, types, logLevel=20):
        """Set up logging, create the Vhal handle and fetch the list of configs.

        types:      type tables handed to vhal_emulator.Vhal and used to classify value types
        logLevel:   level for the 'vhal_emulator_test' logger (see the table above)
        """
        self._types = types
        # Configure the logger
        logging.basicConfig()
        self._log = logging.getLogger('vhal_emulator_test')
        self._log.setLevel(logLevel)
        # Start the VHAL Emulator
        self._vhal = vhal_emulator.Vhal(types)
        # Get the list of configs
        self._vhal.getConfigAll()
        self._configs = self._vhal.rxMsg().config
| |
if __name__ == '__main__':
    # Script entry point: run the whole suite using the 2.0 VHAL type tables.
    tester = VhalTest(vhal_consts_2_0.vhal_types_2_0)
    tester.runTests()