Merge "Minor edits for BT related utils and Enums" am: 9133fbd925 am: c11d573822
am: 9412f4bc1e
Change-Id: I451abe2a0407e03bb98036a6225e95aaffe8223a
diff --git a/acts/framework/acts/controllers/buds_lib/apollo_lib.py b/acts/framework/acts/controllers/buds_lib/apollo_lib.py
index 9b971ff..7b0f80f 100644
--- a/acts/framework/acts/controllers/buds_lib/apollo_lib.py
+++ b/acts/framework/acts/controllers/buds_lib/apollo_lib.py
@@ -46,7 +46,7 @@
import time
import serial
-from acts import tracelogger
+from acts.controllers.buds_lib import tako_trace_logger
from acts.controllers.buds_lib import logserial
from acts.controllers.buds_lib.b29_lib import B29Device
from acts.controllers.buds_lib.dev_utils import apollo_log_decoder
@@ -55,7 +55,7 @@
from logging import Logger
from retry import retry
-logging = tracelogger.TakoTraceLogger(Logger('apollo'))
+logging = tako_trace_logger.TakoTraceLogger(Logger('apollo'))
BAUD_RATE = 115200
BYTE_SIZE = 8
diff --git a/acts/framework/acts/controllers/buds_lib/b29_lib.py b/acts/framework/acts/controllers/buds_lib/b29_lib.py
index bb4ea7d..df6a163 100644
--- a/acts/framework/acts/controllers/buds_lib/b29_lib.py
+++ b/acts/framework/acts/controllers/buds_lib/b29_lib.py
@@ -28,12 +28,12 @@
import os
import re
import time
-
-from acts import tracelogger
-from acts import utils
from logging import Logger
-logging = tracelogger.TakoTraceLogger(Logger(__file__))
+from acts import utils
+from acts.controllers.buds_lib import tako_trace_logger
+
+logging = tako_trace_logger.TakoTraceLogger(Logger(__file__))
DEVICE_REGEX = (
r'_(?P<device_serial>[A-Z0-9]+)-(?P<interface>\w+)\s->\s'
r'(\.\./){2}(?P<port>\w+)'
diff --git a/acts/framework/acts/controllers/buds_lib/logserial.py b/acts/framework/acts/controllers/buds_lib/logserial.py
index 58bf154..6b18e3c 100644
--- a/acts/framework/acts/controllers/buds_lib/logserial.py
+++ b/acts/framework/acts/controllers/buds_lib/logserial.py
@@ -21,15 +21,15 @@
import sys
import time
import uuid
+from logging import Logger
from threading import Thread
import serial
from serial.tools import list_ports
-from acts import tracelogger
-from logging import Logger
+from acts.controllers.buds_lib import tako_trace_logger
-logging = tracelogger.TakoTraceLogger(Logger(__file__))
+logging = tako_trace_logger.TakoTraceLogger(Logger(__file__))
RETRIES = 0
diff --git a/acts/framework/acts/controllers/buds_lib/tako_trace_logger.py b/acts/framework/acts/controllers/buds_lib/tako_trace_logger.py
new file mode 100644
index 0000000..ff31840
--- /dev/null
+++ b/acts/framework/acts/controllers/buds_lib/tako_trace_logger.py
@@ -0,0 +1,56 @@
+#!/usr/bin/env python3
+#
+# Copyright 2019 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from acts import tracelogger
+
+
+class TakoTraceLogger(tracelogger.TraceLogger):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.d = self.debug
+ self.e = self.error
+ self.i = self.info
+ self.t = self.step
+ self.w = self.warning
+
+ def _logger_level(self, level_name):
+ level = logging.getLevelName(level_name)
+ return lambda *args, **kwargs: self._logger.log(level, *args, **kwargs)
+
+ def step(self, msg, *args, **kwargs):
+ """Delegate a step call to the underlying logger."""
+ self._log_with(self._logger_level('STEP'), 1, msg, *args, **kwargs)
+
+ def device(self, msg, *args, **kwargs):
+ """Delegate a device call to the underlying logger."""
+ self._log_with(self._logger_level('DEVICE'), 1, msg, *args, **kwargs)
+
+ def suite(self, msg, *args, **kwargs):
+ """Delegate a device call to the underlying logger."""
+ self._log_with(self._logger_level('SUITE'), 1, msg, *args, **kwargs)
+
+ def case(self, msg, *args, **kwargs):
+ """Delegate a case call to the underlying logger."""
+ self._log_with(self._logger_level('CASE'), 1, msg, *args, **kwargs)
+
+ def flush_log(self):
+ """This function exists for compatibility with Tako's logserial module.
+
+ Note that flushing the log is handled automatically by python's logging
+ module.
+ """
+ pass
diff --git a/acts/framework/acts/controllers/buds_lib/test_actions/audio_utils.py b/acts/framework/acts/controllers/buds_lib/test_actions/audio_utils.py
index 42f8c46..e94fc52 100644
--- a/acts/framework/acts/controllers/buds_lib/test_actions/audio_utils.py
+++ b/acts/framework/acts/controllers/buds_lib/test_actions/audio_utils.py
@@ -19,8 +19,8 @@
import datetime
import time
-from acts import tracelogger
from acts import utils
+from acts.controllers.buds_lib import tako_trace_logger
class AudioUtilsError(Exception):
@@ -36,7 +36,7 @@
"""
def __init__(self):
- self.logger = tracelogger.TakoTraceLogger()
+ self.logger = tako_trace_logger.TakoTraceLogger()
def play_audio_into_device(self, audio_file_path, audio_player, dut):
"""Open mic on DUT, play audio into DUT, close mic on DUT.
diff --git a/acts/framework/acts/controllers/buds_lib/test_actions/base_test_actions.py b/acts/framework/acts/controllers/buds_lib/test_actions/base_test_actions.py
index 7b6cbc4..8f4b37a 100644
--- a/acts/framework/acts/controllers/buds_lib/test_actions/base_test_actions.py
+++ b/acts/framework/acts/controllers/buds_lib/test_actions/base_test_actions.py
@@ -22,7 +22,7 @@
import inspect
import time
-from acts import tracelogger
+from acts.controllers.buds_lib import tako_trace_logger
from acts.libs.utils.timer import TimeRecorder
# All methods start with "_" are considered hidden.
@@ -37,7 +37,7 @@
func_name = self._convert_default_action_name(method.__name__)
if not func_name:
func_name = method.__name__
- self.logger.step('%s...' % func_name)
+ self.log_step('%s...' % func_name)
self.timer.start_timer(func_name, True)
result = method(self, *args, **kw)
# TODO: Method run time collected can be used for automatic KPI checks
@@ -135,9 +135,11 @@
def __init__(self, logger=None):
if logger is None:
- self.logger = tracelogger.TakoTraceLogger()
+ self.logger = tako_trace_logger.TakoTraceLogger()
+ self.log_step = self.logger.step
else:
self.logger = logger
+ self.log_step = self.logger.info
self.timer = TimeRecorder()
self._fill_default_action_map()
@@ -172,15 +174,16 @@
exceptions
"""
num_acts = len(self._action_map)
- self.logger.i('I can do %d action%s:' %
+
+ self.logger.info('I can do %d action%s:' %
(num_acts, 's' if num_acts != 1 else ''))
for act in self._action_map.keys():
- self.logger.i(' - %s' % act)
+ self.logger.info(' - %s' % act)
return True
@timed_action
def sleep(self, seconds):
- self.logger.i('%s seconds' % seconds)
+ self.logger.info('%s seconds' % seconds)
time.sleep(seconds)
diff --git a/acts/framework/acts/controllers/buds_lib/test_actions/bt_utils.py b/acts/framework/acts/controllers/buds_lib/test_actions/bt_utils.py
index 2582402..a607659 100644
--- a/acts/framework/acts/controllers/buds_lib/test_actions/bt_utils.py
+++ b/acts/framework/acts/controllers/buds_lib/test_actions/bt_utils.py
@@ -31,12 +31,12 @@
"""
import queue
import time
-
-from acts import tracelogger
-from acts.utils import wait_until
-from acts.utils import TimeoutError
from logging import Logger
+from acts.controllers.buds_lib import tako_trace_logger
+from acts.utils import TimeoutError
+from acts.utils import wait_until
+
# Add connection profile for future devices in this dictionary
WEARABLE_BT_PROTOCOLS = {
'rio': {
@@ -69,7 +69,7 @@
def __init__(self):
self.default_timeout = 60
- self.logger = tracelogger.TakoTraceLogger(Logger(__file__))
+ self.logger = tako_trace_logger.TakoTraceLogger(Logger(__file__))
def bt_pair_and_connect(self, pri_device, sec_device):
"""Pair and connect a pri_device to a sec_device.
@@ -198,8 +198,8 @@
return True, 0
self.logger.debug('Unpairing from %s' % target_address)
start_time = end_time = time.time()
- assert (True is pri_device.droid.bluetoothUnbond(target_address),
- 'Failed to request device unpairing.')
+ assert True is pri_device.droid.bluetoothUnbond(target_address), (
+ 'Failed to request device unpairing.')
# Check that devices have unpaired successfully.
self.logger.debug('Verifying devices are unpaired')
@@ -290,4 +290,3 @@
if expected[key] != actual[key]:
return False
return True
-
diff --git a/acts/framework/acts/controllers/spectracom_lib/__init__.py b/acts/framework/acts/controllers/spectracom_lib/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/acts/framework/acts/controllers/spectracom_lib/__init__.py
diff --git a/acts/framework/acts/controllers/spectracom_lib/gsg6.py b/acts/framework/acts/controllers/spectracom_lib/gsg6.py
new file mode 100644
index 0000000..6b96456
--- /dev/null
+++ b/acts/framework/acts/controllers/spectracom_lib/gsg6.py
@@ -0,0 +1,120 @@
+#!/usr/bin/env python3
+#
+# Copyright 2019 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Python module for Spectracom/Orolia GSG-6 GNSS simulator."""
+
+from acts.controllers import abstract_inst
+
+
+class GSG6Error(abstract_inst.SocketInstrumentError):
+ """GSG-6 Instrument Error Class."""
+
+
+class GSG6(abstract_inst.SocketInstrument):
+ """GSG-6 Class, inherted from abstract_inst SocketInstrument."""
+
+ def __init__(self, ip_addr, ip_port):
+ """Init method for GSG-6.
+
+ Args:
+ ip_addr: IP Address.
+ Type, str.
+ ip_port: TCPIP Port.
+ Type, str.
+ """
+ super(GSG6, self).__init__(ip_addr, ip_port)
+
+ self.idn = ''
+
+ def connect(self):
+ """Init and Connect to GSG-6."""
+ self._connect_socket()
+
+ self.get_idn()
+
+ infmsg = 'Connected to GSG-6, with ID: {}'.format(self.idn)
+ self._logger.debug(infmsg)
+
+ def close(self):
+ """Close GSG-6."""
+ self._close_socket()
+
+ self._logger.debug('Closed connection to GSG-6')
+
+ def get_idn(self):
+ """Get the Idenification of GSG-6.
+
+ Returns:
+ GSG-6 Identifier
+ """
+ self.idn = self._query('*IDN?')
+
+ return self.idn
+
+ def start_scenario(self, scenario=''):
+ """Start to run scenario.
+
+ Args:
+ scenario: Scenario to run.
+ Type, str.
+ Default, '', which will run current selected one.
+ """
+ if scenario:
+ cmd = 'SOUR:SCEN:LOAD ' + scenario
+ self._send(cmd)
+
+ self._send('SOUR:SCEN:CONT START')
+
+ if scenario:
+ infmsg = 'Started running scenario {}'.format(scenario)
+ else:
+ infmsg = 'Started running current scenario'
+
+ self._logger.debug(infmsg)
+
+ def stop_scenario(self):
+ """Stop the running scenario."""
+
+ self._send('SOUR:SCEN:CONT STOP')
+
+ self._logger.debug('Stopped running scenario')
+
+ def preset(self):
+ """Preset GSG-6 to default status."""
+ self._send('*RST')
+
+ self._logger.debug('Reset GSG-6')
+
+ def set_power(self, power_level):
+ """set GSG-6 transmit power on all bands.
+
+ Args:
+ power_level: transmit power level
+ Type, float.
+ Decimal, unit [dBm]
+
+ Raises:
+ GSG6Error: raise when power level is not in [-160, -65] range.
+ """
+ if not -160 <= power_level <= -65:
+ errmsg = ('"power_level" must be within [-160, -65], '
+ 'current input is {}').format(str(power_level))
+ raise GSG6Error(error=errmsg, command='set_power')
+
+ self._send(':SOUR:POW ' + str(round(power_level, 1)))
+
+ infmsg = 'Set GSG-6 transmit power to "{}"'.format(
+ round(power_level, 1))
+ self._logger.debug(infmsg)
diff --git a/acts/framework/acts/controllers/spirent_lib/__init__.py b/acts/framework/acts/controllers/spirent_lib/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/acts/framework/acts/controllers/spirent_lib/__init__.py
diff --git a/acts/framework/acts/controllers/spirent_lib/gss6450.py b/acts/framework/acts/controllers/spirent_lib/gss6450.py
new file mode 100644
index 0000000..aa84575
--- /dev/null
+++ b/acts/framework/acts/controllers/spirent_lib/gss6450.py
@@ -0,0 +1,381 @@
+#!/usr/bin/env python3
+#
+# Copyright 2019 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Python module for Spirent GSS6450 GNSS RPS."""
+
+import datetime
+import numbers
+from acts.controllers import abstract_inst
+
+
+class GSS6450Error(abstract_inst.SocketInstrumentError):
+ """GSS6450 Instrument Error Class."""
+
+
+class GSS6450(abstract_inst.RequestInstrument):
+ """GSS6450 Class, inherted from abstract_inst RequestInstrument."""
+
+ def __init__(self, ip_addr):
+ """Init method for GSS6450.
+
+ Args:
+ ip_addr: IP Address.
+ Type, str.
+ """
+ super(GSS6450, self).__init__(ip_addr)
+
+ self.idn = 'Spirent-GSS6450'
+
+ def _put(self, cmd):
+ """Send put command via GSS6450 HTTP Request and get response.
+
+ Args:
+ cmd: parameters listed in SHM_PUT.
+ Type, Str.
+
+ Returns:
+ resp: Response from the _query method.
+ Type, Str.
+ """
+ put_cmd = 'shm_put.shtml?' + cmd
+ resp = self._query(put_cmd)
+
+ return resp
+
+ def _get(self, cmd):
+ """Send get command via GSS6450 HTTP Request and get response.
+
+ Args:
+ cmd: parameters listed in SHM_GET.
+ Type, Str.
+
+ Returns:
+ resp: Response from the _query method.
+ Type, Str.
+ """
+ get_cmd = 'shm_get.shtml?' + cmd
+ resp = self._query(get_cmd)
+
+ return resp
+
+ def get_scenario_filename(self):
+ """Get the scenario filename of GSS6450.
+
+ Returns:
+ filename: RPS Scenario file name.
+ Type, Str.
+ """
+ resp_raw = self._get('-f')
+ filename = resp_raw.split(':')[-1].strip(' ')
+ self._logger.debug('Got scenario file name: "%s".', filename)
+
+ return filename
+
+ def get_scenario_description(self):
+ """Get the scenario description of GSS6450.
+
+ Returns:
+ description: RPS Scenario description.
+ Type, Str.
+ """
+ resp_raw = self._get('-d')
+ description = resp_raw.split('-d')[-1].strip(' ')
+
+ if description:
+ self._logger.debug('Got scenario description: "%s".', description)
+ else:
+ self._logger.warning('Got scenario description with empty string.')
+
+ return description
+
+ def get_scenario_location(self):
+ """Get the scenario location of GSS6450.
+
+ Returns:
+ location: RPS Scenario location.
+ Type, Str.
+ """
+ resp_raw = self._get('-i')
+ location = resp_raw.split('-i')[-1].strip(' ')
+
+ if location:
+ self._logger.debug('Got scenario location: "%s".', location)
+ else:
+ self._logger.warning('Got scenario location with empty string.')
+
+ return location
+
+ def get_operation_mode(self):
+ """Get the operation mode of GSS6450.
+
+ Returns:
+ mode: RPS Operation Mode.
+ Type, Str.
+ Option, STOPPED/PLAYING/RECORDING
+ """
+ resp_raw = self._get('-m')
+ mode = resp_raw.split('-m')[-1].strip(' ')
+ self._logger.debug('Got operation mode: "%s".', mode)
+
+ return mode
+
+ def get_battery_level(self):
+ """Get the battery level of GSS6450.
+
+ Returns:
+ batterylevel: RPS Battery Level.
+ Type, float.
+ """
+ resp_raw = self._get('-l')
+ batterylevel = float(resp_raw.split('-l')[-1].strip(' '))
+ self._logger.debug('Got battery level: %s%%.', batterylevel)
+
+ return batterylevel
+
+ def get_rfport_voltage(self):
+ """Get the RF port voltage of GSS6450.
+
+ Returns:
+ voltageout: RPS RF port voltage.
+ Type, str
+ """
+ resp_raw = self._get('-v')
+ voltageout = resp_raw.split('-v')[-1].strip(' ')
+ self._logger.debug('Got RF port voltage: "%s".', voltageout)
+
+ return voltageout
+
+ def get_storage_media(self):
+ """Get the storage media of GSS6450.
+
+ Returns:
+ media: RPS storage.
+ Type, str
+
+ Raises:
+ GSS6450Error: raise when request response is not support.
+ """
+ resp_raw = self._get('-M')
+ resp_num = resp_raw.split('-M')[-1].strip(' ')
+
+ if resp_num == '1':
+ media = '1-INTERNAL'
+ elif resp_num == '2':
+ media = '2-REMOVABLE'
+ else:
+ errmsg = ('"{}" is not recognized as GSS6450 valid storage media'
+ ' type'.format(resp_num))
+ raise GSS6450Error(error=errmsg, command='get_storage_media')
+
+ self._logger.debug('Got current storage media: %s.', media)
+
+ return media
+
+ def get_attenuation(self):
+ """Get the attenuation of GSS6450.
+
+ Returns:
+ attenuation: RPS attenuation level, in dB.
+ Type, list of float.
+ """
+ resp_raw = self._get('-a')
+ resp_str = resp_raw.split('-a')[-1].strip(' ')
+ self._logger.debug('Got attenuation: %s dB.', resp_str)
+ attenuation = [float(itm) for itm in resp_str.split(',')]
+
+ return attenuation
+
+ def get_elapsed_time(self):
+ """Get the running scenario elapsed time of GSS6450.
+
+ Returns:
+ etime: RPS elapsed time.
+ Type, datetime.timedelta.
+ """
+ resp_raw = self._get('-e')
+ resp_str = resp_raw.split('-e')[-1].strip(' ')
+ self._logger.debug('Got senario elapsed time: "%s".', resp_str)
+ etime_tmp = datetime.datetime.strptime(resp_str, '%H:%M:%S')
+ etime = datetime.timedelta(hours=etime_tmp.hour,
+ minutes=etime_tmp.minute,
+ seconds=etime_tmp.second)
+
+ return etime
+
+ def get_playback_offset(self):
+ """Get the running scenario playback offset of GSS6450.
+
+ Returns:
+ offset: RPS playback offset.
+ Type, datetime.timedelta.
+ """
+ resp_raw = self._get('-o')
+ offset_tmp = float(resp_raw.split('-o')[-1].strip(' '))
+ self._logger.debug('Got senario playback offset: %s sec.', offset_tmp)
+ offset = datetime.timedelta(seconds=offset_tmp)
+
+ return offset
+
+ def play_scenario(self, scenario=''):
+ """Start to play scenario in GSS6450.
+
+ Args:
+ scenario: Scenario to play.
+ Type, str.
+ Default, '', which will run current selected one.
+ """
+ if scenario:
+ cmd = '-f{},-wP'.format(scenario)
+ else:
+ cmd = '-wP'
+
+ _ = self._put(cmd)
+
+ if scenario:
+ infmsg = 'Started playing scenario: "{}".'.format(scenario)
+ else:
+ infmsg = 'Started playing current scenario.'
+
+ self._logger.debug(infmsg)
+
+ def record_scenario(self, scenario=''):
+ """Start to record scenario in GSS6450.
+
+ Args:
+ scenario: Scenario to record.
+ Type, str.
+ Default, '', which will run current selected one.
+ """
+ if scenario:
+ cmd = '-f{},-wR'.format(scenario)
+ else:
+ cmd = '-wR'
+
+ _ = self._put(cmd)
+
+ if scenario:
+ infmsg = 'Started recording scenario: "{}".'.format(scenario)
+ else:
+ infmsg = 'Started recording scenario.'
+
+ self._logger.debug(infmsg)
+
+ def stop_scenario(self):
+ """Start to stop playing/recording scenario in GSS6450."""
+ _ = self._put('-wS')
+
+ self._logger.debug('Stopped playing/recording scanrio.')
+
+ def set_rfport_voltage(self, voltageout):
+ """Set the RF port voltage of GSS6450.
+
+ Args:
+ voltageout: RPS RF port voltage.
+ Type, str
+
+ Raises:
+ GSS6450Error: raise when voltageout input is not valid.
+ """
+ if voltageout == 'OFF':
+ voltage_cmd = '0'
+ elif voltageout == '3.3V':
+ voltage_cmd = '3'
+ elif voltageout == '5V':
+ voltage_cmd = '5'
+ else:
+ errmsg = ('"{}" is not recognized as GSS6450 valid RF port voltage'
+ ' type'.format(voltageout))
+ raise GSS6450Error(error=errmsg, command='set_rfport_voltage')
+
+ _ = self._put('-v{},-wV'.format(voltage_cmd))
+ self._logger.debug('Set RF port voltage: "%s".', voltageout)
+
+ def set_attenuation(self, attenuation):
+ """Set the attenuation of GSS6450.
+
+ Args:
+ attenuation: RPS attenuation level, in dB.
+ Type, numerical.
+
+ Raises:
+ GSS6450Error: raise when attenuation is not in range.
+ """
+ if not 0 <= attenuation <= 31:
+ errmsg = ('"attenuation" must be within [0, 31], '
+ 'current input is {}').format(str(attenuation))
+ raise GSS6450Error(error=errmsg, command='set_attenuation')
+
+ attenuation_raw = round(attenuation)
+
+ if attenuation_raw != attenuation:
+ warningmsg = ('"attenuation" must be integer, current input '
+ 'will be rounded to {}'.format(attenuation_raw))
+ self._logger.warning(warningmsg)
+
+ _ = self._put('-a{},-wA'.format(attenuation_raw))
+
+ self._logger.debug('Set attenuation: %s dB.', attenuation_raw)
+
+ def set_playback_offset(self, offset):
+ """Set the playback offset of GSS6450.
+
+ Args:
+ offset: RPS playback offset.
+ Type, datetime.timedelta, or numerical.
+
+ Raises:
+ GSS6450Error: raise when offset is not numeric or timedelta.
+ """
+ if isinstance(offset, datetime.timedelta):
+ offset_raw = offset.total_seconds()
+ elif isinstance(offset, numbers.Number):
+ offset_raw = offset
+ else:
+ raise GSS6450Error(error=('"offset" must be numerical value or '
+ 'datetime.timedelta'),
+ command='set_playback_offset')
+
+ _ = self._put('-o{}'.format(offset_raw))
+
+ self._logger.debug('Set playback offset: %s sec.', offset_raw)
+
+ def set_storage_media(self, media):
+ """Set the storage media of GSS6450.
+
+ Args:
+ media: RPS storage Media, Internal or External.
+ Type, str. Option, 'internal', 'removable'
+
+ Raises:
+ GSS6450Error: raise when media option is not support.
+ """
+ if media == 'internal':
+ raw_media = '1'
+ elif media == 'removable':
+ raw_media = '2'
+ else:
+ raise GSS6450Error(
+ error=('"media" input must be in ["internal", "removable"]. '
+ ' Current input is {}'.format(media)),
+ command='set_storage_media')
+
+ _ = self._put('-M{}-wM'.format(raw_media))
+
+ resp_raw = self.get_storage_media()
+ if raw_media != resp_raw[0]:
+ raise GSS6450Error(
+ error=('Setting media "{}" is not the same as queried media '
+ '"{}".'.format(media, resp_raw)),
+ command='set_storage_media')
diff --git a/acts/framework/acts/test_utils/instrumentation/adb_command_types.py b/acts/framework/acts/test_utils/instrumentation/adb_command_types.py
index 5c55734..ee7172c 100644
--- a/acts/framework/acts/test_utils/instrumentation/adb_command_types.py
+++ b/acts/framework/acts/test_utils/instrumentation/adb_command_types.py
@@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from acts.test_utils.instrumentation.intent_builder import IntentBuilder
+
class DeviceState(object):
"""Class for adb commands for setting device properties to a value."""
@@ -82,6 +84,31 @@
on_val, off_val)
+class DeviceGServices(DeviceState):
+ """Class for overriding a GServices value."""
+
+ OVERRIDE_GSERVICES_INTENT = ('com.google.gservices.intent.action.'
+ 'GSERVICES_OVERRIDE')
+
+ def __init__(self, setting, on_val='true', off_val='false'):
+ """Create a DeviceGServices.
+
+ Args:
+ setting: Name of the GServices setting
+ on_val: Value used for the 'on' state
+ off_val: Value used for the 'off' state
+ """
+ super().__init__(None, on_val, off_val)
+ self._intent_builder = IntentBuilder('am broadcast')
+ self._intent_builder.set_action(self.OVERRIDE_GSERVICES_INTENT)
+ self._setting = setting
+
+ def set_value(self, value):
+ """Returns the adb command with the given value."""
+ self._intent_builder.add_key_value_param(self._setting, value)
+ return self._intent_builder.build()
+
+
class DeviceBinaryCommandSeries(object):
"""Class for toggling multiple settings at once."""
diff --git a/acts/framework/acts/test_utils/instrumentation/adb_commands/goog.py b/acts/framework/acts/test_utils/instrumentation/adb_commands/goog.py
new file mode 100644
index 0000000..4dceca0
--- /dev/null
+++ b/acts/framework/acts/test_utils/instrumentation/adb_commands/goog.py
@@ -0,0 +1,75 @@
+#!/usr/bin/env python3
+#
+# Copyright 2019 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from acts.test_utils.instrumentation.adb_command_types \
+ import DeviceBinaryCommandSeries
+from acts.test_utils.instrumentation.adb_command_types import DeviceGServices
+from acts.test_utils.instrumentation.adb_command_types import DeviceState
+
+"""Google-internal device settings for power testing."""
+
+# TODO: add descriptions to each setting
+
+# Location
+
+location_collection = DeviceGServices(
+ 'location:collection_enabled', on_val='1', off_val='0')
+
+location_opt_in = DeviceBinaryCommandSeries(
+ [
+ DeviceState('content insert --uri content://com.google.settings/'
+ 'partner --bind name:s:use_location_for_services '
+ '--bind value:s:%s'),
+ DeviceState('content insert --uri content://com.google.settings/'
+ 'partner --bind name:s:network_location_opt_in '
+ '--bind value:s:%s')
+ ]
+)
+
+# Cast
+
+cast_broadcast = DeviceGServices('gms:cast:mdns_device_scanner:is_enabled')
+
+
+# Apps
+
+disable_playstore = 'pm disable-user com.android.vending'
+
+
+# Volta
+
+disable_volta = 'pm disable-user com.google.android.volta'
+
+
+# CHRE
+
+disable_chre = 'setprop ctl.stop vendor.chre'
+
+
+# MusicIQ
+
+disable_musiciq = 'pm disable-user com.google.intelligence.sense'
+
+
+# Hotword
+
+disable_hotword = (
+ 'am start -a com.android.intent.action.MANAGE_VOICE_KEYPHRASES '
+ '--ei com.android.intent.extra.VOICE_KEYPHRASE_ACTION 2 '
+ '--es com.android.intent.extra.VOICE_KEYPHRASE_HINT_TEXT "demo" '
+ '--es com.android.intent.extra.VOICE_KEYPHRASE_LOCALE "en-US" '
+ 'com.android.hotwordenrollment.okgoogle/'
+ 'com.android.hotwordenrollment.okgoogle.EnrollmentActivity')
diff --git a/acts/framework/acts/test_utils/instrumentation/brightness.py b/acts/framework/acts/test_utils/instrumentation/brightness.py
new file mode 100644
index 0000000..95c5151
--- /dev/null
+++ b/acts/framework/acts/test_utils/instrumentation/brightness.py
@@ -0,0 +1,79 @@
+#!/usr/bin/env python3
+#
+# Copyright 2019 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+_BRIGHTNESS_FOR_200_NITS = {
+ 'hammerhead': 88,
+ 'shamu': 203,
+ 'razor': 112, # Flo
+ 'razorg': 112, # Deb
+ 'volantis': 175,
+ 'volantisg': 175,
+ '4560MMX': 155, # Tinno
+ '4560MMX_b': 164, # Longcheer
+ 'AQ4501': 120, # Tinno MicroMax
+ 'Mi-498': 134, # Sprout Spice
+ 'l8150': 113, # Seed
+ 'ctih220': 116, # Seed Cherry
+ 'angler': 158, # Angler
+ 'bullhead': 149, # Bullhead
+ 'ryu': 91, # Ryu
+ 'sailfish': 131, # Sailfish & friends
+ 'sailfish_eas': 131,
+ 'sailfish_vboot': 131,
+ 'marlin': 147, # Marlin & friends
+ 'marlin_eas': 147,
+ 'muskie': 152, # Muskie
+ 'vega': 156, # Vega day dream device
+ 'walleye': 136, # Walleye & friends
+ 'walleye_clang': 136,
+ 'walleye_kcfi': 136,
+ 'walleye_vboot': 136,
+ 'taimen': 157, # Taimen & friends
+ 'taimen_clang': 157,
+ 'crosshatch': 130, # Crosshatch
+ 'blueline': 114, # Blueline
+ 'bonito': 120, # Bonito
+ 'sargo': 126, # Sargo
+ 'maran9810': 130, # Maran9810
+ 'maran9820': 130, # Maran9820
+ 'maran9820_419': 130, # Maran9820_419
+ 'coral': 123, # Coral
+ 'flame': 118, # Flame
+}
+
+_BRIGHTNESS_FOR_100_LUX = {
+ 'sailfish': 48, # Sailfish & friends
+ 'marlin': 57, # Marlin & friends
+ 'walleye': 40, # Walleye & friends
+ 'taimen': 55, # Taimen & friends
+ 'crosshatch': 60, # Crosshatch
+ 'blueline': 61, # Blueline
+ 'bonito': 54, # Bonito
+ 'sargo': 54, # Sargo
+ 'maran9810': 60, # Maran9810
+ 'coral': 67, # Coral
+ 'flame': 58, # Flame
+}
+
+
+def get_brightness_for_200_nits(model_name):
+ """Returns the brightness setting for 200 nits on the specified model."""
+ return _BRIGHTNESS_FOR_200_NITS[model_name]
+
+
+def get_brightness_for_100_lux(model_name):
+ """Returns the brightness setting for 100 lux on the specified model."""
+ return _BRIGHTNESS_FOR_100_LUX[model_name]
diff --git a/acts/framework/acts/test_utils/instrumentation/instrumentation_base_test.py b/acts/framework/acts/test_utils/instrumentation/instrumentation_base_test.py
index fe00d33..7495dc9 100644
--- a/acts/framework/acts/test_utils/instrumentation/instrumentation_base_test.py
+++ b/acts/framework/acts/test_utils/instrumentation/instrumentation_base_test.py
@@ -16,12 +16,19 @@
import os
+import tzlocal
import yaml
from acts.keys import Config
from acts.test_utils.instrumentation import app_installer
from acts.test_utils.instrumentation import instrumentation_proto_parser \
as proto_parser
+from acts.test_utils.instrumentation.adb_command_types import DeviceGServices
+from acts.test_utils.instrumentation.adb_command_types import DeviceSetprop
+from acts.test_utils.instrumentation.adb_command_types import DeviceSetting
from acts.test_utils.instrumentation.adb_commands import common
+from acts.test_utils.instrumentation.adb_commands import goog
+from acts.test_utils.instrumentation.brightness import \
+ get_brightness_for_200_nits
from acts.test_utils.instrumentation.config_wrapper import ConfigWrapper
from acts.test_utils.instrumentation.instrumentation_command_builder import \
InstrumentationCommandBuilder
@@ -290,3 +297,90 @@
# Uninstall PermissionUtils.apk
self.ad_apps.uninstall(permissions_apk_path)
+
+ def base_device_configuration(self):
+ """Run the base setup commands for power testing."""
+ self.log.info('Running base device setup commands.')
+
+ self.ad_dut.adb.ensure_root()
+
+ # Test harness flag
+ self.adb_run(common.test_harness.toggle(True))
+
+ # Calling
+ self.adb_run(common.disable_dialing.toggle(True))
+
+ # Screen
+ self.adb_run(common.screen_adaptive_brightness.toggle(False))
+ self.adb_run(common.screen_brightness.set_value(
+ get_brightness_for_200_nits(self.ad_dut.model)))
+ self.adb_run(common.screen_timeout_ms.set_value(1800000))
+ self.adb_run(common.notification_led.toggle(False))
+ self.adb_run(common.screensaver.toggle(False))
+ self.adb_run(common.wake_gesture.toggle(False))
+ self.adb_run(common.doze_mode.toggle(False))
+
+ # Accelerometer
+ self.adb_run(common.auto_rotate.toggle(False))
+
+ # Time
+ self.adb_run(common.auto_time.toggle(False))
+ self.adb_run(common.auto_timezone.toggle(False))
+ self.adb_run(common.timezone.set_value(str(tzlocal.get_localzone())))
+
+ # Location
+ self.adb_run(common.location_gps.toggle(False))
+ self.adb_run(common.location_network.toggle(False))
+
+ # Power
+ self.adb_run(common.battery_saver_mode.toggle(False))
+ self.adb_run(common.battery_saver_trigger.set_value(0))
+ self.adb_run(common.enable_full_batterystats_history)
+ self.adb_run(common.disable_doze)
+
+ # Camera
+ self.adb_run(DeviceSetprop(
+ 'camera.optbar.hdr', 'true', 'false').toggle(True))
+
+ # Gestures
+ gestures = {
+ 'doze_pulse_on_pick_up': False,
+ 'doze_pulse_on_double_tap': False,
+ 'camera_double_tap_power_gesture_disabled': True,
+ 'camera_double_twist_to_flip_enabled': False,
+ 'assist_gesture_enabled': False,
+ 'assist_gesture_silence_alerts_enabled': False,
+ 'assist_gesture_wake_enabled': False,
+ 'system_navigation_keys_enabled': False,
+ 'camera_lift_trigger_enabled': False,
+ 'doze_always_on': False,
+ 'aware_enabled': False,
+ 'doze_wake_screen_gesture': False,
+ 'skip_gesture': False,
+ 'silence_gesture': False
+ }
+ self.adb_run(
+ [DeviceSetting(common.SECURE, k).toggle(v)
+ for k, v in gestures.items()])
+
+ # GServices
+ self.adb_run(goog.location_collection.toggle(False))
+ self.adb_run(goog.cast_broadcast.toggle(False))
+ self.adb_run(DeviceGServices(
+ 'location:compact_log_enabled').toggle(True))
+ self.adb_run(DeviceGServices('gms:magictether:enable').toggle(False))
+ self.adb_run(DeviceGServices('ocr.cc_ocr_enabled').toggle(False))
+ self.adb_run(DeviceGServices(
+ 'gms:phenotype:phenotype_flag:debug_bypass_phenotype').toggle(True))
+ self.adb_run(DeviceGServices(
+ 'gms_icing_extension_download_enabled').toggle(False))
+
+ # Misc. Google features
+ self.adb_run(goog.disable_playstore)
+ self.adb_run(goog.disable_volta)
+ self.adb_run(goog.disable_chre)
+ self.adb_run(goog.disable_musiciq)
+ self.adb_run(goog.disable_hotword)
+
+ # Enable clock dump info
+ self.adb_run('echo 1 > /d/clk/debug_suspend')
diff --git a/acts/framework/acts/test_utils/instrumentation/instrumentation_power_test.py b/acts/framework/acts/test_utils/instrumentation/instrumentation_power_test.py
new file mode 100644
index 0000000..9a21574
--- /dev/null
+++ b/acts/framework/acts/test_utils/instrumentation/instrumentation_power_test.py
@@ -0,0 +1,292 @@
+#!/usr/bin/env python3
+#
+# Copyright 2019 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import shutil
+import tempfile
+import time
+
+from acts.controllers.android_device import SL4A_APK_NAME
+from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
+from acts.test_utils.instrumentation import instrumentation_proto_parser \
+ as proto_parser
+from acts.test_utils.instrumentation.instrumentation_base_test \
+ import InstrumentationBaseTest
+from acts.test_utils.instrumentation.instrumentation_base_test \
+ import InstrumentationTestError
+from acts.test_utils.instrumentation.instrumentation_command_builder import \
+ DEFAULT_NOHUP_LOG
+from acts.test_utils.instrumentation.instrumentation_command_builder import \
+ InstrumentationTestCommandBuilder
+from acts.test_utils.instrumentation.instrumentation_proto_parser import \
+ DEFAULT_INST_LOG_DIR
+from acts.test_utils.instrumentation.power_metrics import Measurement
+from acts.test_utils.instrumentation.power_metrics import PowerMetrics
+
+from acts import context
+from acts import signals
+
+ACCEPTANCE_THRESHOLD = 'acceptance_threshold'
+AUTOTESTER_LOG = 'autotester.log'
+DISCONNECT_USB_FILE = 'disconnectusb.log'
+POLLING_INTERVAL = 0.5
+
+
+class InstrumentationPowerTest(InstrumentationBaseTest):
+ """Instrumentation test for measuring and validating power metrics.
+
+ Params:
+ metric_logger: Blackbox metric logger used to store test metrics.
+ _instr_cmd_builder: Builder for the instrumentation command
+ """
+
+ def __init__(self, configs):
+ super().__init__(configs)
+ self.metric_logger = BlackboxMappedMetricLogger.for_test_class()
+
+ def setup_class(self):
+ super().setup_class()
+ self.monsoon = self.monsoons[0]
+ self._setup_monsoon()
+ self._instr_cmd_builder = self.power_instrumentation_command_builder()
+ self._sl4a_apk = None
+
+ def _prepare_device(self):
+ """Prepares the device for power testing."""
+ super()._prepare_device()
+ self._cleanup_test_files()
+ self.install_test_apk()
+ self.grant_permissions()
+
+ def _cleanup_device(self):
+ """Clean up device after power testing."""
+ self._cleanup_test_files()
+
+ def _setup_monsoon(self):
+ """Set up the Monsoon controller for this testclass/testcase."""
+ self.log.info('Setting up Monsoon %s' % self.monsoon.serial)
+ monsoon_config = self._get_merged_config('Monsoon')
+ self._monsoon_voltage = monsoon_config.get_numeric('voltage', 4.2)
+ self.monsoon.set_voltage_safe(self._monsoon_voltage)
+ if 'max_current' in monsoon_config:
+ self.monsoon.set_max_current(
+ monsoon_config.get_numeric('max_current'))
+
+ self.monsoon.usb('on')
+ self.monsoon.set_on_disconnect(self._on_disconnect)
+ self.monsoon.set_on_reconnect(self._on_reconnect)
+
+ self._disconnect_usb_timeout = monsoon_config.get_numeric(
+ 'usb_disconnection_timeout', 240)
+
+ self._measurement_args = dict(
+ duration=monsoon_config.get_numeric('duration'),
+ hz=monsoon_config.get_numeric('frequency'),
+ measure_after_seconds=monsoon_config.get_numeric('delay')
+ )
+
+ def _on_disconnect(self):
+ """Callback invoked by device disconnection from the Monsoon."""
+ self.ad_dut.log.info('Disconnecting device.')
+ self.ad_dut.stop_services()
+ # Uninstall SL4A
+ self._sl4a_apk = self.ad_apps.pull_apk(
+ SL4A_APK_NAME, tempfile.mkdtemp(prefix='sl4a'))
+ self.ad_apps.uninstall(self._sl4a_apk)
+ time.sleep(1)
+
+ def _on_reconnect(self):
+ """Callback invoked by device reconnection to the Monsoon"""
+ # Reinstall SL4A
+ if not self.ad_dut.is_sl4a_installed() and self._sl4a_apk:
+ self.ad_apps.install(self._sl4a_apk)
+ shutil.rmtree(os.path.dirname(self._sl4a_apk))
+ self._sl4a_apk = None
+ self.ad_dut.start_services()
+ # Release wake lock to put device into sleep.
+ self.ad_dut.droid.goToSleepNow()
+ self.ad_dut.log.info('Device reconnected.')
+
+ def install_test_apk(self):
+ """Installs test apk on the device."""
+ test_apk_file = self._instrumentation_config.get_file('test_apk')
+ self.ad_apps.install(test_apk_file, '-g')
+ if not self.ad_apps.is_installed(test_apk_file):
+ raise InstrumentationTestError('Failed to install test APK.')
+ self._test_pkg = self.ad_apps.get_package_name(test_apk_file)
+
+ def _cleanup_test_files(self):
+ """Remove test-generated files from the device."""
+ self.ad_dut.log.info('Cleaning up test generated files.')
+ for file_name in [DISCONNECT_USB_FILE, DEFAULT_INST_LOG_DIR,
+ DEFAULT_NOHUP_LOG, AUTOTESTER_LOG]:
+ path = os.path.join(
+ self.ad_dut.adb.shell('echo $EXTERNAL_STORAGE'), file_name)
+ self.adb_run('rm -rf %s' % path)
+
+ # Test runtime utils
+
+ def power_instrumentation_command_builder(self):
+ """Return the default command builder for power tests"""
+ builder = InstrumentationTestCommandBuilder.default()
+ builder.set_manifest_package(self._test_pkg)
+ builder.set_nohup()
+ return builder
+
+ def _wait_for_disconnect_signal(self):
+ """Poll the device for a disconnect USB signal file. This will indicate
+ to the Monsoon that the device is ready to be disconnected.
+ """
+ self.log.info('Waiting for USB disconnect signal')
+ disconnect_file = os.path.join(
+ self.ad_dut.adb.shell('echo $EXTERNAL_STORAGE'),
+ DISCONNECT_USB_FILE)
+ start_time = time.time()
+ while time.time() < start_time + self._disconnect_usb_timeout:
+ if self.ad_dut.adb.shell('ls %s' % disconnect_file):
+ return
+ time.sleep(POLLING_INTERVAL)
+ raise InstrumentationTestError('Timeout while waiting for USB '
+ 'disconnect signal.')
+
+ def measure_power(self):
+ """Measures power consumption with the Monsoon. See monsoon_lib API for
+ details.
+ """
+ if not hasattr(self, '_measurement_args'):
+ raise InstrumentationTestError('Missing Monsoon measurement args.')
+
+ # Start measurement after receiving disconnect signal
+ self._wait_for_disconnect_signal()
+ power_data_path = os.path.join(
+ context.get_current_context().get_full_output_path(), 'power_data')
+ self.monsoon.usb('auto')
+ measure_start_time = time.time()
+ result = self.monsoon.measure_power(
+ **self._measurement_args, output_path=power_data_path)
+ self.monsoon.usb('on')
+
+ # Gather relevant metrics from measurements
+ session = self.dump_instrumentation_result_proto()
+ self._power_metrics = PowerMetrics(self._monsoon_voltage,
+ start_time=measure_start_time)
+ self._power_metrics.generate_test_metrics(
+ PowerMetrics.import_raw_data(power_data_path),
+ proto_parser.get_test_timestamps(session))
+ self._log_metrics()
+ return result
+
+ def run_and_measure(self, instr_class, instr_method=None, req_params=None):
+ """Convenience method for setting up the instrumentation test command,
+ running it on the device, and starting the Monsoon measurement.
+
+ Args:
+ instr_class: Fully qualified name of the instrumentation test class
+ instr_method: Name of the instrumentation test method
+ req_params: List of required parameter names
+
+ Returns: summary of Monsoon measurement
+ """
+ if instr_method:
+ self._instr_cmd_builder.add_test_method(instr_class, instr_method)
+ else:
+ self._instr_cmd_builder.add_test_class(instr_class)
+ params = {}
+ instr_call_config = self._get_merged_config('instrumentation_call')
+ # Add required parameters
+ for param_name in req_params or []:
+ params[param_name] = instr_call_config.get(
+ param_name, verify_fn=lambda x: x is not None,
+ failure_msg='%s is a required parameter.' % param_name)
+ # Add all other parameters
+ params.update(instr_call_config)
+ for name, value in params.items():
+ self._instr_cmd_builder.add_key_value_param(name, value)
+ instr_cmd = self._instr_cmd_builder.build()
+ self.adb_run_async(instr_cmd)
+ return self.measure_power()
+
+ def _log_metrics(self):
+ """Record the collected metrics with the metric logger."""
+ for metric_name in PowerMetrics.ALL_METRICS:
+ for instr_test_name in self._power_metrics.test_metrics:
+ metric_value = getattr(
+ self._power_metrics.test_metrics[instr_test_name],
+ metric_name).value
+ self.metric_logger.add_metric(
+ '%s__%s' % (metric_name, instr_test_name), metric_value)
+
+ def validate_power_results(self, *instr_test_names):
+ """Compare power measurements with target values and set the test result
+ accordingly.
+
+ Args:
+ instr_test_names: Name(s) of the instrumentation test method.
+ If none specified, defaults to all test methods run.
+
+ Raises:
+ signals.TestFailure if one or more metrics do not satisfy threshold
+ signals.TestPass otherwise
+ """
+ summaries = {}
+ failures = {}
+ all_thresholds = self._get_merged_config(ACCEPTANCE_THRESHOLD)
+
+ if not instr_test_names:
+ instr_test_names = all_thresholds.keys()
+
+ for instr_test_name in instr_test_names:
+ try:
+ test_metrics = self._power_metrics.test_metrics[instr_test_name]
+ except KeyError:
+ raise InstrumentationTestError(
+ 'Unable to find test method %s in instrumentation output.'
+ % instr_test_name)
+
+ summaries[instr_test_name] = test_metrics.summary
+ failures[instr_test_name] = {}
+ test_thresholds = all_thresholds.get_config(instr_test_name)
+ for metric_name, metric in test_thresholds.items():
+ try:
+ actual_result = getattr(test_metrics, metric_name)
+ except AttributeError:
+ continue
+
+ if 'unit_type' not in metric or 'unit' not in metric:
+ continue
+ unit_type = metric['unit_type']
+ unit = metric['unit']
+
+ lower_value = metric.get_numeric('lower_limit', float('-inf'))
+ upper_value = metric.get_numeric('upper_limit', float('inf'))
+ if 'expected_value' in metric and 'percent_deviation' in metric:
+ expected_value = metric.get_numeric('expected_value')
+ percent_deviation = metric.get_numeric('percent_deviation')
+ lower_value = expected_value * (1 - percent_deviation / 100)
+ upper_value = expected_value * (1 + percent_deviation / 100)
+
+ lower_bound = Measurement(lower_value, unit_type, unit)
+ upper_bound = Measurement(upper_value, unit_type, unit)
+ if not lower_bound <= actual_result <= upper_bound:
+ failures[instr_test_name][metric_name] = {
+ 'expected': '[%s, %s]' % (lower_bound, upper_bound),
+ 'actual': str(actual_result)
+ }
+ self.log.info('Summary of measurements: %s' % summaries)
+ if any(failures.values()):
+ raise signals.TestFailure('One or more measurements do not meet '
+ 'the specified criteria', failures)
+ raise signals.TestPass('All measurements meet the specified criteria')
diff --git a/acts/framework/acts/test_utils/instrumentation/power_metrics.py b/acts/framework/acts/test_utils/instrumentation/power_metrics.py
new file mode 100644
index 0000000..3ef9948
--- /dev/null
+++ b/acts/framework/acts/test_utils/instrumentation/power_metrics.py
@@ -0,0 +1,282 @@
+#!/usr/bin/env python3
+#
+# Copyright 2019 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import bisect
+import math
+
+import numpy as np
+from acts.test_utils.instrumentation import instrumentation_proto_parser \
+ as parser
+
+# Unit type constants
+CURRENT = 'current'
+POWER = 'power'
+TIME = 'time'
+
+# Unit constants
+MILLIAMP = 'mA'
+AMP = 'A'
+AMPERE = AMP
+MILLIWATT = 'mW'
+WATT = 'W'
+MILLISECOND = 'ms'
+SECOND = 's'
+MINUTE = 'm'
+HOUR = 'h'
+
+CONVERSION_TABLES = {
+ CURRENT: {
+ MILLIAMP: 0.001,
+ AMP: 1
+ },
+ POWER: {
+ MILLIWATT: 0.001,
+ WATT: 1
+ },
+ TIME: {
+ MILLISECOND: 0.001,
+ SECOND: 1,
+ MINUTE: 60,
+ HOUR: 3600
+ }
+}
+
+
+class Measurement(object):
+ """Base class for describing power measurement values. Each object contains
+ an value and a unit. Enables some basic arithmetic operations with other
+ measurements of the same unit type.
+
+ Attributes:
+ _value: Numeric value of the measurement
+ _unit_type: Unit type of the measurement (e.g. current, power)
+ _unit: Unit of the measurement (e.g. W, mA)
+ """
+
+ def __init__(self, value, unit_type, unit):
+ if unit_type not in CONVERSION_TABLES:
+ raise TypeError('%s is not a valid unit type' % unit_type)
+ self._value = value
+ self._unit_type = unit_type
+ self._unit = unit
+
+ # Convenience constructor methods
+ @staticmethod
+ def amps(amps):
+ """Create a new current measurement, in amps."""
+ return Measurement(amps, CURRENT, AMP)
+
+ @staticmethod
+ def watts(watts):
+ """Create a new power measurement, in watts."""
+ return Measurement(watts, POWER, WATT)
+
+ @staticmethod
+ def seconds(seconds):
+ """Create a new time measurement, in seconds."""
+ return Measurement(seconds, TIME, SECOND)
+
+ # Comparison methods
+
+ def __eq__(self, other):
+ return self.value == other.to_unit(self._unit).value
+
+ def __lt__(self, other):
+ return self.value < other.to_unit(self._unit).value
+
+ def __le__(self, other):
+ return self == other or self < other
+
+ # Addition and subtraction with other measurements
+
+ def __add__(self, other):
+ """Adds measurements of compatible unit types. The result will be in the
+ same units as self.
+ """
+ return Measurement(self.value + other.to_unit(self._unit).value,
+ self._unit_type, self._unit)
+
+ def __sub__(self, other):
+ """Subtracts measurements of compatible unit types. The result will be
+ in the same units as self.
+ """
+ return Measurement(self.value - other.to_unit(self._unit).value,
+ self._unit_type, self._unit)
+
+ # String representation
+
+ def __str__(self):
+ return '%g%s' % (self._value, self._unit)
+
+ def __repr__(self):
+ return str(self)
+
+ @property
+ def value(self):
+ return self._value
+
+ def to_unit(self, new_unit):
+ """Create an equivalent measurement under a different unit.
+ e.g. 0.5W -> 500mW
+
+ Args:
+ new_unit: Target unit. Must be compatible with current unit.
+
+ Returns: A new measurement with the converted value and unit.
+ """
+ try:
+ new_value = self._value * (
+ CONVERSION_TABLES[self._unit_type][self._unit] /
+ CONVERSION_TABLES[self._unit_type][new_unit])
+ except KeyError:
+ raise TypeError('Incompatible units: %s, %s' %
+ (self._unit, new_unit))
+ return Measurement(new_value, self._unit_type, new_unit)
+
+
+class PowerMetrics(object):
+ """Class for processing raw power metrics generated by Monsoon measurements.
+ Provides useful metrics such as average current, max current, and average
+ power. Can generate individual test metrics.
+
+ See section "Numeric metrics" below for available metrics.
+ """
+
+ def __init__(self, voltage, start_time=0):
+ """Create a PowerMetrics.
+
+ Args:
+ voltage: Voltage of the measurement
+ start_time: Start time of the measurement. Used for generating
+ test-specific metrics.
+ """
+ self._voltage = voltage
+ self._start_time = start_time
+ self._num_samples = 0
+ self._sum_currents = 0
+ self._sum_squares = 0
+ self._max_current = None
+ self._min_current = None
+ self.test_metrics = {}
+
+ @staticmethod
+ def import_raw_data(path):
+ """Create a generator from a Monsoon data file.
+
+ Args:
+ path: path to raw data file
+
+ Returns: generator that yields (timestamp, sample) per line
+ """
+ with open(path, 'r') as f:
+ for line in f:
+ time, sample = line.split()
+ yield float(time[:-1]), float(sample)
+
+ def update_metrics(self, sample):
+ """Update the running metrics with the current sample.
+
+ Args:
+ sample: A current sample.
+ """
+ self._num_samples += 1
+ self._sum_currents += sample
+ self._sum_squares += sample ** 2
+ if self._max_current is None or sample > self._max_current:
+ self._max_current = sample
+ if self._min_current is None or sample < self._min_current:
+ self._min_current = sample
+
+ def generate_test_metrics(self, raw_data, test_timestamps=None):
+ """Split the data into individual test metrics, based on the timestamps
+ given as a dict.
+
+ Args:
+ raw_data: raw data as list or generator of (timestamp, sample)
+ test_timestamps: dict following the output format of
+ instrumentation_proto_parser.get_test_timestamps()
+ """
+
+ # Initialize metrics for each test
+ if test_timestamps is None:
+ test_timestamps = {}
+ test_starts = {}
+ test_ends = {}
+ for test_name, times in test_timestamps.items():
+ self.test_metrics[test_name] = PowerMetrics(
+ self._voltage, self._start_time)
+ test_starts[test_name] = Measurement(
+ times[parser.START_TIMESTAMP], TIME, MILLISECOND)\
+ .to_unit(SECOND).value - self._start_time
+ test_ends[test_name] = Measurement(
+ times[parser.END_TIMESTAMP], TIME, MILLISECOND)\
+ .to_unit(SECOND).value - self._start_time
+
+ # Assign data to tests based on timestamps
+ for timestamp, sample in raw_data:
+ self.update_metrics(sample)
+ for test_name in test_timestamps:
+ if test_starts[test_name] <= timestamp <= test_ends[test_name]:
+ self.test_metrics[test_name].update_metrics(sample)
+
+ # Numeric metrics
+
+ ALL_METRICS = ('avg_current', 'max_current', 'min_current', 'stdev_current',
+ 'avg_power')
+
+ @property
+ def avg_current(self):
+ """Average current, in amps."""
+ if not self._num_samples:
+ return Measurement.amps(0)
+ return Measurement.amps(self._sum_currents / self._num_samples)
+
+ @property
+ def max_current(self):
+ """Max current, in amps."""
+ return Measurement.amps(self._max_current or 0)
+
+ @property
+ def min_current(self):
+ """Min current, in amps."""
+ return Measurement.amps(self._min_current or 0)
+
+ @property
+ def stdev_current(self):
+ """Standard deviation of current values, in amps."""
+ if self._num_samples < 2:
+ return Measurement.amps(0)
+
+ return Measurement.amps(math.sqrt(
+ (self._sum_squares - (
+ self._num_samples * self.avg_current.value ** 2))
+ / (self._num_samples - 1)))
+
+ def current_to_power(self, current):
+ """Converts a current value to a power value."""
+ return Measurement.watts(current.to_unit(AMP).value * self._voltage)
+
+ @property
+ def avg_power(self):
+ """Average power, in watts."""
+ return self.current_to_power(self.avg_current)
+
+ @property
+ def summary(self):
+ """A summary of test metrics"""
+ return {'average_current': str(self.avg_current),
+ 'max_current': str(self.max_current),
+ 'average_power': str(self.avg_power)}
diff --git a/acts/framework/acts/test_utils/net/connectivity_const.py b/acts/framework/acts/test_utils/net/connectivity_const.py
index 946d2c6..07f4123 100644
--- a/acts/framework/acts/test_utils/net/connectivity_const.py
+++ b/acts/framework/acts/test_utils/net/connectivity_const.py
@@ -66,6 +66,8 @@
# Private DNS constants
DNS_GOOGLE = "dns.google"
+DNS_QUAD9 = "dns.quad9.net"
+DNS_CLOUDFLARE = "1dot1dot1dot1.cloudflare-dns.com"
PRIVATE_DNS_MODE_OFF = "off"
PRIVATE_DNS_MODE_OPPORTUNISTIC = "opportunistic"
PRIVATE_DNS_MODE_STRICT = "hostname"
diff --git a/acts/framework/acts/test_utils/power/PowerBaseTest.py b/acts/framework/acts/test_utils/power/PowerBaseTest.py
index 8723fa3..a5f2536 100644
--- a/acts/framework/acts/test_utils/power/PowerBaseTest.py
+++ b/acts/framework/acts/test_utils/power/PowerBaseTest.py
@@ -123,7 +123,7 @@
# Sync device time, timezone and country code
utils.require_sl4a((self.dut, ))
utils.sync_device_time(self.dut)
- self.dut.droid.wifiSetCountryCode('US')
+ wutils.set_wifi_country_code(self.dut, 'US')
screen_on_img = self.user_params.get('screen_on_img', [])
if screen_on_img:
diff --git a/acts/framework/acts/test_utils/tel/tel_test_utils.py b/acts/framework/acts/test_utils/tel/tel_test_utils.py
index fb47800..ee663ef 100644
--- a/acts/framework/acts/test_utils/tel/tel_test_utils.py
+++ b/acts/framework/acts/test_utils/tel/tel_test_utils.py
@@ -7636,3 +7636,4 @@
monitor_setting = ad.adb.getprop("persist.radio.enable_tel_mon")
ad.log.info("radio.enable_tel_mon setting is %s", monitor_setting)
return monitor_setting == expected_monitor_setting
+
diff --git a/acts/framework/acts/test_utils/wifi/WifiBaseTest.py b/acts/framework/acts/test_utils/wifi/WifiBaseTest.py
index cb7ab80..82d655f 100644
--- a/acts/framework/acts/test_utils/wifi/WifiBaseTest.py
+++ b/acts/framework/acts/test_utils/wifi/WifiBaseTest.py
@@ -24,6 +24,7 @@
import acts.controllers.access_point as ap
from acts import asserts
+from acts import signals
from acts import utils
from acts.base_test import BaseTestClass
from acts.signals import TestSignal
@@ -608,3 +609,46 @@
hostapd_constants.BAND_5G, channel_5g)
if not result:
raise ValueError("Failed to configure channel for 5G band.")
+
+ @staticmethod
+ def wifi_test_wrap(fn):
+ def _safe_wrap_test_case(self, *args, **kwargs):
+ test_id = "%s:%s:%s" % (self.__class__.__name__, self.test_name,
+ self.log_begin_time.replace(' ', '-'))
+ self.test_id = test_id
+ self.result_detail = ""
+ tries = int(self.user_params.get("wifi_auto_rerun", 3))
+ for ad in self.android_devices:
+ ad.log_path = self.log_path
+ for i in range(tries + 1):
+ result = True
+ if i > 0:
+ log_string = "[Test Case] RETRY:%s %s" % (i, self.test_name)
+ self.log.info(log_string)
+ self._teardown_test(self.test_name)
+ self._setup_test(self.test_name)
+ try:
+ result = fn(self, *args, **kwargs)
+ except signals.TestFailure as e:
+ self.log.warn("Error msg: %s" % e)
+ if self.result_detail:
+ signal.details = self.result_detail
+ result = False
+ except signals.TestSignal:
+ if self.result_detail:
+ signal.details = self.result_detail
+ raise
+ except Exception as e:
+ self.log.exception(e)
+ asserts.fail(self.result_detail)
+ if result is False:
+ if i < tries:
+ continue
+ else:
+ break
+ if result is not False:
+ asserts.explicit_pass(self.result_detail)
+ else:
+ asserts.fail(self.result_detail)
+
+ return _safe_wrap_test_case
diff --git a/acts/framework/acts/test_utils/wifi/aware/AwareBaseTest.py b/acts/framework/acts/test_utils/wifi/aware/AwareBaseTest.py
index bf8b66e..df303ce 100644
--- a/acts/framework/acts/test_utils/wifi/aware/AwareBaseTest.py
+++ b/acts/framework/acts/test_utils/wifi/aware/AwareBaseTest.py
@@ -53,7 +53,7 @@
self.reset_device_parameters(ad)
self.reset_device_statistics(ad)
self.set_power_mode_parameters(ad)
- ad.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
+ wutils.set_wifi_country_code(ad, wutils.WifiEnums.CountryCode.US)
autils.configure_ndp_allow_any_override(ad, True)
# set randomization interval to 0 (disable) to reduce likelihood of
# interference in tests
diff --git a/acts/framework/acts/test_utils/wifi/aware/aware_test_utils.py b/acts/framework/acts/test_utils/wifi/aware/aware_test_utils.py
index 2623f9b..e297799 100644
--- a/acts/framework/acts/test_utils/wifi/aware/aware_test_utils.py
+++ b/acts/framework/acts/test_utils/wifi/aware/aware_test_utils.py
@@ -468,6 +468,28 @@
return True
+def run_ping6(dut, target_ip, duration=60):
+ """Run ping test and return the latency result
+
+ Args:
+ dut: the dut which run the ping cmd
+ target_ip: target IP Address for ping
+ duration: the duration time of the ping
+
+ return: dict contains "min/avg/max/mdev" result
+ """
+ cmd = "ping6 -w %d %s" % (duration, target_ip)
+ ping_result = dut.adb.shell(cmd, timeout=duration + 1)
+ res = re.match(".*mdev = (\S+) .*", ping_result, re.S)
+ asserts.assert_true(res, "Cannot reach the IP address %s", target_ip)
+ title = ["min", "avg", "max", "mdev"]
+ result = res.group(1).split("/")
+ latency_result = {}
+ for i in range(len(title)):
+ latency_result[title[i]] = result[i]
+ return latency_result
+
+
#########################################################
# Aware primitives
#########################################################
diff --git a/acts/framework/acts/test_utils/wifi/p2p/WifiP2pBaseTest.py b/acts/framework/acts/test_utils/wifi/p2p/WifiP2pBaseTest.py
index 7aca3c6..92e03dd 100644
--- a/acts/framework/acts/test_utils/wifi/p2p/WifiP2pBaseTest.py
+++ b/acts/framework/acts/test_utils/wifi/p2p/WifiP2pBaseTest.py
@@ -32,8 +32,6 @@
def setup_class(self):
self.dut1 = self.android_devices[0]
self.dut2 = self.android_devices[1]
- self.dut1_mac = self.get_p2p_mac_address(self.dut1)
- self.dut2_mac = self.get_p2p_mac_address(self.dut2)
#init location before init p2p
acts.utils.set_location_service(self.dut1, True)
@@ -102,8 +100,3 @@
for ad in self.android_devices:
ad.take_bug_report(test_name, begin_time)
ad.cat_adb_log(test_name, begin_time)
-
- def get_p2p_mac_address(self, dut):
- """Gets the current MAC address being used for Wi-Fi Direct."""
- out = dut.adb.shell("ifconfig p2p0")
- return re.match(".* HWaddr (\S+).*", out, re.S).group(1)
diff --git a/acts/framework/acts/test_utils/wifi/rtt/RttBaseTest.py b/acts/framework/acts/test_utils/wifi/rtt/RttBaseTest.py
index 6cb3460..d6267dd 100644
--- a/acts/framework/acts/test_utils/wifi/rtt/RttBaseTest.py
+++ b/acts/framework/acts/test_utils/wifi/rtt/RttBaseTest.py
@@ -50,7 +50,7 @@
rutils.wait_for_event(ad, rconsts.BROADCAST_WIFI_RTT_AVAILABLE)
ad.ed.clear_all_events()
rutils.config_privilege_override(ad, False)
- ad.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
+ wutils.set_wifi_country_code(ad, wutils.WifiEnums.CountryCode.US)
ad.rtt_capabilities = rutils.get_rtt_capabilities(ad)
def teardown_test(self):
diff --git a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
index 37426c6..b47fda1 100755
--- a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
+++ b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
@@ -53,12 +53,12 @@
"AP1_on_AP2_off": [
0,
0,
- 95,
- 95
+ 60,
+ 60
],
"AP1_off_AP2_on": [
- 95,
- 95,
+ 60,
+ 60,
0,
0
],
@@ -759,12 +759,22 @@
ad.log.info("wpa_supplicant log change status: %s", output)
utils.sync_device_time(ad)
ad.droid.telephonyToggleDataConnection(False)
- # TODO(angli): need to verify the country code was actually set. No generic
- # way to check right now.
- ad.adb.shell("halutil -country %s" % WifiEnums.CountryCode.US)
- ad.droid.wifiSetCountryCode(WifiEnums.CountryCode.US)
+ set_wifi_country_code(ad, WifiEnums.CountryCode.US)
utils.set_ambient_display(ad, False)
+def set_wifi_country_code(ad, country_code):
+ """Sets the wifi country code on the device.
+
+ Args:
+ ad: An AndroidDevice object.
+ country_code: 2 letter ISO country code
+ """
+ ad.adb.shell("cmd wifi force-country-code enabled %s" % country_code)
+ try:
+ ad.droid.wifiSetCountryCode(WifiEnums.CountryCode.US)
+ except Exception:
+ pass
+
def start_wifi_connection_scan(ad):
"""Starts a wifi connection scan and wait for results to become available.
@@ -1990,9 +2000,7 @@
WifiEnums.SSID_KEY: expected_con[WifiEnums.SSID_KEY],
WifiEnums.BSSID_KEY: expected_con["bssid"],
}
- set_attns(attenuator, attn_val_name)
- logging.info("Wait %ss for roaming to finish.", ROAMING_TIMEOUT)
- time.sleep(ROAMING_TIMEOUT)
+ set_attns_steps(attenuator, attn_val_name)
verify_wifi_connection_info(dut, expected_con)
expected_bssid = expected_con[WifiEnums.BSSID_KEY]
@@ -2138,30 +2146,33 @@
if test_status:
shutil.rmtree(os.path.dirname(fname))
-def verify_mac_not_found_in_pcap(mac, packets):
+def verify_mac_not_found_in_pcap(ad, mac, packets):
"""Verify that a mac address is not found in the captured packets.
Args:
+ ad: android device object
mac: string representation of the mac address
packets: packets obtained by rdpcap(pcap_fname)
"""
for pkt in packets:
logging.debug("Packet Summary = %s", pkt.summary())
if mac in pkt.summary():
- asserts.fail("Caught Factory MAC: %s in packet sniffer."
- "Packet = %s" % (mac, pkt.show()))
+ asserts.fail("Device %s caught Factory MAC: %s in packet sniffer."
+ "Packet = %s" % (ad.serial, mac, pkt.show()))
-def verify_mac_is_found_in_pcap(mac, packets):
+def verify_mac_is_found_in_pcap(ad, mac, packets):
"""Verify that a mac address is found in the captured packets.
Args:
+ ad: android device object
mac: string representation of the mac address
packets: packets obtained by rdpcap(pcap_fname)
"""
for pkt in packets:
if mac in pkt.summary():
return
- asserts.fail("Did not find MAC = %s in packet sniffer." % mac)
+ asserts.fail("Did not find MAC = %s in packet sniffer."
+ "for device %s" % (mac, ad.serial))
def start_cnss_diags(ads):
for ad in ads:
diff --git a/acts/framework/acts/tracelogger.py b/acts/framework/acts/tracelogger.py
index 9652fd0..ed9ee12 100644
--- a/acts/framework/acts/tracelogger.py
+++ b/acts/framework/acts/tracelogger.py
@@ -14,11 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import datetime
import inspect
-import logging
import os
-import xml.etree.cElementTree as et
class TraceLogger(object):
@@ -65,41 +62,3 @@
def __getattr__(self, name):
return getattr(self._logger, name)
-
-
-class TakoTraceLogger(TraceLogger):
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.d = self.debug
- self.e = self.error
- self.i = self.info
- self.t = self.step
- self.w = self.warning
-
- def _logger_level(self, level_name):
- level = logging.getLevelName(level_name)
- return lambda *args, **kwargs: self._logger.log(level, *args, **kwargs)
-
- def step(self, msg, *args, **kwargs):
- """Delegate a step call to the underlying logger."""
- self._log_with(self._logger_level('STEP'), 1, msg, *args, **kwargs)
-
- def device(self, msg, *args, **kwargs):
- """Delegate a device call to the underlying logger."""
- self._log_with(self._logger_level('DEVICE'), 1, msg, *args, **kwargs)
-
- def suite(self, msg, *args, **kwargs):
- """Delegate a device call to the underlying logger."""
- self._log_with(self._logger_level('SUITE'), 1, msg, *args, **kwargs)
-
- def case(self, msg, *args, **kwargs):
- """Delegate a case call to the underlying logger."""
- self._log_with(self._logger_level('CASE'), 1, msg, *args, **kwargs)
-
- def flush_log(self):
- """This function exists for compatibility with Tako's logserial module.
-
- Note that flushing the log is handled automatically by python's logging
- module.
- """
- pass
diff --git a/acts/framework/setup.py b/acts/framework/setup.py
index b5e4467..3f814ce 100755
--- a/acts/framework/setup.py
+++ b/acts/framework/setup.py
@@ -29,6 +29,7 @@
'numpy',
'pyserial',
'pyyaml>=5.1',
+ 'tzlocal',
'shellescape>=3.4.1',
'protobuf',
'retry',
diff --git a/acts/framework/tests/test_utils/__init__.py b/acts/framework/tests/test_utils/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/acts/framework/tests/test_utils/__init__.py
diff --git a/acts/framework/tests/test_utils/instrumentation/adb_command_types_test.py b/acts/framework/tests/test_utils/instrumentation/adb_command_types_test.py
index d934cc0..076f5ee 100755
--- a/acts/framework/tests/test_utils/instrumentation/adb_command_types_test.py
+++ b/acts/framework/tests/test_utils/instrumentation/adb_command_types_test.py
@@ -20,6 +20,8 @@
from acts.test_utils.instrumentation.adb_command_types import DeviceSetprop
from acts.test_utils.instrumentation.adb_command_types import DeviceSetting
from acts.test_utils.instrumentation.adb_command_types import \
+ DeviceGServices
+from acts.test_utils.instrumentation.adb_command_types import \
DeviceBinaryCommandSeries
@@ -105,6 +107,19 @@
device_binary_setting.toggle(False),
'settings put system some_other_setting off')
+ def test_device_gservices(self):
+ """Tests that DeviceGServices returns the correct ADB command with
+ set_value.
+ """
+ setting = 'some_gservice'
+ val = 22
+ device_gservices = DeviceGServices(setting)
+ self.assertEqual(
+ device_gservices.set_value(val),
+ 'am broadcast -a '
+ 'com.google.gservices.intent.action.GSERVICES_OVERRIDE '
+ '--ei some_gservice 22')
+
def test_device_binary_command_series(self):
"""Tests that DeviceBinaryCommandSuite returns the correct ADB
commands.
diff --git a/acts/framework/tests/test_utils/instrumentation/data/sample_monsoon_data b/acts/framework/tests/test_utils/instrumentation/data/sample_monsoon_data
new file mode 100644
index 0000000..2a70273
--- /dev/null
+++ b/acts/framework/tests/test_utils/instrumentation/data/sample_monsoon_data
@@ -0,0 +1,10 @@
+0s 3.67
+1s 3.69
+2s 0.95
+3s 3.06
+4s 2.17
+5s 1.62
+6s 3.95
+7s 2.47
+8s 1.11
+9s 0.47
diff --git a/acts/framework/tests/test_utils/instrumentation/instrumentation_power_test_test.py b/acts/framework/tests/test_utils/instrumentation/instrumentation_power_test_test.py
new file mode 100644
index 0000000..36cb43b
--- /dev/null
+++ b/acts/framework/tests/test_utils/instrumentation/instrumentation_power_test_test.py
@@ -0,0 +1,280 @@
+#!/usr/bin/env python3
+#
+# Copyright 2019 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+import unittest
+
+from acts.test_utils.instrumentation.config_wrapper import ConfigWrapper
+from acts.test_utils.instrumentation.instrumentation_power_test \
+ import ACCEPTANCE_THRESHOLD
+from acts.test_utils.instrumentation.instrumentation_power_test \
+ import InstrumentationPowerTest
+from acts.test_utils.instrumentation.power_metrics import PowerMetrics
+
+from acts import signals
+
+
+class MockInstrumentationPowerTest(InstrumentationPowerTest):
+ """Mock test class to initialize required attributes."""
+
+ # avg: 2.214, stdev: 1.358, max: 4.78, min: 0.61
+ SAMPLE_DATA = [1.64, 2.98, 1.72, 3.45, 1.31, 4.78, 3.43, 0.61, 1.19, 1.03]
+
+ def __init__(self):
+ self.log = mock.Mock()
+ self.metric_logger = mock.Mock()
+ self.current_test_name = 'test_case'
+ self._power_metrics = PowerMetrics(4.2)
+ self._power_metrics.test_metrics = {
+ 'instrTest1': PowerMetrics(4.2),
+ 'instrTest2': PowerMetrics(4.2)
+ }
+ self._power_metrics.test_metrics['instrTest1'].generate_test_metrics(
+ list(zip(range(10), self.SAMPLE_DATA))
+ )
+ self._power_metrics.test_metrics['instrTest2'].generate_test_metrics(
+ list(zip(range(10), self.SAMPLE_DATA))
+ )
+ self._instrumentation_config = ConfigWrapper()
+ self._class_config = ConfigWrapper(
+ {
+ self.current_test_name: {
+ ACCEPTANCE_THRESHOLD: {}
+ }
+ }
+ )
+
+ def set_criteria(self, criteria):
+ """Set the acceptance criteria for metrics validation."""
+ test_config = self._class_config[self.current_test_name]
+ test_config[ACCEPTANCE_THRESHOLD] = ConfigWrapper(criteria)
+
+
+class InstrumentationPowerTestTest(unittest.TestCase):
+ """Unit tests for InstrumentationPowerTest."""
+ def setUp(self):
+ self.instrumentation_power_test = MockInstrumentationPowerTest()
+
+ def test_validate_power_results_lower_and_upper_limit_accept(self):
+ """Test that validate_power_results accept passing measurements
+ given a lower and upper limit.
+ """
+ criteria_accept = {
+ 'instrTest1': {
+ 'avg_current': {
+ 'unit_type': 'current',
+ 'unit': 'A',
+ 'lower_limit': 1.5,
+ 'upper_limit': 2.5
+ },
+ 'max_current': {
+ 'unit_type': 'current',
+ 'unit': 'mA',
+ 'upper_limit': 5000
+ }
+ }
+ }
+ self.instrumentation_power_test.set_criteria(criteria_accept)
+ with self.assertRaises(signals.TestPass):
+ self.instrumentation_power_test.validate_power_results('instrTest1')
+
+ def test_validate_power_results_lower_and_upper_limit_reject(self):
+ """Test that validate_power_results reject failing measurements
+ given a lower and upper limit.
+ """
+ criteria_reject = {
+ 'instrTest1': {
+ 'avg_current': {
+ 'unit_type': 'current',
+ 'unit': 'A',
+ 'lower_limit': 1.5,
+ 'upper_limit': 2
+ },
+ 'max_current': {
+ 'unit_type': 'current',
+ 'unit': 'mA',
+ 'upper_limit': 4000
+ }
+ }
+ }
+ self.instrumentation_power_test.set_criteria(criteria_reject)
+ with self.assertRaises(signals.TestFailure):
+ self.instrumentation_power_test.validate_power_results('instrTest1')
+
+ def test_validate_power_results_expected_value_and_deviation_accept(self):
+ """Test that validate_power_results accept passing measurements
+ given an expected value and percent deviation.
+ """
+ criteria_accept = {
+ 'instrTest1': {
+ 'stdev_current': {
+ 'unit_type': 'current',
+ 'unit': 'A',
+ 'expected_value': 1.5,
+ 'percent_deviation': 20
+ }
+ }
+ }
+ self.instrumentation_power_test.set_criteria(criteria_accept)
+ with self.assertRaises(signals.TestPass):
+ self.instrumentation_power_test.validate_power_results('instrTest1')
+
+ def test_validate_power_results_expected_value_and_deviation_reject(self):
+ """Test that validate_power_results reject failing measurements
+ given an expected value and percent deviation.
+ """
+ criteria_reject = {
+ 'instrTest1': {
+ 'min_current': {
+ 'unit_type': 'current',
+ 'unit': 'mA',
+ 'expected_value': 500,
+ 'percent_deviation': 10
+ }
+ }
+ }
+ self.instrumentation_power_test.set_criteria(criteria_reject)
+ with self.assertRaises(signals.TestFailure):
+ self.instrumentation_power_test.validate_power_results('instrTest1')
+
+ def test_validate_power_results_no_such_test(self):
+ """Test that validate_power_results skip validation if there are no
+ criteria matching the specified instrumentation test name.
+ """
+ criteria_wrong_test = {
+ 'instrTest2': {
+ 'min_current': {
+ 'unit_type': 'current',
+ 'unit': 'A',
+ 'expected_value': 2,
+ 'percent_deviation': 20
+ }
+ }
+ }
+ self.instrumentation_power_test.set_criteria(criteria_wrong_test)
+ with self.assertRaises(signals.TestPass):
+ self.instrumentation_power_test.validate_power_results('instrTest1')
+
+ def test_validate_power_results_no_such_metric(self):
+ """Test that validate_power_results skip validation if the specified
+ metric is invalid.
+ """
+ criteria_invalid_metric = {
+ 'instrTest1': {
+ 'no_such_metric': {
+ 'unit_type': 'current',
+ 'unit': 'A',
+ 'lower_limit': 5,
+ 'upper_limit': 7
+ }
+ }
+ }
+ self.instrumentation_power_test.set_criteria(criteria_invalid_metric)
+ with self.assertRaises(signals.TestPass):
+ self.instrumentation_power_test.validate_power_results('instrTest1')
+
+ def test_validate_power_results_criteria_missing_params(self):
+ """Test that validate_power_results skip validation if the specified
+ metric has missing parameters.
+ """
+ criteria_missing_params = {
+ 'instrTest1': {
+ 'avg_current': {
+ 'unit': 'A',
+ 'lower_limit': 1,
+ 'upper_limit': 2
+ }
+ }
+ }
+ self.instrumentation_power_test.set_criteria(criteria_missing_params)
+ with self.assertRaises(signals.TestPass):
+ self.instrumentation_power_test.validate_power_results('instrTest1')
+
+ def test_validate_power_results_pass_if_all_tests_accept(self):
+ """Test that validate_power_results succeeds if it accepts the results
+ of all instrumentation tests.
+ """
+ criteria_multi_test_accept = {
+ 'instrTest1': {
+ 'avg_current': {
+ 'unit_type': 'current',
+ 'unit': 'A',
+ 'lower_limit': 2
+ },
+ 'stdev_current': {
+ 'unit_type': 'current',
+ 'unit': 'mA',
+ 'expected_value': 1250,
+ 'percent_deviation': 30
+ }
+ },
+ 'instrTest2': {
+ 'max_current': {
+ 'unit_type': 'current',
+ 'unit': 'A',
+ 'upper_limit': 5
+ },
+ 'avg_power': {
+ 'unit_type': 'power',
+ 'unit': 'W',
+ 'upper_limit': 10
+ }
+ }
+ }
+ self.instrumentation_power_test.set_criteria(criteria_multi_test_accept)
+ with self.assertRaises(signals.TestPass):
+ self.instrumentation_power_test.validate_power_results(
+ 'instrTest1', 'instrTest2')
+
+ def test_validate_power_results_fail_if_at_least_one_test_rejects(self):
+ """Test that validate_power_results fails if it rejects the results
+ of at least one instrumentation test.
+ """
+ criteria_multi_test_reject = {
+ 'instrTest1': {
+ 'avg_current': {
+ 'unit_type': 'current',
+ 'unit': 'A',
+ 'lower_limit': 2
+ },
+ 'stdev_current': {
+ 'unit_type': 'current',
+ 'unit': 'mA',
+ 'expected_value': 1250,
+ 'percent_deviation': 30
+ }
+ },
+ 'instrTest2': {
+ 'max_current': {
+ 'unit_type': 'current',
+ 'unit': 'A',
+ 'upper_limit': 5
+ },
+ 'avg_power': {
+ 'unit_type': 'power',
+ 'unit': 'W',
+ 'upper_limit': 8
+ }
+ }
+ }
+ self.instrumentation_power_test.set_criteria(criteria_multi_test_reject)
+ with self.assertRaises(signals.TestFailure):
+ self.instrumentation_power_test.validate_power_results(
+ 'instrTest1', 'instrTest2')
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/acts/framework/tests/test_utils/instrumentation/power_metrics_test.py b/acts/framework/tests/test_utils/instrumentation/power_metrics_test.py
new file mode 100644
index 0000000..eeccfd9
--- /dev/null
+++ b/acts/framework/tests/test_utils/instrumentation/power_metrics_test.py
@@ -0,0 +1,158 @@
+#!/usr/bin/env python3
+#
+# Copyright 2019 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import statistics
+import unittest
+
+from acts.test_utils.instrumentation import instrumentation_proto_parser \
+ as parser
+from acts.test_utils.instrumentation.power_metrics import CURRENT
+from acts.test_utils.instrumentation.power_metrics import HOUR
+from acts.test_utils.instrumentation.power_metrics import MILLIAMP
+from acts.test_utils.instrumentation.power_metrics import MINUTE
+from acts.test_utils.instrumentation.power_metrics import Measurement
+from acts.test_utils.instrumentation.power_metrics import PowerMetrics
+from acts.test_utils.instrumentation.power_metrics import TIME
+from acts.test_utils.instrumentation.power_metrics import WATT
+
+FAKE_UNIT_TYPE = 'fake_unit'
+FAKE_UNIT = 'F'
+
+
+class MeasurementTest(unittest.TestCase):
+ """Unit tests for the Measurement class."""
+
+ def test_init_with_valid_unit_type(self):
+ """Test that a Measurement is properly initialized given a valid unit
+ type.
+ """
+ measurement = Measurement(2, CURRENT, MILLIAMP)
+ self.assertEqual(measurement.value, 2)
+ self.assertEqual(measurement._unit, MILLIAMP)
+
+ def test_init_with_invalid_unit_type(self):
+ """Test that __init__ raises an error if given an invalid unit type."""
+ with self.assertRaisesRegex(TypeError, 'valid unit type'):
+ measurement = Measurement(2, FAKE_UNIT_TYPE, FAKE_UNIT)
+
+ def test_unit_conversion(self):
+ """Test that to_unit correctly converts value and unit."""
+ ratio = 1000
+ current_amps = Measurement.amps(15)
+ current_milliamps = current_amps.to_unit(MILLIAMP)
+ self.assertEqual(current_milliamps.value / current_amps.value, ratio)
+
+ def test_unit_conversion_with_wrong_type(self):
+ """Test that to_unit raises and error if incompatible unit type is
+ specified.
+ """
+ current_amps = Measurement.amps(3.4)
+ with self.assertRaisesRegex(TypeError, 'Incompatible units'):
+ power_watts = current_amps.to_unit(WATT)
+
+ def test_comparison_operators(self):
+ """Test that the comparison operators work as intended."""
+ # time_a == time_b < time_c
+ time_a = Measurement.seconds(120)
+ time_b = Measurement(2, TIME, MINUTE)
+ time_c = Measurement(0.1, TIME, HOUR)
+
+ self.assertEqual(time_a, time_b)
+ self.assertEqual(time_b, time_a)
+ self.assertLessEqual(time_a, time_b)
+ self.assertGreaterEqual(time_a, time_b)
+
+ self.assertNotEqual(time_a, time_c)
+ self.assertNotEqual(time_c, time_a)
+ self.assertLess(time_a, time_c)
+ self.assertLessEqual(time_a, time_c)
+ self.assertGreater(time_c, time_a)
+ self.assertGreaterEqual(time_c, time_a)
+
+ def test_arithmetic_operators(self):
+ """Test that the addition and subtraction operators work as intended"""
+ time_a = Measurement(3, TIME, HOUR)
+ time_b = Measurement(90, TIME, MINUTE)
+
+ sum_ = time_a + time_b
+ self.assertEqual(sum_.value, 4.5)
+ self.assertEqual(sum_._unit, HOUR)
+
+ sum_reversed = time_b + time_a
+ self.assertEqual(sum_reversed.value, 270)
+ self.assertEqual(sum_reversed._unit, MINUTE)
+
+ diff = time_a - time_b
+ self.assertEqual(diff.value, 1.5)
+ self.assertEqual(diff._unit, HOUR)
+
+ diff_reversed = time_b - time_a
+ self.assertEqual(diff_reversed.value, -90)
+ self.assertEqual(diff_reversed._unit, MINUTE)
+
+
+class PowerMetricsTest(unittest.TestCase):
+ """Unit tests for the PowerMetrics class."""
+
+ SAMPLES = [0.13, 0.95, 0.32, 4.84, 2.48, 4.11, 4.85, 4.88, 4.22, 2.2]
+ RAW_DATA = list(zip(range(10), SAMPLES))
+ VOLTAGE = 4.2
+ START_TIME = 5
+
+ def setUp(self):
+ self.power_metrics = PowerMetrics(self.VOLTAGE, self.START_TIME)
+
+ def test_import_raw_data(self):
+ """Test that power metrics can be loaded from file. Simply ensure that
+ the number of samples is correct."""
+
+ imported_data = PowerMetrics.import_raw_data(
+ os.path.join(os.path.dirname(__file__), 'data/sample_monsoon_data')
+ )
+ self.power_metrics.generate_test_metrics(imported_data)
+ self.assertEqual(self.power_metrics._num_samples, 10)
+
+ def test_split_by_test_with_timestamps(self):
+ """Test that given test timestamps, a power metric is generated from
+ a subset of samples corresponding to the test."""
+ sample_test = 'sample_test'
+ test_start = 8500
+ test_end = 13500
+ test_timestamps = {sample_test: {parser.START_TIMESTAMP: test_start,
+ parser.END_TIMESTAMP: test_end}}
+ self.power_metrics.generate_test_metrics(self.RAW_DATA, test_timestamps)
+ test_metrics = self.power_metrics.test_metrics[sample_test]
+ self.assertEqual(test_metrics._num_samples, 5)
+
+ def test_numeric_metrics(self):
+ """Test that the numeric metrics have correct values."""
+ self.power_metrics.generate_test_metrics(self.RAW_DATA)
+ self.assertAlmostEqual(self.power_metrics.avg_current.value,
+ statistics.mean(self.SAMPLES))
+ self.assertAlmostEqual(self.power_metrics.max_current.value,
+ max(self.SAMPLES))
+ self.assertAlmostEqual(self.power_metrics.min_current.value,
+ min(self.SAMPLES))
+ self.assertAlmostEqual(self.power_metrics.stdev_current.value,
+ statistics.stdev(self.SAMPLES))
+ self.assertAlmostEqual(
+ self.power_metrics.avg_power.value,
+ self.power_metrics.avg_current.value * self.VOLTAGE)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/acts/tests/google/ble/concurrency/ConcurrentBleAdvertisementDiscoveryTest.py b/acts/tests/google/ble/concurrency/ConcurrentBleAdvertisementDiscoveryTest.py
index 2af4c05..c38fc93 100644
--- a/acts/tests/google/ble/concurrency/ConcurrentBleAdvertisementDiscoveryTest.py
+++ b/acts/tests/google/ble/concurrency/ConcurrentBleAdvertisementDiscoveryTest.py
@@ -43,22 +43,21 @@
class ConcurrentBleAdvertisementDiscoveryTest(BluetoothBaseTest):
default_timeout = 10
+ droid_list = []
max_advertisements = -1
advertise_callback_list = []
def setup_class(self):
super().setup_class()
- self.droid_list = get_advanced_droid_list(self.android_devices)
self.scn_ad = self.android_devices[0]
self.adv_ad = self.android_devices[1]
+ self.droid_list = get_advanced_droid_list(self.android_devices)
self.max_advertisements = self.droid_list[1]['max_advertisements']
def setup_test(self):
- return reset_bluetooth(self.android_devices)
-
- def setup_test(self):
- super(BluetoothBaseTest, self).setup_test()
+ super().setup_test()
self.log.info("Setting up advertisements")
+ reset_bluetooth(self.android_devices)
try:
self.advertise_callback_list = setup_n_advertisements(
self.adv_ad, self.max_advertisements)
diff --git a/acts/tests/google/ble/concurrency/ConcurrentBleAdvertisingTest.py b/acts/tests/google/ble/concurrency/ConcurrentBleAdvertisingTest.py
index 94ee0f7..09c6cd3 100644
--- a/acts/tests/google/ble/concurrency/ConcurrentBleAdvertisingTest.py
+++ b/acts/tests/google/ble/concurrency/ConcurrentBleAdvertisingTest.py
@@ -43,17 +43,18 @@
class ConcurrentBleAdvertisingTest(BluetoothBaseTest):
default_timeout = 10
+ droid_list = []
max_advertisements = -1
def setup_class(self):
super().setup_class()
- self.droid_list = get_advanced_droid_list(self.android_devices)
self.scn_ad = self.android_devices[0]
self.adv_ad = self.android_devices[1]
+ self.droid_list = get_advanced_droid_list(self.android_devices)
self.max_advertisements = self.droid_list[1]['max_advertisements']
def setup_test(self):
- super(BluetoothBaseTest, self).setup_test()
+ super().setup_test()
return reset_bluetooth(self.android_devices)
def _verify_n_advertisements(self, num_advertisements):
diff --git a/acts/tests/google/ble/conn_oriented_chan/BleCocTest.py b/acts/tests/google/ble/conn_oriented_chan/BleCocTest.py
index 97ace9b..166b848 100644
--- a/acts/tests/google/ble/conn_oriented_chan/BleCocTest.py
+++ b/acts/tests/google/ble/conn_oriented_chan/BleCocTest.py
@@ -49,14 +49,14 @@
def setup_class(self):
super().setup_class()
self.client_ad = self.android_devices[0]
- # The client which is scanning will need location to be enabled in order to
- # start scan and get scan results.
- utils.set_location_service(self.client_ad, True)
self.server_ad = self.android_devices[1]
# Note that some tests required a third device.
if len(self.android_devices) > 2:
self.server2_ad = self.android_devices[2]
+ # The client which is scanning will need location to be enabled in order to
+ # start scan and get scan results.
+ utils.set_location_service(self.client_ad, True)
return setup_multiple_devices_for_bt_test(self.android_devices)
def teardown_test(self):
diff --git a/acts/tests/google/ble/system_tests/BleStressTest.py b/acts/tests/google/ble/system_tests/BleStressTest.py
index f97907f..afbb67a 100644
--- a/acts/tests/google/ble/system_tests/BleStressTest.py
+++ b/acts/tests/google/ble/system_tests/BleStressTest.py
@@ -37,6 +37,7 @@
class BleStressTest(BluetoothBaseTest):
default_timeout = 10
PAIRING_TIMEOUT = 20
+ droid_list = []
def setup_class(self):
super().setup_class()
@@ -45,7 +46,7 @@
self.adv_ad = self.android_devices[1]
def teardown_test(self):
- super(BluetoothBaseTest, self).teardown_test()
+ super().teardown_test()
self.log_stats()
def bleadvertise_verify_onsuccess_handler(self, event):
diff --git a/acts/tests/google/coex/hotspot_tests/HotspotWiFiChannelTest.py b/acts/tests/google/coex/hotspot_tests/HotspotWiFiChannelTest.py
index c7f0c32..819de12 100644
--- a/acts/tests/google/coex/hotspot_tests/HotspotWiFiChannelTest.py
+++ b/acts/tests/google/coex/hotspot_tests/HotspotWiFiChannelTest.py
@@ -188,8 +188,8 @@
if wifi_band == '2g':
wband = WifiEnums.WIFI_CONFIG_APBAND_2G
elif wifi_band == '5g':
- self.pri_ad.droid.wifiSetCountryCode(WifiEnums.CountryCode.US)
- self.sec_ad.droid.wifiSetCountryCode(WifiEnums.CountryCode.US)
+ wutils.set_wifi_country_code(self.pri_ad, WifiEnums.CountryCode.US)
+ wutils.set_wifi_country_code(self.sec_ad, WifiEnums.CountryCode.US)
wband = WifiEnums.WIFI_CONFIG_APBAND_5G
elif wifi_band == 'auto':
wband = WifiEnums.WIFI_CONFIG_APBAND_AUTO
diff --git a/acts/tests/google/experimental/BluetoothLatencyTest.py b/acts/tests/google/experimental/BluetoothLatencyTest.py
index 811a41c..ea6ed4a 100644
--- a/acts/tests/google/experimental/BluetoothLatencyTest.py
+++ b/acts/tests/google/experimental/BluetoothLatencyTest.py
@@ -21,6 +21,7 @@
from acts import asserts
from acts.base_test import BaseTestClass
from acts.signals import TestPass
+from acts.test_decorators import test_tracker_info
from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
from acts.test_utils.bt.bt_test_utils import orchestrate_rfcomm_connection
from acts.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test
@@ -112,6 +113,7 @@
return (end_time - start_time) * 1000
@BluetoothBaseTest.bt_test_wrap
+ @test_tracker_info(uuid='7748295d-204e-4ad0-adf5-7591380b940a')
def test_bluetooth_latency(self):
"""Tests the latency for a data transfer over RFCOMM"""
diff --git a/acts/tests/google/experimental/BluetoothPairAndConnectTest.py b/acts/tests/google/experimental/BluetoothPairAndConnectTest.py
index e54e4e7..2402fb5 100644
--- a/acts/tests/google/experimental/BluetoothPairAndConnectTest.py
+++ b/acts/tests/google/experimental/BluetoothPairAndConnectTest.py
@@ -27,8 +27,11 @@
from acts.controllers.buds_lib.test_actions.apollo_acts import ApolloTestActions
from acts.signals import TestFailure
from acts.signals import TestPass
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
from acts.test_utils.bt.bt_test_utils import factory_reset_bluetooth
from acts.test_utils.bt.bt_test_utils import enable_bluetooth
+from acts.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test
from acts.test_utils.bt.loggers.bluetooth_metric_logger import BluetoothMetricLogger
from acts.utils import set_location_service
@@ -74,10 +77,10 @@
self.bt_logger = BluetoothMetricLogger.for_test_case()
def setup_test(self):
+ setup_multiple_devices_for_bt_test(self.android_devices)
# Make sure Bluetooth is on
enable_bluetooth(self.phone.droid, self.phone.ed)
set_location_service(self.phone, True)
- factory_reset_bluetooth([self.phone])
self.apollo_act.factory_reset()
self.log.info('===== START BLUETOOTH PAIR AND CONNECT TEST =====')
@@ -116,6 +119,8 @@
return pair_time, connection_time
+ @BluetoothBaseTest.bt_test_wrap
+ @test_tracker_info(uuid='c914fd08-350d-465a-96cf-970d40e71060')
def test_bluetooth_connect(self):
# Store metrics
metrics = {}
diff --git a/acts/tests/google/experimental/BluetoothReconnectTest.py b/acts/tests/google/experimental/BluetoothReconnectTest.py
index a03ec7b..5339e51 100644
--- a/acts/tests/google/experimental/BluetoothReconnectTest.py
+++ b/acts/tests/google/experimental/BluetoothReconnectTest.py
@@ -22,9 +22,12 @@
from acts import asserts
from acts.base_test import BaseTestClass
from acts.controllers.buds_lib.test_actions.apollo_acts import ApolloTestActions
-from acts.signals import TestPass, TestFailure
+from acts.signals import TestFailure
+from acts.signals import TestPass
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
from acts.test_utils.bt.bt_test_utils import enable_bluetooth
-from acts.test_utils.bt.bt_test_utils import factory_reset_bluetooth
+from acts.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test
from acts.test_utils.bt.loggers.bluetooth_metric_logger import BluetoothMetricLogger
from acts.utils import set_location_service
@@ -67,10 +70,10 @@
self.bt_logger = BluetoothMetricLogger.for_test_case()
def setup_test(self):
+ setup_multiple_devices_for_bt_test(self.android_devices)
# Make sure Bluetooth is on
enable_bluetooth(self.phone.droid, self.phone.ed)
set_location_service(self.phone, True)
- factory_reset_bluetooth([self.phone])
self.apollo_act.factory_reset()
# Initial pairing and connection of devices
@@ -119,6 +122,8 @@
end_time = time.perf_counter()
return (end_time - start_time) * 1000
+ @BluetoothBaseTest.bt_test_wrap
+ @test_tracker_info(uuid='da921903-92d0-471d-ae01-456058cc1297')
def test_bluetooth_reconnect(self):
"""Reconnects Bluetooth between a phone and Apollo device a specified
number of times and reports connection time statistics."""
diff --git a/acts/tests/google/experimental/BluetoothThroughputTest.py b/acts/tests/google/experimental/BluetoothThroughputTest.py
index 3403ded..1f53172 100644
--- a/acts/tests/google/experimental/BluetoothThroughputTest.py
+++ b/acts/tests/google/experimental/BluetoothThroughputTest.py
@@ -18,6 +18,7 @@
from acts import asserts
from acts.base_test import BaseTestClass
from acts.signals import TestPass
+from acts.test_decorators import test_tracker_info
from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
from acts.test_utils.bt.bt_test_utils import orchestrate_rfcomm_connection
from acts.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test
@@ -109,6 +110,7 @@
return throughput
@BluetoothBaseTest.bt_test_wrap
+ @test_tracker_info(uuid='23afba5b-5801-42c2-8d7a-41510e91a605')
def test_bluetooth_throughput_large_buffer(self):
"""Tests the throughput over a series of data transfers with large
buffer size.
@@ -143,6 +145,7 @@
extras=proto)
@BluetoothBaseTest.bt_test_wrap
+ @test_tracker_info(uuid='5472fe33-891e-4fa1-ba84-78fc6f6a2327')
def test_bluetooth_throughput_medium_buffer(self):
"""Tests the throughput over a series of data transfers with medium
buffer size.
@@ -177,6 +180,7 @@
extras=proto)
@BluetoothBaseTest.bt_test_wrap
+ @test_tracker_info(uuid='97589280-cefa-4ae4-b3fd-94ec9c1f4104')
def test_bluetooth_throughput_small_buffer(self):
"""Tests the throughput over a series of data transfers with small
buffer size.
diff --git a/acts/tests/google/instrumentation/PartialWakeLockTest.py b/acts/tests/google/instrumentation/PartialWakeLockTest.py
new file mode 100644
index 0000000..f20a14c
--- /dev/null
+++ b/acts/tests/google/instrumentation/PartialWakeLockTest.py
@@ -0,0 +1,34 @@
+#!/usr/bin/env python3
+#
+# Copyright 2019 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from acts.test_utils.instrumentation import instrumentation_power_test
+
+
+class PartialWakeLockTest(instrumentation_power_test.InstrumentationPowerTest):
+ """Test class for running instrumentation test PartialWakeLock."""
+
+ def setup_class(self):
+ super().setup_class()
+ self.run_and_measure('%s.tests.PartialWakeLock' % self._test_pkg)
+
+ def _prepare_device(self):
+ super()._prepare_device()
+ self.mode_airplane()
+ self.base_device_configuration()
+
+ def test_partial_wake_lock(self):
+ """Measures power when the device is idle with a partial wake lock."""
+ self.validate_power_results()
diff --git a/acts/tests/google/instrumentation/RockBottomTest.py b/acts/tests/google/instrumentation/RockBottomTest.py
new file mode 100644
index 0000000..403861e
--- /dev/null
+++ b/acts/tests/google/instrumentation/RockBottomTest.py
@@ -0,0 +1,34 @@
+#!/usr/bin/env python3
+#
+# Copyright 2019 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from acts.test_utils.instrumentation import instrumentation_power_test
+
+
+class RockBottomTest(instrumentation_power_test.InstrumentationPowerTest):
+ """Test class for running instrumentation test RockBottom."""
+
+ def setup_class(self):
+ super().setup_class()
+ self.run_and_measure('%s.tests.RockBottom' % self._test_pkg)
+
+ def _prepare_device(self):
+ super()._prepare_device()
+ self.mode_airplane()
+ self.base_device_configuration()
+
+ def test_rock_bottom(self):
+ """Measures power when the device is in a rock bottom state."""
+ self.validate_power_results()
diff --git a/acts/tests/google/power/tel/lab/PowerTelHotspotTest.py b/acts/tests/google/power/tel/lab/PowerTelHotspotTest.py
index 66aa458..cc34b58 100644
--- a/acts/tests/google/power/tel/lab/PowerTelHotspotTest.py
+++ b/acts/tests/google/power/tel/lab/PowerTelHotspotTest.py
@@ -103,7 +103,7 @@
for dut in [hotspot_dut, slave_dut]:
self.log.info("Setting Country Code to %s for SN:%s" %
(country_code, dut.serial))
- dut.droid.wifiSetCountryCode(country_code)
+ wutils.set_wifi_country_code(dut, country_code)
# Setup tethering
wutils.start_wifi_tethering(self.dut,
diff --git a/acts/tests/google/power/wifi/PowerWiFiHotspotTest.py b/acts/tests/google/power/wifi/PowerWiFiHotspotTest.py
index f554d12..52c6432 100644
--- a/acts/tests/google/power/wifi/PowerWiFiHotspotTest.py
+++ b/acts/tests/google/power/wifi/PowerWiFiHotspotTest.py
@@ -91,8 +91,8 @@
# Both devices need to have a country code in order
# to use the 5 GHz band.
- self.android_devices[0].droid.wifiSetCountryCode('US')
- self.android_devices[1].droid.wifiSetCountryCode('US')
+ wutils.set_wifi_country_code(self.android_devices[0], 'US')
+ wutils.set_wifi_country_code(self.android_devices[1], 'US')
def setup_test(self):
"""Set up test specific parameters or configs.
diff --git a/acts/tests/google/wifi/WifiChaosTest.py b/acts/tests/google/wifi/WifiChaosTest.py
index e0a4668..d0a3722 100755
--- a/acts/tests/google/wifi/WifiChaosTest.py
+++ b/acts/tests/google/wifi/WifiChaosTest.py
@@ -116,7 +116,7 @@
self.admin = 'admin' + str(random.randint(1000001, 12345678))
wutils.wifi_test_device_init(self.dut)
# Set country code explicitly to "US".
- self.dut.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
+ wutils.set_wifi_country_code(self.dut, wutils.WifiEnums.CountryCode.US)
asserts.assert_true(
self.lock_pcap(),
diff --git a/acts/tests/google/wifi/WifiDppTest.py b/acts/tests/google/wifi/WifiDppTest.py
index d7788db..42591b0 100644
--- a/acts/tests/google/wifi/WifiDppTest.py
+++ b/acts/tests/google/wifi/WifiDppTest.py
@@ -19,14 +19,14 @@
import time
from acts import asserts
-from acts import base_test
from acts import utils
from acts.test_decorators import test_tracker_info
from acts.test_utils.wifi import wifi_constants
from acts.test_utils.wifi import wifi_test_utils as wutils
+from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest
from acts.test_utils.wifi.aware import aware_test_utils as autils
-class WifiDppTest(base_test.BaseTestClass):
+class WifiDppTest(WifiBaseTest):
"""This class tests the DPP API surface.
Attributes: The tests in this class require one DUT and one helper phone
@@ -84,8 +84,8 @@
wutils.reset_wifi(self.dut)
def on_fail(self, test_name, begin_time):
- self.dut.take_bug_report(test_name, begin_time)
- self.dut.cat_adb_log(test_name, begin_time)
+ self.dut.take_bug_report(test_name, begin_time)
+ self.dut.cat_adb_log(test_name, begin_time)
def create_and_save_wifi_network_config(self, security):
""" Create a config with random SSID and password.
@@ -625,6 +625,7 @@
""" Tests Begin """
@test_tracker_info(uuid="30893d51-2069-4e1c-8917-c8a840f91b59")
+ @WifiBaseTest.wifi_test_wrap
def test_dpp_as_initiator_configurator_with_psk_5G(self):
asserts.skip_if(not self.dut.droid.wifiIs5GHzBandSupported() or
not self.helper_dev.droid.wifiIs5GHzBandSupported(),
@@ -634,6 +635,7 @@
use_mac=True)
@test_tracker_info(uuid="54d1d19a-aece-459c-b819-9d4b1ae63f77")
+ @WifiBaseTest.wifi_test_wrap
def test_dpp_as_initiator_configurator_with_psk_5G_broadcast(self):
asserts.skip_if(not self.dut.droid.wifiIs5GHzBandSupported() or
not self.helper_dev.droid.wifiIs5GHzBandSupported(),
@@ -643,6 +645,7 @@
use_mac=False)
@test_tracker_info(uuid="18270a69-300c-4f54-87fd-c19073a2854e ")
+ @WifiBaseTest.wifi_test_wrap
def test_dpp_as_initiator_configurator_with_psk_no_chan_in_uri_listen_on_5745_broadcast(self):
asserts.skip_if(not self.dut.droid.wifiIs5GHzBandSupported() or
not self.helper_dev.droid.wifiIs5GHzBandSupported(),
@@ -651,6 +654,7 @@
security=self.DPP_TEST_SECURITY_PSK, responder_chan=None, responder_freq=5745, use_mac=False)
@test_tracker_info(uuid="fbdd687c-954a-400b-9da3-2d17e28b0798")
+ @WifiBaseTest.wifi_test_wrap
def test_dpp_as_initiator_configurator_with_psk_no_chan_in_uri_listen_on_5745(self):
asserts.skip_if(not self.dut.droid.wifiIs5GHzBandSupported() or
not self.helper_dev.droid.wifiIs5GHzBandSupported(),
@@ -658,42 +662,50 @@
self.start_dpp_as_initiator_configurator(
security=self.DPP_TEST_SECURITY_PSK, responder_chan=None, responder_freq=5745, use_mac=True)
- @test_tracker_info(uuid="570f499f-ab12-4405-af14-c9ed36da2e01 ")
+ @test_tracker_info(uuid="570f499f-ab12-4405-af14-c9ed36da2e01")
+ @WifiBaseTest.wifi_test_wrap
def test_dpp_as_initiator_configurator_with_psk_no_chan_in_uri_listen_on_2462_broadcast(self):
self.start_dpp_as_initiator_configurator(
security=self.DPP_TEST_SECURITY_PSK, responder_chan=None, responder_freq=2462, use_mac=False)
@test_tracker_info(uuid="e1f083e0-0878-4c49-8ac5-d7c6bba24625")
+ @WifiBaseTest.wifi_test_wrap
def test_dpp_as_initiator_configurator_with_psk_no_chan_in_uri_listen_on_2462(self):
self.start_dpp_as_initiator_configurator(
security=self.DPP_TEST_SECURITY_PSK, responder_chan=None, responder_freq=2462, use_mac=True)
@test_tracker_info(uuid="d2a526f5-4269-493d-bd79-4e6d1b7b00f0")
+ @WifiBaseTest.wifi_test_wrap
def test_dpp_as_initiator_configurator_with_psk(self):
self.start_dpp_as_initiator_configurator(
security=self.DPP_TEST_SECURITY_PSK, use_mac=True)
@test_tracker_info(uuid="6ead218c-222b-45b8-8aad-fe7d883ed631")
+ @WifiBaseTest.wifi_test_wrap
def test_dpp_as_initiator_configurator_with_sae(self):
self.start_dpp_as_initiator_configurator(
security=self.DPP_TEST_SECURITY_SAE, use_mac=True)
@test_tracker_info(uuid="1686adb5-1b3c-4e6d-a969-6b007bdd990d")
+ @WifiBaseTest.wifi_test_wrap
def test_dpp_as_initiator_configurator_with_psk_passphrase(self):
self.start_dpp_as_initiator_configurator(
security=self.DPP_TEST_SECURITY_PSK_PASSPHRASE, use_mac=True)
@test_tracker_info(uuid="3958feb5-1a0c-4487-9741-ac06f04c55a2")
+ @WifiBaseTest.wifi_test_wrap
def test_dpp_as_initiator_configurator_with_sae_broadcast(self):
self.start_dpp_as_initiator_configurator(
security=self.DPP_TEST_SECURITY_SAE, use_mac=False)
@test_tracker_info(uuid="fe6d66f5-73a1-46e9-8f49-73b8f332cc8c")
+ @WifiBaseTest.wifi_test_wrap
def test_dpp_as_initiator_configurator_with_psk_passphrase_broadcast(self):
self.start_dpp_as_initiator_configurator(
security=self.DPP_TEST_SECURITY_PSK_PASSPHRASE, use_mac=False)
@test_tracker_info(uuid="9edd372d-e2f1-4545-8d04-6a1636fcbc4b")
+ @WifiBaseTest.wifi_test_wrap
def test_dpp_as_initiator_configurator_with_sae_for_ap(self):
self.start_dpp_as_initiator_configurator(
security=self.DPP_TEST_SECURITY_SAE,
@@ -701,6 +713,7 @@
net_role=self.DPP_TEST_NETWORK_ROLE_AP)
@test_tracker_info(uuid="e9eec912-d665-4926-beac-859cb13dc17b")
+ @WifiBaseTest.wifi_test_wrap
def test_dpp_as_initiator_configurator_with_psk_passphrase_for_ap(self):
self.start_dpp_as_initiator_configurator(
security=self.DPP_TEST_SECURITY_PSK_PASSPHRASE,
@@ -708,26 +721,31 @@
net_role=self.DPP_TEST_NETWORK_ROLE_AP)
@test_tracker_info(uuid="8055694f-606f-41dd-9826-3ea1e9b007f8")
+ @WifiBaseTest.wifi_test_wrap
def test_dpp_as_initiator_enrollee_with_sae(self):
self.start_dpp_as_initiator_enrollee(
security=self.DPP_TEST_SECURITY_SAE, use_mac=True)
@test_tracker_info(uuid="c1e9f605-b5c0-4e53-8a08-1b0087a667fa")
+ @WifiBaseTest.wifi_test_wrap
def test_dpp_as_initiator_enrollee_with_psk_passphrase(self):
self.start_dpp_as_initiator_enrollee(
security=self.DPP_TEST_SECURITY_PSK_PASSPHRASE, use_mac=True)
@test_tracker_info(uuid="1d7f30ad-2f9a-427a-8059-651dc8827ae2")
+ @WifiBaseTest.wifi_test_wrap
def test_dpp_as_initiator_enrollee_with_sae_broadcast(self):
self.start_dpp_as_initiator_enrollee(
security=self.DPP_TEST_SECURITY_SAE, use_mac=False)
@test_tracker_info(uuid="0cfc2645-600e-4f2b-ab5c-fcee6d363a9a")
+ @WifiBaseTest.wifi_test_wrap
def test_dpp_as_initiator_enrollee_with_psk_passphrase_broadcast(self):
self.start_dpp_as_initiator_enrollee(
security=self.DPP_TEST_SECURITY_PSK_PASSPHRASE, use_mac=False)
@test_tracker_info(uuid="2e26b248-65dd-41f6-977b-e223d72b2de9")
+ @WifiBaseTest.wifi_test_wrap
def test_start_dpp_as_initiator_enrollee_receive_invalid_config(self):
self.start_dpp_as_initiator_enrollee(
security=self.DPP_TEST_SECURITY_PSK_PASSPHRASE,
@@ -735,6 +753,7 @@
invalid_config=True)
@test_tracker_info(uuid="ed189661-d1c1-4626-9f01-3b7bb8a417fe")
+ @WifiBaseTest.wifi_test_wrap
def test_dpp_as_initiator_configurator_fail_authentication(self):
self.start_dpp_as_initiator_configurator(
security=self.DPP_TEST_SECURITY_PSK_PASSPHRASE,
@@ -742,6 +761,7 @@
fail_authentication=True)
@test_tracker_info(uuid="5a8c6587-fbb4-4a27-9cba-af6f8935833a")
+ @WifiBaseTest.wifi_test_wrap
def test_dpp_as_initiator_configurator_fail_unicast_timeout(self):
self.start_dpp_as_initiator_configurator(
security=self.DPP_TEST_SECURITY_PSK_PASSPHRASE,
@@ -749,6 +769,7 @@
cause_timeout=True)
@test_tracker_info(uuid="b12353ac-1a04-4036-81a4-2d2d0c653dbb")
+ @WifiBaseTest.wifi_test_wrap
def test_dpp_as_initiator_configurator_fail_broadcast_timeout(self):
self.start_dpp_as_initiator_configurator(
security=self.DPP_TEST_SECURITY_PSK_PASSPHRASE,
@@ -756,6 +777,7 @@
cause_timeout=True)
@test_tracker_info(uuid="eeff91be-09ce-4a33-8b4f-ece40eb51c76")
+ @WifiBaseTest.wifi_test_wrap
def test_dpp_as_initiator_configurator_invalid_uri(self):
self.start_dpp_as_initiator_configurator(
security=self.DPP_TEST_SECURITY_PSK_PASSPHRASE,
@@ -763,6 +785,7 @@
invalid_uri=True)
@test_tracker_info(uuid="1fa25f58-0d0e-40bd-8714-ab78957514d9")
+ @WifiBaseTest.wifi_test_wrap
def test_start_dpp_as_initiator_enrollee_fail_timeout(self):
self.start_dpp_as_initiator_enrollee(
security=self.DPP_TEST_SECURITY_PSK_PASSPHRASE,
diff --git a/acts/tests/google/wifi/WifiMacRandomizationTest.py b/acts/tests/google/wifi/WifiMacRandomizationTest.py
index 5ad2188..d41b05f 100644
--- a/acts/tests/google/wifi/WifiMacRandomizationTest.py
+++ b/acts/tests/google/wifi/WifiMacRandomizationTest.py
@@ -241,8 +241,9 @@
for pkt in packets:
self.log.debug("Packet Summary = %s" % pkt.summary())
if mac in pkt.summary():
- raise signals.TestFailure("Caught Factory MAC in packet sniffer."
- "Packet = %s" % pkt.show())
+ raise signals.TestFailure("Caught Factory MAC in packet sniffer"
+ "Packet = %s Device = %s"
+ % (pkt.show(), self.dut))
def verify_mac_is_found_in_pcap(self, mac, packets):
for pkt in packets:
@@ -250,7 +251,7 @@
if mac in pkt.summary():
return
raise signals.TestFailure("Did not find MAC = %s in packet sniffer."
- % mac)
+ "for device %s" % (mac, self.dut))
def get_sta_mac_address(self):
"""Gets the current MAC address being used for client mode."""
@@ -435,8 +436,8 @@
7. Verify the factory MAC is not leaked.
"""
- self.dut.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
- self.dut_client.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
+ wutils.set_wifi_country_code(self.dut, wutils.WifiEnums.CountryCode.US)
+ wutils.set_wifi_country_code(self.dut_client, wutils.WifiEnums.CountryCode.US)
mac_sta = self.connect_to_network_and_verify_mac_randomization(
self.wpapsk_2g)
softap = wutils.start_softap_and_verify(self, WIFI_CONFIG_APBAND_2G)
diff --git a/acts/tests/google/wifi/WifiManagerTest.py b/acts/tests/google/wifi/WifiManagerTest.py
index a4ef011..8a7d067 100644
--- a/acts/tests/google/wifi/WifiManagerTest.py
+++ b/acts/tests/google/wifi/WifiManagerTest.py
@@ -976,3 +976,15 @@
"wifi state changed after reboot")
disable_bluetooth(self.dut.droid)
+
+ @test_tracker_info(uuid="d0e14a2d-a28f-4551-8988-1e15d9d8bb1a")
+ def test_scan_result_api(self):
+ """Register scan result callback, start scan and wait for event"""
+ self.dut.ed.clear_all_events()
+ self.dut.droid.wifiStartScanWithListener()
+ try:
+ events = self.dut.ed.pop_events(
+ "WifiManagerScanResultsCallbackOnSuccess", 60)
+ except queue.Empty:
+ asserts.fail(
+ "Wi-Fi scan results did not become available within 60s.")
diff --git a/acts/tests/google/wifi/WifiNetworkRequestTest.py b/acts/tests/google/wifi/WifiNetworkRequestTest.py
index b966de2..28d5d16 100644
--- a/acts/tests/google/wifi/WifiNetworkRequestTest.py
+++ b/acts/tests/google/wifi/WifiNetworkRequestTest.py
@@ -405,39 +405,17 @@
network_specifier)
time.sleep(wifi_constants.NETWORK_REQUEST_CB_REGISTER_DELAY_SEC)
self.dut.droid.wifiRegisterNetworkRequestMatchCallback()
- # Wait for the request to timeout. In the meantime, platform will scan
- # and return matching networks. Ensure the matching networks list is
- # empty.
- start_time = time.time()
- has_request_timedout = False
+ # Wait for the request to timeout.
+ timeout_secs = \
+ NETWORK_REQUEST_TIMEOUT_MS / 1000 + NETWORK_REQUEST_INSTANT_FAILURE_TIMEOUT_SEC
try:
- while not has_request_timedout and time.time() - start_time <= \
- NETWORK_REQUEST_TIMEOUT_MS / 1000:
- # Pop all network request related events.
- network_request_events = \
- self.dut.ed.pop_events("WifiManagerNetwork.*", 30)
- asserts.assert_true(network_request_events, "invalid events")
- for network_request_event in network_request_events:
- # Handle the network match callbacks.
- if network_request_event["name"] == \
- wifi_constants.WIFI_NETWORK_REQUEST_MATCH_CB_ON_MATCH:
- matched_scan_results = network_request_event["data"]
- self.dut.log.debug(
- "Network request on match results %s",
- matched_scan_results)
- asserts.assert_false(matched_scan_results,
- "Empty network matches expected")
- # Handle the network request unavailable timeout.
- if network_request_event["name"] == \
- wifi_constants.WIFI_NETWORK_CB_ON_UNAVAILABLE:
- self.dut.log.info("Network request timed out")
- has_request_timedout = True
+ on_unavailable_event = self.dut.ed.pop_event(
+ wifi_constants.WIFI_NETWORK_CB_ON_UNAVAILABLE, timeout_secs)
+ asserts.assert_true(on_unavailable_event, "Network request did not timeout")
except queue.Empty:
asserts.fail("No events returned")
finally:
self.dut.droid.wifiStopTrackingStateChange()
- asserts.assert_true(has_request_timedout,
- "Network request did not timeout")
@test_tracker_info(uuid="760c3768-697d-442b-8d61-cfe02f10ceff")
def test_connect_failure_user_rejected(self):
diff --git a/acts/tests/google/wifi/WifiNetworkSuggestionTest.py b/acts/tests/google/wifi/WifiNetworkSuggestionTest.py
index 6d6da35..e678dc8 100644
--- a/acts/tests/google/wifi/WifiNetworkSuggestionTest.py
+++ b/acts/tests/google/wifi/WifiNetworkSuggestionTest.py
@@ -36,10 +36,12 @@
EapPhase2 = WifiEnums.EapPhase2
# Enterprise Config Macros
Ent = WifiEnums.Enterprise
+ATT = 2
# Default timeout used for reboot, toggle WiFi and Airplane mode,
# for the system to settle down after the operation.
DEFAULT_TIMEOUT = 10
+PASSPOINT_TIMEOUT = 30
class WifiNetworkSuggestionTest(WifiBaseTest):
@@ -59,7 +61,7 @@
req_params = []
opt_param = [
"open_network", "reference_networks", "radius_conf_2g", "radius_conf_5g", "ca_cert",
- "eap_identity", "eap_password", "hidden_networks"
+ "eap_identity", "eap_password", "hidden_networks", "passpoint_networks"
]
self.unpack_userparams(
req_param_names=req_params, opt_param_names=opt_param)
@@ -70,16 +72,14 @@
radius_conf_2g=self.radius_conf_2g,
radius_conf_5g=self.radius_conf_5g,)
- asserts.assert_true(
- len(self.reference_networks) > 0,
- "Need at least one reference network with psk.")
- if hasattr(self, "reference_networks"):
- self.wpa_psk_2g = self.reference_networks[0]["2g"]
- self.wpa_psk_5g = self.reference_networks[0]["5g"]
- if hasattr(self, "open_network"):
+ if hasattr(self, "reference_networks") and \
+ isinstance(self.reference_networks, list):
+ self.wpa_psk_2g = self.reference_networks[0]["2g"]
+ self.wpa_psk_5g = self.reference_networks[0]["5g"]
+ if hasattr(self, "open_network") and isinstance(self.open_network,list):
self.open_2g = self.open_network[0]["2g"]
self.open_5g = self.open_network[0]["5g"]
- if hasattr(self, "ent_networks"):
+ if hasattr(self, "ent_networks") and isinstance(self.ent_networks,list):
self.ent_network_2g = self.ent_networks[0]["2g"]
self.ent_network_5g = self.ent_networks[0]["5g"]
self.config_aka = {
@@ -94,8 +94,9 @@
Ent.PHASE2: int(EapPhase2.MSCHAPV2),
WifiEnums.SSID_KEY: self.ent_network_2g[WifiEnums.SSID_KEY],
}
- if hasattr(self, "hidden_networks"):
- self.hidden_network = self.hidden_networks[0]
+ if hasattr(self, "hidden_networks") and \
+ isinstance(self.hidden_networks, list):
+ self.hidden_network = self.hidden_networks[0]
self.dut.droid.wifiRemoveNetworkSuggestions([])
def setup_test(self):
@@ -153,13 +154,16 @@
self.dut.droid.wifiAddNetworkSuggestions(network_suggestions),
"Failed to add suggestions")
# Enable suggestions by the app.
- self.dut.log.debug("Enabling suggestions from test");
+ self.dut.log.debug("Enabling suggestions from test")
self.set_approved(True)
wutils.start_wifi_connection_scan_and_return_status(self.dut)
+ # if suggestion is passpoint wait longer for connection.
+ if "profile" in network_suggestions:
+ time.sleep(PASSPOINT_TIMEOUT)
wutils.wait_for_connect(self.dut, expected_ssid)
if expect_post_connection_broadcast is None:
- return;
+ return
# Check if we expected to get the broadcast.
try:
@@ -180,15 +184,11 @@
def remove_suggestions_disconnect_and_ensure_no_connection_back(self,
network_suggestions,
expected_ssid):
+ # Remove suggestion trigger disconnect and wait for the disconnect.
self.dut.log.info("Removing network suggestions")
asserts.assert_true(
self.dut.droid.wifiRemoveNetworkSuggestions(network_suggestions),
"Failed to remove suggestions")
- # Ensure we did not disconnect
- wutils.ensure_no_disconnect(self.dut)
-
- # Trigger a disconnect and wait for the disconnect.
- self.dut.droid.wifiDisconnect()
wutils.wait_for_disconnect(self.dut)
self.dut.ed.clear_all_events()
@@ -247,8 +247,11 @@
1. Send 2 network suggestions to the device (with different priorities).
2. Wait for the device to connect to the network with the highest
priority.
- 3. Re-add the suggestions with the priorities reversed.
- 4. Again wait for the device to connect to the network with the highest
+ 3. In-place modify network suggestions with priorities reversed
+ 4. Restart wifi, wait for the device to connect to the network with the highest
+ priority.
+ 5. Re-add the suggestions with the priorities reversed again.
+ 6. Again wait for the device to connect to the network with the highest
priority.
"""
network_suggestion_2g = self.wpa_psk_2g
@@ -262,16 +265,33 @@
self.wpa_psk_2g[WifiEnums.SSID_KEY],
None)
+ # In-place modify Reverse the priority, should be no disconnect
+ network_suggestion_2g[WifiEnums.PRIORITY] = 2
+ network_suggestion_5g[WifiEnums.PRIORITY] = 5
+ self.dut.log.info("Modifying network suggestions");
+ asserts.assert_true(
+ self.dut.droid.wifiAddNetworkSuggestions([network_suggestion_2g,
+ network_suggestion_5g]),
+ "Failed to add suggestions")
+ wutils.ensure_no_disconnect(self.dut)
+
+ # Disable and re-enable wifi, should connect to higher priority
+ wutils.wifi_toggle_state(self.dut, False)
+ time.sleep(DEFAULT_TIMEOUT)
+ wutils.wifi_toggle_state(self.dut, True)
+ wutils.start_wifi_connection_scan_and_return_status(self.dut)
+ wutils.wait_for_connect(self.dut, self.wpa_psk_5g[WifiEnums.SSID_KEY])
+
self.remove_suggestions_disconnect_and_ensure_no_connection_back(
- [], self.wpa_psk_2g[WifiEnums.SSID_KEY])
+ [], self.wpa_psk_5g[WifiEnums.SSID_KEY])
# Reverse the priority.
# Add suggestions & wait for the connection event.
- network_suggestion_2g[WifiEnums.PRIORITY] = 2
- network_suggestion_5g[WifiEnums.PRIORITY] = 5
+ network_suggestion_2g[WifiEnums.PRIORITY] = 5
+ network_suggestion_5g[WifiEnums.PRIORITY] = 2
self.add_suggestions_and_ensure_connection(
[network_suggestion_2g, network_suggestion_5g],
- self.wpa_psk_5g[WifiEnums.SSID_KEY],
+ self.wpa_psk_2g[WifiEnums.SSID_KEY],
None)
@test_tracker_info(uuid="b1d27eea-23c8-4c4f-b944-ef118e4cc35f")
@@ -456,3 +476,98 @@
[network_suggestion], network_suggestion[WifiEnums.SSID_KEY], False)
self.remove_suggestions_disconnect_and_ensure_no_connection_back(
[network_suggestion], network_suggestion[WifiEnums.SSID_KEY])
+
+ @test_tracker_info(uuid="806dff14-7543-482b-bd0a-598de59374b3")
+ def test_connect_to_passpoint_network_with_post_connection_broadcast(self):
+ """ Adds a passpoint network suggestion and ensure that the device connected.
+
+ Steps:
+ 1. Send a network suggestion to the device.
+ 2. Wait for the device to connect to it.
+ 3. Ensure that we did receive the post connection broadcast
+ (isAppInteractionRequired = true).
+ 4. Remove the suggestions and ensure the device does not connect back.
+ """
+ asserts.skip_if(not hasattr(self, "passpoint_networks"),
+ "No passpoint networks, skip this test")
+ passpoint_config = self.passpoint_networks[ATT]
+ passpoint_config[WifiEnums.IS_APP_INTERACTION_REQUIRED] = True
+ self.add_suggestions_and_ensure_connection([passpoint_config],
+ passpoint_config[WifiEnums.SSID_KEY], True)
+ self.remove_suggestions_disconnect_and_ensure_no_connection_back(
+ [passpoint_config], passpoint_config[WifiEnums.SSID_KEY])
+
+ @test_tracker_info(uuid="159b8b8c-fb00-4d4e-a29f-606881dcbf44")
+ def test_connect_to_passpoint_network_reboot_config_store(self):
+ """
+ Adds a passpoint network suggestion and ensure that the device connects to it
+ after reboot.
+
+ Steps:
+ 1. Send a network suggestion to the device.
+ 2. Wait for the device to connect to it.
+ 3. Ensure that we did not receive the post connection broadcast
+ (isAppInteractionRequired = False).
+ 4. Reboot the device.
+ 5. Wait for the device to connect to back to it.
+ 6. Remove the suggestions and ensure the device does not connect back.
+ """
+ asserts.skip_if(not hasattr(self, "passpoint_networks"),
+ "No passpoint networks, skip this test")
+ passpoint_config = self.passpoint_networks[ATT]
+ self._test_connect_to_wifi_network_reboot_config_store([passpoint_config],
+ passpoint_config)
+
+ @test_tracker_info(uuid="34f3d28a-bedf-43fe-a12d-2cfadf6bc6eb")
+ def test_fail_to_connect_to_passpoint_network_when_not_approved(self):
+ """
+ Adds a passpoint network suggestion and ensure that the device does not
+ connect to it until we approve the app.
+
+ Steps:
+ 1. Send a network suggestion to the device with the app not approved.
+ 2. Ensure the network is present in scan results, but we don't connect
+ to it.
+ 3. Now approve the app.
+ 4. Wait for the device to connect to it.
+ """
+ asserts.skip_if(not hasattr(self, "passpoint_networks"),
+ "No passpoint networks, skip this test")
+ passpoint_config = self.passpoint_networks[ATT]
+ self.dut.log.info("Adding network suggestions")
+ asserts.assert_true(
+ self.dut.droid.wifiAddNetworkSuggestions([passpoint_config]),
+ "Failed to add suggestions")
+
+ # Disable suggestions by the app.
+ self.set_approved(False)
+
+ # Ensure the app is not approved.
+ asserts.assert_false(
+ self.is_approved(),
+ "Suggestions should be disabled")
+
+ # Start a new scan to trigger auto-join.
+ wutils.start_wifi_connection_scan_and_ensure_network_found(
+ self.dut, passpoint_config[WifiEnums.SSID_KEY])
+
+ # Ensure we don't connect to the network.
+ asserts.assert_false(
+ wutils.wait_for_connect(
+ self.dut, passpoint_config[WifiEnums.SSID_KEY], assert_on_fail=False),
+ "Should not connect to network suggestions from unapproved app")
+
+ self.dut.log.info("Enabling suggestions from test");
+ # Now Enable suggestions by the app & ensure we connect to the network.
+ self.set_approved(True)
+
+ # Ensure the app is approved.
+ asserts.assert_true(
+ self.is_approved(),
+ "Suggestions should be enabled")
+
+ # Start a new scan to trigger auto-join.
+ wutils.start_wifi_connection_scan_and_ensure_network_found(
+ self.dut, passpoint_config[WifiEnums.SSID_KEY])
+ time.sleep(PASSPOINT_TIMEOUT)
+ wutils.wait_for_connect(self.dut, passpoint_config[WifiEnums.SSID_KEY])
diff --git a/acts/tests/google/wifi/WifiPingTest.py b/acts/tests/google/wifi/WifiPingTest.py
index 1bce903..36c139a 100644
--- a/acts/tests/google/wifi/WifiPingTest.py
+++ b/acts/tests/google/wifi/WifiPingTest.py
@@ -432,7 +432,7 @@
self.log.info('Already connected to desired network')
else:
wutils.reset_wifi(self.dut)
- self.dut.droid.wifiSetCountryCode(
+ wutils.set_wifi_country_code(self.dut,
self.testclass_params['country_code'])
testcase_params['test_network']['channel'] = testcase_params[
'channel']
diff --git a/acts/tests/google/wifi/WifiRoamingPerformanceTest.py b/acts/tests/google/wifi/WifiRoamingPerformanceTest.py
index da57bf5..8684dd0 100644
--- a/acts/tests/google/wifi/WifiRoamingPerformanceTest.py
+++ b/acts/tests/google/wifi/WifiRoamingPerformanceTest.py
@@ -443,7 +443,7 @@
if not wputils.health_check(self.dut, 10):
asserts.skip('Battery level too low. Skipping test.')
wutils.reset_wifi(self.dut)
- self.dut.droid.wifiSetCountryCode(
+ wutils.set_wifi_country_code(self.dut,
self.testclass_params['country_code'])
(primary_net_id,
primary_net_config) = next(net for net in self.main_network.items()
diff --git a/acts/tests/google/wifi/WifiRssiTest.py b/acts/tests/google/wifi/WifiRssiTest.py
index 91bfa9f..b929c2b 100644
--- a/acts/tests/google/wifi/WifiRssiTest.py
+++ b/acts/tests/google/wifi/WifiRssiTest.py
@@ -574,7 +574,7 @@
wutils.reset_wifi(self.dut)
self.main_network[testcase_params['band']][
'channel'] = testcase_params['channel']
- self.dut.droid.wifiSetCountryCode(
+ wutils.set_wifi_country_code(self.dut,
self.testclass_params['country_code'])
wutils.wifi_connect(
self.dut,
diff --git a/acts/tests/google/wifi/WifiRvrTest.py b/acts/tests/google/wifi/WifiRvrTest.py
index 289c5e4..65ada95 100644
--- a/acts/tests/google/wifi/WifiRvrTest.py
+++ b/acts/tests/google/wifi/WifiRvrTest.py
@@ -491,7 +491,7 @@
self.log.info('Already connected to desired network')
else:
wutils.reset_wifi(self.dut)
- self.dut.droid.wifiSetCountryCode(
+ wutils.set_wifi_country_code(self.dut,
self.testclass_params['country_code'])
testcase_params['test_network']['channel'] = testcase_params[
'channel']
diff --git a/acts/tests/google/wifi/WifiSensitivityTest.py b/acts/tests/google/wifi/WifiSensitivityTest.py
index 1947684..19a3227 100644
--- a/acts/tests/google/wifi/WifiSensitivityTest.py
+++ b/acts/tests/google/wifi/WifiSensitivityTest.py
@@ -393,7 +393,7 @@
self.log.info('Already connected to desired network')
else:
wutils.reset_wifi(self.dut)
- self.dut.droid.wifiSetCountryCode(
+ wutils.set_wifi_country_code(self.dut,
self.testclass_params['country_code'])
self.main_network[band]['channel'] = testcase_params['channel']
wutils.wifi_connect(
diff --git a/acts/tests/google/wifi/WifiSoftApAcsTest.py b/acts/tests/google/wifi/WifiSoftApAcsTest.py
index 7f92e27..0bf47a9 100644
--- a/acts/tests/google/wifi/WifiSoftApAcsTest.py
+++ b/acts/tests/google/wifi/WifiSoftApAcsTest.py
@@ -55,8 +55,8 @@
utils.sync_device_time(self.dut)
utils.sync_device_time(self.dut_client)
# Set country code explicitly to "US".
- self.dut.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
- self.dut_client.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
+ wutils.set_wifi_country_code(self.dut, wutils.WifiEnums.CountryCode.US)
+ wutils.set_wifi_country_code(self.dut_client, wutils.WifiEnums.CountryCode.US)
# Enable verbose logging on the duts
self.dut.droid.wifiEnableVerboseLogging(1)
asserts.assert_equal(self.dut.droid.wifiGetVerboseLoggingLevel(), 1,
@@ -69,8 +69,17 @@
"iperf_server_port"]
self.unpack_userparams(
req_param_names=req_params, opt_param_names=opt_param)
+ self.chan_map = {v: k for k, v in hostapd_constants.CHANNEL_MAP.items()}
+ self.pcap_procs = None
def setup_test(self):
+ if hasattr(self, 'packet_capture'):
+ chan = self.test_name.split('_')[-1]
+ if chan.isnumeric():
+ band = '2G' if self.chan_map[int(chan)] < 5000 else '5G'
+ self.packet_capture[0].configure_monitor_mode(band, int(chan))
+ self.pcap_procs = wutils.start_pcap(
+ self.packet_capture[0], band, self.test_name)
self.dut.droid.wakeLockAcquireBright()
self.dut.droid.wakeUpNow()
@@ -80,6 +89,9 @@
wutils.stop_wifi_tethering(self.dut)
wutils.reset_wifi(self.dut)
wutils.reset_wifi(self.dut_client)
+ if hasattr(self, 'packet_capture') and self.pcap_procs:
+ wutils.stop_pcap(self.packet_capture[0], self.pcap_procs, False)
+ self.pcap_procs = None
try:
if "AccessPoint" in self.user_params:
del self.user_params["reference_networks"]
diff --git a/acts/tests/google/wifi/WifiSoftApPerformanceTest.py b/acts/tests/google/wifi/WifiSoftApPerformanceTest.py
index a31812f..175f2ff 100644
--- a/acts/tests/google/wifi/WifiSoftApPerformanceTest.py
+++ b/acts/tests/google/wifi/WifiSoftApPerformanceTest.py
@@ -122,7 +122,7 @@
# Reset WiFi on all devices
for dev in self.android_devices:
wutils.reset_wifi(dev)
- dev.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
+ wutils.set_wifi_country_code(dev, wutils.WifiEnums.CountryCode.US)
# Setup Soft AP
sap_config = wutils.create_softap_config()
diff --git a/acts/tests/google/wifi/WifiSoftApTest.py b/acts/tests/google/wifi/WifiSoftApTest.py
index 9298e63..ec1922d 100644
--- a/acts/tests/google/wifi/WifiSoftApTest.py
+++ b/acts/tests/google/wifi/WifiSoftApTest.py
@@ -60,8 +60,8 @@
utils.sync_device_time(self.dut)
utils.sync_device_time(self.dut_client)
# Set country code explicitly to "US".
- self.dut.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
- self.dut_client.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
+ wutils.set_wifi_country_code(self.dut, wutils.WifiEnums.CountryCode.US)
+ wutils.set_wifi_country_code(self.dut_client, wutils.WifiEnums.CountryCode.US)
# Enable verbose logging on the duts
self.dut.droid.wifiEnableVerboseLogging(1)
asserts.assert_equal(self.dut.droid.wifiGetVerboseLoggingLevel(), 1,
@@ -76,7 +76,7 @@
self.AP_IFACE = 'wlan1'
if len(self.android_devices) > 2:
utils.sync_device_time(self.android_devices[2])
- self.android_devices[2].droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
+ wutils.set_wifi_country_code(self.android_devices[2], wutils.WifiEnums.CountryCode.US)
self.android_devices[2].droid.wifiEnableVerboseLogging(1)
asserts.assert_equal(self.android_devices[2].droid.wifiGetVerboseLoggingLevel(), 1,
"Failed to enable WiFi verbose logging on the client dut.")
@@ -91,8 +91,8 @@
def setup_test(self):
# Set country code explicitly to "US".
- self.dut.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
- self.dut_client.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
+ wutils.set_wifi_country_code(self.dut, wutils.WifiEnums.CountryCode.US)
+ wutils.set_wifi_country_code(self.dut_client, wutils.WifiEnums.CountryCode.US)
def teardown_test(self):
self.dut.log.debug("Toggling Airplane mode OFF.")
diff --git a/acts/tests/google/wifi/WifiStaApConcurrencyStressTest.py b/acts/tests/google/wifi/WifiStaApConcurrencyStressTest.py
index fae3103..6dda5dd 100755
--- a/acts/tests/google/wifi/WifiStaApConcurrencyStressTest.py
+++ b/acts/tests/google/wifi/WifiStaApConcurrencyStressTest.py
@@ -72,8 +72,8 @@
utils.sync_device_time(self.dut)
utils.sync_device_time(self.dut_client)
# Set country code explicitly to "US".
- self.dut.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
- self.dut_client.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
+ wutils.set_wifi_country_code(self.dut, wutils.WifiEnums.CountryCode.US)
+ wutils.set_wifi_country_code(self.dut_client, wutils.WifiEnums.CountryCode.US)
# Enable verbose logging on the duts
self.dut.droid.wifiEnableVerboseLogging(1)
asserts.assert_equal(self.dut.droid.wifiGetVerboseLoggingLevel(), 1,
@@ -104,7 +104,7 @@
if len(self.android_devices) > 2:
wutils.wifi_test_device_init(self.android_devices[2])
utils.sync_device_time(self.android_devices[2])
- self.android_devices[2].droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
+ wutils.set_wifi_country_code(self.android_devices[2], wutils.WifiEnums.CountryCode.US)
self.android_devices[2].droid.wifiEnableVerboseLogging(1)
asserts.assert_equal(self.android_devices[2].droid.wifiGetVerboseLoggingLevel(), 1,
"Failed to enable WiFi verbose logging on the client dut.")
diff --git a/acts/tests/google/wifi/WifiStaApConcurrencyTest.py b/acts/tests/google/wifi/WifiStaApConcurrencyTest.py
index 92dc7aa..e16e253 100644
--- a/acts/tests/google/wifi/WifiStaApConcurrencyTest.py
+++ b/acts/tests/google/wifi/WifiStaApConcurrencyTest.py
@@ -61,8 +61,8 @@
utils.sync_device_time(self.dut)
utils.sync_device_time(self.dut_client)
# Set country code explicitly to "US".
- self.dut.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
- self.dut_client.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
+ wutils.set_wifi_country_code(self.dut, wutils.WifiEnums.CountryCode.US)
+ wutils.set_wifi_country_code(self.dut_client, wutils.WifiEnums.CountryCode.US)
# Enable verbose logging on the duts
self.dut.droid.wifiEnableVerboseLogging(1)
asserts.assert_equal(self.dut.droid.wifiGetVerboseLoggingLevel(), 1,
@@ -86,7 +86,7 @@
if len(self.android_devices) > 2:
wutils.wifi_test_device_init(self.android_devices[2])
utils.sync_device_time(self.android_devices[2])
- self.android_devices[2].droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
+ wutils.set_wifi_country_code(self.android_devices[2], wutils.WifiEnums.CountryCode.US)
self.android_devices[2].droid.wifiEnableVerboseLogging(1)
asserts.assert_equal(self.android_devices[2].droid.wifiGetVerboseLoggingLevel(), 1,
"Failed to enable WiFi verbose logging on the client dut.")
diff --git a/acts/tests/google/wifi/WifiStressTest.py b/acts/tests/google/wifi/WifiStressTest.py
index 0f9032a..5c0633b 100644
--- a/acts/tests/google/wifi/WifiStressTest.py
+++ b/acts/tests/google/wifi/WifiStressTest.py
@@ -431,17 +431,14 @@
4. Verify softAP is turned down and WiFi is up.
"""
- # Set country code explicitly to "US".
- self.dut.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
- self.dut_client.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
ap_ssid = "softap_" + utils.rand_ascii_str(8)
ap_password = utils.rand_ascii_str(8)
self.dut.log.info("softap setup: %s %s", ap_ssid, ap_password)
config = {wutils.WifiEnums.SSID_KEY: ap_ssid}
config[wutils.WifiEnums.PWD_KEY] = ap_password
# Set country code explicitly to "US".
- self.dut.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
- self.dut_client.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US)
+ wutils.set_wifi_country_code(self.dut, wutils.WifiEnums.CountryCode.US)
+ wutils.set_wifi_country_code(self.dut_client, wutils.WifiEnums.CountryCode.US)
for count in range(self.stress_count):
initial_wifi_state = self.dut.droid.wifiCheckState()
wutils.start_wifi_tethering(self.dut,
diff --git a/acts/tests/google/wifi/WifiThroughputStabilityTest.py b/acts/tests/google/wifi/WifiThroughputStabilityTest.py
index f3e942f..cc3e468 100644
--- a/acts/tests/google/wifi/WifiThroughputStabilityTest.py
+++ b/acts/tests/google/wifi/WifiThroughputStabilityTest.py
@@ -286,7 +286,7 @@
else:
wutils.wifi_toggle_state(self.dut, True)
wutils.reset_wifi(self.dut)
- self.dut.droid.wifiSetCountryCode(
+ wutils.set_wifi_country_code(self.dut,
self.testclass_params['country_code'])
self.main_network[band]['channel'] = testcase_params['channel']
wutils.wifi_connect(
diff --git a/acts/tests/google/wifi/aware/functional/DataPathTest.py b/acts/tests/google/wifi/aware/functional/DataPathTest.py
index 3016bb9..cf939bf 100644
--- a/acts/tests/google/wifi/aware/functional/DataPathTest.py
+++ b/acts/tests/google/wifi/aware/functional/DataPathTest.py
@@ -1740,8 +1740,8 @@
init_dut = self.android_devices[0]
resp_dut = self.android_devices[1]
- init_dut.droid.wifiSetCountryCode(init_domain)
- resp_dut.droid.wifiSetCountryCode(resp_domain)
+ wutils.set_wifi_country_code(init_dut, init_domain)
+ wutils.set_wifi_country_code(resp_dut, resp_domain)
if use_ib:
(resp_req_key, init_req_key, resp_aware_if, init_aware_if,
@@ -2159,8 +2159,8 @@
The NDPs are all OPEN (no encryption).
"""
- asserts.assert_true(
- len(self.android_devices) >= 3,
+ asserts.skip_if(
+ len(self.android_devices) < 3,
'A minimum of 3 devices is needed to run the test, have %d' % len(
self.android_devices))
diff --git a/acts/tests/google/wifi/aware/functional/DiscoveryTest.py b/acts/tests/google/wifi/aware/functional/DiscoveryTest.py
index 5f01c80..d867443 100644
--- a/acts/tests/google/wifi/aware/functional/DiscoveryTest.py
+++ b/acts/tests/google/wifi/aware/functional/DiscoveryTest.py
@@ -1068,3 +1068,192 @@
s_config=autils.create_discovery_config(
sub_service_name, aconsts.SUBSCRIBE_TYPE_PASSIVE),
device_startup_offset=self.device_startup_offset)
+
+ ##########################################################
+
+ def exchange_messages(self, p_dut, p_disc_id, s_dut, s_disc_id, peer_id_on_sub, session_name):
+ """
+ Exchange message between Publisher and Subscriber on target discovery session
+
+ Args:
+ p_dut: Publisher device
+ p_disc_id: Publish discovery session id
+ s_dut: Subscriber device
+ s_disc_id: Subscribe discovery session id
+ peer_id_on_sub: Peer ID of the Publisher as seen on the Subscriber
+ session_name: dictionary of discovery session name base on role("pub" or "sub")
+ {role: {disc_id: name}}
+ """
+ msg_template = "Hello {} from {} !"
+
+ # Message send from Subscriber to Publisher
+ s_to_p_msg = msg_template.format(session_name["pub"][p_disc_id],
+ session_name["sub"][s_disc_id])
+ s_dut.droid.wifiAwareSendMessage(s_disc_id,
+ peer_id_on_sub,
+ self.get_next_msg_id(),
+ s_to_p_msg,
+ self.msg_retx_count)
+ autils.wait_for_event(s_dut,
+ autils.decorate_event(aconsts.SESSION_CB_ON_MESSAGE_SENT, s_disc_id))
+ event = autils.wait_for_event(p_dut,
+ autils.decorate_event(aconsts.SESSION_CB_ON_MESSAGE_RECEIVED,
+ p_disc_id))
+ asserts.assert_equal(
+ event["data"][aconsts.SESSION_CB_KEY_MESSAGE_AS_STRING], s_to_p_msg,
+ "Message on service %s from Subscriber to Publisher "
+ "not received correctly" % session_name["pub"][p_disc_id])
+ peer_id_on_pub = event["data"][aconsts.SESSION_CB_KEY_PEER_ID]
+
+ # Message send from Publisher to Subscriber
+ p_to_s_msg = msg_template.format(session_name["sub"][s_disc_id],
+ session_name["pub"][p_disc_id])
+ p_dut.droid.wifiAwareSendMessage(p_disc_id,
+ peer_id_on_pub,
+ self.get_next_msg_id(), p_to_s_msg,
+ self.msg_retx_count)
+ autils.wait_for_event(
+ p_dut, autils.decorate_event(aconsts.SESSION_CB_ON_MESSAGE_SENT, p_disc_id))
+ event = autils.wait_for_event(s_dut,
+ autils.decorate_event(aconsts.SESSION_CB_ON_MESSAGE_RECEIVED,
+ s_disc_id))
+ asserts.assert_equal(
+ event["data"][aconsts.SESSION_CB_KEY_MESSAGE_AS_STRING], p_to_s_msg,
+ "Message on service %s from Publisher to Subscriber"
+ "not received correctly" % session_name["sub"][s_disc_id])
+
+ def run_multiple_concurrent_services_same_name_diff_ssi(self, type_x, type_y):
+ """Validate same service name with multiple service specific info on publisher
+ and subscriber can see all service
+
+ - p_dut running Publish X and Y
+ - s_dut running subscribe A and B
+ - subscribe A find X and Y
+ - subscribe B find X and Y
+
+ Message exchanges:
+ - A to X and X to A
+ - B to X and X to B
+ - A to Y and Y to A
+ - B to Y and Y to B
+
+ Note: test requires that publisher device support 2 publish sessions concurrently,
+ and subscriber device support 2 subscribe sessions concurrently.
+ The test will be skipped if the devices are not capable.
+
+ Args:
+ type_x, type_y: A list of [ptype, stype] of the publish and subscribe
+ types for services X and Y respectively.
+ """
+ p_dut = self.android_devices[0]
+ s_dut = self.android_devices[1]
+
+ asserts.skip_if(
+ p_dut.aware_capabilities[aconsts.CAP_MAX_PUBLISHES] < 2
+ or s_dut.aware_capabilities[aconsts.CAP_MAX_SUBSCRIBES] < 2,
+ "Devices do not support 2 publish sessions or 2 subscribe sessions")
+
+ SERVICE_NAME = "ServiceName"
+ X_SERVICE_SSI = "ServiceSpecificInfoXXX"
+ Y_SERVICE_SSI = "ServiceSpecificInfoYYY"
+ use_id = True
+
+ # attach and wait for confirmation
+ p_id = p_dut.droid.wifiAwareAttach(False, None, use_id)
+ autils.wait_for_event(p_dut, autils.decorate_event(aconsts.EVENT_CB_ON_ATTACHED, p_id))
+ time.sleep(self.device_startup_offset)
+ s_id = s_dut.droid.wifiAwareAttach(False, None, use_id)
+ autils.wait_for_event(s_dut, autils.decorate_event(aconsts.EVENT_CB_ON_ATTACHED, s_id))
+
+ # Publisher: start publishing both X & Y services and wait for confirmations
+ p_disc_id_x = p_dut.droid.wifiAwarePublish(
+ p_id, autils.create_discovery_config(SERVICE_NAME, type_x[0], X_SERVICE_SSI), use_id)
+ event = autils.wait_for_event(p_dut,
+ autils.decorate_event(
+ aconsts.SESSION_CB_ON_PUBLISH_STARTED, p_disc_id_x))
+
+ p_disc_id_y = p_dut.droid.wifiAwarePublish(
+ p_id, autils.create_discovery_config(SERVICE_NAME, type_x[0], Y_SERVICE_SSI), use_id)
+ event = autils.wait_for_event(p_dut,
+ autils.decorate_event(
+ aconsts.SESSION_CB_ON_PUBLISH_STARTED, p_disc_id_y))
+
+ # Subscriber: start subscribe session A
+ s_disc_id_a = s_dut.droid.wifiAwareSubscribe(
+ s_id, autils.create_discovery_config(SERVICE_NAME, type_x[1]), use_id)
+ autils.wait_for_event(s_dut, autils.decorate_event(
+ aconsts.SESSION_CB_ON_SUBSCRIBE_STARTED, s_disc_id_a))
+
+ # Subscriber: start subscribe session B
+ s_disc_id_b = s_dut.droid.wifiAwareSubscribe(
+ p_id, autils.create_discovery_config(SERVICE_NAME, type_y[1]), use_id)
+ autils.wait_for_event(s_dut, autils.decorate_event(
+ aconsts.SESSION_CB_ON_SUBSCRIBE_STARTED, s_disc_id_b))
+
+ session_name = {"pub": {p_disc_id_x: "X", p_disc_id_y: "Y"},
+ "sub": {s_disc_id_a: "A", s_disc_id_b: "B"}}
+
+ # Subscriber: subscribe session A & B wait for service discovery
+ # Number of results on each session should be exactly 2
+ results_a = {}
+ for i in range(2):
+ event = autils.wait_for_event(s_dut, autils.decorate_event(
+ aconsts.SESSION_CB_ON_SERVICE_DISCOVERED, s_disc_id_a))
+ results_a[
+ bytes(event["data"][
+ aconsts.SESSION_CB_KEY_SERVICE_SPECIFIC_INFO]).decode('utf-8')] = event
+ autils.fail_on_event(s_dut, autils.decorate_event(
+ aconsts.SESSION_CB_ON_SERVICE_DISCOVERED, s_disc_id_a))
+
+ results_b = {}
+ for i in range(2):
+ event = autils.wait_for_event(s_dut, autils.decorate_event(
+ aconsts.SESSION_CB_ON_SERVICE_DISCOVERED, s_disc_id_b))
+ results_b[
+ bytes(event["data"][
+ aconsts.SESSION_CB_KEY_SERVICE_SPECIFIC_INFO]).decode('utf-8')] = event
+ autils.fail_on_event(s_dut, autils.decorate_event(
+ aconsts.SESSION_CB_ON_SERVICE_DISCOVERED, s_disc_id_b))
+
+ s_a_peer_id_for_p_x = results_a[X_SERVICE_SSI]["data"][aconsts.SESSION_CB_KEY_PEER_ID]
+ s_a_peer_id_for_p_y = results_a[Y_SERVICE_SSI]["data"][aconsts.SESSION_CB_KEY_PEER_ID]
+ s_b_peer_id_for_p_x = results_b[X_SERVICE_SSI]["data"][aconsts.SESSION_CB_KEY_PEER_ID]
+ s_b_peer_id_for_p_y = results_b[Y_SERVICE_SSI]["data"][aconsts.SESSION_CB_KEY_PEER_ID]
+
+ # Message exchange between Publisher and Subscribe
+ self.exchange_messages(p_dut, p_disc_id_x,
+ s_dut, s_disc_id_a, s_a_peer_id_for_p_x, session_name)
+
+ self.exchange_messages(p_dut, p_disc_id_x,
+ s_dut, s_disc_id_b, s_b_peer_id_for_p_x, session_name)
+
+ self.exchange_messages(p_dut, p_disc_id_y,
+ s_dut, s_disc_id_a, s_a_peer_id_for_p_y, session_name)
+
+ self.exchange_messages(p_dut, p_disc_id_y,
+ s_dut, s_disc_id_b, s_b_peer_id_for_p_y, session_name)
+
+ # Check no more messages
+ time.sleep(autils.EVENT_TIMEOUT)
+ autils.verify_no_more_events(p_dut, timeout=0)
+ autils.verify_no_more_events(s_dut, timeout=0)
+
+ ##########################################################
+
+ def test_multiple_concurrent_services_diff_ssi_unsolicited_passive(self):
+ """Multi service test on same service name but different Service Specific Info
+ - Unsolicited publish
+ - Passive subscribe
+ """
+ self.run_multiple_concurrent_services_same_name_diff_ssi(
+ type_x=[aconsts.PUBLISH_TYPE_UNSOLICITED, aconsts.SUBSCRIBE_TYPE_PASSIVE],
+ type_y=[aconsts.PUBLISH_TYPE_UNSOLICITED, aconsts.SUBSCRIBE_TYPE_PASSIVE])
+
+ def test_multiple_concurrent_services_diff_ssi_solicited_active(self):
+ """Multi service test on same service name but different Service Specific Info
+ - Solicited publish
+ - Active subscribe
+ """
+ self.run_multiple_concurrent_services_same_name_diff_ssi(
+ type_x=[aconsts.PUBLISH_TYPE_SOLICITED, aconsts.SUBSCRIBE_TYPE_ACTIVE],
+ type_y=[aconsts.PUBLISH_TYPE_SOLICITED, aconsts.SUBSCRIBE_TYPE_ACTIVE])
diff --git a/acts/tests/google/wifi/aware/functional/MacRandomNoLeakageTest.py b/acts/tests/google/wifi/aware/functional/MacRandomNoLeakageTest.py
index 074ca48..3898832 100644
--- a/acts/tests/google/wifi/aware/functional/MacRandomNoLeakageTest.py
+++ b/acts/tests/google/wifi/aware/functional/MacRandomNoLeakageTest.py
@@ -70,12 +70,13 @@
pcaps = pcap_5g + pcap_2g
# Verify factory MAC is not leaked in both 2G and 5G pcaps
- for mac in factory_mac_addresses:
- wutils.verify_mac_not_found_in_pcap(mac, pcaps)
+ ads = [self.android_devices[0], self.android_devices[1]]
+ for i, mac in enumerate(factory_mac_addresses):
+ wutils.verify_mac_not_found_in_pcap(ads[i], mac, pcaps)
# Verify random MACs are being used and in pcaps
- for mac in mac_addresses:
- wutils.verify_mac_is_found_in_pcap(mac, pcaps)
+ for i, mac in enumerate(mac_addresses):
+ wutils.verify_mac_is_found_in_pcap(ads[i], mac, pcaps)
def transfer_mac_format(self, mac):
"""add ':' to mac String, and transfer to lower case
diff --git a/acts/tests/google/wifi/aware/functional/NonConcurrencyTest.py b/acts/tests/google/wifi/aware/functional/NonConcurrencyTest.py
index f0286f2..4f40509 100644
--- a/acts/tests/google/wifi/aware/functional/NonConcurrencyTest.py
+++ b/acts/tests/google/wifi/aware/functional/NonConcurrencyTest.py
@@ -40,6 +40,7 @@
AwareBaseTest.teardown_test(self)
for ad in self.android_devices:
ad.droid.wifiP2pClose()
+ ad.droid.connectivityStopTethering(0)
def run_aware_then_incompat_service(self, is_p2p):
"""Run test to validate that a running Aware session terminates when an
@@ -81,6 +82,15 @@
# expect an announcement about Aware non-availability
autils.wait_for_event(dut, aconsts.BROADCAST_WIFI_AWARE_NOT_AVAILABLE)
+ # Wifi state and location mode changes should not make Aware available
+ wutils.wifi_toggle_state(dut, False)
+ utils.set_location_service(dut, False)
+ wutils.wifi_toggle_state(dut, True)
+ utils.set_location_service(dut, True)
+ autils.fail_on_event(dut, aconsts.BROADCAST_WIFI_AWARE_AVAILABLE)
+ asserts.assert_false(dut.droid.wifiIsAwareAvailable(),
+ "Aware is available (it shouldn't be)")
+
# try starting anyway (expect failure)
dut.droid.wifiAwareAttach()
autils.wait_for_event(dut, aconsts.EVENT_CB_ON_ATTACH_FAILED)
@@ -133,6 +143,7 @@
p_id = dut.droid.wifiAwareAttach()
autils.wait_for_event(dut, aconsts.EVENT_CB_ON_ATTACHED)
+ wutils.start_wifi_connection_scan_and_ensure_network_found(dut, ap_ssid)
wutils.wifi_connect(dut, config, check_connectivity=False)
autils.wait_for_event(dut, wconsts.WIFI_STATE_CHANGED)
diff --git a/acts/tests/google/wifi/aware/performance/ThroughputTest.py b/acts/tests/google/wifi/aware/performance/ThroughputTest.py
index a90734d..2dab276 100644
--- a/acts/tests/google/wifi/aware/performance/ThroughputTest.py
+++ b/acts/tests/google/wifi/aware/performance/ThroughputTest.py
@@ -406,3 +406,35 @@
[self.PASSPHRASE, self.PASSPHRASE2], results=results)
asserts.explicit_pass(
"test_iperf_max_ndi_aware_only_passphrases passes", extras=results)
+
+ def run_test_traffic_latency_single_ndp_ib_aware_only_open(self):
+ """Measure IPv6 traffic latency performance(ping) on NDP between 2 devices.
+ Security config is open.
+ """
+ p_dut = self.android_devices[0]
+ p_dut.pretty_name = "publisher"
+ s_dut = self.android_devices[1]
+ s_dut.pretty_name = "subscriber"
+ ndp_info = autils.create_ib_ndp(p_dut,
+ s_dut,
+ autils.create_discovery_config(
+ self.SERVICE_NAME, aconsts.PUBLISH_TYPE_UNSOLICITED),
+ autils.create_discovery_config(
+ self.SERVICE_NAME, aconsts.SUBSCRIBE_TYPE_PASSIVE),
+ self.device_startup_offset)
+ p_req_key = ndp_info[0]
+ s_req_key = ndp_info[1]
+ p_aware_if = ndp_info[2]
+ s_aware_if = ndp_info[3]
+ p_ipv6 = ndp_info[4]
+ s_ipv6 = ndp_info[5]
+ self.log.info("Interface names: P=%s, S=%s", p_aware_if, s_aware_if)
+ self.log.info("Interface addresses (IPv6): P=%s, S=%s", p_ipv6, s_ipv6)
+ self.log.info("Start ping %s from %s", s_ipv6, p_ipv6)
+ latency_result = autils.run_ping6(p_dut, s_ipv6)
+ self.log.info("The latency results are %s", latency_result)
+
+ def test_traffic_latency_single_ndp_ib_aware_only_open(self):
+ """Test IPv6 traffic latency performance on NDP with security config is open.
+ """
+ self.run_test_traffic_latency_single_ndp_ib_aware_only_open()
diff --git a/acts/tests/google/wifi/aware/stress/MessagesStressTest.py b/acts/tests/google/wifi/aware/stress/MessagesStressTest.py
index fbe95d4..153a81f 100644
--- a/acts/tests/google/wifi/aware/stress/MessagesStressTest.py
+++ b/acts/tests/google/wifi/aware/stress/MessagesStressTest.py
@@ -30,10 +30,18 @@
class MessagesStressTest(AwareBaseTest):
"""Set of stress tests for Wi-Fi Aware L2 (layer 2) message exchanges."""
+ # Number of the message queue depth per Uid from framework
+ MESSAGE_QUEUE_DEPTH_PER_UID = 50
# Number of iterations in the stress test (number of messages)
+ # Should be larger than MESSAGE_QUEUE_DEPTH_PER_UID
NUM_ITERATIONS = 100
+ # Number of message to send per round to avoid exceed message queue depth limit
+ # Should be less than or equal to 1/2 of MESSAGE_QUEUE_DEPTH_PER_UID
+ NUM_PER_ROUND = 20
+ NUM_ROUNDS = 5
+
# Maximum permitted percentage of messages which fail to be transmitted
# correctly
MAX_TX_FAILURE_PERCENTAGE = 2
@@ -172,39 +180,166 @@
if data[KEY_TX_OK_COUNT] > 0:
results["tx_count_success"] = results["tx_count_success"] + 1
if data[KEY_TX_OK_COUNT] > 1:
- results["tx_count_duplicate_success"] = (
- results["tx_count_duplicate_success"] + 1)
+ results["tx_count_duplicate_success"] += 1
if data[KEY_TX_FAIL_COUNT] > 0:
- results["tx_count_fail"] = results["tx_count_fail"] + 1
+ results["tx_count_fail"] += 1
if data[KEY_TX_FAIL_COUNT] > 1:
- results[
- "tx_count_duplicate_fail"] = results["tx_count_duplicate_fail"] + 1
+ results["tx_count_duplicate_fail"] += 1
if (data[KEY_TX_OK_COUNT] == 0 and data[KEY_TX_FAIL_COUNT] == 0
and data[KEY_ID] != -1):
- results["tx_count_neither"] = results["tx_count_neither"] + 1
+ results["tx_count_neither"] += 1
if data[KEY_TX_OK_COUNT] > 0 and data[KEY_RX_COUNT] == 0:
- results["tx_count_tx_ok_but_no_rx"] = (
- results["tx_count_tx_ok_but_no_rx"] + 1)
+ results["tx_count_tx_ok_but_no_rx"] += 1
if data[KEY_RX_COUNT] > 0:
- results["rx_count"] = results["rx_count"] + 1
+ results["rx_count"] += 1
if data[KEY_RX_COUNT] > 1:
- results[
- "rx_count_duplicate"] = results["rx_count_duplicate"] + 1
+ results["rx_count_duplicate"] += 1
if data[KEY_RX_COUNT] > 0 and data[KEY_TX_OK_COUNT] == 0:
- results["rx_count_no_ok_tx_indication"] = (
- results["rx_count_no_ok_tx_indication"] + 1)
+ results["rx_count_no_ok_tx_indication"] += 1
if data[KEY_RX_COUNT] > 0 and data[KEY_TX_FAIL_COUNT] > 0:
- results["rx_count_fail_tx_indication"] = (
- results["rx_count_fail_tx_indication"] + 1)
+ results["rx_count_fail_tx_indication"] += 1
if data[KEY_RX_COUNT] > 0 and data[KEY_ID] == -1:
- results[
- "rx_count_no_tx_message"] = results["rx_count_no_tx_message"] + 1
+ results["rx_count_no_tx_message"] += 1
#######################################################################
@test_tracker_info(uuid="e88c060f-4ca7-41c1-935a-d3d62878ec0b")
- def test_stress_message(self):
- """Stress test for bi-directional message transmission and reception."""
+ def test_stress_message_no_throttling(self):
+ """Stress test for bi-directional message transmission and reception no throttling"""
+ p_dut = self.android_devices[0]
+ s_dut = self.android_devices[1]
+
+ # Start up a discovery session
+ discovery_data = autils.create_discovery_pair(
+ p_dut,
+ s_dut,
+ p_config=autils.create_discovery_config(
+ self.SERVICE_NAME, aconsts.PUBLISH_TYPE_UNSOLICITED),
+ s_config=autils.create_discovery_config(
+ self.SERVICE_NAME, aconsts.SUBSCRIBE_TYPE_PASSIVE),
+ device_startup_offset=self.device_startup_offset,
+ msg_id=self.get_next_msg_id())
+ p_id = discovery_data[0]
+ s_id = discovery_data[1]
+ p_disc_id = discovery_data[2]
+ s_disc_id = discovery_data[3]
+ peer_id_on_sub = discovery_data[4]
+ peer_id_on_pub = discovery_data[5]
+
+ # Store information on Tx & Rx messages
+ messages_by_msg = {} # keyed by message text
+ # {text -> {id, tx_ok_count, tx_fail_count, rx_count}}
+ messages_by_id = {} # keyed by message ID {id -> text}
+ iterations = 0
+ p_tx_ok_count_total = 0
+ p_tx_fail_count_total = 0
+ p_tx_unknown_id_total = 0
+ s_tx_ok_count_total = 0
+ s_tx_fail_count_total = 0
+ s_tx_unknown_id_total = 0
+
+ # First round will fill up the message queue
+ num_of_messages_this_round = self.MESSAGE_QUEUE_DEPTH_PER_UID
+
+ # send messages (one in each direction) in rounds to avoid exceed the queue limit
+ for j in range(self.NUM_ROUNDS):
+ for k in range(num_of_messages_this_round):
+ msg_p2s = "Message Publisher -> Subscriber #%d" % iterations
+ next_msg_id = self.get_next_msg_id()
+ self.init_info(msg_p2s, next_msg_id, messages_by_msg,
+ messages_by_id)
+ p_dut.droid.wifiAwareSendMessage(p_disc_id, peer_id_on_pub,
+ next_msg_id, msg_p2s, 0)
+
+ msg_s2p = "Message Subscriber -> Publisher #%d" % iterations
+ next_msg_id = self.get_next_msg_id()
+ self.init_info(msg_s2p, next_msg_id, messages_by_msg,
+ messages_by_id)
+ s_dut.droid.wifiAwareSendMessage(s_disc_id, peer_id_on_sub,
+ next_msg_id, msg_s2p, 0)
+ iterations += 1
+
+ # wait for message tx confirmation
+ (p_tx_ok_count, p_tx_fail_count, p_tx_unknown_id) = self.wait_for_tx_events(
+ p_dut, self.NUM_PER_ROUND, messages_by_msg, messages_by_id)
+ p_tx_ok_count_total += p_tx_ok_count
+ p_tx_fail_count_total += p_tx_fail_count
+ p_tx_unknown_id_total += p_tx_unknown_id
+ (s_tx_ok_count, s_tx_fail_count, s_tx_unknown_id) = self.wait_for_tx_events(
+ s_dut, self.NUM_PER_ROUND, messages_by_msg, messages_by_id)
+ s_tx_ok_count_total += s_tx_ok_count
+ s_tx_fail_count_total += s_tx_fail_count
+ s_tx_unknown_id_total += s_tx_unknown_id
+
+ num_of_messages_this_round = self.NUM_PER_ROUND
+
+ # wait for the rest message tx confirmation
+ p_tx_total = p_tx_ok_count_total + p_tx_fail_count_total + p_tx_unknown_id_total
+ s_tx_total = s_tx_ok_count_total + s_tx_fail_count_total + s_tx_unknown_id_total
+ (p_tx_ok_count, p_tx_fail_count, p_tx_unknown_id) = self.wait_for_tx_events(
+ p_dut, iterations - p_tx_total, messages_by_msg, messages_by_id)
+ (s_tx_ok_count, s_tx_fail_count, s_tx_unknown_id) = self.wait_for_tx_events(
+ s_dut, iterations - s_tx_total, messages_by_msg, messages_by_id)
+ p_tx_ok_count_total += p_tx_ok_count
+ p_tx_fail_count_total += p_tx_fail_count
+ p_tx_unknown_id_total += p_tx_unknown_id
+ s_tx_ok_count_total += s_tx_ok_count
+ s_tx_fail_count_total += s_tx_fail_count
+ s_tx_unknown_id_total += s_tx_unknown_id
+ self.log.info(
+ "Transmission done: pub=%d, sub=%d transmitted successfully",
+ p_tx_ok_count_total, s_tx_ok_count_total)
+
+ # wait for message rx confirmation (giving it the total number of messages
+ # transmitted rather than just those transmitted correctly since sometimes
+ # the Tx doesn't get that information correctly. I.e. a message the Tx
+ # thought was not transmitted correctly is actually received - missing ACK?
+ # bug?)
+ self.wait_for_rx_events(p_dut, iterations, messages_by_msg)
+ self.wait_for_rx_events(s_dut, iterations, messages_by_msg)
+
+ # analyze results
+ results = {}
+ results["tx_count"] = 2 * iterations
+ results["tx_unknown_ids"] = p_tx_unknown_id_total + s_tx_unknown_id_total
+ self.analyze_results(results, messages_by_msg)
+
+ # clear errors
+ asserts.assert_equal(results["tx_unknown_ids"], 0,
+ "Message ID corruption", results)
+ asserts.assert_equal(results["tx_count_neither"], 0,
+ "Tx message with no success or fail indication",
+ results)
+ asserts.assert_equal(results["tx_count_duplicate_fail"], 0,
+ "Duplicate Tx fail messages", results)
+ asserts.assert_equal(results["tx_count_duplicate_success"], 0,
+ "Duplicate Tx success messages", results)
+ asserts.assert_equal(results["rx_count_no_tx_message"], 0,
+ "Rx message which wasn't sent - message corruption?", results)
+ asserts.assert_equal(results["tx_count_tx_ok_but_no_rx"], 0,
+ "Tx got ACK but Rx didn't get message", results)
+
+ # possibly ok - but flag since most frequently a bug
+ asserts.assert_equal(results["rx_count_no_ok_tx_indication"], 0,
+ "Message received but Tx didn't get ACK", results)
+ asserts.assert_equal(results["rx_count_fail_tx_indication"], 0,
+ "Message received but Tx didn't get ACK", results)
+
+ # permissible failures based on thresholds
+ asserts.assert_true(
+ results["tx_count_fail"] <=
+ (self.MAX_TX_FAILURE_PERCENTAGE * iterations * 2 / 100),
+ "Number of Tx failures exceeds threshold", extras=results)
+ asserts.assert_true(
+ results["rx_count_duplicate"] <=
+ (self.MAX_DUPLICATE_RX_PERCENTAGE * iterations * 2 / 100),
+ "Number of duplicate Rx exceeds threshold", extras=results)
+
+ asserts.explicit_pass("test_stress_message_no_throttling done", extras=results)
+
+ @test_tracker_info(uuid="546b0c6f-3071-4330-8e23-842ecbd07018")
+ def test_stress_message_throttling(self):
+ """Stress test for bi-directional message transmission and reception with throttling"""
p_dut = self.android_devices[0]
s_dut = self.android_devices[1]
@@ -295,14 +430,16 @@
# permissible failures based on thresholds
asserts.assert_true(
- results["tx_count_fail"] <=
- (self.MAX_TX_FAILURE_PERCENTAGE * self.NUM_ITERATIONS / 100),
- "Number of Tx failures exceeds threshold",
- extras=results)
- asserts.assert_true(
results["rx_count_duplicate"] <=
- (self.MAX_DUPLICATE_RX_PERCENTAGE * self.NUM_ITERATIONS / 100),
- "Number of duplicate Rx exceeds threshold",
- extras=results)
+ (self.MAX_DUPLICATE_RX_PERCENTAGE * results["tx_count_success"] / 100),
+ "Number of duplicate Rx exceeds threshold", extras=results)
- asserts.explicit_pass("test_stress_message done", extras=results)
+ # check working status message queue limit per UID
+ asserts.assert_true(
+ results["tx_count_success"] >= self.MESSAGE_QUEUE_DEPTH_PER_UID * 2,
+ "Number of messages did not reach uid message queue limit", extras=results)
+ asserts.assert_true(
+ results["tx_count_success"] < self.NUM_ITERATIONS * 2,
+ "Seems uid message queue limit is not working, Tx all message", extras=results)
+
+ asserts.explicit_pass("test_stress_message_throttling done", extras=results)
diff --git a/acts/tests/google/wifi/p2p/functional/WifiP2pSnifferTest.py b/acts/tests/google/wifi/p2p/functional/WifiP2pSnifferTest.py
index f9d2b23..7fd331e 100644
--- a/acts/tests/google/wifi/p2p/functional/WifiP2pSnifferTest.py
+++ b/acts/tests/google/wifi/p2p/functional/WifiP2pSnifferTest.py
@@ -17,6 +17,7 @@
import acts.test_utils.wifi.wifi_test_utils as wutils
import acts.utils
import time
+import re
from acts import asserts
from acts import utils
@@ -32,6 +33,7 @@
WPS_DISPLAY = wp2putils.WifiP2PEnums.WpsInfo.WIFI_WPS_INFO_DISPLAY
WPS_KEYPAD = wp2putils.WifiP2PEnums.WpsInfo.WIFI_WPS_INFO_KEYPAD
DEFAULT_TIMEOUT = 10
+WAIT_TIME = 60
class WifiP2pSnifferTest(WifiP2pBaseTest):
"""Tests factory MAC is not leaked for p2p discovery and associated cases.
@@ -46,6 +48,8 @@
def setup_class(self):
super(WifiP2pSnifferTest, self).setup_class()
+ self.dut1_mac = self.get_p2p_mac_address(self.dut1)
+ self.dut2_mac = self.get_p2p_mac_address(self.dut2)
wp2putils.wifi_p2p_set_channels_for_current_group(self.dut1, 6, 6)
wp2putils.wifi_p2p_set_channels_for_current_group(self.dut2, 6, 6)
self.configure_packet_capture()
@@ -56,7 +60,6 @@
self.packet_capture, '2g', self.test_name)
def teardown_test(self):
- self.verify_mac_no_leakage()
super(WifiP2pSnifferTest, self).teardown_test()
def configure_packet_capture(self):
@@ -66,6 +69,13 @@
if not result:
raise ValueError("Failed to configure channel for 2G band")
+ def get_p2p_mac_address(self, dut):
+ """Gets the current MAC address being used for Wi-Fi Direct."""
+ dut.reboot()
+ time.sleep(WAIT_TIME)
+ out = dut.adb.shell("ifconfig p2p0")
+ return re.match(".* HWaddr (\S+).*", out, re.S).group(1)
+
def verify_mac_no_leakage(self):
time.sleep(DEFAULT_TIMEOUT)
self.log.info("Stopping packet capture")
@@ -74,8 +84,8 @@
pcap_fname = '%s_%s.pcap' % (self.pcap_procs[BAND_2G][1],
BAND_2G.upper())
packets = rdpcap(pcap_fname)
- wutils.verify_mac_not_found_in_pcap(self.dut1_mac, packets)
- wutils.verify_mac_not_found_in_pcap(self.dut2_mac, packets)
+ wutils.verify_mac_not_found_in_pcap(self.dut1, self.dut1_mac, packets)
+ wutils.verify_mac_not_found_in_pcap(self.dut2, self.dut2_mac, packets)
"""Test Cases"""
@test_tracker_info(uuid=" d04e62dc-e1ef-4cea-86e6-39f0dd08fb6b")
@@ -88,6 +98,7 @@
self.log.info("Device discovery")
wp2putils.find_p2p_device(self.dut1, self.dut2)
wp2putils.find_p2p_device(self.dut2, self.dut1)
+ self.verify_mac_no_leakage()
@test_tracker_info(uuid="6a02be84-912d-4b5b-8dfa-fd80d2554c55")
def test_p2p_connect_via_pbc_and_ping_and_reconnect_sniffer(self):
@@ -136,3 +147,6 @@
wp2putils.p2p_disconnect(gc_dut)
wp2putils.check_disconnect(go_dut)
time.sleep(p2pconsts.DEFAULT_FUNCTION_SWITCH_TIME)
+
+ # teardown
+ self.verify_mac_no_leakage()