[connectivity_test_utils] Utils methods for socket keepalive tests
CHERRY PICKED CHANGES FROM AOSP
Bug: 123774509
Test: Verified the changes
Change-Id: I582cc33403d636638572ea2d919c5a9a3b5c286f
Merged-In: Ie15c80db415d2af808b55674a1402983933ddaf2
diff --git a/acts/framework/acts/test_utils/net/connectivity_test_utils.py b/acts/framework/acts/test_utils/net/connectivity_test_utils.py
index 4b7668c..153630f 100644
--- a/acts/framework/acts/test_utils/net/connectivity_test_utils.py
+++ b/acts/framework/acts/test_utils/net/connectivity_test_utils.py
@@ -15,54 +15,95 @@
from acts import asserts
from acts.test_utils.net import connectivity_const as cconst
+from queue import Empty
-def start_natt_keepalive(ad, src_ip, src_port, dst_ip, interval = 10):
- """ Start NAT-T keep alive on dut """
+def _listen_for_keepalive_event(ad, key, msg, ka_event):
+ """Listen for keepalive event and return status
- ad.log.info("Starting NATT Keepalive")
- status = None
-
- key = ad.droid.connectivityStartNattKeepalive(
- interval, src_ip, src_port, dst_ip)
-
- ad.droid.connectivityNattKeepaliveStartListeningForEvent(key, "Started")
+ Args:
+ ad: DUT object
+ key: keepalive key
+ msg: Error message
+ event: Keepalive event type
+ """
+ ad.droid.socketKeepaliveStartListeningForEvent(key, ka_event)
try:
- event = ad.ed.pop_event("PacketKeepaliveCallback")
- status = event["data"]["packetKeepaliveEvent"]
+ event = ad.ed.pop_event("SocketKeepaliveCallback")
+ status = event["data"]["socketKeepaliveEvent"] == ka_event
except Empty:
- msg = "Failed to receive confirmation of starting natt keepalive"
asserts.fail(msg)
finally:
- ad.droid.connectivityNattKeepaliveStopListeningForEvent(
- key, "Started")
-
- if status != "Started":
- ad.log.error("Received keepalive status: %s" % status)
- ad.droid.connectivityRemovePacketKeepaliveReceiverKey(key)
- return None
- return key
-
-def stop_natt_keepalive(ad, key):
- """ Stop NAT-T keep alive on dut """
-
- ad.log.info("Stopping NATT keepalive")
- status = False
- ad.droid.connectivityStopNattKeepalive(key)
-
- ad.droid.connectivityNattKeepaliveStartListeningForEvent(key, "Stopped")
- try:
- event = ad.ed.pop_event("PacketKeepaliveCallback")
- status = event["data"]["packetKeepaliveEvent"] == "Stopped"
- except Empty:
- msg = "Failed to receive confirmation of stopping natt keepalive"
- asserts.fail(msg)
- finally:
- ad.droid.connectivityNattKeepaliveStopListeningForEvent(
- key, "Stopped")
-
- ad.droid.connectivityRemovePacketKeepaliveReceiverKey(key)
+ ad.droid.socketKeepaliveStopListeningForEvent(key, ka_event)
+ if ka_event != "Started":
+ ad.droid.removeSocketKeepaliveReceiverKey(key)
+ if status:
+ ad.log.info("'%s' keepalive event successful" % ka_event)
return status
+def start_natt_socket_keepalive(ad, udp_encap, src_ip, dst_ip, interval = 10):
+ """Start NATT SocketKeepalive on DUT
+
+ Args:
+ ad: DUT object
+ udp_encap: udp_encap socket key
+ src_ip: IP addr of the client
+ dst_ip: IP addr of the keepalive server
+ interval: keepalive time interval
+ """
+ ad.log.info("Starting Natt Socket Keepalive")
+ key = ad.droid.startNattSocketKeepalive(udp_encap, src_ip, dst_ip, interval)
+ msg = "Failed to receive confirmation of starting natt socket keeaplive"
+ status = _listen_for_keepalive_event(ad, key, msg, "Started")
+ return key if status else None
+
+def start_tcp_socket_keepalive(ad, socket, time_interval = 10):
+ """Start TCP socket keepalive on DUT
+
+ Args:
+ ad: DUT object
+ socket: TCP socket key
+ time_interval: Keepalive time interval
+ """
+ ad.log.info("Starting TCP Socket Keepalive")
+ key = ad.droid.startTcpSocketKeepalive(socket, time_interval)
+ msg = "Failed to receive confirmation of starting tcp socket keeaplive"
+ status = _listen_for_keepalive_event(ad, key, msg, "Started")
+ return key if status else None
+
+def socket_keepalive_error(ad, key):
+ """Verify Error callback
+
+ Args:
+ ad: DUT object
+ key: Keepalive key
+ """
+ ad.log.info("Verify Error callback on keepalive: %s" % key)
+ msg = "Failed to receive confirmation of Error callback"
+ return _listen_for_keepalive_event(ad, key, msg, "Error")
+
+def socket_keepalive_data_received(ad, key):
+ """Verify OnDataReceived callback
+
+ Args:
+ ad: DUT object
+ key: Keepalive key
+ """
+ ad.log.info("Verify OnDataReceived callback on keepalive: %s" % key)
+ msg = "Failed to receive confirmation of OnDataReceived callback"
+ return _listen_for_keepalive_event(ad, key, msg, "OnDataReceived")
+
+def stop_socket_keepalive(ad, key):
+ """Stop SocketKeepalive on DUT
+
+ Args:
+ ad: DUT object
+ key: Keepalive key
+ """
+ ad.log.info("Stopping Socket keepalive: %s" % key)
+ ad.droid.stopSocketKeepalive(key)
+ msg = "Failed to receive confirmation of stopping socket keepalive"
+ return _listen_for_keepalive_event(ad, key, msg, "Stopped")
+
def set_private_dns(ad, dns_mode, hostname=None):
""" Set private DNS mode on dut """
if dns_mode == cconst.PRIVATE_DNS_MODE_OFF:
diff --git a/acts/framework/acts/test_utils/net/net_test_utils.py b/acts/framework/acts/test_utils/net/net_test_utils.py
index c511fb6..484b4ec 100644
--- a/acts/framework/acts/test_utils/net/net_test_utils.py
+++ b/acts/framework/acts/test_utils/net/net_test_utils.py
@@ -14,9 +14,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+import os
from acts.controllers import adb
from acts import asserts
+from acts import signals
from acts import utils
from acts.controllers.adb import AdbError
from acts.logger import epoch_to_log_line_timestamp
@@ -39,6 +41,9 @@
VPN_PARAMS = cconst.VpnReqParams
TCPDUMP_PATH = "/data/local/tmp/tcpdump"
+GCE_SSH = "gcloud compute ssh "
+GCE_SCP = "gcloud compute scp "
+
def verify_lte_data_and_tethering_supported(ad):
"""Verify if LTE data is enabled and tethering supported"""
@@ -284,3 +289,81 @@
ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True)
file_name = "tcpdump_%s_%s.pcap" % (ad.serial, test_name)
return "%s/%s" % (log_path, file_name)
+
+def start_tcpdump_gce_server(ad, test_name, dest_port, gce):
+ """ Start tcpdump on gce server
+
+ Args:
+ ad: android device object
+ test_name: test case name
+ dest_port: port to collect tcpdump
+ gce: dictionary of gce instance
+
+ Returns:
+ process id and pcap file path from gce server
+ """
+ ad.log.info("Starting tcpdump on gce server")
+
+ # pcap file name
+ fname = "/tmp/%s_%s_%s_%s" % \
+ (test_name, ad.model, ad.serial,
+ time.strftime('%Y-%m-%d_%H-%M-%S', time.localtime(time.time())))
+
+ # start tcpdump
+ tcpdump_cmd = "sudo bash -c \'tcpdump -i %s -w %s.pcap port %s > \
+ %s.txt 2>&1 & echo $!\'" % (gce["interface"], fname, dest_port, fname)
+ gcloud_ssh_cmd = "%s --project=%s --zone=%s %s@%s --command " % \
+ (GCE_SSH, gce["project"], gce["zone"], gce["username"], gce["hostname"])
+ gce_ssh_cmd = '%s "%s"' % (gcloud_ssh_cmd, tcpdump_cmd)
+ utils.exe_cmd(gce_ssh_cmd)
+
+ # get process id
+ ps_cmd = '%s "ps aux | grep tcpdump | grep %s"' % (gcloud_ssh_cmd, fname)
+ tcpdump_pid = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore").split()
+ if not tcpdump_pid:
+ raise signals.TestFailure("Failed to start tcpdump on gce server")
+ return tcpdump_pid[1], fname
+
+def stop_tcpdump_gce_server(ad, tcpdump_pid, fname, gce):
+ """ Stop and pull tcpdump file from gce server
+
+ Args:
+ ad: android device object
+ tcpdump_pid: process id for tcpdump file
+ fname: tcpdump file path
+ gce: dictionary of gce instance
+
+ Returns:
+ pcap file from gce server
+ """
+ ad.log.info("Stop and pull pcap file from gce server")
+
+ # stop tcpdump
+ tcpdump_cmd = "sudo kill %s" % tcpdump_pid
+ gcloud_ssh_cmd = "%s --project=%s --zone=%s %s@%s --command " % \
+ (GCE_SSH, gce["project"], gce["zone"], gce["username"], gce["hostname"])
+ gce_ssh_cmd = '%s "%s"' % (gcloud_ssh_cmd, tcpdump_cmd)
+ utils.exe_cmd(gce_ssh_cmd)
+
+ # verify tcpdump is stopped
+ ps_cmd = '%s "ps aux | grep tcpdump"' % gcloud_ssh_cmd
+ res = utils.exe_cmd(ps_cmd).decode("utf-8", "ignore")
+ if tcpdump_pid in res.split():
+ raise signals.TestFailure("Failed to stop tcpdump on gce server")
+ if not fname:
+ return None
+
+ # pull pcap file
+ gcloud_scp_cmd = "%s --project=%s --zone=%s %s@%s:" % \
+ (GCE_SCP, gce["project"], gce["zone"], gce["username"], gce["hostname"])
+ pull_file = '%s%s.pcap %s' % (gcloud_scp_cmd, fname, ad.log_path)
+ utils.exe_cmd(pull_file)
+ if not os.path.exists("%s/%s.pcap" % (ad.log_path, fname.split('/')[-1])):
+ raise signals.TestFailure("Failed to pull tcpdump from gce server")
+
+ # delete pcaps
+ utils.exe_cmd('%s "sudo rm %s.*"' % (gcloud_ssh_cmd, fname))
+
+ # return pcap file
+ pcap_file = "%s/%s.pcap" % (ad.log_path, fname.split('/')[-1])
+ return pcap_file
diff --git a/acts/framework/acts/test_utils/net/socket_test_utils.py b/acts/framework/acts/test_utils/net/socket_test_utils.py
index a8a05fc..42e4537 100644
--- a/acts/framework/acts/test_utils/net/socket_test_utils.py
+++ b/acts/framework/acts/test_utils/net/socket_test_utils.py
@@ -258,6 +258,18 @@
status = ad.droid.closeTcpServerSocket(socket)
asserts.assert_true(status, "Failed to socket")
+def shutdown_socket(ad, socket):
+ """ Shutdown socket
+
+ Args:
+ 1. ad - androidandroid device object
+ 2. socket - socket key
+ """
+ fd = ad.droid.getFileDescriptorOfSocket(socket)
+ asserts.assert_true(fd, "Failed to get FileDescriptor key")
+ status = ad.droid.shutdownFileDescriptor(fd)
+ asserts.assert_true(status, "Failed to shutdown socket")
+
def send_recv_data_sockets(client, server, client_sock, server_sock):
""" Send data over TCP socket from client to server.
Verify that server received the data