[connectivity_test_utils] Utils methods for socket keepalive tests

CHERRY PICKED CHANGES FROM AOSP

Bug: 123774509
Test: Verified the changes
Change-Id: I582cc33403d636638572ea2d919c5a9a3b5c286f
Merged-In: Ie15c80db415d2af808b55674a1402983933ddaf2
diff --git a/acts/framework/acts/test_utils/net/connectivity_test_utils.py b/acts/framework/acts/test_utils/net/connectivity_test_utils.py
index 4b7668c..153630f 100644
--- a/acts/framework/acts/test_utils/net/connectivity_test_utils.py
+++ b/acts/framework/acts/test_utils/net/connectivity_test_utils.py
@@ -15,54 +15,95 @@
 
 from acts import asserts
 from acts.test_utils.net import connectivity_const as cconst
+from queue import Empty
 
-def start_natt_keepalive(ad, src_ip, src_port, dst_ip, interval = 10):
-    """ Start NAT-T keep alive on dut """
+def _listen_for_keepalive_event(ad, key, msg, ka_event):
+    """Listen for keepalive event and return status
 
-    ad.log.info("Starting NATT Keepalive")
-    status = None
-
-    key = ad.droid.connectivityStartNattKeepalive(
-        interval, src_ip, src_port, dst_ip)
-
-    ad.droid.connectivityNattKeepaliveStartListeningForEvent(key, "Started")
+    Args:
+        ad: DUT object
+        key: keepalive key
+        msg: Error message
+        event: Keepalive event type
+    """
+    ad.droid.socketKeepaliveStartListeningForEvent(key, ka_event)
     try:
-        event = ad.ed.pop_event("PacketKeepaliveCallback")
-        status = event["data"]["packetKeepaliveEvent"]
+        event = ad.ed.pop_event("SocketKeepaliveCallback")
+        status = event["data"]["socketKeepaliveEvent"] == ka_event
     except Empty:
-        msg = "Failed to receive confirmation of starting natt keepalive"
         asserts.fail(msg)
     finally:
-        ad.droid.connectivityNattKeepaliveStopListeningForEvent(
-            key, "Started")
-
-    if status != "Started":
-        ad.log.error("Received keepalive status: %s" % status)
-        ad.droid.connectivityRemovePacketKeepaliveReceiverKey(key)
-        return None
-    return key
-
-def stop_natt_keepalive(ad, key):
-    """ Stop NAT-T keep alive on dut """
-
-    ad.log.info("Stopping NATT keepalive")
-    status = False
-    ad.droid.connectivityStopNattKeepalive(key)
-
-    ad.droid.connectivityNattKeepaliveStartListeningForEvent(key, "Stopped")
-    try:
-        event = ad.ed.pop_event("PacketKeepaliveCallback")
-        status = event["data"]["packetKeepaliveEvent"] == "Stopped"
-    except Empty:
-        msg = "Failed to receive confirmation of stopping natt keepalive"
-        asserts.fail(msg)
-    finally:
-        ad.droid.connectivityNattKeepaliveStopListeningForEvent(
-            key, "Stopped")
-
-    ad.droid.connectivityRemovePacketKeepaliveReceiverKey(key)
+        ad.droid.socketKeepaliveStopListeningForEvent(key, ka_event)
+    if ka_event != "Started":
+        ad.droid.removeSocketKeepaliveReceiverKey(key)
+    if status:
+        ad.log.info("'%s' keepalive event successful" % ka_event)
     return status
 
