autotest: freonize input via uinput.

BUG=chromium:413085
TEST=test_that <link_freon> graphics_Stress.gmaps

Change-Id: I0f925188ffa3be1ac6d04a4804d45bfb5bfa1378
Reviewed-on: https://chromium-review.googlesource.com/224885
Tested-by: Ilja Friedel <ihf@chromium.org>
Reviewed-by: Dominik Behr <dbehr@chromium.org>
Commit-Queue: Ilja Friedel <ihf@chromium.org>
diff --git a/client/cros/graphics/graphics_utils.py b/client/cros/graphics/graphics_utils.py
index e23aa6d..78af055 100644
--- a/client/cros/graphics/graphics_utils.py
+++ b/client/cros/graphics/graphics_utils.py
@@ -7,13 +7,23 @@
 the state of the graphics driver.
 """
 
-import glob, logging, os, re, sys
+import glob
+import logging
+import os
+import re
+import sys
+import time
+# Please limit the use of the uinput library to this file. Try not to spread
+# dependencies and abstract as much as possible to make switching to a different
+# input library in the future easier.
+import uinput
 
 from autotest_lib.client.bin import utils
 from autotest_lib.client.common_lib import error
 from autotest_lib.client.cros.graphics import drm
 
 
+# TODO(ihf): Remove xcommand for non-freon builds.
 def xcommand(cmd, user=None):
     """
     Add the necessary X setup to a shell command that needs to connect to the X
@@ -30,8 +40,8 @@
     return 'DISPLAY=:0 XAUTHORITY=/home/chronos/.Xauthority su %s -c \'%s\'' % (
                                                                       user, cmd)
 
-
-def xsystem(cmd, user=None, timeout=None, ignore_status=False):
+# TODO(ihf): Remove xsystem for non-freon builds.
+def xsystem(cmd, user=None):
     """
     Run the command cmd, using utils.system, after adding the necessary
     setup to connect to the X server.
@@ -41,37 +51,13 @@
     @param timeout: Optional timeout.
     @param ignore_status: Whether to check the return code of the command.
     """
-    return utils.system(xcommand(cmd, user), timeout=timeout,
-                        ignore_status=ignore_status)
+    return utils.system(xcommand(cmd, user))
 
 
-def press_key(key_str):
-    """Presses the given key(s).
-    @param key_str: A string of the key(s), like 'ctrl+F4', 'Up'.
-    """
-    if utils.is_freon():
-        raise error.TestFail('freon: press_key not implemented')
-    command = 'xdotool key %s' % key_str
-    xsystem(command)
-
-
+# TODO(ihf): Remove XSET for non-freon builds.
 XSET = 'LD_LIBRARY_PATH=/usr/local/lib xset'
 
-def do_power_consumption_xset():
-    """ Called from power_Consumption to immediately disable energy saving. """
-    if utils.is_freon():
-        raise error.TestFail('freon: do_power_consumption_xset not implemented')
-    # Disable X screen saver
-    xsystem(XSET + ' s 0 0')
-    # Disable DPMS Standby/Suspend/Off
-    xsystem(XSET + ' dpms 0 0 0')
-    # Force monitor on
-    xsystem(XSET + ' dpms force on')
-    # Save off X settings
-    xsystem(XSET + ' q')
-
-
-def do_power_backlight_xset():
+def screen_disable_blanking():
     """ Called from power_Backlight to disable screen blanking. """
     if utils.is_freon():
         raise error.TestFail('freon: do_power_backlight_xset not implemented')
@@ -80,10 +66,54 @@
     xsystem(XSET + ' -dpms')
 
 
-def wakeup_screen():
+def screen_disable_energy_saving():
+    """ Called from power_Consumption to immediately disable energy saving. """
+    if utils.is_freon():
+        raise error.TestFail('freon: do_power_consumption_xset not implemented')
+    # Disable X screen saver
+    xsystem(XSET + ' s 0 0')
+    # Disable DPMS Standby/Suspend/Off
+    xsystem(XSET + ' dpms 0 0 0')
+    # Force monitor on
+    screen_switch_on(on=1)
+    # Save off X settings
+    xsystem(XSET + ' q')
+
+
+def screen_switch_on(on):
+    """Turn the touch screen on/off."""
+    if on:
+        xsystem(XSET + ' dpms force on')
+    else:
+        xsystem(XSET + ' dpms force off')
+
+
+def screen_toggle_fullscreen():
+    """Toggles fullscreen mode."""
+    # TODO(ihf): Does this work on freon?
+    if utils.is_freon():
+        press_keys(['KEY_F11'])
+    else:
+        press_key_X('F11')
+
+
+def screen_toggle_mirrored():
+    """Toggles the mirrored screen."""
+    # TODO(ihf): Does this work on freon?
+    if utils.is_freon():
+        press_keys(['KEY_LEFTCTRL', 'KEY_F4'])
+    else:
+        press_key_X('ctrl+F4')
+
+
+def screen_wakeup():
     """Wake up the screen if it is dark."""
     # Move the mouse a little bit to wake up the screen.
