Merge "Add usb 3 and usb 2 configuration"
diff --git a/acts/tests/google/usb/UsbTetheringThroughputTest.py b/acts/tests/google/usb/UsbTetheringThroughputTest.py
index 10bfd47..0b893ea 100644
--- a/acts/tests/google/usb/UsbTetheringThroughputTest.py
+++ b/acts/tests/google/usb/UsbTetheringThroughputTest.py
@@ -28,6 +28,7 @@
 USB_TETHERING_MODE = 'svc usb setFunctions rndis'
 DEVICE_IP_ADDRESS = 'ip address'
 
+
 class UsbTetheringThroughputTest(base_test.BaseTestClass):
     """Tests for usb tethering throughput test.
 
@@ -42,11 +43,18 @@
         self.ip_server = self.iperf_servers[0]
         self.port_num = self.ip_server.port
         req_params = [
-            'iperf_criteria',
+            'iperf_usb_3_criteria', 'iperf_usb_2_criteria', 'controller_path',
+            'iperf_test_sec'
         ]
         self.unpack_userparams(req_param_names=req_params)
+        if self.dut.model in self.user_params.get('legacy_projects', []):
+            self.usb_controller_path = self.user_params[
+                'legacy_controller_path']
+        else:
+            self.usb_controller_path = self.controller_path
 
     def setup_test(self):
+        self.dut.adb.wait_for_device()
         self.dut.droid.wakeLockAcquireBright()
         self.dut.droid.wakeUpNow()
         self.dut.unlock_screen()
@@ -54,12 +62,8 @@
     def teardown_test(self):
         self.dut.droid.wakeLockRelease()
         self.dut.droid.goToSleepNow()
-        self.dut.stop_services()
-        #Set usb function back to charge mode.
-        self.dut.adb.shell(USB_CHARGE_MODE)
-        self.dut.adb.wait_for_device()
-        self.dut.start_services()
-        self.ip_server.stop()
+        # Reboot the device: echoing the usb speed can leave adb missing afterwards.
+        self.dut.reboot()
 
     def on_fail(self, test_name, begin_time):
         self.dut.take_bug_report(test_name, begin_time)
@@ -181,8 +185,49 @@
         except:
             self.log.error('Fail to execute iperf client.')
             return False, 0
-        rate = re.findall('(\d+.\d+) Mbits/sec.*receiver', result)
-        return True, rate[0]
+        rate = re.findall('(\d+.\d+) (.)bits/sec.*receiver', result)
+        return True, rate[0][0], rate[0][1]
+
+    def run_iperf_tx_rx(self, usb_speed, criteria):
+        """Run iperf tx and rx over usb tethering and check both rates.
+
+        Args:
+            usb_speed: label used in log and failure text, e.g. 'usb_3'.
+            criteria: minimum rate; compared in Mbits/sec (confirm config).
+
+        Raises:
+            signals.TestFailure: iperf failed or a rate is under criteria.
+        """
+        to_mbits = {'K': 1e-3, 'M': 1.0, 'G': 1e3}
+        self.enable_usb_tethering()
+        self.dut.adb.wait_for_device()
+        self.ip_server.start()
+        try:
+            pc_ip = self.get_pc_tethering_ip()
+            opts = '-t{} -i1 -w2M'.format(self.iperf_test_sec)
+            tx = self.run_iperf_client(pc_ip, opts)
+            rx = self.run_iperf_client(pc_ip, opts + ' -R')
+        finally:
+            # teardown_test no longer stops the server; never leave it running.
+            self.ip_server.stop()
+        for tag, (_, rate, unit) in (('rx', rx), ('tx', tx)):
+            self.log.info('TestResult iperf_{}_{}{} {}bits/sec'.format(
+                usb_speed, tag, rate, unit))
+        for tag, (ok, rate, unit) in (('tx', tx), ('rx', rx)):
+            # Normalize to Mbits/sec so a Kbits/sec result cannot pass.
+            if not ok or float(rate) * to_mbits.get(unit, 1.0) < criteria:
+                raise signals.TestFailure('Iperf {}_{} test is {} {}bits/sec, '
+                                          'the throughput result failed.'.format(
+                                              usb_speed, tag, rate, unit))
+
+    def get_usb_speed(self):
+        """Return the (usb speed, usb controller address) read over adb."""
+        shell = self.dut.adb.shell
+        controller = shell('getprop sys.usb.controller', ignore_status=True)
+        # NOTE(review): relative path; presumably the adb shell cwd is '/'.
+        speed_cmd = 'cat sys/class/udc/{}/current_speed'.format(controller)
+        current_speed = shell(speed_cmd)
+        return current_speed, controller
 
     @test_tracker_info(uuid="e7e0dfdc-3d1c-4642-a468-27326c49e4cb")
     def test_tethering_ping(self):
@@ -197,34 +242,51 @@
         """
         self.enable_usb_tethering()
         if self.pc_can_ping() and self.dut_can_ping():
