| #!/usr/bin/python |
| |
| # Copyright (C) 2014 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import logging |
| import os.path |
| import select |
| import sys |
| import time |
| import collections |
| import socket |
| import gflags as flags # http://code.google.com/p/python-gflags/ |
| import pkgutil |
| import threading |
| import Queue |
| import traceback |
| import math |
| import bisect |
| from bisect import bisect_left |
| |
| """ |
| scipy, numpy and matplotlib are python packages that can be installed |
| from: http://www.scipy.org/ |
| |
| """ |
| import scipy |
| import matplotlib.pyplot as plt |
| |
| # let this script know about the power monitor implementations |
| sys.path = [os.path.basename(__file__)] + sys.path |
| available_monitors = [ |
| name |
| for _, name, _ in pkgutil.iter_modules( |
| [os.path.join(os.path.dirname(__file__), "power_monitors")]) |
| if not name.startswith("_")] |
| |
| APK = os.path.join(os.path.dirname(__file__), "..", "CtsVerifier.apk") |
| |
| FLAGS = flags.FLAGS |
| |
| # DELAY_SCREEN_OFF is the number of seconds to wait for baseline state |
| DELAY_SCREEN_OFF = 20.0 |
| |
| # whether to log data collected to a file for each sensor run: |
| LOG_DATA_TO_FILE = True |
| |
| logging.getLogger().setLevel(logging.ERROR) |
| |
| |
| def do_import(name): |
| """import a module by name dynamically""" |
| mod = __import__(name) |
| components = name.split(".") |
| for comp in components[1:]: |
| mod = getattr(mod, comp) |
| return mod |
| |
| class PowerTestException(Exception): |
| """ |
| Definition of specialized Exception class for CTS power tests |
| """ |
| def __init__(self, message): |
| self._error_message = message |
| def __str__(self): |
| return self._error_message |
| |
| class PowerTest: |
| """Class to run a suite of power tests. This has methods for obtaining |
| measurements from the power monitor (through the driver) and then |
| processing it to determine baseline and AP suspend state and |
| measure ampere draw of various sensors. |
| Ctrl+C causes a keyboard interrupt exception which terminates the test.""" |
| |
| # Thresholds for max allowed power usage per sensor tested |
| # TODO: Accel, Mag and Gyro have no maximum power specified in the CDD; |
| # the following numbers are bogus and will be replaced soon by what |
| # the device reports (from Sensor.getPower()) |
| MAX_ACCEL_AMPS = 0.08 # Amps |
| MAX_MAG_AMPS = 0.08 # Amps |
| MAX_GYRO_AMPS = 0.08 # Amps |
| MAX_SIGMO_AMPS = 0.08 # Amps |
| |
| # TODO: The following numbers for step counter, etc must be replaced by |
| # the numbers specified in CDD for low-power sensors. The expected current |
| # draw must be computed from the specified power and the voltage used to |
| # power the device (specified from a config file). |
| MAX_STEP_COUNTER_AMPS = 0.08 # Amps |
| MAX_STEP_DETECTOR_AMPS = 0.08 # Amps |
| # The variable EXPECTED_AMPS_VARIATION_HALF_RANGE denotes the expected |
| # variation of the ampere measurements |
| # around the mean value at baseline state. i.e. we expect most of the |
| # ampere measurements at baseline state to vary around the mean by |
| # between +/- of the number below |
| EXPECTED_AMPS_VARIATION_HALF_RANGE = 0.0005 |
| # The variable THRESHOLD_BASELINE_SAMPLES_FRACTION denotes the minimum fraction of samples that must |
| # be in the range of variation defined by EXPECTED_AMPS_VARIATION_HALF_RANGE |
| # around the mean baseline for us to decide that the phone has settled into |
| # its baseline state |
| THRESHOLD_BASELINE_SAMPLES_FRACTION = 0.86 |
| # The variable MAX_PERCENTILE_AP_SCREEN_OFF_AMPS denotes the maximum ampere |
| # draw that the device can consume when it has gone to suspend state with |
| # one or more sensors registered and batching samples (screen and AP are |
| # off in this case) |
| MAX_PERCENTILE_AP_SCREEN_OFF_AMPS = 0.030 # Amps |
| # The variable PERCENTILE_MAX_AP_SCREEN_OFF denotes the fraction of ampere |
| # measurements that must be below the specified maximum amperes |
| # MAX_PERCENTILE_AP_SCREEN_OFF_AMPS for us to decide that the phone has |
| # reached suspend state. |
| PERCENTILE_MAX_AP_SCREEN_OFF = 0.95 |
| DOMAIN_NAME = "/android/cts/powertest" |
| # SAMPLE_COUNT_NOMINAL denotes the typical number of measurements of amperes |
| # to collect from the power monitor |
| SAMPLE_COUNT_NOMINAL = 1000 |
| # RATE_NOMINAL denotes the nominal frequency at which ampere measurements |
| # are taken from the monsoon power monitor |
| RATE_NOMINAL = 100 |
| ENABLE_PLOTTING = False |
| |
| REQUEST_EXTERNAL_STORAGE = "EXTERNAL STORAGE?" |
| REQUEST_EXIT = "EXIT" |
| REQUEST_RAISE = "RAISE %s %s" |
| REQUEST_USER_RESPONSE = "USER RESPONSE %s" |
| REQUEST_SET_TEST_RESULT = "SET TEST RESULT %s %s %s" |
| REQUEST_SENSOR_SWITCH = "SENSOR %s %s" |
| REQUEST_SENSOR_AVAILABILITY = "SENSOR? %s" |
| REQUEST_SCREEN_OFF = "SCREEN OFF" |
| REQUEST_SHOW_MESSAGE = "MESSAGE %s" |
| |
| NEGATIVE_AMPERE_ERROR_MESSAGE = ( |
| "Negative ampere draw measured, possibly due to power " |
| "supply from USB cable. Check the setup of device and power " |
| "monitor to make sure that the device is not connected " |
| "to machine via USB directly. The device should be " |
| "connected to the USB slot in the power monitor. It is okay " |
| "to change the wiring when the test is in progress.") |
| |
| |
| def __init__(self, max_baseline_amps): |
| """ |
| Args: |
| max_baseline_amps: The maximum value of baseline amperes |
| that we expect the device to consume at baseline state. |
| This can be different between models of phones. |
| """ |
| power_monitors = do_import("power_monitors.%s" % FLAGS.power_monitor) |
| testid = time.strftime("%d_%m_%Y__%H__%M_%S") |
| self._power_monitor = power_monitors.Power_Monitor(log_file_id = testid) |
| self._tcp_connect_port = 0 # any available port |
| print ("Establishing connection to device...") |
| self.setUsbEnabled(True) |
| status = self._power_monitor.GetStatus() |
| self._native_hz = status["sampleRate"] * 1000 |
| # the following describes power test being run (i.e on what sensor |
| # and what type of test. This is used for logging. |
| self._current_test = "None" |
| self._external_storage = self.executeOnDevice(PowerTest.REQUEST_EXTERNAL_STORAGE) |
| self._max_baseline_amps = max_baseline_amps |
| |
| def __del__(self): |
| self.finalize() |
| |
| def finalize(self): |
| """To be called upon termination of host connection to device""" |
| if self._tcp_connect_port > 0: |
| # tell device side to exit connection loop, and remove the forwarding |
| # connection |
| self.executeOnDevice(PowerTest.REQUEST_EXIT, reportErrors = False) |
| self.executeLocal("adb forward --remove tcp:%d" % self._tcp_connect_port) |
| self._tcp_connect_port = 0 |
| if self._power_monitor: |
| self._power_monitor.Close() |
| self._power_monitor = None |
| |
| def _send(self, msg, report_errors = True): |
| """Connect to the device, send the given command, and then disconnect""" |
| if self._tcp_connect_port == 0: |
| # on first attempt to send a command, connect to device via any open port number, |
| # forwarding that port to a local socket on the device via adb |
| logging.debug("Seeking port for communication...") |
| # discover an open port |
| dummysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| dummysocket.bind(("localhost", 0)) |
| (_, self._tcp_connect_port) = dummysocket.getsockname() |
| dummysocket.close() |
| assert(self._tcp_connect_port > 0) |
| |
| status = self.executeLocal("adb forward tcp:%d localabstract:%s" % |
| (self._tcp_connect_port, PowerTest.DOMAIN_NAME)) |
| # If the status !=0, then the host machine is unable to |
| # forward requests to client over adb. Ending the test and logging error message |
| # to the console on the host. |
| self.endTestIfLostConnection( |
| status != 0, |
| "Unable to forward requests to client over adb") |
| logging.info("Forwarding requests over local port %d", |
| self._tcp_connect_port) |
| |
| link = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| |
| try: |
| logging.debug("Connecting to device...") |
| link.connect(("localhost", self._tcp_connect_port)) |
| logging.debug("Connected.") |
| except socket.error as serr: |
| print "Socket connection error: ", serr |
| print "Finalizing and exiting the test" |
| self.endTestIfLostConnection( |
| report_errors, |
| "Unable to communicate with device: connection refused") |
| except: |
| print "Non socket-related exception at this block in _send(); re-raising now." |
| raise |
| logging.debug("Sending '%s'", msg) |
| link.sendall(msg) |
| logging.debug("Getting response...") |
| response = link.recv(4096) |
| logging.debug("Got response '%s'", response) |
| link.close() |
| return response |
| |
| def queryDevice(self, query): |
| """Post a yes/no query to the device, return True upon successful query, False otherwise""" |
| logging.info("Querying device with '%s'", query) |
| return self._send(query) == "OK" |
| |
| # TODO: abstract device communication (and string commands) into its own class |
| def executeOnDevice(self, cmd, reportErrors = True): |
| """Execute a (string) command on the remote device""" |
| return self._send(cmd, reportErrors) |
| |
| def executeLocal(self, cmd, check_status = True): |
| """execute a shell command locally (on the host)""" |
| from subprocess import call |
| status = call(cmd.split(" ")) |
| if status != 0 and check_status: |
| logging.error("Failed to execute \"%s\"", cmd) |
| else: |
| logging.debug("Executed \"%s\"", cmd) |
| return status |
| |
| def reportErrorRaiseExceptionIf(self, condition, msg): |
| """Report an error condition to the device if condition is True. |
| Will raise an exception on the device if condition is True. |
| Args: |
| condition: If true, this reports error |
| msg: Message related to exception |
| Raises: |
| A PowerTestException encapsulating the message provided in msg |
| """ |
| if condition: |
| try: |
| logging.error("Exiting on error: %s" % msg) |
| self.executeOnDevice(PowerTest.REQUEST_RAISE % (self._current_test, msg), |
| reportErrors = True) |
| except: |
| logging.error("Unable to communicate with device to report " |
| "error: %s" % msg) |
| self.finalize() |
| sys.exit(msg) |
| raise PowerTestException(msg) |
| |
| def endTestIfLostConnection(self, lost_connection, error_message): |
| """ |
| This function ends the test if lost_connection was true, |
| which indicates that the connection to the device was lost. |
| Args: |
| lost_connection: boolean variable, if True it indicates that |
| connection to device was lost and the test must be terminated. |
| error_message: String to print to the host console before exiting the test |
| (if lost_connection is True) |
| Returns: |
| None. |
| """ |
| if lost_connection: |
| logging.error(error_message) |
| self.finalize() |
| sys.exit(error_message) |
| |
| def setUsbEnabled(self, enabled, verbose = True): |
| if enabled: |
| val = 1 |
| else: |
| val = 0 |
| self._power_monitor.SetUsbPassthrough(val) |
| tries = 0 |
| |
| # Sometimes command won't go through first time, particularly if immediately after a data |
| # collection, so allow for retries |
| # TODO: Move this retry mechanism to the power monitor driver. |
| status = self._power_monitor.GetStatus() |
| while status is None and tries < 5: |
| tries += 1 |
| time.sleep(2.0) |
| logging.error("Retrying get status call...") |
| self._power_monitor.StopDataCollection() |
| self._power_monitor.SetUsbPassthrough(val) |
| status = self._power_monitor.GetStatus() |
| |
| if enabled: |
| if verbose: |
| print("...USB enabled, waiting for device") |
| self.executeLocal("adb wait-for-device") |
| if verbose: |
| print("...device online") |
| else: |
| if verbose: |
| logging.info("...USB disabled") |
| # re-establish port forwarding |
| if enabled and self._tcp_connect_port > 0: |
| status = self.executeLocal("adb forward tcp:%d localabstract:%s" % |
| (self._tcp_connect_port, PowerTest.DOMAIN_NAME)) |
| self.reportErrorRaiseExceptionIf(status != 0, msg = "Unable to forward requests to client over adb") |
| |
| def computeBaselineState(self, measurements): |
| """ |
| Args: |
| measurements: List of floats containing ampere draw measurements |
| taken from the monsoon power monitor. |
| Must be atleast 100 measurements long |
| Returns: |
| A tuple (isBaseline, mean_current) where isBaseline is a |
| boolean that is True only if the baseline state for the phone is |
| detected. mean_current is an estimate of the average baseline |
| current for the device, which is valid only if baseline state is |
| detected (if not, it is set to -1). |
| """ |
| |
| # Looks at the measurements to see if it is in baseline state |
| if len(measurements) < 100: |
| print( |
| "Need at least 100 measurements to determine if baseline state has" |
| " been reached") |
| return (False, -1) |
| |
| # Assumption: At baseline state, the power profile is Gaussian distributed |
| # with low-variance around the mean current draw. |
| # Ideally we should find the mode from a histogram bin to find an estimated mean. |
| # Assuming here that the median is very close to this value; later we check that the |
| # variance of the samples is low enough to validate baseline. |
| sorted_measurements = sorted(measurements) |
| number_measurements = len(measurements) |
| if not number_measurements % 2: |
| median_measurement = (sorted_measurements[(number_measurements - 1) / 2] + |
| sorted_measurements[(number_measurements + 1) / 2]) / 2 |
| else: |
| median_measurement = sorted_measurements[number_measurements / 2] |
| |
| # Assume that at baseline state, a large fraction of power measurements |
| # are within +/- EXPECTED_AMPS_VARIATION_HALF_RANGE milliAmperes of |
| # the average baseline current. Find all such measurements in the |
| # sorted measurement vector. |
| left_index = ( |
| bisect_left( |
| sorted_measurements, |
| median_measurement - |
| PowerTest.EXPECTED_AMPS_VARIATION_HALF_RANGE)) |
| right_index = ( |
| bisect_left( |
| sorted_measurements, |
| median_measurement + |
| PowerTest.EXPECTED_AMPS_VARIATION_HALF_RANGE)) |
| |
| average_baseline_amps = scipy.mean( |
| sorted_measurements[left_index: (right_index - 1)]) |
| |
| detected_baseline = True |
| # We enforce that a fraction of more than 'THRESHOLD_BASELINE_SAMPLES_FRACTION' |
| # of samples must be within +/- EXPECTED_AMPS_VARIATION_HALF_RANGE |
| # milliAmperes of the mean baseline current, which we have estimated as |
| # the median. |
| if ((right_index - left_index) < PowerTest.THRESHOLD_BASELINE_SAMPLES_FRACTION * len( |
| measurements)): |
| detected_baseline = False |
| |
| # We check for the maximum limit of the expected baseline |
| if median_measurement > self._max_baseline_amps: |
| detected_baseline = False |
| if average_baseline_amps < 0: |
| print PowerTest.NEGATIVE_AMPERE_ERROR_MESSAGE |
| detected_baseline = False |
| |
| print("%s baseline state" % ("Could detect" if detected_baseline else "Could NOT detect")) |
| print( |
| "median amps = %f, avg amps = %f, fraction of good samples = %f" % |
| (median_measurement, average_baseline_amps, |
| float(right_index - left_index) / len(measurements))) |
| if PowerTest.ENABLE_PLOTTING: |
| plt.plot(measurements) |
| plt.show() |
| print("To continue test, please close the plot window manually.") |
| return (detected_baseline, average_baseline_amps) |
| |
| def isApInSuspendState(self, measurements_amps, nominal_max_amps, test_percentile): |
| """ |
| This function detects AP suspend and display off state of phone |
| after a sensor has been registered. |
| |
| Because the power profile can be very different between sensors and |
| even across builds, it is difficult to specify a tight threshold for |
| mean current draw or mandate that the power measurements must have low |
| variance. We use a criteria that allows for a certain fraction of |
| peaks in power spectrum and checks that test_percentile fraction of |
| measurements must be below the specified value nominal_max_amps |
| Args: |
| measurements_amps: amperes draw measurements from power monitor |
| test_percentile: the fraction of measurements we require to be below |
| a specified amps value |
| nominal_max_amps: the specified value of the max current draw |
| Returns: |
| returns a boolean which is True if and only if the AP suspend and |
| display off state is detected |
| """ |
| count_good = len([m for m in measurements_amps if m < nominal_max_amps]) |
| count_negative = len([m for m in measurements_amps if m < 0]) |
| if count_negative > 0: |
| print PowerTest.NEGATIVE_AMPERE_ERROR_MESSAGE |
| return False; |
| return count_good > test_percentile * len(measurements_amps) |
| |
| def getBaselineState(self): |
| """This function first disables all sensors, then collects measurements |
| through the power monitor and continuously evaluates if baseline state |
| is reached. Once baseline state is detected, it returns a tuple with |
| status information. If baseline is not detected in a preset maximum |
| number of trials, it returns as well. |
| |
| Returns: |
| Returns a tuple (isBaseline, mean_current) where isBaseline is a |
| boolean that is True only if the baseline state for the phone is |
| detected. mean_current is an estimate of the average baseline current |
| for the device, which is valid only if baseline state is detected |
| (if not, it is set to -1) |
| """ |
| self.setPowerOn("ALL", False) |
| self.setUsbEnabled(False) |
| print("Waiting %d seconds for baseline state" % DELAY_SCREEN_OFF) |
| time.sleep(DELAY_SCREEN_OFF) |
| |
| MEASUREMENT_DURATION_SECONDS_BASELINE_DETECTION = 5 # seconds |
| NUMBER_MEASUREMENTS_BASELINE_DETECTION = ( |
| PowerTest.RATE_NOMINAL * |
| MEASUREMENT_DURATION_SECONDS_BASELINE_DETECTION) |
| NUMBER_MEASUREMENTS_BASELINE_VERIFICATION = ( |
| NUMBER_MEASUREMENTS_BASELINE_DETECTION * 5) |
| MAX_TRIALS = 50 |
| |
| collected_baseline_measurements = False |
| |
| for tries in xrange(MAX_TRIALS): |
| print("Trial number %d of %d..." % (tries, MAX_TRIALS)) |
| measurements = self.collectMeasurements( |
| NUMBER_MEASUREMENTS_BASELINE_DETECTION, PowerTest.RATE_NOMINAL, |
| verbose = False) |
| if self.computeBaselineState(measurements)[0] is True: |
| collected_baseline_measurements = True |
| break |
| |
| if collected_baseline_measurements: |
| print("Verifying baseline state over a longer interval " |
| "in order to double check baseline state") |
| measurements = self.collectMeasurements( |
| NUMBER_MEASUREMENTS_BASELINE_VERIFICATION, PowerTest.RATE_NOMINAL, |
| verbose = False) |
| self.reportErrorRaiseExceptionIf( |
| not measurements, "No background measurements could be taken") |
| retval = self.computeBaselineState(measurements) |
| if retval[0]: |
| print("Verified baseline.") |
| if measurements and LOG_DATA_TO_FILE: |
| with open("/tmp/cts-power-tests-background-data.log", "w") as f: |
| for m in measurements: |
| f.write("%.4f\n" % m) |
| return retval |
| else: |
| return (False, -1) |
| |
| def waitForApSuspendMode(self): |
| """This function repeatedly collects measurements until AP suspend and display off |
| mode is detected. After a maximum number of trials, if this state is not reached, it |
| raises an error. |
| Returns: |
| boolean which is True if device was detected to be in suspend state |
| Raises: |
| Power monitor-related exception |
| """ |
| print("waitForApSuspendMode(): Sleeping for %d seconds" % DELAY_SCREEN_OFF) |
| time.sleep(DELAY_SCREEN_OFF) |
| |
| NUMBER_MEASUREMENTS = 200 |
| # Maximum trials for which to collect measurements to get to Ap suspend |
| # state |
| MAX_TRIALS = 50 |
| |
| got_to_suspend_state = False |
| for count in xrange(MAX_TRIALS): |
| print ("waitForApSuspendMode(): Trial %d of %d" % (count, MAX_TRIALS)) |
| measurements = self.collectMeasurements(NUMBER_MEASUREMENTS, |
| PowerTest.RATE_NOMINAL, |
| verbose = False) |
| if self.isApInSuspendState( |
| measurements, PowerTest.MAX_PERCENTILE_AP_SCREEN_OFF_AMPS, |
| PowerTest.PERCENTILE_MAX_AP_SCREEN_OFF): |
| got_to_suspend_state = True |
| break |
| self.reportErrorRaiseExceptionIf( |
| got_to_suspend_state is False, |
| msg = "Unable to determine application processor suspend mode status.") |
| print("Got to AP suspend state") |
| return got_to_suspend_state |
| |
| def collectMeasurements(self, measurementCount, rate, verbose = True): |
| """Args: |
| measurementCount: Number of measurements to collect from the power |
| monitor |
| rate: The integer frequency in Hertz at which to collect measurements from |
| the power monitor |
| Returns: |
| A list containing measurements from the power monitor; that has the |
| requested count of the number of measurements at the specified rate |
| """ |
| assert (measurementCount > 0) |
| decimate_by = self._native_hz / rate or 1 |
| |
| self._power_monitor.StartDataCollection() |
| sub_measurements = [] |
| measurements = [] |
| tries = 0 |
| if verbose: print("") |
| try: |
| while len(measurements) < measurementCount and tries < 5: |
| if tries: |
| self._power_monitor.StopDataCollection() |
| self._power_monitor.StartDataCollection() |
| time.sleep(1.0) |
| tries += 1 |
| additional = self._power_monitor.CollectData() |
| if additional is not None: |
| tries = 0 |
| sub_measurements.extend(additional) |
| while len(sub_measurements) >= decimate_by: |
| sub_avg = sum(sub_measurements[0:decimate_by]) / decimate_by |
| measurements.append(sub_avg) |
| sub_measurements = sub_measurements[decimate_by:] |
| if verbose: |
| # "\33[1A\33[2K" is a special Linux console control |
| # sequence for moving to the previous line, and |
| # erasing it; and reprinting new text on that |
| # erased line. |
| sys.stdout.write("\33[1A\33[2K") |
| print ("MEASURED[%d]: %f" % (len(measurements), measurements[-1])) |
| finally: |
| self._power_monitor.StopDataCollection() |
| |
| self.reportErrorRaiseExceptionIf(measurementCount > len(measurements), |
| "Unable to collect all requested measurements") |
| return measurements |
| |
| def requestUserAcknowledgment(self, msg): |
| """Post message to user on screen and wait for acknowledgment""" |
| response = self.executeOnDevice(PowerTest.REQUEST_USER_RESPONSE % msg) |
| self.reportErrorRaiseExceptionIf( |
| response != "OK", "Unable to request user acknowledgment") |
| |
| def setTestResult(self, test_name, test_result, test_message): |
| """ |
| Reports the result of a test to the device |
| Args: |
| test_name: name of the test |
| test_result: Boolean result of the test (True means Pass) |
| test_message: Relevant message |
| """ |
| print ("Test %s : %s" % (test_name, test_result)) |
| |
| response = ( |
| self.executeOnDevice( |
| PowerTest.REQUEST_SET_TEST_RESULT % |
| (test_name, test_result, test_message))) |
| self.reportErrorRaiseExceptionIf( |
| response != "OK", "Unable to send test status to Verifier") |
| |
| def setPowerOn(self, sensor, powered_on): |
| response = self.executeOnDevice(PowerTest.REQUEST_SENSOR_SWITCH % |
| (("ON" if powered_on else "OFF"), sensor)) |
| self.reportErrorRaiseExceptionIf( |
| response == "ERR", "Unable to set sensor %s state" % sensor) |
| logging.info("Set %s %s", sensor, ("ON" if powered_on else "OFF")) |
| return response |
| |
| def runSensorPowerTest( |
| self, sensor, max_amperes_allowed, baseline_amps, user_request = None): |
| """ |
| Runs power test for a specific sensor; i.e. measures the amperes draw |
| of the phone using monsoon, with the specified sensor mregistered |
| and the phone in suspend state; and verifies that the incremental |
| consumed amperes is within expected bounds. |
| Args: |
| sensor: The specified sensor for which to run the power test |
| max_amperes_allowed: Maximum ampere draw of the device with the |
| sensor registered and device in suspend state |
| baseline_amps: The power draw of the device when it is in baseline |
| state (no sensors registered, display off, AP asleep) |
| """ |
| self._current_test = ("%s_Power_Test_While_%s" % ( |
| sensor, ("Under_Motion" if user_request is not None else "Still"))) |
| try: |
| print ("\n\n---------------------------------") |
| if user_request is not None: |
| print ("Running power test on %s under motion." % sensor) |
| else: |
| print ("Running power test on %s while device is still." % sensor) |
| print ("---------------------------------") |
| response = self.executeOnDevice( |
| PowerTest.REQUEST_SENSOR_AVAILABILITY % sensor) |
| if response == "UNAVAILABLE": |
| self.setTestResult( |
| self._current_test, test_result = "SKIPPED", |
| test_message = "Sensor %s not available on this platform" % sensor) |
| self.setPowerOn("ALL", False) |
| if response == "UNAVAILABLE": |
| self.setTestResult( |
| self._current_test, test_result = "SKIPPED", |
| test_message = "Sensor %s not available on this device" % sensor) |
| return |
| self.reportErrorRaiseExceptionIf(response != "OK", "Unable to set all sensor off") |
| self.executeOnDevice(PowerTest.REQUEST_SCREEN_OFF) |
| self.setUsbEnabled(False) |
| self.setUsbEnabled(True) |
| self.setPowerOn(sensor, True) |
| if user_request is not None: |
| print("===========================================\n" + |
| "==> Please follow the instructions presented on the device\n" + |
| "===========================================") |
| self.requestUserAcknowledgment(user_request) |
| self.executeOnDevice(PowerTest.REQUEST_SCREEN_OFF) |
| self.setUsbEnabled(False) |
| self.reportErrorRaiseExceptionIf( |
| response != "OK", "Unable to set sensor %s ON" % sensor) |
| |
| self.waitForApSuspendMode() |
| print ("Collecting sensor %s measurements" % sensor) |
| measurements = self.collectMeasurements(PowerTest.SAMPLE_COUNT_NOMINAL, |
| PowerTest.RATE_NOMINAL) |
| |
| if measurements and LOG_DATA_TO_FILE: |
| with open("/tmp/cts-power-tests-%s-%s-sensor-data.log" % (sensor, |
| ("Under_Motion" if user_request is not None else "Still")), "w") as f: |
| for m in measurements: |
| f.write("%.4f\n" % m) |
| self.setUsbEnabled(True, verbose = False) |
| print("Saving raw data files to device...") |
| self.executeLocal("adb shell mkdir -p %s" % self._external_storage, False) |
| self.executeLocal("adb push %s %s/." % (f.name, self._external_storage)) |
| self.setUsbEnabled(False, verbose = False) |
| self.reportErrorRaiseExceptionIf( |
| not measurements, "No measurements could be taken for %s" % sensor) |
| avg = sum(measurements) / len(measurements) |
| squared = [(m - avg) * (m - avg) for m in measurements] |
| |
| stddev = math.sqrt(sum(squared) / len(squared)) |
| current_diff = avg - baseline_amps |
| self.setUsbEnabled(True) |
| max_power = max(measurements) - avg |
| if current_diff <= max_amperes_allowed: |
| # TODO: fail the test of background > current |
| message = ( |
| "Draw is within limits. Sensor delta:%f mAmp Baseline:%f " |
| "mAmp Sensor: %f mAmp Stddev : %f mAmp Peak: %f mAmp") % ( |
| current_diff * 1000.0, baseline_amps * 1000.0, avg * 1000.0, |
| stddev * 1000.0, max_power * 1000.0) |
| else: |
| message = ( |
| "Draw is too high. Current:%f Background:%f Measured: %f " |
| "Stddev: %f Peak: %f") % ( |
| current_diff * 1000.0, baseline_amps * 1000.0, avg * 1000.0, |
| stddev * 1000.0, max_power * 1000.0) |
| self.setTestResult( |
| self._current_test, |
| ("PASS" if (current_diff <= max_amperes_allowed) else "FAIL"), |
| message) |
| print("Result: " + message) |
| except: |
| traceback.print_exc() |
| self.setTestResult(self._current_test, test_result = "FAIL", |
| test_message = "Exception occurred during run of test.") |
| raise |
| |
| @staticmethod |
| def runTests(max_baseline_amps): |
| testrunner = None |
| try: |
| GENERIC_MOTION_REQUEST = ("\n===> Please press Next and when the " |
| "screen is off, keep the device under motion with only tiny, " |
| "slow movements until the screen turns on again.\nPlease " |
| "refrain from interacting with the screen or pressing any side " |
| "buttons while measurements are taken.") |
| USER_STEPS_REQUEST = ("\n===> Please press Next and when the " |
| "screen is off, then move the device to simulate step motion " |
| "until the screen turns on again.\nPlease refrain from " |
| "interacting with the screen or pressing any side buttons " |
| "while measurements are taken.") |
| testrunner = PowerTest(max_baseline_amps) |
| testrunner.executeOnDevice( |
| PowerTest.REQUEST_SHOW_MESSAGE % "Connected. Running tests...") |
| is_baseline_success, baseline_amps = testrunner.getBaselineState() |
| |
| if is_baseline_success: |
| testrunner.setUsbEnabled(True) |
| # TODO: Enable testing a single sensor |
| testrunner.runSensorPowerTest( |
| "SIGNIFICANT_MOTION", PowerTest.MAX_SIGMO_AMPS, baseline_amps, |
| user_request = GENERIC_MOTION_REQUEST) |
| testrunner.runSensorPowerTest( |
| "STEP_DETECTOR", PowerTest.MAX_STEP_DETECTOR_AMPS, baseline_amps, |
| user_request = USER_STEPS_REQUEST) |
| testrunner.runSensorPowerTest( |
| "STEP_COUNTER", PowerTest.MAX_STEP_COUNTER_AMPS, baseline_amps, |
| user_request = USER_STEPS_REQUEST) |
| testrunner.runSensorPowerTest( |
| "ACCELEROMETER", PowerTest.MAX_ACCEL_AMPS, baseline_amps, |
| user_request = GENERIC_MOTION_REQUEST) |
| testrunner.runSensorPowerTest( |
| "MAGNETIC_FIELD", PowerTest.MAX_MAG_AMPS, baseline_amps, |
| user_request = GENERIC_MOTION_REQUEST) |
| testrunner.runSensorPowerTest( |
| "GYROSCOPE", PowerTest.MAX_GYRO_AMPS, baseline_amps, |
| user_request = GENERIC_MOTION_REQUEST) |
| testrunner.runSensorPowerTest( |
| "ACCELEROMETER", PowerTest.MAX_ACCEL_AMPS, baseline_amps, |
| user_request = None) |
| testrunner.runSensorPowerTest( |
| "MAGNETIC_FIELD", PowerTest.MAX_MAG_AMPS, baseline_amps, |
| user_request = None) |
| testrunner.runSensorPowerTest( |
| "GYROSCOPE", PowerTest.MAX_GYRO_AMPS, baseline_amps, |
| user_request = None) |
| testrunner.runSensorPowerTest( |
| "SIGNIFICANT_MOTION", PowerTest.MAX_SIGMO_AMPS, baseline_amps, |
| user_request = None) |
| testrunner.runSensorPowerTest( |
| "STEP_DETECTOR", PowerTest.MAX_STEP_DETECTOR_AMPS, baseline_amps, |
| user_request = None) |
| testrunner.runSensorPowerTest( |
| "STEP_COUNTER", PowerTest.MAX_STEP_COUNTER_AMPS, baseline_amps, |
| user_request = None) |
| else: |
| print("Could not get to baseline state. This is either because " |
| "in several trials, the monitor could not measure a set " |
| "of power measurements that had the specified low " |
| "variance or the mean measurements were below the " |
| "expected value. None of the sensor power measurement " |
| " tests were performed due to not being able to detect " |
| "baseline state. Please re-run the power tests.") |
| except KeyboardInterrupt: |
| print "Keyboard interrupt from user." |
| raise |
| except: |
| import traceback |
| traceback.print_exc() |
| finally: |
| logging.info("TESTS COMPLETE") |
| if testrunner: |
| try: |
| testrunner.finalize() |
| except socket.error: |
| sys.exit( |
| "===================================================\n" |
| "Unable to connect to device under test. Make sure \n" |
| "the device is connected via the usb pass-through, \n" |
| "the CtsVerifier app is running the SensorPowerTest on \n" |
| "the device, and USB pass-through is enabled.\n" |
| "===================================================") |
| |
| def main(argv): |
| """ Simple command-line interface for a power test application.""" |
| useful_flags = ["voltage", "status", "usbpassthrough", |
| "samples", "current", "log", "power_monitor"] |
| if not [f for f in useful_flags if FLAGS.get(f, None) is not None]: |
| print __doc__.strip() |
| print FLAGS.MainModuleHelp() |
| return |
| |
| if FLAGS.avg and FLAGS.avg < 0: |
| logging.error("--avg must be greater than 0") |
| return |
| |
| if FLAGS.voltage is not None: |
| if FLAGS.voltage > 5.5: |
| print("!!WARNING: Voltage higher than typical values!!!") |
| try: |
| response = raw_input( |
| "Voltage of %.3f requested. Confirm this is correct (Y/N)" % |
| FLAGS.voltage) |
| if response.upper() != "Y": |
| sys.exit("Aborting") |
| except: |
| sys.exit("Aborting.") |
| |
| if not FLAGS.power_monitor: |
| sys.exit( |
| "You must specify a '--power_monitor' option to specify which power " |
| "monitor type " + |
| "you are using.\nOne of:\n \n ".join(available_monitors)) |
| power_monitors = do_import("power_monitors.%s" % FLAGS.power_monitor) |
| try: |
| mon = power_monitors.Power_Monitor(device = FLAGS.device) |
| except: |
| import traceback |
| |
| traceback.print_exc() |
| sys.exit("No power monitors found") |
| |
| if FLAGS.voltage is not None: |
| |
| if FLAGS.ramp is not None: |
| mon.RampVoltage(mon.start_voltage, FLAGS.voltage) |
| else: |
| mon.SetVoltage(FLAGS.voltage) |
| |
| if FLAGS.current is not None: |
| mon.SetMaxCurrent(FLAGS.current) |
| |
| if FLAGS.status: |
| items = sorted(mon.GetStatus().items()) |
| print "\n".join(["%s: %s" % item for item in items]) |
| |
| if FLAGS.usbpassthrough: |
| if FLAGS.usbpassthrough == "off": |
| mon.SetUsbPassthrough(0) |
| elif FLAGS.usbpassthrough == "on": |
| mon.SetUsbPassthrough(1) |
| elif FLAGS.usbpassthrough == "auto": |
| mon.SetUsbPassthrough(2) |
| else: |
| mon.Close() |
| sys.exit("bad pass-through flag: %s" % FLAGS.usbpassthrough) |
| |
| if FLAGS.samples: |
| # Make sure state is normal |
| mon.StopDataCollection() |
| status = mon.GetStatus() |
| native_hz = status["sampleRate"] * 1000 |
| |
| # Collect and average samples as specified |
| mon.StartDataCollection() |
| |
| # In case FLAGS.hz doesn't divide native_hz exactly, use this invariant: |
| # 'offset' = (consumed samples) * FLAGS.hz - (emitted samples) * native_hz |
| # This is the error accumulator in a variation of Bresenham's algorithm. |
| emitted = offset = 0 |
| collected = [] |
| history_deque = collections.deque() # past n samples for rolling average |
| |
| # TODO: Complicated lines of code below. Refactoring needed |
| try: |
| last_flush = time.time() |
| while emitted < FLAGS.samples or FLAGS.samples == -1: |
| # The number of raw samples to consume before emitting the next output |
| need = (native_hz - offset + FLAGS.hz - 1) / FLAGS.hz |
| if need > len(collected): # still need more input samples |
| samples = mon.CollectData() |
| if not samples: break |
| collected.extend(samples) |
| else: |
| # Have enough data, generate output samples. |
| # Adjust for consuming 'need' input samples. |
| offset += need * FLAGS.hz |
| while offset >= native_hz: # maybe multiple, if FLAGS.hz > native_hz |
| this_sample = sum(collected[:need]) / need |
| |
| if FLAGS.timestamp: print int(time.time()), |
| |
| if FLAGS.avg: |
| history_deque.appendleft(this_sample) |
| if len(history_deque) > FLAGS.avg: history_deque.pop() |
| print "%f %f" % (this_sample, |
| sum(history_deque) / len(history_deque)) |
| else: |
| print "%f" % this_sample |
| sys.stdout.flush() |
| |
| offset -= native_hz |
| emitted += 1 # adjust for emitting 1 output sample |
| collected = collected[need:] |
| now = time.time() |
| if now - last_flush >= 0.99: # flush every second |
| sys.stdout.flush() |
| last_flush = now |
| except KeyboardInterrupt: |
| print("interrupted") |
| return 1 |
| finally: |
| mon.Close() |
| return 0 |
| |
| if FLAGS.run: |
| if not FLAGS.power_monitor: |
| sys.exit( |
| "When running power tests, you must specify which type of power " |
| "monitor to use" + |
| " with '--power_monitor <type of power monitor>'") |
| try: |
| PowerTest.runTests(FLAGS.max_baseline_amps) |
| except KeyboardInterrupt: |
| print "Keyboard interrupt from user" |
| |
| if __name__ == "__main__": |
| flags.DEFINE_boolean("status", None, "Print power meter status") |
| flags.DEFINE_integer("avg", None, |
| "Also report average over last n data points") |
| flags.DEFINE_float("voltage", None, "Set output voltage (0 for off)") |
| flags.DEFINE_float("current", None, "Set max output current") |
| flags.DEFINE_string("usbpassthrough", None, "USB control (on, off, auto)") |
| flags.DEFINE_integer("samples", None, "Collect and print this many samples") |
| flags.DEFINE_integer("hz", 5000, "Print this many samples/sec") |
| flags.DEFINE_string("device", None, |
| "Path to the device in /dev/... (ex:/dev/ttyACM1)") |
| flags.DEFINE_boolean("timestamp", None, |
| "Also print integer (seconds) timestamp on each line") |
| flags.DEFINE_boolean("ramp", True, "Gradually increase voltage") |
| flags.DEFINE_boolean("log", False, "Log progress to a file or not") |
| flags.DEFINE_boolean("run", False, "Run the test suite for power") |
| flags.DEFINE_string("power_monitor", None, "Type of power monitor to use") |
| flags.DEFINE_float("max_baseline_amps", 0.005, |
| "Set maximum baseline current for device being tested") |
| sys.exit(main(FLAGS(sys.argv))) |