-    xsystem('xdotool mousemove_relative 1 1')
+    if utils.is_freon():
+        _uinput_emit("REL_X", 1)
+        _uinput_emit("REL_X", -1)
+    else:
+        xsystem('xdotool mousemove_relative 1 1')
 
 
 def switch_screen_on(on):
@@ -98,6 +128,167 @@
         xsystem(XSET + ' dpms force off')
 
 
+# Don't create a device during build_packages or for tests that don't need it.
+uinput_device_keyboard = None
+uinput_device_touch = None
+uinput_device_mouse_rel = None
+
+# Don't add more events to this list than are used. For a complete list of
+# available events check python2.7/site-packages/uinput/ev.py.
+UINPUT_DEVICE_EVENTS_KEYBOARD = [
+        uinput.KEY_F4,
+        uinput.KEY_F11,
+        uinput.KEY_KPPLUS,
+        uinput.KEY_KPMINUS,
+        uinput.KEY_LEFTCTRL,
+        uinput.KEY_TAB
+    ]
+# TODO(ihf): Find an ABS sequence that actually works.
+UINPUT_DEVICE_EVENTS_TOUCH = [
+        uinput.BTN_TOUCH,
+        uinput.ABS_MT_SLOT,
+        uinput.ABS_MT_POSITION_X + (0, 2560, 0, 0),
+        uinput.ABS_MT_POSITION_Y + (0, 1700, 0, 0),
+        uinput.ABS_MT_TRACKING_ID + (0, 10, 0, 0),
+        uinput.BTN_TOUCH
+]
+UINPUT_DEVICE_EVENTS_MOUSE_REL = [
+        uinput.REL_X,
+        uinput.REL_Y,
+        uinput.BTN_MOUSE,
+        uinput.BTN_LEFT,
+        uinput.BTN_RIGHT
+    ]
+
+
+def _get_uinput_device_keyboard():
+    """
+    Lazy initialize device and return it. We don't want to create a device during
+    build_packages or for tests that don't need it, hence init is with = None.
+    """
+    global uinput_device_keyboard
+    if uinput_device_keyboard is None:
+        uinput_device_keyboard = uinput.Device(UINPUT_DEVICE_EVENTS_KEYBOARD)
+    return uinput_device_keyboard
+
+
+def _get_uinput_device_mouse_rel():
+    """
+    Lazy initialize device and return it. We don't want to create a device during
+    build_packages or for tests that don't need it, hence init is with = None.
+    """
+    global uinput_device_mouse_rel
+    if uinput_device_mouse_rel is None:
+        uinput_device_mouse_rel = uinput.Device(UINPUT_DEVICE_EVENTS_MOUSE_REL)
+    return uinput_device_mouse_rel
+
+
+def _get_uinput_device_touch():
+    """
+    Lazy initialize device and return it. We don't want to create a device during
+    build_packages or for tests that don't need it, hence init is with = None.
+    """
+    global uinput_device_touch
+    if uinput_device_touch is None:
+        uinput_device_touch = uinput.Device(UINPUT_DEVICE_EVENTS_TOUCH)
+    return uinput_device_touch
+
+
+def _uinput_translate_name(event_name):
+    """
+    Translates string |event_name| to uinput event.
+    """
+    return getattr(uinput, event_name)
+
+
+def _uinput_emit(device, event_name, value, syn=True):
+    """
+    Wrapper for uinput.emit. Emits event with value.
+    Example: ('REL_X', 20), ('BTN_RIGHT', 1)
+    """
+    event = _uinput_translate_name(event_name)
+    device.emit(event, value, syn)
+
+
+def _uinput_emit_click(device, event_name, syn=True):
+    """
+    Wrapper for uinput.emit_click. Emits click event. Only KEY and BTN events
+    are accepted, otherwise ValueError is raised. Example: 'KEY_A'
+    """
+    event = _uinput_translate_name(event_name)
+    device.emit_click(event, syn)
+
+
+def _uinput_emit_combo(device, event_names, syn=True):
+    """
+    Wrapper for uinput.emit_combo. Emits sequence of events.
+    Example: ['KEY_LEFTCTRL', 'KEY_LEFTALT', 'KEY_F5']
+    """
+    events = [_uinput_translate_name(en) for en in event_names]
+    device.emit_combo(events, syn)
+
+
+def press_keys(key_list):
+    """Presses the given keys as one combination.
+
+    Please do not leak uinput dependencies outside of the file.
+
+    @param key: A list of key strings, e.g. ['LEFTCTRL', 'F4']
+    """
+    _uinput_emit_combo(_get_uinput_device_keyboard(), key_list)
+
+
+# TODO(ihf): Remove press_key_X for non-freon builds.
+def press_key_X(key_str):
+    """Presses the given keys as one combination.
+    @param key: A string of keys, e.g. 'ctrl+F4'.
+    """
+    if utils.is_freon():
+        raise error.TestFail('freon: press_key_X not implemented')
+    command = 'xdotool key %s' % key_str
+    xsystem(command)
+
+
+def click_mouse():
+    """Just click the mouse.
+    Presumably only hacky tests use this function.
+    """
+    logging.info('click_mouse()')
+    # Move a little to make the cursor appear.
+    device = _get_uinput_device_mouse_rel()
+    _uinput_emit(device, 'REL_X', 1)
+    # Some sleeping is needed otherwise events disappear.
+    time.sleep(0.1)
+    # Move cursor back to not drift.
+    _uinput_emit(device, 'REL_X', -1)
+    time.sleep(0.1)
+    # Click down.
+    _uinput_emit(device, 'BTN_LEFT', 1)
+    time.sleep(0.2)
+    # Release click.
+    _uinput_emit(device, 'BTN_LEFT', 0)
+
+
+# TODO(ihf): this function is broken. Make it work.
+def activate_focus_at(rel_x, rel_y):
+    """Clicks with the mouse at screen position (x, y).
+
+    This is a pretty hacky method. Using this will probably lead to
+    flaky tests as page layout changes over time.
+    @param rel_x: relative horizontal position between 0 and 1.
+    @param rel_y: relattive vertical position between 0 and 1.
+    """
+    width, height = get_display_resolution()
+    device = _get_uinput_device_touch()
+    _uinput_emit(device, 'ABS_MT_SLOT', 0, syn=False)
+    _uinput_emit(device, 'ABS_MT_TRACKING_ID', 1, syn=False)
+    _uinput_emit(device, 'ABS_MT_POSITION_X', int(rel_x*width), syn=False)
+    _uinput_emit(device, 'ABS_MT_POSITION_Y', int(rel_y*height), syn=False)
+    _uinput_emit(device, 'BTN_TOUCH', 1, syn=True)
+    time.sleep(0.2)
+    _uinput_emit(device, 'BTN_TOUCH', 0, syn=True)
+
+
 def take_screenshot(resultsdir, fname_prefix, extension='png'):
     """Take screenshot and save to a new file in the results dir.
     Args:
@@ -249,12 +440,12 @@
     modetest_output = utils.system_output('modetest -c')
     modetest_connector_pattern = (r'\d+\s+\d+\s+(connected|disconnected)\s+'
                                   r'[- 0-9a-zA-Z]+\s+\d+x\d+\s+\d+\s+\d+')
-    connected = 0;
+    connected = 0
     for line in modetest_output.splitlines():
         connector_match = re.match(modetest_connector_pattern, line)
         if connector_match is not None:
             if connector_match.group(1) == 'connected':
-                connected = connected + 1;
+                connected = connected + 1
 
     return connected;
 
@@ -275,9 +466,9 @@
     """
 
     if utils.is_freon():