-            raise signals.TestPass('Ping test is passed. Network is reachable.')
+            raise signals.TestPass(
+                'Ping test is passed. Network is reachable.')
         raise signals.TestFailure(
             'Ping test failed. Maybe network is unreachable.')
 
     @test_tracker_info(uuid="8263c880-8a7e-4a68-b47f-e7caba3e9968")
-    def test_usb_tethering_iperf(self):
+    def test_usb_tethering_iperf_super_speed(self):
         """Enable usb tethering then executing iperf test.
 
         Steps:
         1. Stop SL4A service.
         2. Enable usb tethering.
         3. Restart SL4A service.
-        4. Execute iperf test for usb tethering and get the throughput result.
-        5. Check the iperf throughput result.
+        4. Skip the test unless the device currently reports super-speed.
+        5. Run the iperf tx/rx test over usb tethering.
+        6. Compare the throughput with iperf_usb_3_criteria.
         """
-        self.enable_usb_tethering()
-        self.ip_server.start()
-        pc_ip = self.get_pc_tethering_ip()
-        tx_success, tx_rate = self.run_iperf_client(pc_ip, '-t5 -i1 -w2M')
-        rx_success, rx_rate = self.run_iperf_client(pc_ip, '-t5 -i1 -w2M -R')
-        self.log.info('Iperf rx result: ' + rx_rate + ' Mbits/sec')
-        self.log.info('Iperf tx result: ' + tx_rate + ' Mbits/sec')
-        self.ip_server.stop()
-        if not tx_success or float(tx_rate) < self.iperf_criteria:
-            raise signals.TestFailure(
-                'Iperf tx test is {} Mbits/sec, '
-                'the throughput result failed.'.format(tx_rate))
-        if not rx_success or float(rx_rate) < self.iperf_criteria:
-            raise signals.TestFailure(
-                'Iperf rx test is {} Mbits/sec, '
-                'the throughput result failed.'.format(rx_rate))
\ No newline at end of file
+        current_speed, _ = self.get_usb_speed()
+        if current_speed != 'super-speed':
+            raise signals.TestSkip('USB 3 not available for the device, '
+                                   'skip super-speed performance test.')
+        self.run_iperf_tx_rx('usb_3', self.iperf_usb_3_criteria)
+
+    @test_tracker_info(uuid="5d8a22fd-1f9b-4758-a6b4-855d134b348a")
+    def test_usb_tethering_iperf_high_speed(self):
+        """Run the iperf throughput test with the usb link at high-speed.
+
+        Steps:
+        1. Read the current usb speed and the usb controller address.
+        2. If the speed is not high-speed, stop SL4A, echo 'high' to the
+           controller's ssusb speed node, then restart SL4A.
+        3. Run iperf tx/rx and compare with iperf_usb_2_criteria.
+
+        The '.dwc3' suffix is cut with split(): strip() takes a char set.
+        """
+        usb_speed, controller = self.get_usb_speed()
+        if usb_speed != 'high-speed':
+            self.log.info(
+                'Default usb speed is USB 3,Force set usb to high-speed.')
+            self.dut.stop_services()
+            self.dut.adb.wait_for_device()
+            self.dut.adb.shell(
+                'echo high > {}{}.ssusb/speed'.format(
+                    self.usb_controller_path, controller.split('.dwc3')[0]),
+                ignore_status=True)
+            time.sleep(IFCONFIG_SETTLE_TIME)
+            self.dut.start_services()
+        self.run_iperf_tx_rx('usb_2', self.iperf_usb_2_criteria)