| #!/usr/bin/python |
| |
| # Copyright (C) 2014 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import logging |
| import os.path |
| import select |
| import sys |
| import time |
| import collections |
| import socket |
| import gflags as flags # http://code.google.com/p/python-gflags/ |
| import pkgutil |
| import threading |
| import Queue |
| |
| # queue to signal thread to exit |
| signal_exit_q = Queue.Queue() |
| signal_abort = Queue.Queue() |
| |
| # let this script know about the power monitor implementations |
| sys.path = [os.path.basename(__file__)] + sys.path |
| available_monitors = [name for _, name, _ in pkgutil.iter_modules( |
| [os.path.join(os.path.dirname(__file__),'power_monitors')]) if not name.startswith('_')] |
| |
| APK = os.path.join( os.path.dirname(__file__), '..', "CtsVerifier.apk") |
| |
| FLAGS = flags.FLAGS |
| |
| # whether to use a strict delay to ensure screen is off, or attempt to use power measurements |
| USE_STRICT_DELAY = False |
| if USE_STRICT_DELAY: |
| DELAY_SCREEN_OFF = 30.0 |
| else: |
| DELAY_SCREEN_OFF = 2.0 |
| |
| # whether to log data collected to a file for each sensor run: |
| LOG_DATA_TO_FILE = True |
| |
| logging.getLogger().setLevel(logging.ERROR) |
| |
| def do_import(name): |
| """import a module by name dynamically""" |
| mod = __import__(name) |
| components = name.split('.') |
| for comp in components[1:]: |
| mod = getattr(mod, comp) |
| return mod |
| |
| |
| class PowerTest: |
| """Class to run a suite of power tests""" |
| |
| # Thresholds for max allowed power usage per sensor tested |
| MAX_ACCEL_POWER = 0.08 # Amps |
| MAX_MAG_POWER = 0.08 # Amps |
| MAX_GYRO_POWER = 0.08 # Amps |
| MAX_SIGMO_POWER = 0.08 # Amps |
| MAX_STEP_COUNTER_POWER = 0.08 # Amps |
| MAX_STEP_DETECTOR_POWER = 0.08 # Amps |
| |
| |
| PORT = 0 # any available port |
| DOMAIN_NAME = "/android/cts/powertest" |
| SAMPLE_COUNT_NOMINAL = 1000 |
| RATE_NOMINAL = 100 |
| |
| REQUEST_EXTERNAL_STORAGE = "EXTERNAL STORAGE?" |
| REQUEST_EXIT = "EXIT" |
| REQUEST_RAISE = "RAISE %s %s" |
| REQUEST_USER_RESPONSE = "USER RESPONSE %s" |
| REQUEST_SET_TEST_RESULT = "SET TEST RESULT %s %s %s" |
| REQUEST_SENSOR_SWITCH = "SENSOR %s %s" |
| REQUEST_SENSOR_AVAILABILITY = "SENSOR? %s" |
| REQUEST_SCREEN_OFF = "SCREEN OFF" |
| REQUEST_SHOW_MESSAGE = "MESSAGE %s" |
| |
| |
| def __init__(self): |
| power_monitors = do_import("power_monitors.%s" % FLAGS.power_monitor) |
| testid = time.strftime("%d_%m_%Y__%H__%M_%S") |
| self._power_monitor = power_monitors.Power_Monitor(log_file_id = testid) |
| print ("Establishing connection to device...") |
| self.setUsbEnabled(True) |
| status = self._power_monitor.GetStatus() |
| self._native_hz = status["sampleRate"] * 1000 |
| self._current_test = "None" |
| self._external_storage = self.executeOnDevice(PowerTest.REQUEST_EXTERNAL_STORAGE, |
| reportErrors=True) |
| |
| def __del__(self): |
| self.finalize() |
| |
| def finalize(self): |
| """To be called upon termination of host connection to device""" |
| if PowerTest.PORT > 0: |
| # tell device side to exit connection loop, and remove the forwarding connection |
| self.executeOnDevice(PowerTest.REQUEST_EXIT, reportErrors=False) |
| self.executeLocal("adb forward --remove tcp:%d" % PowerTest.PORT) |
| PowerTest.PORT = 0 |
| if self._power_monitor: |
| self._power_monitor.Close() |
| self._power_monitor = None |
| |
| def _send(self, msg, report_errors=True): |
| """Connect to the device, send the given command, and then disconnect""" |
| if PowerTest.PORT == 0: |
| # on first attempt to send a command, connect to device via any open port number, |
| # forwarding that port to a local socket on the device via adb |
| logging.debug("Seeking port for communication...") |
| # discover an open port |
| dummysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| dummysocket.bind(("localhost", 0)) |
| (_, PowerTest.PORT) = dummysocket.getsockname() |
| dummysocket.close() |
| assert(PowerTest.PORT > 0) |
| status = self.executeLocal("adb forward tcp:%d localabstract:%s" % |
| (PowerTest.PORT, PowerTest.DOMAIN_NAME)) |
| if report_errors: |
| self.reportErrorIf(status != 0, msg="Unable to forward requests to client over adb") |
| logging.info("Forwarding requests over local port %d" % PowerTest.PORT) |
| |
| link = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| |
| try: |
| logging.debug("Connecting to device...") |
| link.connect (("localhost", PowerTest.PORT)) |
| logging.debug("Connected.") |
| except: |
| if report_errors: |
| self.reportErrorIf(True, msg="Unable to communicate with device: connection refused") |
| logging.debug("Sending '%s'" % msg) |
| link.sendall(msg) |
| logging.debug("Getting response...") |
| response = link.recv(4096) |
| logging.debug("Got response '%s'" % response) |
| link.close() |
| return response |
| |
| def queryDevice(self, query): |
| """Post a yes/no query to the device, return True upon successful query, False otherwise""" |
| logging.info("Querying device with '%s'" % query) |
| return self._send(query) == "OK" |
| |
| # TODO: abstract device communication (and string commands) into its own class |
| def executeOnDevice(self, cmd , reportErrors=True): |
| """Execute a (string) command on the remote device""" |
| return self._send(cmd , reportErrors) |
| |
| def executeLocal(self, cmd, check_status=True): |
| """execute a shell command locally (on the host)""" |
| from subprocess import call |
| status = call(cmd.split(' ')) |
| if status != 0 and check_status: |
| logging.error("Failed to execute \"%s\"" % cmd) |
| else: |
| logging.debug("Executed \"%s\"" % cmd) |
| return status |
| |
| def reportErrorIf(self, condition, msg): |
| """Report an error condition to the device if condition is True. |
| Will raise an exception on the device if condition is True""" |
| if condition: |
| try: |
| logging.error("Exiting on error: %s" % msg) |
| self.executeOnDevice(PowerTest.REQUEST_RAISE % (self._current_test, msg), False) |
| except: |
| |
| logging.error("Unable to communicate with device to report error: %s" % msg) |
| self.finalize() |
| sys.exit(msg) |
| raise Exception(msg) |
| |
| def setUsbEnabled(self, enabled, verbose=True): |
| if enabled: |
| val = 1 |
| else: |
| val = 0 |
| self._power_monitor.SetUsbPassthrough(val) |
| tries = 0 |
| |
| # Sometimes command won't go through first time, particularly if immediately after a data |
| # collection, so allow for retries |
| status = self._power_monitor.GetStatus() |
| while status is None and tries < 5: |
| tries += 1 |
| time.sleep(2.0) |
| logging.error("Retrying get status call...") |
| self._power_monitor.StopDataCollection() |
| self._power_monitor.SetUsbPassthrough(val) |
| status = self._power_monitor.GetStatus() |
| |
| if enabled: |
| if verbose: print("...USB enabled, waiting for device") |
| self.executeLocal ("adb wait-for-device") |
| if verbose: print("...device online") |
| else: |
| if verbose: logging.info("...USB disabled") |
| # re-establish port forwarding |
| if enabled and PowerTest.PORT > 0: |
| status = self.executeLocal("adb forward tcp:%d localabstract:%s" % |
| (PowerTest.PORT, PowerTest.DOMAIN_NAME)) |
| self.reportErrorIf(status != 0, msg="Unable to forward requests to client over adb") |
| |
| def waitForScreenOff(self): |
| # disconnect of USB will cause screen to go on, so must wait (1 second more than screen off |
| # timeout) |
| if USE_STRICT_DELAY: |
| time.sleep(DELAY_SCREEN_OFF) |
| return |
| |
| # need at least 100 sequential clean low-power measurements to know screen is off |
| THRESHOLD_COUNT_LOW_POWER = 100 |
| CURRENT_LOW_POWER_THRESHOLD = 0.060 # Amps |
| TIMEOUT_SCREEN_OFF = 30 # this many tries at most |
| count_good = 0 |
| tries = 0 |
| print("Waiting for screen off and application processor in suspend mode...") |
| while count_good < THRESHOLD_COUNT_LOW_POWER: |
| measurements = self.collectMeasurements(THRESHOLD_COUNT_LOW_POWER, |
| PowerTest.RATE_NOMINAL, |
| ensure_screen_off=False, |
| verbose=False) |
| count_good = len([m for m in measurements |
| if m < CURRENT_LOW_POWER_THRESHOLD]) |
| tries += 1 |
| if count_good < THRESHOLD_COUNT_LOW_POWER and measurements: |
| print("Current usage: %.2f mAmps. Device is probably not in suspend mode. Waiting..." % |
| (1000.0*(sum(measurements)/len(measurements)))) |
| if tries >= TIMEOUT_SCREEN_OFF: |
| # TODO: dump the state of sensor service to identify if there are features using sensors |
| self.reportErrorIf(tries>=TIMEOUT_SCREEN_OFF, |
| msg="Unable to determine application processor suspend mode status.") |
| break |
| if DELAY_SCREEN_OFF: |
| # add additional delay time if necessary |
| time.sleep(DELAY_SCREEN_OFF) |
| print("...Screen off and device in suspend mode.") |
| |
| def collectMeasurements(self, measurementCount, rate , ensure_screen_off=True, verbose=True, |
| plot_data = False): |
| assert(measurementCount > 0) |
| decimate_by = self._native_hz / rate or 1 |
| if ensure_screen_off: |
| self.waitForScreenOff() |
| print ("Taking measurements...") |
| self._power_monitor.StartDataCollection() |
| sub_measurements = [] |
| measurements = [] |
| tries = 0 |
| if verbose: print("") |
| try: |
| while len(measurements) < measurementCount and tries < 5: |
| if tries: |
| self._power_monitor.StopDataCollection() |
| self._power_monitor.StartDataCollection() |
| time.sleep(1.0) |
| tries += 1 |
| additional = self._power_monitor.CollectData() |
| if additional is not None: |
| tries = 0 |
| sub_measurements.extend(additional) |
| while len(sub_measurements) >= decimate_by: |
| sub_avg = sum(sub_measurements) / len(sub_measurements) |
| measurements.append(sub_avg) |
| sub_measurements = sub_measurements[decimate_by:] |
| if verbose: |
| sys.stdout.write("\33[1A\33[2K") |
| print ("MEASURED[%d]: %f" % (len(measurements),measurements[-1])) |
| finally: |
| self._power_monitor.StopDataCollection() |
| |
| self.reportErrorIf(measurementCount > len(measurements), |
| "Unable to collect all requested measurements") |
| return measurements |
| |
| def request_user_acknowledgment(self, msg): |
| """Post message to user on screen and wait for acknowledgment""" |
| response = self.executeOnDevice(PowerTest.REQUEST_USER_RESPONSE % msg) |
| self.reportErrorIf(response != "OK", "Unable to request user acknowledgment") |
| |
| def setTestResult (self, testname, condition, msg): |
| if condition is False: |
| val = "FAIL" |
| elif condition is True: |
| val = "PASS" |
| else: |
| val = condition |
| print ("Test %s : %s" % (testname, val)) |
| response = self.executeOnDevice(PowerTest.REQUEST_SET_TEST_RESULT % (testname, val, msg)) |
| self.reportErrorIf(response != "OK", "Unable to send test status to Verifier") |
| |
| def setPowerOn(self, sensor, powered_on): |
| response = self.executeOnDevice(PowerTest.REQUEST_SENSOR_SWITCH % |
| ({True:"ON", False:"OFF"}[powered_on], sensor)) |
| self.reportErrorIf(response == "ERR", "Unable to set sensor %s state" % sensor) |
| logging.info("Set %s %s" % (sensor, {True:"ON", False:"OFF"}[powered_on])) |
| return response |
| |
| def runPowerTest(self, sensor, max_power_allowed, user_request = None): |
| if not signal_abort.empty(): |
| sys.exit( signal_abort.get() ) |
| self._current_test = "%s_Power_Test_While_%s" % (sensor, |
| {True:"Under_Motion", False:"Still"}[user_request is not None]) |
| try: |
| print ("\n\n---------------------------------") |
| if user_request is not None: |
| print ("Running power test on %s under motion." % sensor) |
| else: |
| print ("Running power test on %s while device is still." % sensor) |
| print ("---------------------------------") |
| response = self.executeOnDevice(PowerTest.REQUEST_SENSOR_AVAILABILITY % sensor) |
| if response == "UNAVAILABLE": |
| self.setTestResult(self._current_test, condition="SKIPPED", |
| msg="Sensor %s not available on this platform"%sensor) |
| self.setPowerOn("ALL", False) |
| if response == "UNAVAILABLE": |
| self.setTestResult(self._current_test, condition="SKIPPED", |
| msg="Sensor %s not available on this device"%sensor) |
| return |
| |
| self.reportErrorIf(response != "OK", "Unable to set all sensor off") |
| if not signal_abort.empty(): |
| sys.exit( signal_abort.get() ) |
| self.executeOnDevice(PowerTest.REQUEST_SCREEN_OFF) |
| self.setUsbEnabled(False) |
| print("Collecting background measurements...") |
| measurements = self.collectMeasurements( PowerTest.SAMPLE_COUNT_NOMINAL, |
| PowerTest.RATE_NOMINAL, |
| plot_data = True) |
| if measurements and LOG_DATA_TO_FILE: |
| with open( "/tmp/cts-power-tests-%s-%s-background-data.log"%(sensor, |
| {True:"Under_Motion", False:"Still"}[user_request is not None] ),'w') as f: |
| for m in measurements: |
| f.write( "%.4f\n"%m) |
| self.reportErrorIf(not measurements, "No background measurements could be taken") |
| backgnd = sum(measurements) / len(measurements) |
| self.setUsbEnabled(True) |
| self.setPowerOn(sensor, True) |
| if user_request is not None: |
| print("===========================================\n" + |
| "==> Please follow the instructions presented on the device\n" + |
| "===========================================" |
| ) |
| self.request_user_acknowledgment(user_request) |
| self.executeOnDevice(PowerTest.REQUEST_SCREEN_OFF) |
| self.setUsbEnabled(False) |
| self.reportErrorIf(response != "OK", "Unable to set sensor %s ON" % sensor) |
| print ("Collecting sensor %s measurements" % sensor) |
| measurements = self.collectMeasurements(PowerTest.SAMPLE_COUNT_NOMINAL, |
| PowerTest.RATE_NOMINAL) |
| |
| if measurements and LOG_DATA_TO_FILE: |
| with open( "/tmp/cts-power-tests-%s-%s-sensor-data.log"%(sensor, |
| {True:"Under_Motion", False:"Still"}[user_request is not None] ),'w') as f: |
| for m in measurements: |
| f.write( "%.4f\n"%m) |
| self.setUsbEnabled(True, verbose = False) |
| print("Saving raw data files to device...") |
| self.executeLocal("adb shell mkdir -p %s" % self._external_storage, False) |
| self.executeLocal("adb push %s %s/." % (f.name, self._external_storage)) |
| self.setUsbEnabled(False, verbose = False) |
| self.reportErrorIf(not measurements, "No measurements could be taken for %s" % sensor) |
| avg = sum(measurements) / len(measurements) |
| squared = [(m-avg)*(m-avg) for m in measurements] |
| |
| import math |
| stddev = math.sqrt(sum(squared)/len(squared)) |
| current_diff = avg - backgnd |
| self.setUsbEnabled(True) |
| max_power = max(measurements) - avg |
| if current_diff <= max_power_allowed: |
| # TODO: fail the test of background > current |
| message = ("Draw is within limits. Current:%f Background:%f Measured: %f Stddev: %f Peak: %f")%\ |
| ( current_diff*1000.0, backgnd*1000.0, avg*1000.0, stddev*1000.0, max_power*1000.0) |
| else: |
| message = ("Draw is too high. Current:%f Background:%f Measured: %f Stddev: %f Peak: %f")%\ |
| ( current_diff*1000.0, backgnd*1000.0, avg*1000.0, stddev*1000.0, max_power*1000.0) |
| self.setTestResult( testname = self._current_test, |
| condition = current_diff <= max_power_allowed, |
| msg = message) |
| print("Result: "+message) |
| except: |
| import traceback |
| traceback.print_exc() |
| self.setTestResult(self._current_test, condition="FAIL", |
| msg="Exception occurred during run of test.") |
| |
| |
| @staticmethod |
| def run_tests(): |
| testrunner = None |
| try: |
| GENERIC_MOTION_REQUEST = "\n===> Please press Next and when the screen is off, keep " + \ |
| "the device under motion with only tiny, slow movements until the screen turns " + \ |
| "on again.\nPlease refrain from interacting with the screen or pressing any side " + \ |
| "buttons while measurements are taken." |
| USER_STEPS_REQUEST = "\n===> Please press Next and when the screen is off, then move " + \ |
| "the device to simulate step motion until the screen turns on again.\nPlease " + \ |
| "refrain from interacting with the screen or pressing any side buttons while " + \ |
| "measurements are taken." |
| testrunner = PowerTest() |
| testrunner.executeOnDevice(PowerTest.REQUEST_SHOW_MESSAGE % "Connected. Running tests...") |
| testrunner.runPowerTest("SIGNIFICANT_MOTION", PowerTest.MAX_SIGMO_POWER, user_request = GENERIC_MOTION_REQUEST) |
| testrunner.runPowerTest("STEP_DETECTOR", PowerTest.MAX_STEP_DETECTOR_POWER, user_request = USER_STEPS_REQUEST) |
| testrunner.runPowerTest("STEP_COUNTER", PowerTest.MAX_STEP_COUNTER_POWER, user_request = USER_STEPS_REQUEST) |
| testrunner.runPowerTest("ACCELEROMETER", PowerTest.MAX_ACCEL_POWER, user_request = GENERIC_MOTION_REQUEST) |
| testrunner.runPowerTest("MAGNETIC_FIELD", PowerTest.MAX_MAG_POWER, user_request = GENERIC_MOTION_REQUEST) |
| testrunner.runPowerTest("GYROSCOPE", PowerTest.MAX_GYRO_POWER, user_request = GENERIC_MOTION_REQUEST) |
| testrunner.runPowerTest("ACCELEROMETER", PowerTest.MAX_ACCEL_POWER, user_request = None) |
| testrunner.runPowerTest("MAGNETIC_FIELD", PowerTest.MAX_MAG_POWER, user_request = None) |
| testrunner.runPowerTest("GYROSCOPE", PowerTest.MAX_GYRO_POWER, user_request = None) |
| testrunner.runPowerTest("SIGNIFICANT_MOTION", PowerTest.MAX_SIGMO_POWER, user_request = None) |
| testrunner.runPowerTest("STEP_DETECTOR", PowerTest.MAX_STEP_DETECTOR_POWER, user_request = None) |
| testrunner.runPowerTest("STEP_COUNTER", PowerTest.MAX_STEP_COUNTER_POWER, user_request = None) |
| except: |
| import traceback |
| traceback.print_exc() |
| finally: |
| signal_exit_q.put(0) # anything will signal thread to terminate |
| logging.info("TESTS COMPLETE") |
| if testrunner: |
| try: |
| testrunner.finalize() |
| except socket.error: |
| sys.exit("============================\nUnable to connect to device under " + \ |
| "test. Make sure the device is connected via the usb pass-through, " + \ |
| "the CtsVerifier app is running the SensorPowerTest on the device, " + \ |
| "and USB pass-through is enabled.\n===========================") |
| |
| |
| def main(argv): |
| """ Simple command-line interface for a power test application.""" |
| useful_flags = ["voltage", "status", "usbpassthrough", |
| "samples", "current", "log", "power_monitor"] |
| if not [f for f in useful_flags if FLAGS.get(f, None) is not None]: |
| print __doc__.strip() |
| print FLAGS.MainModuleHelp() |
| return |
| |
| if FLAGS.avg and FLAGS.avg < 0: |
| loggign.error("--avg must be greater than 0") |
| return |
| |
| if FLAGS.voltage is not None: |
| if FLAGS.voltage > 5.5: |
| print("!!WARNING: Voltage higher than typical values!!!") |
| try: |
| response = raw_input("Voltage of %.3f requested. Confirm this is correct (Y/N)"%FLAGS.voltage) |
| if response.upper() != "Y": |
| sys.exit("Aborting") |
| except: |
| sys.exit("Aborting.") |
| |
| if not FLAGS.power_monitor: |
| sys.exit("You must specify a '--power_monitor' option to specify which power monitor type " + \ |
| "you are using.\nOne of:\n \n ".join(available_monitors)) |
| power_monitors = do_import('power_monitors.%s' % FLAGS.power_monitor) |
| try: |
| mon = power_monitors.Power_Monitor(device=FLAGS.device) |
| except: |
| import traceback |
| traceback.print_exc() |
| sys.exit("No power monitors found") |
| |
| if FLAGS.voltage is not None: |
| |
| if FLAGS.ramp is not None: |
| mon.RampVoltage(mon.start_voltage, FLAGS.voltage) |
| else: |
| mon.SetVoltage(FLAGS.voltage) |
| |
| if FLAGS.current is not None: |
| mon.SetMaxCurrent(FLAGS.current) |
| |
| if FLAGS.status: |
| items = sorted(mon.GetStatus().items()) |
| print "\n".join(["%s: %s" % item for item in items]) |
| |
| if FLAGS.usbpassthrough: |
| if FLAGS.usbpassthrough == 'off': |
| mon.SetUsbPassthrough(0) |
| elif FLAGS.usbpassthrough == 'on': |
| mon.SetUsbPassthrough(1) |
| elif FLAGS.usbpassthrough == 'auto': |
| mon.SetUsbPassthrough(2) |
| else: |
| mon.Close() |
| sys.exit('bad pass-through flag: %s' % FLAGS.usbpassthrough) |
| |
| if FLAGS.samples: |
| # Make sure state is normal |
| mon.StopDataCollection() |
| status = mon.GetStatus() |
| native_hz = status["sampleRate"] * 1000 |
| |
| # Collect and average samples as specified |
| mon.StartDataCollection() |
| |
| # In case FLAGS.hz doesn't divide native_hz exactly, use this invariant: |
| # 'offset' = (consumed samples) * FLAGS.hz - (emitted samples) * native_hz |
| # This is the error accumulator in a variation of Bresenham's algorithm. |
| emitted = offset = 0 |
| collected = [] |
| history_deque = collections.deque() # past n samples for rolling average |
| |
| try: |
| last_flush = time.time() |
| while emitted < FLAGS.samples or FLAGS.samples == -1: |
| # The number of raw samples to consume before emitting the next output |
| need = (native_hz - offset + FLAGS.hz - 1) / FLAGS.hz |
| if need > len(collected): # still need more input samples |
| samples = mon.CollectData() |
| if not samples: break |
| collected.extend(samples) |
| else: |
| # Have enough data, generate output samples. |
| # Adjust for consuming 'need' input samples. |
| offset += need * FLAGS.hz |
| while offset >= native_hz: # maybe multiple, if FLAGS.hz > native_hz |
| this_sample = sum(collected[:need]) / need |
| |
| if FLAGS.timestamp: print int(time.time()), |
| |
| if FLAGS.avg: |
| history_deque.appendleft(this_sample) |
| if len(history_deque) > FLAGS.avg: history_deque.pop() |
| print "%f %f" % (this_sample, |
| sum(history_deque) / len(history_deque)) |
| else: |
| print "%f" % this_sample |
| sys.stdout.flush() |
| |
| offset -= native_hz |
| emitted += 1 # adjust for emitting 1 output sample |
| collected = collected[need:] |
| now = time.time() |
| if now - last_flush >= 0.99: # flush every second |
| sys.stdout.flush() |
| last_flush = now |
| except KeyboardInterrupt: |
| print("interrupted") |
| return 1 |
| finally: |
| mon.Close() |
| return 0 |
| |
| if FLAGS.run: |
| if not FLAGS.power_monitor: |
| sys.exit("When running power tests, you must specify which type of power monitor to use" + |
| " with '--power_monitor <type of power monitor>'") |
| PowerTest.run_tests() |
| |
| |
| if __name__ == "__main__": |
| flags.DEFINE_boolean("status", None, "Print power meter status") |
| flags.DEFINE_integer("avg", None, |
| "Also report average over last n data points") |
| flags.DEFINE_float("voltage", None, "Set output voltage (0 for off)") |
| flags.DEFINE_float("current", None, "Set max output current") |
| flags.DEFINE_string("usbpassthrough", None, "USB control (on, off, auto)") |
| flags.DEFINE_integer("samples", None, "Collect and print this many samples") |
| flags.DEFINE_integer("hz", 5000, "Print this many samples/sec") |
| flags.DEFINE_string("device", None, |
| "Path to the device in /dev/... (ex:/dev/ttyACM1)") |
| flags.DEFINE_boolean("timestamp", None, |
| "Also print integer (seconds) timestamp on each line") |
| flags.DEFINE_boolean("ramp", True, "Gradually increase voltage") |
| flags.DEFINE_boolean("log", False, "Log progress to a file or not") |
| flags.DEFINE_boolean("run", False, "Run the test suite for power") |
| flags.DEFINE_string("power_monitor", None, "Type of power monitor to use") |
| sys.exit(main(FLAGS(sys.argv))) |
| |