+def start_natt_socket_keepalive(ad, udp_encap, src_ip, dst_ip, interval = 10):
+    """Start NATT SocketKeepalive on DUT
+
+    Args:
+        ad: DUT object
+        udp_encap: udp_encap socket key
+        src_ip: IP addr of the client
+        dst_ip: IP addr of the keepalive server
+        interval: keepalive time interval
+    """
+    ad.log.info("Starting Natt Socket Keepalive")
+    key = ad.droid.startNattSocketKeepalive(udp_encap, src_ip, dst_ip, interval)
+    msg = "Failed to receive confirmation of starting natt socket keeaplive"
+    status = _listen_for_keepalive_event(ad, key, msg, "Started")
+    return key if status else None
+
+def start_tcp_socket_keepalive(ad, socket, time_interval = 10):
+    """Start TCP socket keepalive on DUT
+
+    Args:
+        ad: DUT object
+        socket: TCP socket key
+        time_interval: Keepalive time interval
+    """
+    ad.log.info("Starting TCP Socket Keepalive")
+    key = ad.droid.startTcpSocketKeepalive(socket, time_interval)
+    msg = "Failed to receive confirmation of starting tcp socket keeaplive"
+    status = _listen_for_keepalive_event(ad, key, msg, "Started")
+    return key if status else None
+
+def socket_keepalive_error(ad, key):
+    """Verify Error callback
+
+    Args:
+        ad: DUT object
+        key: Keepalive key
+    """
+    ad.log.info("Verify Error callback on keepalive: %s" % key)
+    msg = "Failed to receive confirmation of Error callback"
+    return _listen_for_keepalive_event(ad, key, msg, "Error")
+
+def socket_keepalive_data_received(ad, key):
+    """Verify OnDataReceived callback
+
+    Args:
+        ad: DUT object
+        key: Keepalive key
+    """
+    ad.log.info("Verify OnDataReceived callback on keepalive: %s" % key)
+    msg = "Failed to receive confirmation of OnDataReceived callback"
+    return _listen_for_keepalive_event(ad, key, msg, "OnDataReceived")
+
+def stop_socket_keepalive(ad, key):
+    """Stop SocketKeepalive on DUT
+
+    Args:
+        ad: DUT object
+        key: Keepalive key
+    """
+    ad.log.info("Stopping Socket keepalive: %s" % key)
+    ad.droid.stopSocketKeepalive(key)
+    msg = "Failed to receive confirmation of stopping socket keepalive"
+    return _listen_for_keepalive_event(ad, key, msg, "Stopped")
+
 def set_private_dns(ad, dns_mode, hostname=None):
     """ Set private DNS mode on dut """
     if dns_mode == cconst.PRIVATE_DNS_MODE_OFF:
diff --git a/acts/framework/acts/test_utils/net/net_test_utils.py b/acts/framework/acts/test_utils/net/net_test_utils.py
index c511fb6..484b4ec 100644
--- a/acts/framework/acts/test_utils/net/net_test_utils.py
+++ b/acts/framework/acts/test_utils/net/net_test_utils.py
@@ -14,9 +14,11 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 import logging
+import os
 
 from acts.controllers import adb
 from acts import asserts
+from acts import signals
 from acts import utils
 from acts.controllers.adb import AdbError
 from acts.logger import epoch_to_log_line_timestamp
@@ -39,6 +41,9 @@
 VPN_PARAMS = cconst.VpnReqParams
 TCPDUMP_PATH = "/data/local/tmp/tcpdump"
 
+GCE_SSH = "gcloud compute ssh "
+GCE_SCP = "gcloud compute scp "
+
 
 def verify_lte_data_and_tethering_supported(ad):
     """Verify if LTE data is enabled and tethering supported"""
@@ -284,3 +289,81 @@
     ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True)
     file_name = "tcpdump_%s_%s.pcap" % (ad.serial, test_name)
     return "%s/%s" % (log_path, file_name)