-        return _get_num_outputs_freon();
+        return _get_num_outputs_freon()
     else:
-        return _get_num_outputs_x();
+        return _get_num_outputs_x()
 
 def call_xrandr(args_string=''):
     """
diff --git a/client/cros/multimedia/display_facade_native.py b/client/cros/multimedia/display_facade_native.py
index 7b1215a..a1dd1de 100644
--- a/client/cros/multimedia/display_facade_native.py
+++ b/client/cros/multimedia/display_facade_native.py
@@ -279,11 +279,8 @@
 
 
     def toggle_mirrored(self):
-        """Toggles mirrored.
-
-        Emulates L_Ctrl + Maximize in X server to toggle mirrored.
-        """
-        self.press_key('ctrl+F4')
+        """Toggles mirrored."""
+        graphics_utils.screen_toggle_mirrored()
         return True
 
 
diff --git a/client/site_tests/graphics_Stress/graphics_Stress.py b/client/site_tests/graphics_Stress/graphics_Stress.py
index d9210e1..f192ca9 100644
--- a/client/site_tests/graphics_Stress/graphics_Stress.py
+++ b/client/site_tests/graphics_Stress/graphics_Stress.py
@@ -6,7 +6,7 @@
 
 import logging, os, time
 
-from autotest_lib.client.bin import test, utils
+from autotest_lib.client.bin import test
 from autotest_lib.client.common_lib.cros import chrome
 from autotest_lib.client.cros import service_stopper
 from autotest_lib.client.cros.graphics import graphics_utils
@@ -31,9 +31,6 @@
 
     def initialize(self):
         self.GSC = graphics_utils.GraphicsStateChecker()
-        utils.assert_has_X_server()
-        os.environ['DISPLAY'] = ':0.0'
-        os.environ['XAUTHORITY'] = '/home/chronos/.Xauthority'
 
 
     def cleanup(self):
@@ -50,13 +47,6 @@
         self.job.setup_dep(['graphics'])
 
 
-    def xsendevt(self, event_name):
-        """Uses xsendevt to send an event.
-
-        @param event_name: name of the event. Usually a keystroke."""
-        utils.system('/usr/bin/xsendevt %s' % event_name)
-
-
     def new_chrome(self):
         return chrome.Chrome(extension_paths=self.ext_paths,
                              logged_in=True,
@@ -103,11 +93,11 @@
         """Performs one cycle of the maps zooming."""
         # Zoom in on purpose once more than out.
         for _ in range(1, 11):
-            self.xsendevt('-tw \'[plus]\'')
+            graphics_utils.press_keys(['KEY_KPPLUS'])
             time.sleep(0.1)
         time.sleep(0.5)
         for _ in range(1, 10):
-            self.xsendevt('-tw \'[minus]\'')
+            graphics_utils.press_keys(['KEY_KPMINUS'])
             time.sleep(0.1)
         time.sleep(0.5)
 
@@ -148,11 +138,9 @@
         with self.new_chrome() as cr:
             tabs = self.open_urls(cr, [GMAPS_MTV_URL])
 
-            # Click into the map area.
-            # TODO(dbehr): find a better way to focus on map.
-            self.xsendevt('-tw \'[Move(1000,700)]\'')
-            time.sleep(0.5)
-            self.xsendevt('-tw \'[Button1]\'')
+            # Click into the map area to achieve focus.
+            time.sleep(5)
+            graphics_utils.click_mouse()
 
             # Do the stress test.
             cycle = 0
diff --git a/client/site_tests/hardware_TouchScreenPowerCycles/hardware_TouchScreenPowerCycles.py b/client/site_tests/hardware_TouchScreenPowerCycles/hardware_TouchScreenPowerCycles.py
index ec5f432..96a982b 100644
--- a/client/site_tests/hardware_TouchScreenPowerCycles/hardware_TouchScreenPowerCycles.py
+++ b/client/site_tests/hardware_TouchScreenPowerCycles/hardware_TouchScreenPowerCycles.py
@@ -33,7 +33,7 @@
 
     def _wakeup_screen(self):
         """Wake up the screen if it is dark."""
-        graphics_utils.wakeup_screen()
+        graphics_utils.screen_wakeup()
         time.sleep(2)
 
     def _touch_screen_on(self, interval):
diff --git a/client/site_tests/power_Backlight/power_Backlight.py b/client/site_tests/power_Backlight/power_Backlight.py
index 76fc75f..07546b9 100644
--- a/client/site_tests/power_Backlight/power_Backlight.py
+++ b/client/site_tests/power_Backlight/power_Backlight.py
@@ -37,7 +37,7 @@
         # and fixes all this for us.
         # TODO(davidjames): Power manager should support this feature directly
         time.sleep(5)
-        graphics_utils.do_power_backlight_xset()
+        graphics_utils.screen_disable_blanking()
 
         status = power_status.get_status()
         status.assert_battery_state(5)
diff --git a/client/site_tests/power_Consumption/power_Consumption.py b/client/site_tests/power_Consumption/power_Consumption.py
index 867d244..b8cd646 100644
--- a/client/site_tests/power_Consumption/power_Consumption.py
+++ b/client/site_tests/power_Consumption/power_Consumption.py
@@ -74,7 +74,7 @@
         self._chrome = chrome.Chrome()
         # Wait for login to finish and any extra windows to appear.
         time.sleep(10)
-        graphics_utils.do_power_consumption_xset()
+        graphics_utils.screen_disable_energy_saving()
         # Most of the tests will be running in this tab.
         self._tab = self._chrome.browser.tabs[0]
         logging.info('initialize() finished')
@@ -118,7 +118,7 @@
         # Note: full screen mode toggled with F11 is different from clicking the
         # full screen icon on video player controls. This needs improvement.
         # Bug: http://crbug.com/248939
-        graphics_utils.press_key('F11')
+        graphics_utils.screen_toggle_fullscreen()
 
 
     # Below are a series of generic sub-test runners. They run a given task