Updates theme capture script to generate images automatically on an emulator
Bug: 64529725
Test: ./cts/hostsidetests/theme/run_theme_capture_device.py Nexus_5_API_27
Change-Id: I38d731ffc899a25422842236e4f974ffc6f2e102
diff --git a/hostsidetests/theme/android_device.py b/hostsidetests/theme/android_device.py
index 4c7d8d1..7711060 100644
--- a/hostsidetests/theme/android_device.py
+++ b/hostsidetests/theme/android_device.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/python3
#
# Copyright (C) 2015 The Android Open Source Project
#
@@ -17,106 +17,119 @@
import os
import re
+import subprocess
import sys
import threading
-import subprocess
import time
+from subprocess import PIPE
+
+
# class for running android device from python
# it will fork the device processor
-class androidDevice(object):
- def __init__(self, adbDevice):
- self._adbDevice = adbDevice
+class AndroidDevice(object):
+ def __init__(self, serial):
+ self._serial = serial
- def runAdbCommand(self, cmd):
- self.waitForAdbDevice()
- adbCmd = "adb -s %s %s" %(self._adbDevice, cmd)
- print adbCmd
- adbProcess = subprocess.Popen(adbCmd.split(" "), bufsize = -1, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
- return adbProcess.communicate()
+ def run_adb_command(self, cmd, timeout=None):
+ adb_cmd = "adb -s %s %s" % (self._serial, cmd)
+ print(adb_cmd)
- def runShellCommand(self, cmd):
- return self.runAdbCommand("shell " + cmd)
+ adb_process = subprocess.Popen(args=adb_cmd.split(), bufsize=-1, stderr=PIPE, stdout=PIPE)
+ (out, err) = adb_process.communicate(timeout=timeout)
+ return out.decode('utf-8').strip(), err.decode('utf-8').strip()
- def waitForAdbDevice(self):
- print "waitForAdbDevice"
- os.system("adb -s %s wait-for-device" %self._adbDevice)
+ def run_shell_command(self, cmd):
+ return self.run_adb_command("shell %s" % cmd)
- def waitForBootComplete(self, timeout = 240):
+ def wait_for_device(self, timeout=30):
+ return self.run_adb_command('wait-for-device', timeout)
+
+ def wait_for_prop(self, key, value, timeout=30):
boot_complete = False
attempts = 0
- wait_period = 5
+ wait_period = 1
while not boot_complete and (attempts*wait_period) < timeout:
- (output, err) = self.runShellCommand("getprop dev.bootcomplete")
- output = output.strip()
- if output == "1":
+ (out, err) = self.run_shell_command("getprop %s" % key)
+ if out == value:
boot_complete = True
else:
time.sleep(wait_period)
attempts += 1
if not boot_complete:
- print "***boot not complete within timeout. will proceed to the next step"
+ print("%s not set to %s within timeout!" % (key, value))
return boot_complete
- def installApk(self, apkPath):
- (out, err) = self.runAdbCommand("install -r -d -g " + apkPath)
- result = err.split()
- return (out, err, "Success" in result)
+ def wait_for_service(self, name, timeout=30):
+ service_found = False
+ attempts = 0
+ wait_period = 1
+ while not service_found and (attempts*wait_period) < timeout:
+ (output, err) = self.run_shell_command("service check %s" % name)
+ if 'not found' not in output:
+ service_found = True
+ else:
+ time.sleep(wait_period)
+ attempts += 1
+ if not service_found:
+ print("Service '%s' not found within timeout!" % name)
+ return service_found
- def uninstallApk(self, package):
- (out, err) = self.runAdbCommand("uninstall " + package)
- result = err.split()
+ def wait_for_boot_complete(self, timeout=60):
+ return self.wait_for_prop('dev.bootcomplete', '1', timeout)
+
+ def install_apk(self, apk_path):
+ self.wait_for_service('package')
+ (out, err) = self.run_adb_command("install -r -d -g %s" % apk_path)
+ result = out.split()
+ return out, err, "Success" in result
+
+ def uninstall_package(self, package):
+ self.wait_for_service('package')
+ (out, err) = self.run_adb_command("uninstall %s" % package)
+ result = out.split()
return "Success" in result
- def runInstrumentationTest(self, option):
- return self.runShellCommand("am instrument -w --no-window-animation " + option)
+ def run_instrumentation_test(self, option):
+ self.wait_for_service('activity')
+ return self.run_shell_command("am instrument -w --no-window-animation %s" % option)
- def isProcessAlive(self, processName):
- (out, err) = self.runShellCommand("ps")
+ def is_process_alive(self, process_name):
+ (out, err) = self.run_shell_command("ps")
names = out.split()
# very lazy implementation as it does not filter out things like uid
# should work mostly unless processName is too simple to overlap with
# uid. So only use name like com.android.xyz
- return processName in names
+ return process_name in names
- def getVersionSdkInt(self):
- return int(self.runShellCommand("getprop ro.build.version.sdk")[0])
+ def get_version_sdk(self):
+ return int(self.run_shell_command("getprop ro.build.version.sdk")[0])
- def getVersionCodename(self):
- return self.runShellCommand("getprop ro.build.version.codename")[0].strip()
+ def get_version_codename(self):
+ return self.run_shell_command("getprop ro.build.version.codename")[0].strip()
- def getDensity(self):
- if "emulator" in self._adbDevice:
- return int(self.runShellCommand("getprop qemu.sf.lcd_density")[0])
+ def get_density(self):
+ if "emulator" in self._serial:
+ return int(self.run_shell_command("getprop qemu.sf.lcd_density")[0])
else:
- return int(self.runShellCommand("getprop ro.sf.lcd_density")[0])
+ return int(self.run_shell_command("getprop ro.sf.lcd_density")[0])
- def getSdkLevel(self):
- return int(self.runShellCommand("getprop ro.build.version.sdk")[0])
+ def get_orientation(self):
+ return int(self.run_shell_command("dumpsys | grep SurfaceOrientation")[0].split()[1])
- def getOrientation(self):
- return int(self.runShellCommand("dumpsys | grep SurfaceOrientation")[0].split()[1])
- # Running dumpsys on the emulator currently yields a SIGSEGV, so don't do it.
- #
- #def getHWType(self):
- # (output, err) = self.runShellCommand("dumpsys | grep android.hardware.type")
- # output = output.strip()
- # return output
-
-def runAdbDevices():
+def enumerate_android_devices(require_prefix=''):
devices = subprocess.check_output(["adb", "devices"])
- devices = devices.split('\n')[1:]
+ if not devices:
+ return []
- deviceSerial = []
+ devices = devices.decode('UTF-8').split('\n')[1:]
+ device_list = []
for device in devices:
- if device is not "":
+ if device is not "" and device.startswith(require_prefix):
info = device.split('\t')
if info[1] == "device":
- deviceSerial.append(info[0])
+ device_list.append(info[0])
- return deviceSerial
-
-if __name__ == '__main__':
- main(sys.argv)
+ return device_list