Stress test that uses servo to repeatedly put the device to sleep while another test runs.
BUG=None
TEST=platform_StressSuspend
Change-Id: Ic3a4db2b7cbd898f67be711b27ade313e48c56cd
Reviewed-on: https://gerrit.chromium.org/gerrit/34686
Tested-by: Craig Harrison <craigdh@chromium.org>
Reviewed-by: Chris Masone <cmasone@chromium.org>
Commit-Ready: Craig Harrison <craigdh@chromium.org>
diff --git a/server/cros/stress.py b/server/cros/stress.py
index f1650c3..98d186f 100755
--- a/server/cros/stress.py
+++ b/server/cros/stress.py
@@ -3,78 +3,138 @@
# found in the LICENSE file.
import threading
+import time
-class ControlledStressor(threading.Thread):
+class BaseStressor(threading.Thread):
+ """
+ Implements common functionality for *Stressor classes.
+
+ @var stressor: callable which performs a single stress event.
+ """
def __init__(self, stressor):
- """Run a stressor callable in a threaded loop on demand.
-
- Creates a new thread and runs |stressor| in a loop until told to stop.
-
- Args:
- stressor: callable which performs a single stress event
"""
- super(ControlledStressor, self).__init__()
+ Initialize the BaseStressor.
+
+ @param stressor: callable which performs a single stress event.
+ """
+ super(BaseStressor, self).__init__()
self.daemon = True
- self._complete = threading.Event()
- self._stressor = stressor
+ self.stressor = stressor
+
+
+ def start(self, start_condition=None):
+ """
+ Creates a new thread which will call the run() method.
+
+ Optionally takes a wait condition before the stressor loop. Returns
+ immediately.
+
+ @param start_condition: the new thread will wait until this optional
+ callable returns True before running the stressor.
+ """
+ self._start_condition = start_condition
+ super(BaseStressor, self).start()
def run(self):
- """Overloaded from threading.Thread."""
+ """
+ Wait for the optional start condition, then start the stressor loop.
+
+ Overloaded from threading.Thread. This is run in a separate thread when
+ start() is called.
+ """
+ if self._start_condition:
+ while not self._start_condition():
+ time.sleep(1)
+ self._loop_stressor()
+
+
+ def _loop_stressor(self):
+ """
+ Apply stressor in a loop.
+
+ Overloaded by the particular *Stressor.
+ """
+ raise NotImplementedError
+
+
+class ControlledStressor(BaseStressor):
+ """
+ Run a stressor in a loop on a separate thread.
+
+ Creates a new thread and calls |stressor| in a loop until stop() is called.
+ """
+ def __init__(self, stressor):
+ """
+ Initialize the ControlledStressor.
+
+ @param stressor: callable which performs a single stress event.
+ """
+ self._complete = threading.Event()
+ super(ControlledStressor, self).__init__(stressor)
+
+
+ def _loop_stressor(self):
+ """Overloaded from parent."""
while not self._complete.is_set():
- self._stressor()
+ self.stressor()
- def start(self):
- """Start applying the stressor."""
+ def start(self, start_condition=None):
+ """Start applying the stressor.
+
+ Overloaded from parent.
+
+ @param start_condition: the new thread will wait until this optional
+ callable returns True before running the stressor.
+ """
self._complete.clear()
- super(ControlledStressor, self).start()
+ super(ControlledStressor, self).start(start_condition)
def stop(self, timeout=45):
- """Stop applying the stressor.
+ """
+ Stop applying the stressor.
- Args:
- timeout: maximum time to wait for a single run of the stressor to
- complete, defaults to 45 seconds."""
+ @param timeout: maximum time to wait for a single run of the stressor to
+ complete, defaults to 45 seconds.
+ """
self._complete.set()
self.join(timeout)
-class CountedStressor(threading.Thread):
- def __init__(self, stressor):
- """Run a stressor callable in a threaded loop a given number of times.
+class CountedStressor(BaseStressor):
+ """
+ Run a stressor in a loop on a separate thread a given number of times.
- Args:
- stressor: callable which performs a single stress event
- """
- super(CountedStressor, self).__init__()
- self.daemon = True
- self._stressor = stressor
-
-
- def run(self):
- """Overloaded from threading.Thread."""
+ Creates a new thread and calls |stressor| in a loop |count| times. The
+ calling thread can use wait() to block until the loop completes.
+ """
+ def _loop_stressor(self):
+ """Overloaded from parent."""
for i in xrange(self._count):
- self._stressor()
+ self.stressor()
- def start(self, count):
- """Apply the stressor a given number of times.
+ def start(self, count, start_condition=None):
+ """
+ Apply the stressor a given number of times.
- Args:
- count: number of times to apply the stressor
+ Overloaded from parent.
+
+ @param count: number of times to apply the stressor.
+ @param start_condition: the new thread will wait until this optional
+ callable returns True before running the stressor.
"""
self._count = count
- super(CountedStressor, self).start()
+ super(CountedStressor, self).start(start_condition)
def wait(self, timeout=None):
"""Wait until the stressor completes.
- Args:
- timeout: maximum time for the thread to complete, by default never
- times out.
+ @param timeout: maximum time for the thread to complete, by default
+ never times out.
"""
self.join(timeout)
diff --git a/server/site_tests/platform_StressSuspend/control.stress_suspend_youtube b/server/site_tests/platform_StressSuspend/control.stress_suspend_youtube
new file mode 100644
index 0000000..89a8d59
--- /dev/null
+++ b/server/site_tests/platform_StressSuspend/control.stress_suspend_youtube
@@ -0,0 +1,30 @@
+# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+from autotest_lib.server import utils
+
+AUTHOR = "Chrome OS Team"
+NAME = "platform_StressSuspendYoutube"
+PURPOSE = "Servo based ChromeOS functional tests."
+CRITERIA = "This test will fail if YouTube tests fail to run under stress."
+TIME = "LONG"
+TEST_CATEGORY = "Functional"
+TEST_CLASS = "platform"
+TEST_TYPE = "server"
+
+DOC = """
+This test uses servo to simulate lid close and open events while a test runs.
+"""
+
+args_dict = utils.args_to_dict(args)
+servo_host = args_dict.get('servo_host', 'localhost')
+servo_port = args_dict.get('servo_port', None)
+
+def run(machine):
+ host = hosts.create_host(machine, servo_host=servo_host,
+ servo_port=servo_port)
+ job.run_test("platform_StressSuspend", host=host, disable_sysinfo=True,
+ client_autotest='desktopui_Youtube')
+
+parallel_simple(run, machines)
diff --git a/server/site_tests/platform_StressSuspend/platform_StressSuspend.py b/server/site_tests/platform_StressSuspend/platform_StressSuspend.py
new file mode 100644
index 0000000..78be0b0
--- /dev/null
+++ b/server/site_tests/platform_StressSuspend/platform_StressSuspend.py
@@ -0,0 +1,43 @@
+# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import json, time
+
+from autotest_lib.server import autotest, test
+from autotest_lib.server.cros import stress
+
+
+_TIME_TO_SUSPEND = 10
+_EXTRA_DELAY = 10
+
+
+class platform_StressSuspend(test.test):
+ """Uses servo to repeatedly close & open lid while running a client test."""
+ version = 1
+
+
+ def run_once(self, host, client_autotest):
+ autotest_client = autotest.Autotest(host)
+
+ def sleepwake():
+ """Close and open the lid with enough delay to induce suspend."""
+ host.servo.lid_close()
+ time.sleep(_TIME_TO_SUSPEND + _EXTRA_DELAY)
+ host.servo.lid_open()
+ time.sleep(_EXTRA_DELAY)
+
+ def loggedin():
+ """
+ Checks if the host has a logged in user.
+
+ @return True if a user is logged in on the device.
+ """
+ cmd_out = host.run('cryptohome --action=status').stdout.strip()
+ status = json.loads(cmd_out)
+ return any((mount['mounted'] for mount in status['mounts']))
+
+ stressor = stress.ControlledStressor(sleepwake)
+ stressor.start(start_condition=loggedin)
+ autotest_client.run_test(client_autotest)
+ stressor.stop()