Merge "Integrate Google WiFi APs into retail AP class enabling unified API."
diff --git a/acts/framework/acts/controllers/android_device.py b/acts/framework/acts/controllers/android_device.py
index d41a21c..e089272 100755
--- a/acts/framework/acts/controllers/android_device.py
+++ b/acts/framework/acts/controllers/android_device.py
@@ -1483,6 +1483,9 @@
def exit_setup_wizard(self):
if not self.is_user_setup_complete() or self.is_setupwizard_on():
+ # b/116709539: needed to prevent a reboot after skipping the setup wizard
+ self.adb.shell(
+ "am start -a com.android.setupwizard.EXIT", ignore_status=True)
self.adb.shell(
"pm disable %s" % self.get_setupwizard_package_name())
# Wait up to 5 seconds for user_setup_complete to be updated
diff --git a/acts/framework/acts/controllers/anritsu_lib/md8475a.py b/acts/framework/acts/controllers/anritsu_lib/md8475a.py
index 6d0fb7a..d3a0aa0 100644
--- a/acts/framework/acts/controllers/anritsu_lib/md8475a.py
+++ b/acts/framework/acts/controllers/anritsu_lib/md8475a.py
@@ -516,7 +516,7 @@
cmd = "IMSCSCFCALL {},{}".format(virtual_network_id, action)
self.send_command(cmd)
- def send_query(self, query, sock_timeout=10):
+ def send_query(self, query, sock_timeout=120):
""" Sends a Query message to Anritsu and return response
Args:
@@ -540,7 +540,7 @@
except socket.error:
raise AnritsuError("Socket Error")
- def send_command(self, command, sock_timeout=20):
+ def send_command(self, command, sock_timeout=120):
""" Sends a Command message to Anritsu
Args:
diff --git a/acts/framework/acts/controllers/sl4a_lib/rpc_connection.py b/acts/framework/acts/controllers/sl4a_lib/rpc_connection.py
index 3876789..b062043 100644
--- a/acts/framework/acts/controllers/sl4a_lib/rpc_connection.py
+++ b/acts/framework/acts/controllers/sl4a_lib/rpc_connection.py
@@ -58,8 +58,9 @@
def _log_formatter(message):
"""Defines the formatting used in the logger."""
- return '[SL4A Client|%s|%s] %s' % (self.adb.serial, self.uid,
- message)
+ return '[SL4A Client|%s|%s|%s] %s' % (self.adb.serial,
+ self.ports.client_port,
+ self.uid, message)
self.log = logger.create_logger(_log_formatter)
diff --git a/acts/framework/acts/test_utils/car/car_telecom_utils.py b/acts/framework/acts/test_utils/car/car_telecom_utils.py
index 74e9fd6..4516aea 100644
--- a/acts/framework/acts/test_utils/car/car_telecom_utils.py
+++ b/acts/framework/acts/test_utils/car/car_telecom_utils.py
@@ -176,7 +176,7 @@
return True
-def hangup_conf(log, ad, conf_id):
+def hangup_conf(log, ad, conf_id, timeout=10):
"""Hangup a conference call
Args:
@@ -195,36 +195,18 @@
log.info("We are not in-call {}".format(ad.serial))
return False
- # Get the list of children for this conference.
- all_calls = get_call_id_children(log, ad, conf_id)
-
- # All calls that needs disconnecting (Parent + Children)
- all_calls.add(conf_id)
-
- # Make sure we are registered with the events.
- ad.droid.telecomStartListeningForCallRemoved()
-
# Disconnect call.
ad.droid.telecomCallDisconnect(conf_id)
- # Wait for removed event.
- while len(all_calls) > 0:
- event = None
- try:
- event = ad.ed.pop_event(
- tel_defines.EventTelecomCallRemoved,
- tel_defines.MAX_WAIT_TIME_CONNECTION_STATE_UPDATE)
- except queue.Empty:
- log.info("Did not get TelecomCallRemoved event")
- ad.droid.telecomStopListeningForCallRemoved()
- return False
-
- removed_call_id = event['data']['CallId']
- all_calls.remove(removed_call_id)
- log.info("Removed call {} left calls {}".format(removed_call_id, all_calls))
-
- ad.droid.telecomStopListeningForCallRemoved()
- return True
+ start_time = time.time()
+ while time.time() < start_time + timeout:
+ call_ids = get_calls_in_states(log, ad, [tel_defines.CALL_STATE_ACTIVE])
+ log.debug("Active calls {}".format(call_ids))
+ if not call_ids:
+ return True
+ time.sleep(1)
+ log.error("Failed to hang up all conference participants")
+ return False
def accept_call(log, ad, call_id):
"""Accept a number
@@ -440,72 +422,34 @@
return False
return True
-def wait_for_conference(log, ad, conf_calls):
+def wait_for_conference(log, ad, participants=2, timeout=10):
"""Wait for the droid to be in a conference with calls specified
in conf_calls.
Args:
log: log object
ad: android device object
- conf_calls: List of calls that should transition to conference
+ participants: conference participant count
Returns:
call_id if success, None if fail.
"""
- conf_calls = set(conf_calls)
+
+ def get_conference_id(callers):
+ for call_id in callers:
+ call_details = ad.droid.telecomCallGetCallById(call_id).get("Details")
+ if set([tel_defines.CALL_PROPERTY_CONFERENCE]).issubset(call_details.get("Properties")):
+ return call_id
+ return None
log.info("waiting for conference {}".format(ad.serial))
- ad.droid.telecomStartListeningForCallAdded()
-
- call_ids = ad.droid.telecomCallGetCallIds()
-
- # Check if we have a conference call and if the children match
- for call_id in call_ids:
- call_chld = get_call_id_children(log, ad, call_id)
- if call_chld == conf_calls:
- ad.droid.telecomStopListeningForCallAdded()
- return call_id
-
- # If not poll for new calls.
- event = None
- call_id = None
- try:
- event = ad.ed.pop_event(
- tel_defines.EventTelecomCallAdded,
- tel_defines.MAX_WAIT_TIME_CALLEE_RINGING)
- log.info("wait_for_conference event {} droid {}".format(
- event, ad.serial))
- except queue.Empty:
- log.error("Did not get {} droid {}".format(
- tel_defines.EventTelecomCallAdded,
- ad.serial))
- return None
- finally:
- ad.droid.telecomStopListeningForCallAdded()
- call_id = event['data']['CallId']
-
- # Now poll until the children change.
- ad.droid.telecomCallStartListeningForEvent(
- call_id, tel_defines.EVENT_CALL_CHILDREN_CHANGED)
-
- event = None
- while True:
- try:
- event = ad.ed.pop_event(
- tel_defines.EventTelecomCallChildrenChanged,
- tel_defines.MAX_WAIT_TIME_CONNECTION_STATE_UPDATE)
- call_chld = set(event['data']['Event'])
- log.info("wait_for_conference children chld event {}".format(call_chld))
- if call_chld == conf_calls:
- ad.droid.telecomCallStopListeningForEvent(
- call_id, tel_defines.EVENT_CALL_CHILDREN_CHANGED)
- return call_id
- except queue.Empty:
- log.error("Did not get {} droid {}".format(
- tel_defines.EventTelecomCallChildrenChanged, ad.serial))
- ad.droid.telecomCallStopListeningForEvent(
- call_id, tel_defines.EVENT_CALL_CHILDREN_CHANGED)
- return None
+ start_time = time.time()
+ while time.time() < start_time + timeout:
+ participant_callers = get_calls_in_states(log, ad, [tel_defines.CALL_STATE_ACTIVE])
+ if (len(participant_callers) == participants + 1):
+ return get_conference_id(participant_callers)
+ time.sleep(1)
+ return None
def get_call_id_children(log, ad, call_id):
"""Return the list of calls that are children to call_id
diff --git a/acts/framework/acts/test_utils/power/PowerBaseTest.py b/acts/framework/acts/test_utils/power/PowerBaseTest.py
index 59aab6e..c5cdf77 100644
--- a/acts/framework/acts/test_utils/power/PowerBaseTest.py
+++ b/acts/framework/acts/test_utils/power/PowerBaseTest.py
@@ -411,7 +411,7 @@
# Retry loop to recover monsoon from error
retry_monsoon = 1
while retry_monsoon <= RECOVER_MONSOON_RETRY_COUNT:
- mon_status = self.monsoon_recover(self.mon_info.dut)
+ mon_status = self.monsoon_recover()
if mon_status:
break
else:
diff --git a/acts/framework/acts/test_utils/power/PowerCellularLabBaseTest.py b/acts/framework/acts/test_utils/power/PowerCellularLabBaseTest.py
index bc7f46f..e9ca28a 100644
--- a/acts/framework/acts/test_utils/power/PowerCellularLabBaseTest.py
+++ b/acts/framework/acts/test_utils/power/PowerCellularLabBaseTest.py
@@ -13,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import time
import acts.test_utils.power.PowerBaseTest as PBT
from acts.controllers.anritsu_lib._anritsu_utils import AnritsuError
@@ -50,6 +51,11 @@
self.simulation = None
self.anritsu = None
+ # If callbox version was not specified in the config files, set a default value
+ if not hasattr(self, "md8475_version"):
+ self.md8475_version = "A"
+
+
def setup_class(self):
""" Executed before any test case is started.
@@ -60,6 +66,7 @@
"""
super().setup_class()
+
if hasattr(self, 'network_file'):
self.networks = self.unpack_custom_file(self.network_file, False)
self.main_network = self.networks['main_network']
@@ -81,8 +88,9 @@
"""
try:
+
self.anritsu = MD8475A(self.md8475a_ip_address, self.log,
- self.wlan_option)
+ self.wlan_option, md8475_version=self.md8475_version)
return True
except AnritsuError:
self.log.error('Error in connecting to Anritsu Callbox')
@@ -108,10 +116,6 @@
# Remove the 'test' keyword
self.parameters.remove('test')
- # Changing cell parameters requires the phone to be detached
- if self.simulation:
- self.simulation.stop()
-
# Decide what type of simulation and instantiate it if needed
if self.consume_parameter(self.PARAM_SIM_TYPE_LTE):
self.init_simulation(self.PARAM_SIM_TYPE_LTE)
@@ -123,12 +127,18 @@
self.log.error("Simulation type needs to be indicated in the test name.")
return False
- # Parse simulation parameters
+ # Changing cell parameters requires the phone to be detached
+ self.simulation.detach()
+
+ # Parse simulation parameters. This may return false if incorrect values are passed.
if not self.simulation.parse_parameters(self.parameters):
return False
+ # Wait for new params to settle
+ time.sleep(4)
+
# Attach the phone to the basestation
- self.simulation.start()
+ self.simulation.attach()
# Make the device go to sleep
self.dut.droid.goToSleepNow()
@@ -187,26 +197,27 @@
type: defines the type of simulation to be started.
"""
- if sim_type == self.PARAM_SIM_TYPE_LTE:
+ simulation_dictionary = {
+ self.PARAM_SIM_TYPE_LTE: LteSimulation,
+ self.PARAM_SIM_TYPE_UMTS: UmtsSimulation,
+ self.PARAM_SIM_TYPE_GSM: GsmSimulation,
+ }
- if self.simulation and type(self.simulation) is LteSimulation:
- # The simulation object we already have is enough.
- return
+ if not sim_type in simulation_dictionary:
+ raise ValueError("The provided simulation type is invalid.")
- # Instantiate a new simulation
- self.simulation = LteSimulation(self.anritsu, self.log, self.dut)
+ simulation_class = simulation_dictionary[sim_type]
- elif sim_type == self.PARAM_SIM_TYPE_UMTS:
+ if isinstance(self.simulation, simulation_class):
+ # The simulation object we already have is enough.
+ return
- if self.simulation and type(self.simulation) is UmtsSimulation:
- return
+ if self.simulation:
+ # Make sure the simulation is stopped before loading a new one
+ self.simulation.stop()
- self.simulation = UmtsSimulation(self.anritsu, self.log, self.dut)
+ # Instantiate a new simulation
+ self.simulation = simulation_class(self.anritsu, self.log, self.dut)
- elif sim_type == self.PARAM_SIM_TYPE_GSM:
-
- if self.simulation and type(self.simulation) is GsmSimulation:
- return
-
- self.simulation = GsmSimulation(self.anritsu, self.log, self.dut)
-
+ # Start the simulation
+ self.simulation.start()
diff --git a/acts/framework/acts/test_utils/power/tel_simulations/BaseSimulation.py b/acts/framework/acts/test_utils/power/tel_simulations/BaseSimulation.py
index 01491e3..459ddf1 100644
--- a/acts/framework/acts/test_utils/power/tel_simulations/BaseSimulation.py
+++ b/acts/framework/acts/test_utils/power/tel_simulations/BaseSimulation.py
@@ -70,17 +70,32 @@
self.sim_dl_power = None
self.sim_ul_power = None
- def start(self):
- """ Start simulation and attach the DUT to the basestation
+ # Set to default APN
+ log.info("Setting preferred APN to anritsu1.com.")
+ dut.droid.telephonySetAPN("anritsu1.com", "anritsu1.com")
- Starts the simulation in the Anritsu Callbox and waits for the
- UE to attach.
+
+ def start(self):
+ """ Start simulation.
+
+ Starts the simulation in the Anritsu Callbox.
"""
+ # Make sure airplane mode is on so the phone won't attach right away
+ toggle_airplane_mode(self.log, self.dut, True)
+
# Start simulation if it wasn't started
self.anritsu.start_simulation()
+ def attach(self):
+ """ Attach the phone to the basestation.
+
+ Sets a good signal level, toggles airplane mode
+ and waits for the phone to attach.
+
+ """
+
# Turn on airplane mode
toggle_airplane_mode(self.log, self.dut, True)
@@ -102,6 +117,22 @@
if self.sim_ul_power:
self.set_uplink_tx_power(self.sim_ul_power)
+ def detach(self):
+ """ Detach the phone from the basestation.
+
+ Turns airplane mode on and powers off the basestation.
+ """
+
+ # Set the DUT to airplane mode so it doesn't see the cellular network going off
+ toggle_airplane_mode(self.log, self.dut, True)
+
+ # Wait for APM to propagate
+ time.sleep(2)
+
+ # Power off basestation
+ self.anritsu.set_simulation_state_to_poweroff()
+
+
def stop(self):
""" Detach phone from the basestation by stopping the simulation.
@@ -109,10 +140,15 @@
"""
- self.anritsu.stop_simulation()
-
+ # Set the DUT to airplane mode so it doesn't see the cellular network going off
toggle_airplane_mode(self.log, self.dut, True)
+ # Wait for APM to propagate
+ time.sleep(2)
+
+ # Stop the simulation
+ self.anritsu.stop_simulation()
+
def parse_parameters(self, parameters):
""" Configures simulation using a list of parameters.
@@ -217,8 +253,8 @@
if self.dl_path_loss and self.ul_path_loss:
self.log.info("Measurements are already calibrated.")
- # Start simulation if needed
- self.start()
+ # Attach the phone to the base station
+ self.attach()
# If downlink or uplink were not yet calibrated, do it now
if not self.dl_path_loss:
@@ -226,8 +262,8 @@
if not self.ul_path_loss:
self.ul_path_loss = self.uplink_calibration(self.bts1)
- # Stop simulation after calibrating
- self.stop()
+ # Detach after calibrating
+ self.detach()
def downlink_calibration(self, bts, rat = None, power_units_conversion_func = None):
@@ -305,6 +341,10 @@
# Calculate Path Loss
down_call_path_loss = self.DOWNLINK_CAL_TARGET_POWER_DBM - avg_down_power
+ # Validate the result
+ if not 0 < down_call_path_loss < 100:
+ raise RuntimeError("Downlink calibration failed. The calculated path loss value was {} dBm.".format(down_call_path_loss))
+
self.log.info("Measured downlink path loss: {} dB".format(down_call_path_loss))
return down_call_path_loss
@@ -381,12 +421,13 @@
# Phone only supports 1x1 Uplink so always chain 0
avg_up_power = np.nanmean(up_power_per_chain[0])
if np.isnan(avg_up_power):
- raise ValueError("Calibration failed because the callbox reported the chain to be deactive.")
+ raise RuntimeError("Calibration failed because the callbox reported the chain to be deactive.")
up_call_path_loss = target_power - avg_up_power
- self.up_call_path_loss = up_call_path_loss
- self.up_call_power_per_chain = up_power_per_chain
+ # Validate the result
+ if not 0 < up_call_path_loss < 100:
+ raise RuntimeError("Uplink calibration failed. The calculated path loss value was {} dBm.".format(up_call_path_loss))
self.log.info("Measured uplink path loss: {} dB".format(up_call_path_loss))
diff --git a/acts/framework/acts/test_utils/power/tel_simulations/LteSimulation.py b/acts/framework/acts/test_utils/power/tel_simulations/LteSimulation.py
index fb02de8..7985147 100644
--- a/acts/framework/acts/test_utils/power/tel_simulations/LteSimulation.py
+++ b/acts/framework/acts/test_utils/power/tel_simulations/LteSimulation.py
@@ -104,9 +104,6 @@
else:
log.info("Preferred network type set.")
- set_preferred_apn_by_adb(self.dut, "anritsu1.com")
- log.info("Prefered apn set to anritsu1.com")
-
def parse_parameters(self, parameters):
""" Configs an LTE simulation using a list of parameters.
diff --git a/acts/tests/google/ble/fuchsia_tests/BleFuchsiaTest.py b/acts/tests/google/ble/fuchsia_tests/BleFuchsiaTest.py
index 021a8bc..c80e28c 100644
--- a/acts/tests/google/ble/fuchsia_tests/BleFuchsiaTest.py
+++ b/acts/tests/google/ble/fuchsia_tests/BleFuchsiaTest.py
@@ -13,30 +13,24 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
-"""This script shows simple examples of how to get started with bluetooth
- low energy testing in acts.
+"""This script tests various BLE APIs for Fuchsia devices.
"""
import pprint
import random
import time
-from acts.controllers import android_device
-from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
-from acts.test_utils.bt.bt_constants import adv_succ
-from acts.test_utils.bt.bt_constants import scan_result
-from acts.test_utils.bt.bt_test_utils import cleanup_scanners_and_advertisers
-from acts.test_utils.bt.bt_test_utils import reset_bluetooth
+from acts.base_test import BaseTestClass
-class BleFuchsiaTest(BluetoothBaseTest):
+class BleFuchsiaTest(BaseTestClass):
default_timeout = 10
active_scan_callback_list = []
active_adv_callback_list = []
droid = None
def __init__(self, controllers):
- BluetoothBaseTest.__init__(self, controllers)
+ BaseTestClass.__init__(self, controllers)
if (len(self.fuchsia_devices) < 2):
self.log.error("BleFuchsiaTest Init: Not enough fuchsia devices.")
@@ -48,7 +42,6 @@
self.fuchsia_adv.clean_up()
self.fuchsia_scan.clean_up()
- @BluetoothBaseTest.bt_test_wrap
def test_fuchsia_publish_service(self):
service_id = 0
service_primary = True
@@ -63,7 +56,6 @@
return True
- @BluetoothBaseTest.bt_test_wrap
def test_fuchsia_scan_fuchsia_adv(self):
# Initialize advertising on fuchsia dveice with name and interval
fuchsia_name = "testADV1234"
@@ -109,7 +101,6 @@
return res
- @BluetoothBaseTest.bt_test_wrap
def test_fuchsia_gatt_fuchsia_periph(self):
# Create random service with id, primary, and uuid
service_id = 3
diff --git a/acts/tests/google/bt/car_bt/BtCarHfpConferenceTest.py b/acts/tests/google/bt/car_bt/BtCarHfpConferenceTest.py
index 79195f4..6a12e8f 100644
--- a/acts/tests/google/bt/car_bt/BtCarHfpConferenceTest.py
+++ b/acts/tests/google/bt/car_bt/BtCarHfpConferenceTest.py
@@ -66,12 +66,12 @@
1. Devices are connected over HFP.
Steps:
- 1. Make a call from re to AG
- 2. Wait for dialing on re and ringing on HF/AG.
- 3. Accept the call on HF
- 4. Make a call on RE2 to AG
- 5. Wait for dialing on re and ringing on HF/AG.
- 6. Accept the call on HF.
+ 1. Make a call from AG to RE
+ 2. Wait for dialing on re and ringing on RE/HF.
+ 3. Accept the call on RE
+ 4. Make a call from AG to RE2
+ 5. Wait for dialing on re and ringing on RE2/HF.
+ 6. Accept the call on RE2.
7. See that HF/AG have one active and one held call.
8. Merge the call on HF.
9. Verify that we have a conference call on HF/AG.
@@ -84,15 +84,15 @@
Priority: 0
"""
- timeout_for_state_updates = 3
- # Dial AG from re
- if not initiate_call(self.log, self.re, self.ag_phone_number):
+
+ # Dial RE from AG
+ if not initiate_call(self.log, self.ag, self.re_phone_number):
self.log.error("Failed to initiate call from re.")
return False
# Wait for dialing/ringing
ret = True
- ret &= wait_for_ringing_call(self.log, self.ag)
+ ret &= wait_for_ringing_call(self.log, self.re)
ret &= car_telecom_utils.wait_for_ringing(self.log, self.hf)
if not ret:
@@ -103,24 +103,28 @@
time.sleep(SHORT_TIMEOUT)
# Extract the call.
call_1 = car_telecom_utils.get_calls_in_states(
- self.log, self.hf, [tel_defines.CALL_STATE_RINGING])
+ self.log, self.hf, [tel_defines.CALL_STATE_DIALING])
if len(call_1) != 1:
self.hf.log.error("Call State in ringing failed {}".format(call_1))
return False
- # Accept the call on HF
- if not car_telecom_utils.accept_call(self.log, self.hf, call_1[0]):
+ re_ringing_call_id = car_telecom_utils.get_calls_in_states(
+ self.log, self.re, [tel_defines.CALL_STATE_RINGING])
+
+ # Accept the call on RE
+ if not car_telecom_utils.accept_call(self.log, self.re, re_ringing_call_id[0]):
self.hf.log.error("Accepting call failed {}".format(
self.hf.serial))
return False
- # Dial another call from RE2
- if not initiate_call(self.log, self.re2, self.ag_phone_number):
+ time.sleep(SHORT_TIMEOUT)
+ # Dial another call to RE2
+ if not initiate_call(self.log, self.ag, self.re2_phone_number):
self.re2.log.error("Failed to initiate call from re.")
return False
# Wait for dialing/ringing
- ret &= wait_for_ringing_call(self.log, self.ag)
+ ret &= wait_for_ringing_call(self.log, self.re2)
ret &= car_telecom_utils.wait_for_ringing(self.log, self.hf)
if not ret:
@@ -132,30 +136,34 @@
# Extract the call.
# input("Continue?")
call_2 = car_telecom_utils.get_calls_in_states(
- self.log, self.hf, [tel_defines.CALL_STATE_RINGING])
+ self.log, self.hf, [tel_defines.CALL_STATE_DIALING])
if len(call_2) != 1:
self.hf.log.info("Call State in ringing failed {}".format(call_2))
return False
+ re2_ringing_call_id = car_telecom_utils.get_calls_in_states(
+ self.log, self.re2, [tel_defines.CALL_STATE_RINGING])
+
# Accept the call on HF
- if not car_telecom_utils.accept_call(self.log, self.hf, call_2[0]):
+ if not car_telecom_utils.accept_call(self.log, self.re2, re2_ringing_call_id[0]):
self.hf.log.info("Accepting call failed {}".format(self.hf.serial))
return False
+ # Give time before merge for state to update due to carrier limitations
+ time.sleep(SHORT_TIMEOUT)
+
# Merge the calls now.
self.hf.droid.telecomCallJoinCallsInConf(call_1[0], call_2[0])
# Check if we are in conference with call_1 and call_2
- conf_call_id = car_telecom_utils.wait_for_conference(
- self.log, self.hf, [call_1[0], call_2[0]])
- if conf_call_id == None:
+ conf_call_id = car_telecom_utils.wait_for_conference(self.log, self.hf, participants=2)
+ if conf_call_id is None:
self.hf.log.error("Did not get the conference setup correctly")
return False
# Now hangup the conference call.
if not car_telecom_utils.hangup_conf(self.log, self.hf, conf_call_id):
- self.hf.log.error("Could not hangup conference call {}!".format(
- conf_call_id))
+ self.hf.log.error("Could not hangup conference call {}!".format(conf_call_id))
return False
return True
diff --git a/acts/tests/google/wifi/WifiP2pManagerTest.py b/acts/tests/google/wifi/WifiP2pManagerTest.py
new file mode 100644
index 0000000..b3eb686
--- /dev/null
+++ b/acts/tests/google/wifi/WifiP2pManagerTest.py
@@ -0,0 +1,139 @@
+#!/usr/bin/env python3
+#
+# Copyright 2018 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import acts.test_utils.wifi.wifi_test_utils as wutils
+
+from acts import asserts
+from acts import utils
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest
+
+DEFAULT_TIMEOUT = 30
+DEFAULT_SLEEPTIME = 5
+
+class WifiP2pManagerTest(WifiBaseTest):
+ """Tests for APIs in Android's WifiP2pManager class.
+
+ Test Bed Requirement:
+ * Two Android devices
+ """
+
+ def __init__(self, controllers):
+ WifiBaseTest.__init__(self, controllers)
+
+ def setup_class(self):
+ self.dut = self.android_devices[0]
+ self.dut_client = self.android_devices[1]
+
+ wutils.wifi_test_device_init(self.dut)
+ self.dut.droid.wifiP2pInitialize()
+ asserts.assert_true(self.dut.droid.wifiP2pIsEnabled(),
+ "DUT's p2p should be initialized but it didn't")
+ self.dut_name = "Android_" + utils.rand_ascii_str(4)
+ self.dut.droid.wifiP2pSetDeviceName(self.dut_name)
+ wutils.wifi_test_device_init(self.dut_client)
+ self.dut_client.droid.wifiP2pInitialize()
+ asserts.assert_true(self.dut_client.droid.wifiP2pIsEnabled(),
+ "Peer's p2p should be initialized but it didn't")
+ self.dut_client_name = "Android_" + utils.rand_ascii_str(4)
+ self.dut_client.droid.wifiP2pSetDeviceName(self.dut_client_name)
+
+ def teardown_class(self):
+ self.dut.droid.wifiP2pClose()
+ self.dut_client.droid.wifiP2pClose()
+
+ def setup_test(self):
+ for ad in self.android_devices:
+ ad.droid.wakeLockAcquireBright()
+ ad.droid.wakeUpNow()
+
+ def teardown_test(self):
+ # Clear p2p group info
+ for ad in self.android_devices:
+ ad.droid.wifiP2pRequestPersistentGroupInfo()
+ event = ad.ed.pop_event("WifiP2pOnPersistentGroupInfoAvailable", DEFAULT_TIMEOUT)
+ for network in event['data']:
+ ad.droid.wifiP2pDeletePersistentGroup(network['NetworkId'])
+ ad.droid.wakeLockRelease()
+ ad.droid.goToSleepNow()
+
+ def on_fail(self, test_name, begin_time):
+ for ad in self.android_devices:
+ ad.take_bug_report(test_name, begin_time)
+ ad.cat_adb_log(test_name, begin_time)
+
+ """Helper Functions"""
+
+ def _is_discovered(self, event, device_name):
+ for device in event['data']['Peers']:
+ if device['Name'] == device_name:
+ return True
+ return False
+
+ """Test Cases"""
+ @test_tracker_info(uuid="28ddb16c-2ce4-44da-92f9-701d0dacc321")
+ def test_p2p_discovery(self):
+ """Verify the p2p discovery functionality
+
+ Steps:
+ 1. Discover the target device
+ """
+ # Discover the target device
+ self.log.info("Device discovery")
+ self.dut.droid.wifiP2pDiscoverPeers()
+ self.dut_client.droid.wifiP2pDiscoverPeers()
+ dut_event = self.dut.ed.pop_event("WifiP2pOnPeersAvailable", DEFAULT_TIMEOUT)
+ peer_event = self.dut_client.ed.pop_event("WifiP2pOnPeersAvailable", DEFAULT_TIMEOUT)
+ asserts.assert_true(self._is_discovered(dut_event, self.dut_client_name),
+ "DUT didn't discovered peer device")
+ asserts.assert_true(self._is_discovered(peer_event, self.dut_name),
+ "Peer didn't discovered DUT device")
+
+ @test_tracker_info(uuid="708af645-6562-41da-9cd3-bdca428ac308")
+ def test_p2p_connect(self):
+ """Verify the p2p connect functionality
+
+ Steps:
+ 1. Discover the target device
+ 2. Request the connection
+ 3. Disconnect
+ """
+ # Discover the target device
+ self.log.info("Device discovery")
+ self.dut.droid.wifiP2pDiscoverPeers()
+ self.dut_client.droid.wifiP2pDiscoverPeers()
+ dut_event = self.dut.ed.pop_event("WifiP2pOnPeersAvailable", DEFAULT_TIMEOUT)
+ peer_event = self.dut_client.ed.pop_event("WifiP2pOnPeersAvailable", DEFAULT_TIMEOUT)
+ asserts.assert_true(self._is_discovered(dut_event, self.dut_client_name),
+ "DUT didn't discovered peer device")
+ asserts.assert_true(self._is_discovered(peer_event, self.dut_name),
+ "Peer didn't discovered DUT device")
+
+ # Request the connection
+ self.log.info("Create p2p connection")
+ self.dut.droid.wifiP2pConnect(self.dut_client_name)
+ time.sleep(DEFAULT_SLEEPTIME)
+ self.dut_client.droid.wifiP2pAcceptConnection()
+ self.dut.ed.pop_event("WifiP2pConnectOnSuccess", DEFAULT_TIMEOUT)
+
+ # Disconnect
+ self.log.info("Disconnect")
+ self.dut.droid.wifiP2pRemoveGroup()
+ self.dut.droid.wifiP2pRequestConnectionInfo()
+ event = self.dut.ed.pop_event("WifiP2pOnConnectionInfoAvailable", DEFAULT_TIMEOUT)
+ asserts.assert_false(event['data']['groupFormed'],
+ "P2P connection should be disconnected but it didn't")