Add networking code
Signed-off-by: Martin J. Bligh <mbligh@google.com>
git-svn-id: http://test.kernel.org/svn/autotest/trunk@2480 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/client/bin/net/__init__.py b/client/bin/net/__init__.py
new file mode 100755
index 0000000..e69de29
--- /dev/null
+++ b/client/bin/net/__init__.py
diff --git a/client/bin/net/basic_machine.py b/client/bin/net/basic_machine.py
new file mode 100755
index 0000000..590c9af
--- /dev/null
+++ b/client/bin/net/basic_machine.py
@@ -0,0 +1,67 @@
+TYPE = 'BASIC'
+CALIBRATION_JOB = ''
+
+GLAG_FLAG = False
+DEVICES = ['eth0']
+CONNECTED_DEVICES = ['eth0']
+
+NDLINK_SPEED = 1000
+NDLINK_AUTONEG = True
+NDLINK_RX = None
+NDLINK_TX = False
+
+
+FLOODPING_NUMPKTS = 100000
+FLOODPING_CONSTRAINTS = ["rtt_max < 0.2",
+ "rtt_min < 0.05",
+ "rtt_avg < .06",
+ "rtt_mdev < .02",
+ ]
+
+PKT_SIZES = [64, 128, 256, 512, 1024, 1280, 1518]
+PKTGEN_CONSTRAINTS = []
+
+drives = ['hda']
+
+IOBW_CONSTRAINTS = ["throughput > 500",
+ "read_bw > 100000",
+ "write_bw > 100000"]
+
+
+NDTEST_TESTS = ['set_hwaddr',
+ 'set_maddr',
+ 'pkt_size',
+ 'zero_pad',
+ 'mac_filter',
+ 'multicast_filter',
+ 'broadcast',
+ 'promisc'
+ ]
+
+
+# list of ndlink tests
+NDLINK_TESTS = ['carrier',
+ 'phy_config',
+ 'offload_config',
+ 'pause_config'
+ ]
+
+NETPERF_TESTS = {'TCP_STREAM': {'meas_streams': 16,
+ 'constraints': ['Throughput > 800']
+ },
+ 'TCP_SENDFILE': {'meas_streams': 16,
+ 'constraints': ['Throughput > 800']
+ },
+ 'UDP_STREAM': {'meas_streams': 16,
+ 'constraints': ['Throughput > 500']
+ },
+ 'UDP_RR': {'meas_streams': 16,
+ 'constraints': ['Transfer_Rate > 1000']
+ },
+ 'TCP_RR': {'meas_streams': 16,
+ 'constraints': ['Transfer_Rate > 1000']
+ },
+ 'TCP_CRR': {'meas_streams': 16,
+ 'constraints': ['Transfer_Rate > 1000']
+ }
+ }
diff --git a/client/bin/net/common.py b/client/bin/net/common.py
new file mode 100755
index 0000000..7443052
--- /dev/null
+++ b/client/bin/net/common.py
@@ -0,0 +1,8 @@
+import os, sys
+dirname = os.path.dirname(sys.modules[__name__].__file__)
+client_dir = os.path.abspath(os.path.join(dirname, "../.."))
+sys.path.insert(0, client_dir)
+import setup_modules
+sys.path.pop(0)
+setup_modules.setup(base_path=client_dir,
+ root_module_name="autotest_lib.client")
diff --git a/client/bin/net/net_tc.py b/client/bin/net/net_tc.py
new file mode 100755
index 0000000..06cf8bb
--- /dev/null
+++ b/client/bin/net/net_tc.py
@@ -0,0 +1,413 @@
+"""Convenience methods for use to manipulate traffic control settings.
+
+see http://linux.die.net/man/8/tc for details about traffic controls in linux.
+
+Example
+ import common
+ from autotest_lib.client.bin.net.net_tc import *
+ from autotest_lib.client.bin.net.net_utils import *
+
+ class mock_netif(object):
+
+ def __init__(self, name):
+ self._name = name
+
+ def get_name(self):
+ return self._name
+
+
+ netem_qdisc = netem()
+ netem_qdisc.add_param('loss 100%')
+
+ ack_filter = u32filter()
+ ack_filter.add_rule('match ip protocol 6 0xff')
+ ack_filter.add_rule('match u8 0x10 0x10 at nexthdr+13')
+ ack_filter.set_dest_qdisc(netem_qdisc)
+
+ root_qdisc = prio()
+ root_qdisc.get_class(2).set_leaf_qdisc(netem_qdisc)
+ root_qdisc.add_filter(ack_filter)
+
+ lo_if = mock_netif('lo')
+
+ root_qdisc.setup(lo_if)
+
+ # run test here ...
+ root_qdisc.restore(lo_if)
+
+"""
+
+import commands, os, re
+import common
+from autotest_lib.client.common_lib import error, utils
+from autotest_lib.client.bin import autotest_utils
+from autotest_lib.client.bin.net import net_utils
+
+# TODO (chavey) clean up those global here and new_handle()
+handle_counter = 0
+INCR = 100
+
+
+def new_handle():
+ global handle_counter
+ handle_counter += INCR
+ return handle_counter
+
+
+class tcclass(object):
+
+ def __init__(self, handle, minor, leaf_qdisc=None):
+ self._parent_class = None
+ self._children = []
+ self._leaf_qdisc = leaf_qdisc
+ self._handle = handle
+ self._minor = minor
+
+
+ def get_leaf_qdisc(self):
+ return self._leaf_qdisc
+
+
+ def set_leaf_qdisc(self, leaf_qdisc):
+ leaf_qdisc.set_parent_class(self)
+ self._leaf_qdisc = leaf_qdisc
+
+
+ def get_parent_class(self):
+ return self._parent_class
+
+
+ def set_parent_class(self, parent_class):
+ self._parent_class = parent_class
+
+
+ def get_minor(self):
+ return self._minor
+
+
+ def id(self):
+ return '%s:%s' % (self._handle, self._minor)
+
+
+ def add_child(self, child_class):
+ child_class.set_parent_class(self)
+ if child_class not in self._children:
+ self._child.append(child_class)
+
+
+ def setup(self, netif):
+ # setup leaf qdisc
+ if self._leaf_qdisc:
+ self._leaf_qdisc.setup(netif)
+
+ # setup child classes
+ for child in self._children:
+ child.setup()
+
+
+ def restore(self, netif):
+ # restore child classes
+ children_copy = list(self._children)
+ children_copy.reverse()
+ for child in children_copy:
+ child.restore()
+
+ # restore leaf qdisc
+ if self._leaf_qdisc:
+ self._leaf_qdisc.restore(netif)
+
+
+class tcfilter(object):
+
+ _tc_cmd = 'tc filter %(cmd)s dev %(dev)s parent %(parent)s protocol ' \
+ '%(protocol)s prio %(priority)s %(filtertype)s \\\n ' \
+ '%(rules)s \\\n flowid %(flowid)s'
+
+ conf_device = 'dev'
+ conf_parent = 'parent'
+ conf_type = 'filtertype'
+ conf_protocol = 'protocol'
+ conf_priority = 'priority'
+ conf_flowid = 'flowid'
+ conf_command = 'cmd'
+ conf_rules = 'cmd'
+ conf_qdiscid = 'qdiscid'
+ conf_name = 'name'
+ conf_params = 'params'
+
+
+ def __init__(self):
+ self._parent_qdisc = None
+ self._dest_qdisc = None
+ self._protocol = 'ip'
+ self._priority = 1
+ self._handle = None
+ self._tc_conf = None
+
+
+ def get_parent_qdisc(self):
+ return self._parent_qdisc
+
+
+ def set_parent_qdisc(self, parent_qdisc):
+ self._parent_qdisc = parent_qdisc
+
+
+ def get_dest_qdisc(self):
+ return self._dest_qdisc
+
+
+ def set_dest_qdisc(self, dest_qdisc):
+ self._dest_qdisc = dest_qdisc
+
+
+ def get_protocol(self):
+ return self._protocol
+
+
+ def set_protocol(self, protocol):
+ self._protocol = protocol
+
+
+ def get_priority(self):
+ return self._priority
+
+
+ def set_priority(self, priority):
+ self._priority = priority
+
+
+ def get_handle(self):
+ return self._handle
+
+
+ def set_handle(self, handle):
+ self._handle = handle
+
+
+ def _get_tc_conf(self, netif):
+ if self._tc_conf:
+ return self._tc_conf
+ self._tc_conf = dict()
+ self._tc_conf[tcfilter.conf_device] = netif.get_name()
+ self._tc_conf[tcfilter.conf_parent] = self._parent_qdisc.id()
+ self._tc_conf[tcfilter.conf_type] = self.filtertype
+ self._tc_conf[tcfilter.conf_protocol] = self._protocol
+ self._tc_conf[tcfilter.conf_priotity] = self._priority
+ self._tc_conf[tcfilter.conf_flowid] = (
+ self._dest_qdisc.get_parent_class().id())
+ return self._tc_conf
+
+
+ def tc_cmd(self, tc_conf):
+ print self._tc_cmd % tc_conf
+
+
+ def setup(self, netif):
+ pass
+
+
+ def restore(self, netif):
+ pass
+
+
+class u32filter(tcfilter):
+
+ filtertype = 'u32'
+
+ def __init__(self):
+ super(u32filter, self).__init__()
+ self._rules = []
+
+
+ def _filter_rules(self):
+ return ' \\\n '.join(self._rules)
+
+
+ def add_rule(self, rule):
+ self._rules.append(rule)
+
+
+ def setup(self, netif):
+ tc_conf = self._get_tc_conf(netif)
+ tc_conf[tcfilter.conf_cmd] = 'add'
+ tc_conf[tcfilter.conf_rules] = self._filter_rules()
+ self.tc_cmd(tc_conf)
+
+
+ def restore(self, netif):
+ tc_conf = self._get_tc_conf(netif)
+ tc_conf[tcfilter.conf_cmd] = 'del'
+ tc_conf[tcfilter.conf_rules] = self._filter_rules()
+ self.tc_cmd(tc_conf)
+
+#TODO (ncrao): generate some typical rules: ack, syn, synack,
+# dport/sport, daddr/sddr, etc.
+class qdisc(object):
+
+ # tc command
+ _tc_cmd = 'tc qdisc %(cmd)s dev %(dev)s %(parent)s ' \
+ 'handle %(qdiscid)s %(name)s %(params)s'
+
+ def __init__(self, handle):
+ self._handle = handle
+ self._parent_class = None
+ self._tc_conf = None
+
+
+ def get_handle(self):
+ return self._handle
+
+
+ def get_parent_class(self):
+ return self._parent_class
+
+
+ def set_parent_class(self, parent_class):
+ self._parent_class = parent_class
+
+
+ def _get_tc_conf(self, netif):
+ if self._tc_conf:
+ return self._tc_conf
+ self._tc_conf = dict()
+ self._tc_conf[tcfilter.conf_device] = netif.get_name()
+ if self._parent_class:
+ self._tc_conf[tcfilter.conf_parent] = ('parent %s' %
+ self._parent_class.id())
+ else:
+ self._tc_conf[tcfilter.conf_parent] = 'root'
+ self._tc_conf[tcfilter.conf_qdiscid] = self.id()
+ self._tc_conf[tcfilter.conf_name] = self.name
+ self._tc_conf[tcfilter.conf_params] = ''
+ return self._tc_conf
+
+
+ def id(self):
+ return '%s:0' % self._handle
+
+
+ def tc_cmd(self, tc_conf):
+ print self._tc_cmd % tc_conf
+
+
+ def setup(self, netif):
+ tc_conf = self._get_tc_conf(netif)
+ tc_conf[tcfilter.conf_command] = 'add'
+ self.tc_cmd(tc_conf)
+
+
+ def restore(self, netif):
+ tc_conf = self._get_tc_conf(netif)
+ tc_conf[tcfilter.conf_command] = 'del'
+ self.tc_cmd(tc_conf)
+
+
+class classful_qdisc(qdisc):
+
+ classful = True
+
+ def __init__(self, handle):
+ super(classful_qdisc, self).__init__(handle)
+ self._classes = []
+ self._filters = []
+
+
+ def add_class(self, child_class):
+ self._classes.append(child_class)
+
+
+ def add_filter(self, filter):
+ filter.set_parent_qdisc(self)
+ self._filters.append(filter)
+
+
+ def setup(self, netif):
+ super(classful_qdisc, self).setup(netif)
+
+ # setup child classes
+ for child in self._classes:
+ child.setup(netif)
+
+ # setup filters
+ for filter in self._filters:
+ filter.setup(netif)
+
+
+ def restore(self, netif):
+ # restore filters
+ filters_copy = list(self._filters)
+ filters_copy.reverse()
+ for filter in filters_copy:
+ filter.restore(netif)
+
+ # restore child classes
+ classes_copy = list(self._classes)
+ classes_copy.reverse()
+ for child in classes_copy:
+ child.restore(netif)
+
+ super(classful_qdisc, self).restore(netif)
+
+
+class prio(classful_qdisc):
+
+ name = 'prio'
+
+ def __init__(self, handle=new_handle(), bands=3):
+ super(prio, self).__init__(handle)
+ self._bands = bands
+ for counter in range(bands):
+ self.add_class(tcclass(handle, counter + 1))
+
+
+ def setup(self, netif):
+ super(prio, self).setup(netif)
+
+
+ def get_class(self, band):
+ if band > self._bands:
+ raise error.TestError('error inserting %s at band %s' % \
+ (qdisc.name, band))
+ return self._classes[band]
+
+
+class classless_qdisc(qdisc):
+
+ classful = False
+
+ def __init__(self, handle):
+ super(classless_qdisc, self).__init__(handle)
+
+
+class pfifo(classless_qdisc):
+
+ name = 'pfifo'
+
+ def __init__(self, handle=new_handle()):
+ super(pfifo, self).__init__(handle)
+
+
+ def setup(self, netif):
+ super(pfifo, self).setup(netif)
+
+
+class netem(classless_qdisc):
+
+ name = 'netem'
+
+ def __init__(self, handle=new_handle()):
+ super(netem, self).__init__(handle)
+ self._params = list()
+
+
+ def add_param(self, param):
+ self._params.append(param)
+
+
+ def setup(self, netif):
+ super(netem, self).setup(netif)
+ tc_conf = self._get_tc_conf(netif)
+ tc_conf[tcfilter.conf_command] = 'change'
+ tc_conf[tcfilter.conf_params] = ' '.join(self._params)
+ self.tc_cmd(tc_conf)
diff --git a/client/bin/net/net_tc_unittest.py b/client/bin/net/net_tc_unittest.py
new file mode 100755
index 0000000..7f8c036
--- /dev/null
+++ b/client/bin/net/net_tc_unittest.py
@@ -0,0 +1,177 @@
+#!/usr/bin/python
+
+# TODO(chavey) complete all the unit test in this file
+
+import unittest, os, socket, time, sys
+import common
+from autotest_lib.client.bin import autotest_utils
+from autotest_lib.client.bin.net import net_tc, net_utils, net_utils_mock
+from autotest_lib.client.common_lib.test_utils import mock
+from autotest_lib.client.common_lib import utils, error
+
+class TestNetUtils(unittest.TestCase):
+ def setUp(self):
+ self.god = mock.mock_god()
+ self.god.stub_function(utils, "system")
+ self.god.stub_function(utils, "system_output")
+ os.environ['AUTODIR'] = "autodir"
+
+
+ def tearDown(self):
+ self.god.unstub_all()
+ del os.environ['AUTODIR']
+
+
+ #
+ # test tcclass
+ #
+ def test_tcclass_get_leaf_qdisc(self):
+ pass
+
+
+ def test_tcclass_get_parent_class(self):
+ pass
+
+
+ def test_tcclass_set_parent_class(self):
+ pass
+
+
+ def test_tcclass_get_minor(self):
+ pass
+
+
+ def test_tcclass_id(self):
+ pass
+
+
+ def test_tcclass_add_child(self):
+ pass
+
+
+ def test_tcclass_setup(self):
+ pass
+
+
+ def test_tcclass_restore(self):
+ pass
+
+ #
+ # test tcfilter
+ #
+ def test_tcfilter_get_parent_qdisc(self):
+ pass
+
+
+ def test_tcfilter_set_parent_qdisc(self):
+ pass
+
+
+ def test_tcfilter_get_dest_qdisc(self):
+ pass
+
+
+ def test_tcfilter_set_dest_qdisc(self):
+ pass
+
+
+ def test_tcfilter_get_protocol(self):
+ pass
+
+
+ def test_tcfilter_set_protocol(self):
+ pass
+
+
+ def test_tcfilter_get_priority(self):
+ pass
+
+
+ def test_tcfilter_set_priority(self):
+ pass
+
+
+ def test_tcfilter_get_handle(self):
+ pass
+
+
+ def test_tcfilter_set_handle(self):
+ pass
+
+
+ def test_tcfilter_tc_cmd(self):
+ pass
+
+
+ def test_tcfilter_setup(self):
+ pass
+
+
+ def test_tcfilter_restore(self):
+ pass
+
+ #
+ # test u32filter
+ #
+ def test_u32filter_add_rule(self):
+ pass
+
+
+ def test_u32filter_setup(self):
+ pass
+
+
+ def test_u32filter_restore(self):
+ pass
+
+
+ #
+ # test qdisc
+ #
+ def test_qdisc_add_class(self):
+ pass
+
+
+ def test_qdisc_add_filter(self):
+ pass
+
+
+ def test_qdisc_setup(self):
+ pass
+
+
+ def test_qdisc_restore(self):
+ pass
+
+
+ #
+ # test prio
+ #
+ def test_prio_setup(self):
+ pass
+
+
+ def test_prio_get_class(self):
+ pass
+
+
+ #
+ # test pfifo
+ #
+ def test_pfifo_setup(self):
+ pass
+
+
+ #
+ # test netem
+ #
+ def test_netem_add_param(self):
+ pass
+
+
+ def test_netem_setup(self):
+ pass
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/client/bin/net/net_utils.py b/client/bin/net/net_utils.py
new file mode 100755
index 0000000..2898d7e
--- /dev/null
+++ b/client/bin/net/net_utils.py
@@ -0,0 +1,730 @@
+"""Convenience functions for use by network tests or whomever.
+
+This library is to release in the public repository.
+"""
+
+import commands, os, re, socket, sys, time, struct
+from autotest_lib.client.common_lib import error, utils
+from autotest_lib.client.bin import autotest_utils
+
+TIMEOUT = 10 # Used for socket timeout and barrier timeout
+
+
+class network_utils(object):
+ def reset(self, ignore_status=False):
+ utils.system('service network restart', ignore_status=ignore_status)
+
+
+ def start(self, ignore_status=False):
+ utils.system('service network start', ignore_status=ignore_status)
+
+
+ def stop(self, ignore_status=False):
+ utils.system('service network stop', ignore_status=ignore_status)
+
+
+ def disable_ip_local_loopback(self, ignore_status=False):
+ utils.system("echo '1' > /proc/sys/net/ipv4/route/no_local_loopback",
+ ignore_status=ignore_status)
+ utils.system('echo 1 > /proc/sys/net/ipv4/route/flush',
+ ignore_status=ignore_status)
+
+
+ def enable_ip_local_loopback(self, ignore_status=False):
+ utils.system("echo '0' > /proc/sys/net/ipv4/route/no_local_loopback",
+ ignore_status=ignore_status)
+ utils.system('echo 1 > /proc/sys/net/ipv4/route/flush',
+ ignore_status=ignore_status)
+
+
+ def process_mpstat(self, mpstat_out, sample_count, loud = True):
+ """Parses mpstat output of the following two forms:
+ 02:10:17 0 0.00 0.00 0.00 0.00 0.00 0.00 \
+ 0.00 100.00 1012.87
+ 02:10:13 PM 0 0.00 0.00 0.00 0.00 0.00 0.00 \
+ 0.00 100.00 1019.00
+ """
+ mpstat_keys = ['time', 'CPU', 'user', 'nice', 'sys', 'iowait', 'irq',
+ 'soft', 'steal', 'idle', 'intr/s']
+ if loud:
+ print mpstat_out
+
+ # Remove the optional AM/PM appearing in time format
+ mpstat_out = mpstat_out.replace('AM', '')
+ mpstat_out = mpstat_out.replace('PM', '')
+
+ regex = re.compile('(\S+)')
+ stats = []
+ for line in mpstat_out.splitlines()[3:]:
+ match = regex.findall(line)
+ # Skip the "Average" computed by mpstat. We are gonna compute the
+ # average ourself. Pick only the aggregate 'all' CPU results
+ if match and match[0] != 'Average:' and match[1] == 'all':
+ stats.append(dict(zip(mpstat_keys, match)))
+
+ if sample_count >= 5:
+ # Throw away first and last sample
+ stats = stats[1:-1]
+
+ cpu_stats = {}
+ for key in ['user', 'nice', 'sys', 'iowait', 'irq', 'soft', 'steal',
+ 'idle', 'intr/s']:
+ x = [float(row[key]) for row in stats]
+ if len(x):
+ count = len(x)
+ else:
+ print 'net_utils.network_utils.process_mpstat: count is 0!!!\n'
+ count = 1
+ cpu_stats[key] = sum(x) / count
+
+ return cpu_stats
+
+
+def network():
+ try:
+ from autotest_lib.client.bin.net import site_net_utils
+ return site_net_utils.network_utils()
+ except:
+ return network_utils()
+
+
+class network_interface(object):
+
+ ENABLE, DISABLE = (True, False)
+
+ def __init__(self, name):
+ autodir = os.environ['AUTODIR']
+ self.ethtool = 'ethtool'
+ self._name = name
+ self.was_down = self.is_down()
+ self.orig_ipaddr = self.get_ipaddr()
+ self.was_loopback_enabled = self.is_loopback_enabled()
+ self._socket = socket.socket(socket.PF_PACKET, socket.SOCK_RAW)
+ self._socket.settimeout(TIMEOUT)
+ self._socket.bind((name, raw_socket.ETH_P_ALL))
+
+
+ def restore(self):
+ self.set_ipaddr(self.orig_ipaddr)
+ # TODO (msb): The additional conditional guard needs cleanup:
+ # Underlying driver should simply perform a noop
+ # for disabling loopback on an already-disabled device,
+ # instead of returning non-zero exit code.
+
+ # To avoid sending a RST to the autoserv ssh connection
+ # don't disable loopback until the IP address is restored.
+ if not self.was_loopback_enabled and self.is_loopback_enabled():
+ self.disable_loopback()
+ if self.was_down:
+ self.down()
+
+
+ def get_name(self):
+ return self._name
+
+
+ def parse_ethtool(self, field, match, option='', next_field=''):
+ output = utils.system_output('%s %s %s' % (self.ethtool,
+ option, self._name))
+ if output:
+ match = re.search('\n\s*%s:\s*(%s)%s' %
+ (field, match, next_field), output, re.S)
+ if match:
+ return match.group(1)
+
+ return ''
+
+
+ def get_stats(self):
+ stats = {}
+ stats_path = '/sys/class/net/%s/statistics/' % self._name
+ for stat in os.listdir(stats_path):
+ f = open(stats_path + stat, 'r')
+ if f:
+ stats[stat] = int(f.read())
+ f.close()
+ return stats
+
+
+ def get_stats_diff(self, orig_stats):
+ stats = self.get_stats()
+ for stat in stats.keys():
+ if stat in orig_stats:
+ stats[stat] = stats[stat] - orig_stats[stat]
+ else:
+ stats[stat] = stats[stat]
+ return stats
+
+
+ def get_driver(self):
+ driver_path = os.readlink('/sys/class/net/%s/device/driver' %
+ self._name)
+ return os.path.basename(driver_path)
+
+
+ def get_carrier(self):
+ f = open('/sys/class/net/%s/carrier' % self._name)
+ if not f:
+ return ''
+ carrier = f.read().strip()
+ f.close()
+ return carrier
+
+
+ def get_supported_link_modes(self):
+ result = self.parse_ethtool('Supported link modes', '.*',
+ next_field='Supports auto-negotiation')
+ return result.split()
+
+
+ def get_advertised_link_modes(self):
+ result = self.parse_ethtool('Advertised link modes', '.*',
+ next_field='Advertised auto-negotiation')
+ return result.split()
+
+
+ def is_autoneg_advertised(self):
+ result = self.parse_ethtool('Advertised auto-negotiation',
+ 'Yes|No')
+ return result == 'Yes'
+
+
+ def get_speed(self):
+ return int(self.parse_ethtool('Speed', '\d+'))
+
+
+ def is_full_duplex(self):
+ result = self.parse_ethtool('Duplex', 'Full|Half')
+ return result == 'Full'
+
+
+ def is_autoneg_on(self):
+ result = self.parse_ethtool('Auto-negotiation', 'on|off')
+ return result == 'on'
+
+
+ def get_wakeon(self):
+ return self.parse_ethtool('Wake-on', '\w+')
+
+
+ def is_rx_summing_on(self):
+ result = self.parse_ethtool('rx-checksumming', 'on|off', '-k')
+ return result == 'on'
+
+
+ def is_tx_summing_on(self):
+ result = self.parse_ethtool('tx-checksumming', 'on|off', '-k')
+ return result == 'on'
+
+
+ def is_scatter_gather_on(self):
+ result = self.parse_ethtool('scatter-gather', 'on|off', '-k')
+ return result == 'on'
+
+
+ def is_tso_on(self):
+ result = self.parse_ethtool('tcp segmentation offload',
+ 'on|off', '-k')
+ return result == 'on'
+
+
+ def is_pause_autoneg_on(self):
+ result = self.parse_ethtool('Autonegotiate', 'on|off', '-a')
+ return result == 'on'
+
+
+ def is_tx_pause_on(self):
+ result = self.parse_ethtool('TX', 'on|off', '-a')
+ return result == 'on'
+
+
+ def is_rx_pause_on(self):
+ result = self.parse_ethtool('RX', 'on|off', '-a')
+ return result == 'on'
+
+
+ def _set_loopback(self, mode, enable_disable):
+ return utils.system('%s -L %s %s %s' %
+ (self.ethtool, self._name, mode, enable_disable),
+ ignore_status=True)
+
+
+ def enable_loopback(self):
+ # If bonded do not set loopback mode.
+ # Try mac loopback first then phy loopback
+ # If both fail, raise an error
+ if (bond().is_enabled() or
+ (self._set_loopback('phyint', 'enable') > 0 and
+ self._set_loopback('mac', 'enable') > 0)):
+ raise error.TestError('Unable to enable loopback')
+ # Add a 1 second wait for drivers which do not have
+ # a synchronous loopback enable
+ # TODO (msb); Remove this wait once the drivers are fixed
+ if self.get_driver() in ['tg3', 'bnx2x']:
+ time.sleep(1)
+ self.wait_for_carrier(timeout=30)
+
+
+ def disable_loopback(self):
+ # If bonded, to not disable loopback.
+ # Try mac loopback first then phy loopback
+ # If both fail, raise an error
+ if (bond().is_enabled() or
+ (self._set_loopback('phyint', 'disable') > 0 and
+ self._set_loopback('mac', 'disable') > 0)):
+ raise error.TestError('Unable to disable loopback')
+
+
+ def is_loopback_enabled(self):
+ # Don't try ethtool -l on a bonded host
+ if bond().is_enabled():
+ return False
+ output = utils.system_output('%s -l %s' % (self.ethtool, self._name))
+ if output:
+ return 'enabled' in output
+ return False
+
+
+ def enable_promisc(self):
+ utils.system('ifconfig %s promisc' % self._name)
+
+
+ def disable_promisc(self):
+ utils.system('ifconfig %s -promisc' % self._name)
+
+
+ def get_hwaddr(self):
+ f = open('/sys/class/net/%s/address' % self._name)
+ hwaddr = f.read().strip()
+ f.close()
+ return hwaddr
+
+
+ def set_hwaddr(self, hwaddr):
+ utils.system('ifconfig %s hw ether %s' % (self._name, hwaddr))
+
+
+ def add_maddr(self, maddr):
+ utils.system('ip maddr add %s dev %s' % (maddr, self._name))
+
+
+ def del_maddr(self, maddr):
+ utils.system('ip maddr del %s dev %s' % (maddr, self._name))
+
+
+ def get_ipaddr(self):
+ ipaddr = "0.0.0.0"
+ output = utils.system_output('ifconfig %s' % self._name)
+ if output:
+ match = re.search("inet addr:([\d\.]+)", output)
+ if match:
+ ipaddr = match.group(1)
+ return ipaddr
+
+
+ def set_ipaddr(self, ipaddr):
+ utils.system('ifconfig %s %s' % (self._name, ipaddr))
+
+
+ def is_down(self):
+ output = utils.system_output('ifconfig %s' % self._name)
+ if output:
+ return 'UP' not in output
+ return False
+
+ def up(self):
+ utils.system('ifconfig %s up' % self._name)
+
+
+ def down(self):
+ utils.system('ifconfig %s down' % self._name)
+
+
+ def wait_for_carrier(self, timeout=60):
+ while timeout and self.get_carrier() != '1':
+ timeout -= 1
+ time.sleep(1)
+ if timeout == 0:
+ raise error.TestError('Timed out waiting for carrier.')
+
+
+ def send(self, buf):
+ self._socket.send(buf)
+
+
+ def recv(self, len):
+ return self._socket.recv(len)
+
+
+ def flush(self):
+ self._socket.close()
+ self._socket = socket.socket(socket.PF_PACKET, socket.SOCK_RAW)
+ self._socket.settimeout(TIMEOUT)
+ self._socket.bind((self._name, raw_socket.ETH_P_ALL))
+
+
+def netif(name):
+ try:
+ from autotest_lib.client.bin.net import site_net_utils
+ return site_net_utils.network_interface(name)
+ except:
+ return network_interface(name)
+
+
+class bonding(object):
+ """This class implements bonding interface abstraction."""
+
+ NO_MODE = 0
+ AB_MODE = 1
+ AD_MODE = 2
+
+ def is_enabled(self):
+ raise error.TestError('Undefined')
+
+
+ def is_bondable(self):
+ raise error.TestError('Undefined')
+
+
+ def enable(self):
+ raise error.TestError('Undefined')
+
+
+ def disable(self):
+ raise error.TestError('Undefined')
+
+
+ def get_mii_status(self):
+ return {}
+
+
+ def get_mode(self):
+ return bonding.NO_MODE
+
+
+ def wait_for_state_change(self):
+ """Wait for bonding state change.
+
+ Wait up to 90 seconds to successfully ping the gateway.
+ This is to know when LACP state change has converged.
+ (0 seconds is 3x lacp timeout, use by protocol)
+ """
+
+ netif('eth0').wait_for_carrier(timeout=60)
+ wait_time = 0
+ while wait_time < 100:
+ time.sleep(10)
+ if not autotest_utils.ping_default_gateway():
+ return True
+ wait_time += 10
+ return False
+
+
+ def get_active_interfaces(self):
+ return []
+
+
+ def get_slave_interfaces(self):
+ return []
+
+
+def bond():
+ try:
+ from autotest_lib.client.bin.net import site_net_utils
+ return site_net_utils.bonding()
+ except:
+ return bonding()
+
+
+class raw_socket(object):
+ """This class implements an raw socket abstraction."""
+ ETH_P_ALL = 0x0003 # Use for binding a RAW Socket to all protocols
+ SOCKET_TIMEOUT = 1
+ def __init__(self, iface_name):
+ """Initialize an interface for use.
+
+ Args:
+ iface_name: 'eth0' interface name ('eth0, eth1,...')
+ """
+ self._name = iface_name
+ self._socket = None
+ self._socket_timeout = raw_socket.SOCKET_TIMEOUT
+ socket.setdefaulttimeout(self._socket_timeout)
+ if self._name == None:
+ raise error.TestError('Invalid interface name')
+
+
+ def socket(self):
+ return self._socket
+
+
+ def socket_timeout(self):
+ """Get the timeout use by recv_from"""
+ return self._socket_timeout
+
+
+ def set_socket_timeout(self, timeout):
+ """Set the timeout use by recv_from.
+
+ Args:
+ timeout: time in seconds
+ """
+ self._socket_timeout = timeout
+
+ def open(self, protocol=None):
+ """Opens the raw socket to send and receive.
+
+ Args:
+ protocol : short in host byte order. None if ALL
+ """
+ if self._socket != None:
+ raise error.TestError('Raw socket already open')
+
+ if protocol == None:
+ self._socket = socket.socket(socket.PF_PACKET,
+ socket.SOCK_RAW)
+
+ self._socket.bind((self._name, self.ETH_P_ALL))
+ else:
+ self._socket = socket.socket(socket.PF_PACKET,
+ socket.SOCK_RAW,
+ socket.htons(protocol))
+ self._socket.bind((self._name, self.ETH_P_ALL))
+
+ self._socket.settimeout(1) # always running with 1 second timeout
+
+ def close(self):
+ """ Close the raw socket"""
+ if self._socket != None:
+ self._socket.close()
+ self._socket = None
+ else:
+ raise error.TestError('Raw socket not open')
+
+
+ def recv(self, timeout):
+ """Synchroneous receive.
+
+ Receives one packet from the interface and returns its content
+ in a string. Wait up to timeout for the packet if timeout is
+ not 0. This function filters out all the packets that are
+ less than the minimum ethernet packet size (60+crc).
+
+ Args:
+ timeout: max time in seconds to wait for the read to complete.
+ '0', wait for ever until a valid packet is received
+
+ Returns:
+ packet: None no packet was received
+ a binary string containing the received packet.
+ time_left: amount of time left in timeout
+ """
+ if self._socket == None:
+ raise error.TestError('Raw socket not open')
+
+ time_left = timeout
+ packet = None
+ while time_left or (timeout == 0):
+ try:
+ packet = self._socket.recv(ethernet.ETH_PACKET_MAX_SIZE)
+ if len(packet) >= (ethernet.ETH_PACKET_MIN_SIZE-4):
+ break
+ packet = None
+ if timeout and time_left:
+ time_left -= raw_socket.SOCKET_TIMEOUT
+ except socket.timeout:
+ packet = None
+ if timeout and time_left:
+ time_left -= raw_socket.SOCKET_TIMEOUT
+
+ return packet, time_left
+
+
+ def send(self, packet):
+ """Send an ethernet packet."""
+ if self._socket == None:
+ raise error.TestError('Raw socket not open')
+
+ self._socket.send(packet)
+
+
+ def send_to(self, dst_mac, src_mac, protocol, payload):
+ """Send an ethernet frame.
+
+ Send an ethernet frame, formating the header.
+
+ Args:
+ dst_mac: 'byte string'
+ src_mac: 'byte string'
+ protocol: short in host byte order
+ payload: 'byte string'
+ """
+ if self._socket == None:
+ raise error.TestError('Raw socket not open')
+ try:
+ packet = ethernet.pack(dst_mac, src_mac, protocol, payload)
+ except:
+ raise error.TestError('Invalid Packet')
+ self.send(packet)
+
+
+ def recv_from(self, dst_mac, src_mac, protocol):
+ """Receive an ethernet frame that matches the dst, src and proto.
+
+ Filters all received packet to find a matching one, then unpack
+ it and present it to the caller as a frame.
+
+ Waits up to self._socket_timeout for a matching frame before
+ returning.
+
+ Args:
+ dst_mac: 'byte string'. None do not use in filter.
+ src_mac: 'byte string'. None do not use in filter.
+ protocol: short in host byte order. None do not use in filter.
+
+ Returns:
+ ethernet frame: { 'dst' : byte string,
+ 'src' : byte string,
+ 'proto' : short in host byte order,
+ 'payload' : byte string
+ }
+ """
+ start_time = time.clock()
+ timeout = self._socket_timeout
+ while 1:
+ frame = None
+ packet, timeout = self.recv(timeout)
+ if packet != None:
+ frame = ethernet.unpack(packet)
+ if ((src_mac == None or frame['src'] == src_mac) and
+ (dst_mac == None or frame['dst'] == dst_mac) and
+ (protocol == None or frame['proto'] == protocol)):
+ break;
+ elif (timeout == 0 or
+ time.clock() - start_time > float(self._socket_timeout)):
+ frame = None
+ break
+ else:
+ if (timeout == 0 or
+ time.clock() - start_time > float(self._socket_timeout)):
+ frame = None
+ break
+ continue
+
+ return frame
+
+
+class ethernet(object):
+ """Provide ethernet packet manipulation methods."""
+ HDR_LEN = 14 # frame header length
+ CHECKSUM_LEN = 4 # frame checksum length
+
+ # Ethernet payload types - http://standards.ieee.org/regauth/ethertype
+ ETH_TYPE_IP = 0x0800 # IP protocol
+ ETH_TYPE_ARP = 0x0806 # address resolution protocol
+ ETH_TYPE_CDP = 0x2000 # Cisco Discovery Protocol
+ ETH_TYPE_8021Q = 0x8100 # IEEE 802.1Q VLAN tagging
+ ETH_TYPE_IP6 = 0x86DD # IPv6 protocol
+ ETH_TYPE_LOOPBACK = 0x9000 # used to test interfaces
+ ETH_TYPE_LLDP = 0x88CC # LLDP frame type
+
+ ETH_PACKET_MAX_SIZE = 1518 # maximum ethernet frane size
+ ETH_PACKET_MIN_SIZE = 64 # minimum ethernet frane size
+
+ ETH_LLDP_DST_MAC = '01:80:C2:00:00:0E' # LLDP destination mac
+
+ FRAME_KEY_DST_MAC = 'dst' # frame destination mac address
+ FRAME_KEY_SRC_MAC = 'src' # frame source mac address
+ FRAME_KEY_PROTO = 'proto' # frame protocol
+ FRAME_KEY_PAYLOAD = 'payload' # frame payload
+
+
+ def __init__(self):
+ pass;
+
+
+ @staticmethod
+ def mac_string_to_binary(hwaddr):
+ """Converts a MAC address text string to byte string.
+
+ Converts a MAC text string from a text string 'aa:aa:aa:aa:aa:aa'
+ to a byte string 'xxxxxxxxxxxx'
+
+ Args:
+ hwaddr: a text string containing the MAC address to convert.
+
+ Returns:
+ A byte string.
+ """
+ val = ''.join([chr(b) for b in [int(c, 16) \
+ for c in hwaddr.split(':',6)]])
+ return val
+
+
+ @staticmethod
+ def mac_binary_to_string(hwaddr):
+ """Converts a MAC address byte string to text string.
+
+ Converts a MAC byte string 'xxxxxxxxxxxx' to a text string
+ 'aa:aa:aa:aa:aa:aa'
+
+ Args:
+ hwaddr: a byte string containing the MAC address to convert.
+
+ Returns:
+ A text string.
+ """
+ return "%02x:%02x:%02x:%02x:%02x:%02x" % tuple(map(ord,hwaddr))
+
+
+ @staticmethod
+ def pack(dst, src, protocol, payload):
+ """Pack a frame in a byte string.
+
+ Args:
+ dst: destination mac in byte string format
+ src: src mac address in byte string format
+ protocol: short in network byte order
+ payload: byte string payload data
+
+ Returns:
+ An ethernet frame with header and payload in a byte string.
+ """
+ # numbers are converted to network byte order (!)
+ frame = struct.pack("!6s6sH", dst, src, protocol) + payload
+ return frame
+
+
+ @staticmethod
+ def unpack(raw_frame):
+ """Unpack a raw ethernet frame.
+
+ Returns:
+ None on error
+ { 'dst' : byte string,
+ 'src' : byte string,
+ 'proto' : short in host byte order,
+ 'payload' : byte string
+ }
+ """
+ packet_len = len(raw_frame)
+ if packet_len < ethernet.HDR_LEN:
+ return None
+
+ payload_len = packet_len - ethernet.HDR_LEN
+ frame = {}
+ frame[ethernet.FRAME_KEY_DST_MAC], \
+ frame[ethernet.FRAME_KEY_SRC_MAC], \
+ frame[ethernet.FRAME_KEY_PROTO] = \
+ struct.unpack("!6s6sH", raw_frame[:ethernet.HDR_LEN])
+ frame[ethernet.FRAME_KEY_PAYLOAD] = \
+ raw_frame[ethernet.HDR_LEN:ethernet.HDR_LEN+payload_len]
+ return frame
+
+
+def ethernet_packet():
+ try:
+ from autotest_lib.client.bin.net import site_net_utils
+ return site_net_utils.ethernet()
+ except:
+ return ethernet()
diff --git a/client/bin/net/net_utils_mock.py b/client/bin/net/net_utils_mock.py
new file mode 100755
index 0000000..0d8fba7
--- /dev/null
+++ b/client/bin/net/net_utils_mock.py
@@ -0,0 +1,125 @@
+"""Set of Mocks and stubs for network utilities unit tests.
+
+Implement a set of mocks and stubs use to implement unit tests
+for the network libraries.
+"""
+
+import socket
+from autotest_lib.client.common_lib.test_utils import mock
+from autotest_lib.client.bin.net import net_utils
+
+
+def os_open(*args, **kwarg):
+ return os_stub('open')
+
+
+class os_stub(mock.mock_function):
+ def __init__(self, symbol, **kwargs):
+ mock.mock_function.__init__(self, symbol, *kwargs)
+
+ readval = ""
+ def open(self, *args, **kwargs):
+ return self
+
+ def read(self, *args, **kwargs):
+ return os_stub.readval
+
+
+def netutils_netif(iface):
+ return netif_stub(iface, 'net_utils', net_utils.netif)
+
+
+class netif_stub(mock.mock_class):
+ def __init__(self, iface, cls, name, *args, **kwargs):
+ mock.mock_class.__init__(self, cls, name, args, *kwargs)
+
+
+ def wait_for_carrier(self, timeout):
+ return
+
+
+class socket_stub(mock.mock_class):
+ """Class use to mock sockets."""
+ def __init__(self, iface, cls, name, *args, **kwargs):
+ mock.mock_class.__init__(self, cls, name, args, *kwargs)
+ self.recv_val = ''
+ self.throw_timeout = False
+ self.send_val = None
+ self.timeout = None
+ self.family = None
+ self.type = None
+
+
+ def close(self):
+ pass
+
+
+ def socket(self, family, type):
+ self.family = family
+ self.type = type
+
+
+ def settimeout(self, timeout):
+ self.timeout = timeout
+ return
+
+
+ def send(self, buf):
+ self.send_val = buf
+
+
+ def recv(self, size):
+ if self.throw_timeout:
+ raise socket.timeout
+
+ if len(self.recv_val) > size:
+ return self.recv_val[:size]
+ return self.recv_val
+
+
+ def bind(self, arg):
+ pass
+
+
+class network_interface_mock(net_utils.network_interface):
+ def __init__(self, iface='some_name', test_init=False):
+ self._test_init = test_init # test network_interface __init__()
+ if self._test_init:
+ super(network_interface_mock, self).__init__(iface)
+ return
+
+ self.ethtool = '/mock/ethtool'
+ self._name = iface
+ self.was_down = False
+ self.orig_ipaddr = '1.2.3.4'
+ self.was_loopback_enabled = False
+ self._socket = socket_stub(iface, socket, socket)
+
+ self.loopback_enabled = False
+ self.driver = 'mock_driver'
+
+
+ def is_down(self):
+ if self._test_init:
+ return 'is_down'
+ return super(network_interface_mock, self).is_down()
+
+
+ def get_ipaddr(self):
+ if self._test_init:
+ return 'get_ipaddr'
+ return super(network_interface_mock, self).get_ipaddr()
+
+
+ def is_loopback_enabled(self):
+ if self._test_init:
+ return 'is_loopback_enabled'
+ return self.loopback_enabled
+
+
+ def get_driver(self):
+ return self.driver
+
+
+ def wait_for_carrier(self, timeout=1):
+ return
diff --git a/client/bin/net/net_utils_unittest.py b/client/bin/net/net_utils_unittest.py
new file mode 100755
index 0000000..453abd3
--- /dev/null
+++ b/client/bin/net/net_utils_unittest.py
@@ -0,0 +1,1345 @@
+#!/usr/bin/python
+import unittest, os, socket, time, sys, struct
+import common
+from autotest_lib.client.bin import autotest_utils
+from autotest_lib.client.bin.net import net_utils, net_utils_mock
+from autotest_lib.client.common_lib.test_utils import mock
+from autotest_lib.client.common_lib import utils, error
+
+
+class TestNetUtils(unittest.TestCase):
+ class network_interface_mock(net_utils_mock.network_interface_mock):
+ def __init__(self, iface='some_name', test_init=False):
+ super(TestNetUtils.network_interface_mock,
+ self).__init__(iface=iface, test_init=test_init)
+
+
+ def setUp(self):
+ self.god = mock.mock_god()
+ self.god.stub_function(utils, "system")
+ self.god.stub_function(utils, "system_output")
+ self.god.stub_function(autotest_utils, "module_is_loaded")
+ self.god.stub_function(net_utils, "open")
+ self.god.stub_function(time, 'sleep')
+
+ self.god.stub_with(net_utils,"bond", net_utils.bonding)
+ self.god.stub_with(os, 'open', net_utils_mock.os_open)
+ self.god.stub_with(net_utils, 'netif', net_utils_mock.netutils_netif)
+
+ os.environ['AUTODIR'] = "autodir"
+
+
+ def tearDown(self):
+ self.god.unstub_all()
+ del os.environ['AUTODIR']
+
+
+ #
+ # test network_util
+ #
+ def test_network_util_reset(self):
+ utils.system.expect_call('service network restart', ignore_status=False)
+ net_utils.network_utils().reset()
+ self.god.check_playback()
+
+
+ def test_network_util_start(self):
+ utils.system.expect_call('service network start', ignore_status=False)
+
+ net_utils.network_utils().start()
+ self.god.check_playback()
+
+
+ def test_network_util_stop(self):
+ utils.system.expect_call('service network stop', ignore_status=False)
+
+ net_utils.network_utils().stop()
+ self.god.check_playback()
+
+
+ def test_network_util_disable_ip_local_loopback(self):
+ msg = "echo '1' > /proc/sys/net/ipv4/route/no_local_loopback"
+ utils.system.expect_call(msg, ignore_status=False)
+ msg = 'echo 1 > /proc/sys/net/ipv4/route/flush'
+ utils.system.expect_call(msg, ignore_status=False)
+
+ net_utils.network_utils().disable_ip_local_loopback()
+ self.god.check_playback()
+
+
+ def test_network_util_enable_ip_local_loopback(self):
+ msg = "echo '0' > /proc/sys/net/ipv4/route/no_local_loopback"
+ utils.system.expect_call(msg, ignore_status=False)
+ msg = 'echo 1 > /proc/sys/net/ipv4/route/flush'
+ utils.system.expect_call(msg, ignore_status=False)
+
+ net_utils.network_utils().enable_ip_local_loopback()
+ self.god.check_playback()
+
+
+ #
+ # test network_interface
+ #
+ def test_network_interface_init(self):
+ self.god.stub_function(socket, 'socket')
+ s = net_utils_mock.socket_stub('eth0', socket, socket)
+ socket.socket.expect_call(socket.PF_PACKET,
+ socket.SOCK_RAW).and_return(s)
+ self.god.stub_function(s, 'bind')
+ self.god.stub_function(s, 'settimeout')
+ s.settimeout.expect_call(net_utils.TIMEOUT)
+ s.bind.expect_call(('eth0', net_utils.raw_socket.ETH_P_ALL))
+ mock_netif = self.network_interface_mock(iface='eth0', test_init=True)
+ self.god.check_playback()
+ self.assertEquals(mock_netif.ethtool, 'ethtool')
+ self.assertEquals(mock_netif._name, 'eth0')
+ self.assertEquals(mock_netif.was_down, 'is_down')
+ self.assertEquals(mock_netif.orig_ipaddr, 'get_ipaddr')
+ self.assertEquals(mock_netif.was_loopback_enabled,
+ 'is_loopback_enabled')
+ self.assertEquals(mock_netif._socket, s)
+
+
+ def test_network_interface_restore(self):
+ mock_netif = self.network_interface_mock('eth0')
+
+ mock_netif.was_loopback_enabled = False
+ mock_netif.loopback_enabled = True
+ mock_netif.was_down = False
+
+ self.god.stub_function(net_utils.bonding, 'is_enabled')
+
+ # restore using phyint
+ cmd = 'ifconfig %s %s' % (mock_netif._name, mock_netif.orig_ipaddr)
+ utils.system.expect_call(cmd)
+
+ net_utils.bonding.is_enabled.expect_call().and_return(False)
+
+ cmd = '%s -L %s %s %s' % (mock_netif.ethtool, mock_netif._name,
+ 'phyint', 'disable')
+ utils.system.expect_call(cmd, ignore_status=True).and_return(0)
+ mock_netif.restore()
+ self.god.check_playback()
+
+
+ # restore using mac
+ cmd = 'ifconfig %s %s' % (mock_netif._name, mock_netif.orig_ipaddr)
+ utils.system.expect_call(cmd)
+
+ net_utils.bonding.is_enabled.expect_call().and_return(False)
+
+ cmd = '%s -L %s %s %s' % (mock_netif.ethtool, mock_netif._name,
+ 'phyint', 'disable')
+ utils.system.expect_call(cmd, ignore_status=True).and_return(1)
+
+ cmd = '%s -L %s %s %s' % (mock_netif.ethtool,
+ mock_netif._name, 'mac', 'disable')
+ utils.system.expect_call(cmd, ignore_status=True).and_return(0)
+ mock_netif.restore()
+ self.god.check_playback()
+
+ # check that down is restored
+ mock_netif.was_loopback_enabled = False
+ mock_netif.loopback_enabled = True
+ mock_netif.was_down = True
+
+ cmd = 'ifconfig %s %s' % (mock_netif._name, mock_netif.orig_ipaddr)
+ utils.system.expect_call(cmd)
+
+ net_utils.bonding.is_enabled.expect_call().and_return(False)
+
+ cmd = '%s -L %s %s %s' % (mock_netif.ethtool, mock_netif._name,
+ 'phyint', 'disable')
+ utils.system.expect_call(cmd, ignore_status=True).and_return(0)
+
+ cmd = 'ifconfig %s down' % mock_netif._name
+ utils.system.expect_call(cmd)
+
+ mock_netif.restore()
+ self.god.check_playback()
+
+ # check that loopback, down are done in sequence
+ mock_netif.was_loopback_enabled = True
+ mock_netif.loopback_enabled = True
+ mock_netif.was_down = True
+
+ cmd = 'ifconfig %s %s' % (mock_netif._name,
+ mock_netif.orig_ipaddr)
+
+ utils.system.expect_call(cmd)
+ cmd = 'ifconfig %s down' % mock_netif._name
+ utils.system.expect_call(cmd)
+
+ mock_netif.restore()
+ self.god.check_playback()
+
+ # prior loopback matches current loopback
+ mock_netif.was_loopback_enabled = False
+ mock_netif.loopback_enabled = False
+ mock_netif.was_down = True
+
+ cmd = 'ifconfig %s %s' % (mock_netif._name,
+ mock_netif.orig_ipaddr)
+ utils.system.expect_call(cmd)
+ cmd = 'ifconfig %s down' % mock_netif._name
+ utils.system.expect_call(cmd)
+
+ mock_netif.restore()
+ self.god.check_playback()
+
+
+ def test_network_interface_get_name(self):
+ mock_netif = self.network_interface_mock(iface='eth0')
+ self.assertEquals(mock_netif.get_name(), 'eth0')
+
+
+ def test_network_interface_parse_ethtool(self):
+ mock_netif = self.network_interface_mock()
+ cmd = '%s %s %s' % (mock_netif.ethtool, '', mock_netif._name)
+
+ utils.system_output.expect_call(cmd).and_return('\n field: match')
+ self.assertEquals(mock_netif.parse_ethtool('field', 'some|match'),
+ 'match')
+
+ self.god.check_playback()
+
+ utils.system_output.expect_call(cmd).and_return(None)
+ self.assertEquals(mock_netif.parse_ethtool('field',
+ 'some|match'), '')
+
+ utils.system_output.expect_call(cmd).and_return(' field: match')
+ self.assertEquals(mock_netif.parse_ethtool('field',
+ 'some|match'), '')
+ self.god.check_playback()
+
+
+ def test_network_interface_get_stats(self):
+ mock_netif = self.network_interface_mock()
+ self.god.stub_function(os, 'listdir')
+ stat_path = '/sys/class/net/%s/statistics/' % mock_netif._name
+
+ # no stat found
+ os.listdir.expect_call(stat_path).and_return(())
+ self.assertEquals(mock_netif.get_stats(), {})
+ self.god.check_playback()
+
+ # can not open stat file
+ os.listdir.expect_call(stat_path).and_return(('some_stat',))
+ f = self.god.create_mock_class(file, 'file')
+ net_utils.open.expect_call(stat_path + 'some_stat', 'r').and_return(None)
+ self.assertEquals(mock_netif.get_stats(), {})
+ self.god.check_playback()
+
+ # found a single stat
+ os.listdir.expect_call(stat_path).and_return(('some_stat',))
+ f = self.god.create_mock_class(file, 'file')
+ net_utils.open.expect_call(stat_path + 'some_stat', 'r').and_return(f)
+ f.read.expect_call().and_return(1234)
+ f.close.expect_call()
+ self.assertEquals(mock_netif.get_stats(), {'some_stat':1234})
+ self.god.check_playback()
+
+ # found multiple stats
+ os.listdir.expect_call(stat_path).and_return(('stat1','stat2'))
+ f = self.god.create_mock_class(file, 'file')
+ net_utils.open.expect_call(stat_path + 'stat1', 'r').and_return(f)
+ f.read.expect_call().and_return(1234)
+ f.close.expect_call()
+ net_utils.open.expect_call(stat_path + 'stat2', 'r').and_return(f)
+ f.read.expect_call().and_return(5678)
+ f.close.expect_call()
+
+ self.assertEquals(mock_netif.get_stats(), {'stat1':1234, 'stat2':5678})
+ self.god.check_playback()
+
+
+ def test_network_interface_get_stats_diff(self):
+ mock_netif = self.network_interface_mock()
+ self.god.stub_function(os, 'listdir')
+ stat_path = '/sys/class/net/%s/statistics/' % mock_netif._name
+
+ os.listdir.expect_call(stat_path).and_return(('stat1','stat2', 'stat4'))
+ f = self.god.create_mock_class(file, 'file')
+ net_utils.open.expect_call(stat_path + 'stat1', 'r').and_return(f)
+ f.read.expect_call().and_return(1234)
+ f.close.expect_call()
+ net_utils.open.expect_call(stat_path + 'stat2', 'r').and_return(f)
+ f.read.expect_call().and_return(0)
+ f.close.expect_call()
+ net_utils.open.expect_call(stat_path + 'stat4', 'r').and_return(f)
+ f.read.expect_call().and_return(10)
+ f.close.expect_call()
+ self.assertEquals(mock_netif.get_stats_diff({'stat1':1, 'stat2':2,
+ 'stat3':0}),
+ {'stat1':1233, 'stat2':-2, 'stat4':10})
+ self.god.check_playback()
+
+
+ def test_network_interface_get_driver(self):
+ mock_netif = self.network_interface_mock()
+ mock_netif.get_driver = net_utils.network_interface.get_driver
+ self.god.stub_function(os, 'readlink')
+ stat_path = '/sys/class/net/%s/device/driver' % mock_netif._name
+ os.readlink.expect_call(stat_path).and_return((
+ stat_path+'/driver_name'))
+ self.assertEquals(mock_netif.get_driver(mock_netif), 'driver_name')
+ self.god.check_playback()
+
+
+ def test_network_interface_get_carrier(self):
+ mock_netif = self.network_interface_mock()
+ self.god.stub_function(os, 'readlink')
+ stat_path = '/sys/class/net/%s/carrier' % mock_netif._name
+ f = self.god.create_mock_class(file, 'file')
+ net_utils.open.expect_call(stat_path).and_return(f)
+ f.read.expect_call().and_return(' 1 ')
+ f.close.expect_call()
+ self.assertEquals(mock_netif.get_carrier(), '1')
+ self.god.check_playback()
+
+ net_utils.open.expect_call(stat_path).and_return(f)
+ f.read.expect_call().and_return(' 0 ')
+ f.close.expect_call()
+ self.assertEquals(mock_netif.get_carrier(), '0')
+ self.god.check_playback()
+
+ net_utils.open.expect_call(stat_path).and_return(f)
+ f.read.expect_call().and_return('')
+ f.close.expect_call()
+ self.assertEquals(mock_netif.get_carrier(), '')
+ self.god.check_playback()
+
+ net_utils.open.expect_call(stat_path).and_return(None)
+ self.assertEquals(mock_netif.get_carrier(), '')
+ self.god.check_playback()
+
+
+ def test_network_interface_is_autoneg_advertised(self):
+ mock_netif = self.network_interface_mock()
+ cmd = '%s %s %s' % (mock_netif.ethtool, '', mock_netif._name)
+
+ utils.system_output.expect_call(cmd).and_return(
+ '\n Advertised auto-negotiation: Yes')
+ self.assertEquals(mock_netif.is_autoneg_advertised(), True)
+ self.god.check_playback()
+
+ utils.system_output.expect_call(cmd).and_return(
+ '\n Advertised auto-negotiation: No')
+ self.assertEquals(mock_netif.is_autoneg_advertised(), False)
+ self.god.check_playback()
+
+ utils.system_output.expect_call(cmd).and_return('')
+ self.assertEquals(mock_netif.is_autoneg_advertised(), False)
+ self.god.check_playback()
+
+
+ def test_network_interface_get_speed(self):
+ mock_netif = self.network_interface_mock()
+ cmd = '%s %s %s' % (mock_netif.ethtool, '', mock_netif._name)
+
+ utils.system_output.expect_call(cmd).and_return(
+ '\n Speed: 1000')
+ self.assertEquals(mock_netif.get_speed(), 1000)
+ self.god.check_playback()
+
+ utils.system_output.expect_call(cmd).and_return(
+ '\n Speed: 10000')
+ self.assertEquals(mock_netif.get_speed(), 10000)
+ self.god.check_playback()
+
+ utils.system_output.expect_call(cmd).and_return('')
+
+ try:
+ mock_netif.get_speed()
+ except ValueError:
+ pass
+ else:
+ self.assertEquals(0,1)
+ self.god.check_playback()
+
+
+ def test_network_interface_is_full_duplex(self):
+ mock_netif = self.network_interface_mock()
+ cmd = '%s %s %s' % (mock_netif.ethtool, '', mock_netif._name)
+
+ utils.system_output.expect_call(cmd).and_return(
+ '\n Duplex: Full')
+ self.assertEquals(mock_netif.is_full_duplex(), True)
+ self.god.check_playback()
+
+ utils.system_output.expect_call(cmd).and_return(
+ '\n Duplex: Half')
+ self.assertEquals(mock_netif.is_full_duplex(), False)
+ self.god.check_playback()
+
+ utils.system_output.expect_call(cmd).and_return('')
+ self.assertEquals(mock_netif.is_full_duplex(), False)
+ self.god.check_playback()
+
+
+ def test_network_interface_is_autoneg_on(self):
+ mock_netif = self.network_interface_mock()
+ cmd = '%s %s %s' % (mock_netif.ethtool, '', mock_netif._name)
+
+ utils.system_output.expect_call(cmd).and_return(
+ '\n Auto-negotiation: on')
+ self.assertEquals(mock_netif.is_autoneg_on(), True)
+ self.god.check_playback()
+
+ utils.system_output.expect_call(cmd).and_return(
+ '\n Auto-negotiation: off')
+ self.assertEquals(mock_netif.is_autoneg_on(), False)
+ self.god.check_playback()
+
+ utils.system_output.expect_call(cmd).and_return('')
+ self.assertEquals(mock_netif.is_autoneg_on(), False)
+ self.god.check_playback()
+
+
+ def test_network_interface_get_wakeon(self):
+ mock_netif = self.network_interface_mock()
+ cmd = '%s %s %s' % (mock_netif.ethtool, '', mock_netif._name)
+
+ utils.system_output.expect_call(cmd).and_return(
+ '\n Wake-on: g')
+ self.assertEquals(mock_netif.get_wakeon(), 'g')
+ self.god.check_playback()
+
+
+ def test_network_interface_is_rx_summing_on(self):
+ mock_netif = self.network_interface_mock()
+ cmd = '%s %s %s' % (mock_netif.ethtool, '-k', mock_netif._name)
+
+ utils.system_output.expect_call(cmd).and_return(
+ '\n rx-checksumming: on')
+ self.assertEquals(mock_netif.is_rx_summing_on(), True)
+ self.god.check_playback()
+
+ utils.system_output.expect_call(cmd).and_return(
+ '\n rx-checksumming: off')
+ self.assertEquals(mock_netif.is_rx_summing_on(), False)
+ self.god.check_playback()
+
+ utils.system_output.expect_call(cmd).and_return('')
+ self.assertEquals(mock_netif.is_rx_summing_on(), False)
+ self.god.check_playback()
+
+
+ def test_network_interface_is_tx_summing_on(self):
+ mock_netif = self.network_interface_mock()
+ cmd = '%s %s %s' % (mock_netif.ethtool, '-k', mock_netif._name)
+
+ utils.system_output.expect_call(cmd).and_return(
+ '\n tx-checksumming: on')
+ self.assertEquals(mock_netif.is_tx_summing_on(), True)
+ self.god.check_playback()
+
+ utils.system_output.expect_call(cmd).and_return(
+ '\n tx-checksumming: off')
+ self.assertEquals(mock_netif.is_tx_summing_on(), False)
+ self.god.check_playback()
+
+ utils.system_output.expect_call(cmd).and_return('')
+ self.assertEquals(mock_netif.is_tx_summing_on(), False)
+ self.god.check_playback()
+
+
+ def test_network_interface_is_scatter_gather_on(self):
+ mock_netif = self.network_interface_mock()
+ cmd = '%s %s %s' % (mock_netif.ethtool, '-k', mock_netif._name)
+
+ utils.system_output.expect_call(cmd).and_return(
+ '\n scatter-gather: on')
+ self.assertEquals(mock_netif.is_scatter_gather_on(), True)
+ self.god.check_playback()
+
+ utils.system_output.expect_call(cmd).and_return(
+ '\n scatter-gather: off')
+ self.assertEquals(mock_netif.is_scatter_gather_on(), False)
+ self.god.check_playback()
+
+ utils.system_output.expect_call(cmd).and_return('')
+ self.assertEquals(mock_netif.is_scatter_gather_on(), False)
+ self.god.check_playback()
+
+
+ def test_network_interface_is_tso_on(self):
+ mock_netif = self.network_interface_mock()
+ cmd = '%s %s %s' % (mock_netif.ethtool, '-k', mock_netif._name)
+
+ utils.system_output.expect_call(cmd).and_return(
+ '\n tcp segmentation offload: on')
+ self.assertEquals(mock_netif.is_tso_on(), True)
+ self.god.check_playback()
+
+ utils.system_output.expect_call(cmd).and_return(
+ '\n tcp segmentation offload: off')
+ self.assertEquals(mock_netif.is_tso_on(), False)
+ self.god.check_playback()
+
+ utils.system_output.expect_call(cmd).and_return('')
+ self.assertEquals(mock_netif.is_tso_on(), False)
+ self.god.check_playback()
+
+
+ def test_network_interface_is_pause_autoneg_on(self):
+ mock_netif = self.network_interface_mock()
+ cmd = '%s %s %s' % (mock_netif.ethtool, '-a', mock_netif._name)
+
+ utils.system_output.expect_call(cmd).and_return(
+ '\n Autonegotiate: on')
+ self.assertEquals(mock_netif.is_pause_autoneg_on(), True)
+ self.god.check_playback()
+
+ utils.system_output.expect_call(cmd).and_return(
+ '\n Autonegotiate: off')
+ self.assertEquals(mock_netif.is_pause_autoneg_on(), False)
+ self.god.check_playback()
+
+ utils.system_output.expect_call(cmd).and_return('')
+ self.assertEquals(mock_netif.is_pause_autoneg_on(), False)
+ self.god.check_playback()
+
+
+ def test_network_interface_is_tx_pause_on(self):
+ mock_netif = self.network_interface_mock()
+ cmd = '%s %s %s' % (mock_netif.ethtool, '-a', mock_netif._name)
+
+ utils.system_output.expect_call(cmd).and_return(
+ '\n TX: on')
+ self.assertEquals(mock_netif.is_tx_pause_on(), True)
+ self.god.check_playback()
+
+ utils.system_output.expect_call(cmd).and_return(
+ '\n TX: off')
+ self.assertEquals(mock_netif.is_tx_pause_on(), False)
+ self.god.check_playback()
+
+ utils.system_output.expect_call(cmd).and_return('')
+ self.assertEquals(mock_netif.is_tx_pause_on(), False)
+ self.god.check_playback()
+
+
+ def test_network_interface_is_rx_pause_on(self):
+ mock_netif = self.network_interface_mock()
+ cmd = '%s %s %s' % (mock_netif.ethtool, '-a', mock_netif._name)
+
+ utils.system_output.expect_call(cmd).and_return(
+ '\n RX: on')
+ self.assertEquals(mock_netif.is_rx_pause_on(), True)
+ self.god.check_playback()
+
+ utils.system_output.expect_call(cmd).and_return(
+ '\n RX: off')
+ self.assertEquals(mock_netif.is_rx_pause_on(), False)
+ self.god.check_playback()
+
+ utils.system_output.expect_call(cmd).and_return('')
+ self.assertEquals(mock_netif.is_rx_pause_on(), False)
+ self.god.check_playback()
+
+
+ def test_network_interface_enable_loopback(self):
+ mock_netif = self.network_interface_mock('eth0')
+
+ mock_netif.was_loopback_enabled = False
+ mock_netif.loopback_enabled = False
+ mock_netif.was_down = False
+
+ self.god.stub_function(net_utils.bonding, 'is_enabled')
+
+ # restore using phyint
+ net_utils.bonding.is_enabled.expect_call().and_return(False)
+ cmd = '%s -L %s %s %s' % (mock_netif.ethtool, mock_netif._name,
+ 'phyint', 'enable')
+ utils.system.expect_call(cmd, ignore_status=True).and_return(0)
+ mock_netif.enable_loopback()
+ self.god.check_playback()
+
+ # restore using mac
+ net_utils.bonding.is_enabled.expect_call().and_return(False)
+ cmd = '%s -L %s %s %s' % (mock_netif.ethtool, mock_netif._name,
+ 'phyint', 'enable')
+ utils.system.expect_call(cmd, ignore_status=True).and_return(1)
+ cmd = '%s -L %s %s %s' % (mock_netif.ethtool, mock_netif._name,
+ 'mac', 'enable')
+ utils.system.expect_call(cmd, ignore_status=True).and_return(0)
+ mock_netif.enable_loopback()
+ self.god.check_playback()
+
+ # catch exception on phyint and mac failures
+ net_utils.bonding.is_enabled.expect_call().and_return(False)
+ cmd = '%s -L %s %s %s' % (mock_netif.ethtool, mock_netif._name,
+ 'phyint', 'enable')
+ utils.system.expect_call(cmd, ignore_status=True).and_return(1)
+ cmd = '%s -L %s %s %s' % (mock_netif.ethtool, mock_netif._name,
+ 'mac', 'enable')
+ utils.system.expect_call(cmd, ignore_status=True).and_return(1)
+ try:
+ mock_netif.enable_loopback()
+ except error.TestError:
+ pass
+ else:
+ self.assertEquals(0,1)
+ self.god.check_playback()
+
+ # catch exception on bond enabled
+ net_utils.bonding.is_enabled.expect_call().and_return(True)
+ try:
+ mock_netif.enable_loopback()
+ except error.TestError:
+ pass
+ else:
+ self.assertEquals(0,1)
+ self.god.check_playback()
+
+ # check that setting tg3 and bnx2x driver have a sleep call
+ mock_netif.driver = 'tg3'
+ net_utils.bonding.is_enabled.expect_call().and_return(False)
+ cmd = '%s -L %s %s %s' % (mock_netif.ethtool, mock_netif._name,
+ 'phyint', 'enable')
+ utils.system.expect_call(cmd, ignore_status=True).and_return(0)
+ time.sleep.expect_call(1)
+ mock_netif.enable_loopback()
+ self.god.check_playback()
+
+ mock_netif.driver = 'bnx2x'
+ net_utils.bonding.is_enabled.expect_call().and_return(False)
+ cmd = '%s -L %s %s %s' % (mock_netif.ethtool, mock_netif._name,
+ 'phyint', 'enable')
+ utils.system.expect_call(cmd, ignore_status=True).and_return(0)
+ time.sleep.expect_call(1)
+ mock_netif.enable_loopback()
+ self.god.check_playback()
+
+
+ def test_network_interface_disable_loopback(self):
+ mock_netif = self.network_interface_mock('eth0')
+
+ mock_netif.was_loopback_enabled = False
+ mock_netif.loopback_enabled = True
+ mock_netif.was_down = False
+
+ self.god.stub_function(net_utils.bonding, 'is_enabled')
+
+ # restore using phyint
+ net_utils.bonding.is_enabled.expect_call().and_return(False)
+ cmd = '%s -L %s %s %s' % (mock_netif.ethtool, mock_netif._name,
+ 'phyint', 'disable')
+ utils.system.expect_call(cmd, ignore_status=True).and_return(0)
+ mock_netif.disable_loopback()
+ self.god.check_playback()
+
+ # restore using mac
+ net_utils.bonding.is_enabled.expect_call().and_return(False)
+ cmd = '%s -L %s %s %s' % (mock_netif.ethtool, mock_netif._name,
+ 'phyint', 'disable')
+ utils.system.expect_call(cmd, ignore_status=True).and_return(1)
+ cmd = '%s -L %s %s %s' % (mock_netif.ethtool, mock_netif._name,
+ 'mac', 'disable')
+ utils.system.expect_call(cmd, ignore_status=True).and_return(0)
+ mock_netif.disable_loopback()
+ self.god.check_playback()
+
+ # catch exception on bonding enabled
+ net_utils.bonding.is_enabled.expect_call().and_return(True)
+ try:
+ mock_netif.disable_loopback()
+ except error.TestError:
+ pass
+ else:
+ self.assertEquals(0,1)
+ self.god.check_playback()
+
+ # catch exception on phyint and mac failures
+ net_utils.bonding.is_enabled.expect_call().and_return(False)
+ cmd = '%s -L %s %s %s' % (mock_netif.ethtool, mock_netif._name,
+ 'phyint', 'disable')
+ utils.system.expect_call(cmd, ignore_status=True).and_return(1)
+ cmd = '%s -L %s %s %s' % (mock_netif.ethtool, mock_netif._name,
+ 'mac', 'disable')
+ utils.system.expect_call(cmd, ignore_status=True).and_return(1)
+ try:
+ mock_netif.disable_loopback()
+ except error.TestError:
+ pass
+ else:
+ self.assertEquals(0,1)
+ self.god.check_playback()
+
+
+ def test_network_interface_is_loopback_enabled(self):
+ mock_netif = self.network_interface_mock('eth0')
+ mock_netif.is_loopback_enabled = \
+ net_utils.network_interface.is_loopback_enabled
+ try:
+ mock_netif.is_loopback_enabled(mock_netif)
+ except error.TestError:
+ pass
+ else:
+ self.assertEquals(0,1)
+ self.god.check_playback()
+
+ self.god.stub_function(net_utils.bonding, 'is_enabled')
+ mock_netif._name = 'eth0'
+ net_utils.bonding.is_enabled.expect_call().and_return(False)
+ cmd = '%s -l %s' % (mock_netif.ethtool, mock_netif._name)
+ utils.system_output.expect_call(cmd).and_return('')
+ self.assertEquals(mock_netif.is_loopback_enabled(mock_netif), False)
+ self.god.check_playback()
+
+ for ifname in ('eth0', 'eth1', 'eth2', 'eth3', 'eth4'):
+ mock_netif._name = ifname
+ for bond_enable in (True, False):
+ for state in (('disabled', 'disabled', 'enabled'),
+ ('disabled', 'enabled', 'disabled'),
+ ('enabled', 'disabled', 'disabled'),
+ ('disabled', 'disabled', 'disabled')):
+ net_utils.bonding.is_enabled.expect_call().and_return(
+ bond_enable)
+ if bond_enable:
+ self.assertEquals(mock_netif.is_loopback_enabled(
+ mock_netif), False)
+ else:
+ cmd = '%s -l %s' % (mock_netif.ethtool, mock_netif._name)
+ out = 'MAC loopback is %s\n'\
+ 'PHY internal loopback is %s\n'\
+ 'PHY external loopback is %s' % (
+ state[0], state[1], state[2])
+ utils.system_output.expect_call(cmd).and_return(out)
+ self.assertEquals(mock_netif.is_loopback_enabled(
+ mock_netif), 'enabled' in state)
+ self.god.check_playback()
+
+
+ def test_network_interface_enable_promisc(self):
+ mock_netif = self.network_interface_mock('eth0')
+ cmd = 'ifconfig %s promisc' % mock_netif._name
+ utils.system.expect_call(cmd)
+ mock_netif.enable_promisc()
+ self.god.check_playback()
+
+
+ def test_network_interface_disable_promisc(self):
+ mock_netif = self.network_interface_mock()
+ cmd = 'ifconfig %s -promisc' % mock_netif._name
+ utils.system.expect_call(cmd)
+ mock_netif.disable_promisc()
+ self.god.check_playback()
+
+
+ def test_network_interface_get_hwaddr(self):
+ mock_netif = self.network_interface_mock()
+ f = self.god.create_mock_class(file, 'file')
+ net_utils.open.expect_call('/sys/class/net/%s/address'
+ % mock_netif._name).and_return(f)
+ hw_addr = '00:0e:0c:c3:7d:a8'
+ f.read.expect_call().and_return(' ' + hw_addr + ' ')
+ f.close.expect_call()
+ self.assertEquals(mock_netif.get_hwaddr(), hw_addr)
+ self.god.check_playback()
+
+
+ def test_network_interface_set_hwaddr(self):
+ mock_netif = self.network_interface_mock()
+ hw_addr = '00:0e:0c:c3:7d:a8'
+ cmd = 'ifconfig %s hw ether %s' % (mock_netif._name,
+ hw_addr)
+ utils.system.expect_call(cmd)
+ mock_netif.set_hwaddr(hw_addr)
+ self.god.check_playback()
+
+
+ def test_network_interface_add_maddr(self):
+ mock_netif = self.network_interface_mock()
+ maddr = '01:00:5e:00:00:01'
+ cmd = 'ip maddr add %s dev %s' % (maddr, mock_netif._name)
+ utils.system.expect_call(cmd)
+ mock_netif.add_maddr(maddr)
+ self.god.check_playback()
+
+
+ def test_network_interface_del_maddr(self):
+ mock_netif = self.network_interface_mock()
+ maddr = '01:00:5e:00:00:01'
+ cmd = 'ip maddr del %s dev %s' % (maddr, mock_netif._name)
+ utils.system.expect_call(cmd)
+ mock_netif.del_maddr(maddr)
+ self.god.check_playback()
+
+
+ def test_network_interface_get_ipaddr(self):
+ mock_netif = self.network_interface_mock()
+ ip_addr = '110.211.112.213'
+ out_format = \
+ 'eth0 Link encap:Ethernet HWaddr 00:0E:0C:C3:7D:A8\n'\
+ ' inet addr:%s Bcast:10.246.90.255'\
+ ' Mask:255.255.255.0\n'\
+ ' UP BROADCAST RUNNING MASTER MULTICAST MTU:1500'\
+ ' Metric:1\n'\
+ ' RX packets:463070 errors:0 dropped:0 overruns:0'\
+ ' frame:0\n'\
+ ' TX packets:32548 errors:0 dropped:0 overruns:0'\
+ ' carrier:0\n'\
+ ' collisions:0 txqueuelen:0'
+ out = out_format % ip_addr
+
+ cmd = 'ifconfig %s' % mock_netif._name
+ utils.system_output.expect_call(cmd).and_return(out)
+ self.assertEquals(mock_netif.get_ipaddr(), ip_addr)
+ self.god.check_playback()
+
+ cmd = 'ifconfig %s' % mock_netif._name
+ utils.system_output.expect_call(cmd).and_return('some output')
+ self.assertEquals(mock_netif.get_ipaddr(), '0.0.0.0')
+ self.god.check_playback()
+
+ cmd = 'ifconfig %s' % mock_netif._name
+ utils.system_output.expect_call(cmd).and_return(None)
+ self.assertEquals(mock_netif.get_ipaddr(), '0.0.0.0')
+ self.god.check_playback()
+
+ ip_addr = '1.2.3.4'
+ out = out_format % ip_addr
+ cmd = 'ifconfig %s' % mock_netif._name
+ utils.system_output.expect_call(cmd).and_return(out)
+ self.assertEquals(mock_netif.get_ipaddr(), ip_addr)
+ self.god.check_playback()
+
+
+ def test_network_interface_set_ipaddr(self):
+ mock_netif = self.network_interface_mock()
+ ip_addr = '1.2.3.4'
+ cmd = 'ifconfig %s %s' % (mock_netif._name, ip_addr)
+ utils.system.expect_call(cmd)
+ mock_netif.set_ipaddr(ip_addr)
+ self.god.check_playback()
+
+
+ def test_network_interface_is_down(self):
+ mock_netif = self.network_interface_mock()
+ out_format = \
+ 'eth0 Link encap:Ethernet HWaddr 00:0E:0C:C3:7D:A8\n'\
+ ' inet addr:1.2.3.4 Bcast:10.246.90.255'\
+ ' Mask:255.255.255.0\n'\
+ ' %s BROADCAST RUNNING MASTER MULTICAST MTU:1500'\
+ ' Metric:1\n'\
+ ' RX packets:463070 errors:0 dropped:0 overruns:0'\
+ ' frame:0\n'\
+ ' TX packets:32548 errors:0 dropped:0 overruns:0'\
+ ' carrier:0\n'\
+ ' collisions:0 txqueuelen:0'
+ for state in ('UP', 'DOWN', 'NONE', ''):
+ out = out_format % state
+ cmd = 'ifconfig %s' % mock_netif._name
+ utils.system_output.expect_call(cmd).and_return(out)
+ self.assertEquals(mock_netif.is_down(), state != 'UP')
+ self.god.check_playback()
+
+ cmd = 'ifconfig %s' % mock_netif._name
+ utils.system_output.expect_call(cmd).and_return(None)
+ self.assertEquals(mock_netif.is_down(), False)
+ self.god.check_playback()
+
+
+ def test_network_interface_up(self):
+ mock_netif = self.network_interface_mock()
+ cmd = 'ifconfig %s up' % mock_netif._name
+ utils.system.expect_call(cmd)
+ mock_netif.up()
+ self.god.check_playback()
+
+
+ def test_network_interface_down(self):
+ mock_netif = self.network_interface_mock()
+ cmd = 'ifconfig %s down' % mock_netif._name
+ utils.system.expect_call(cmd)
+ mock_netif.down()
+ self.god.check_playback()
+
+
+ def test_network_interface_wait_for_carrier(self):
+ mock_netif = self.network_interface_mock()
+ mock_netif.wait_for_carrier = \
+ net_utils.network_interface.wait_for_carrier
+ f = self.god.create_mock_class(file, 'file')
+ spath = '/sys/class/net/%s/carrier' % mock_netif._name
+ # y = 0 - test that an exception is thrown
+ # y = 1, 100 - check that carrier is checked until timeout
+ for y in (0, 1, 100):
+ max_timeout = y
+ if y:
+ for x in xrange(max_timeout - 1):
+ net_utils.open.expect_call(spath).and_return(f)
+ f.read.expect_call().and_return(' ' + '0' + ' ')
+ f.close.expect_call()
+ time.sleep.expect_call(1)
+
+ net_utils.open.expect_call(spath).and_return(f)
+ f.read.expect_call().and_return(' ' + '1' + ' ')
+ f.close.expect_call()
+ try:
+ mock_netif.wait_for_carrier(mock_netif, max_timeout)
+ except:
+ pass
+ else:
+ if not y:
+ self.assertEquals(0, 1)
+ self.god.check_playback()
+
+
+ def test_network_interface_send(self):
+ mock_netif = self.network_interface_mock()
+ mock_netif.send('test buffer')
+ self.assertEquals(mock_netif._socket.send_val, 'test buffer')
+
+
+ def test_network_interface_recv(self):
+ mock_netif = self.network_interface_mock()
+ test_str = 'test string'
+ mock_netif._socket.recv_val = test_str
+ rcv_str = mock_netif.recv(len(test_str))
+ self.assertEquals(rcv_str, test_str)
+
+
+ def test_network_interface_flush(self):
+ mock_netif = self.network_interface_mock()
+ self.god.stub_function(mock_netif._socket, 'close')
+ mock_netif._socket.close.expect_call()
+ s = self.god.create_mock_class(socket.socket, "socket")
+ self.god.stub_function(socket, 'socket')
+ socket.socket.expect_call(socket.PF_PACKET,
+ socket.SOCK_RAW).and_return(s)
+ s.settimeout.expect_call(net_utils.TIMEOUT)
+ s.bind.expect_call(('eth0', net_utils.raw_socket.ETH_P_ALL))
+
+
+ #
+ # bonding tests
+ #
+ def test_bonding_is_enabled(self):
+ try:
+ net_utils.bond().is_enabled()
+ except error.TestError:
+ pass
+ else:
+ self.assertEquals(1, 0)
+
+
+ def test_bonding_is_bondable(self):
+ try:
+ net_utils.bond().is_enabled()
+ except error.TestError:
+ pass
+ else:
+ self.assertEquals(1, 0)
+
+
+ def test_bonding_enable(self):
+ try:
+ net_utils.bond().is_enabled()
+ except error.TestError:
+ pass
+ else:
+ self.assertEquals(1, 0)
+
+
+ def test_bonding_disable(self):
+ try:
+ net_utils.bond().is_enabled()
+ except error.TestError:
+ pass
+ else:
+ self.assertEquals(1, 0)
+
+
+ def test_bonding_get_mii_status(self):
+ self.assertEquals(net_utils.bond().get_mii_status(), {})
+
+
+ def test_get_mode_bonding(self):
+ self.assertEquals(net_utils.bond().get_mode(), net_utils.bonding.NO_MODE)
+
+
+ def test_bonding_wait_for_state_change(self):
+ self.god.stub_function(autotest_utils, "ping_default_gateway")
+
+ time.sleep.expect_call(10)
+ autotest_utils.ping_default_gateway.expect_call().and_return(False)
+ self.assertEquals(net_utils.bond().wait_for_state_change(), True)
+
+ for x in xrange(9):
+ time.sleep.expect_call(10)
+ autotest_utils.ping_default_gateway.expect_call().and_return(True)
+
+ time.sleep.expect_call(10)
+ autotest_utils.ping_default_gateway.expect_call().and_return(False)
+ self.assertEquals(net_utils.bond().wait_for_state_change(), True)
+
+ for x in xrange(10):
+ time.sleep.expect_call(10)
+ autotest_utils.ping_default_gateway.expect_call().and_return(True)
+
+ self.assertEquals(net_utils.bond().wait_for_state_change(), False)
+
+ self.god.check_playback()
+
+
+ def test_bonding_get_active_interfaces(self):
+ self.assertEquals(net_utils.bond().get_active_interfaces(), [])
+ self.god.check_playback()
+
+
+ def test_bonding_get_slave_interfaces(self):
+ self.assertEquals(net_utils.bond().get_slave_interfaces(), [])
+ self.god.check_playback()
+
+
+ #
+ # ethernet tests
+ #
+
+ def test_ethernet_mac_string_to_binary(self):
+ mac_bin = net_utils.ethernet.mac_string_to_binary('00:01:02:03:04:05')
+ self.assertEqual(mac_bin, '\x00\x01\x02\x03\x04\x05')
+
+
+ def test_ethernet_mac_binary_to_string(self):
+ mac_str = net_utils.ethernet.mac_binary_to_string(
+ '\x00\x01\x02\x03\x04\x05')
+ self.assertEqual(mac_str, '00:01:02:03:04:05')
+
+
+ def test_ethernet_pack(self):
+ dst = net_utils.ethernet.mac_string_to_binary('00:01:02:03:04:05')
+ src = net_utils.ethernet.mac_string_to_binary('16:17:18:19:1A:1B')
+ protocol = 2030
+ payload = 'some payload'
+ frame = struct.pack("!6s6sH", dst, src, protocol) + payload
+ self.assertEquals(net_utils.ethernet.pack(dst, src,protocol, payload),
+ frame)
+
+
+ def test_ethernet_unpack(self):
+ dst = net_utils.ethernet.mac_string_to_binary('00:01:02:03:04:05')
+ src = net_utils.ethernet.mac_string_to_binary('16:17:18:19:1A:1B')
+ protocol = 2030
+ payload = 'some payload'
+ frame = net_utils.ethernet.pack(dst, src, protocol, payload)
+ uframe = net_utils.ethernet.unpack(frame)
+ self.assertEquals(uframe[net_utils.ethernet.FRAME_KEY_DST_MAC], dst)
+ self.assertEquals(uframe[net_utils.ethernet.FRAME_KEY_SRC_MAC], src)
+ self.assertEquals(uframe[net_utils.ethernet.FRAME_KEY_PROTO], protocol)
+ self.assertEquals(uframe[net_utils.ethernet.FRAME_KEY_PAYLOAD], payload)
+
+
+ # raw_socket tests
+ #
+ def test_raw_socket_open(self):
+ self.god.stub_function(socket, 'setdefaulttimeout')
+
+ s = self.god.create_mock_class(socket.socket, "socket")
+ self.god.stub_function(socket, 'socket')
+
+ # open without a protocol
+ socket.setdefaulttimeout.expect_call(1)
+ socket.socket.expect_call(socket.PF_PACKET,
+ socket.SOCK_RAW).and_return(s)
+ s.bind.expect_call(('eth0', net_utils.raw_socket.ETH_P_ALL))
+ s.settimeout.expect_call(1)
+ sock = net_utils.raw_socket('eth0')
+ sock.open(protocol=None)
+
+ self.god.check_playback()
+
+ # double open should throw an exception
+ try:
+ sock.open()
+ except error.TestError:
+ pass
+ else:
+ self.assertEquals(1, 0)
+
+ self.god.check_playback()
+
+ # open a protocol
+ socket.setdefaulttimeout.expect_call(1)
+ socket.socket.expect_call(socket.PF_PACKET,
+ socket.SOCK_RAW,
+ socket.htons(1234)).and_return(s)
+ s.bind.expect_call(('eth0', net_utils.raw_socket.ETH_P_ALL))
+ s.settimeout.expect_call(1)
+ sock = net_utils.raw_socket('eth0')
+ sock.open(protocol=1234)
+
+ self.god.check_playback()
+
+
+ def test_raw_socket_close(self):
+ self.god.stub_function(socket, 'setdefaulttimeout')
+
+ s = self.god.create_mock_class(socket.socket, "socket")
+ self.god.stub_function(socket, 'socket')
+
+ # close without open
+ socket.setdefaulttimeout.expect_call(1)
+ sock = net_utils.raw_socket('eth0')
+ try:
+ sock.close()
+ except error.TestError:
+ pass
+ else:
+ self.assertEquals(1, 0)
+
+ # close after open
+ socket.setdefaulttimeout.expect_call(1)
+ socket.socket.expect_call(socket.PF_PACKET,
+ socket.SOCK_RAW).and_return(s)
+ s.bind.expect_call(('eth0', net_utils.raw_socket.ETH_P_ALL))
+ s.settimeout.expect_call(1)
+ sock = net_utils.raw_socket('eth0')
+ sock.open(protocol=None)
+
+ s.close.expect_call()
+ sock.close()
+ self.god.check_playback()
+
+
+ def test_raw_socket_recv(self):
+ self.god.stub_function(socket, 'setdefaulttimeout')
+
+ self.god.create_mock_class(socket.socket, "socket")
+ self.god.stub_function(socket, 'socket')
+
+ # rcv without open
+ socket.setdefaulttimeout.expect_call(1)
+ sock = net_utils.raw_socket('eth0')
+ try:
+ sock.recv(10)
+ except error.TestError:
+ pass
+ else:
+ self.assertEquals(1, 0)
+
+ self.god.check_playback()
+
+ # open a protocol and try to get packets of varying sizes
+ # I could not get socket.recv to get a mock expect_call. To keep
+ # on going, added a socket stub
+ s = net_utils_mock.socket_stub('eth0', socket, socket)
+ socket.socket.expect_call(socket.PF_PACKET,
+ socket.SOCK_RAW,
+ socket.htons(1234)).and_return(s)
+
+ self.god.stub_function(s, 'bind')
+ self.god.stub_function(s, 'settimeout')
+ s.bind.expect_call(('eth0', net_utils.raw_socket.ETH_P_ALL))
+ s.settimeout.expect_call(1)
+ sock.open(protocol=1234)
+
+ s.recv_val = ''
+ self.assertEquals(sock.recv(1), (None, 0))
+
+ s.recv_val = '\xFF' * (net_utils.ethernet.ETH_PACKET_MIN_SIZE-5)
+ self.assertEquals(sock.recv(1), (None, 0))
+
+ # when receiving a packet, make sure the timeout is not change
+ s.recv_val = '\xEE' * (net_utils.ethernet.ETH_PACKET_MIN_SIZE-4)
+ self.assertEquals(sock.recv(1), (s.recv_val, 1))
+
+ s.recv_val = '\xDD' * (net_utils.ethernet.ETH_PACKET_MIN_SIZE)
+ self.assertEquals(sock.recv(1), (s.recv_val, 1))
+
+ s.recv_val = '\xCC' * (net_utils.ethernet.ETH_PACKET_MAX_SIZE)
+ self.assertEquals(sock.recv(1), (s.recv_val, 1))
+
+ s.recv_val = '\xBB' * (net_utils.ethernet.ETH_PACKET_MAX_SIZE+1)
+ packet, time_left = sock.recv(1)
+ self.assertEquals(len(packet), net_utils.ethernet.ETH_PACKET_MAX_SIZE)
+ self.assertEquals(packet,
+ s.recv_val[:net_utils.ethernet.ETH_PACKET_MAX_SIZE])
+
+
+ # test timeout
+ s.recv_val = ''
+ s.throw_timeout = False
+ sock.recv(5)
+ self.assertEquals(sock.recv(1), (None, 0))
+ s.throw_timeout = True
+ sock.recv(5)
+ self.assertEquals(sock.recv(1), (None, 0))
+
+ self.god.check_playback()
+
+
+ def test_raw_socket_send(self):
+ self.god.stub_function(socket, 'setdefaulttimeout')
+ self.god.create_mock_class(socket.socket, "socket")
+ self.god.stub_function(socket, 'socket')
+ self.god.stub_function(socket, 'send')
+
+ # send without open
+ socket.setdefaulttimeout.expect_call(1)
+ sock = net_utils.raw_socket('eth0')
+ try:
+ sock.send('test this packet')
+ except error.TestError:
+ pass
+ else:
+ self.assertEquals(1, 0)
+ self.god.check_playback()
+
+ # open a protocol and try to send a packet
+ s = net_utils_mock.socket_stub('eth0', socket, socket)
+ self.god.stub_function(s, 'bind')
+ self.god.stub_function(s, 'settimeout')
+ socket.socket.expect_call(socket.PF_PACKET,
+ socket.SOCK_RAW,
+ socket.htons(1234)).and_return(s)
+ s.bind.expect_call(('eth0', net_utils.raw_socket.ETH_P_ALL))
+ s.settimeout.expect_call(1)
+ packet = '\xFF\xAA\xBB\xCC\xDD\x11packet data\x00\x00'
+ s.send.expect_call(packet)
+ sock.open(protocol=1234)
+ sock.send(packet)
+ self.god.check_playback()
+
+
+ def test_raw_socket_send_to(self):
+ self.god.stub_function(socket, 'setdefaulttimeout')
+ self.god.create_mock_class(socket.socket, "socket")
+ self.god.stub_function(socket, 'socket')
+ self.god.stub_function(socket, 'send')
+
+ # send without open
+ socket.setdefaulttimeout.expect_call(1)
+ sock = net_utils.raw_socket('eth0')
+ try:
+ sock.send_to('0', '1', 1, 'test this packet')
+ except error.TestError:
+ pass
+ else:
+ self.assertEquals(1, 0)
+ self.god.check_playback()
+
+ # open a protocol and try to send a packet
+ s = net_utils_mock.socket_stub('eth0', socket, socket)
+ self.god.stub_function(s, 'bind')
+ self.god.stub_function(s, 'settimeout')
+ socket.socket.expect_call(socket.PF_PACKET,
+ socket.SOCK_RAW,
+ socket.htons(1234)).and_return(s)
+ s.bind.expect_call(('eth0', net_utils.raw_socket.ETH_P_ALL))
+ s.settimeout.expect_call(1)
+ packet = '\x00\x00packet data\x00\x00'
+ s.send.expect_call(packet)
+ sock.open(protocol=1234)
+ try:
+ sock.send_to(None, None, 1, packet)
+ except error.TestError:
+ pass
+ else:
+ self.assertEquals(1, 0)
+ self.god.check_playback()
+
+ dst_mac = '\x00\x01\x02\x03\x04\x05'
+ src_mac = '\xFF\xEE\xDD\xCC\xBB\xAA'
+ protocol = 1234
+ s.send.expect_call(dst_mac+src_mac+'%d'%protocol+packet)
+ sock.send_to(dst_mac, src_mac, protocol, packet)
+ self.god.check_playback()
+
+
+ def test_raw_socket_recv_from(self):
+
+ def __set_clock(sock):
+ time.clock.expect_call().and_return(0.0)
+ time.clock.expect_call().and_return(0.0)
+ time.clock.expect_call().and_return(float(sock.socket_timeout()) + 0.5)
+
+ self.god.stub_function(socket, 'setdefaulttimeout')
+
+ self.god.create_mock_class(socket.socket, "socket")
+ self.god.stub_function(socket, 'socket')
+
+ # rcv without open
+ socket.setdefaulttimeout.expect_call(1)
+ sock = net_utils.raw_socket('eth0')
+ try:
+ sock.recv_from(None, None, None)
+ except error.TestError:
+ pass
+ else:
+ self.assertEquals(1, 0)
+
+ self.god.check_playback()
+
+ # open a protocol and try to get packets of varying sizes
+ # I could not get socket.recv to get a mock expect_call. To keep
+ # on going, added a socket stub
+ s = net_utils_mock.socket_stub('eth0', socket, socket)
+ socket.socket.expect_call(socket.PF_PACKET,
+ socket.SOCK_RAW,
+ socket.htons(1234)).and_return(s)
+
+ self.god.stub_function(s, 'bind')
+ self.god.stub_function(s, 'settimeout')
+ s.bind.expect_call(('eth0', net_utils.raw_socket.ETH_P_ALL))
+ s.settimeout.expect_call(1)
+ sock.open(protocol=1234)
+
+ s.recv_val = ''
+ dst_mac = net_utils.ethernet.mac_string_to_binary('00:01:02:03:04:05')
+ src_mac = net_utils.ethernet.mac_string_to_binary('16:17:18:19:1A:1B')
+ t_mac = net_utils.ethernet.mac_string_to_binary('E6:E7:E8:E9:EA:EB')
+ protocol = 2030
+ t_protocol = 1234
+ data = '\xEE' * (net_utils.ethernet.ETH_PACKET_MIN_SIZE)
+
+ # no data to receive at socket
+ self.assertEquals(sock.recv_from(None, None, None), None)
+ self.assertEquals(sock.recv_from(dst_mac, None, None), None)
+ self.assertEquals(sock.recv_from(None, src_mac, None), None)
+ self.assertEquals(sock.recv_from(None, None, protocol), None)
+
+ # receive packet < min size
+ s.recv_val = (struct.pack("!6s6sH", dst_mac, src_mac, protocol) +
+ 'packet_to_short')
+ self.assertEquals(sock.recv_from(None, None, None), None)
+
+ # receive packet, filtering on mac address and protocol
+ s.recv_val = struct.pack("!6s6sH", dst_mac, t_mac, t_protocol) + data
+ frame = net_utils.ethernet.unpack(s.recv_val)
+ self.assertEquals(sock.recv_from(None, None, None), frame)
+ self.assertEquals(sock.recv_from(dst_mac, None, None), frame)
+
+ # use time clock to speed up the timeout in send_to()
+ self.god.stub_function(time, 'clock')
+ __set_clock(sock)
+ self.assertEquals(sock.recv_from(dst_mac, src_mac, None), None)
+ __set_clock(sock)
+ self.assertEquals(sock.recv_from(dst_mac, None, protocol), None)
+ __set_clock(sock)
+ self.assertEquals(sock.recv_from(dst_mac, src_mac, protocol), None)
+ self.god.unstub(time, 'clock')
+
+ s.recv_val = struct.pack("!6s6sH", dst_mac, src_mac, protocol) + data
+ frame = net_utils.ethernet.unpack(s.recv_val)
+ self.assertEquals(sock.recv_from(dst_mac, None, None), frame)
+ self.assertEquals(sock.recv_from(dst_mac, src_mac, None), frame)
+ self.assertEquals(sock.recv_from(dst_mac, src_mac, protocol), frame)
+ self.assertEquals(sock.recv_from(None, None, protocol), frame)
+ self.assertEquals(sock.recv_from(None, src_mac, None), frame)
+ self.god.stub_function(time, 'clock')
+ __set_clock(sock)
+ self.assertEquals(sock.recv_from(None, None, t_protocol), None)
+ __set_clock(sock)
+ self.assertEquals(sock.recv_from(None, t_mac, None), None)
+ self.god.unstub(time, 'clock')
+
+
+ self.god.check_playback()
+
+
+if __name__ == "__main__":
+ unittest.main()