+
+def start_tcpdump_gce_server(ad, test_name, dest_port, gce):
+    """ Start tcpdump on gce server
+
+    Args:
+        ad: android device object
+        test_name: test case name
+        dest_port: port to collect tcpdump
+        gce: dictionary of gce instance
+
+    Returns:
+       process id and pcap file path from gce server
+    """
+    ad.log.info("Starting tcpdump on gce server")
+
+    # pcap file name
+    fname = "/tmp/%s_%s_%s_%s" % \
+        (test_name, ad.model, ad.serial,
+         time.strftime('%Y-%m-%d_%H-%M-%S', time.localtime(time.time())))
+
+    # start tcpdump
+    tcpdump_cmd = "sudo bash -c \'tcpdump -i %s -w %s.pcap port %s > \
+        %s.txt 2>&1 & echo $!\'" % (gce["interface"], fname, dest_port, fname)
+    gcloud_ssh_cmd = "%s --project=%s --zone=%s %s@%s --command " % \
+        (GCE_SSH, gce["project"], gce["zone"], gce["username"], gce["hostname"])
+    gce_ssh_cmd = '%s "%s"' % (gcloud_ssh_cmd, tcpdump_cmd)
+    utils.exe_cmd(gce_ssh_cmd)
+
+    # get process id
+    ps_cmd = '%s "ps aux | grep tcpdump | grep %s"' % (gcloud_ssh_cmd, fname)
+    tcpdump_pid = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore").split()
+    if not tcpdump_pid:
+        raise signals.TestFailure("Failed to start tcpdump on gce server")
+    return tcpdump_pid[1], fname
+
+def stop_tcpdump_gce_server(ad, tcpdump_pid, fname, gce):
+    """ Stop and pull tcpdump file from gce server
+
+    Args:
+        ad: android device object
+        tcpdump_pid: process id for tcpdump file
+        fname: tcpdump file path
+        gce: dictionary of gce instance
+
+    Returns:
+       pcap file from gce server
+    """
+    ad.log.info("Stop and pull pcap file from gce server")
+
+    # stop tcpdump
+    tcpdump_cmd = "sudo kill %s" % tcpdump_pid
+    gcloud_ssh_cmd = "%s --project=%s --zone=%s %s@%s --command " % \
+        (GCE_SSH, gce["project"], gce["zone"], gce["username"], gce["hostname"])
+    gce_ssh_cmd = '%s "%s"' % (gcloud_ssh_cmd, tcpdump_cmd)
+    utils.exe_cmd(gce_ssh_cmd)
+
+    # verify tcpdump is stopped
+    ps_cmd = '%s "ps aux | grep tcpdump"' % gcloud_ssh_cmd
+    res = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore")
+    if tcpdump_pid in res.split():
+        raise signals.TestFailure("Failed to stop tcpdump on gce server")
+    if not fname:
+        return None
+
+    # pull pcap file
+    gcloud_scp_cmd = "%s --project=%s --zone=%s %s@%s:" % \
+        (GCE_SCP, gce["project"], gce["zone"], gce["username"], gce["hostname"])
+    pull_file = '%s%s.pcap %s' % (gcloud_scp_cmd, fname, ad.log_path)
+    utils.exe_cmd(pull_file)
+    if not os.path.exists("%s/%s.pcap" % (ad.log_path, fname.split('/')[-1])):
+        raise signals.TestFailure("Failed to pull tcpdump from gce server")
+
+    # delete pcaps
+    utils.exe_cmd('%s "sudo rm %s.*"' % (gcloud_ssh_cmd, fname))
+
+    # return pcap file
+    pcap_file = "%s/%s.pcap" % (ad.log_path, fname.split('/')[-1])
+    return pcap_file
diff --git a/acts/framework/acts/test_utils/net/socket_test_utils.py b/acts/framework/acts/test_utils/net/socket_test_utils.py
index a8a05fc..42e4537 100644
--- a/acts/framework/acts/test_utils/net/socket_test_utils.py
+++ b/acts/framework/acts/test_utils/net/socket_test_utils.py
@@ -258,6 +258,18 @@
     status = ad.droid.closeTcpServerSocket(socket)
     asserts.assert_true(status, "Failed to socket")
 
+def shutdown_socket(ad, socket):
+    """ Shutdown socket
+
+    Args:
+      1. ad - androidandroid device object
+      2. socket - socket key
+    """
+    fd = ad.droid.getFileDescriptorOfSocket(socket)
+    asserts.assert_true(fd, "Failed to get FileDescriptor key")
+    status = ad.droid.shutdownFileDescriptor(fd)
+    asserts.assert_true(status, "Failed to shutdown socket")
+
 def send_recv_data_sockets(client, server, client_sock, server_sock):
     """ Send data over TCP socket from client to server.
         Verify that server received the data