| # |
| # Copyright 2018 - The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| from acts import asserts |
| from acts.test_decorators import test_tracker_info |
| from acts.test_utils.net.net_test_utils import start_tcpdump |
| from acts.test_utils.net.net_test_utils import stop_tcpdump |
| from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest |
| from acts.test_utils.wifi import wifi_test_utils as wutils |
| |
| from scapy.all import ICMPv6ND_RA |
| from scapy.all import rdpcap |
| from scapy.all import Scapy_Exception |
| |
| import acts.base_test |
| import acts.test_utils.wifi.wifi_test_utils as wutils |
| |
| import copy |
| import os |
| import random |
| import time |
| |
| WifiEnums = wutils.WifiEnums |
| |
| RA_SCRIPT = 'sendra.py' |
| SCAPY = 'scapy-2.2.0.tar.gz' |
| SCAPY_INSTALL_COMMAND = 'sudo python setup.py install' |
| PROC_NET_SNMP6 = '/proc/net/snmp6' |
| LIFETIME_FRACTION = 6 |
| LIFETIME = 180 |
| INTERVAL = 2 |
| |
| |
| class ApfCountersTest(WifiBaseTest): |
| |
| def __init__(self, controllers): |
| WifiBaseTest.__init__(self, controllers) |
| self.tests = ("test_IPv6_RA_packets", |
| "test_IPv6_RA_with_RTT", ) |
| |
| def setup_class(self): |
| self.dut = self.android_devices[0] |
| wutils.wifi_test_device_init(self.dut) |
| req_params = [] |
| opt_param = ["reference_networks", ] |
| |
| self.unpack_userparams( |
| req_param_names=req_params, opt_param_names=opt_param) |
| |
| if "AccessPoint" in self.user_params: |
| self.legacy_configure_ap_and_start() |
| |
| asserts.assert_true( |
| len(self.reference_networks) > 0, |
| "Need at least one reference network with psk.") |
| wutils.wifi_toggle_state(self.dut, True) |
| |
| self.wpapsk_2g = self.reference_networks[0]["2g"] |
| self.wpapsk_5g = self.reference_networks[0]["5g"] |
| |
| # install scapy |
| current_dir = os.path.dirname(os.path.realpath(__file__)) |
| send_ra = os.path.join(current_dir, RA_SCRIPT) |
| send_scapy = os.path.join(current_dir, SCAPY) |
| self.access_points[0].install_scapy(send_scapy, send_ra) |
| self.tcpdump_pid = None |
| |
| def setup_test(self): |
| if 'RTT' not in self.test_name: |
| self.tcpdump_pid = start_tcpdump(self.dut, self.test_name) |
| |
| def teardown_test(self): |
| if 'RTT' not in self.test_name: |
| stop_tcpdump(self.dut, self.tcpdump_pid, self.test_name) |
| |
| def on_fail(self, test_name, begin_time): |
| self.dut.take_bug_report(test_name, begin_time) |
| self.dut.cat_adb_log(test_name, begin_time) |
| |
| def teardown_class(self): |
| if "AccessPoint" in self.user_params: |
| del self.user_params["reference_networks"] |
| self.access_points[0].cleanup_scapy() |
| wutils.reset_wifi(self.dut) |
| |
| """ Helper methods """ |
| |
| def _get_icmp6intype134(self): |
| """ Get ICMP6 Type 134 packet count on the DUT |
| |
| Returns: |
| Number of ICMP6 type 134 packets |
| """ |
| cmd = "grep Icmp6InType134 %s || true" % PROC_NET_SNMP6 |
| ra_count = self.dut.adb.shell(cmd) |
| if not ra_count: |
| return 0 |
| ra_count = int(ra_count.split()[-1].rstrip()) |
| self.dut.log.info("RA Count %s:" % ra_count) |
| return ra_count |
| |
| def _get_rtt_list_from_tcpdump(self, pcap_file): |
| """ Get RTT of each RA pkt in a list |
| |
| Args: |
| pcap_file: tcpdump file from the DUT |
| |
| Returns: |
| List of RTT of 400 pkts |
| """ |
| rtt = [] |
| try: |
| packets = rdpcap(pcap_file) |
| except Scapy_Exception: |
| self.log.error("Not a valid pcap file") |
| return rtt |
| |
| for pkt in packets: |
| if ICMPv6ND_RA in pkt: |
| rtt.append(int(pkt[ICMPv6ND_RA].retranstimer)) |
| return rtt |
| |
| """ Tests """ |
| |
| @test_tracker_info(uuid="bc8d3f27-582a-464a-be30-556f07b77ee1") |
| def test_IPv6_RA_packets(self): |
| """ Test if the device filters the IPv6 packets |
| |
| Steps: |
| 1. Send a RA packet to DUT. DUT should accept this |
| 2. Send duplicate RA packets. The RA packets should be filtered |
| for the next 30 seconds (1/6th of RA lifetime) |
| 3. The next RA packets should be accepted |
| """ |
| # get mac address of the dut |
| ap = self.access_points[0] |
| wifi_network = copy.deepcopy(self.wpapsk_5g) |
| wifi_network['meteredOverride'] = 1 |
| wutils.connect_to_wifi_network(self.dut, wifi_network) |
| mac_addr = self.dut.droid.wifiGetConnectionInfo()['mac_address'] |
| self.log.info("mac_addr %s" % mac_addr) |
| time.sleep(30) # wait 30 sec before sending RAs |
| |
| # get the current ra count |
| ra_count = self._get_icmp6intype134() |
| |
| # Start scapy to send RA to the phone's MAC |
| ap.send_ra('wlan1', mac_addr, 0, 1) |
| |
| # get the latest ra count |
| ra_count_latest = self._get_icmp6intype134() |
| asserts.assert_true(ra_count_latest == ra_count + 1, |
| "Device dropped the first RA in sequence") |
| |
| # Generate and send 'x' number of duplicate RAs, for 1/6th of the the |
| # lifetime of the original RA. Test assumes that the original RA has a |
| # lifetime of 180s. Hence, all RAs received within the next 30s of the |
| # original RA should be filtered. |
| ra_count = ra_count_latest |
| count = LIFETIME / LIFETIME_FRACTION / INTERVAL |
| ap.send_ra('wlan1', mac_addr, interval=INTERVAL, count=count) |
| |
| # Fail test if at least 90% of RAs were not dropped. |
| ra_count_latest = self._get_icmp6intype134() |
| pkt_loss = count - (ra_count_latest - ra_count) |
| percentage_loss = float(pkt_loss) / count * 100 |
| asserts.assert_true(percentage_loss >= 90, "Device did not filter " |
| "duplicate RAs correctly. %d Percent of duplicate" |
| " RAs were accepted" % (100 - percentage_loss)) |
| |
| # Any new RA after this should be accepted. |
| ap.send_ra('wlan1', mac_addr, interval=INTERVAL, count=1) |
| ra_count_latest = self._get_icmp6intype134() |
| asserts.assert_true(ra_count_latest == ra_count + 1, |
| "Device did not accept new RA after 1/6th time " |
| "interval. Device dropped a valid RA in sequence.") |
| |
| @test_tracker_info(uuid="d2a0aff0-048c-475f-9bba-d90d8d9ebae3") |
| def test_IPv6_RA_with_RTT(self): |
| """Test if the device filters IPv6 RA packets with different re-trans time |
| |
| Steps: |
| 1. Get the current RA count |
| 2. Send 400 packets with different re-trans time |
| 3. Verify that RA count increased by 400 |
| 4. Verify internet connectivity |
| """ |
| pkt_num = 400 |
| rtt_list = random.sample(range(10, 10000), pkt_num) |
| self.log.info("RTT List: %s" % rtt_list) |
| |
| # get mac address of the dut |
| ap = self.access_points[0] |
| wutils.connect_to_wifi_network(self.dut, self.wpapsk_5g) |
| mac_addr = self.dut.droid.wifiGetConnectionInfo()['mac_address'] |
| self.log.info("mac_addr %s" % mac_addr) |
| time.sleep(30) # wait 30 sec before sending RAs |
| |
| # get the current ra count |
| ra_count = self._get_icmp6intype134() |
| |
| # start tcpdump on the device |
| tcpdump_pid = start_tcpdump(self.dut, self.test_name) |
| |
| # send RA with differnt re-trans time |
| for rtt in rtt_list: |
| ap.send_ra('wlan1', mac_addr, 0, 1, rtt=rtt) |
| |
| # stop tcpdump and pull file |
| time.sleep(60) |
| pcap_file = stop_tcpdump(self.dut, tcpdump_pid, self.test_name) |
| |
| # get the new RA count |
| new_ra_count = self._get_icmp6intype134() |
| asserts.assert_true(new_ra_count >= ra_count + pkt_num, |
| "Device did not accept all RAs") |
| |
| # verify the RA pkts RTT match |
| tcpdump_rtt_list = self._get_rtt_list_from_tcpdump(pcap_file) |
| asserts.assert_true(set(rtt_list).issubset(set(tcpdump_rtt_list)), |
| "RA packets didn't match with tcpdump") |
| |
| # verify if internet connectivity works after sending RA packets |
| wutils.validate_connection(self.dut) |