platform_Firewall: test opening holes in specific interfaces.
BUG=brillo:185
TEST=Passes on expresso
CQ-DEPEND=CL:249622
Change-Id: I3e695140751a5d2307a352b35e94f3c73a1c9887
Reviewed-on: https://chromium-review.googlesource.com/249651
Tested-by: Jorge Lucangeli Obes <jorgelo@chromium.org>
Reviewed-by: Zeping Qiu <zqiu@chromium.org>
Commit-Queue: Jorge Lucangeli Obes <jorgelo@chromium.org>
diff --git a/client/site_tests/platform_Firewall/platform_Firewall.py b/client/site_tests/platform_Firewall/platform_Firewall.py
index ff60d05..181ee9c 100644
--- a/client/site_tests/platform_Firewall/platform_Firewall.py
+++ b/client/site_tests/platform_Firewall/platform_Firewall.py
@@ -17,8 +17,12 @@
version = 1
_PORT = 1234
+ _IFACE = "eth0"
+
_TCP_RULE = "-A INPUT -p tcp -m tcp --dport %d -j ACCEPT" % _PORT
_UDP_RULE = "-A INPUT -p udp -m udp --dport %d -j ACCEPT" % _PORT
+ _IFACE_RULE = "-A INPUT -i %s -p tcp -m tcp --dport %d -j ACCEPT" % (_IFACE,
+ _PORT)
_POLL_INTERVAL = 5
@@ -39,22 +43,34 @@
self.tcp_r, self.tcp_w = os.pipe()
self.udp_r, self.udp_w = os.pipe()
+ self.iface_r, self.iface_w = os.pipe()
try:
tcp_lifeline = dbus.types.UnixFd(self.tcp_r)
- ret = pb.RequestTcpPortAccess(dbus.UInt16(self._PORT), tcp_lifeline)
+ ret = pb.RequestTcpPortAccess(dbus.UInt16(self._PORT), "",
+ tcp_lifeline)
# |ret| is a dbus.Boolean, but compares as int.
if ret == 0:
raise error.TestFail(
"RequestTcpPortAccess returned false.")
udp_lifeline = dbus.types.UnixFd(self.udp_r)
- ret = pb.RequestUdpPortAccess(dbus.UInt16(self._PORT), udp_lifeline)
+ ret = pb.RequestUdpPortAccess(dbus.UInt16(self._PORT), "",
+ udp_lifeline)
# |ret| is a dbus.Boolean, but compares as int.
if (ret == 0):
raise error.TestFail(
"RequestUdpPortAccess returned false.")
+ iface_lifeline = dbus.types.UnixFd(self.iface_r)
+ ret = pb.RequestTcpPortAccess(dbus.UInt16(self._PORT),
+ dbus.String(self._IFACE),
+ iface_lifeline)
+ # |ret| is a dbus.Boolean, but compares as int.
+ if (ret == 0):
+ raise error.TestFail(
+ "RequestTcpPortAccess(port, interface) returned false.")
+
rules = self._iptables_rules()
if self._TCP_RULE not in rules:
raise error.TestFail(
@@ -62,19 +78,25 @@
if self._UDP_RULE not in rules:
raise error.TestFail(
"RequestUdpPortAccess did not add iptables rule.")
+ if self._IFACE_RULE not in rules:
+ raise error.TestFail(
+ "RequestTcpPortAccess(port, interface)"
+ " did not add iptables rule.")
# permission_broker should plug the firewall hole
# when the requesting process exits.
# Simulate the process exiting by closing both write ends.
os.close(self.tcp_w)
os.close(self.udp_w)
+ os.close(self.iface_w)
# permission_broker checks every |_POLL_INTERVAL| seconds
# for processes that have exited.
# This is ugly, but it's either this or polling /var/log/messages.
time.sleep(self._POLL_INTERVAL + 1)
rules = self._iptables_rules()
- if self._TCP_RULE in rules or self._UDP_RULE in rules:
+ if (self._TCP_RULE in rules or self._UDP_RULE in rules
+ or self._IFACE_RULE in rules):
raise error.TestFail(
"permission_broker did not remove iptables rule.")
@@ -87,6 +109,7 @@
try:
os.close(self.tcp_w)
os.close(self.udp_w)
+ os.close(self.iface_w)
except OSError:
pass
@@ -98,3 +121,6 @@
utils.system(cmd, ignore_status=True)
cmd = self._IPTABLES_DEL_CMD % ("udp", "udp", self._PORT)
utils.system(cmd, ignore_status=True)
+ cmd = self._IPTABLES_DEL_CMD % ("tcp", "tcp", self._PORT)
+ cmd += " -i %s" % self._IFACE
+ utils.system(cmd, ignore_status=True)