This diff incorporates many changes.  First we add a patch to the C code
from whcih the client and server executables are built.  This adds some time
in order
to assure that the multiple clients (if their are more than one) are
synchronized.
Also we move some code from initialize() to setup() where it better
belongs.  run_one() is now documented. Also we add a parameter for
bi-directional tests
as well as the wait time on the client threads to help with synchronicity.
Also we now allow setting the processor affinity of the client and server
executables.
Also mpstat is run while the client is running.  From this data additional
keyvals are generated (including Efficiency).

Travis Miller



git-svn-id: http://test.kernel.org/svn/autotest/trunk@3278 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/client/tests/netperf2/netperf2.py b/client/tests/netperf2/netperf2.py
index 75b6e7b..f8857c2 100755
--- a/client/tests/netperf2/netperf2.py
+++ b/client/tests/netperf2/netperf2.py
@@ -1,102 +1,188 @@
-import os, time, logging
+import os, time, re, logging
 from autotest_lib.client.bin import test, utils
+from autotest_lib.client.bin.net import net_utils
 from autotest_lib.client.common_lib import error
 
+MPSTAT_IX = 0
+NETPERF_IX = 1
 
 class netperf2(test.test):
-    version = 2
+    version = 3
 
     # ftp://ftp.netperf.org/netperf/netperf-2.4.4.tar.gz
     def setup(self, tarball = 'netperf-2.4.4.tar.gz'):
+        self.job.require_gcc()
         tarball = utils.unmap_url(self.bindir, tarball, self.tmpdir)
         utils.extract_tarball_to_dir(tarball, self.srcdir)
         os.chdir(self.srcdir)
 
+        utils.system('patch -p0 < ../wait_before_data.patch')
         # Fixing up a compile issue under newer systems that have
         # CPU_SET_S defined on /usr/include/sched.h, backported from
         # upstream svn trunk
         utils.system('patch -p0 < ../fix_netperf_build.patch')
         utils.system('./configure')
         utils.system('make')
+        utils.system('sync')
 
 
     def initialize(self):
-        self.job.require_gcc()
-
-        self.server_path = '%s&' % os.path.join(self.srcdir,
-                                                'src/netserver')
-        # Add server_ip and arguments later
-        self.client_path = '%s %%s %%s' % os.path.join(self.srcdir,
-                                                       'src/netperf -H')
-
-        self.valid_tests = ['TCP_STREAM', 'TCP_RR', 'TCP_CRR', 'TCP_SENDFILE',
-                            'UDP_STREAM', 'UDP_RR']
+        self.server_prog = '%s&' % os.path.join(self.srcdir, 'src/netserver')
+        self.client_prog = '%s' % os.path.join(self.srcdir, 'src/netperf')
+        self.valid_tests = ['TCP_STREAM', 'TCP_MAERTS', 'TCP_RR', 'TCP_CRR',
+                            'TCP_SENDFILE', 'UDP_STREAM', 'UDP_RR']
         self.results = []
         self.actual_times = []
+        self.netif = ''
+        self.network = net_utils.network()
+        self.network_utils = net_utils.network_utils()
 
 
-    def run_once(self, server_ip, client_ip, role, test='TCP_STREAM',
-                 test_time=10, stream_list=[1]):
+    def run_once(self, server_ip, client_ip, role, test = 'TCP_STREAM',
+                 test_time = 15, stream_list = [1], test_specific_args = '',
+                 cpu_affinity = '', dev = '', bidi = False, wait_time = 5):
+        """
+        server_ip: IP address of host running netserver
+        client_ip: IP address of host running netperf client(s)
+        role: 'client' or 'server'
+        test: one of TCP_STREAM, TCP_MEARTS, TCP_RR, TCP_CRR, TCP_SENDFILE,
+            UDP_STREAM or UDP_RR
+        test_time: time to run the test for in seconds
+        stream_list: list of number of netperf streams to launch
+        test_specific_args: Optional test specific args.  For example to set
+            the request,response size for RR tests to 200,100, set it
+            to: '-- -r 200,100'.  Or, to set the send buffer size of STREAM
+            tests to 200, set it to: '-- -m 200'
+        cpu_affinity: netperf/netserver processes will get taskset to the
+            cpu_affinity.  cpu_affinity is specified as a bitmask in hex
+            without the leading 0x.  For example, to run on CPUs 0 & 5,
+            cpu_affinity needs to be '21'
+        dev: device on which to run traffic on.  For example, to run on
+            inteface eth1, set it to 'eth1'.
+        bidi: bi-directional traffic.  This is supported for TCP_STREAM
+            test only. The RR & CRR tests are bi-directional by nature.
+        wait_time: Time to wait after establishing data/control connections
+            but before sending data traffic.
+        """
         if test not in self.valid_tests:
             raise error.TestError('invalid test specified')
         self.role = role
         self.test = test
         self.test_time = test_time
