Improvements to suspend state detection; code cleanups and documentation.
- Added additional functions computeBaselineState() and getBaselineState() for detecting baseline state (improved the tests and estimating average)
- Added functions isApInSuspendState(), waitForApSuspendMode() to improve the criteria to detect AP off state
- Modified run_tests() to avoid computing a new baseline power for each test. The baseline power is measured once and then used for each test to determine the delta current draw.
- Removed waitForScreenOff() function
- Added endTestsBecauseConnectionToDeviceLost() function
- Added a flag "--max_baseline_amps" of type float to specify the max expected baseline amps for the device being tested
- Tested this on Shamu and HH, and the power tests run until the end and PASS everything
- several docstring comments added to functions, code cleanups, variable renaming for better description
- Added a PowerTestException class
Change-Id: I7190009882cc2d77d20138f10d4659ea1f39611d
diff --git a/apps/CtsVerifier/assets/scripts/execute_power_tests.py b/apps/CtsVerifier/assets/scripts/execute_power_tests.py
index d1c2dac..d07cb36 100755
--- a/apps/CtsVerifier/assets/scripts/execute_power_tests.py
+++ b/apps/CtsVerifier/assets/scripts/execute_power_tests.py
@@ -25,57 +25,108 @@
import pkgutil
import threading
import Queue
+import traceback
+import math
+import bisect
+from bisect import bisect_left
-# queue to signal thread to exit
-signal_exit_q = Queue.Queue()
-signal_abort = Queue.Queue()
+"""
+scipy, numpy and matplotlib are python packages that can be installed
+from: http://www.scipy.org/
+
+"""
+import scipy
+import matplotlib.pyplot as plt
# let this script know about the power monitor implementations
sys.path = [os.path.basename(__file__)] + sys.path
-available_monitors = [name for _, name, _ in pkgutil.iter_modules(
- [os.path.join(os.path.dirname(__file__),'power_monitors')]) if not name.startswith('_')]
+available_monitors = [
+ name
+ for _, name, _ in pkgutil.iter_modules(
+ [os.path.join(os.path.dirname(__file__), "power_monitors")])
+ if not name.startswith("_")]
-APK = os.path.join( os.path.dirname(__file__), '..', "CtsVerifier.apk")
+APK = os.path.join(os.path.dirname(__file__), "..", "CtsVerifier.apk")
FLAGS = flags.FLAGS
-# whether to use a strict delay to ensure screen is off, or attempt to use power measurements
-USE_STRICT_DELAY = False
-if USE_STRICT_DELAY:
- DELAY_SCREEN_OFF = 30.0
-else:
- DELAY_SCREEN_OFF = 2.0
+# DELAY_SCREEN_OFF is the number of seconds to wait for baseline state
+DELAY_SCREEN_OFF = 20.0
# whether to log data collected to a file for each sensor run:
LOG_DATA_TO_FILE = True
logging.getLogger().setLevel(logging.ERROR)
+
def do_import(name):
"""import a module by name dynamically"""
mod = __import__(name)
- components = name.split('.')
+ components = name.split(".")
for comp in components[1:]:
mod = getattr(mod, comp)
return mod
+class PowerTestException(Exception):
+ """
+ Definition of specialized Exception class for CTS power tests
+ """
+ def __init__(self, message):
+ self._error_message = message
+ def __str__(self):
+ return self._error_message
class PowerTest:
- """Class to run a suite of power tests"""
+ """Class to run a suite of power tests. This has methods for obtaining
+ measurements from the power monitor (through the driver) and then
+ processing it to determine baseline and AP suspend state and
+ measure ampere draw of various sensors.
+ Ctrl+C causes a keyboard interrupt exception which terminates the test."""
# Thresholds for max allowed power usage per sensor tested
- MAX_ACCEL_POWER = 0.08 # Amps
- MAX_MAG_POWER = 0.08 # Amps
- MAX_GYRO_POWER = 0.08 # Amps
- MAX_SIGMO_POWER = 0.08 # Amps
- MAX_STEP_COUNTER_POWER = 0.08 # Amps
- MAX_STEP_DETECTOR_POWER = 0.08 # Amps
+ # TODO: Accel, Mag and Gyro have no maximum power specified in the CDD;
+ # the following numbers are bogus and will be replaced soon by what
+ # the device reports (from Sensor.getPower())
+ MAX_ACCEL_AMPS = 0.08 # Amps
+ MAX_MAG_AMPS = 0.08 # Amps
+ MAX_GYRO_AMPS = 0.08 # Amps
+ MAX_SIGMO_AMPS = 0.08 # Amps
-
- PORT = 0 # any available port
+ # TODO: The following numbers for step counter, etc must be replaced by
+ # the numbers specified in CDD for low-power sensors. The expected current
+ # draw must be computed from the specified power and the voltage used to
+ # power the device (specified from a config file).
+ MAX_STEP_COUNTER_AMPS = 0.08 # Amps
+ MAX_STEP_DETECTOR_AMPS = 0.08 # Amps
+ # The variable EXPECTED_AMPS_VARIATION_HALF_RANGE denotes the expected
+ # variation of the ampere measurements
+ # around the mean value at baseline state. i.e. we expect most of the
+ # ampere measurements at baseline state to vary around the mean by
+ # between +/- of the number below
+ EXPECTED_AMPS_VARIATION_HALF_RANGE = 0.0005
+ # The variable THRESHOLD_BASELINE_SAMPLES_FRACTION denotes the minimum fraction of samples that must
+ # be in the range of variation defined by EXPECTED_AMPS_VARIATION_HALF_RANGE
+ # around the mean baseline for us to decide that the phone has settled into
+ # its baseline state
+ THRESHOLD_BASELINE_SAMPLES_FRACTION = 0.86
+ # The variable MAX_PERCENTILE_AP_SCREEN_OFF_AMPS denotes the maximum ampere
+ # draw that the device can consume when it has gone to suspend state with
+ # one or more sensors registered and batching samples (screen and AP are
+ # off in this case)
+ MAX_PERCENTILE_AP_SCREEN_OFF_AMPS = 0.030 # Amps
+ # The variable PERCENTILE_MAX_AP_SCREEN_OFF denotes the fraction of ampere
+ # measurements that must be below the specified maximum amperes
+ # MAX_PERCENTILE_AP_SCREEN_OFF_AMPS for us to decide that the phone has
+ # reached suspend state.
+ PERCENTILE_MAX_AP_SCREEN_OFF = 0.95
DOMAIN_NAME = "/android/cts/powertest"
+ # SAMPLE_COUNT_NOMINAL denotes the typical number of measurements of amperes
+ # to collect from the power monitor
SAMPLE_COUNT_NOMINAL = 1000
+ # RATE_NOMINAL denotes the nominal frequency at which ampere measurements
+ # are taken from the monsoon power monitor
RATE_NOMINAL = 100
+ ENABLE_PLOTTING = False
REQUEST_EXTERNAL_STORAGE = "EXTERNAL STORAGE?"
REQUEST_EXIT = "EXIT"
@@ -87,103 +138,157 @@
REQUEST_SCREEN_OFF = "SCREEN OFF"
REQUEST_SHOW_MESSAGE = "MESSAGE %s"
+ NEGATIVE_AMPERE_ERROR_MESSAGE = (
+ "Negative ampere draw measured, possibly due to power "
+ "supply from USB cable. Check the setup of device and power "
+ "monitor to make sure that the device is not connected "
+ "to machine via USB directly. The device should be "
+ "connected to the USB slot in the power monitor. It is okay "
+ "to change the wiring when the test is in progress.")
- def __init__(self):
+
+ def __init__(self, max_baseline_amps):
+ """
+ Args:
+ max_baseline_amps: The maximum value of baseline amperes
+ that we expect the device to consume at baseline state.
+ This can be different between models of phones.
+ """
power_monitors = do_import("power_monitors.%s" % FLAGS.power_monitor)
testid = time.strftime("%d_%m_%Y__%H__%M_%S")
self._power_monitor = power_monitors.Power_Monitor(log_file_id = testid)
+ self._tcp_connect_port = 0 # any available port
print ("Establishing connection to device...")
self.setUsbEnabled(True)
status = self._power_monitor.GetStatus()
self._native_hz = status["sampleRate"] * 1000
+ # the following describes power test being run (i.e on what sensor
+ # and what type of test. This is used for logging.
self._current_test = "None"
- self._external_storage = self.executeOnDevice(PowerTest.REQUEST_EXTERNAL_STORAGE,
- reportErrors=True)
+ self._external_storage = self.executeOnDevice(PowerTest.REQUEST_EXTERNAL_STORAGE)
+ self._max_baseline_amps = max_baseline_amps
def __del__(self):
self.finalize()
def finalize(self):
"""To be called upon termination of host connection to device"""
- if PowerTest.PORT > 0:
- # tell device side to exit connection loop, and remove the forwarding connection
- self.executeOnDevice(PowerTest.REQUEST_EXIT, reportErrors=False)
- self.executeLocal("adb forward --remove tcp:%d" % PowerTest.PORT)
- PowerTest.PORT = 0
+ if self._tcp_connect_port > 0:
+ # tell device side to exit connection loop, and remove the forwarding
+ # connection
+ self.executeOnDevice(PowerTest.REQUEST_EXIT, reportErrors = False)
+ self.executeLocal("adb forward --remove tcp:%d" % self._tcp_connect_port)
+ self._tcp_connect_port = 0
if self._power_monitor:
self._power_monitor.Close()
self._power_monitor = None
- def _send(self, msg, report_errors=True):
+ def _send(self, msg, report_errors = True):
"""Connect to the device, send the given command, and then disconnect"""
- if PowerTest.PORT == 0:
+ if self._tcp_connect_port == 0:
# on first attempt to send a command, connect to device via any open port number,
# forwarding that port to a local socket on the device via adb
logging.debug("Seeking port for communication...")
# discover an open port
dummysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
dummysocket.bind(("localhost", 0))
- (_, PowerTest.PORT) = dummysocket.getsockname()
+ (_, self._tcp_connect_port) = dummysocket.getsockname()
dummysocket.close()
- assert(PowerTest.PORT > 0)
+ assert(self._tcp_connect_port > 0)
+
status = self.executeLocal("adb forward tcp:%d localabstract:%s" %
- (PowerTest.PORT, PowerTest.DOMAIN_NAME))
- if report_errors:
- self.reportErrorIf(status != 0, msg="Unable to forward requests to client over adb")
- logging.info("Forwarding requests over local port %d" % PowerTest.PORT)
+ (self._tcp_connect_port, PowerTest.DOMAIN_NAME))
+ # If the status !=0, then the host machine is unable to
+ # forward requests to client over adb. Ending the test and logging error message
+ # to the console on the host.
+ self.endTestIfLostConnection(
+ status != 0,
+ "Unable to forward requests to client over adb")
+ logging.info("Forwarding requests over local port %d",
+ self._tcp_connect_port)
link = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
logging.debug("Connecting to device...")
- link.connect (("localhost", PowerTest.PORT))
+ link.connect(("localhost", self._tcp_connect_port))
logging.debug("Connected.")
+ except socket.error as serr:
+ print "Socket connection error: ", serr
+ print "Finalizing and exiting the test"
+ self.endTestIfLostConnection(
+ report_errors,
+ "Unable to communicate with device: connection refused")
except:
- if report_errors:
- self.reportErrorIf(True, msg="Unable to communicate with device: connection refused")
- logging.debug("Sending '%s'" % msg)
+ print "Non socket-related exception at this block in _send(); re-raising now."
+ raise
+ logging.debug("Sending '%s'", msg)
link.sendall(msg)
logging.debug("Getting response...")
response = link.recv(4096)
- logging.debug("Got response '%s'" % response)
+ logging.debug("Got response '%s'", response)
link.close()
return response
def queryDevice(self, query):
"""Post a yes/no query to the device, return True upon successful query, False otherwise"""
- logging.info("Querying device with '%s'" % query)
+ logging.info("Querying device with '%s'", query)
return self._send(query) == "OK"
# TODO: abstract device communication (and string commands) into its own class
- def executeOnDevice(self, cmd , reportErrors=True):
+ def executeOnDevice(self, cmd, reportErrors = True):
"""Execute a (string) command on the remote device"""
- return self._send(cmd , reportErrors)
+ return self._send(cmd, reportErrors)
- def executeLocal(self, cmd, check_status=True):
+ def executeLocal(self, cmd, check_status = True):
"""execute a shell command locally (on the host)"""
from subprocess import call
- status = call(cmd.split(' '))
+ status = call(cmd.split(" "))
if status != 0 and check_status:
- logging.error("Failed to execute \"%s\"" % cmd)
+ logging.error("Failed to execute \"%s\"", cmd)
else:
- logging.debug("Executed \"%s\"" % cmd)
+ logging.debug("Executed \"%s\"", cmd)
return status
- def reportErrorIf(self, condition, msg):
+ def reportErrorRaiseExceptionIf(self, condition, msg):
"""Report an error condition to the device if condition is True.
- Will raise an exception on the device if condition is True"""
+ Will raise an exception on the device if condition is True.
+ Args:
+ condition: If true, this reports error
+ msg: Message related to exception
+ Raises:
+ A PowerTestException encapsulating the message provided in msg
+ """
if condition:
try:
logging.error("Exiting on error: %s" % msg)
- self.executeOnDevice(PowerTest.REQUEST_RAISE % (self._current_test, msg), False)
+ self.executeOnDevice(PowerTest.REQUEST_RAISE % (self._current_test, msg),
+ reportErrors = True)
except:
-
- logging.error("Unable to communicate with device to report error: %s" % msg)
+ logging.error("Unable to communicate with device to report "
+ "error: %s" % msg)
self.finalize()
sys.exit(msg)
- raise Exception(msg)
+ raise PowerTestException(msg)
- def setUsbEnabled(self, enabled, verbose=True):
+ def endTestIfLostConnection(self, lost_connection, error_message):
+ """
+ This function ends the test if lost_connection was true,
+ which indicates that the connection to the device was lost.
+ Args:
+ lost_connection: boolean variable, if True it indicates that
+ connection to device was lost and the test must be terminated.
+ error_message: String to print to the host console before exiting the test
+ (if lost_connection is True)
+ Returns:
+ None.
+ """
+ if lost_connection:
+ logging.error(error_message)
+ self.finalize()
+ sys.exit(error_message)
+
+ def setUsbEnabled(self, enabled, verbose = True):
if enabled:
val = 1
else:
@@ -193,6 +298,7 @@
# Sometimes command won't go through first time, particularly if immediately after a data
# collection, so allow for retries
+ # TODO: Move this retry mechanism to the power monitor driver.
status = self._power_monitor.GetStatus()
while status is None and tries < 5:
tries += 1
@@ -203,59 +309,230 @@
status = self._power_monitor.GetStatus()
if enabled:
- if verbose: print("...USB enabled, waiting for device")
- self.executeLocal ("adb wait-for-device")
- if verbose: print("...device online")
+ if verbose:
+ print("...USB enabled, waiting for device")
+ self.executeLocal("adb wait-for-device")
+ if verbose:
+ print("...device online")
else:
- if verbose: logging.info("...USB disabled")
+ if verbose:
+ logging.info("...USB disabled")
# re-establish port forwarding
- if enabled and PowerTest.PORT > 0:
+ if enabled and self._tcp_connect_port > 0:
status = self.executeLocal("adb forward tcp:%d localabstract:%s" %
- (PowerTest.PORT, PowerTest.DOMAIN_NAME))
- self.reportErrorIf(status != 0, msg="Unable to forward requests to client over adb")
+ (self._tcp_connect_port, PowerTest.DOMAIN_NAME))
+ self.reportErrorRaiseExceptionIf(status != 0, msg = "Unable to forward requests to client over adb")
- def waitForScreenOff(self):
- # disconnect of USB will cause screen to go on, so must wait (1 second more than screen off
- # timeout)
- if USE_STRICT_DELAY:
- time.sleep(DELAY_SCREEN_OFF)
- return
+ def computeBaselineState(self, measurements):
+ """
+ Args:
+ measurements: List of floats containing ampere draw measurements
+ taken from the monsoon power monitor.
+ Must be atleast 100 measurements long
+ Returns:
+ A tuple (isBaseline, mean_current) where isBaseline is a
+ boolean that is True only if the baseline state for the phone is
+ detected. mean_current is an estimate of the average baseline
+ current for the device, which is valid only if baseline state is
+ detected (if not, it is set to -1).
+ """
- # need at least 100 sequential clean low-power measurements to know screen is off
- THRESHOLD_COUNT_LOW_POWER = 100
- CURRENT_LOW_POWER_THRESHOLD = 0.060 # Amps
- TIMEOUT_SCREEN_OFF = 30 # this many tries at most
- count_good = 0
- tries = 0
- print("Waiting for screen off and application processor in suspend mode...")
- while count_good < THRESHOLD_COUNT_LOW_POWER:
- measurements = self.collectMeasurements(THRESHOLD_COUNT_LOW_POWER,
- PowerTest.RATE_NOMINAL,
- ensure_screen_off=False,
- verbose=False)
- count_good = len([m for m in measurements
- if m < CURRENT_LOW_POWER_THRESHOLD])
- tries += 1
- if count_good < THRESHOLD_COUNT_LOW_POWER and measurements:
- print("Current usage: %.2f mAmps. Device is probably not in suspend mode. Waiting..." %
- (1000.0*(sum(measurements)/len(measurements))))
- if tries >= TIMEOUT_SCREEN_OFF:
- # TODO: dump the state of sensor service to identify if there are features using sensors
- self.reportErrorIf(tries>=TIMEOUT_SCREEN_OFF,
- msg="Unable to determine application processor suspend mode status.")
+ # Looks at the measurements to see if it is in baseline state
+ if len(measurements) < 100:
+ print(
+ "Need at least 100 measurements to determine if baseline state has"
+ " been reached")
+ return (False, -1)
+
+ # Assumption: At baseline state, the power profile is Gaussian distributed
+ # with low-variance around the mean current draw.
+ # Ideally we should find the mode from a histogram bin to find an estimated mean.
+ # Assuming here that the median is very close to this value; later we check that the
+ # variance of the samples is low enough to validate baseline.
+ sorted_measurements = sorted(measurements)
+ number_measurements = len(measurements)
+ if not number_measurements % 2:
+ median_measurement = (sorted_measurements[(number_measurements - 1) / 2] +
+ sorted_measurements[(number_measurements + 1) / 2]) / 2
+ else:
+ median_measurement = sorted_measurements[number_measurements / 2]
+
+ # Assume that at baseline state, a large fraction of power measurements
+ # are within +/- EXPECTED_AMPS_VARIATION_HALF_RANGE milliAmperes of
+ # the average baseline current. Find all such measurements in the
+ # sorted measurement vector.
+ left_index = (
+ bisect_left(
+ sorted_measurements,
+ median_measurement -
+ PowerTest.EXPECTED_AMPS_VARIATION_HALF_RANGE))
+ right_index = (
+ bisect_left(
+ sorted_measurements,
+ median_measurement +
+ PowerTest.EXPECTED_AMPS_VARIATION_HALF_RANGE))
+
+ average_baseline_amps = scipy.mean(
+ sorted_measurements[left_index: (right_index - 1)])
+
+ detected_baseline = True
+ # We enforce that a fraction of more than 'THRESHOLD_BASELINE_SAMPLES_FRACTION'
+ # of samples must be within +/- EXPECTED_AMPS_VARIATION_HALF_RANGE
+ # milliAmperes of the mean baseline current, which we have estimated as
+ # the median.
+ if ((right_index - left_index) < PowerTest.THRESHOLD_BASELINE_SAMPLES_FRACTION * len(
+ measurements)):
+ detected_baseline = False
+
+ # We check for the maximum limit of the expected baseline
+ if median_measurement > self._max_baseline_amps:
+ detected_baseline = False
+ if average_baseline_amps < 0:
+ print PowerTest.NEGATIVE_AMPERE_ERROR_MESSAGE
+ detected_baseline = False
+
+ print("%s baseline state" % ("Could detect" if detected_baseline else "Could NOT detect"))
+ print(
+ "median amps = %f, avg amps = %f, fraction of good samples = %f" %
+ (median_measurement, average_baseline_amps,
+ float(right_index - left_index) / len(measurements)))
+ if PowerTest.ENABLE_PLOTTING:
+ plt.plot(measurements)
+ plt.show()
+ print("To continue test, please close the plot window manually.")
+ return (detected_baseline, average_baseline_amps)
+
+ def isApInSuspendState(self, measurements_amps, nominal_max_amps, test_percentile):
+ """
+ This function detects AP suspend and display off state of phone
+ after a sensor has been registered.
+
+ Because the power profile can be very different between sensors and
+ even across builds, it is difficult to specify a tight threshold for
+ mean current draw or mandate that the power measurements must have low
+ variance. We use a criteria that allows for a certain fraction of
+ peaks in power spectrum and checks that test_percentile fraction of
+ measurements must be below the specified value nominal_max_amps
+ Args:
+ measurements_amps: amperes draw measurements from power monitor
+ test_percentile: the fraction of measurements we require to be below
+ a specified amps value
+ nominal_max_amps: the specified value of the max current draw
+ Returns:
+ returns a boolean which is True if and only if the AP suspend and
+ display off state is detected
+ """
+ count_good = len([m for m in measurements_amps if m < nominal_max_amps])
+ count_negative = len([m for m in measurements_amps if m < 0])
+ if count_negative > 0:
+ print PowerTest.NEGATIVE_AMPERE_ERROR_MESSAGE
+ return False;
+ return count_good > test_percentile * len(measurements_amps)
+
+ def getBaselineState(self):
+ """This function first disables all sensors, then collects measurements
+ through the power monitor and continuously evaluates if baseline state
+ is reached. Once baseline state is detected, it returns a tuple with
+ status information. If baseline is not detected in a preset maximum
+ number of trials, it returns as well.
+
+ Returns:
+ Returns a tuple (isBaseline, mean_current) where isBaseline is a
+ boolean that is True only if the baseline state for the phone is
+ detected. mean_current is an estimate of the average baseline current
+ for the device, which is valid only if baseline state is detected
+ (if not, it is set to -1)
+ """
+ self.setPowerOn("ALL", False)
+ self.setUsbEnabled(False)
+ print("Waiting %d seconds for baseline state" % DELAY_SCREEN_OFF)
+ time.sleep(DELAY_SCREEN_OFF)
+
+ MEASUREMENT_DURATION_SECONDS_BASELINE_DETECTION = 5 # seconds
+ NUMBER_MEASUREMENTS_BASELINE_DETECTION = (
+ PowerTest.RATE_NOMINAL *
+ MEASUREMENT_DURATION_SECONDS_BASELINE_DETECTION)
+ NUMBER_MEASUREMENTS_BASELINE_VERIFICATION = (
+ NUMBER_MEASUREMENTS_BASELINE_DETECTION * 5)
+ MAX_TRIALS = 50
+
+ collected_baseline_measurements = False
+
+ for tries in xrange(MAX_TRIALS):
+ print("Trial number %d of %d..." % (tries, MAX_TRIALS))
+ measurements = self.collectMeasurements(
+ NUMBER_MEASUREMENTS_BASELINE_DETECTION, PowerTest.RATE_NOMINAL,
+ verbose = False)
+ if self.computeBaselineState(measurements)[0] is True:
+ collected_baseline_measurements = True
break
- if DELAY_SCREEN_OFF:
- # add additional delay time if necessary
- time.sleep(DELAY_SCREEN_OFF)
- print("...Screen off and device in suspend mode.")
- def collectMeasurements(self, measurementCount, rate , ensure_screen_off=True, verbose=True,
- plot_data = False):
- assert(measurementCount > 0)
- decimate_by = self._native_hz / rate or 1
- if ensure_screen_off:
- self.waitForScreenOff()
- print ("Taking measurements...")
+ if collected_baseline_measurements:
+ print("Verifying baseline state over a longer interval "
+ "in order to double check baseline state")
+ measurements = self.collectMeasurements(
+ NUMBER_MEASUREMENTS_BASELINE_VERIFICATION, PowerTest.RATE_NOMINAL,
+ verbose = False)
+ self.reportErrorRaiseExceptionIf(
+ not measurements, "No background measurements could be taken")
+ retval = self.computeBaselineState(measurements)
+ if retval[0]:
+ print("Verified baseline.")
+ if measurements and LOG_DATA_TO_FILE:
+ with open("/tmp/cts-power-tests-background-data.log", "w") as f:
+ for m in measurements:
+ f.write("%.4f\n" % m)
+ return retval
+ else:
+ return (False, -1)
+
+ def waitForApSuspendMode(self):
+ """This function repeatedly collects measurements until AP suspend and display off
+ mode is detected. After a maximum number of trials, if this state is not reached, it
+ raises an error.
+ Returns:
+ boolean which is True if device was detected to be in suspend state
+ Raises:
+ Power monitor-related exception
+ """
+ print("waitForApSuspendMode(): Sleeping for %d seconds" % DELAY_SCREEN_OFF)
+ time.sleep(DELAY_SCREEN_OFF)
+
+ NUMBER_MEASUREMENTS = 200
+ # Maximum trials for which to collect measurements to get to Ap suspend
+ # state
+ MAX_TRIALS = 50
+
+ got_to_suspend_state = False
+ for count in xrange(MAX_TRIALS):
+ print ("waitForApSuspendMode(): Trial %d of %d" % (count, MAX_TRIALS))
+ measurements = self.collectMeasurements(NUMBER_MEASUREMENTS,
+ PowerTest.RATE_NOMINAL,
+ verbose = False)
+ if self.isApInSuspendState(
+ measurements, PowerTest.MAX_PERCENTILE_AP_SCREEN_OFF_AMPS,
+ PowerTest.PERCENTILE_MAX_AP_SCREEN_OFF):
+ got_to_suspend_state = True
+ break
+ self.reportErrorRaiseExceptionIf(
+ got_to_suspend_state is False,
+ msg = "Unable to determine application processor suspend mode status.")
+ print("Got to AP suspend state")
+ return got_to_suspend_state
+
+ def collectMeasurements(self, measurementCount, rate, verbose = True):
+ """Args:
+ measurementCount: Number of measurements to collect from the power
+ monitor
+ rate: The integer frequency in Hertz at which to collect measurements from
+ the power monitor
+ Returns:
+ A list containing measurements from the power monitor; that has the
+ requested count of the number of measurements at the specified rate
+ """
+ assert (measurementCount > 0)
+ decimate_by = self._native_hz / rate or 1
+
self._power_monitor.StartDataCollection()
sub_measurements = []
measurements = []
@@ -273,47 +550,70 @@
tries = 0
sub_measurements.extend(additional)
while len(sub_measurements) >= decimate_by:
- sub_avg = sum(sub_measurements) / len(sub_measurements)
+ sub_avg = sum(sub_measurements[0:decimate_by]) / decimate_by
measurements.append(sub_avg)
sub_measurements = sub_measurements[decimate_by:]
if verbose:
+ # "\33[1A\33[2K" is a special Linux console control
+ # sequence for moving to the previous line, and
+ # erasing it; and reprinting new text on that
+ # erased line.
sys.stdout.write("\33[1A\33[2K")
- print ("MEASURED[%d]: %f" % (len(measurements),measurements[-1]))
+ print ("MEASURED[%d]: %f" % (len(measurements), measurements[-1]))
finally:
self._power_monitor.StopDataCollection()
- self.reportErrorIf(measurementCount > len(measurements),
- "Unable to collect all requested measurements")
+ self.reportErrorRaiseExceptionIf(measurementCount > len(measurements),
+ "Unable to collect all requested measurements")
return measurements
- def request_user_acknowledgment(self, msg):
+ def requestUserAcknowledgment(self, msg):
"""Post message to user on screen and wait for acknowledgment"""
response = self.executeOnDevice(PowerTest.REQUEST_USER_RESPONSE % msg)
- self.reportErrorIf(response != "OK", "Unable to request user acknowledgment")
+ self.reportErrorRaiseExceptionIf(
+ response != "OK", "Unable to request user acknowledgment")
- def setTestResult (self, testname, condition, msg):
- if condition is False:
- val = "FAIL"
- elif condition is True:
- val = "PASS"
- else:
- val = condition
- print ("Test %s : %s" % (testname, val))
- response = self.executeOnDevice(PowerTest.REQUEST_SET_TEST_RESULT % (testname, val, msg))
- self.reportErrorIf(response != "OK", "Unable to send test status to Verifier")
+ def setTestResult(self, test_name, test_result, test_message):
+ """
+ Reports the result of a test to the device
+ Args:
+ test_name: name of the test
+ test_result: Boolean result of the test (True means Pass)
+ test_message: Relevant message
+ """
+ print ("Test %s : %s" % (test_name, test_result))
+
+ response = (
+ self.executeOnDevice(
+ PowerTest.REQUEST_SET_TEST_RESULT %
+ (test_name, test_result, test_message)))
+ self.reportErrorRaiseExceptionIf(
+ response != "OK", "Unable to send test status to Verifier")
def setPowerOn(self, sensor, powered_on):
response = self.executeOnDevice(PowerTest.REQUEST_SENSOR_SWITCH %
- ({True:"ON", False:"OFF"}[powered_on], sensor))
- self.reportErrorIf(response == "ERR", "Unable to set sensor %s state" % sensor)
- logging.info("Set %s %s" % (sensor, {True:"ON", False:"OFF"}[powered_on]))
+ (("ON" if powered_on else "OFF"), sensor))
+ self.reportErrorRaiseExceptionIf(
+ response == "ERR", "Unable to set sensor %s state" % sensor)
+ logging.info("Set %s %s", sensor, ("ON" if powered_on else "OFF"))
return response
- def runPowerTest(self, sensor, max_power_allowed, user_request = None):
- if not signal_abort.empty():
- sys.exit( signal_abort.get() )
- self._current_test = "%s_Power_Test_While_%s" % (sensor,
- {True:"Under_Motion", False:"Still"}[user_request is not None])
+ def runSensorPowerTest(
+ self, sensor, max_amperes_allowed, baseline_amps, user_request = None):
+ """
+ Runs power test for a specific sensor; i.e. measures the amperes draw
+ of the phone using monsoon, with the specified sensor mregistered
+ and the phone in suspend state; and verifies that the incremental
+ consumed amperes is within expected bounds.
+ Args:
+ sensor: The specified sensor for which to run the power test
+ max_amperes_allowed: Maximum ampere draw of the device with the
+ sensor registered and device in suspend state
+ baseline_amps: The power draw of the device when it is in baseline
+ state (no sensors registered, display off, AP asleep)
+ """
+ self._current_test = ("%s_Power_Test_While_%s" % (
+ sensor, ("Under_Motion" if user_request is not None else "Still")))
try:
print ("\n\n---------------------------------")
if user_request is not None:
@@ -321,248 +621,299 @@
else:
print ("Running power test on %s while device is still." % sensor)
print ("---------------------------------")
- response = self.executeOnDevice(PowerTest.REQUEST_SENSOR_AVAILABILITY % sensor)
+ response = self.executeOnDevice(
+ PowerTest.REQUEST_SENSOR_AVAILABILITY % sensor)
if response == "UNAVAILABLE":
- self.setTestResult(self._current_test, condition="SKIPPED",
- msg="Sensor %s not available on this platform"%sensor)
+ self.setTestResult(
+ self._current_test, test_result = "SKIPPED",
+ test_message = "Sensor %s not available on this platform" % sensor)
self.setPowerOn("ALL", False)
if response == "UNAVAILABLE":
- self.setTestResult(self._current_test, condition="SKIPPED",
- msg="Sensor %s not available on this device"%sensor)
+ self.setTestResult(
+ self._current_test, test_result = "SKIPPED",
+ test_message = "Sensor %s not available on this device" % sensor)
return
-
- self.reportErrorIf(response != "OK", "Unable to set all sensor off")
- if not signal_abort.empty():
- sys.exit( signal_abort.get() )
+ self.reportErrorRaiseExceptionIf(response != "OK", "Unable to set all sensor off")
self.executeOnDevice(PowerTest.REQUEST_SCREEN_OFF)
self.setUsbEnabled(False)
- print("Collecting background measurements...")
- measurements = self.collectMeasurements( PowerTest.SAMPLE_COUNT_NOMINAL,
- PowerTest.RATE_NOMINAL,
- plot_data = True)
- if measurements and LOG_DATA_TO_FILE:
- with open( "/tmp/cts-power-tests-%s-%s-background-data.log"%(sensor,
- {True:"Under_Motion", False:"Still"}[user_request is not None] ),'w') as f:
- for m in measurements:
- f.write( "%.4f\n"%m)
- self.reportErrorIf(not measurements, "No background measurements could be taken")
- backgnd = sum(measurements) / len(measurements)
self.setUsbEnabled(True)
self.setPowerOn(sensor, True)
if user_request is not None:
print("===========================================\n" +
"==> Please follow the instructions presented on the device\n" +
- "==========================================="
- )
- self.request_user_acknowledgment(user_request)
+ "===========================================")
+ self.requestUserAcknowledgment(user_request)
self.executeOnDevice(PowerTest.REQUEST_SCREEN_OFF)
self.setUsbEnabled(False)
- self.reportErrorIf(response != "OK", "Unable to set sensor %s ON" % sensor)
+ self.reportErrorRaiseExceptionIf(
+ response != "OK", "Unable to set sensor %s ON" % sensor)
+
+ self.waitForApSuspendMode()
print ("Collecting sensor %s measurements" % sensor)
measurements = self.collectMeasurements(PowerTest.SAMPLE_COUNT_NOMINAL,
PowerTest.RATE_NOMINAL)
if measurements and LOG_DATA_TO_FILE:
- with open( "/tmp/cts-power-tests-%s-%s-sensor-data.log"%(sensor,
- {True:"Under_Motion", False:"Still"}[user_request is not None] ),'w') as f:
+ with open("/tmp/cts-power-tests-%s-%s-sensor-data.log" % (sensor,
+ ("Under_Motion" if user_request is not None else "Still")), "w") as f:
for m in measurements:
- f.write( "%.4f\n"%m)
+ f.write("%.4f\n" % m)
self.setUsbEnabled(True, verbose = False)
print("Saving raw data files to device...")
self.executeLocal("adb shell mkdir -p %s" % self._external_storage, False)
self.executeLocal("adb push %s %s/." % (f.name, self._external_storage))
self.setUsbEnabled(False, verbose = False)
- self.reportErrorIf(not measurements, "No measurements could be taken for %s" % sensor)
+ self.reportErrorRaiseExceptionIf(
+ not measurements, "No measurements could be taken for %s" % sensor)
avg = sum(measurements) / len(measurements)
- squared = [(m-avg)*(m-avg) for m in measurements]
+ squared = [(m - avg) * (m - avg) for m in measurements]
- import math
- stddev = math.sqrt(sum(squared)/len(squared))
- current_diff = avg - backgnd
+ stddev = math.sqrt(sum(squared) / len(squared))
+ current_diff = avg - baseline_amps
self.setUsbEnabled(True)
max_power = max(measurements) - avg
- if current_diff <= max_power_allowed:
+ if current_diff <= max_amperes_allowed:
# TODO: fail the test of background > current
- message = ("Draw is within limits. Current:%f Background:%f Measured: %f Stddev: %f Peak: %f")%\
- ( current_diff*1000.0, backgnd*1000.0, avg*1000.0, stddev*1000.0, max_power*1000.0)
+ message = (
+ "Draw is within limits. Sensor delta:%f mAmp Baseline:%f "
+ "mAmp Sensor: %f mAmp Stddev : %f mAmp Peak: %f mAmp") % (
+ current_diff * 1000.0, baseline_amps * 1000.0, avg * 1000.0,
+ stddev * 1000.0, max_power * 1000.0)
else:
- message = ("Draw is too high. Current:%f Background:%f Measured: %f Stddev: %f Peak: %f")%\
- ( current_diff*1000.0, backgnd*1000.0, avg*1000.0, stddev*1000.0, max_power*1000.0)
- self.setTestResult( testname = self._current_test,
- condition = current_diff <= max_power_allowed,
- msg = message)
- print("Result: "+message)
+ message = (
+ "Draw is too high. Current:%f Background:%f Measured: %f "
+ "Stddev: %f Peak: %f") % (
+ current_diff * 1000.0, baseline_amps * 1000.0, avg * 1000.0,
+ stddev * 1000.0, max_power * 1000.0)
+ self.setTestResult(
+ self._current_test,
+ ("PASS" if (current_diff <= max_amperes_allowed) else "FAIL"),
+ message)
+ print("Result: " + message)
except:
- import traceback
traceback.print_exc()
- self.setTestResult(self._current_test, condition="FAIL",
- msg="Exception occurred during run of test.")
-
+ self.setTestResult(self._current_test, test_result = "FAIL",
+ test_message = "Exception occurred during run of test.")
+ raise
@staticmethod
- def run_tests():
+ def runTests(max_baseline_amps):
testrunner = None
try:
- GENERIC_MOTION_REQUEST = "\n===> Please press Next and when the screen is off, keep " + \
- "the device under motion with only tiny, slow movements until the screen turns " + \
- "on again.\nPlease refrain from interacting with the screen or pressing any side " + \
- "buttons while measurements are taken."
- USER_STEPS_REQUEST = "\n===> Please press Next and when the screen is off, then move " + \
- "the device to simulate step motion until the screen turns on again.\nPlease " + \
- "refrain from interacting with the screen or pressing any side buttons while " + \
- "measurements are taken."
- testrunner = PowerTest()
- testrunner.executeOnDevice(PowerTest.REQUEST_SHOW_MESSAGE % "Connected. Running tests...")
- testrunner.runPowerTest("SIGNIFICANT_MOTION", PowerTest.MAX_SIGMO_POWER, user_request = GENERIC_MOTION_REQUEST)
- testrunner.runPowerTest("STEP_DETECTOR", PowerTest.MAX_STEP_DETECTOR_POWER, user_request = USER_STEPS_REQUEST)
- testrunner.runPowerTest("STEP_COUNTER", PowerTest.MAX_STEP_COUNTER_POWER, user_request = USER_STEPS_REQUEST)
- testrunner.runPowerTest("ACCELEROMETER", PowerTest.MAX_ACCEL_POWER, user_request = GENERIC_MOTION_REQUEST)
- testrunner.runPowerTest("MAGNETIC_FIELD", PowerTest.MAX_MAG_POWER, user_request = GENERIC_MOTION_REQUEST)
- testrunner.runPowerTest("GYROSCOPE", PowerTest.MAX_GYRO_POWER, user_request = GENERIC_MOTION_REQUEST)
- testrunner.runPowerTest("ACCELEROMETER", PowerTest.MAX_ACCEL_POWER, user_request = None)
- testrunner.runPowerTest("MAGNETIC_FIELD", PowerTest.MAX_MAG_POWER, user_request = None)
- testrunner.runPowerTest("GYROSCOPE", PowerTest.MAX_GYRO_POWER, user_request = None)
- testrunner.runPowerTest("SIGNIFICANT_MOTION", PowerTest.MAX_SIGMO_POWER, user_request = None)
- testrunner.runPowerTest("STEP_DETECTOR", PowerTest.MAX_STEP_DETECTOR_POWER, user_request = None)
- testrunner.runPowerTest("STEP_COUNTER", PowerTest.MAX_STEP_COUNTER_POWER, user_request = None)
+ GENERIC_MOTION_REQUEST = ("\n===> Please press Next and when the "
+ "screen is off, keep the device under motion with only tiny, "
+ "slow movements until the screen turns on again.\nPlease "
+ "refrain from interacting with the screen or pressing any side "
+ "buttons while measurements are taken.")
+ USER_STEPS_REQUEST = ("\n===> Please press Next and when the "
+ "screen is off, then move the device to simulate step motion "
+ "until the screen turns on again.\nPlease refrain from "
+ "interacting with the screen or pressing any side buttons "
+ "while measurements are taken.")
+ testrunner = PowerTest(max_baseline_amps)
+ testrunner.executeOnDevice(
+ PowerTest.REQUEST_SHOW_MESSAGE % "Connected. Running tests...")
+ is_baseline_success, baseline_amps = testrunner.getBaselineState()
+
+ if is_baseline_success:
+ testrunner.setUsbEnabled(True)
+ # TODO: Enable testing a single sensor
+ testrunner.runSensorPowerTest(
+ "SIGNIFICANT_MOTION", PowerTest.MAX_SIGMO_AMPS, baseline_amps,
+ user_request = GENERIC_MOTION_REQUEST)
+ testrunner.runSensorPowerTest(
+ "STEP_DETECTOR", PowerTest.MAX_STEP_DETECTOR_AMPS, baseline_amps,
+ user_request = USER_STEPS_REQUEST)
+ testrunner.runSensorPowerTest(
+ "STEP_COUNTER", PowerTest.MAX_STEP_COUNTER_AMPS, baseline_amps,
+ user_request = USER_STEPS_REQUEST)
+ testrunner.runSensorPowerTest(
+ "ACCELEROMETER", PowerTest.MAX_ACCEL_AMPS, baseline_amps,
+ user_request = GENERIC_MOTION_REQUEST)
+ testrunner.runSensorPowerTest(
+ "MAGNETIC_FIELD", PowerTest.MAX_MAG_AMPS, baseline_amps,
+ user_request = GENERIC_MOTION_REQUEST)
+ testrunner.runSensorPowerTest(
+ "GYROSCOPE", PowerTest.MAX_GYRO_AMPS, baseline_amps,
+ user_request = GENERIC_MOTION_REQUEST)
+ testrunner.runSensorPowerTest(
+ "ACCELEROMETER", PowerTest.MAX_ACCEL_AMPS, baseline_amps,
+ user_request = None)
+ testrunner.runSensorPowerTest(
+ "MAGNETIC_FIELD", PowerTest.MAX_MAG_AMPS, baseline_amps,
+ user_request = None)
+ testrunner.runSensorPowerTest(
+ "GYROSCOPE", PowerTest.MAX_GYRO_AMPS, baseline_amps,
+ user_request = None)
+ testrunner.runSensorPowerTest(
+ "SIGNIFICANT_MOTION", PowerTest.MAX_SIGMO_AMPS, baseline_amps,
+ user_request = None)
+ testrunner.runSensorPowerTest(
+ "STEP_DETECTOR", PowerTest.MAX_STEP_DETECTOR_AMPS, baseline_amps,
+ user_request = None)
+ testrunner.runSensorPowerTest(
+ "STEP_COUNTER", PowerTest.MAX_STEP_COUNTER_AMPS, baseline_amps,
+ user_request = None)
+ else:
+ print("Could not get to baseline state. This is either because "
+ "in several trials, the monitor could not measure a set "
+ "of power measurements that had the specified low "
+ "variance or the mean measurements were below the "
+ "expected value. None of the sensor power measurement "
+ " tests were performed due to not being able to detect "
+ "baseline state. Please re-run the power tests.")
+ except KeyboardInterrupt:
+ print "Keyboard interrupt from user."
+ raise
except:
import traceback
traceback.print_exc()
finally:
- signal_exit_q.put(0) # anything will signal thread to terminate
logging.info("TESTS COMPLETE")
if testrunner:
try:
testrunner.finalize()
except socket.error:
- sys.exit("============================\nUnable to connect to device under " + \
- "test. Make sure the device is connected via the usb pass-through, " + \
- "the CtsVerifier app is running the SensorPowerTest on the device, " + \
- "and USB pass-through is enabled.\n===========================")
-
+ sys.exit(
+ "===================================================\n"
+ "Unable to connect to device under test. Make sure \n"
+ "the device is connected via the usb pass-through, \n"
+ "the CtsVerifier app is running the SensorPowerTest on \n"
+ "the device, and USB pass-through is enabled.\n"
+ "===================================================")
def main(argv):
- """ Simple command-line interface for a power test application."""
- useful_flags = ["voltage", "status", "usbpassthrough",
- "samples", "current", "log", "power_monitor"]
- if not [f for f in useful_flags if FLAGS.get(f, None) is not None]:
- print __doc__.strip()
- print FLAGS.MainModuleHelp()
- return
+ """ Simple command-line interface for a power test application."""
+ useful_flags = ["voltage", "status", "usbpassthrough",
+ "samples", "current", "log", "power_monitor"]
+ if not [f for f in useful_flags if FLAGS.get(f, None) is not None]:
+ print __doc__.strip()
+ print FLAGS.MainModuleHelp()
+ return
- if FLAGS.avg and FLAGS.avg < 0:
- loggign.error("--avg must be greater than 0")
- return
+ if FLAGS.avg and FLAGS.avg < 0:
+ logging.error("--avg must be greater than 0")
+ return
- if FLAGS.voltage is not None:
- if FLAGS.voltage > 5.5:
- print("!!WARNING: Voltage higher than typical values!!!")
- try:
- response = raw_input("Voltage of %.3f requested. Confirm this is correct (Y/N)"%FLAGS.voltage)
- if response.upper() != "Y":
- sys.exit("Aborting")
- except:
- sys.exit("Aborting.")
+ if FLAGS.voltage is not None:
+ if FLAGS.voltage > 5.5:
+ print("!!WARNING: Voltage higher than typical values!!!")
+ try:
+ response = raw_input(
+ "Voltage of %.3f requested. Confirm this is correct (Y/N)" %
+ FLAGS.voltage)
+ if response.upper() != "Y":
+ sys.exit("Aborting")
+ except:
+ sys.exit("Aborting.")
- if not FLAGS.power_monitor:
- sys.exit("You must specify a '--power_monitor' option to specify which power monitor type " + \
- "you are using.\nOne of:\n \n ".join(available_monitors))
- power_monitors = do_import('power_monitors.%s' % FLAGS.power_monitor)
- try:
- mon = power_monitors.Power_Monitor(device=FLAGS.device)
- except:
- import traceback
- traceback.print_exc()
- sys.exit("No power monitors found")
-
- if FLAGS.voltage is not None:
-
- if FLAGS.ramp is not None:
- mon.RampVoltage(mon.start_voltage, FLAGS.voltage)
- else:
- mon.SetVoltage(FLAGS.voltage)
-
- if FLAGS.current is not None:
- mon.SetMaxCurrent(FLAGS.current)
-
- if FLAGS.status:
- items = sorted(mon.GetStatus().items())
- print "\n".join(["%s: %s" % item for item in items])
-
- if FLAGS.usbpassthrough:
- if FLAGS.usbpassthrough == 'off':
- mon.SetUsbPassthrough(0)
- elif FLAGS.usbpassthrough == 'on':
- mon.SetUsbPassthrough(1)
- elif FLAGS.usbpassthrough == 'auto':
- mon.SetUsbPassthrough(2)
- else:
- mon.Close()
- sys.exit('bad pass-through flag: %s' % FLAGS.usbpassthrough)
-
- if FLAGS.samples:
- # Make sure state is normal
- mon.StopDataCollection()
- status = mon.GetStatus()
- native_hz = status["sampleRate"] * 1000
-
- # Collect and average samples as specified
- mon.StartDataCollection()
-
- # In case FLAGS.hz doesn't divide native_hz exactly, use this invariant:
- # 'offset' = (consumed samples) * FLAGS.hz - (emitted samples) * native_hz
- # This is the error accumulator in a variation of Bresenham's algorithm.
- emitted = offset = 0
- collected = []
- history_deque = collections.deque() # past n samples for rolling average
-
- try:
- last_flush = time.time()
- while emitted < FLAGS.samples or FLAGS.samples == -1:
- # The number of raw samples to consume before emitting the next output
- need = (native_hz - offset + FLAGS.hz - 1) / FLAGS.hz
- if need > len(collected): # still need more input samples
- samples = mon.CollectData()
- if not samples: break
- collected.extend(samples)
- else:
- # Have enough data, generate output samples.
- # Adjust for consuming 'need' input samples.
- offset += need * FLAGS.hz
- while offset >= native_hz: # maybe multiple, if FLAGS.hz > native_hz
- this_sample = sum(collected[:need]) / need
-
- if FLAGS.timestamp: print int(time.time()),
-
- if FLAGS.avg:
- history_deque.appendleft(this_sample)
- if len(history_deque) > FLAGS.avg: history_deque.pop()
- print "%f %f" % (this_sample,
- sum(history_deque) / len(history_deque))
- else:
- print "%f" % this_sample
- sys.stdout.flush()
-
- offset -= native_hz
- emitted += 1 # adjust for emitting 1 output sample
- collected = collected[need:]
- now = time.time()
- if now - last_flush >= 0.99: # flush every second
- sys.stdout.flush()
- last_flush = now
- except KeyboardInterrupt:
- print("interrupted")
- return 1
- finally:
- mon.Close()
- return 0
-
- if FLAGS.run:
if not FLAGS.power_monitor:
- sys.exit("When running power tests, you must specify which type of power monitor to use" +
- " with '--power_monitor <type of power monitor>'")
- PowerTest.run_tests()
+ sys.exit(
+ "You must specify a '--power_monitor' option to specify which power "
+ "monitor type " +
+ "you are using.\nOne of:\n \n ".join(available_monitors))
+ power_monitors = do_import("power_monitors.%s" % FLAGS.power_monitor)
+ try:
+ mon = power_monitors.Power_Monitor(device = FLAGS.device)
+ except:
+ import traceback
+ traceback.print_exc()
+ sys.exit("No power monitors found")
+
+ if FLAGS.voltage is not None:
+
+ if FLAGS.ramp is not None:
+ mon.RampVoltage(mon.start_voltage, FLAGS.voltage)
+ else:
+ mon.SetVoltage(FLAGS.voltage)
+
+ if FLAGS.current is not None:
+ mon.SetMaxCurrent(FLAGS.current)
+
+ if FLAGS.status:
+ items = sorted(mon.GetStatus().items())
+ print "\n".join(["%s: %s" % item for item in items])
+
+ if FLAGS.usbpassthrough:
+ if FLAGS.usbpassthrough == "off":
+ mon.SetUsbPassthrough(0)
+ elif FLAGS.usbpassthrough == "on":
+ mon.SetUsbPassthrough(1)
+ elif FLAGS.usbpassthrough == "auto":
+ mon.SetUsbPassthrough(2)
+ else:
+ mon.Close()
+ sys.exit("bad pass-through flag: %s" % FLAGS.usbpassthrough)
+
+ if FLAGS.samples:
+ # Make sure state is normal
+ mon.StopDataCollection()
+ status = mon.GetStatus()
+ native_hz = status["sampleRate"] * 1000
+
+ # Collect and average samples as specified
+ mon.StartDataCollection()
+
+ # In case FLAGS.hz doesn't divide native_hz exactly, use this invariant:
+ # 'offset' = (consumed samples) * FLAGS.hz - (emitted samples) * native_hz
+ # This is the error accumulator in a variation of Bresenham's algorithm.
+ emitted = offset = 0
+ collected = []
+ history_deque = collections.deque() # past n samples for rolling average
+
+ # TODO: Complicated lines of code below. Refactoring needed
+ try:
+ last_flush = time.time()
+ while emitted < FLAGS.samples or FLAGS.samples == -1:
+ # The number of raw samples to consume before emitting the next output
+ need = (native_hz - offset + FLAGS.hz - 1) / FLAGS.hz
+ if need > len(collected): # still need more input samples
+ samples = mon.CollectData()
+ if not samples: break
+ collected.extend(samples)
+ else:
+ # Have enough data, generate output samples.
+ # Adjust for consuming 'need' input samples.
+ offset += need * FLAGS.hz
+ while offset >= native_hz: # maybe multiple, if FLAGS.hz > native_hz
+ this_sample = sum(collected[:need]) / need
+
+ if FLAGS.timestamp: print int(time.time()),
+
+ if FLAGS.avg:
+ history_deque.appendleft(this_sample)
+ if len(history_deque) > FLAGS.avg: history_deque.pop()
+ print "%f %f" % (this_sample,
+ sum(history_deque) / len(history_deque))
+ else:
+ print "%f" % this_sample
+ sys.stdout.flush()
+
+ offset -= native_hz
+ emitted += 1 # adjust for emitting 1 output sample
+ collected = collected[need:]
+ now = time.time()
+ if now - last_flush >= 0.99: # flush every second
+ sys.stdout.flush()
+ last_flush = now
+ except KeyboardInterrupt:
+ print("interrupted")
+ return 1
+ finally:
+ mon.Close()
+ return 0
+
+ if FLAGS.run:
+ if not FLAGS.power_monitor:
+ sys.exit(
+ "When running power tests, you must specify which type of power "
+ "monitor to use" +
+ " with '--power_monitor <type of power monitor>'")
+ try:
+ PowerTest.runTests(FLAGS.max_baseline_amps)
+ except KeyboardInterrupt:
+ print "Keyboard interrupt from user"
if __name__ == "__main__":
flags.DEFINE_boolean("status", None, "Print power meter status")
@@ -581,5 +932,6 @@
flags.DEFINE_boolean("log", False, "Log progress to a file or not")
flags.DEFINE_boolean("run", False, "Run the test suite for power")
flags.DEFINE_string("power_monitor", None, "Type of power monitor to use")
+ flags.DEFINE_float("max_baseline_amps", 0.005,
+ "Set maximum baseline current for device being tested")
sys.exit(main(FLAGS(sys.argv)))
-