Stress test that uses servo to repeatedly put the device to sleep while another test runs.
BUG=None
TEST=platform_StressSuspend
Change-Id: Ic3a4db2b7cbd898f67be711b27ade313e48c56cd
Reviewed-on: https://gerrit.chromium.org/gerrit/34686
Tested-by: Craig Harrison <craigdh@chromium.org>
Reviewed-by: Chris Masone <cmasone@chromium.org>
Commit-Ready: Craig Harrison <craigdh@chromium.org>
diff --git a/server/cros/stress.py b/server/cros/stress.py
index f1650c3..98d186f 100755
--- a/server/cros/stress.py
+++ b/server/cros/stress.py
@@ -3,78 +3,138 @@
# found in the LICENSE file.
import threading
+import time
-class ControlledStressor(threading.Thread):
+class BaseStressor(threading.Thread):
+ """
+ Implements common functionality for *Stressor classes.
+
+ @var stressor: callable which performs a single stress event.
+ """
def __init__(self, stressor):
- """Run a stressor callable in a threaded loop on demand.
-
- Creates a new thread and runs |stressor| in a loop until told to stop.
-
- Args:
- stressor: callable which performs a single stress event
"""
- super(ControlledStressor, self).__init__()
+ Initialize the BaseStressor.
+
+ @param stressor: callable which performs a single stress event.
+ """
+ super(BaseStressor, self).__init__()
self.daemon = True
- self._complete = threading.Event()
- self._stressor = stressor
+ self.stressor = stressor
+
+
+ def start(self, start_condition=None):
+ """
+ Creates a new thread which will call the run() method.
+
+ Optionally takes a wait condition before the stressor loop. Returns
+ immediately.
+
+ @param start_condition: the new thread will wait until this optional
+ callable returns True before running the stressor.
+ """
+ self._start_condition = start_condition
+ super(BaseStressor, self).start()
def run(self):
- """Overloaded from threading.Thread."""
+ """
+ Wait for the start condition, if any, then run the stressor loop.
+
+ Overridden from threading.Thread. This is run in a separate thread when
+ start() is called.
+ """
+ if self._start_condition:
+ while not self._start_condition():
+ time.sleep(1)
+ self._loop_stressor()
+
+
+ def _loop_stressor(self):
+ """
+ Apply stressor in a loop.
+
+ Overridden by the particular *Stressor.
+ """
+ raise NotImplementedError
+
+
+class ControlledStressor(BaseStressor):
+ """
+ Run a stressor in a loop on a separate thread.
+
+ Creates a new thread and calls |stressor| in a loop until stop() is called.
+ """
+ def __init__(self, stressor):
+ """
+ Initialize the ControlledStressor.
+
+ @param stressor: callable which performs a single stress event.
+ """
+ self._complete = threading.Event()
+ super(ControlledStressor, self).__init__(stressor)
+
+
+ def _loop_stressor(self):
+ """Overridden from parent."""
while not self._complete.is_set():
- self._stressor()
+ self.stressor()
- def start(self):
- """Start applying the stressor."""
+ def start(self, start_condition=None):
+ """Start applying the stressor.
+
+ Overridden from parent.
+
+ @param start_condition: the new thread will wait until this optional
+ callable returns True before running the stressor.
+ """
self._complete.clear()
- super(ControlledStressor, self).start()
+ super(ControlledStressor, self).start(start_condition)
def stop(self, timeout=45):
- """Stop applying the stressor.
+ """
+ Stop applying the stressor.
- Args:
- timeout: maximum time to wait for a single run of the stressor to
- complete, defaults to 45 seconds."""
+ @param timeout: maximum time to wait for a single run of the stressor to
+ complete, defaults to 45 seconds.
+ """
self._complete.set()
self.join(timeout)
-class CountedStressor(threading.Thread):
- def __init__(self, stressor):
- """Run a stressor callable in a threaded loop a given number of times.
+class CountedStressor(BaseStressor):
+ """
+ Run a stressor in a loop on a separate thread a given number of times.
- Args:
- stressor: callable which performs a single stress event
- """
- super(CountedStressor, self).__init__()
- self.daemon = True
- self._stressor = stressor
-
-
- def run(self):
- """Overloaded from threading.Thread."""
+ Creates a new thread and calls |stressor| in a loop |count| times. The
+ calling thread can use wait() to block until the loop completes.
+ """
+ def _loop_stressor(self):
+ """Overridden from parent."""
for i in xrange(self._count):
- self._stressor()
+ self.stressor()
- def start(self, count):
- """Apply the stressor a given number of times.
+ def start(self, count, start_condition=None):
+ """
+ Apply the stressor a given number of times.
- Args:
- count: number of times to apply the stressor
+ Overridden from parent.
+
+ @param count: number of times to apply the stressor.
+ @param start_condition: the new thread will wait until this optional
+ callable returns True before running the stressor.
"""
self._count = count
- super(CountedStressor, self).start()
+ super(CountedStressor, self).start(start_condition)
def wait(self, timeout=None):
"""Wait until the stressor completes.
- Args:
- timeout: maximum time for the thread to complete, by default never
- times out.
+ @param timeout: maximum time for the thread to complete, by default
+ never times out.
"""
self.join(timeout)