+        self.wait_time = wait_time
         self.stream_list = stream_list
+        self.bidi = bidi
 
         server_tag = server_ip + '#netperf-server'
         client_tag = client_ip + '#netperf-client'
         all = [server_tag, client_tag]
 
+        # If a specific device has been requested, configure it.
+        if dev:
+            timeout = 60
+            if role == 'server':
+                self.configure_interface(dev, server_ip)
+                self.ping(client_ip, timeout)
+            else:
+                self.configure_interface(dev, client_ip)
+                self.ping(server_ip, timeout)
+
         for num_streams in stream_list:
             if role == 'server':
-                self.server_start()
+                self.server_start(cpu_affinity)
                 try:
                     # Wait up to ten minutes for the client to reach this
                     # point.
                     self.job.barrier(server_tag, 'start_%d' % num_streams,
-                                     600).rendevous(*all)
+                                     600).rendezvous(*all)
                     # Wait up to test_time + 5 minutes for the test to
                     # complete
                     self.job.barrier(server_tag, 'stop_%d' % num_streams,
-                                     test_time+300).rendevous(*all)
+                                     test_time+300).rendezvous(*all)
                 finally:
                     self.server_stop()
 
             elif role == 'client':
                 # Wait up to ten minutes for the server to start
                 self.job.barrier(client_tag, 'start_%d' % num_streams,
-                                 600).rendevous(*all)
-                self.client(server_ip, test, test_time, num_streams)
+                                 600).rendezvous(*all)
+                self.client(server_ip, test, test_time, num_streams,
+                            test_specific_args, cpu_affinity)
                 # Wait up to 5 minutes for the server to also reach this point
                 self.job.barrier(client_tag, 'stop_%d' % num_streams,
-                                 300).rendevous(*all)
+                                 300).rendezvous(*all)
             else:
                 raise error.TestError('invalid role specified')
 
+        self.restore_interface()
 
-    def server_start(self):
+
+    def configure_interface(self, dev, ip_addr):
+        self.netif = net_utils.netif(dev)
+        self.netif.up()
+        if self.netif.get_ipaddr() != ip_addr:
+            self.netif.set_ipaddr(ip_addr)
+
+
+    def restore_interface(self):
+        if self.netif:
+            self.netif.restore()
+
+
+    def server_start(self, cpu_affinity):
         utils.system('killall netserver', ignore_status=True)
-        self.results.append(utils.system_output(self.server_path,
-                                                retain_output=True))
+        cmd = self.server_prog
+        if cpu_affinity:
+            cmd = 'taskset %s %s' % (cpu_affinity, cmd)
+
+        self.results.append(utils.system_output(cmd, retain_output=True))
 
 
     def server_stop(self):
         utils.system('killall netserver', ignore_status=True)
 
 
-    def client(self, server_ip, test, test_time, num_streams):
-        args = '-t %s -l %d' % (test, test_time)
-        cmd = self.client_path % (server_ip, args)
+    def client(self, server_ip, test, test_time, num_streams,
+               test_specific_args, cpu_affinity):
+        args = '-H %s -t %s -l %d' % (server_ip, test, test_time)
+        if self.wait_time:
+            args += ' -s %d ' % self.wait_time
+
+        # Append the test specific arguments.
+        if test_specific_args:
+            args += ' ' + test_specific_args
+
+        cmd = '%s %s' % (self.client_prog, args)
+
+        if cpu_affinity:
+            cmd = 'taskset %s %s' % (cpu_affinity, cmd)
 
         try:
+            cmds = []
+
+            # Get 5 mpstat samples. Since tests with large number of streams
+            # take a long time to start up all the streams, we'll toss out the
+            # first and last sample when recording results
+            interval = max(1, test_time / 5)
+            cmds.append('sleep %d && mpstat -P ALL %s 5' % (self.wait_time,
+                                                            interval))
+
+            # Add the netperf commands
+            for i in xrange(num_streams):
+                cmds.append(cmd)
+                if self.bidi and test == 'TCP_STREAM':
+                    cmds.append(cmd.replace('TCP_STREAM', 'TCP_MAERTS'))
+
             t0 = time.time()
-            self.results.append(utils.get_cpu_percentage(
-                utils.system_output_parallel, [cmd]*num_streams,
-                timeout=test_time+60, retain_output=True))
+            # Launch all commands in parallel
+            out = utils.run_parallel(cmds, timeout=test_time + 500,
+                                     ignore_status=True)
             t1 = time.time()
 
-            self.actual_times.append(t1 - t0)
+            self.results.append(out)
+            self.actual_times.append(t1 - t0 - self.wait_time)
+            # Log test output
+            logging.info(out)
 
         except error.CmdError, e:
             """ Catch errors due to timeout, but raise others
@@ -110,8 +196,7 @@
             if ('within' in e.additional_text
                 or 'non-zero' in e.additional_text):
                 logging.debug(e.additional_text)
-                # Results are cpu%, outputs
-                self.results.append((0, None))
+                self.results.append(None)
                 self.actual_times.append(1)
             else:
                 raise
@@ -119,7 +204,9 @@
 
     def postprocess(self):
         if self.role == 'client':
-            if len(self.stream_list) != len(self.results):
+            # if profilers are enabled, the test gets runs twice
+            if (len(self.stream_list) != len(self.results) and
+               2*len(self.stream_list) != len(self.results)):
                 raise error.TestError('Mismatched number of results')
 
             function = None
@@ -127,7 +214,7 @@
 
             # Each of the functions return tuples in which the keys define
             # what that item in the tuple represents
-            if self.test in ['TCP_STREAM', 'TCP_SENDFILE']:
+            if self.test in ['TCP_STREAM', 'TCP_MAERTS', 'TCP_SENDFILE']:
                 function = self.process_tcp_stream
                 keys = ('Throughput',)
             elif self.test == 'UDP_STREAM':
@@ -139,43 +226,61 @@
             else:
                 raise error.TestError('Unhandled test')
 
-            # self.results is a list of tuples. The first element in each
-            # tuple is the cpu utilization for that run, and the second
-            # element is a list containing the output for each stream in that
-            # run.
             for i, streams in enumerate(self.stream_list):
                 attr = {'stream_count':streams}
                 keyval = {}
                 temp_vals = []
-                keyval['CPU'], outputs  = self.results[i]
-                actual_time  = self.actual_times[i]
 
                 # Short circuit to handle errors due to client timeouts
-                if not outputs:
+                if not self.results[i]:
                     self.write_iteration_keyval(attr, keyval)
                     continue
 
-                for result in outputs:
-                    temp_vals.append(function(result))
+                # Collect output of netperf sessions
+                failed_streams_count = 0
+                for result in self.results[i][NETPERF_IX:]:
+                    if result.exit_status:
+                        failed_streams_count += 1
+                    else:
+                        temp_vals.append(function(result.stdout))
+
+                keyval['Failed_streams_count'] = failed_streams_count
+
+                # Process mpstat output
+                mpstat_out = self.results[i][MPSTAT_IX].stdout
+                cpu_stats = self.network_utils.process_mpstat(mpstat_out, 5)
+                keyval['CPU_C'] = 100 - cpu_stats['idle']
+                keyval['CPU_C_SYS'] = cpu_stats['sys']
+                keyval['CPU_C_HI'] = cpu_stats['irq']
+                keyval['CPU_C_SI'] = cpu_stats['soft']
+                keyval['INTRS_C'] = cpu_stats['intr/s']
+
+                actual_time = self.actual_times[i]
+                keyval['actual_time'] = actual_time
+                logging.info('actual_time: %f', actual_time)
 
                 # Compute the sum of elements returned from function which
                 # represent the string contained in keys
                 for j, key in enumerate(keys):
                     vals = [x[j] for x in temp_vals]
                     # scale result by the actual time taken
-                    keyval[key] = sum(vals)*self.test_time/actual_time
+                    keyval[key] = sum(vals)
 
                 # record 'Efficiency' as perf/CPU
-                keyval['Efficieny'] = keyval[keys[0]]/keyval['CPU']
+                if keyval['CPU_C'] != 0:
+                    keyval['Efficieny_C'] = keyval[keys[0]]/keyval['CPU_C']
+                else:
+                    keyval['Efficieny_C'] = keyval[keys[0]]
 
                 self.write_iteration_keyval(attr, keyval)
 
 
     def process_tcp_stream(self, output):
-        """Parses the following (works for both TCP_STREAM and TCP_SENDFILE)
-        and returns a singleton containing throughput.
+        """Parses the following (works for both TCP_STREAM, TCP_MAERTS and
+        TCP_SENDFILE) and returns a singleton containing throughput.
 
-        TCP STREAM TEST from 0.0.0.0 (0.0.0.0) port 0 AF_INET to foo (10.1.1.1) port 0 AF_INET
+        TCP STREAM TEST from 0.0.0.0 (0.0.0.0) port 0 AF_INET to foo.bar.com \
+        (10.10.10.3) port 0 AF_INET
         Recv   Send    Send
         Socket Socket  Message  Elapsed
         Size   Size    Size     Time     Throughput
@@ -191,7 +296,8 @@
         """Parses the following and returns a touple containing throughput
         and the number of errors.
 
-        UDP UNIDIRECTIONAL SEND TEST from 0.0.0.0 (0.0.0.0) port 0 AF_INET to foo (10.1.1.1) port 0 AF_INET
+        UDP UNIDIRECTIONAL SEND TEST from 0.0.0.0 (0.0.0.0) port 0 AF_INET \
+        to foo.bar.com (10.10.10.3) port 0 AF_INET
         Socket  Message  Elapsed      Messages
         Size    Size     Time         Okay Errors   Throughput
         bytes   bytes    secs            #      #   10^6bits/sec
@@ -208,7 +314,8 @@
         """Parses the following which works for both rr (TCP and UDP) and crr
         tests and returns a singleton containing transfer rate.
 
-        TCP REQUEST/RESPONSE TEST from 0.0.0.0 (0.0.0.0) port 0 AF_INET to foo (10.1.1.1) port 0 AF_INET
+        TCP REQUEST/RESPONSE TEST from 0.0.0.0 (0.0.0.0) port 0 AF_INET \
+        to foo.bar.com (10.10.10.3) port 0 AF_INET
         Local /Remote
         Socket Size   Request  Resp.   Elapsed  Trans.
         Send   Recv   Size     Size    Time     Rate
@@ -219,3 +326,17 @@
         """
 
         return float(output.splitlines()[6].split()[5]),
+
+
+    def ping(self, ip, timeout):
+        curr_time = time.time()
+        end_time = curr_time + timeout
+        while curr_time < end_time:
+            if not os.system('ping -c 1 ' + ip):
+                # Ping succeeded
+                return
+            # Ping failed. Lets sleep a bit and try again.
+            time.sleep(5)
+            curr_time = time.time()
+
+        return